in reply to Re: How do you parallelize STDIN for large file processing?
in thread How do you parallelize STDIN for large file processing?

Here's a cut-down version of some code I'm using, based on Parallel::Fork::BossWorker, to speed up some processing. You'll need to tinker with it to get it to do what you want...
    use strict;
    use warnings;

    my $Worker_count = 2;

    fork_workers();

    # Enable autoflush on a filehandle without disturbing the currently
    # selected default filehandle.
    sub autoflush { select((select(shift), $| = 1)[0]) }

    sub fork_workers {
        # Shared pipe: each worker writes its pid here when it is ready
        # for more work.
        pipe my $from_workers, my $to_boss;
        autoflush($to_boss);

        my %workers;
        foreach (1 .. $Worker_count) {
            # Per-worker pipe for the boss to send 'work' to the worker ...
            pipe my $from_boss, my $to_worker;
            # ... and one for the worker to send a message back to the boss.
            pipe my $from_worker2, my $to_boss2;
            autoflush($to_worker);
            autoflush($to_boss2);

            my $pid = fork;
            die "failed to fork: $!" unless defined $pid;

            if ($pid) {    # boss
                $workers{$pid} = [$to_worker, $from_worker2];
                close $from_boss;
                close $to_boss2;
            }
            else {         # worker
                close $from_workers;
                close $from_worker2;
                close $to_worker;
                close STDIN;
                send_msg($to_boss, $$);     # announce readiness
                send_msg($to_boss2, '');    # prime the result pipe so the
                                            # boss's first read doesn't block
                worker($from_boss, $to_boss, $to_boss2);
                exit;
            }
        }
        close $to_boss;

        my $INT_handler = $SIG{INT};
        local $SIG{INT} = sub {
            close $from_workers;
            for (keys %workers) {
                my ($to_worker, $from_worker2) = @{$workers{$_}};
                close $to_worker;
                close $from_worker2;
            }
            # Chain to any previously installed handler.
            $INT_handler->('INT') if ref $INT_handler eq 'CODE';
        };

        my @work = get_work();    # ???

        while (my $pid = receive_msg($from_workers)) {
            my ($to_worker, $from_worker2) = @{$workers{$pid}};
            my $msg = receive_msg($from_worker2);
            if (@work) {
                my $whatever = something(@work);
                send_msg($to_worker, $whatever);
                @work = ();
            }
            else {
                # No work left: closing the pipes tells this worker to exit.
                close $to_worker;
                close $from_worker2;
                delete $workers{$pid};
            }
            if (length $msg) {
                # something
            }
        }
        close $from_workers;
    }

    sub worker {
        my ($from_boss, $to_boss, $to_boss2) = @_;
        while (my $whatever = receive_msg($from_boss)) {
            my $msg = something_else($whatever);
            send_msg($to_boss, $$);       # ready for more work
            send_msg($to_boss2, $msg);    # hand back the result
        }
    }

    # Messages are terminated by "\003\000", which is assumed never to
    # appear in the payload.
    sub receive_msg {
        my $fh = shift;
        local $/ = "\003\000";
        my $msg = <$fh>;
        chomp $msg if defined $msg;
        return $msg;
    }

    sub send_msg {
        my ($fh, $msg) = @_;
        print $fh $msg, "\003\000";
    }

(Two small fixes over my original paste: the worker now primes $to_boss2 with an empty message before entering its loop, since otherwise the boss's first receive_msg($from_worker2) blocks against a worker that is itself blocked waiting for work; and the chained INT handler is only called if it's actually a code ref.)
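The get_work / something / something_else subs are left undefined in the cut-down above. If you want to run the skeleton end to end, here's a minimal, purely illustrative set of stubs (dummy data, not my real processing) you can drop in:

```perl
use strict;
use warnings;

# Illustrative stubs only -- substitute your real work-generation
# and processing logic for these.

# Produce the list of work items (here, just a few dummy strings).
sub get_work { return ('item1', 'item2', 'item3') }

# Pick the next unit of work to hand to a worker.
sub something {
    my @work = @_;
    return shift @work;
}

# The actual per-item processing done inside each worker.
sub something_else {
    my ($item) = @_;
    return "processed: $item";
}
```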
There is one issue with the above mechanism that I'd be interested in having resolved... is it _safe_ for multiple child processes to write to the same pipe? In the above code, each child uses send_msg($to_boss, $$) to notify the parent that it has something for it to process. The code used by Parallel::Fork::BossWorker allows the children to write arbitrarily large amounts to this shared pipe, but I found that this could occasionally result in corrupted data when writing large amounts.

However, with the scheme in the code above, where each child sends only its pid (a small amount of data) down the shared pipe, I have yet to see any corruption.

So... is it safe to write small amounts (less than some internal PerlIO buffer size) to a shared pipe, or is it unsafe and I've just yet to hit the problem?
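My understanding (hedged -- corrections welcome) is that POSIX guarantees a single write() of at most PIPE_BUF bytes to a pipe is atomic (PIPE_BUF is at least 512 by the standard; 4096 on Linux), so small records like a pid can't interleave -- provided each record goes out in one write() call, which is why the sketch below uses syswrite rather than buffered print. Here's a little stress test I put together with several children hammering one shared pipe; any torn record should trip the pattern match in the parent:

```perl
use strict;
use warnings;

# Stress test: several children each write many small, distinct records
# to one shared pipe, one syswrite() per record.  If writes of
# <= PIPE_BUF bytes are atomic, the parent never sees a torn record.

my $children = 4;
my $records  = 1000;

pipe(my $reader, my $writer) or die "pipe: $!";

for my $id (1 .. $children) {
    my $pid = fork;
    die "fork: $!" unless defined $pid;
    if ($pid == 0) {
        close $reader;
        for my $n (1 .. $records) {
            my $msg   = "child=$id n=$n\n";    # well under PIPE_BUF
            my $wrote = syswrite($writer, $msg);
            die "syswrite: $!"
                unless defined $wrote && $wrote == length $msg;
        }
        exit 0;
    }
}
close $writer;    # parent keeps only the read end

my %seen;
while (my $line = <$reader>) {
    # an interleaved (torn) write would fail this match
    $line =~ /^child=(\d+) n=(\d+)$/ or die "corrupted record: $line";
    $seen{$1}++;
}
close $reader;
wait() for 1 .. $children;

print "ok: $seen{$_} records from child $_\n" for sort keys %seen;
```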

Thanks for any tips! :-) alex.