#!/opt/perl/bin/perl

## http://www.perlmonks.org/?node_id=1182580

use strict;
use warnings;

use MCE;
use Time::HiRes 'time';

# Run parameters.
my $iterations = 100;      # number of chunks the input iterator produces
my $chunksize  = 50;       # records per chunk
my $threads    = 5;        # passed to MCE as max_workers
my $output     = "m.txt";  # file the gathered results are written to

# Template record: each key 'a'..'z' maps to that letter repeated 200 times.
# Every generated input record starts out as a copy of this hash.
my %data = ();
foreach ('a'..'z') {
    $data{$_} = $_ x 200;
}

test_mce();

# Build an MCE instance wired to the input iterator, the ordered output
# writer and the work() task, run it, and report the elapsed wall-clock
# time on STDERR.
sub test_mce {
    my $start = time;

    my $mce = MCE->new(
        max_workers => $threads,
        chunk_size  => $chunksize,
        input_data  => make_iter_input($chunksize, $iterations),
        gather      => make_iter_output($output),
        user_func   => \&work,
    )->run();

    printf STDERR "testa done in %0.02f seconds\n", time - $start;
}

# MCE task to run in parallel.
#
# Arguments (as passed by MCE to user_func): the MCE object, the chunk
# reference and the chunk id. The first element of the chunk is expected
# to be the array ref of record hashes built by make_iter_input().
#
# For every record, each value has all occurrences of its own key replaced
# by the upper-cased key; the '.' entry is copied through untouched. The
# transformed records are gathered together with the chunk id so that the
# writer can restore input order.
sub work {
    my ($mce, $chunk_ref, $chunk_id) = @_;

    my $records = $chunk_ref->[0];
    my @ret     = ();

    foreach my $record (@$records) {
        my %output = ();

        foreach my $key (keys %$record) {
            if ($key eq '.') {
                $output{$key} = $record->{$key};
                next;
            }

            my $val = $record->{$key};
            my $uc  = uc($key);

            # \Q...\E: match the key literally, never as a regex pattern.
            $val =~ s/\Q$key\E/$uc/g;

            $output{$key} = $val;
        }

        push @ret, \%output;
    }

    MCE->gather($chunk_id, \@ret);
}

# Make an input closure, returns an iterator.
#
# Arguments:
#   $chunk_size - default number of records per chunk, used when the
#                 iterator is called without a size argument
#   $iterations - total number of chunks to produce
#
# Each call to the returned iterator yields an array ref of record hashes
# (copies of %data plus a '.' entry holding $seq_a * $seq_b), or an empty
# return once $iterations chunks have been handed out.
sub make_iter_input {
    my ($chunk_size, $iterations) = @_;

    my $seq_a = 1;

    return sub {
        return if $seq_a > $iterations;

        # Prefer the size supplied by the caller of the iterator; fall back
        # to the size given at construction time when none is passed.
        my ($requested) = @_;
        my $size = defined $requested ? $requested : $chunk_size;

        my @chunk = ();

        foreach my $seq_b (1 .. $size) {
            my %retdata = %data;
            $retdata{'.'} = $seq_a * $seq_b;
            push @chunk, \%retdata;
        }

        $seq_a += 1;

        return \@chunk;
    };
}

# Make an output closure, returns the gather callback.
#
# Opens $path for writing (dies on failure) and returns a sub taking
# ($chunk_id, \@records). Results may arrive out of order, so they are
# held until the next expected chunk id is available and then written in
# sequence. Each record becomes one line: its values concatenated in
# sorted key order, followed by a newline. Dies if a write fails.
sub make_iter_output {
    my ($path) = @_;

    my %hold;
    my $order_id = 1;

    open my $fh, '>', $path or die "open error: $!";

    return sub {
        my $chunk_id = shift;

        # hold temporarily, until orderly
        $hold{$chunk_id} = shift;    # \@ret

        # Drain every consecutive chunk that is now ready to be written.
        while (exists $hold{$order_id}) {
            foreach my $record (@{ delete $hold{$order_id} }) {
                my $line = join '', map { $record->{$_} } sort keys %$record;

                print {$fh} $line, "\n"
                    or die "write error: $!";
            }

            $order_id++;
        }
    };
}