$ for i in $(seq 1 210); do cat large.txt; done >/tmp/big.txt
$ ls -lh /tmp/big.txt
-rw-r--r-- 1 mario mario 5.0G May 19 11:19 /tmp/big.txt
####
$ time grep -c "[aA].*[eE].*[iI].*[oO].*[uU]" /tmp/big.txt
2195760
real 0m8.721s
user 0m8.317s
sys 0m0.404s
####
$ time ./egrep.pl --max-workers=2 -c "[aA].*[eE].*[iI].*[oO].*[uU]" /tmp/big.txt
2195760
N-thds 2 5 10 20 30
real 0m39.068s 0m16.088s 0m8.146s 0m4.229s 0m3.084s
user 1m17.614s 1m19.624s 1m20.441s 1m21.823s 1m25.296s
sys 0m0.419s 0m0.464s 0m0.434s 0m0.526s 0m0.645s
####
$ time NUM_THREADS=2 ./grep-count-pcre2 "[aA].*[eE].*[iI].*[oO].*[uU]" /tmp/big.txt
parallel (2) Total Lines: 105000000, Matching Lines: 2195760
N-thds 2 5 10 20 30
real 0m9.256s 0m4.382s 0m2.748s 0m1.929s 0m1.689s
user 0m15.858s 0m16.563s 0m17.267s 0m18.391s 0m19.497s
sys 0m1.678s 0m1.339s 0m1.186s 0m1.225s 0m1.260s
####
$ time NUM_THREADS=2 ./grep-count-pmap "[aA].*[eE].*[iI].*[oO].*[uU]" /tmp/big.txt
parallel (2) Total Lines: 105000000, Matching Lines: 2195760
N-thds 2 5 10 20 30
real 0m8.113s 0m3.249s 0m1.675s 0m0.958s 0m0.707s
user 0m15.333s 0m15.646s 0m15.809s 0m16.775s 0m18.110s
sys 0m0.840s 0m0.266s 0m0.343s 0m0.489s 0m0.506s
####
$ time NUM_THREADS=2 ./grep-count-chunk "[aA].*[eE].*[iI].*[oO].*[uU]" /tmp/big.txt
parallel (2) Total Lines: 105000000, Matching Lines: 2195760
N-thds 2 5 10 20 30
real 0m8.017s 0m3.257s 0m1.537s 0m0.801s 0m0.618s
user 0m14.524s 0m14.997s 0m14.906s 0m15.397s 0m17.519s
sys 0m1.010s 0m1.019s 0m0.412s 0m0.515s 0m0.649s
####
// Grep PCRE2 and Chunking C++ OpenMP demonstration for website:
// https://cpufun.substack.com/p/processing-a-file-with-openmp
//
// By Mario Roy - May 29, 2023
// License: GNU General Public License.
// See http://www.gnu.org/licenses/ for license information.
// SPDX-License-Identifier: GPL-3.0-or-later
// Based on rtoa-pgatram-allinone2c.cpp:
// https://perlmonks.org/?node_id=11152306
//
// Install the C++ wrapper for PCRE2 Library (required dependency).
// https://github.com/jpcre2/jpcre2
// sudo dnf install jpcre2-devel for RPM-based systems
//
// Compile with g++ or clang++: (Select the library pcre2-8, pcre2-16, or pcre2-32)
// g++ -o grep-count-chunk -std=c++20 -fopenmp -O3 -msse4.1 -Wall grep-count-chunk.cc -lpcre2-8
//
// On macOS, you need Homebrew (https://brew.sh).
// 1. Install gcc@12 and pcre2.
// brew install gcc@12 pcre2
// 2. Install C++ wrapper for the PCRE2 Library.
// export CPATH="/usr/local/include"
// export LIBRARY_PATH="/usr/local/lib"
// export CC="gcc-12"
// export CXX="g++-12"
// git clone --depth=1 https://github.com/jpcre2/jpcre2
// cd jpcre2 && ./configure && make install
// 3. Specify the -msse4.1 option on Intel Mac.
// g++-12 -o grep-count-chunk -std=c++20 -fopenmp -O3 -Wall grep-count-chunk.cc -lpcre2-8
// g++-12 -o grep-count-chunk -std=c++20 -fopenmp -O3 -msse4.1 -Wall grep-count-chunk.cc -lpcre2-8
//
// Running:
// Usage: grep-count-chunk regular-expression [ file... ]
// $ time ./grep-count-chunk "[aA].*[eE].*[iI].*[oO].*[uU]" < large.txt
// $ time ./grep-count-chunk "[aA].*[eE].*[iI].*[oO].*[uU]" large.txt
//
// OpenMP Little Book:
// https://nanxiao.gitbooks.io/openmp-little-book/content/
#include
#include
#include
#include
#include
#include
#include
// C++ wrapper for PCRE2 Library
#include
// Select the basic data type (char, wchar_t, char16_t or char32_t)
typedef jpcre2::select jp;
#ifdef _OPENMP
#include
#endif
inline constexpr auto CHUNK_SIZE = 2097152;
inline constexpr auto LINE_LENGTH = 255;
// Helper function to count character occurrences using SSE4.1.
// https://stackoverflow.com/questions/54541129/how-to-count-character-occurrences-using-simd
// https://gist.github.com/powturbo/456edcae788a61ebe2fc
//
#if defined(__SSE4_1__)
#include
size_t memcount_simd(const char *s, int c, size_t n)
{
// https://gist.github.com/powturbo/456edcae788a61ebe2fc
// build source with -msse4.1, -mavx, -mavx2, or -march=native
//
__m128i cv = _mm_set1_epi8(c), sum = _mm_setzero_si128(), acr0, acr1, acr2, acr3;
const char *p, *pe;
for (p = s; p != (char *)s + (n- (n % (252*16))); ) {
for (acr0 = acr1 = acr2 = acr3 = _mm_setzero_si128(), pe = p+252*16; p != pe; p += 64) {
acr0 = _mm_add_epi8(acr0, _mm_cmpeq_epi8(cv, _mm_loadu_si128((const __m128i *)p)));
acr1 = _mm_add_epi8(acr1, _mm_cmpeq_epi8(cv, _mm_loadu_si128((const __m128i *)(p+16))));
acr2 = _mm_add_epi8(acr2, _mm_cmpeq_epi8(cv, _mm_loadu_si128((const __m128i *)(p+32))));
acr3 = _mm_add_epi8(acr3, _mm_cmpeq_epi8(cv, _mm_loadu_si128((const __m128i *)(p+48)))); __builtin_prefetch(p+1024);
}
sum = _mm_add_epi64(sum, _mm_sad_epu8(_mm_sub_epi8(_mm_setzero_si128(), acr0), _mm_setzero_si128()));
sum = _mm_add_epi64(sum, _mm_sad_epu8(_mm_sub_epi8(_mm_setzero_si128(), acr1), _mm_setzero_si128()));
sum = _mm_add_epi64(sum, _mm_sad_epu8(_mm_sub_epi8(_mm_setzero_si128(), acr2), _mm_setzero_si128()));
sum = _mm_add_epi64(sum, _mm_sad_epu8(_mm_sub_epi8(_mm_setzero_si128(), acr3), _mm_setzero_si128()));
}
size_t count = _mm_extract_epi64(sum, 0) + _mm_extract_epi64(sum, 1);
while (p != (char *)s + n) count += *p++ == c;
return count;
}
#endif
// Helper function to find '\n'.
inline constexpr char const* find_lf(char const* first, char const* last)
{
while (first != last) {
if (*first == '\n') break;
++first;
}
return first;
}
// Grep file, count matching lines
static void grep_file(
std::string_view fname, // in: file name
auto& fin, // in: file input stream
jp::Regex& re, // in: regular expression
int nthds // in: number of threads
)
{
std::string mode = (nthds > 1) ? "parallel" : "serial";
size_t lines = 0, matched_lines = 0, next_chunk_id = 0;
std::binary_semaphore *sem[nthds];
// Create semaphores for orderly output.
for (int i = 0; i < nthds; ++i)
sem[i] = new std::binary_semaphore{0};
#pragma omp parallel reduction(+:lines, matched_lines)
{
std::string buf; buf.resize(CHUNK_SIZE + LINE_LENGTH + 1, '\0');
const char *first, *last;
size_t chunk_id, len;
while (fin.good()) {
len = 0;
// Read the next chunk serially.
//
#pragma omp critical
{
fin.read(&buf[0], CHUNK_SIZE);
if ((len = fin.gcount()) > 0) {
chunk_id = ++next_chunk_id;
if (buf[len - 1] != '\n' && fin.getline(&buf[len], LINE_LENGTH)) {
// Getline discards the newline char and appends null char.
// Therefore, change '\0' to '\n'.
len += fin.gcount();
buf[len - 1] = '\n';
}
}
}
if (!len) break;
buf[len] = '\0';
first = &buf[0];
last = &buf[len];
// Process max Nthreads chunks concurrently.
#if 1
// Match all and tally the match count - faster than matching per line.
// This requires anchoring in the event a line has multiple matches.
// E.g. "^.*" prepended to pattern during regex construction.
matched_lines += re.match(std::basic_string(first, last - first + 1), "g");
// Count the number of lines in this chunk.
#if defined(__SSE4_1__)
lines += memcount_simd(&buf[0], '\n', last - first + 1);
#else
for (size_t i = 0; i < len; i++) {
lines += (buf[i] == '\n');
}
#endif
#else
// Process line-by-line.
while (first < last) {
auto beg_ptr{first}; first = find_lf(first, last);
auto end_ptr{first};
// include trailing '\n'
if (re.match(std::basic_string(beg_ptr, end_ptr - beg_ptr + 1)))
++matched_lines;
++lines;
++first;
}
#endif
// Output completed chunk, orderly by chunk_id.
//
if (nthds > 1 && chunk_id > 1)
sem[chunk_id % nthds]->acquire();
// std::cerr << "Chunk " << chunk_id << ": done\n";
// std::cout << std::basic_string(first, last - first + 1);
if (nthds > 1)
sem[(chunk_id + 1) % nthds]->release();
}
}
// Destroy the dynamically allocated semaphores.
for (int i = 0; i < nthds; ++i)
delete sem[i];
std::cerr \
<< fname << ": " << mode << " (" << nthds << ") Total Lines: " << lines \
<< ", Matching Lines: " << matched_lines << '\n';
}
int main(int argc, char* argv[])
{
if (argc < 2) {
if (argc > 0)
std::cerr << "Usage: grep-count-chunk regular-expression [ file... ]\n";
return 1;
}
#ifdef _OPENMP
// Determine the number of threads.
int nthds = std::thread::hardware_concurrency();
const char* env_nthds1 = std::getenv("OMP_NUM_THREADS");
const char* env_nthds2 = std::getenv("NUM_THREADS");
if (env_nthds1 && strlen(env_nthds1))
nthds = ::atoi(env_nthds1);
else if (env_nthds2 && strlen(env_nthds2))
nthds = ::atoi(env_nthds2);
omp_set_dynamic(false);
omp_set_num_threads(nthds);
#else
int nthds = 1;
#endif
jp::Regex re(std::string{"^.*"} + argv[1], "mS");
// anchor pattern to allow searching once per chunk
// 'm' modifier enables multi-line mode for the regex
// 'S' modifier is an optimization mode
if (!re) {
std::cerr << "Invalid regular expression: " << re.getErrorMessage() << '\n';
return 1;
}
if (argc == 2) {
grep_file("STDIN", std::cin, re, nthds);
}
else {
int nfiles = argc - 2;
char** fname = &argv[2];
for (int i = 0; i < nfiles; ++i) {
std::ifstream fin(fname[i], std::ifstream::binary);
if (!fin.is_open()) {
std::cerr << "Error opening '" << fname[i] << "' : " << strerror(errno) << '\n';
continue;
}
grep_file(fname[i], fin, re, nthds);
fin.close();
}
}
return 0;
}