etheleon has asked for the wisdom of the Perl Monks concerning the following question:

Hi all, I have a question regarding the use of the MCE package. I have a workflow in mind: I build a hash table by reading in a couple of key-value pair files, ultimately linking fileA's keys to fileC's values via fileB. Next, using this hash table, I want to apply a subroutine to multiple input files and write the results into separate files, and I want to do this part in parallel. I use R a lot for parallel work, mainly the mclapply function in the parallel library. Hunting around for parallel packages in Perl, I found prefork, MCE and fork manager. I tried implementing the parallel portion with MCE as shown in the following (the code is mostly taken from https://metacpan.org/pod/MCE::Examples):
#!/usr/bin/env perl
use v5.18;
use strict;
use warnings;
use autodie;
use MCE;

my @input_data = (0 .. 100 - 1);

## Make an output iterator for gather. Output order is preserved.
sub output_iterator {
    my %tmp;
    my $order_id = 1;

    return sub {
        my ($result, $chunk_id) = @_;
        $tmp{$chunk_id} = $result;

        while (1) {
            last unless (exists $tmp{$order_id});
            open my $output, '>', "/path/to/my/files/$chunk_id.txt";
            foreach (1..10) { print $output "\t", fibonacci($_) };
            say $output;
            close $output;
            delete $tmp{$order_id++};
        }
    };
}

## Use $chunk_ref->[0] or $_ to retrieve the element.
my $mce = MCE->new(
    chunk_size  => 1,                   # setting to 1 = do not chunk
    max_workers => 8,                   # number of CPU cores
    gather      => output_iterator(),   # the function which will be applied to each element of the array
);

MCE->foreach( \@input_data, sub {
    my ($mce, $chunk_ref, $chunk_id) = @_;
    my $result = sqrt($chunk_ref->[0]);
    MCE->gather($result, $chunk_id);
});

sub fibonacci {
    my $n = shift;
    return undef if $n < 0;
    my $f;
    if ($n == 0) {
        $f = 0;
    } elsif ($n == 1) {
        $f = 1;
    } else {
        $f = fibonacci($n-1) + fibonacci($n-2);
    }
    return $f;
}
However, I noticed that the number of output files is not consistent with the size of my input, i.e. my array @input_data. I would get 96 output files, then 97, and finally 100 if I rerun the program without deleting the output files. What's wrong?
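The hash-building step of the workflow described above can be sketched in core Perl. This is a minimal sketch built on assumptions the question does not state:

- Each mapping is a tab-separated file with one key-value pair per line.
- The lookups chain, so the values of one map are the keys of the next.
- `read_pairs` and `compose_maps` are hypothetical names.

In-memory filehandles stand in for the real files.

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# Hypothetical helper: read "key<TAB>value" lines from a filehandle into a
# hash; blank lines are skipped and later duplicates overwrite earlier ones.
sub read_pairs {
    my ($fh) = @_;
    my %map;
    while (my $line = <$fh>) {
        chomp $line;
        next if $line eq '';
        my ($k, $v) = split /\t/, $line, 2;
        $map{$k} = $v;
    }
    return \%map;
}

# Hypothetical helper: chain two lookups (A -> B, B -> C) into A -> C,
# keeping only keys whose intermediate value exists in the second map.
sub compose_maps {
    my ($a2b, $b2c) = @_;
    my %a2c;
    for my $k (keys %$a2b) {
        my $mid = $a2b->{$k};
        $a2c{$k} = $b2c->{$mid} if exists $b2c->{$mid};
    }
    return \%a2c;
}

# Demo: in-memory strings stand in for the real key-value files.
my $ab_text = "a1\tb1\na2\tb2\na3\tb9\n";
my $bc_text = "b1\tc1\nb2\tc2\n";
open my $fh_ab, '<', \$ab_text or die "open: $!";
open my $fh_bc, '<', \$bc_text or die "open: $!";

my $a2c = compose_maps(read_pairs($fh_ab), read_pairs($fh_bc));
print "$_ => $a2c->{$_}\n" for sort keys %$a2c;
```

Here `a3` is dropped because its intermediate key `b9` has no entry in the second map. With more than two files in the chain, the same helper could be applied repeatedly.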

Re: Using MCE to write to multiple files.
by Anonymous Monk on Nov 22, 2014 at 03:04 UTC

    ...number of output files are not consistent with the size of my input ie....what's wrong?

    What is supposed to happen?

      Since the array @data_input held 10 elements, I should get 10 output files, but I don't.

        Since the array @data_input held 10 elements, I should get 10 output files, but I don't.

        Um, there is @input_data, and it has 100 elements :)

        But OK, are you using fork or threads? Try getting MCE to use fork, then repeat your test with threads. If there is a difference between the two, then either there is a bug in one of the backends or the program hasn't finished running.

        If both the fork and threads backends behave the same, then there might be a problem with your logic.
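One logic problem worth checking can be reproduced without MCE or real files. The sketch below replays the flush loop from the posted `output_iterator` and records which file names it would open.

It is a minimal simulation with several assumptions:

- Results reach the gather callback out of order, as they can with several workers.
- `make_iterator` and the flag `$use_order_id` are hypothetical names.
- The flag switches between the posted naming, after `$chunk_id`, and naming each file after the chunk actually being flushed, `$order_id`.

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# Simulates the posted gather iterator: instead of opening files it
# records the name each flush would have written to.
sub make_iterator {
    my ($use_order_id, $opened) = @_;
    my %tmp;
    my $order_id = 1;
    return sub {
        my ($result, $chunk_id) = @_;
        $tmp{$chunk_id} = $result;
        while (exists $tmp{$order_id}) {
            # The posted code uses $chunk_id (the chunk that just arrived),
            # even when flushing earlier chunks that were buffered.
            my $name = $use_order_id ? $order_id : $chunk_id;
            $opened->{"$name.txt"}++;
            delete $tmp{$order_id++};
        }
    };
}

# Hypothetical out-of-order arrival sequence of chunk ids.
my @arrivals = (1, 3, 2, 5, 4);

for my $use_order_id (0, 1) {
    my %opened;
    my $gather = make_iterator($use_order_id, \%opened);
    $gather->(sqrt($_), $_) for @arrivals;
    printf "%s: %d distinct files: %s\n",
        ($use_order_id ? 'order_id' : 'chunk_id'),
        scalar(keys %opened), join(' ', sort keys %opened);
}
```

With this arrival order, the posted naming opens only 1.txt, 2.txt and 4.txt, overwriting two of them. Naming by `$order_id` opens all five. Arrival order depends on worker scheduling, so under this reading the file count would vary from run to run. Files left over from earlier runs would also fill the gaps, which fits the 96, 97, then 100 pattern described in the question.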

        I've not scrutinized your code, but the $tmp{$chunk_id} stuff looks like it will never make any difference :)
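For reference, the `%tmp`/`$order_id` pair in the posted iterator follows the ordered-output pattern from MCE::Examples. Results are parked by chunk id and released only when the next expected id is present.

Below is a minimal core-Perl sketch of that pattern. It assumes chunk ids start at 1, `make_ordered_gather` is a hypothetical name, and `$emit` stands in for whatever writes the output.

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# Returns a gather callback that buffers out-of-order results and passes
# them to $emit strictly in chunk-id order (ids assumed to start at 1).
sub make_ordered_gather {
    my ($emit) = @_;
    my %pending;
    my $next_id = 1;
    return sub {
        my ($result, $chunk_id) = @_;
        $pending{$chunk_id} = $result;
        # Release every consecutive result that is now available.
        while (exists $pending{$next_id}) {
            $emit->($next_id, delete $pending{$next_id});
            $next_id++;
        }
    };
}

my @seen;
my $gather = make_ordered_gather(sub { push @seen, $_[0] });
$gather->("r$_", $_) for (3, 1, 2, 5, 4);
print "@seen\n";    # ids in the order they were released
```

Here `$emit` receives the id of the result being released, not the id that just arrived. That released id is the one to name any per-chunk output after.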

        But if you want something to compare, try

        $ perl threads-jobadder-jobworker-fibonacci.pl
        $ dir /b fibojob* |wc --lines
        100
        #!/usr/bin/perl --
        ## threads-jobadder-jobworker-fibonacci.pl
        ##
        ## perltidy -olq -csc -csci=3 -cscl="sub : BEGIN END " -otr -opr -ce -nibc -i=4 -pt=0 "-nsak=*"
        ## perltidy -olq -csc -csci=10 -cscl="sub : BEGIN END if " -otr -opr -ce -nibc -i=4 -pt=0 "-nsak=*"
        ## perltidy -olq -csc -csci=10 -cscl="sub : BEGIN END if while " -otr -opr -ce -nibc -i=4 -pt=0 "-nsak=*"
        #!/usr/bin/perl --
        use strict;
        use warnings;
        use threads stack_size => 4096;
        use Thread::Queue;

        Main( @ARGV );
        exit( 0 );

        sub Main {
            use autodie qw/ chdir /;
            chdir "/path/to/my/files/" ;
            my $maxJobs = 8;
            my $q = Thread::Queue->new;
            for( 1 .. $maxJobs ) {
                ## threads->create( \&JobWorker, $queue, \&TheJob );
                threads->create( \&JobWorker, $q, \&GetFibby );
            }
            threads->create( \&JobAdder, $q, $maxJobs, [ 0 .. 99 ] );
            $_->join for threads->list;    ## wait for threads to finish
        } ## end sub Main

        sub JobWorker {
            my( $q, $callBack ) = @_;
            while( defined( my $argRef = $q->dequeue ) ) {
                ## GetFibby( @$argRef );
                $callBack->( @$argRef );
            }
            return;
        } ## end sub JobWorker

        sub GetFibby {
            my( $chunkId ) = @_;
            use autodie qw/ open close /;
            open my( $outfh ), '>', "fibojob-$chunkId.txt";
            print $outfh "\t", fibonacci( $_ ) for 1 .. 10;
            print $outfh "\n";
            close $outfh;
        } ## end sub GetFibby

        sub JobAdder {
            my( $q, $maxJobs, $inputs ) = @_;
            while( @$inputs ) {
                AddJob( $q, shift @$inputs );
            }
            SignalNoMoreJobs( $q, $maxJobs );
        } ## end sub JobAdder

        sub SignalNoMoreJobs {
            my( $q, $maxJobs ) = @_;
            $q->enqueue( undef ) for 1 .. $maxJobs;
            return;
        } ## end sub SignalNoMoreJobs

        sub AddJob {
            my $q = shift;
            $q->enqueue( [@_] );
            return;
        } ## end sub AddJob

        sub fibonacci {
            my $n = shift;
            return undef if $n < 0;
            my $f;
            if( $n == 0 ) {
                $f = 0;
            } elsif( $n == 1 ) {
                $f = 1;
            } else {
                $f = fibonacci( $n - 1 ) + fibonacci( $n - 2 );
            }
            return $f;
        } ## end sub fibonacci
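For comparing backends, here is a fork-based counterpart to the threads script that uses only core Perl. It is a minimal sketch, not a drop-in replacement:

- `run_pool` is a hypothetical name.
- A File::Temp directory stands in for /path/to/my/files/.
- Inputs are split round-robin among the children up front instead of being handed out through a queue.
- `fibonacci` is simplified because it is only called with 1 .. 10.

```perl
#!/usr/bin/env perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use POSIX ();

# Simplified Fibonacci (only ever called with n >= 1 here).
sub fibonacci {
    my $n = shift;
    return $n < 2 ? $n : fibonacci($n - 1) + fibonacci($n - 2);
}

# Hypothetical helper: fork $max_workers children; child $w handles the
# inputs at positions $w, $w + $max_workers, ... and writes one file each.
sub run_pool {
    my ($dir, $max_workers, @inputs) = @_;
    my @pids;
    for my $w (0 .. $max_workers - 1) {
        my $pid = fork;
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {    # child process
            my $ok = eval {
                for (my $i = $w; $i < @inputs; $i += $max_workers) {
                    my $id = $inputs[$i];
                    open my $out, '>', "$dir/fibojob-$id.txt" or die "open: $!";
                    print {$out} "\t", fibonacci($_) for 1 .. 10;
                    print {$out} "\n";
                    close $out or die "close: $!";
                }
                1;
            };
            warn $@ unless $ok;
            # _exit skips END blocks and buffer flushing in the child, so
            # File::Temp cleanup and inherited STDOUT buffers stay with the parent.
            POSIX::_exit($ok ? 0 : 1);
        }
        push @pids, $pid;
    }
    waitpid($_, 0) for @pids;    # wait for every child before returning
    return;
}

my $dir = tempdir(CLEANUP => 1);
run_pool($dir, 8, 0 .. 99);
my @files = glob("$dir/fibojob-*.txt");
print scalar(@files), " files\n";
```

If the job logic is sound, the count printed at the end should equal the number of inputs, just as the `wc --lines` check does for the threads version.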