Hm, I don't think the problem can be that the queue is being filled too quickly, since at most 5 items should be in the queue at any time. The reason is that I only allow threads to pick up a new job once their results have been collected by the writer thread. I could be wrong, though.
Here's a more complete listing of the code:
Main process
The main process starts the worker threads and the writer thread, then reads from a file called SEED_FILE. Since it's important that the results are written in the same order as SEED_FILE, every job is stamped with a unique serial number (the curLot, curBatch, and curPart variables). The main process creates a new job only when a worker is ready, i.e. the writer has put that worker's ID back into the queue readyQ after handling its result (see the Writer Thread section). This is meant to make the worker threads act like a thread pool.
my $NUM_THREADS = 5;
my $BUFFER_SIZE = 1;
my @inQueues = ();
my @threads = ();
my $outQ = Thread::Queue->new();
my $readyQ = Thread::Queue->new();
# create worker threads
for ( my $i = 0; $i < $NUM_THREADS; $i++ )
{
my $inQ = Thread::Queue->new();
my $thr = threads->create( sub {
# see the Worker Threads section for this
...
} );
push @inQueues, $inQ;
push @threads, $thr;
# mark this worker as ready for its first job
$readyQ->enqueue( $i );
}
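# serial number of the next job; these variables are not shared, so the
# writer thread created below works on its own private copy
my $curLot = 0;
my $curBatch = 0;
my $curPart = 0;
# create writer thread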
my $writerThr = threads->create( sub {
# see the Writer Thread section for this
...
} );
# give one record from SEED_FILE (a FASTA comment line plus a read line)
# to each ready worker
for ( ;; )
{
last if eof( $SEED_FILE );
my $nextThread = $readyQ->dequeue();
chomp( my $fastaComment = <$SEED_FILE> );
chomp( my $readSequence = <$SEED_FILE> );
my %item = ( 'lot' => $curLot,
'batch' => $curBatch,
'part' => $curPart,
'fasta' => $fastaComment,
'read' => $readSequence, );
$inQueues[ $nextThread ]->enqueue( \%item );
# get the next batch/part number
$curPart = ($curPart + 1) % $NUM_THREADS;
if ( $curPart == 0 ) { ++$curBatch; $curBatch %= $BUFFER_SIZE; }
if ( $curPart == 0 && $curBatch == 0 ) { ++$curLot; }
}
# let threads know that input is done
for ( my $i = 0; $i < $NUM_THREADS; $i++ )
{
$inQueues[ $i ]->enqueue( undef );
}
# wait for threads to finish
foreach my $thr ( @threads )
{
$thr->join();
}
$writerThr->join();
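To make the serial numbering easier to follow, here is the counter update from the loop above pulled out into a standalone helper. This is only a sketch: next_serial is a hypothetical name that does not appear in my real code, and the constants are the same values as in the listing.

```perl
use strict;
use warnings;

my $NUM_THREADS = 5;
my $BUFFER_SIZE = 1;

# Advance a (lot, batch, part) serial number by one step.
# next_serial is a hypothetical helper, not part of the original code.
sub next_serial {
    my ( $lot, $batch, $part ) = @_;
    $part = ( $part + 1 ) % $NUM_THREADS;
    if ( $part == 0 ) { $batch = ( $batch + 1 ) % $BUFFER_SIZE; }
    if ( $part == 0 && $batch == 0 ) { ++$lot; }
    return ( $lot, $batch, $part );
}

# print the first seven serial numbers as lot/batch/part
my @serial = ( 0, 0, 0 );
for my $n ( 1 .. 7 ) {
    print join( '/', @serial ), "\n";
    @serial = next_serial( @serial );
}
```

With BUFFER_SIZE set to 1 the batch number never leaves 0, so the lot number increases by one after every NUM_THREADS jobs.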
Worker Threads
Every worker thread is given a unique ID. It blocks on its input queue inQ until the main process sends it new data, does an operation on that data using the REFERENCE_FILE, and puts its result into the queue outQ. Each worker stamps its result with the serial number of the input item and with its own thread ID.
my $thr = threads->create( sub {
my $threadId = $i;
open my $REFERENCE_FILE, '<', "$refFileName" or
die "Can't open $refFileName: $!";
while ( defined( my $item = $inQ->dequeue() ) )
{
my $lot = $item->{ 'lot' };
my $batch = $item->{ 'batch' };
my $part = $item->{ 'part' };
my $fastaComment = $item->{ 'fasta' };
my $readSequence = $item->{ 'read' };
my @hits = split /[,]/, $fastaComment;
my $beadID = shift( @hits );
# do some stuff to the current item that makes use of
# REFERENCE_FILE
my $bestSeedOverallMismatches = $INF;
my @bestSeedExtensions = ();
foreach my $hitString ( @hits )
{
...
}
my %result = ( 'lot' => $lot,
'batch' => $batch,
'part' => $part,
'thread' => $threadId,
'extensions' => \@bestSeedExtensions,
'num_mm' => $bestSeedOverallMismatches,
'beadid' => $beadID,
'read' => $readSequence, );
$outQ->enqueue( \%result );
}
close $REFERENCE_FILE;
} );#end sub
Writer Thread
This is the most complex of the threads. It also contains some important variables. First, the writer knows how many seeds SEED_FILE contains ($numSeeds), and it runs until it has seen every one of them. Second, it has a data buffer to hold results that have arrived out of order according to their serial numbers. Specifically, the buffer is like an array with $BUFFER_SIZE rows and $NUM_THREADS columns (1 by 5 in this case), indexed by $batch and $part.
The writer works as follows: it waits on outQ for results and checks whether each result's serial number is the next in line. If it is, the worker is put back into readyQ and its result is printed to either EXTENSION_FILE or UNMATCHED_FILE. The writer then prints some progress information and prints any results from the buffer that are now in the proper sequence. If a result arrives and it's not the next in line, one of two things happens. If the result belongs to a different lot than the current one (i.e. the buffer has no slot for it yet), the worker is suspended: its result is pushed onto an array called suspendedThreads and the worker is not returned to readyQ. Otherwise, its result is added to the buffer and the associated worker is put back into readyQ. Suspended workers are put back into readyQ once the entire buffer has been cleared, i.e. when the writer wraps around to the start of the next lot.
my $writerThr = threads->create( sub {
my $numSeedsInv = 1.0 / $numSeeds;
open my $EXTENSION_FILE, ">", $extendFileName
or die "ERROR: could not write to $extendFileName: $!";
open my $UNMATCHED_FILE, ">", $unmatchedFileName
or die "ERROR: could not write to $unmatchedFileName: $!";
print STDOUT "INFO: 0% complete\n";
# print out the best extension(s) for this seed; otherwise, print
# this read out to the unmatchable file
my $numSeedsExtended = 0;
my $nextTarget = 10;
my %buffer = ();
my @suspendedThreads = ();
while ( $numSeedsExtended < $numSeeds )
{
my $resultRef = $outQ->dequeue();
my $lot = $resultRef->{ 'lot' };
my $batch = $resultRef->{ 'batch' };
my $part = $resultRef->{ 'part' };
my $threadId = $resultRef->{ 'thread' };
if ( $lot == $curLot && $batch == $curBatch &&
$part == $curPart )
{
# thread is free to go
$readyQ->enqueue( $threadId );
# print its results and check the buffer for the next set
# of results
for ( ;; )
{
my $bestSeedExtensions = $resultRef->{ 'extensions' };
my $bestSeedOverallMismatches = $resultRef->{ 'num_mm' };
my $beadID = $resultRef->{ 'beadid' };
my $readSequence = $resultRef->{ 'read' };
my $readIsExtendable = @$bestSeedExtensions;
if ( $readIsExtendable )
{
printBestExtensions( $beadID, $EXTENSION_FILE,
$maxNumHits, $bestSeedExtensions,
$bestSeedOverallMismatches );
print $EXTENSION_FILE "\n$readSequence\n";
}
else
{
print $UNMATCHED_FILE "$beadID\n";
print $UNMATCHED_FILE "$readSequence\n";
}
# print progress
++$numSeedsExtended;
my $percentComplete = $numSeedsExtended *
$numSeedsInv * 100;
if ( $percentComplete >= $nextTarget )
{
$percentComplete = int( $percentComplete * 0.1 ) *
10;
print STDOUT "INFO: $percentComplete% complete\n";
$nextTarget = $percentComplete + 10;
}
$buffer{ $batch }->[ $part ] = undef;
# go to the next needed result
$curPart = ($curPart + 1) % $NUM_THREADS;
if ( $curPart == 0 )
{ ++$curBatch; $curBatch %= $BUFFER_SIZE; }
if ( $curPart == 0 && $curBatch == 0 ) { ++$curLot; }
# release suspended threads, if we're back at the
# start of the buffer
if ( @suspendedThreads && $curBatch == 0 &&
$curPart == 0 )
{
foreach my $resultRef_s ( @suspendedThreads )
{
my $batch_s = $resultRef_s->{ 'batch' };
my $part_s = $resultRef_s->{ 'part' };
my $threadId_s = $resultRef_s->{ 'thread' };
$buffer{ $batch_s }->[ $part_s ] =
$resultRef_s;
$readyQ->enqueue( $threadId_s );
}
@suspendedThreads = ();
}
# check if the result for the next needed result is
# already in the buffer
$resultRef = $buffer{ $curBatch }->[ $curPart ];
last unless( defined $resultRef );
$batch = $curBatch;
$part = $curPart;
}#for ever
}
else
{
if ( $lot == $curLot )
{
$buffer{ $batch }->[ $part ] = $resultRef;
$readyQ->enqueue( $threadId );
}
else
{
push @suspendedThreads, $resultRef;
}
}
}# while numSeedsExtended
close $EXTENSION_FILE;
close $UNMATCHED_FILE;
} );
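Stripped of the threads and the file handling, the reordering idea in the writer boils down to something like the sketch below. It is a simplification under a few assumptions: in_order is a hypothetical helper, a single integer $seq stands in for the lot/batch/part triple, and the buffer here is unbounded, whereas the real buffer is limited to one lot and suspends workers when it cannot accept a result.

```perl
use strict;
use warnings;

# Emit results in serial order even when they arrive out of order.
# in_order is a hypothetical helper; $seq stands in for lot/batch/part.
sub in_order {
    my @arrivals = @_;    # each element: [ $seq, $payload ]
    my %buffer;           # results that arrived before their turn
    my $next = 0;         # serial number we are waiting for
    my @emitted;
    for my $result ( @arrivals ) {
        my ( $seq, $payload ) = @$result;
        $buffer{ $seq } = $payload;
        # drain every buffered result that is now next in line
        while ( exists $buffer{ $next } ) {
            push @emitted, delete $buffer{ $next };
            ++$next;
        }
    }
    return @emitted;
}

my @out = in_order( [ 1, 'b' ], [ 0, 'a' ], [ 3, 'd' ], [ 2, 'c' ] );
print "@out\n";
```

The real writer does the same drain step in its inner for ( ;; ) loop, except that it looks up $buffer{ $curBatch }->[ $curPart ] and also has to decide when each worker may go back into readyQ.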