wl2776 has asked for the wisdom of the Perl Monks concerning the following question:

I am writing a service, obtaining file names from an external source (other application is assumed to upload a file and send its name to my service in the HTTP GET request) and feeding the files to the processing engine. This engine sometimes can become busy. I would like to repeat the processing instead of dropping the file.

I use POE::Component::Server::HTTPServer to serve HTTP requests. Its handlers post messages with file names to passive POE::Component::JobQueue. The sessions, created by the workers of the queue, use POE::Wheel::Run::Win32, run another executable and capture its output.

The question is - where should I put the call to post event, when that executable writes "ENGINE IS BUSY"?

The code below works only first time.
It doesn't stop a created worker session - I don't see the messages, printed by the subroutine, assigned to _stop (scroll down to sub make_a_worker).
If I comment the call to POE::Kernel->post in the sub worker_closed, the session is destroyed, and the service is ready to process next file.

use POE qw( Wheel::Run::Win32 Filter::Line); use POE::Component::Server::HTTPServer; use POE::Component::Server::HTTPServer::Handler; use POE::Component::JobQueue; use POE::Filter::Reference; use File::Basename; use Data::Dump qw(dump); use Cwd; my $port=8080; my $max_jobs=1; sub _start { my ( $kernel, $heap ) = @_[ KERNEL, HEAP ]; $heap->{httpserver} = POE::Component::Server::HTTPServer->new( port=>$port, handlers => [ "/add" => \&add_task] ); $heap->{httpserver}->create_server(); POE::Component::JobQueue->spawn ( Alias => 'passive', WorkerLimit => $max_jobs, Worker => \&make_a_worker, Passive => { } ); } sub add_task { my $context = shift; # request has a form http://server:port/add?file/name.mpg $context->{request}->uri=~/\/add\?(.*)/; my @job_params=($1); POE::Kernel->post( 'passive', 'enqueue', 'postback', @job_params ); $context->{response}->code(200); $context->{response}->content( "<HTML><BODY><H1>add task ".join(" ",@job_params)." </H1></BODY></HTM +L>\n" ); return H_FINAL; } sub make_a_worker { my ($postback, $message) = @_; POE::Session->create( inline_states => { _start => \&worker_start, _stop => sub {my $session=$_[SESSION]; print "session ",$session->ID," stop\n";}, worker_child_stdout => \&worker_child_stdout, worker_closed=> \&worker_closed, sig_child => \&sig_child, }, args => [ @_ ] ); } sub worker_start { my ($kernel,$session,$heap,@args) = @_[KERNEL, SESSION, HEAP, ARG0 .. + $#_]; $heap->{postback} =$args[0]; $heap->{filename} =$args[1]; $heap->{cmdline} = POE::Wheel::Run::Win32->new( Program => executable, ProgramArgs => \@args, StdoutEvent => "worker_child_stdout", StderrEvent => "worker_child_stdout", CloseEvent => "worker_closed", StdioFilter => POE::Filter::Line->new(), StderrFilter => POE::Filter::Line->new() ); } sub worker_child_stdout { my ( $heap, $stdout ) = @_[ HEAP, ARG0 ]; # this code sets $heap->{results}="BUSY" # when captures this line from the executable's output } ## NOW THE CLOSEST TO THE QUESTION PART sub worker_closed { my ( $kernel, $heap ) = @_[ KERNEL, HEAP ]; my $postback=delete $heap->{postback}; my @results=($heap->{results}); # skipped code, deleting the intermediate created files my $d=delete $heap->{cmdline}; $postback->(@results); if($results[0] eq "BUSY") { print "worker_closed: engine busy, repeat\n"; my @job_params=($heap->{filename}); POE::Kernel->post( 'passive', 'enqueue', 'postback', @job_params); } }

Replies are listed 'Best First'.
Re: How to enqueue again from the job in POE::Component::JobQueue?
by rcaputo (Chaplain) on Mar 05, 2009 at 14:09 UTC

    First, you should use POE::Kernel's sig_child() method to register a SIGCHLD handler for POE::Wheel::Run children. This will ensure your child processes get reaped, otherwise your machine will clog up with them.

    sub worker_start { my ($kernel,$session,$heap,@args) = @_[KERNEL, SESSION, HEAP, ARG0 .. + $#_]; $heap->{postback} =$args[0]; $heap->{filename} =$args[1]; $heap->{cmdline} = POE::Wheel::Run::Win32->new(...); $kernel->sig_child($heap->{cmdline}->PID, "sig_child"); }

    The problem with reposting the job request from a worker is that the worker becomes the requester. It will be kept alive until the reposted job is done, and it will receive the results of that job. Not what you want.

    As I see it, you have two options. First, you could return a "busy, try again" sort of response in this case. The requesting session would then need to repost the request itself. This is perhaps the cleanest generic design, since the application should have final say over policy.

    Your second option is to contact the author and ask for a "retry" feature where a worker can reënqueue a job when necessary. If you can provide a patch and testcase, that would even be better. See http://rt.cpan.org/ for contact instructions.

      > First, you could return a "busy, try again" sort of response in this case.
      > The requesting session would then need to repost the request itself.

      Ok, this is the option.

      I am new to POE. Could you be more specific? How and where should I get this reply?

        The reply should be sent back to the session that post()'ed the request. The reply will be returned via a "postback" event, which the worker will "post back" to you. The purpose of the third parameter in the request post() is to tell POE::Component::JobQueue which of the requesting session's handlers will accept the response.

Re: How to enqueue again from the job in POE::Component::JobQueue?
by locked_user sundialsvc4 (Abbot) on Mar 05, 2009 at 17:08 UTC

    I am certainly not a POE-person. Yet it seems to me that a good metaphor for what you're wanting is... that the worker, having dequeued a request, basically realizes that he can't process this request, and so it wants to put this request right back onto the queue where it came from:   not “another request just like the old one,” but “this very request.” The request-ID, the originator, the destination, everything ... is therefore unchanged.

Re: How to enqueue again from the job in POE::Component::JobQueue?
by wl2776 (Novice) on Mar 05, 2009 at 14:25 UTC
    Also I don't understand the 3rd parameter 'postback' in the call
    POE::Kernel->post( 'passive', 'enqueue', 'postback', @job_params);
    I have copied it from the POD, however, could not find any explanations
Re: How to enqueue again from the job in POE::Component::JobQueue?
by wl2776 (Novice) on Mar 05, 2009 at 14:15 UTC
    Looks like this is impossible: "The act of enqueuing a message keeps the sender alive at least until the message is delivered." (from POE::Kernel)
Re: How to enqueue again from the job in POE::Component::JobQueue?
by wl2776 (Novice) on Mar 05, 2009 at 15:58 UTC
    I have solved this problem of resubmitting jobs in another way.

    Just have created a global array, where HTTPServer pushes filenames, and workers get the names with shift.

    Of course, this required switching from passive workers to active ones.

    Still very interested in the third parameter 'postback' in call to POE::Kernel->post