This is a script that uses POE to run a series of tasks in parallel. It's "easy" because the code that runs the tasks is simply defined as subroutines and executed: one in the children, one in the parent. Behold!

package PoeSubs;

use strict;
use warnings;
use Data::Dumper;
use POE qw(Wheel::Run Filter::Reference);

# sub run_tasks #{{{
sub run_tasks {
    my( %ARGS ) = @_;
    my( $debug ) = $ARGS{debug};
    my( $handle_task_sub ) = delete $ARGS{handle_task_sub};
    my( $start_tasks ) = get_start_tasks( %ARGS );

    no warnings 'redefine';
    local *POE::Kernel::_warn = sub {} if not $debug;

    POE::Session->create (
        inline_states => {
            _start      => $start_tasks,
            next_task   => $start_tasks,
            task_result => sub { $handle_task_sub->( $_[ARG0] ); },
            task_done   => \&PoeSubs::handle_task_done,
            task_debug  => \&PoeSubs::handle_task_stderr,
        }
    );

    # Run until there are no more tasks.
    POE::Kernel->run();

    my( $reduce_sub ) = delete $ARGS{reduce_sub};
    if( 'CODE' eq ref $reduce_sub ) {
        &$reduce_sub;
    }
} #}}}

# sub get_start_tasks #{{{
sub get_start_tasks {
    my( %ARGS ) = @_;
    my( $debug ) = $ARGS{debug};
    my( @tasks ) = @{ $ARGS{tasks} };
    my( $max_concurrent_tasks ) = $ARGS{max_concurrent_tasks} || 1;
    my( $run_task_sub ) = $ARGS{run_task_sub};

    return sub {
        my $heap = $_[HEAP];
        while ( keys( %{ $heap->{task} } ) < $max_concurrent_tasks ) {
            my $next_task = shift @tasks;
            last unless defined $next_task;
            print "Starting task for $next_task...\n" if $debug;
            my $task = POE::Wheel::Run->new (
                # Program => sub { $run_task_sub->( $next_task ) },
                Program      => sub { get_program_sub( $run_task_sub )->( $next_task ) },
                StdoutFilter => POE::Filter::Reference->new(),
                StdoutEvent  => "task_result",
                StderrEvent  => "task_debug",
                CloseEvent   => "task_done",
            );
            $heap->{task}->{ $task->ID } = $task;
        }
    };
} #}}}

# sub get_program_sub #{{{
sub get_program_sub {
    my( $run_task_sub ) = @_;
    return sub {
        my( $task ) = @_;
        # warn '$task = ', $task;
        my( $filter ) = POE::Filter::Reference->new();
        my( $task_output ) = $run_task_sub->( $task );
        # warn Dumper( $task_output );
        my $output = $filter->put( [ $task_output ] );
        print @$output;
    };
} #}}}

# sub handle_task_stderr #{{{
# Catch and display information from the child's STDERR. This was
# useful for debugging since the child's warnings and errors were not
# being displayed otherwise.
sub handle_task_stderr {
    my $stderr = $_[ARG0];
    # print "Debug: $stderr\n";
    warn "handle_task_stderr: $stderr\n";
} #}}}

# sub handle_task_done #{{{
# The task is done. Delete the child wheel, and try to start a new
# task to take its place.
sub handle_task_done {
    my ( $kernel, $heap, $task_id ) = @_[ KERNEL, HEAP, ARG0 ];
    delete $heap->{task}->{$task_id};
    $kernel->yield("next_task");
} #}}}

1;
__END__
And the test script.
#!/usr/bin/perl

use strict;
use warnings;
use Data::Dumper;
use PoeSubs;

use constant MAX_CONCURRENT_TASKS => 100;
# use constant TASKS => qw( 4 10 3 7 );

my( $run_task_sub, $handle_task_sub, $reduce_sub ) = get_subs();

# for( 1..20 ) {
PoeSubs::run_tasks(
    # debug => 1,
    tasks                => [ 1..100 ],
    max_concurrent_tasks => MAX_CONCURRENT_TASKS,
    run_task_sub         => $run_task_sub,
    handle_task_sub      => $handle_task_sub,
    reduce_sub           => $reduce_sub,
);
# }

exit 0;

sub get_subs {
    # Gets the task as the first argument
    # Should return a reference of some type
    my( $run_task_sub ) = sub {
        my( $task ) = @_;
        my( $sleep ) = int( rand 4 ) + 1;
        sleep( $sleep );
        return {
            task    => $task,
            sleep   => $sleep,
            message => sprintf( '[Task %s: slept %s seconds]', $task, $sleep ),
        };
    };

    # Gets the reference returned by run_task as the first argument
    my( $totals ) = 0;
    my( $handle_task_sub ) = sub {
        my( $result ) = @_;
        $totals += $result->{sleep};
        printf( "{Task: %s, Got message: %s}\n", $result->{task}, $result->{message} );
    };

    # Prints out the totals
    my( $reduce_sub ) = sub {
        print '$totals = ', $totals, "\n";
    };

    return( $run_task_sub, $handle_task_sub, $reduce_sub );
}

I'm just looking to get some feedback. Is it idiotic? Is it cool? Is it a CPAN module already?

I tend to like the way that all you need to do is define a few subs and let 'er rip. I'm a big fan of easy interfaces.

Thank you, that is all.

Re: Easily parallelize program execution
by BrowserUk (Patriarch) on Aug 24, 2006 at 01:57 UTC

    How about showing it doing something useful? That is, performing some real processing rather than the make-work of the test script.


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    Lingua non convalesco, consenesco et abolesco. -- Rule 1 has a caveat! -- Who broke the cabal?
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

      I'm afraid I can't think of any other examples off the top of my head that will run anywhere but on my system.

      I like the way this example illustrates a couple of principles. It shows that the script only takes as long as the longest process. It also demonstrates how one can aggregate the results of each process.

      I don't think it takes too much imagination to see how this can be useful. For instance, it could be used to connect to 100 servers, grep their access log for a pattern, return the line count, then display the total line count. Or, on a 4 processor system, look through a directory of gzipped files 4 files at a time, do something useful, then return the aggregate result.
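
      As a rough sketch of the gzip idea, the three subs might look something like the following. The pattern, the sample file names and their contents are made up for illustration, and the last lines call the subs one after another as a stand-in for PoeSubs::run_tasks so the sketch runs without POE installed.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use IO::Compress::Gzip qw(gzip $GzipError);
use IO::Uncompress::Gunzip qw($GunzipError);

# Hypothetical pattern to count; not taken from the original post.
my $pattern = qr/GET/;

# Would run in the child: count matching lines in one gzipped file.
# Returns a reference, as run_tasks expects.
my $run_task_sub = sub {
    my( $file ) = @_;
    my $z = IO::Uncompress::Gunzip->new( $file )
        or die "gunzip failed for $file: $GunzipError\n";
    my $count = 0;
    while ( defined( my $line = $z->getline() ) ) {
        $count++ if $line =~ $pattern;
    }
    $z->close();
    return { task => $file, count => $count };
};

# Would run in the parent: add each file's count to the running total.
my $total = 0;
my $handle_task_sub = sub {
    my( $result ) = @_;
    $total += $result->{count};
};

# Would run in the parent after all tasks have finished.
my $reduce_sub = sub { print "total matching lines = $total\n"; };

# Build two small gzipped sample files so the sketch runs on its own.
my $dir = tempdir( CLEANUP => 1 );
my @files;
for my $n ( 1 .. 2 ) {
    my $data = join '', map { "GET /page$_\n" } 1 .. $n;
    $data .= "POST /form\n";
    my $file = "$dir/access$n.gz";
    gzip( \$data => $file ) or die "gzip failed: $GzipError\n";
    push @files, $file;
}

# Sequential stand-in for PoeSubs::run_tasks, to check the subs themselves.
$handle_task_sub->( $run_task_sub->( $_ ) ) for @files;
$reduce_sub->();
```

      To process four files at a time, the same three subs would be handed to PoeSubs::run_tasks with tasks => \@files and max_concurrent_tasks => 4, using the interface from the test script.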

      It's not limited to aggregation either. One could create a complex data structure also. For instance, building upon the gzip example, look for a particular query string parameter in a directory of gzipped log files and generate a hash of the values of the params and the count of times they appear.

      Ok so that last one was kind of aggregation too. Of course, the reduce_sub is optional. One can simply print the results of each task...
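
      The query string example could be sketched along these lines: each task returns a hash of counts and the parent merges them. The parameter name and the log lines are invented for illustration, the tasks are plain arrays of lines rather than gzipped files to keep it short, and the subs are called sequentially in place of PoeSubs::run_tasks.

```perl
use strict;
use warnings;

# Hypothetical parameter name; the thread does not name one.
my $param = 'q';

# Would run in the child: tally the values of $param in the given lines.
# A real task would read the lines from a gzipped log file instead.
my $run_task_sub = sub {
    my( $lines ) = @_;
    my %count;
    for my $line ( @$lines ) {
        $count{$1}++ while $line =~ /[?&]\Q$param\E=([^&\s]*)/g;
    }
    return \%count;
};

# Would run in the parent: merge one task's hash into the overall hash.
my %totals;
my $handle_task_sub = sub {
    my( $result ) = @_;
    $totals{$_} += $result->{$_} for keys %$result;
};

# Sequential stand-in for PoeSubs::run_tasks, with made-up log lines.
my @tasks = (
    [ 'GET /s?q=perl&x=1', 'GET /s?x=2&q=poe' ],
    [ 'GET /s?q=perl' ],
);
$handle_task_sub->( $run_task_sub->( $_ ) ) for @tasks;

print "$_ => $totals{$_}\n" for sort keys %totals;
```

      Because the merge happens in handle_task_sub, no reduce_sub is needed unless something should be printed once at the end.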

        Thank you for taking my request seriously. I agree that it is difficult to come up with good examples that are simple enough to allow the unfamiliar to see what's going on but complex enough to demonstrate the API.


        I'll try using it to parallelize some testing we're doing and see if it works for that. Should be a big runtime win if so.
Re: Easily parallelize program execution
by diotalevi (Canon) on Aug 24, 2006 at 03:49 UTC

    If you're running POE, it's not really parallel. You've a single "thread" of execution that's switching between each process. You will not be taking advantage of multiple cores, CPUs, or machines. You need to be using something that actually spawns threads or forks to get something that is parallel and what you've shown doesn't appear to be anything like that.

    ⠤⠤ ⠙⠊⠕⠞⠁⠇⠑⠧⠊

      You need to be using something that actually spawns threads or forks
      That's exactly what POE::Wheel::Run does.
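
      For anyone who wants to see the principle without POE, here is a minimal core-Perl sketch of the same idea: fork one child per task and pass a serialized reference back to the parent over a pipe. It is not POE::Wheel::Run's actual code. The task list and the "square" result are invented, and Storable's store_fd and fd_retrieve stand in for what POE::Filter::Reference does on the wire.

```perl
use strict;
use warnings;
use POSIX ();
use Storable qw(store_fd fd_retrieve);

# Fork one child per task. Each child writes one serialized result
# to its own pipe and exits.
my %reader_for;    # child pid => read end of that child's pipe
for my $task ( 1 .. 3 ) {
    pipe( my $reader, my $writer ) or die "pipe failed: $!";
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    if ( $pid == 0 ) {
        # Child: do the "work" and send the result to the parent.
        close $reader;
        store_fd( { task => $task, pid => $$, square => $task**2 }, $writer );
        close $writer;       # flushes the serialized data into the pipe
        POSIX::_exit( 0 );   # leave without running the parent's cleanup
    }
    # Parent: keep the read end and move on without waiting.
    close $writer;
    $reader_for{$pid} = $reader;
}

# Collect the results and reap the children.
my @results;
for my $pid ( sort { $a <=> $b } keys %reader_for ) {
    push @results, fd_retrieve( $reader_for{$pid} );
    waitpid( $pid, 0 );
}

printf "task %d ran in pid %d, parent is pid %d\n", $_->{task}, $_->{pid}, $$
    for sort { $a->{task} <=> $b->{task} } @results;
```

      Each result carries a pid different from the parent's, which is the point: the tasks run in separate processes, so the operating system is free to schedule them on separate cores.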