use MCE::Flow; die "Not enough arguments given\n" if @ARGV < 1; my $file = shift; my $odir = "/path/to/output_dir"; sub merge_to_iter { my ($ofile) = @_; my %tmp; my $order_id = 1; open my $ofh, '>', $ofile or die "Cannot open $ofile: $!\n"; select $ofh; $| = 1; # flush immediately return sub { my ($chunk_id, $opart) = @_; $tmp{$chunk_id} = $opart; while (1) { last unless exists $tmp{ $order_id }; $opart = delete $tmp{ $order_id++ }; # slurp (append $ifh) to $ofh open my $ifh, '<', $opart; local $/; print $ofh scalar <$ifh>; close $ifh; unlink $opart; } }; } mce_flow_f { gather => merge_to_iter("$odir/$file.out"), max_workers => 16, chunk_size => 200, use_slurpio => 1, }, sub { my ($mce, $slurp_ref, $chunk_id) = @_; my $part = "$odir/$file.$chunk_id"; open my $fh, '>', $part or die "Cannot open $part: $!\n"; print $fh $$slurp_ref; close $fh; exec("sh text_tool $part > $part.out") or die "Cannot exec $part\n"; print {*STDERR} "Finished processing $part at ".localtime."\n"; $mce->gather($chunk_id, "$part.out"); unlink $part; }, $file;