The chunking engine lets one specify a smaller chunk size, which opens up more possibilities. Here, workers append to a local string variable and, upon completing a chunk, write the buffer to the target file. This works out quite well and further reduces the overall time: it does away with the time previously spent before and after the run on temp dir/file creation, reading, and cleanup.
Why a smaller chunk size? Because workers append to a local string variable in this demonstration, a larger chunk size potentially means greater memory consumption.
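Roughly speaking, with 64 workers and a chunk size of 50, up to 64 buffers, each holding the transformed text of 50 files, may sit in memory at once; doubling the chunk size doubles that upper bound.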
Making this possible is MCE::Relay (ordered) or MCE::Mutex (unordered). Not seeing all CPU cores at 100% utilization? That is because workers now serialize their appends to the target file, whether in order or not. There is no need to spin up 4,000+ workers or go beyond the physical limit of the box; that is simply unnecessary.
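Here is the gist of each approach, lifted from the script below; a minimal sketch, where $output is a worker's accumulated buffer:

# Ordered appends via MCE::Relay (requires init_relay => 1)
MCE::relay {
    open my $OUT, '>>', $wordfile or die "Error opening $wordfile";
    print {$OUT} $output;
    close $OUT;
};

# Unordered appends via MCE::Mutex
$mutex->enter(sub {
    open my $OUT, '>>', $wordfile or die "Error opening $wordfile";
    print {$OUT} $output;
    close $OUT;
});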
Look at the time saved. Merging the results is now handled by the workers themselves, versus the post-processing step that previously took 31.510 seconds.
# MCE::Relay
$ time ../mario-mceiter2.pl
Parsing 345701 files
maxforks: 64
chunksize: 50
regex: 13.628399
real 0m13.670s
user 11m11.484s
sys 0m 4.739s
# MCE::Mutex
$ time ../mario-mceiter2.pl
Parsing 345701 files
maxforks: 64
chunksize: 50
regex: 12.646211
real 0m12.689s
user 11m28.535s
sys 0m 4.827s
mario-mceiter2.pl
#!/usr/bin/perl
use strict;
use feature qw{ say };
use warnings;
use Env;
use utf8;
use Time::HiRes qw(gettimeofday tv_interval usleep);
use open ':std', ':encoding(UTF-8)';
use MCE::Loop;
use MCE::Mutex;
my $benchmark = 1; # print timings for loops
my $wordfile="data.dat";
truncate $wordfile, 0;
#$|=1;
# substitute whole words
my %whole = qw{
going go
getting get
goes go
knew know
trying try
tried try
told tell
coming come
saying say
men man
women woman
took take
lying lie
dying die
};
# substitute on prefix
my %prefix = qw{
need need
talk talk
tak take
used use
using use
};
# substitute on substring
my %substring = qw{
mean mean
work work
read read
allow allow
gave give
bought buy
want want
hear hear
came come
destr destroy
paid pay
selve self
cities city
fight fight
creat create
makin make
includ include
};
my $re1 = qr{\b(@{[ join '|', reverse sort keys %whole ]})\b}i;
my $re2 = qr{\b(@{[ join '|', reverse sort keys %prefix ]})\w*}i;
my $re3 = qr{\b\w*?(@{[ join '|', reverse sort keys %substring ]})\w*}i;
my $maxforks = 64;
my $chunksize = 50; # keep this small, workers append to string buffer
my $t0 = [gettimeofday];
my $mutex = MCE::Mutex->new;
sub process_files {
    my ($mce, $chunk_ref, $chunk_id) = @_;
    my $output = "";
    while (my $infile = shift @{ $chunk_ref }) {
        open my $IN, '<', $infile
            or warn("open error: $infile"), MCE->last;
        while (<$IN>) {
            tr/-!"#%&()*',.\/:;?@\[\\\]”_“{’}><^)(|/ /; # replace punctuation with spaces
            s/^/ /;
            s/\n/ \n/;
            s/[[:digit:]]{1,12}//g;
            s/\bw(as|ere)\b/be/gi;
            s{$re2}{ $prefix{lc $1} }g;     # prefix
            s{$re3}{ $substring{lc $1} }g;  # part
            s{$re1}{ $whole{lc $1} }g;      # whole
            $output .= $_;
        }
        close $IN;
    }
    # Choose one: relay (ordered) or mutex (unordered), but not both.
    # MCE::relay {
    #     open my $OUT, '>>', $wordfile or die "Error opening $wordfile";
    #     print {$OUT} $output;
    #     close $OUT;
    # };
    $mutex->enter(sub {
        open my $OUT, '>>', $wordfile or die "Error opening $wordfile";
        print {$OUT} $output;
        close $OUT;
    });
    undef $output;
    return;
}
sub input_iterator {
    my ($filecount, @data);
    my $init_data = 1;
    return sub {
        if ($init_data) {
            @data = glob("data-* ??/data-*");
            $filecount = scalar @data;
            say "Parsing $filecount files"; # zero files is okay
            say "maxforks: $maxforks";
            say "chunksize: $chunksize";
            $init_data = 0;
        }
        return unless @data;
        return splice @data, 0, $chunksize;
    };
}
MCE::Loop->init(
    chunk_size  => $chunksize,
    max_workers => $maxforks,
    # init_relay => 1, # uncomment, if choosing MCE::relay above
    posix_exit  => 1,
    use_threads => 0, # use emulated fork on Windows
);
MCE::Loop->run(\&process_files, input_iterator());
MCE::Loop->finish;
my $elapsed = tv_interval($t0);
print "regex: $elapsed\n" if $benchmark;