Marseille07 has asked for the wisdom of the Perl Monks concerning the following question:

Hi, I'm having troble handling timeouts in the following code. I'm trying to implement Coro::Channel with worker threads where each thread simply does tcp_connect on bunch of hosts. However, I'm having trouble going to the next iteration of the worker loop from the AnyEvent->timer call back. I tried next (which doesn't work within the loop), next with label on the loop (didn't work because the Event Loop didn't recognize the label) so forth. I'm sure there is a way to do this that I'm not aware, otherwise it makes it very difficult to implement timeouts from a worker-thread using AnyEvent. Any thoughts or insights are greatly appreciated. Thanks.
#!/usr/local/bin/perl use strict; use warnings; use AnyEvent; use AnyEvent::Socket; use Coro; use Coro::AnyEvent; my $input = new Coro::Channel 5; foreach my $i ( 1 .. 10 ) { async { while (1) { my $host = $input->get(); print "$i got $host\n"; tcp_connect $host, 65432, Coro::rouse_cb; my ($fh) = Coro::rouse_wait; next unless $fh; print "$i got socket for $host\n"; my $t; $t = AnyEvent->timer ( after => 10, cb => sub { undef $t; print "$host: timed out\n"; retu +rn; } # ideally I want to move onto the next iteration of the parent + loop ); } }; } my @hosts = qw( host1.server.com host2.server.com ); foreach ( @hosts ) { $input->put( $_ ); }

Replies are listed 'Best First'.
Re: Coro::Channel, worker thread and timeout
by Corion (Patriarch) on Sep 12, 2011 at 16:11 UTC

    If you want to cancel a certain Coro thread, you will need to find out what primitives Coro has for cancelling a (Coro) thread. Personally, I would not use the synchronous tcp_connect, but a version that returns a guard. That way, you can at least cancel the connect by resetting the guard.

    Personally, I wouldn't use Coro explicitly but rather look at what AnyEvent provides as primitives to handle the socket multiplexing and cancellation.

      It turned out tcp_connect returns a guard object unless it is in the void context, so I played with it a little bit -- unfortunately, it is not allowing me to get out of the timer call back and then proceed with the worker thread. I was hoping there is a way to do that, but maybe after the Anyevent->timer CB gets invoked, there is no way back to the while(1) loop -- which I think was strange but it may just be how it is.
      Thank you, this is definitely a good point. I'll look into socket calls that utilizes a guard. Will post back with a result.
Re: Coro::Channel, worker thread and timeout
by zentara (Cardinal) on Sep 12, 2011 at 15:59 UTC
    I havn't played much with AnyEvent, but from my experience with Glib and Gtk2 and threads, the way to go in a thread is to use an "idle" callback when calling something from a thread. So I looked at AnyEvent, and there is this in the README
    # called when event loop idle (if applicable) my $w = AnyEvent->idle (cb => sub { ... }); my $w = AnyEvent->condvar; # stores whether a condition was fla +gged $w->send; # wake up current and all future recv's $w->recv; # enters "main loop" till $condvar gets ->send # use a condvar in callback mode: $w->cb (sub { $_[0]->recv });
    I would try an play with those 2 methods.

    I'm not really a human, but I play one on earth.
    Old Perl Programmer Haiku ................... flash japh
      Thanks for your response. The AnyEvent->idle is definitely an interesting one to explore when you want invoke callbacks while your event becomes idle. Maybe I'm missing something, but I wasn't certain how it would relate to AnyEvent->timer though -- my understanding is that the watcher can legitimately go off when my event loop is doing tcp_connect (and it is taking a long time) -- and I wasn't certain how to have the Worker thread proceed onto the next host.
        my understanding is that the watcher can legitimately go off when my event loop is doing tcp_connect (and it is taking a long time)

        I don't know for sure how the AnyEvent timer works, but I know that a hung IO operation, as you describe will totally block a thread from running as expected, even signals won't make it in. My guess is the AnyEvent event-loop is being blocked by the hung IO operation, it just isn't getting any cpu time to function.

        I was just looking at a few modules that my be what you need, Async-Interrupt and EV-Loop-Async and [IO-Async-Loop-AnyEvent.

        Those modules put the eventloop in a separate thread, and it says somewhere in the README's that AnyEvent is a convenience module, for cases when you don't need the full power of EV and EV-Loop-Async.

        You could do something yourself manually, create a separate thread that won't be blocked by another thread's hung IO. Here is a crude example:

        #!/usr/bin/perl -w use strict; use threads; use threads::shared; my $timer_go:shared = 0; my $worker = threads->create(\&worker); my $timer = threads->create(\&timer,$worker); print "hit enter to start\n"; <>; $timer_go=1; <>; $timer->join(); $worker->join(); sub timer { my $worker = shift; while(1){ if($timer_go){ my $count = 0; while(1){ $count++; if($count > 5){ print "timed out\nHit enter to finish\n"; # Send a signal to a thread $worker->kill('INT'); return; } sleep 1; print "timing $count\n"; } }else{sleep 1} } } sub worker { $|++; $SIG{INT} = sub{ warn "Caught Zap!\n"; sleep 1; exit; }; my $worker_pid = open( READ, "top -d 1 -b |" ); print "\t$worker_pid\n"; while(<READ>){} return; }

        I'm not really a human, but I play one on earth.
        Old Perl Programmer Haiku ................... flash japh
Re: Coro::Channel, worker thread and timeout
by Tanktalus (Canon) on Sep 14, 2011 at 02:23 UTC

    Rough thoughts here. First off, the code you have will go and create 10 worker threads. I'm not sure why you need to do that. If you're just re-using threads because their overhead is too much, try async_pool. In fact, that's about what I'd do.

    Second, inside your async_pool thread, you capture $Coro::current, and use that in a closure for your timer to wake it up or terminate it. Terminating it is fine - Coro will create a new thread for the pool later if required. Unfortunately, I'm not really sure where that timer is going, since you don't even have a comment explaining what goes on after that.

    So, something like this:

    async { while (my $host = $input->get()) # note that putting false will caus +e this to exit - probably a good thing { print "got $host\n"; async_pool { tcp_connect $host, 65432, Coro::rouse_cb; my ($fh) = Coro::rouse_wait; return unless $fh; print "got socket for $host\n"; my $coro = $Coro::current; my $t = AnyEvent->timer( after => 10, cb => sub { print "$host: +timed out\n"; $coro->cancel } ); # do stuff? } # async pool } }
    Or something like that. Of course, this can get far more threads going at once, but is still likely to be handled easily, unless perhaps you go up to hundreds or maybe thousands of hosts that you're dealing with, and even then, it's more a network issue than a Coro issue.

      I think your suggestion is more elegant than what I had originally with the 10 worker threads. And $Coro::current->cancel appears like a sure way to get rid of a coro on timeout, so I'll try quick samples and see what happens. Thank you for sharing a thorough example.