package PoeSubs;

use strict;
use warnings;

use Carp qw(croak);
use Data::Dumper;
use POE qw(Wheel::Run Filter::Reference);

# sub run_tasks #{{{

# Run a list of tasks in child processes (via POE::Wheel::Run), hand every
# result back to the caller, and optionally run a final "reduce" step.
#
# Named arguments:
#   tasks                - array reference of task values (required).
#   max_concurrent_tasks - how many children may run at once (default 1).
#   run_task_sub         - code reference run in the child with one task as
#                          its only argument.  Its return value is passed
#                          through POE::Filter::Reference, so it should be a
#                          reference.
#   handle_task_sub      - code reference run in the parent with each
#                          result received from a child.
#   reduce_sub           - optional code reference run once after
#                          POE::Kernel->run() has returned.  It receives the
#                          same argument list that was given to run_tasks.
#   debug                - when true, progress messages are printed and
#                          POE::Kernel::_warn is left untouched.
#
# Returns the value of reduce_sub when one was supplied, otherwise nothing.

sub run_tasks {
    my (%ARGS) = @_;

    my $debug           = $ARGS{debug};
    my $handle_task_sub = delete $ARGS{handle_task_sub};
    my $reduce_sub      = $ARGS{reduce_sub};
    my $start_tasks     = get_start_tasks(%ARGS);

    # Unless debugging, replace POE::Kernel's internal _warn with a no-op
    # for the duration of this call.
    no warnings 'redefine';
    local *POE::Kernel::_warn = sub { } if not $debug;

    POE::Session->create(
        inline_states => {
            _start      => $start_tasks,
            next_task   => $start_tasks,
            task_result => sub { $handle_task_sub->( $_[ARG0] ); },
            task_done   => \&PoeSubs::handle_task_done,
            task_debug  => \&PoeSubs::handle_task_stderr,

            # Target of the sig_child() watcher registered for each child in
            # get_start_tasks.  Nothing to do here: the wheel itself is
            # released by the task_done handler.
            task_reaped => sub { return },
        },
    );

    # Run until there are no more tasks.
    POE::Kernel->run();

    return unless 'CODE' eq ref $reduce_sub;

    # Explicit call instead of the old "&$reduce_sub;" form, which silently
    # shared our @_ with the callback.  The argument list is still passed on
    # so existing callbacks see exactly what they saw before.
    return $reduce_sub->(@_);
}

#}}}

# sub get_start_tasks #{{{

# Build the event handler used for both the "_start" and "next_task"
# states.  Accepts the same named arguments as run_tasks (tasks,
# max_concurrent_tasks, run_task_sub, debug) and returns a code reference.
#
# Each time the returned handler runs, it starts new children until either
# the task list is exhausted or max_concurrent_tasks wheels are stored in
# $heap->{task}.
#
# Croaks when "tasks" is missing.

sub get_start_tasks {
    my (%ARGS) = @_;

    croak "'tasks' must be given as an array reference"
        unless defined $ARGS{tasks};

    my $debug                = $ARGS{debug};
    my @tasks                = @{ $ARGS{tasks} };
    my $max_concurrent_tasks = $ARGS{max_concurrent_tasks} || 1;
    my $run_task_sub         = $ARGS{run_task_sub};

    return sub {
        my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];

        while ( @tasks
            and keys( %{ $heap->{task} } ) < $max_concurrent_tasks )
        {
            my $next_task = shift @tasks;

            # Skip undefined entries instead of stopping: bailing out of the
            # loop here could leave later tasks unscheduled with no running
            # child left to trigger "next_task" again.
            next unless defined $next_task;

            print "Starting task for $next_task...\n" if $debug;

            my $task = POE::Wheel::Run->new(
                # Program => sub { $run_task_sub->( $next_task ) },
                Program =>
                    sub { get_program_sub($run_task_sub)->($next_task) },
                StdoutFilter => POE::Filter::Reference->new(),
                StdoutEvent  => "task_result",
                StderrEvent  => "task_debug",
                CloseEvent   => "task_done",
            );
            $heap->{task}->{ $task->ID } = $task;

            # Register a SIGCHLD watcher for this child so that POE reaps it
            # (see sig_child() in the POE::Kernel documentation, which
            # POE::Wheel::Run users are expected to call).
            $kernel->sig_child( $task->PID, "task_reaped" );
        }
    };
}

#}}}

# sub get_program_sub #{{{

# Wrap $run_task_sub in the code that runs inside the child process: call
# it with the task, serialise the returned value with
# POE::Filter::Reference, and print the serialised form on STDOUT for the
# parent's StdoutFilter to decode.

sub get_program_sub {
    my ($run_task_sub) = @_;

    return sub {
        my ($task) = @_;
        # warn '$task = ', $task;

        # The serialised result is binary data; keep STDOUT free of any
        # line-ending translation (POE's own examples note this is needed
        # on MSWin32 -- harmless elsewhere).
        binmode(STDOUT);

        my $filter      = POE::Filter::Reference->new();
        my $task_output = $run_task_sub->($task);
        # warn Dumper( $task_output );

        my $output = $filter->put( [$task_output] );
        print @$output;
    };
}

#}}}

# sub handle_task_stderr #{{{

# Catch and display information from the child's STDERR.
# This was useful for debugging since the child's warnings and errors
# were not being displayed otherwise.
#
# POE event handler: ARG0 holds the text received from the child, which is
# passed to warn() with a "handle_task_stderr: " prefix.

sub handle_task_stderr {
    my ($line) = $_[ARG0];

    # print "Debug: $line\n";
    warn "handle_task_stderr: $line\n";
}

#}}}

# sub handle_task_done #{{{

# The task is done.  Delete the child wheel, and try to start a new
# task to take its place.
#
# POE event handler: ARG0 is the key under which the finished wheel was
# stored in $heap->{task}.

sub handle_task_done {
    my $finished_id = $_[ARG0];

    # Dropping the stored wheel frees a slot for the next task.
    delete $_[HEAP]{task}{$finished_id};

    $_[KERNEL]->yield('next_task');
}

#}}}

1;

__END__
####
#!/usr/bin/perl

use strict;
use warnings;

use Data::Dumper;
use PoeSubs;

use constant MAX_CONCURRENT_TASKS => 100;
# use constant TASKS => qw( 4 10 3 7 );

my( $run_task_sub, $handle_task_sub, $reduce_sub ) = get_subs();

# for( 1..20 ) {
    PoeSubs::run_tasks
    (
#        debug => 1,
        tasks => [ 1..100 ],
        max_concurrent_tasks => MAX_CONCURRENT_TASKS,
        run_task_sub => $run_task_sub,
        handle_task_sub => $handle_task_sub,
        reduce_sub => $reduce_sub,
    );
# }

exit 0;

sub get_subs {

    # Gets the task as the first argument
    # Should return a reference of some type
    my( $run_task_sub ) = sub {
        my( $task ) = @_;
        my( $sleep ) = int( rand 4 ) + 1;
        sleep( $sleep );
        return {
            task => $task,
            sleep => $sleep,
            message => sprintf( '[Task %s: slept %s seconds]', $task, $sleep )
        };
    };

    # Gets the reference returned by run_task as the first argument
    my( $totals ) = 0;
    my( $handle_task_sub ) = sub {
        my( $result ) = @_;
        $totals += $result->{sleep};
        printf( "{Task: %s, Got message: %s}\n", $result->{task}, $result->{message} );
    };

    # Prints out the totals
    my( $reduce_sub ) = sub {
        print '$totals = ', $totals, "\n";
    };

    return( $run_task_sub, $handle_task_sub, $reduce_sub );
}