in reply to Wait for individual sub processes [SOLVED]
The following is an example using MCE to process a file. There is no need to split the file into parts before running. Chunking is built into MCE, which keeps the CPUs busy from start to end.
Update 1: Added merge_to_iter to merge the output into one file while preserving order.
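The ordering idea behind merge_to_iter can be shown without MCE or temporary files. The following is only a sketch in core Perl: make_ordered_collector is a hypothetical name, and in-memory strings stand in for the per-chunk output files. Results that arrive early are held in a hash until every lower chunk id has been released.

```perl
use strict;
use warnings;

# Returns a callback that accepts (chunk_id, data) in any order and
# appends data to @$out strictly in chunk_id order, starting at 1.
sub make_ordered_collector {
    my ($out) = @_;
    my %pending;
    my $next_id = 1;

    return sub {
        my ($chunk_id, $data) = @_;
        $pending{$chunk_id} = $data;

        # Release every consecutive chunk that is now available.
        while (exists $pending{$next_id}) {
            push @$out, delete $pending{$next_id};
            $next_id++;
        }
        return;
    };
}

my @merged;
my $collect = make_ordered_collector(\@merged);

$collect->(2, "b");    # held back: chunk 1 has not arrived yet
$collect->(3, "c");    # held back as well
$collect->(1, "a");    # releases chunks 1, 2 and 3 in order

print join("", @merged), "\n";
```

Workers finish in whatever order the scheduler allows, so the collector trades a little memory (the pending hash) for output that matches the input order.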
Update 2: $slurp_ref is a scalar reference, thus print $fh $$slurp_ref;
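A short illustration of why the double sigil is needed. This is a sketch only: $chunk stands in for the text a worker would receive, and an in-memory filehandle stands in for the part file.

```perl
use strict;
use warnings;

my $chunk     = "line 1\nline 2\n";
my $slurp_ref = \$chunk;    # stands in for the reference a worker receives

my $buffer = '';
open my $fh, '>', \$buffer or die "Cannot open in-memory handle: $!\n";
print {$fh} $$slurp_ref;    # dereference: writes the chunk text itself
close $fh;

print $buffer;
```

Printing $slurp_ref without the extra sigil would write something like SCALAR(0x...) rather than the chunk contents.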
Update 3: Changed chunk_size from 'auto' to 200. A chunk_size of 8192 or less is a number of records (lines). A value greater than 8192 is a number of bytes, with MCE reading on to the end of the current record. MCE quietly caps the value at 64M. The OP provided timings with 100 rows taking ~1 minute, so a small chunk size suits this job.
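The rules above can be written down as a small helper. To be clear, interpret_chunk_size is a hypothetical function for illustration and not part of MCE. It only encodes the thresholds described in this post, and it assumes that 64M means 64 * 1024 * 1024 bytes.

```perl
use strict;
use warnings;

use constant MAX_RECS  => 8192;                # at or below: a record count
use constant MAX_BYTES => 64 * 1024 * 1024;    # assumed meaning of "64M"

# Returns (unit, effective value) for a numeric chunk_size.
sub interpret_chunk_size {
    my ($size) = @_;
    return ('records', $size) if $size <= MAX_RECS;
    $size = MAX_BYTES if $size > MAX_BYTES;    # quietly capped
    return ('bytes', $size);
}

for my $size (200, 8192, 8193, 100 * 1024 * 1024) {
    my ($unit, $value) = interpret_chunk_size($size);
    print "$size => $value $unit\n";
}
```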
Update 4: Changed max_workers from 'auto' to 16. The 'auto' value never goes higher than 8. To run on all available cores, set max_workers explicitly or use max_workers => MCE::Util::get_ncpu().
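A sketch of building the options with the core count instead of a hard-coded 16. The fallback to 1 when MCE::Util cannot be loaded is an assumption added here so the snippet runs anywhere; it is not something MCE does.

```perl
use strict;
use warnings;

# Ask MCE::Util for the number of logical cores; fall back to 1
# (illustrative assumption) if the module is not installed.
my $ncpu = eval { require MCE::Util; MCE::Util::get_ncpu() } || 1;

my %options = (
    max_workers => $ncpu,
    chunk_size  => 200,
    use_slurpio => 1,
);

print "max_workers => $options{max_workers}\n";
```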
use strict;
use warnings;
use IO::Handle;
use MCE::Flow;

die "Not enough arguments given\n" if @ARGV < 1;

my $file = shift;
my $odir = "/path/to/output_dir";

# Returns a gather callback that appends each worker's output file to
# $ofile in chunk order, holding back parts that arrive early.
sub merge_to_iter {
    my ($ofile) = @_;
    my %tmp;
    my $order_id = 1;

    open my $ofh, '>', $ofile or die "Cannot open $ofile: $!\n";
    $ofh->autoflush(1);    # flush immediately

    return sub {
        my ($chunk_id, $opart) = @_;
        $tmp{$chunk_id} = $opart;

        while (exists $tmp{$order_id}) {
            $opart = delete $tmp{$order_id++};

            # slurp $opart and append it to $ofh
            open my $ifh, '<', $opart or die "Cannot open $opart: $!\n";
            local $/;
            print {$ofh} scalar <$ifh>;
            close $ifh;
            unlink $opart;
        }
    };
}

mce_flow_f {
    gather      => merge_to_iter("$odir/$file.out"),
    max_workers => 16,
    chunk_size  => 200,
    use_slurpio => 1,
},
sub {
    my ($mce, $slurp_ref, $chunk_id) = @_;
    my $part = "$odir/$file.$chunk_id";

    open my $fh, '>', $part or die "Cannot open $part: $!\n";
    print {$fh} $$slurp_ref;    # $slurp_ref is a scalar reference
    close $fh;

    # system waits for the tool to finish; exec would replace this worker
    # and the lines below would never run.
    system("sh text_tool $part > $part.out") == 0
        or die "text_tool failed on $part (status $?)\n";

    print {*STDERR} "Finished processing $part at " . localtime() . "\n";
    $mce->gather($chunk_id, "$part.out");
    unlink $part;
}, $file;
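Each worker shells out to an external tool and must wait for it before gathering the result. In Perl, exec replaces the current process and returns only on failure, whereas system waits for the child and leaves its status in $?. A minimal sketch of checking that status follows; run_and_wait is a hypothetical helper, and the running perl binary ($^X) stands in for the real tool.

```perl
use strict;
use warnings;

# Run a command, wait for it, and return its exit code.
# Dies if the command could not be started or was killed by a signal.
sub run_and_wait {
    my @cmd = @_;
    my $rc  = system(@cmd);
    die "Cannot start '@cmd': $!\n" if $rc == -1;
    die sprintf("'%s' died from signal %d\n", "@cmd", $? & 127) if $? & 127;
    return $? >> 8;
}

# $^X is the perl binary currently running; it stands in for the tool.
my $ok   = run_and_wait($^X, '-e', 'exit 0');
my $fail = run_and_wait($^X, '-e', 'exit 3');

print "ok=$ok fail=$fail\n";
```

Passing the command as a list avoids the shell. The example in this post needs the shell for the > redirection, so it uses the single-string form instead.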