use strict; use warnings qw( all ); use threads; use Thread::Queue qw( ); # 3.01+ use constant NUM_WORKERS => 10; sub worker { my ($job) = @_; return "[$job]"; } sub collector { my ($job) = @_; print "{$job}\n"; } { my $tx_q = Thread::Queue->new(); my $rx_q = Thread::Queue->new(); my @workers; for (1..NUM_WORKERS) { push @workers, async { while (defined( my $job = $tx_q->dequeue() )) { $rx_q->enqueue(worker($job)); } }; } my $collector_thread = async { while (defined( my $job = $rx_q->dequeue() )) { collector($job); } }; while (my $job = <>) { chomp($job); $tx_q->enqueue($job); } $tx_q->end(); $_->join() for @workers; $rx_q->end(); $collector_thread->join(); }