#!/usr/bin/perl use strict; use warnings; use threads; use threads::shared; use Data::Dumper; my %state :shared; my $workercount = 10; my $run = 1; $SIG{TERM} = sub { $run = 0; }; $SIG{INT} = sub { $run = 0; }; my @workerqueue; for my $i (1..$workercount) { my $thr = threads->create(\&mysub, $i); my $id = $thr->tid(); print "--> enqueue $id\n"; $state{$id} = 'running'; push @workerqueue, $thr; } while ($run) { sleep(1); my @unfinished; while (my $thr = shift @workerqueue) { my $id = $thr->tid(); if ($thr and $state{$id} eq 'finished') { print "--> $id is ready\n"; my $x = $thr->join(); delete $state{$id}; print Dumper $x; # do something useful with the data } else { push @unfinished, $thr; } } push @workerqueue, @unfinished; ## START refill queue block ## my $free = $workercount - @workerqueue; for my $i (1..$free) { my $thr = threads->create(\&mysub, $i); my $id = $thr->tid(); print "--> enqueue $id\n"; $state{$id} = 'running'; push @workerqueue, $thr; } ## END refill queue block ## print "mainloop\n"; } my @threads = threads->list(); foreach my $thr (@threads) { $thr->join(); } sub mysub { my $i = shift; $i *= 3; sleep $i; my $id = threads->tid(); print "$id : slept for $i sec\n"; my $x = { 'ID' => $id, 'a' => [1,2,3], 'B' => { 'a' => 'A', }, }; $state{$id} = 'finished'; return $x; }