in reply to Re: MCE: Slow IPC between child and gather process in parent
in thread MCE: Slow IPC between child and gather process in parent
Hi again,
Disclosure: I tried various things on the assumption that the application is IPC-bound, which may not be the case for the OP.
Chunking reduces the number of IPC operations, thereby relieving pressure on the manager process. Please find below the same demonstrations with chunking applied. Basically, chunking requires looping inside both the iterator and user_func. In other words, chunking is a way to prevent the application from becoming IPC-bound.
Iterator & Gather demonstration
use strict;
use warnings;

use MCE;
use MCE::Candy;
use Time::HiRes 'time';

# usage: script_gather2.pl [ N ]
#
# Iterator & Gather demonstration with chunking. Workers receive chunks of
# ids produced by an iterator, build one output string per chunk, and send it
# to the manager process with a single gather call per chunk. The elapsed time
# of the processing step is printed at the end.

my $output_file = 'output.txt';

# Fail loudly if the output file cannot be created; otherwise the run would
# silently produce nothing.
open my $fh_out, ">:utf8", $output_file
    or die "Cannot open $output_file for writing: $!";

my $mce = MCE->new(
    # Gathered chunks are written to $fh_out by the MCE::Candy helper, keyed
    # by the chunk_id passed to MCE->gather in user_func.
    gather      => MCE::Candy::out_iter_fh($fh_out),
    max_workers => 3,
    chunk_size  => 5,
    user_func   => \&user_func,
)->spawn;

my $start = time;

# First command-line argument is the number of ids to generate (default 1000).
$mce->process( make_iterator( shift || 1000 ) );

my $duration = time - $start;

$mce->shutdown;

# Buffered write errors only surface at close time, so check it.
close $fh_out or die "Cannot close $output_file: $!";

printf "duration: %0.3f\n", $duration;

# make_iterator($max_id)
#
# Returns a closure that hands out the ids 1 .. $max_id in chunks. Each call
# receives the chunk size and returns up to that many consecutive ids; it
# returns an empty list once all ids have been handed out.
sub make_iterator {
    my ($max_id) = @_;
    my $nxt_id = 0;

    # closure block receives chunk_size value
    return sub {
        my ($chunk_size) = @_;
        my @data;

        return if $nxt_id >= $max_id;

        for ( 1 .. $chunk_size ) {
            push @data, ++$nxt_id;
            last if $nxt_id >= $max_id;
        }

        return @data;
    };
}

# user_func($mce, $chunk_ref, $chunk_id)
#
# Worker callback. $chunk_ref is an array reference holding the ids of one
# chunk; each id is emitted on its own line.
sub user_func {
    my ( $mce, $chunk_ref, $chunk_id ) = @_;

    # chunking requires looping inside user_func
    my $output = join '', map { "$_\n" } @{$chunk_ref};

    # gather with chunk_id must be called one time only
    MCE->gather( $chunk_id, $output );

    return;
}
Iterator & Relay demonstration
use strict;
use warnings;

use IO::Handle;    # makes $fh_out->autoflush available on older perls too
use MCE;
use Time::HiRes 'time';

# usage: script_relay2.pl [ N ]
#
# Iterator & Relay demonstration with chunking. Workers receive chunks of ids
# produced by an iterator, build one output string per chunk, and print it to
# the shared output handle from inside a single MCE::relay block per chunk.
# The elapsed time of the processing step is printed at the end.

my $output_file = 'output.txt';

# Fail loudly if the output file cannot be created; otherwise the run would
# silently produce nothing.
open my $fh_out, ">:utf8", $output_file
    or die "Cannot open $output_file for writing: $!";

# Workers print to this handle themselves, so each print must reach the file
# immediately rather than sit in a per-process buffer.
$fh_out->autoflush(1);

my $mce = MCE->new(
    init_relay  => 0,    # turns on relay support (see MCE::Relay)
    max_workers => 3,
    chunk_size  => 5,
    user_func   => \&user_func,
)->spawn;

my $start = time;

# First command-line argument is the number of ids to generate (default 1000).
$mce->process( make_iterator( shift || 1000 ) );

my $duration = time - $start;

$mce->shutdown;

# Buffered write errors only surface at close time, so check it.
close $fh_out or die "Cannot close $output_file: $!";

printf "duration: %0.3f\n", $duration;

# make_iterator($max_id)
#
# Returns a closure that hands out the ids 1 .. $max_id in chunks. Each call
# receives the chunk size and returns up to that many consecutive ids; it
# returns an empty list once all ids have been handed out.
sub make_iterator {
    my ($max_id) = @_;
    my $nxt_id = 0;

    # closure block receives chunk_size value
    return sub {
        my ($chunk_size) = @_;
        my @data;

        return if $nxt_id >= $max_id;

        for ( 1 .. $chunk_size ) {
            push @data, ++$nxt_id;
            last if $nxt_id >= $max_id;
        }

        return @data;
    };
}

# user_func($mce, $chunk_ref, $chunk_id)
#
# Worker callback. $chunk_ref is an array reference holding the ids of one
# chunk; each id is emitted on its own line.
sub user_func {
    my ( $mce, $chunk_ref, $chunk_id ) = @_;

    # chunking requires looping inside user_func
    my $output = join '', map { "$_\n" } @{$chunk_ref};

    # relay must be called one time only
    MCE::relay { print {$fh_out} $output };

    return;
}
Regards, Mario
|
---|
Replies are listed 'Best First'. | |
---|---|
Re^3: MCE: Slow IPC between child and gather process in parent
by marioroy (Prior) on May 07, 2018 at 02:19 UTC | |
Re^3: MCE: Slow IPC between child and gather process in parent
by learnedbyerror (Monk) on May 07, 2018 at 12:59 UTC | |
Re^3: MCE: Slow IPC between child and gather process in parent
by learnedbyerror (Monk) on May 07, 2018 at 13:06 UTC |