This is a script that uses POE to run a series of tasks in parallel. It's "easy" because the code that runs the tasks is simply defined as subroutines and executed: one runs in the children, the others in the parent. Behold!
package PoeSubs;

use strict;
use warnings;

use Carp qw(croak);
use Data::Dumper;
use POE qw(Wheel::Run Filter::Reference);

# sub run_tasks #{{{

# Run a list of tasks in parallel child processes and hand each result
# back to the parent.
#
# Named arguments:
#   tasks                - array ref of task values; each one is passed
#                          to run_task_sub in its own child process.
#   max_concurrent_tasks - maximum number of children alive at once
#                          (defaults to 1).
#   run_task_sub         - code ref executed in the child with one task
#                          as its only argument; must return a reference.
#   handle_task_sub      - code ref executed in the parent with the
#                          reference returned by run_task_sub.
#   reduce_sub           - optional code ref called, with no arguments,
#                          once every task has finished.
#   debug                - true to print progress messages and to leave
#                          POE's internal warnings enabled.
#
# Croaks when handle_task_sub or run_task_sub is not a code reference.
sub run_tasks {
    my (%ARGS) = @_;

    my $debug           = $ARGS{debug};
    my $handle_task_sub = delete $ARGS{handle_task_sub};
    my $reduce_sub      = delete $ARGS{reduce_sub};

    # Fail at the call site rather than deep inside a POE event handler.
    croak 'handle_task_sub must be a code reference'
        if ref($handle_task_sub) ne 'CODE';

    my $start_tasks = get_start_tasks(%ARGS);

    # Silence POE's internal warnings for the duration of this call
    # unless the caller asked for debug output.
    no warnings 'redefine';
    local *POE::Kernel::_warn = sub { } if not $debug;

    POE::Session->create(
        inline_states => {
            _start      => $start_tasks,
            next_task   => $start_tasks,
            task_result => sub { $handle_task_sub->( $_[ARG0] ); },
            task_done   => \&PoeSubs::handle_task_done,
            task_debug  => \&PoeSubs::handle_task_stderr,
            task_reaped => \&PoeSubs::handle_task_reaped,
        },
    );

    # Run until there are no more tasks.
    POE::Kernel->run();

    # Call the reducer explicitly with an empty argument list; a bare
    # "&$reduce_sub;" would silently hand it this sub's own @_.
    if ( 'CODE' eq ref $reduce_sub ) {
        $reduce_sub->();
    }

    return;
}

#}}}

# sub get_start_tasks #{{{

# Build the event handler used for both "_start" and "next_task".
#
# Takes the same named arguments as run_tasks (tasks,
# max_concurrent_tasks, run_task_sub, debug) and returns a code ref.
# Each time it is invoked, the returned handler starts new child
# processes until max_concurrent_tasks are running or the task list is
# exhausted.  The task list is copied, so the caller's array is not
# modified.
#
# Croaks when run_task_sub is not a code reference.
sub get_start_tasks {
    my (%ARGS) = @_;

    my $debug                = $ARGS{debug};
    my @tasks                = @{ $ARGS{tasks} || [] };
    my $max_concurrent_tasks = $ARGS{max_concurrent_tasks} || 1;
    my $run_task_sub         = $ARGS{run_task_sub};

    croak 'run_task_sub must be a code reference'
        if ref($run_task_sub) ne 'CODE';

    return sub {
        my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];

        while ( keys( %{ $heap->{task} } ) < $max_concurrent_tasks ) {
            my $next_task = shift @tasks;
            last unless defined $next_task;

            print "Starting task for $next_task...\n" if $debug;

            my $task = POE::Wheel::Run->new(
                # Program => sub { $run_task_sub->( $next_task ) },
                Program =>
                    sub { get_program_sub($run_task_sub)->($next_task) },
                StdoutFilter => POE::Filter::Reference->new(),
                StdoutEvent  => "task_result",
                StderrEvent  => "task_debug",
                CloseEvent   => "task_done",
            );

            # The wheel must stay referenced for the child to keep running.
            $heap->{task}->{ $task->ID } = $task;

            # POE::Wheel::Run does not reap its children; register for
            # SIGCHLD so the kernel collects the exit status.
            $kernel->sig_child( $task->PID, "task_reaped" );
        }

        return;
    };
}

#}}}

# sub get_program_sub #{{{

# Wrap run_task_sub so that it can be used as the child's program.
#
# The returned code ref takes one task, runs run_task_sub on it, and
# prints the returned reference to STDOUT encoded by
# POE::Filter::Reference, which is the format the parent's StdoutFilter
# expects.
sub get_program_sub {
    my ($run_task_sub) = @_;

    return sub {
        my ($task) = @_;

        # warn '$task = ', $task;
        my $filter      = POE::Filter::Reference->new();
        my $task_output = $run_task_sub->($task);

        # warn Dumper( $task_output );
        my $output = $filter->put( [$task_output] );
        print @$output;
    };
}

#}}}

# sub handle_task_stderr #{{{

# Catch and display information from the child's STDERR.  This was
# useful for debugging since the child's warnings and errors were not
# being displayed otherwise.
sub handle_task_stderr {
    my $stderr = $_[ARG0];

    # print "Debug: $stderr\n";
    warn "handle_task_stderr: $stderr\n";

    return;
}

#}}}

# sub handle_task_done #{{{

# The task is done.  Delete the child wheel, and try to start a new
# task to take its place.
sub handle_task_done {
    my ( $kernel, $heap, $task_id ) = @_[ KERNEL, HEAP, ARG0 ];

    delete $heap->{task}->{$task_id};
    $kernel->yield("next_task");

    return;
}

#}}}

# sub handle_task_reaped #{{{

# A child process has exited and been reaped by the kernel.  Nothing
# else needs doing: the wheel itself is released in handle_task_done.
sub handle_task_reaped {
    return;
}

#}}}

1;

__END__
#!/usr/bin/perl

# Example driver for PoeSubs: runs 100 dummy tasks in parallel, each of
# which sleeps for a random 1-4 seconds in its own child process, then
# prints the total number of seconds slept.

use strict;
use warnings;

use Data::Dumper;
use PoeSubs;

use constant MAX_CONCURRENT_TASKS => 100;

# use constant TASKS => qw( 4 10 3 7 );

my ( $run_task_sub, $handle_task_sub, $reduce_sub ) = get_subs();

# for( 1..20 ) {
PoeSubs::run_tasks(
    # debug => 1,
    tasks                => [ 1 .. 100 ],
    max_concurrent_tasks => MAX_CONCURRENT_TASKS,
    run_task_sub         => $run_task_sub,
    handle_task_sub      => $handle_task_sub,
    reduce_sub           => $reduce_sub,
);
# }

exit 0;

# Build the three callbacks handed to PoeSubs::run_tasks.
#
# Returns a list of three code refs:
#   run_task_sub    - sleeps 1-4 seconds and returns a hash ref with the
#                     keys task, sleep and message.
#   handle_task_sub - adds the result's sleep time to a running total
#                     and prints the result's message.
#   reduce_sub      - prints the accumulated total.
#
# handle_task_sub and reduce_sub close over the same $totals variable.
sub get_subs {

    # Gets the task as the first argument
    # Should return a reference of some type
    my $run_task_sub = sub {
        my ($task) = @_;

        my $sleep = int( rand 4 ) + 1;
        sleep($sleep);

        return {
            task    => $task,
            sleep   => $sleep,
            message => sprintf(
                '[Task %s: slept %s seconds]', $task, $sleep
            ),
        };
    };

    # Gets the reference returned by run_task as the first argument
    my $totals          = 0;
    my $handle_task_sub = sub {
        my ($result) = @_;

        $totals += $result->{sleep};
        printf( "{Task: %s, Got message: %s}\n",
            $result->{task}, $result->{message} );
    };

    # Prints out the totals
    my $reduce_sub = sub {
        print '$totals = ', $totals, "\n";
    };

    return ( $run_task_sub, $handle_task_sub, $reduce_sub );
}
I'm just looking to get some feedback. Is it idiotic? Is it cool? Is it a CPAN module already?
I tend to like the way that all you need to do is define a few subs and let 'er rip. I'm a big fan of easy interfaces.
Thank you, that is all.
|
|---|
| Replies are listed 'Best First'. | |
|---|---|
|
Re: Easily parallelize program execution
by BrowserUk (Patriarch) on Aug 24, 2006 at 01:57 UTC | |
by bennymack (Pilgrim) on Aug 24, 2006 at 02:43 UTC | |
by BrowserUk (Patriarch) on Aug 24, 2006 at 02:58 UTC | |
by pemungkah (Priest) on Sep 12, 2006 at 23:15 UTC | |
|
Re: Easily parallelize program execution
by diotalevi (Canon) on Aug 24, 2006 at 03:49 UTC | |
by rhesa (Vicar) on Aug 24, 2006 at 08:26 UTC |