mlin has asked for the wisdom of the Perl Monks concerning the following question:

While trying to use ForkManager for parallel tasks, I find that I need to pass N data files to program A through a pipe. The form looks like this:
open $h, "| program_A" or die ...; foreach (@data_files) { print $h ...; print $h ...; ... } close($h);
If I want to process these files in parallel, say on 4 processors, I think I must establish 4 pipes first (right?). Then how do I tell Perl to use those pipes automatically? I'm a little confused about the whole process. Could you please suggest some schemes for solving this problem and explain them a bit? Thanks a lot!

----- Addendum: while program A runs, it usually prints some information or warnings to the screen. Will that be a problem for the forked processes, or must I throw away A's output, like this:
foreach (@data_files) {
    open $h, "| program_A 2> /dev/null" or die ...;
    print $h ...;
    print $h ...;
    ...
}
close($h);

Re: How to set pipe first and then use the forkmanager?
by BrowserUk (Patriarch) on Sep 22, 2016 at 09:22 UTC
    I think I must establish 4 pipes first(right?),

    Why? Why not fork first and have each fork establish its own pipe?

    Untested example:

    for my $file ( @data_files ) {
        $pm->fork and next;
        open my $h, "| ...." or die $!;
        print $h ...;
        ...
        $pm->finish;
    }
    $pm->wait;
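
    For reference, Parallel::ForkManager's documented methods are start, finish, and wait_all_children rather than fork and wait. A fuller sketch of the same fork-then-pipe idea along those lines, with program_A taken from the question and the file names and printed data as placeholders:

    use strict;
    use warnings;
    use Parallel::ForkManager;

    my @data_files = qw( file1 file2 file3 file4 );
    my $pm = Parallel::ForkManager->new(4);    # at most 4 children at a time

    for my $file ( @data_files ) {
        $pm->start and next;                   # parent: gets the child PID, moves on

        # Child: open its own pipe to program_A and feed it this file's data.
        open my $h, '|-', 'program_A' or die "open: $!";
        print {$h} "data for $file\n";         # placeholder for the real records
        close $h or warn "close: $!";

        $pm->finish;                           # child exits here
    }
    $pm->wait_all_children;                    # parent waits for every child

    Each child opens its own pipe, so up to four copies of program_A run at once and their inputs never mix.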

      Because establishing the pipe to the program may be a bit time-consuming, I wanted to move the pipe open outside the foreach() in the serial version. Similarly, I thought I should establish the pipes first and then fork.
        Presumably, you're forking because your input data files are very large. The time taken to open a pipe is then negligible compared to the time needed for processing your data.

        Don't forget that premature optimization is the source of almost all evil.

Re: How to set pipe first and then use the forkmanager?
by marioroy (Prior) on Sep 22, 2016 at 14:23 UTC

    Greetings,

    The following is a demonstration using MCE::Loop and MCE::Shared.

    MCE::Shared v1.805 or later is required if running via Perl5i on a Unix platform.

    use strict;
    use warnings;

    use MCE::Loop;
    use MCE::Shared;

    # Configure MCE to run with 4 workers.
    # A worker receives 1 item per iteration.
    MCE::Loop->init(
        max_workers => 4,
        chunk_size  => 1,
    );

    # Populate an array with test data.
    my @data_files = qw/ file1 file2 file3 file4 /;

    # Open a shared file handle to the external cat command.
    mce_open my $fh, "| cat" or die "open error: $!\n";

    # Process the array in parallel.
    # Workers send data to the shared file handle.
    mce_loop {
        my $file = $_;
        printf $fh "wid: %d, name: %s\n", MCE->wid(), $file;
    } @data_files;

    # Close the shared file handle.
    close $fh;

    The sample code generates the following output. The order may differ from run to run, depending on which worker obtains data first.

    wid: 3, name: file1
    wid: 2, name: file2
    wid: 4, name: file3
    wid: 1, name: file4

    On Windows, replace the mce_open line with the following. This will work if Cygwin is installed on the C: drive. The code works with Strawberry Perl, ActiveState Perl, and Cygwin Perl.

    mce_open my $fh, '| c:/cygwin/bin/cat.exe' or die "open error: $!\n";
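
    If program A prints warnings to the terminal, as the original question mentions, the same stderr redirect from the question should also work in the mce_open line, since the piped command goes through the shell (program_A is the question's placeholder name):

    mce_open my $fh, '| program_A 2>/dev/null' or die "open error: $!\n";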

    Warm regards, Mario.

      I'm not familiar with the MCE module; maybe I'll test it later. Thanks!
Re: How to set pipe first and then use the forkmanager?
by tybalt89 (Monsignor) on Sep 22, 2016 at 13:56 UTC

    This is my guess as to what you want...

    I worry about data from the four forked processes arriving interleaved on the pipe, maybe even in the middle of each other's lines :(

    This solution caches each fork's output until it is complete and then forwards it to program_A in one piece. I wasn't sure how to do that using ForkManager.

    For testing's sake I combined program_A with the forking program.

    #!/usr/bin/perl
    # http://perlmonks.org/?node_id=1172353
    use strict;
    use warnings;
    use IO::Select;

    $| = 1;

    my @data_files  = qw( one two three four five six seven );
    my $maxchildren = 4;
    my %data_for_handles;

    # Fork a stand-in for program_A; the parent writes to it through $fh_A.
    my $fh_A;
    if( open $fh_A, '|-' ) {            # parent
        warn "pipe opened\n";
    }
    else {                              # child: pretend to be program_A
        print "program_A started\n";
        print while <STDIN>;
        print "program_A ended\n";
        exit;
    }

    my $sel = IO::Select->new;
    while( @data_files or $sel->count ) {

        # Start new children until the limit is reached.
        while( @data_files and $sel->count < $maxchildren ) {
            my $file = shift @data_files;
            if( open my $fh, '-|' ) {   # parent: watch this child's output
                $sel->add($fh);
            }
            else {                      # child: produce some test data
                $| = 1;
                warn "child $file started\n";
                print "$file\n"; sleep 1;
                print "$file\n"; sleep 1;
                print "$file\n";
                exit;
            }
        }

        # Collect each child's output; forward it whole once the child is done.
        if( $sel->count > 0 ) {
            for my $fh ($sel->can_read) {
                if( 0 < sysread $fh, my $buffer, 16 * 1024 ) {
                    $data_for_handles{$fh} .= $buffer;
                }
                else {
                    print {$fh_A} delete $data_for_handles{$fh};
                    $sel->remove($fh);
                }
            }
        }
    }
    close $fh_A;
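
    As a comparison only, and not tested here: Parallel::ForkManager can hand a data structure from each child back to the parent through finish() and the run_on_finish callback, which gives the same cache-then-forward behaviour. program_A and the per-file work below are placeholders:

    #!/usr/bin/perl
    use strict;
    use warnings;
    use Parallel::ForkManager;

    my @data_files = qw( one two three four five six seven );

    open my $fh_A, '|-', 'program_A' or die "open: $!";

    my $pm = Parallel::ForkManager->new(4);

    # Runs in the parent when a child is reaped; $result is the reference
    # that the child passed to finish(), serialized and sent back by the module.
    $pm->run_on_finish( sub {
        my ( $pid, $exit, $ident, $signal, $core, $result ) = @_;
        print {$fh_A} $$result if ref $result;   # forward one child's output whole
    } );

    for my $file ( @data_files ) {
        $pm->start and next;                     # parent
        my $output = "processed $file\n";        # child: placeholder for the real work
        $pm->finish( 0, \$output );              # pass the result back to the parent
    }
    $pm->wait_all_children;

    close $fh_A;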
      It's a little complicated for me right now. I'll learn about the IO::Select module first. Thanks!