# Pool configuration: the number of concurrent workers, and how many
# batches of $NUM_THREADS results make up one lot.
my $NUM_THREADS = 5;
my $BUFFER_SIZE = 1;

my @inQueues = ();                      # one private input queue per worker
my @threads  = ();                      # worker handles, joined at shutdown
my $outQ     = Thread::Queue->new();    # results: workers -> writer
my $readyQ   = Thread::Queue->new();    # idle worker ids -> dispatcher

# create worker threads; each gets its own input queue and announces
# itself as idle right away so the dispatcher can hand it a record
foreach my $i ( 0 .. $NUM_THREADS - 1 )
{
    my $inQ = Thread::Queue->new();
    my $thr = threads->create( sub {
        # see the Worker Threads section for this
        ...
    } );
    push @inQueues, $inQ;
    push @threads,  $thr;
    $readyQ->enqueue( $i );
}
# create writer thread; it receives its own clone of the position counters
# below (ithreads copies unshared lexicals at creation) and advances them
# independently of the dispatcher
my ( $curLot, $curBatch, $curPart ) = ( 0, 0, 0 );
my $writerThr = threads->create( sub {
    # see the Writer Thread section for this
    ...
} );
# Dispatcher: pull two lines per record from SEED_FILE (comment line, then
# read sequence) and hand the record to whichever worker is idle. Each
# record is tagged with its (lot, batch, part) position so the writer can
# restore input order regardless of which worker finishes first.
until ( eof( $SEED_FILE ) )
{
    # blocks until some worker reports itself ready
    my $worker = $readyQ->dequeue();

    chomp( my $fastaComment = <$SEED_FILE> );
    chomp( my $readSequence = <$SEED_FILE> );

    $inQueues[ $worker ]->enqueue( {
        'lot'   => $curLot,
        'batch' => $curBatch,
        'part'  => $curPart,
        'fasta' => $fastaComment,
        'read'  => $readSequence,
    } );

    # advance the tag like an odometer: part wraps at $NUM_THREADS,
    # batch wraps at $BUFFER_SIZE, and a full wrap of both starts a new lot
    if ( ++$curPart == $NUM_THREADS )
    {
        $curPart  = 0;
        $curBatch = ( $curBatch + 1 ) % $BUFFER_SIZE;
        ++$curLot if $curBatch == 0;
    }
}
# Shutdown: an undef on each input queue ends that worker's dequeue loop;
# then wait for every worker, and finally for the writer to finish
# emitting the remaining results.
$inQueues[ $_ ]->enqueue( undef ) for 0 .. $NUM_THREADS - 1;
$_->join() for @threads;
$writerThr->join();
####
# Worker thread: consumes records from its private input queue until it
# dequeues undef, processes each against the reference file, and posts one
# result per record to $outQ. Every result echoes the record's
# lot/batch/part tag plus this worker's id, which the writer uses to
# restore order and to mark the worker ready again.
my $thr = threads->create( sub {
    # $i is the spawning loop's index, cloned into this thread at creation
    my $threadId = $i;

    open my $REFERENCE_FILE, '<', "$refFileName"
        or die "Can't open $refFileName: $!";

    while ( defined( my $item = $inQ->dequeue() ) )
    {
        my ( $lot, $batch, $part, $fastaComment, $readSequence ) =
            @{ $item }{ qw( lot batch part fasta read ) };

        # first comma-separated field is the bead ID; the rest are hits
        my ( $beadID, @hits ) = split /,/, $fastaComment;

        # do some stuff to the current item that makes use of
        # REFERENCE_FILE
        my $bestSeedOverallMismatches = $INF;
        my @bestSeedExtensions = ();
        foreach my $hitString ( @hits )
        {
            ...
        }

        my $result = {
            'lot'        => $lot,
            'batch'      => $batch,
            'part'       => $part,
            'thread'     => $threadId,
            'extensions' => \@bestSeedExtensions,
            'num_mm'     => $bestSeedOverallMismatches,
            'beadid'     => $beadID,
            'read'       => $readSequence,
        };
        $outQ->enqueue( $result );
    }

    close $REFERENCE_FILE;
} );#end sub
####
# Writer thread: the only consumer of $outQ. Workers finish in arbitrary
# order, so each result carries the (lot, batch, part) tag the dispatcher
# assigned, and this thread writes results strictly in that order:
#   * result for the slot it is waiting on -> release its worker, write
#     it, then keep writing while the next slot is already in %buffer;
#   * later slot of the current lot -> park it in %buffer, release worker;
#   * slot from a later lot -> hold it (and its worker) in
#     @suspendedThreads until the current lot is complete, because %buffer
#     is keyed only by batch/part and would otherwise collide.
# Extendable reads go to $EXTENSION_FILE, the rest to $UNMATCHED_FILE, and
# a progress line is printed roughly every 10%.
my $writerThr = threads->create( sub {
    # Guard the reciprocal: with zero seeds the loop never runs, but an
    # unguarded 1.0 / 0 would kill this thread ("Illegal division by zero").
    my $numSeedsInv = $numSeeds ? 1.0 / $numSeeds : 0;

    open my $EXTENSION_FILE, ">", $extendFileName
        or die "ERROR: could not write to $extendFileName: $!";
    open my $UNMATCHED_FILE, ">", $unmatchedFileName
        or die "ERROR: could not write to $unmatchedFileName: $!";
    print STDOUT "INFO: 0% complete\n";

    # print out the best extension(s) for this seed; otherwise, print
    # this read out to the unmatchable file
    my $numSeedsExtended = 0;
    my $nextTarget       = 10;
    my %buffer           = ();  # $buffer{batch}->[part]: parked results of the current lot
    my @suspendedThreads = ();  # results from a later lot; their workers stay blocked

    while ( $numSeedsExtended < $numSeeds )
    {
        my $resultRef = $outQ->dequeue();
        my $lot       = $resultRef->{ 'lot' };
        my $batch     = $resultRef->{ 'batch' };
        my $part      = $resultRef->{ 'part' };
        my $threadId  = $resultRef->{ 'thread' };

        if ( $lot == $curLot && $batch == $curBatch &&
             $part == $curPart )
        {
            # thread is free to go
            $readyQ->enqueue( $threadId );

            # print its results and check the buffer for the next set
            # of results
            for ( ;; )
            {
                my $bestSeedExtensions        = $resultRef->{ 'extensions' };
                my $bestSeedOverallMismatches = $resultRef->{ 'num_mm' };
                my $beadID                    = $resultRef->{ 'beadid' };
                my $readSequence              = $resultRef->{ 'read' };

                # a non-empty extension list means the read was extendable
                my $readIsExtendable = @$bestSeedExtensions;
                if ( $readIsExtendable )
                {
                    printBestExtensions( $beadID, $EXTENSION_FILE,
                                         $maxNumHits, $bestSeedExtensions,
                                         $bestSeedOverallMismatches );
                    print {$EXTENSION_FILE} "\n$readSequence\n";
                }
                else
                {
                    print {$UNMATCHED_FILE} "$beadID\n";
                    print {$UNMATCHED_FILE} "$readSequence\n";
                }

                # print progress, rounded down to a multiple of 10%
                ++$numSeedsExtended;
                my $percentComplete = $numSeedsExtended *
                                      $numSeedsInv * 100;
                if ( $percentComplete >= $nextTarget )
                {
                    $percentComplete = int( $percentComplete * 0.1 ) *
                                       10;
                    print STDOUT "INFO: $percentComplete% complete\n";
                    $nextTarget = $percentComplete + 10;
                }
                $buffer{ $batch }->[ $part ] = undef;

                # go to the next needed result (same odometer as the
                # dispatcher: part, then batch, then lot)
                $curPart = ( $curPart + 1 ) % $NUM_THREADS;
                if ( $curPart == 0 )
                    { ++$curBatch; $curBatch %= $BUFFER_SIZE; }
                if ( $curPart == 0 && $curBatch == 0 ) { ++$curLot; }

                # release suspended threads, if we're back at the
                # start of the buffer (i.e. a new lot has begun)
                if ( @suspendedThreads && $curBatch == 0 &&
                     $curPart == 0 )
                {
                    foreach my $resultRef_s ( @suspendedThreads )
                    {
                        my $batch_s    = $resultRef_s->{ 'batch' };
                        my $part_s     = $resultRef_s->{ 'part' };
                        my $threadId_s = $resultRef_s->{ 'thread' };
                        $buffer{ $batch_s }->[ $part_s ] = $resultRef_s;
                        $readyQ->enqueue( $threadId_s );
                    }
                    @suspendedThreads = ();
                }

                # check if the result for the next needed result is
                # already in the buffer
                $resultRef = $buffer{ $curBatch }->[ $curPart ];
                last unless ( defined $resultRef );
                $batch = $curBatch;
                $part  = $curPart;
            }#for ever
        }
        elsif ( $lot == $curLot )
        {
            # out of order but within the current lot: park it and let
            # the worker take more input
            $buffer{ $batch }->[ $part ] = $resultRef;
            $readyQ->enqueue( $threadId );
        }
        else
        {
            # from a later lot: keep the worker blocked until this lot
            # has been fully written
            push @suspendedThreads, $resultRef;
        }
    }# while numSeedsExtended

    # Buffered write errors (e.g. a full disk) only surface at close, so
    # report them instead of silently truncating the output files.
    close $EXTENSION_FILE
        or die "ERROR: could not close $extendFileName: $!";
    close $UNMATCHED_FILE
        or die "ERROR: could not close $unmatchedFileName: $!";
} );