Hi again,

Disclosure: I also tried chunking with threads and MCE::Hobo, each using a queue, for comparison with the MCE (Iterator & Gather/Relay) demonstrations.

Below are demonstrations using threads and MCE::Hobo for testing IPC overhead. On Unix platforms (particularly BSD variants), the MCE solution may run up to 2x faster than threads. Increasing CHUNK_SIZE decreases IPC overhead but increases the memory footprint of the $output variable. Chunking/batching is helpful; just be mindful of memory consumption in the output routine.
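
To make the CHUNK_SIZE trade-off concrete, here is a minimal sketch that only counts queue messages; it does no IPC at all. The helper name make_chunks is hypothetical (it is not part of the scripts in this post) and simply mirrors the batching loop in input(). For 1000 IDs, the message count drops from 1000 to 200 to 20 as the chunk size goes from 1 to 5 to 50, while each message (and each $output string) grows accordingly.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of the scripts in this post: split the IDs
# 1 .. $max_id into chunks of at most $chunk_size, like input() does.
sub make_chunks {
    my ( $max_id, $chunk_size ) = @_;
    my ( @chunks, @data );

    for my $id ( 1 .. $max_id ) {
        push @data, $id;
        if ( @data == $chunk_size ) {
            push @chunks, [ @data ];    # one chunk == one queue message
            @data = ();
        }
    }
    push @chunks, [ @data ] if @data;   # trailing partial chunk

    return \@chunks;
}

# Fewer, larger messages as the chunk size grows.
for my $chunk_size ( 1, 5, 50 ) {
    my $chunks = make_chunks( 1000, $chunk_size );
    printf "CHUNK_SIZE %2d: %4d queue messages, up to %d items each\n",
        $chunk_size, scalar @$chunks, $chunk_size;
}
```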

Regarding the input queue, I limit it to 50 items to prevent the input routine from populating the queue out of control. Thread::Queue must be recent enough to support limit, or the script will fail.
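
If you are unsure whether the installed Thread::Queue is recent enough, a quick check along these lines may help. This is only a sketch: the helper name queue_supports_limit is hypothetical, and it tests for the limit method directly rather than comparing version numbers.

```perl
use strict;
use warnings;

# Hypothetical helper: returns 1 when Thread::Queue loads and provides
# limit(), otherwise 0.
sub queue_supports_limit {
    return 0 unless eval { require Thread::Queue; 1 };
    return Thread::Queue->can('limit') ? 1 : 0;
}

print queue_supports_limit()
    ? "Thread::Queue provides limit(); the bounded input queue should work\n"
    : "Thread::Queue is missing or too old for limit(); upgrade it first\n";
```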

Threads & Thread::Queue->new() demonstration

    use strict;
    use warnings;

    use threads;
    use Thread::Queue 3.07;
    use Sereal qw( decode_sereal encode_sereal );
    use Time::HiRes qw( time );

    use constant {
        MAX_WORKERS => 3,
        CHUNK_SIZE  => 5,
    };

    # usage: script_que_thrs.pl [ N ]

    my ( @wrks_in, @wrks_out );
    my $que_in  = Thread::Queue->new();
    $que_in->limit = 50;    # enqueue blocks while 50 items are pending
    my $que_out = Thread::Queue->new();
    my $start   = time;

    push @wrks_in,  threads->create(\&task) for 1 .. MAX_WORKERS;
    push @wrks_out, threads->create(\&output);

    input( shift || 1000 );

    # End the input queue first, then the output queue once workers are done.
    $que_in->end;
    $_->join for @wrks_in;
    $que_out->end;
    $_->join for @wrks_out;

    printf "duration: %0.3f\n", time - $start;

    # Producer: batch IDs into chunks and enqueue each chunk serialized.
    sub input {
        my $max_id = shift;
        my $nxt_id = 0;
        my @data;

        while ( 1 ) {
            last if $nxt_id >= $max_id;
            push @data, ++$nxt_id;
            if ( @data == CHUNK_SIZE ) {
                $que_in->enqueue( encode_sereal(\@data) );
                @data = ();
            }
        }

        $que_in->enqueue( encode_sereal(\@data) ) if @data;
    }

    # Worker: decode a chunk, build one output string, pass it on.
    sub task {
        while ( defined ( my $frozen = $que_in->dequeue ) ) {
            my $array_ref = decode_sereal($frozen);
            my $output    = '';
            for ( 0 .. $#{ $array_ref } ) {
                $output .= $array_ref->[$_]."\n";
            }
            $que_out->enqueue($output);
        }
    }

    # Writer: the only thread that touches the output file.
    sub output {
        open my $fh_out, ">:utf8", "output.txt"
            or die "Cannot open output.txt: $!";
        while ( defined ( my $output = $que_out->dequeue ) ) {
            print {$fh_out} $output;
        }
        close $fh_out;
    }

MCE::Hobo & MCE::Shared->queue() demonstration

    use strict;
    use warnings;

    use MCE::Hobo;
    use MCE::Shared;
    use Sereal qw( decode_sereal encode_sereal );
    use Time::HiRes qw( time );

    use constant {
        MAX_WORKERS => 3,
        CHUNK_SIZE  => 5,
    };

    # usage: script_que_hobo.pl [ N ]

    my ( @wrks_in, @wrks_out );
    my $que_in  = MCE::Shared->queue( await => 1 );
    my $que_out = MCE::Shared->queue();
    my $start   = time;

    push @wrks_in,  MCE::Hobo->create(\&task) for 1 .. MAX_WORKERS;
    push @wrks_out, MCE::Hobo->create(\&output);

    input( shift || 1000 );

    # End the input queue first, then the output queue once workers are done.
    $que_in->end;
    $_->join for @wrks_in;
    $que_out->end;
    $_->join for @wrks_out;

    printf "duration: %0.3f\n", time - $start;

    # Producer: batch IDs into chunks and enqueue each chunk serialized.
    sub input {
        my $max_id = shift;
        my $nxt_id = 0;
        my @data;

        while ( 1 ) {
            last if $nxt_id >= $max_id;
            push @data, ++$nxt_id;
            if ( @data == CHUNK_SIZE ) {
                $que_in->await(50);    # limit
                $que_in->enqueue( encode_sereal(\@data) );
                @data = ();
            }
        }

        $que_in->enqueue( encode_sereal(\@data) ) if @data;
    }

    # Worker: decode a chunk, build one output string, pass it on.
    sub task {
        while ( defined ( my $frozen = $que_in->dequeue ) ) {
            my $array_ref = decode_sereal($frozen);
            my $output    = '';
            for ( 0 .. $#{ $array_ref } ) {
                $output .= $array_ref->[$_]."\n";
            }
            $que_out->enqueue($output);
        }
    }

    # Writer: the only process that touches the output file.
    sub output {
        open my $fh_out, ">:utf8", "output.txt"
            or die "Cannot open output.txt: $!";
        while ( defined ( my $output = $que_out->dequeue ) ) {
            print {$fh_out} $output;
        }
        close $fh_out;
    }

Update:

Workers may write directly to the output handle, one at a time. This is possible with a mutex. Do not forget to enable autoflush on the file handle. Recent versions of MCE and MCE::Shared load IO::Handle automatically in the event the autoflush method is not found.

    use strict;
    use warnings;

    use IO::Handle;    # provides autoflush on older Perl/MCE installations
    use MCE::Hobo;
    use MCE::Mutex;
    use MCE::Shared;
    use Sereal qw( decode_sereal encode_sereal );
    use Time::HiRes qw( time );

    use constant {
        MAX_WORKERS => 3,
        CHUNK_SIZE  => 5,
    };

    # usage: script_que_hobo2.pl [ N ]

    my @wrks_in;
    my $que_in = MCE::Shared->queue( await => 1 );
    my $mutex  = MCE::Mutex->new();
    my $start  = time;

    # Open the handle before spawning so every worker inherits it.
    open my $fh_out, ">:utf8", "output.txt"
        or die "Cannot open output.txt: $!";
    $fh_out->autoflush(1);

    push @wrks_in, MCE::Hobo->create(\&task) for 1 .. MAX_WORKERS;

    input( shift || 1000 );

    $que_in->end;
    $_->join for @wrks_in;
    close $fh_out;

    printf "duration: %0.3f\n", time - $start;

    # Producer: batch IDs into chunks and enqueue each chunk serialized.
    sub input {
        my $max_id = shift;
        my $nxt_id = 0;
        my @data;

        while ( 1 ) {
            last if $nxt_id >= $max_id;
            push @data, ++$nxt_id;
            if ( @data == CHUNK_SIZE ) {
                $que_in->await(50);    # limit
                $que_in->enqueue( encode_sereal(\@data) );
                @data = ();
            }
        }

        $que_in->enqueue( encode_sereal(\@data) ) if @data;
    }

    # Worker: decode a chunk, then write it while holding the mutex.
    sub task {
        while ( defined ( my $frozen = $que_in->dequeue ) ) {
            my $array_ref = decode_sereal($frozen);
            my $output    = '';
            for ( 0 .. $#{ $array_ref } ) {
                $output .= $array_ref->[$_]."\n";
            }
            $mutex->enter(sub {
                print {$fh_out} $output;
            });
        }
    }
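
For readers without MCE installed, the same "one writer at a time" idea can be sketched with core modules only. Note that this swaps the technique: it uses fork and flock instead of MCE::Hobo and MCE::Mutex, and each child opens its own append-mode handle rather than sharing one. The helper name write_with_lock and the temporary file are assumptions made for illustration, and the sketch assumes a Unix-like platform.

```perl
use strict;
use warnings;

use Fcntl qw( :flock );
use File::Temp qw( tempfile );
use IO::Handle;

# Hypothetical helper: fork $workers children; each appends $lines lines to
# $path, holding an exclusive lock around every write.
sub write_with_lock {
    my ( $path, $workers, $lines ) = @_;
    my @pids;

    for my $id ( 1 .. $workers ) {
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;

        if ( $pid == 0 ) {
            # Child: open in append mode and flush after every print.
            open my $fh, '>>', $path or die "Cannot open $path: $!";
            $fh->autoflush(1);
            for my $n ( 1 .. $lines ) {
                flock( $fh, LOCK_EX ) or die "flock failed: $!";
                print {$fh} "worker $id line $n\n";
                flock( $fh, LOCK_UN ) or die "unlock failed: $!";
            }
            close $fh;
            exit 0;
        }

        push @pids, $pid;
    }

    waitpid( $_, 0 ) for @pids;
    return;
}

# Usage: three workers, five lines each, written to a scratch file.
my ( $tmp_fh, $path ) = tempfile();
close $tmp_fh;

write_with_lock( $path, 3, 5 );

open my $in, '<', $path or die "Cannot open $path: $!";
my @lines = <$in>;
close $in;
unlink $path;

printf "%d complete lines written\n", scalar @lines;
```

The order of lines across workers is not deterministic; the lock only guarantees that each line is written whole.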

Regards, Mario


In reply to Re^3: MCE: Slow IPC between child and gather process in parent by marioroy
in thread MCE: Slow IPC between child and gather process in parent by learnedbyerror
