Hello, chris212. Welcome to the monastery.
Hello fellow monks. I've been busy for some time. I saw this thread and wanted to pass on some knowledge. This is an interesting problem. The following is a demonstration one might write using the MCE module. Please notice the lack of semaphores and locking at the application level. The run time is similar to that of the testa example by Chris. The output file (m.txt) generated here matches (a.txt).
The script runs on all the major platforms, including Windows. The workers persist between chunks. Output order is preserved as per requirement. Both input and output iterators are handled by the MCE-manager process, the main process. Workers request the next input chunk, compute data, then submit their results via MCE->gather. This works reasonably well.
#!/opt/perl/bin/perl
## http://www.perlmonks.org/?node_id=1182580
use strict;
use warnings;
use MCE;
use Time::HiRes 'time';
# Settings for the demonstration.
my $iterations = 100;        # number of chunks the input iterator hands out
my $chunksize  = 50;         # records per chunk
my $threads    = 5;          # number of MCE workers
my $output     = "m.txt";    # path of the file the results are written to

# Template record: each letter 'a'..'z' maps to that letter repeated 200 times.
my %data = map { $_ => $_ x 200 } 'a' .. 'z';

test_mce();
# Run the whole job under MCE and report the elapsed wall-clock time on STDERR.
sub test_mce {
    my $began = time;

    # The input and output iterators are created right here, as arguments
    # to MCE->new; work() is registered as the per-chunk worker function.
    my $mce = MCE->new(
        max_workers => $threads,
        chunk_size  => $chunksize,
        input_data  => make_iter_input($chunksize, $iterations),
        gather      => make_iter_output($output),
        user_func   => \&work,
    )->run();

    my $elapsed = time - $began;
    printf STDERR "testa done in %0.02f seconds\n", $elapsed;
}
# MCE task to run in parallel.
#
# Arguments: the MCE object, a reference to the chunk, and the chunk id.
# The records to process are the array ref stored in the first slot of the
# chunk; every record is a hash ref.
#
# For each record a new hash with the same keys is built. The value stored
# under the '.' key is copied untouched; in every other value each literal
# occurrence of the key is replaced by the upper-cased key. The list of
# result hashes is handed to MCE->gather together with the chunk id.
sub work {
    my ($mce, $chunk_ref, $chunk_id) = @_;
    my $records = $chunk_ref->[0];
    my @ret;

    foreach my $record (@$records) {
        my %output;
        foreach my $key (keys %$record) {
            my $val = $record->{$key};
            if ($key ne '.') {
                my $uc = uc($key);
                # \Q...\E makes the key match literally, so a key that
                # contains regex metacharacters is not treated as a pattern.
                $val =~ s/\Q$key\E/$uc/g;
            }
            $output{$key} = $val;
        }
        push @ret, \%output;
    }

    MCE->gather($chunk_id, \@ret);
}
# make an input closure, returns an iterator
#
# make_iter_input($chunk_size, $iterations) returns a code ref. Each call of
# that code ref returns a reference to an array of records, or nothing once
# $iterations chunks have been handed out.
#
# The iterator takes the wanted number of records as its first argument;
# when it is called without one, the $chunk_size given here is used instead.
# Every record is a fresh copy of %data with an extra '.' key holding
# (chunk sequence number) * (record position within the chunk), both
# counted from 1.
sub make_iter_input {
    my ($chunk_size, $iterations) = @_;
    my $seq_a = 1;

    return sub {
        return if $seq_a > $iterations;

        # Size requested for this call, falling back to the size the
        # iterator was constructed with.
        my ($wanted) = @_;
        $wanted = $chunk_size unless defined $wanted;

        my @chunk;
        foreach my $seq_b (1 .. $wanted) {
            my %retdata = %data;
            $retdata{'.'} = $seq_a * $seq_b;
            push @chunk, \%retdata;
        }

        $seq_a += 1;
        return \@chunk;
    };
}
# Build the output closure.
#
# Opens $path for writing (dies with "open error: ..." when that fails) and
# returns a code ref taking ($chunk_id, \@records). Chunks may arrive in any
# order: each one is parked in a hash and written only once every lower id,
# counting from 1, has been written. A record is written as one line made
# of its values concatenated in sorted-key order.
sub make_iter_output {
    my ($path) = @_;

    open my $fh, '>', $path or die "open error: $!";

    my $next_id = 1;    # id of the chunk that has to be written next
    my %pending;        # chunk_id => \@records still waiting for their turn

    return sub {
        my ($chunk_id, $records) = @_;
        $pending{$chunk_id} = $records;

        # Drain every chunk that is now in sequence.
        while (exists $pending{$next_id}) {
            my $ready = delete $pending{$next_id};
            for my $row (@$ready) {
                print {$fh} $row->{$_} for sort keys %$row;
                print {$fh} "\n";
            }
            $next_id++;
        }
    };
}
Regards, Mario.