Greetings, fellow monks.
This is a continuation of my previous post. Here I try the same thing with MCE::Shared. MCE::Hobo spawns threads on the Windows platform and processes on other platforms. The two iterators each return a shared object during construction. Maybe the OP can run without having to spawn a million threads; that is the reason for writing this and the prior post.
#!/opt/perl/bin/perl
## http://www.perlmonks.org/?node_id=1182580
use strict;
use warnings;
use MCE::Hobo;
use MCE::Shared;
use Time::HiRes 'time';
# Sample record: every lowercase letter maps to that letter repeated 200 times.
my %data = map { ( $_ => $_ x 200 ) } 'a' .. 'z';

# Run parameters.
my ( $chunk_size, $iterations, $num_workers ) = ( 50, 100, 5 );
my $output_path = "h.txt";

# Both constructors hand back the result of MCE::Shared->share(...), so these
# are the objects the workers call recv()/send() on.
my $iter_i = Iter::Input->new( $chunk_size, $iterations );
my $iter_o = Iter::Output->new($output_path);

test_mce_hobo();
# Driver: start $num_workers Hobo workers running work(), wait for all of
# them to finish, close the output iterator, and report the elapsed
# wall-clock time on STDERR.
sub test_mce_hobo {
    my $start = time;

    for ( 1 .. $num_workers ) {
        MCE::Hobo->create('work');
    }
    MCE::Hobo->waitall;

    # All workers are done, so every chunk has been handed to the writer.
    $iter_o->close();

    my $elapsed = time - $start;
    printf STDERR "testa done in %0.02f seconds\n", $elapsed;
}
# Hobo task to run in parallel.
#
# Repeatedly pulls a ($chunk_id, $data) pair from the input iterator
# ($iter_i) until recv() returns an empty list. $data is an array ref of
# hash refs. For every hash:
#   * the '.' entry is copied through unchanged;
#   * for every other key, each occurrence of the key inside its value is
#     replaced by the upper-cased key.
# The transformed hashes are then serialized, one line per hash, by
# concatenating the values in sorted key order, and the resulting buffer is
# passed to the output iterator ($iter_o) together with the chunk id.
sub work {
    while ( my ( $chunk_id, $data ) = $iter_i->recv() ) {
        my @ret;

        foreach my $chunk ( @{$data} ) {
            my %output;

            foreach my $key ( keys %{$chunk} ) {
                if ( $key eq '.' ) {
                    $output{$key} = $chunk->{$key};
                    next;
                }

                my $val = $chunk->{$key};
                my $uc  = uc $key;

                # \Q...\E makes the key match literally, so a key holding
                # regex metacharacters can neither mis-match nor break the
                # pattern.
                $val =~ s/\Q$key\E/$uc/g;
                $output{$key} = $val;
            }

            push @ret, \%output;
        }

        # One output line per record: values joined in sorted key order.
        my $buf = '';
        foreach my $row (@ret) {
            $buf .= join( '', map { $row->{$_} } sort keys %{$row} ) . "\n";
        }

        $iter_o->send( $chunk_id, $buf );
    }
}
#####################################################################
package Iter::Input;

# Input iterator handing out numbered chunks of generated records.
#
# Object layout (blessed array ref):
#   [0] number of iterations (chunks) to produce
#   [1] number of records per chunk
#   [2] ref to the chunk counter (starts at 0)
#   [3] ref to the sequence counter (starts at 1)

# new( $chunk_size, $iterations )
# Builds the iterator and returns whatever MCE::Shared->share() returns
# for it.
sub new {
    my ( $class, $chunk_size, $iterations ) = @_;

    my $chunk_id = 0;
    my $seq_a    = 1;

    my $self = bless [ $iterations, $chunk_size, \$chunk_id, \$seq_a ], $class;

    return MCE::Shared->share($self);
}

# recv()
# Returns ( $chunk_id, \@chunk ) for the next chunk, or an empty list once
# the sequence counter has passed the iteration count. Each of the
# $chunk_size records is a copy of the file-level %data hash with an extra
# '.' entry set to (sequence counter * 1-based position within the chunk).
sub recv {
    my ($self) = @_;
    my ( $iters, $chunk_size, $chunk_id, $seq_a ) = @{$self};

    return if ${$seq_a} > $iters;

    my @chunk = map {
        my %retdata = %data;
        $retdata{'.'} = ${$seq_a} * $_;
        \%retdata;
    } 1 .. $chunk_size;

    # The counters were stored as references on purpose, during
    # construction, to minimize array access (i.e. instead of
    # $self->[1]++ / $self->[3]++).
    ${$chunk_id}++;
    ${$seq_a}++;

    return ${$chunk_id}, \@chunk;
}

1;
#####################################################################
package Iter::Output;

# Output iterator that writes chunks to a file in chunk-id order, no matter
# in which order they are sent.
#
# Object layout (blessed array ref):
#   [0] output path
#   [1] file handle (undef until the first send(), undef again after close())
#   [2] ref to the id of the next chunk to write (starts at 1)
#   [3] ref to a hash of chunks received but not yet written, keyed by id

# new( $path )
# Builds the iterator and returns whatever MCE::Shared->share() returns
# for it.
sub new {
    my ( $class, $path ) = @_;
    my ( $order_id, $fh, %hold ) = ( 1 );

    # Note: Do not open the file handle here, during construction.
    # The reason is that sharing will fail (cannot serialize $fh).
    return MCE::Shared->share(
        bless [ $path, $fh, \$order_id, \%hold ], $class
    );
}

# send( $chunk_id, $output )
# Parks $output under $chunk_id, then writes out every chunk that is next in
# sequence. The file is opened (truncating) on the first call.
# Dies if the file cannot be opened or a write fails.
sub send {
    my ( $self, $chunk_id, $output ) = @_;
    my ( $path, $fh, $order_id, $hold ) = @{$self};

    if ( !defined $fh ) {
        open $fh, '>', $path or die "open error: $!";
        $self->[1] = $fh;
    }

    # hold temporarily, until orderly
    $hold->{$chunk_id} = $output;

    # Drain every consecutive chunk that is now available.
    while ( exists $hold->{ ${$order_id} } ) {
        print {$fh} delete( $hold->{ ${$order_id} } )
            or die "write error: $!";
        ${$order_id}++;
    }

    return;
}

# close()
# Closes the output file if it is open; a no-op otherwise.
# Dies if the close fails, since buffered write errors surface at close.
sub close {
    my ($self) = @_;

    my $fh = $self->[1];
    return if !defined $fh;

    # Forget the handle first so a failed close cannot leave a stale one.
    $self->[1] = undef;
    CORE::close $fh or die "close error: $!";

    return;
}

1;
Regards, Mario.