Hi again,
Discloser: I tried various things in the event the application is IPC-bound which may not be the case for the OP.
Chunking has the effect of reducing the number of operations for IPC. Thus, relieving pressure from the manager process. Please find the same demonstrations with chunking applied. Basically, chunking requires looping inside the iterator and user_func. In other words, chunking is a way to prevent the application from becoming IPC-bound.
Iterator & Gather demonstration
use strict; use warnings; use MCE; use MCE::Candy; use Time::HiRes 'time'; # usage: script_gather2.pl [ N ] open my $fh_out, ">:utf8", "output.txt"; my $mce = MCE->new( gather => MCE::Candy::out_iter_fh($fh_out), max_workers => 3, chunk_size => 5, user_func => \&user_func, )->spawn; my $start = time; $mce->process( make_iterator( shift || 1000 ) ); my $duration = time - $start; $mce->shutdown; close $fh_out; printf "duration: %0.3f\n", $duration; sub make_iterator { my $max_id = shift; my $nxt_id = 0; # closure block receives chunk_size value return sub { my ( $chunk_size, @data ) = @_; return if $nxt_id >= $max_id; for ( 1 .. $chunk_size ) { push @data, ++$nxt_id; last if $nxt_id >= $max_id; } return @data; }; } sub user_func { my ( $mce, $chunk_ref, $chunk_id ) = @_; my $output = ''; # chunking requires looping inside user_func for ( 0 .. $#{ $chunk_ref } ) { $output .= $chunk_ref->[$_]."\n"; } # gather with chunk_id must be called one time only MCE->gather($chunk_id, $output); }
Iterator & Relay demonstration
use strict; use warnings; use MCE; use Time::HiRes 'time'; # usage: script_relay2.pl [ N ] open my $fh_out, ">:utf8", "output.txt"; $fh_out->autoflush(1); my $mce = MCE->new( init_relay => 0, max_workers => 3, chunk_size => 5, user_func => \&user_func, )->spawn; my $start = time; $mce->process( make_iterator( shift || 1000 ) ); my $duration = time - $start; $mce->shutdown; close $fh_out; printf "duration: %0.3f\n", $duration; sub make_iterator { my $max_id = shift; my $nxt_id = 0; # closure block receives chunk_size value return sub { my ( $chunk_size, @data ) = @_; return if $nxt_id >= $max_id; for ( 1 .. $chunk_size ) { push @data, ++$nxt_id; last if $nxt_id >= $max_id; } return @data; }; } sub user_func { my ( $mce, $chunk_ref, $chunk_id ) = @_; my $output = ''; # chunking requires looping inside user_func for ( 0 .. $#{ $chunk_ref } ) { $output .= $chunk_ref->[$_]."\n"; } # relay must be called one time only MCE::relay { print {$fh_out} $output }; }
Regards, Mario
In reply to Re^2: MCE: Slow IPC between child and gather process in parent
by marioroy
in thread MCE: Slow IPC between child and gather process in parent
by learnedbyerror
| For: | Use: | ||
| & | & | ||
| < | < | ||
| > | > | ||
| [ | [ | ||
| ] | ] |