I was expecting IPC::Run to be the simplest way to orchestrate a pool of non-Perl workers that stream text, but it might not actually be any less effort than MCE; this bit of code took longer to write than I expected. It reads STDIN in large chunks and hands each chunk off to the input queue of whichever worker currently has the least data queued. It also takes care to divide the input on line boundaries, so that a line is never split between two workers. It should still be much more efficient than the simpler design of reading STDIN one line at a time.
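The dispatch step boils down to two small operations: cut the buffer after its last newline, and append that piece to the shortest queue. Below is a minimal sketch of just that step, with no IPC::Run involved. `dispatch_lines` is a hypothetical helper written for illustration. parallel.pl imports `reduce` from List::Util but re-sorts `@jobs` instead; this sketch uses `reduce` to pick the shortest queue in a single pass.

```perl
use strict;
use warnings;
use List::Util qw/ reduce /;

# Hypothetical helper (not part of parallel.pl): move every complete
# line at the front of $$buf_ref onto whichever queue currently holds
# the fewest bytes, leaving any trailing partial line in $$buf_ref.
# Returns the number of bytes moved (0 if there is no newline yet).
sub dispatch_lines {
    my ($buf_ref, $queues)= @_;
    my $end= rindex $$buf_ref, "\n";   # double quotes: a real newline
    return 0 if $end < 0;
    # pick the queue with the fewest bytes; ties go to the earlier one
    my $target= reduce { length $$a <= length $$b ? $a : $b } @$queues;
    $$target .= substr($$buf_ref, 0, $end+1, '');
    return $end+1;
}

my ($q1, $q2)= ('', '');
my @queues= (\$q1, \$q2);
my $buf= "alpha\nbeta\ngam";
dispatch_lines(\$buf, \@queues);   # "alpha\nbeta\n" goes to $q1
$buf .= "ma\ndelta\n";             # the rest of "gamma" arrives
dispatch_lines(\$buf, \@queues);   # $q2 is shorter, so it gets "gamma\ndelta\n"
```

The partial line "gam" stays in `$buf` after the first call and is only handed out once its newline arrives, which is what keeps a line from being split between workers.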
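I tested it with "wc" as a simple way to verify that every input line was seen by some worker. The four per-worker line counts (955 + 252 + 284 + 422) add up to the 1913 lines reported by running wc directly, and the per-worker byte counts likewise sum to 250948755.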
$ time perl parallel.pl wc < LARGE_FILE
(snip)...
Waiting for workers to finish
955 5389459 56444912
252 6883234 68045448
284 3803959 62591850
422 6429932 63866545
real 0m1.438s
user 0m3.864s
sys 0m0.242s
$ time wc < LARGE_FILE
1913 22506581 250948755
real 0m2.332s
user 0m2.277s
sys 0m0.052s
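One way to do that check mechanically is to sum the three wc columns across workers and compare the result with the single-process totals. This is a minimal sketch; `sum_wc` is a hypothetical helper, and the sample lines are made up for illustration rather than taken from the run above.

```perl
use strict;
use warnings;

# Hypothetical helper: sum the "lines words bytes" columns of every
# wc output line in @_, ignoring anything else (such as the
# "Add N bytes to job M" progress messages parallel.pl prints).
sub sum_wc {
    my @total= (0, 0, 0);
    for (@_) {
        next unless /^\s*(\d+)\s+(\d+)\s+(\d+)\s*$/;
        $total[0] += $1;
        $total[1] += $2;
        $total[2] += $3;
    }
    return @total;
}

# made-up sample output, in the same shape parallel.pl produces
my @lines= (
    "Add 524288 bytes to job 1\n",
    "3 10 60\n",
    "Waiting for workers to finish\n",
    "2 5 40\n",
);
my @t= sum_wc(@lines);
print "@t\n";   # prints "5 15 100"
```

For real output, passing `<STDIN>` instead of `@lines` lets you pipe `perl parallel.pl wc < LARGE_FILE` straight into it.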
parallel.pl
use v5.20;
use strict;
use warnings;
use IPC::Run 'start';
use List::Util qw/ all any reduce /;

my $workers= 4;
my $read_size= 512*1024;
my $queue_full= 256*1024;
my $command= [@ARGV];
die "usage: $0 command [args...]\n" unless @$command;

# Start a pool of $workers workers, each with its own input and
# output buffer
my @jobs= map { { id => $_, in => '', out => '' } } 1..$workers;
my @run_spec= map { ('&', $command, '<', \$_->{in}, '>', \$_->{out}) }
              @jobs;
shift @run_spec;   # drop the leading '&'
my $harness= start(@run_spec);

my $buf= '';
my $got= 1;
while ($got) {
    # The reading of input could just use $buf .= <STDIN>, but reading
    # $read_size (512KB) at a time is much more efficient.
    $got= read(STDIN, $buf, $read_size, length $buf);
    if ($got) {
        # look for the final newline within the buffer. This makes sure
        # we only pass whole lines to the workers
        my $end= rindex $buf, "\n";
        next unless $end >= 0;
        # append the lines to the input queue of the worker with the
        # smallest input queue
        $jobs[0]{in} .= substr($buf, 0, $end+1, '');
        say "Add ".($end+1)." bytes to job $jobs[0]{id}";
    }
    elsif (!defined $got) {
        # Retry on EINTR/EAGAIN. A bare "next" would re-test $got,
        # find it undef, and leave the loop as if STDIN had hit EOF.
        if ($!{EINTR} || $!{EAGAIN}) { $got= 1; next; }
        die "read: $!";
    }
    # No more STDIN, so take any leftover string and add it to an
    # input buffer
    elsif (length $buf) {
        $jobs[0]{in} .= $buf;
        say "Add ".length($buf)." bytes to job $jobs[0]{id}";
        $buf= '';
    }
    # stay in this loop until there is room on one of the input buffers,
    # or after EOF, stay here until all input buffers are flushed
    while ($got ? (all { length $_->{in} > $queue_full } @jobs)
                : (any { length $_->{in} > 0 } @jobs)
    ) {
        # say "I/O: ".join(' ', map sprintf("[%8d:%8d]",
        #     length $_->{in}, length $_->{out}), @jobs);
        # send input, receive output
        $harness->pump;
        # process all output from jobs so far
        process_results();
    }
    # sort the worker with the smallest input queue to the front
    @jobs= sort { length $a->{in} <=> length $b->{in} } @jobs;
}

say "Waiting for workers to finish";
# Close the input pipes and wait for workers to exit
$_->{in}= undef for @jobs;
$harness->finish;
# process all the rest, including a final line with no trailing newline
process_results(1);

# Print each worker's complete output lines; with $final set, also
# print whatever partial line remains.
sub process_results {
    my ($final)= @_;
    for (@jobs) {
        my $end= $final ? length($_->{out}) - 1 : rindex($_->{out}, "\n");
        print substr($_->{out}, 0, $end+1, '') if $end >= 0;
    }
}