use strict; use warnings; use feature 'say';
use Data::Dumper; ++$Data::Dumper::Sortkeys;
use MCE::Flow;
use MCE::Candy;
my $aref = [];
MCE::Flow::init(
max_workers => 4,
chunk_size => 1,
);
for ( 0, 1 ) {
say "Test $_";
mce_flow {
gather => MCE::Candy::out_iter_array( $aref )
},
sub {
my ( $mce, $chunk_ref, $chunk_id ) = @_;
# warn "chunk_ref " . Dumper $chunk_ref;
my ($chunk) = @{ $chunk_ref };
# warn "chunk " . Dumper $chunk;
MCE->gather( $chunk_id, $chunk->{'foo'} );
},
get_data( $_ );
say "$_ : " . Dumper $aref;
$aref = [];
}
sub get_data {
my $which = shift;
if ( $which == 1 ) {
return [
{ foo => 'bar', baz => 'qux' }
];
} else {
return [
{ foo => 'bar', baz => 'qux' },
{ foo => 'qux', baz => 'bar' },
];
}
}
Output
Test 0
0 : $VAR1 = [
'bar',
'qux'
];
Test 1
1 : $VAR1 = [
'bar'
];
Regards, Mario.
Update:
When gathering data and running inside a loop, it's important to specify the gather option each time. MCE::Candy::out_iter_array($aref) returns a closure block contaning a lexical order_id variable starting at 1. Basically, order_id correlates to chunk_id. Out of order items are held temporarily.
sub out_iter_array {
my $_aref = shift; my %_tmp; my $_order_id = 1;
_croak("The argument to \"out_iter_array\" is not an array ref.")
unless ( ref $_aref eq 'ARRAY' );
return sub {
my $_chunk_id = shift;
if ( $_chunk_id == $_order_id && keys %_tmp == 0 ) {
# already orderly
$_order_id++; push @{ $_aref }, @_;
}
else {
# hold temporarily otherwise until orderly
@{ $_tmp{ $_chunk_id } } = @_;
while ( 1 ) {
last unless exists $_tmp{ $_order_id };
push @{ $_aref }, @{ delete $_tmp{ $_order_id++ } };
}
}
};
}
|