#!/usr/bin/perl
#
# Keeps a pool of $workercount forked Async workers busy.  The main loop
# polls once per second, reports and discards every worker whose ready()
# is true, and starts a replacement for each free slot.
use strict;
use warnings;
use Async;
use POSIX ();

# Unbuffered STDOUT: nothing is left pending in the buffer at fork time,
# and the worker's own output is written before it leaves via _exit().
$| = 1;

my $workercount = 10;
my @workerqueue;

# Initial fill: worker $i sleeps $i*3 seconds and then prints "$i".
for my $i (1 .. $workercount) {
    push @workerqueue, spawn_worker($i, "$i");
}

while (1) {
    sleep(1);

    # Split the queue into finished workers (reported, then dropped) and
    # workers that are still running (kept for the next pass).
    my @unfinished;
    while (my $proc = shift @workerqueue) {
        if ($proc->ready) {
            print "--> $proc->{PID} is ready\n";
            if (my $e = $proc->error) {
                print "Something went wrong. The error was: $e\n";
            }
            undef $proc;    # release the handle of the finished worker
        }
        else {
            push @unfinished, $proc;
        }
    }
    push @workerqueue, @unfinished;

    ## START refill queue block ##
    my $free = $workercount - @workerqueue;
    for my $i (1 .. $free) {
        push @workerqueue, spawn_worker($i, "$i new");
    }
    ## END refill queue block ##

    print "mainloop\n";
}

# spawn_worker($i, $label)
#   Forks one Async worker that sleeps $i*3 seconds and prints "$label\n",
#   announces its PID on STDOUT and returns the Async handle.
sub spawn_worker {
    my ($i, $label) = @_;

    my $proc = Async->new(
        sub {
            sleep($i * 3);
            print "$label\n";

            # Leave the child without running Perl-level destructors.
            # The child holds copies of every sibling handle that existed
            # at fork time; a normal exit would run their destructors in
            # the child.
            # NOTE(review): Async's destructor appears to SIGKILL its
            # worker ("kill 9 => $pid"), which would explain siblings
            # being reported ready far too early - confirm against the
            # installed Async version.  The task's return value is not
            # used by this script, so skipping Async's normal result
            # hand-over is presumably harmless - confirm.
            POSIX::_exit(0);
        }
    );
    print "--> enqueue $proc->{PID}\n";
    return $proc;
}

####
# Sample output.  No worker is re-enqueued here, so this is presumably
# from a run with the refill queue block disabled - TODO confirm.
#
# --> enqueue 28304
# --> enqueue 28305
# --> enqueue 28306
# --> enqueue 28307
# --> enqueue 28308
# --> enqueue 28309
# --> enqueue 28310
# --> enqueue 28311
# --> enqueue 28312
# --> enqueue 28313
# mainloop
# mainloop
# 1
# --> 28304 is ready
# mainloop
# mainloop
# mainloop
# 2
# --> 28305 is ready
# mainloop
# mainloop
# mainloop
# 3
# --> 28306 is ready
# mainloop
# mainloop
# mainloop
# 4
# --> 28307 is ready
# mainloop
# mainloop
# mainloop
# 5
# --> 28308 is ready
# mainloop
# mainloop
# mainloop
# 6
# --> 28309 is ready
# mainloop
# mainloop
# mainloop
# 7
# --> 28310 is ready
# ...
####
# Sample output with workers being re-enqueued.  Several workers are
# reported ready long before their sleep time can have elapsed (for
# example 28327, the ninth worker started, a few iterations in).
#
# --> enqueue 28319
# --> enqueue 28320
# --> enqueue 28321
# --> enqueue 28322
# --> enqueue 28323
# --> enqueue 28324
# --> enqueue 28325
# --> enqueue 28326
# --> enqueue 28327
# --> enqueue 28328
# mainloop
# mainloop
# 1
# --> 28319 is ready
# --> enqueue 28329
# mainloop
# mainloop
# mainloop
# 2
# 1 new
# --> 28320 is ready
# --> 28327 is ready
# --> 28328 is ready
# --> 28329 is ready
# --> enqueue 28330
# --> enqueue 28331
# --> enqueue 28332
# --> enqueue 28333
# mainloop
# --> 28321 is ready
# --> 28322 is ready
# --> 28323 is ready
# --> 28324 is ready
# --> 28325 is ready
# --> 28326 is ready
# --> enqueue 28334
# --> enqueue 28335
# --> enqueue 28336
# --> enqueue 28337
# --> enqueue 28338
# --> enqueue 28339
# mainloop
# mainloop
# 1 new
# --> 28330 is ready
# --> enqueue 28340
# ...

####
# Quoted excerpt, kept as a comment on purpose: executed on its own it
# would run with an undefined $pid, which kill() treats as pid 0.
# NOTE(review): presumably taken from the Async module's destructor -
# confirm.
#
#   kill 9 => $pid; # I don't care.

####
#!/usr/bin/perl
#
# Keeps $workercount threads running.  The main loop polls once per second,
# joins every thread that has marked itself 'finished' in the shared %state
# hash, dumps its result and starts a replacement.  SIGTERM or SIGINT ends
# the loop; all remaining threads are then joined before the script exits.
use strict;
use warnings;
use threads;
use threads::shared;
use Data::Dumper;

# Thread id => 'running' | 'finished'; written by the workers and by the
# main thread, so every access below is made under lock(%state).
my %state :shared;

my $workercount = 10;
my $run = 1;

$SIG{TERM} = sub { $run = 0; };
$SIG{INT}  = sub { $run = 0; };

my @workerqueue;

# Initial fill: worker $i sleeps $i*3 seconds.
for my $i (1 .. $workercount) {
    push @workerqueue, start_worker($i);
}

while ($run) {
    sleep(1);

    my @unfinished;
    while (my $thr = shift @workerqueue) {
        my $id = $thr->tid();

        my $finished;
        {
            lock(%state);
            $finished = $state{$id} eq 'finished';
        }

        if ($finished) {
            print "--> $id is ready\n";
            my $x = $thr->join();
            {
                lock(%state);
                delete $state{$id};
            }
            print Dumper $x;
            # do something useful with the data
        }
        else {
            push @unfinished, $thr;
        }
    }
    push @workerqueue, @unfinished;

    ## START refill queue block ##
    my $free = $workercount - @workerqueue;
    for my $i (1 .. $free) {
        push @workerqueue, start_worker($i);
    }
    ## END refill queue block ##

    print "mainloop\n";
}

# Shutdown: wait for every thread that has not been joined yet.
my @threads = threads->list();
foreach my $thr (@threads) {
    $thr->join();
}

# start_worker($i)
#   Starts one thread running mysub($i), announces its thread id on STDOUT,
#   registers it in %state and returns the thread object.
#   Dies if the thread cannot be created.
sub start_worker {
    my ($i) = @_;

    # Scalar assignment, so the thread runs in scalar context and join()
    # hands back the single hash reference returned by mysub().
    my $thr = threads->create(\&mysub, $i);
    die "Could not create worker thread\n" unless defined $thr;

    my $id = $thr->tid();
    print "--> enqueue $id\n";
    {
        lock(%state);
        # Never overwrite a 'finished' that the worker has already
        # recorded before we got here.
        $state{$id} = 'running' unless exists $state{$id};
    }
    return $thr;
}

# mysub($i)
#   Worker body: sleeps $i*3 seconds, prints a line with its thread id,
#   marks itself 'finished' in %state and returns a sample hash reference
#   that the main loop obtains through join().
sub mysub {
    my $i = shift;
    $i *= 3;
    sleep $i;

    my $id = threads->tid();
    print "$id : slept for $i sec\n";

    my $x = {
        'ID' => $id,
        'a'  => [1, 2, 3],
        'B'  => {
            'a' => 'A',
        },
    };

    {
        lock(%state);
        $state{$id} = 'finished';
    }
    return $x;
}