hfi has asked for the wisdom of the Perl Monks concerning the following question:

Hi, what I want to do is write some daemon with a defined count of worker processes. To keep things readable I will only post a minimal example of the problem I'm facing (some kind of worker queue):

#!/usr/bin/perl
use strict;
use warnings;
use Async;

my $workercount = 10;
my @workerqueue;

for my $i (1..$workercount) {
    my $proc = Async->new( sub { sleep($i*3); print "$i\n"; } );
    print "--> enqueue $proc->{PID}\n";
    push @workerqueue, $proc;
}

while (1) {
    sleep(1);
    my @unfinished;
    while (my $proc = shift @workerqueue) {
        if ($proc and $proc->ready) {
            print "--> $proc->{PID} is ready\n";
            if (my $e = $proc->error) {
                print "Something went wrong. The error was: $e\n";
            }
            undef $proc;
        } else {
            push @unfinished, $proc;
        }
    }
    push @workerqueue, @unfinished;

    ## START refill queue block ##
    my $free = $workercount - @workerqueue;
    for my $i (1..$free) {
        my $proc = Async->new( sub { sleep($i*3); print "$i new\n"; } );
        print "--> enqueue $proc->{PID}\n";
        push @workerqueue, $proc;
    }
    ## END refill queue block ##

    print "mainloop\n";
}

Actually if I comment out the "refill queue block" everything works as expected, the output looks something like:

--> enqueue 28304
--> enqueue 28305
--> enqueue 28306
--> enqueue 28307
--> enqueue 28308
--> enqueue 28309
--> enqueue 28310
--> enqueue 28311
--> enqueue 28312
--> enqueue 28313
mainloop
mainloop
1
--> 28304 is ready
mainloop
mainloop
mainloop
2
--> 28305 is ready
mainloop
mainloop
mainloop
3
--> 28306 is ready
mainloop
mainloop
mainloop
4
--> 28307 is ready
mainloop
mainloop
mainloop
5
--> 28308 is ready
mainloop
mainloop
mainloop
6
--> 28309 is ready
mainloop
mainloop
mainloop
7
--> 28310 is ready
...

But as soon as I include the refill queue block it looks like:

--> enqueue 28319
--> enqueue 28320
--> enqueue 28321
--> enqueue 28322
--> enqueue 28323
--> enqueue 28324
--> enqueue 28325
--> enqueue 28326
--> enqueue 28327
--> enqueue 28328
mainloop
mainloop
1
--> 28319 is ready
--> enqueue 28329
mainloop
mainloop
mainloop
2
1 new
--> 28320 is ready
--> 28327 is ready
--> 28328 is ready
--> 28329 is ready
--> enqueue 28330
--> enqueue 28331
--> enqueue 28332
--> enqueue 28333
mainloop
--> 28321 is ready
--> 28322 is ready
--> 28323 is ready
--> 28324 is ready
--> 28325 is ready
--> 28326 is ready
--> enqueue 28334
--> enqueue 28335
--> enqueue 28336
--> enqueue 28337
--> enqueue 28338
--> enqueue 28339
mainloop
mainloop
1 new
--> 28330 is ready
--> enqueue 28340
...

So what is happening here? As far as my debugging got me, I can tell that for some reason all running processes are considered to be ->ready() as soon as I start a new process from within the main loop. This leads to cleanup, so the processes are killed before they get a chance to print their output, and then new processes are created in the workerqueue. Since all previous processes are considered ready at once, this leads to the "--> enqueue ..." spam.

Now the really interesting question: what am I doing wrong that makes the Async module consider my processes ready?

Note: I tried reading through the Async module, and I understand, let's say, 90% of what it is doing. I tried commenting out the following line, but it didn't change anything:

kill 9 => $pid; # I don't care.
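For comparison, here is a minimal sketch of the same "fixed number of workers, refill when one finishes" loop written with plain fork and a non-blocking waitpid instead of Async. It is a swapped-in technique, not how Async works internally, and the helper names spawn_worker and run_pool are made up for this illustration. The child leaves through POSIX::_exit so that it skips END blocks and object destructors inherited from the parent; whether inherited cleanup is what bites the Async example above is not confirmed here.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use POSIX qw(WNOHANG);

$| = 1;    # keep parent output unbuffered so forked children carry no pending data

# Hypothetical helper: fork a child that runs $job and leaves at once.
sub spawn_worker {
    my ($job) = @_;
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    if ($pid == 0) {
        $job->();
        POSIX::_exit(0);    # no END blocks, no destructors in the child
    }
    return $pid;
}

# Hypothetical helper: run all @jobs with at most $workercount children
# alive at once. Returns the number of children that were reaped.
sub run_pool {
    my ($workercount, @jobs) = @_;
    my %running;
    my $finished = 0;
    while (@jobs or %running) {
        # Refill the pool up to the configured size.
        while (@jobs and keys(%running) < $workercount) {
            my $pid = spawn_worker(shift @jobs);
            $running{$pid} = 1;
        }
        # Poll every child without blocking; 0 means "still running".
        for my $pid (keys %running) {
            my $res = waitpid($pid, WNOHANG);
            next if $res == 0;
            delete $running{$pid};
            $finished++;
        }
        select(undef, undef, undef, 0.1);    # short pause between polls
    }
    return $finished;
}

my @demo_jobs = map { sub { select(undef, undef, undef, 0.2) } } 1 .. 5;
my $done = run_pool(3, @demo_jobs);
print "finished $done jobs\n";
```

The parent never waits on a particular child; it only asks each PID whether it has exited, which keeps the main loop responsive even when jobs differ a lot in run time.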

EDIT: Referring to http://www.perlmonks.org/?node_id=966939 , the example posted by Tanktalus sounds like exactly what I need. However, looking at http://perldoc.perl.org/perlthrtut.html and considering the hint of arpad.szasz: "It seems You are mixing old-style and deprecated threads model(Thread module) with the new ithreads thread model...", I'm wondering: isn't Tanktalus' example therefore also using the deprecated version of threads? Is there an equivalent implementation with the newer ithreads? And could this code be modified to share data with (or better, return data to) its parent process?

EDIT 2: I think I will now use something like this (unless anyone has a good reason not to do so):

#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Data::Dumper;

my %state :shared;
my $workercount = 10;
my $run = 1;

$SIG{TERM} = sub { $run = 0; };
$SIG{INT}  = sub { $run = 0; };

my @workerqueue;

for my $i (1..$workercount) {
    my $thr = threads->create(\&mysub, $i);
    my $id = $thr->tid();
    print "--> enqueue $id\n";
    $state{$id} = 'running';
    push @workerqueue, $thr;
}

while ($run) {
    sleep(1);
    my @unfinished;
    while (my $thr = shift @workerqueue) {
        my $id = $thr->tid();
        if ($thr and $state{$id} eq 'finished') {
            print "--> $id is ready\n";
            my $x = $thr->join();
            delete $state{$id};
            print Dumper $x;    # do something useful with the data
        } else {
            push @unfinished, $thr;
        }
    }
    push @workerqueue, @unfinished;

    ## START refill queue block ##
    my $free = $workercount - @workerqueue;
    for my $i (1..$free) {
        my $thr = threads->create(\&mysub, $i);
        my $id = $thr->tid();
        print "--> enqueue $id\n";
        $state{$id} = 'running';
        push @workerqueue, $thr;
    }
    ## END refill queue block ##

    print "mainloop\n";
}

my @threads = threads->list();
foreach my $thr (@threads) {
    $thr->join();
}

sub mysub {
    my $i = shift;
    $i *= 3;
    sleep $i;
    my $id = threads->tid();
    print "$id : slept for $i sec\n";
    my $x = {
        'ID' => $id,
        'a'  => [1,2,3],
        'B'  => { 'a' => 'A', },
    };
    $state{$id} = 'finished';
    return $x;
}
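As a possible alternative to tracking the state in a shared hash, the threads module offers $thr->is_joinable(), which reports whether a thread has finished and can be joined without blocking. The following is only a sketch under that assumption (a Perl built with ithreads support); the names worker and collect_results are invented for the example and are not part of the code above.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;

# Illustrative worker: pause briefly, then return a data structure.
sub worker {
    my ($i) = @_;
    select(undef, undef, undef, 0.2 * $i);    # fractional sleep
    return { ID => threads->tid(), input => $i };
}

# Hypothetical helper: start $n threads and gather their return values
# by polling, so the caller never blocks inside join().
sub collect_results {
    my ($n) = @_;
    my @pending = map { threads->create(\&worker, $_) } 1 .. $n;
    my @results;
    while (@pending) {
        my @still_running;
        for my $thr (@pending) {
            if ($thr->is_joinable()) {
                # The thread has finished, so join() returns immediately.
                push @results, $thr->join();
            } else {
                push @still_running, $thr;
            }
        }
        @pending = @still_running;
        select(undef, undef, undef, 0.1) if @pending;    # poll interval
    }
    return @results;
}

my @results = collect_results(3);
print "thread $_->{ID} handled input $_->{input}\n" for @results;
```

Because join() is only called on threads that are already finished, the parent keeps control of its main loop and still receives reference values from the children.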

Replies are listed 'Best First'.
Re: Problems using module Async
by Anonymous Monk on Nov 29, 2015 at 13:56 UTC

        Thanks for the answer. :) Yes I saw the correction.

        I've had a deeper look into perlthrtut in the meantime, too. I think the best approach for me would be threads->create combined with detach, plus a hash shared via threads::shared in which each thread reports its state and perhaps returns simple (non-reference) values to the parent. I want non-blocking behavior in the parent's main loop: join would make the parent wait, and so would Thread::Queue's dequeue when there is nothing in the queue. My tasks differ a lot in execution time, so I want to make sure the parent always stays in control of what's going on and never "hangs" for minutes waiting for a child.

        Can you confirm that, or is there a better way?

        Awww. I just encountered another problem with that. Since these are threads and not forks, there are no PIDs I could use to identify a thread (I would have used the PIDs as keys in the shared hash mentioned above, holding the thread state etc.). Is there some ID for threads? I know I could use the object itself as the key in the hash, since I can retrieve it in the child via threads->self(). But it should be something that is not recycled too quickly. I mean, once a thread is finished, I need to be sure that no new thread gets the same "ID" by coincidence before the parent has had a chance to check the hash for the status and return values. That check should never take longer than, let's say, a few seconds in the worst case (more likely milliseconds), so PIDs were a nice ID for that job.
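On the question of a thread ID: the threads module provides tid(), available both as $thr->tid() in the parent and as threads->tid() inside the thread. According to the threads documentation, IDs are unique integers that start at 0 for the main thread and increase by one for every thread created. A small sketch to observe this, assuming a Perl built with ithreads support (the helper name tids_of_sequential_threads is made up for the example):

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;

# Hypothetical helper: create $n threads one after another, joining each
# before the next is started, and return the IDs they were given.
sub tids_of_sequential_threads {
    my ($n) = @_;
    my @tids;
    for (1 .. $n) {
        my $thr = threads->create(sub { return threads->tid() });
        my $seen_inside = $thr->join();    # ID as seen by the thread itself
        die "tid mismatch" unless $seen_inside == $thr->tid();
        push @tids, $thr->tid();
    }
    return @tids;
}

my @tids = tids_of_sequential_threads(3);
print "thread ids: @tids\n";
```

Each thread has already ended before the next one starts, yet the IDs keep increasing, which is the property wanted for keys in a shared state hash.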

Re: Problems using module Async
by Anonymous Monk on Nov 29, 2015 at 15:18 UTC
    Note: I tried reading through the Async module, and I understand, let's say, 90% of what it is doing. I tried commenting out the following line, but it didn't change anything:
    kill 9 => $pid; # I don't care.
    You should pay more attention; the output does change.

      Oops. You're right. In fact I tried that with the original code (which is very similar to the example above, but runs in the context of a Proc::Daemon fork), where I couldn't see any change. And since I could reproduce the very same behavior with the code above, I didn't repeat the check for the example code. Thanks for that hint!

      However, I think I was going the wrong way anyway; I will recode the queuing system with ithreads.

Re: Problems using module Async
by hfi (Novice) on Dec 10, 2015 at 11:53 UTC

    Hi, I have now written the multithreading using ithreads. Everything seems to work great so far, but I have another question on ithreads:

    A friend of mine told me that Perl is only "simulating" threads, so that every thread actually runs on the same processor core. I can't test that now, since I'm working on a single-core machine (with only hyperthreading as its "second core"). But the intention is definitely to take advantage of a multi-core machine.

    So is that true? Will there be no advantage in running ithreads code on a multi-processor machine?

      Your friend is wrong. You can show them the source if they need proof.

      Update: The possible source of confusion is the fact that threads are used to emulate fork on MSWin.

      ($q=q:Sq=~/;[c](.)(.)/;chr(-||-|5+lengthSq)`"S|oS2"`map{chr |+ord }map{substrSq`S_+|`|}3E|-|`7**2-3:)=~y+S|`+$1,++print+eval$q,q,a,
        Thanks for the quick answer (although I don't get what the code is actually doing at all, I trust you are right) :) I really was afraid that I would need to write the whole thing once again, switching back to fork.