#!/usr/bin/perl
# Demo: keep a fixed-size pool of background jobs alive using the Async module.
#
# $workercount jobs are started up front.  Once per second the main loop
# polls every job, reports the ones that have finished, and starts new jobs
# so that the pool is full again.  The loop runs until the script is killed.
use strict;
use warnings;
use Async;
use POSIX ();

# Unbuffered STDOUT: every job is a fork of this process, and a forked child
# would otherwise inherit (and later re-emit) whatever the parent still had
# sitting in its output buffer when STDOUT is not a terminal.
$| = 1;

my $workercount = 10;
my @workerqueue;

# start_worker($units, $suffix)
#
# Forks one Async job that sleeps 3 * $units seconds and then prints
# "$units$suffix\n".  Prints the "--> enqueue <pid>" line and returns the
# Async object.  Dies if Async->new does not hand back an object.
sub start_worker {
    my ($units, $suffix) = @_;

    my $proc = Async->new(
        sub {
            sleep($units * 3);
            print "$units$suffix\n";

            # The child is a copy of the parent and therefore holds copies of
            # all Async objects that were in @workerqueue at fork time.  If
            # the child exits the normal way, those copies are destroyed in
            # the child, and the observed effect is that still-running
            # sibling jobs get killed and show up as "ready" far too early
            # (presumably via the module's destructor, which sends kill 9 to
            # the stored PID -- confirm against the installed Async.pm).
            # _exit() ends the child without running any destructors.
            # Consequence: the job's return value is not sent back to the
            # parent; this script never reads it.
            POSIX::_exit(0);
        }
    );
    die "Async->new failed for job '$units$suffix': $!\n" unless $proc;

    print "--> enqueue $proc->{PID}\n";
    return $proc;
}

# Initial fill of the pool.
push @workerqueue, start_worker($_, '') for 1 .. $workercount;

while (1) {
    sleep(1);

    # Drain the queue, keeping only the jobs that are still running.
    my @unfinished;
    while (my $proc = shift @workerqueue) {
        if ($proc->ready) {
            print "--> $proc->{PID} is ready\n";
            if (my $e = $proc->error) {
                print "Something went wrong. The error was: $e\n";
            }
            undef $proc;    # drop the last reference to the finished job
        } else {
            push @unfinished, $proc;
        }
    }
    push @workerqueue, @unfinished;

    ## START refill queue block ##
    my $free = $workercount - @workerqueue;
    push @workerqueue, start_worker($_, ' new') for 1 .. $free;
    ## END refill queue block ##

    print "mainloop\n";
}
####
--> enqueue 28304
--> enqueue 28305
--> enqueue 28306
--> enqueue 28307
--> enqueue 28308
--> enqueue 28309
--> enqueue 28310
--> enqueue 28311
--> enqueue 28312
--> enqueue 28313
mainloop
mainloop
1
--> 28304 is ready
mainloop
mainloop
mainloop
2
--> 28305 is ready
mainloop
mainloop
mainloop
3
--> 28306 is ready
mainloop
mainloop
mainloop
4
--> 28307 is ready
mainloop
mainloop
mainloop
5
--> 28308 is ready
mainloop
mainloop
mainloop
6
--> 28309 is ready
mainloop
mainloop
mainloop
7
--> 28310 is ready
...
####
--> enqueue 28319
--> enqueue 28320
--> enqueue 28321
--> enqueue 28322
--> enqueue 28323
--> enqueue 28324
--> enqueue 28325
--> enqueue 28326
--> enqueue 28327
--> enqueue 28328
mainloop
mainloop
1
--> 28319 is ready
--> enqueue 28329
mainloop
mainloop
mainloop
2
1 new
--> 28320 is ready
--> 28327 is ready
--> 28328 is ready
--> 28329 is ready
--> enqueue 28330
--> enqueue 28331
--> enqueue 28332
--> enqueue 28333
mainloop
--> 28321 is ready
--> 28322 is ready
--> 28323 is ready
--> 28324 is ready
--> 28325 is ready
--> 28326 is ready
--> enqueue 28334
--> enqueue 28335
--> enqueue 28336
--> enqueue 28337
--> enqueue 28338
--> enqueue 28339
mainloop
mainloop
1 new
--> 28330 is ready
--> enqueue 28340
...
####
# NOTE(review): isolated one-line excerpt; $pid is not declared in this fragment.
# Presumably quoted from the Async module's object destructor -- confirm against
# the installed Async.pm.  It sends signal 9 to the process whose id is in $pid.
kill 9 => $pid; # I don't care.
####
#!/usr/bin/perl
# Demo: keep a fixed-size pool of worker threads alive.
#
# $workercount threads running mysub() are started up front.  Once per second
# the main loop joins every thread that has finished, dumps its return value,
# and starts new threads so that the pool is full again.  SIGTERM / SIGINT
# request a clean shutdown: the loop ends and all remaining threads are joined.
use strict;
use warnings;
use threads;
use threads::shared;
use Data::Dumper;

# Shared bookkeeping table: thread id => 'running' | 'finished'.
# mysub() writes 'finished' into it.  It is informational only; whether a
# thread can be collected is decided with is_joinable(), see below.
my %state :shared;

my $workercount = 10;

# Cleared by the signal handlers to leave the main loop.
my $run = 1;
$SIG{TERM} = sub { $run = 0; };
$SIG{INT}  = sub { $run = 0; };

my @workerqueue;

# spawn_worker($i)
#
# Starts one thread running mysub($i), prints the "--> enqueue <tid>" line and
# returns the thread object.  If the thread cannot be created, a warning is
# issued and an empty list is returned so the caller's push adds nothing; the
# next refill pass simply tries again.
sub spawn_worker {
    my ($i) = @_;

    my $thr = threads->create(\&mysub, $i);
    unless (defined $thr) {
        warn "Could not create worker thread for job $i\n";
        return;
    }

    my $id = $thr->tid();
    print "--> enqueue $id\n";

    # The thread is already running here and may already have recorded
    # 'finished'; do not overwrite that.
    $state{$id} = 'running' unless exists $state{$id};

    return $thr;
}

# Initial fill of the pool.
push @workerqueue, spawn_worker($_) for 1 .. $workercount;

while ($run) {
    sleep(1);

    # Drain the queue, keeping only the threads that are still running.
    my @unfinished;
    while (my $thr = shift @workerqueue) {
        # Ask the thread itself whether it is done instead of consulting
        # %state: that is free of the race between the worker writing
        # 'finished' and this thread writing 'running', and it also collects
        # a worker that died before it could record 'finished'.
        if ($thr->is_joinable()) {
            my $id = $thr->tid();
            print "--> $id is ready\n";
            my $x = $thr->join();
            delete $state{$id};
            print Dumper $x; # do something useful with the data
        } else {
            push @unfinished, $thr;
        }
    }
    push @workerqueue, @unfinished;

    ## START refill queue block ##
    # Do not start new work once a shutdown has been requested.
    if ($run) {
        my $free = $workercount - @workerqueue;
        push @workerqueue, spawn_worker($_) for 1 .. $free;
    }
    ## END refill queue block ##

    print "mainloop\n";
}

# Shutdown: wait for every thread that has not been joined yet.  Their return
# values are discarded.
my @threads = threads->list();
foreach my $thr (@threads) {
    $thr->join();
}
# mysub($units)
#
# Worker body executed inside a thread.  Sleeps for 3 * $units seconds,
# prints "<tid> : slept for <seconds> sec", stores 'finished' under its own
# thread id in the shared %state table and returns a reference to a small
# sample hash (keys 'ID', 'a', 'B').
sub mysub {
    my ($units) = @_;

    my $seconds = $units * 3;
    sleep $seconds;

    my $tid = threads->tid();
    print "$tid : slept for $seconds sec\n";

    # Sample payload handed back to whoever joins this thread.
    my %result = (
        'ID' => $tid,
        'a'  => [ 1, 2, 3 ],
        'B'  => { 'a' => 'A' },
    );

    # Record completion in the shared table before returning.
    $state{$tid} = 'finished';

    return \%result;
}