#!/opt/perl/bin/perl

## http://www.perlmonks.org/?node_id=1182580
##
## Demo: several MCE::Hobo workers pull chunks of records from a shared
## input iterator, transform them, and hand the results to a shared output
## iterator that writes them to a file in chunk order.

use strict;
use warnings;

use MCE::Hobo;
use MCE::Shared;
use Time::HiRes 'time';

# Template record: keys 'a' .. 'z', each mapped to that letter repeated
# 200 times. Iter::Input::recv copies this hash into every record it emits.
my %data = ();
foreach my $letter ( 'a' .. 'z' ) {
    $data{$letter} = $letter x 200;
}

my $chunk_size  = 50;         # records per chunk
my $iterations  = 100;        # number of chunks handed out in total
my $num_workers = 5;          # Hobo workers to spawn
my $output_path = "h.txt";    # file written by Iter::Output

my $iter_i = Iter::Input->new( $chunk_size, $iterations );
my $iter_o = Iter::Output->new($output_path);

test_mce_hobo();

# Spawn $num_workers Hobos running work(), wait for all of them, close the
# output file, and report the elapsed wall-clock time on STDERR.
sub test_mce_hobo {
    my $start = time;

    MCE::Hobo->create('work') for ( 1 .. $num_workers );
    MCE::Hobo->waitall;

    $iter_o->close();

    printf STDERR "testa done in %0.02f seconds\n", time - $start;

    return;
}

# Hobo task to run in parallel.
#
# Repeatedly fetches ( $chunk_id, \@records ) from the input iterator until
# it returns an empty list. For every record, each value has all occurrences
# of its own key replaced by the upper-cased key; the '.' entry is copied
# unchanged. Each record is then flattened to one line (values concatenated
# in sorted key order) and the whole chunk is sent to the output iterator.
sub work {
    while ( my ( $chunk_id, $records ) = $iter_i->recv() ) {
        my @ret = ();

        foreach my $chunk ( @{$records} ) {
            my %output = ();

            foreach my $key ( keys %{$chunk} ) {
                if ( $key eq '.' ) {
                    $output{$key} = $chunk->{$key};
                    next;
                }

                my $val = $chunk->{$key};
                my $uc  = uc($key);

                # \Q...\E: match the key literally even if it ever
                # contains regex metacharacters.
                $val =~ s/\Q$key\E/$uc/g;
                $output{$key} = $val;
            }

            push @ret, \%output;
        }

        my $buf = '';
        foreach my $record (@ret) {
            $buf .= join( '', map { $record->{$_} } sort keys %{$record} );
            $buf .= "\n";
        }

        $iter_o->send( $chunk_id, $buf );
    }

    return;
}

#####################################################################

package Iter::Input;

# Constructor. Returns the object wrapped by MCE::Shared->share.
#   $chunk_size - number of records in each chunk
#   $iterations - number of chunks to hand out before recv returns empty
# Internal layout: [ $iterations, $chunk_size, \$chunk_id, \$seq_a ].
sub new {
    my ( $class, $chunk_size, $iterations ) = @_;
    my ( $chunk_id, $seq_a ) = ( 0, 1 );

    return MCE::Shared->share(
        bless [ $iterations, $chunk_size, \$chunk_id, \$seq_a ], $class
    );
}

# Returns ( $chunk_id, \@chunk ) for the next chunk, or an empty list once
# more than $iterations chunks have been requested. Chunk ids start at 1.
# Every record is a copy of the file-level %data plus a '.' entry holding
# (chunk sequence number * record number within the chunk).
sub recv {
    my ( $self ) = @_;
    my ( $iters, $chunk_size, $chunk_id, $seq_a ) = @{ $self };

    return if ( ${$seq_a} > $iters );

    my @chunk;

    foreach my $seq_b ( 1 .. $chunk_size ) {
        my %retdata = %data;
        $retdata{'.'} = ${$seq_a} * $seq_b;
        push @chunk, \%retdata;
    }

    # These were made references on purpose, during construction,
    # to minimize array access: e.g. $self->[2]++, $self->[3]++

    ${$chunk_id}++;
    ${$seq_a}++;

    return ${$chunk_id}, \@chunk;
}

1;

#####################################################################

package Iter::Output;

# Constructor. Returns the object wrapped by MCE::Shared->share.
#   $path - file that send() will create/truncate on first use
# Internal layout: [ $path, $fh, \$order_id, \%hold ].
sub new {
    my ( $class, $path ) = @_;

    my $order_id = 1;    # id of the next chunk allowed to be written
    my $fh;              # opened lazily by send()
    my %hold;            # chunk_id => output, waiting for its turn

    # Note: Do not open the file handle here, during construction.
    # The reason is that sharing will fail (cannot serialize $fh).

    return MCE::Shared->share(
        bless [ $path, $fh, \$order_id, \%hold ], $class
    );
}

# Accepts the output of chunk $chunk_id and writes every chunk that is now
# ready, strictly in ascending id order. Chunks arriving early are held
# until all lower-numbered chunks have been written.
# Dies if the file cannot be opened or a write fails.
sub send {
    my ( $self, $chunk_id, $output ) = @_;
    my ( $path, $fh, $order_id, $hold ) = @{ $self };

    if ( !defined $fh ) {
        open $fh, '>', $path or die "open error: $!";
        $self->[1] = $fh;
    }

    # hold temporarily, until orderly
    $hold->{$chunk_id} = $output;

    while ( exists $hold->{ ${$order_id} } ) {
        my $ready = delete $hold->{ ${$order_id} };
        print {$fh} $ready or die "write error: $!";
        ${$order_id}++;
    }

    return;
}

# Closes the output file if send() ever opened it. Dies when close reports
# a failure, since buffered write errors only surface at this point.
sub close {
    my ( $self ) = @_;

    if ( defined $self->[1] ) {
        CORE::close( $self->[1] ) or die "close error: $!";
        $self->[1] = undef;
    }

    return;
}

1;