As previously mentioned, process STDIN and $rx_q in separate threads. But also, use dequeue instead of dequeue_nb.
use strict;
use warnings qw( all );
use threads;
use Thread::Queue qw( ); # 3.01+
use constant NUM_WORKERS => 10;
sub worker {
my ($job) = @_;
return "[$job]";
}
sub collector {
my ($job) = @_;
print "{$job}\n";
}
{
my $tx_q = Thread::Queue->new();
my $rx_q = Thread::Queue->new();
my @workers;
for (1..NUM_WORKERS) {
push @workers, async {
while (defined( my $job = $tx_q->dequeue() )) {
$rx_q->enqueue(worker($job));
}
};
}
my $collector_thread = async {
while (defined( my $job = $rx_q->dequeue() )) {
collector($job);
}
};
while (my $job = <>) {
chomp($job);
$tx_q->enqueue($job);
}
$tx_q->end();
$_->join() for @workers;
$rx_q->end();
$collector_thread->join();
}