#! perl -slw use strict; use threads; use threads::shared; our $T //= 8; my @buffers; share( $buffers[ $_ ] ) for 0 .. $T-1; my $stdoutSem :shared; sub reader { my $fname = shift; open my $fh, '<', $fname or die $!; my $next = 0; while( <$fh> ) { chomp; lock $buffers[ $next ]; $buffers[ $next ] = $_; $next = ++$next % $T; } close $fh; for( 0 .. $T -1 ) { lock $buffers[ $_ ]; $buffers[ $_ ] = undef; } } sub worker { my $tid = threads->tid; my $bufn = shift; my $localbuf; while( 1 ) { { lock $buffers[ $bufn ]; last unless defined( $buffers[ $bufn ] ); $localbuf = $buffers[ $bufn ]; } ## process localbuf here. { lock $stdoutSem; print "[$tid] processed record: '", $localbuf, "'"; } } } my $reader = threads->new( \&reader, $ARGV[ 0 ] ); my @workers = map threads->new( \&worker, $_ ), 0 .. $T-1; $reader->join; $_->join for @workers;