use v5.20; use strict; use warnings; use IPC::Run 'start'; use List::Util qw/ all any reduce /; my $workers= 4; my $read_size= 512*1024; my $queue_full= 256*1024; my $command= [@ARGV]; # Start a pool of 10 workers my @jobs= map { { id => $_, in => '', out => '' } } 1..$workers; my @run_spec= map { ('&', $command, '<', \$_->{in}, '>', \$_->{out}) } @jobs; shift @run_spec; my $harness= start(@run_spec); my $buf= ''; my $got= 1; while ($got) { # The reading of input could just use $buf .= but this is much more efficient, # reading 1MB at a time. $got= read(STDIN, $buf, $read_size, length $buf); if ($got) { # look for the final newline within the buffer. This makes sure we only pass whole # lines to the workers my $end= rindex $buf, '\n'; next unless $end >= 0; # append the lines to the input queue of the worker with the smallest input queue $jobs[0]{in} .= substr($buf, 0, $end+1, ''); say "Add ".($end+1)." bytes to job $jobs[0]{id}"; } elsif (!defined $got) { next if $!{EINTR} || $!{EAGAIN}; die "read: $!"; } # No more STDIN, so take any leftover string and add it to an input buffer elsif (length $buf) { $jobs[0]{in} .= $buf; say "Add ".length($buf)." bytes to job $jobs[0]{id}"; } # stay in this loop until there is room on one of the input buffers, # or after EOF, stay here until all input buffers are flushed while ($got? (all { length $_->{in} > $queue_full } @jobs) : (any { length $_->{in} > 0 } @jobs) ) { # say "I/O: ".join(' ', map sprintf("[%8d:%8d]", length $_->{in}, length $_->{out}), @jobs); # send input, receive output $harness->pump; # process all output from jobs so far process_results(); } # sort the worker with the smallest input queue to the front @jobs= sort { length $a->{in} <=> length $b->{in} } @jobs; } say "Waiting for workers to finish"; # Close the input pipes and wait for workers to exit $_->{in}= undef for @jobs; $harness->finish; # process all the rest process_results(); sub process_results { for (@jobs) { my $end= rindex $_->{out}, "\n"; print substr($_->{out}, 0, $end+1, '') if $end >= 0; } }