#! perl -slw
# Threaded line-processing pipeline.
#
#   main thread  : reads lines from <> and enqueues them on $Qin
#   worker threads: dequeue a line, tag it with their thread id, enqueue the
#                   result on $Qout
#   output thread: dequeues results from $Qout and prints them
#
# Shebang switches: -s parses -NAME=value command-line switches into package
# variables (here -THREADS=N and -MAX=N), -l sets $\ so print appends a
# newline, -w enables warnings.
#
# Shutdown protocol: an undef item on a queue means "no more work". Main
# enqueues one undef per worker, joins the workers, then sends a single undef
# to the output thread.

use strict;
use Data::Dump qw[ pp ];    # pp is not used in the code below
use threads qw[ yield ];    # yield is only referenced by the disabled throttle
use threads::shared;
use Thread::Queue;          # module name is case-sensitive

our $THREADS ||= 5;      # number of worker threads; override with -THREADS=N
our $MAX     ||= 1e3;    # backlog limit for the (disabled) throttle; -MAX=N

my $Qin  = Thread::Queue->new;    # raw input lines, main -> workers
my $Qout = Thread::Queue->new;    # processed lines, workers -> output thread

# worker( $Qin, $Qout )
# Thread body: takes items from $Qin until an undef arrives, and for each one
# enqueues "<item> - processed by thread <tid>" on $Qout.
sub worker {
    my $tid = threads->tid;
    warn "$tid: starting\n";

    my( $Qin, $Qout ) = @_;

    # Test definedness, not truth: an item such as "0" or "" is real work and
    # must not be mistaken for the undef terminator.
    while( defined( my $work = $Qin->dequeue ) ) {
        $Qout->enqueue( "$work - processed by thread $tid" );
    }

    warn "$tid: ending\n";
}

# Start $THREADS workers, all sharing the same pair of queues.
my @threads = map { threads->new( \&worker, $Qin, $Qout ) } 1 .. $THREADS;

# Single consumer for $Qout, so printing happens in one thread only.
my $tOut = async {
    while( defined( my $out = $Qout->dequeue ) ) {
        print $out;
    }
    warn "output thread finished\n";
};

my $i = 0;
while( defined( my $line = <> ) ) {
    # Progress display on STDERR: current backlog and lines read so far.
    printf STDERR "\rPending: %d (%d)", $Qin->pending, $i++;
    $Qin->enqueue( $line );

    # Throttle left disabled, as in the original; without it nothing here
    # limits how far the reader can run ahead of the workers.
    # yield while $Qin->pending > $MAX;
}

warn "Finished reading & queing file\n";

# One terminator per worker, then wait for all of them to drain $Qin.
$Qin->enqueue( undef ) for @threads;
$_->join for @threads;

# Every worker has finished, so nothing more can arrive on $Qout.
$Qout->enqueue( undef );
$tOut->join;

warn "main finished\n";