# Handler for the wheel's StdoutEvent ("task_result").
# ARG0 is whatever the child wrote to STDOUT after decoding by the wheel's
# StdoutFilter (POE::Filter::Reference), so it is normally a reference.
# NOTE(review): interpolating a reference logs its address, not its contents;
# confirm whether a dump of the structure is wanted here.
sub handle_task_result {
    my ($result) = $_[ARG0];
    $LOG->debug("Result: $result");
    return;
}

# Handler for the wheel's StderrEvent ("task_debug").
# ARG0 is the data the child wrote to STDERR.
sub handle_task_debug {
    my ($result) = $_[ARG0];
    $LOG->debug("Debug: $result");
    return;
}

# Handler for the wheel's CloseEvent ("task_done").
# ARG0 is the ID of the wheel that closed. Dropping the wheel from the heap
# frees a slot, and "next_task" is posted so the slot can be refilled.
sub handle_task_done {
    my ( $kernel, $heap, $task_id ) = @_[ KERNEL, HEAP, ARG0 ];
    delete $heap->{task}->{$task_id};
    $kernel->yield("next_task");
    return;
}

# Entry point executed in the child process forked by POE::Wheel::Run.
# Receives the task record selected by start_tasks.
sub Run_Job {
    my ($task)   = shift;
    my ($filter) = POE::Filter::Reference->new();

    # ... stuff ...
    return;
}

# Handler for "_start" and "next_task": launch child tasks until
# MAX_CONCURRENT_TASKS wheels are stored in $heap->{task}.
#
# When no job is available this handler does NOT sleep. A blocking sleep
# would stall the whole POE event loop, so output and close events from
# children that are already running would not be dispatched. Instead a
# "next_task" timer is armed and control returns to the kernel.
sub start_tasks {
    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];

    # Seconds to wait before polling the job table again when it is empty.
    my $retry_seconds = 10;

    while ( keys( %{ $heap->{task} } ) < MAX_CONCURRENT_TASKS ) {

        # Used below as a hash reference with 'id' and 'url' keys.
        # NOTE(review): presumably one row from the job table; confirm
        # against Get_Next_Task().
        my $next_task = Get_Next_Task();

        # Nothing to run right now: try again later without blocking.
        # delay() replaces any pending "next_task" timer, so repeated calls
        # never pile up timers.
        unless ( defined $next_task && exists $next_task->{'id'} ) {
            $kernel->delay( next_task => $retry_seconds );
            return;
        }

        $LOG->debug("Starting task for $next_task->{'url'}...\n");

        my ($task) = POE::Wheel::Run->new(
            Program      => sub { Run_Job($next_task) },
            StdoutFilter => POE::Filter::Reference->new(),
            StdoutEvent  => "task_result",
            StderrEvent  => "task_debug",
            CloseEvent   => "task_done",
        );

        # Keep the wheel alive; it is destroyed when removed from the heap.
        $heap->{task}->{ $task->ID } = $task;
    }

    return;
}

# Setup the POE session if we are in daemon mode
POE::Session->create(
    inline_states => {
        _start      => \&start_tasks,
        next_task   => \&start_tasks,
        task_result => \&handle_task_result,
        task_done   => \&handle_task_done,
        task_debug  => \&handle_task_debug,
    }
);

POE::Kernel->run();