$ time perl parallel.pl wc < LARGE_FILE
(snip)...
Waiting for workers to finish
955 5389459 56444912
252 6883234 68045448
284 3803959 62591850
422 6429932 63866545
real 0m1.438s
user 0m3.864s
sys 0m0.242s
$ time wc < LARGE_FILE
1913 22506581 250948755
real 0m2.332s
user 0m2.277s
sys 0m0.052s
parallel.pl
use v5.20;
use strict;
use warnings;
use IPC::Run 'start';
use List::Util qw/ all any reduce /;
# Tunables.
my $workers= 4;             # number of parallel copies of the command to run
my $read_size= 512*1024;    # bytes requested from STDIN per read() call
my $queue_full= 256*1024;   # a worker's pending input counts as "full" above this

# The command (and its arguments) to run in every worker comes from @ARGV.
die "usage: $0 command [args...]\n" unless @ARGV;
my $command= [@ARGV];

# Start a pool of $workers workers. Each has its own input scalar (filled by
# the main loop) and output scalar (drained by process_results).
my @jobs= map { +{ id => $_, in => '', out => '' } } 1..$workers;

# Build "cmd < \in > \out & cmd < \in > \out & ..." for IPC::Run: emit a
# '&' separator before every job, then drop the leading one.
my @run_spec= map { ('&', $command, '<', \$_->{in}, '>', \$_->{out}) } @jobs;
shift @run_spec;
my $harness= start(@run_spec);
# Read STDIN in large chunks and hand whole lines to the least-loaded worker,
# pumping the IPC::Run harness whenever every worker's input queue is full
# (and, after EOF, until every queue has drained).
my $buf= '';
my $got= 1;
while ($got) {
    # The reading of input could just use $buf .= <STDIN>, but reading
    # $read_size bytes at a time (appending to $buf) is much more efficient.
    $got= read(STDIN, $buf, $read_size, length $buf);
    if (!defined $got) {
        # Transient errors: retry the read. $got must be reset, otherwise the
        # loop condition would treat the undef as EOF and silently stop
        # reading input.
        if ($!{EINTR} || $!{EAGAIN}) {
            $got= 1;
            next;
        }
        die "read: $!";
    }
    if ($got) {
        # Look for the final newline within the buffer. This makes sure we
        # only pass whole lines to the workers; a trailing partial line stays
        # in $buf until more input arrives. (Must be "\n", not '\n' -- the
        # single-quoted form is a literal backslash followed by 'n'.)
        my $end= rindex $buf, "\n";
        next unless $end >= 0;
        # Append the lines to the input queue of the worker with the smallest
        # input queue, which the sort at the bottom keeps in $jobs[0].
        $jobs[0]{in} .= substr($buf, 0, $end+1, '');
        # Diagnostics go to STDERR so STDOUT carries only the workers' output.
        say STDERR "Add ".($end+1)." bytes to job $jobs[0]{id}";
    }
    elsif (length $buf) {
        # No more STDIN, so take any leftover partial line and add it to an
        # input buffer.
        $jobs[0]{in} .= $buf;
        say STDERR "Add ".length($buf)." bytes to job $jobs[0]{id}";
    }
    # Stay in this loop until there is room on one of the input buffers,
    # or, after EOF, stay here until all input buffers are flushed.
    while ($got ? (all { length $_->{in} > $queue_full } @jobs)
                : (any { length $_->{in} > 0 } @jobs)
    ) {
        # say "I/O: ".join(' ', map sprintf("[%8d:%8d]",
        #     length $_->{in}, length $_->{out}), @jobs);
        # Send input, receive output.
        $harness->pump;
        # Print all complete output lines from the jobs so far.
        process_results();
    }
    # Sort the worker with the smallest input queue to the front.
    @jobs= sort { length $a->{in} <=> length $b->{in} } @jobs;
}
# Diagnostics go to STDERR so STDOUT carries only the workers' output.
say STDERR "Waiting for workers to finish";
# Close the input pipes and wait for workers to exit.
$_->{in}= undef for @jobs;
$harness->finish
    or warn "one or more workers exited with a non-zero status\n";
# Print all the remaining complete lines ...
process_results();
# ... then any final output that lacked a trailing newline, which
# process_results() deliberately holds back while workers are still running.
for my $job (@jobs) {
    print $job->{out} if length $job->{out};
}
# Print every complete (newline-terminated) line each worker has produced so
# far, removing it from that worker's output buffer. A trailing partial line
# is left in place so output from different workers is never mixed mid-line.
sub process_results {
    foreach my $job (@jobs) {
        my $cut= rindex($job->{out}, "\n") + 1;   # 0 when there is no newline
        next unless $cut;
        print substr($job->{out}, 0, $cut, '');
    }
}
|