in reply to Re: Tread::Queue enqueue blocks all of my threads
in thread Tread::Queue enqueue blocks all of my threads
Hm, I don't think the problem can be that the queue is being filled too quickly, since at most 5 items should be in the queue at any time. The reason is that I only allow threads to pick up a new job once their results have been collected by the writer thread. I could be wrong, though.
Here's a more complete listing of the code:
Main process

The main process starts the worker threads and the writer thread. It then reads SEED_FILE two lines at a time (a FASTA comment and a read sequence). Since the results must be written in the same order as SEED_FILE, every job is stamped with a unique serial number (the curLot, curBatch, and curPart variables). The main process creates a new job only when a worker is ready, i.e. when the writer has put that worker's ID back into the queue readyQ after processing its results (see the Writer Thread section). This is meant to make the worker threads behave like a thread pool.
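The lot/batch/part stamping is effectively a mixed-radix counter: part cycles through `$NUM_THREADS` values, batch through `$BUFFER_SIZE` values, and lot grows without bound. The sketch below pulls the main loop's "get the next batch/part number" step into a helper and checks it against a closed form for job number `$n`. `next_serial` and `serial_of_job` are hypothetical names used only for illustration; they are not part of the program.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring the main loop's "get the next batch/part
# number" step: advance a (lot, batch, part) serial by one job.
sub next_serial {
    my ( $lot, $batch, $part, $num_threads, $buffer_size ) = @_;
    $part = ( $part + 1 ) % $num_threads;
    if ( $part == 0 ) {
        $batch = ( $batch + 1 ) % $buffer_size;
    }
    $lot++ if $part == 0 && $batch == 0;
    return ( $lot, $batch, $part );
}

# Closed form for 0-based job number $n, as a cross-check: the serial is
# $n written in mixed radix (lot, $buffer_size, $num_threads).
sub serial_of_job {
    my ( $n, $num_threads, $buffer_size ) = @_;
    return (
        int( $n / ( $num_threads * $buffer_size ) ),
        int( $n / $num_threads ) % $buffer_size,
        $n % $num_threads,
    );
}

my ( $NUM_THREADS, $BUFFER_SIZE ) = ( 5, 1 );
my @serial = ( 0, 0, 0 );
for my $n ( 0 .. 11 ) {
    my @expected = serial_of_job( $n, $NUM_THREADS, $BUFFER_SIZE );
    die "mismatch at job $n" unless "@serial" eq "@expected";
    print "job $n -> lot $serial[0], batch $serial[1], part $serial[2]\n";
    @serial = next_serial( @serial, $NUM_THREADS, $BUFFER_SIZE );
}
```

With `$BUFFER_SIZE = 1` the batch is always 0, so the lot simply increases every five jobs.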
```perl
use threads;
use Thread::Queue;

my $NUM_THREADS = 5;
my $BUFFER_SIZE = 1;

my @inQueues = ();
my @threads  = ();
my $outQ     = Thread::Queue->new();
my $readyQ   = Thread::Queue->new();

# create worker threads
for ( my $i = 0; $i < $NUM_THREADS; $i++ ) {
    my $inQ = Thread::Queue->new();
    my $thr = threads->create( sub {
        # see the Worker Threads section for this
        ...
    } );
    push @inQueues, $inQ;
    push @threads,  $thr;

    # mark each worker as ready for its first job
    $readyQ->enqueue( $i );
}

# create writer thread
my $curLot   = 0;
my $curBatch = 0;
my $curPart  = 0;
my $writerThr = threads->create( sub {
    # see the Writer Thread section for this
    ...
} );

# hand the next record (two lines) from SEED_FILE to a ready worker
for ( ;; ) {
    last if eof( $SEED_FILE );
    my $nextThread = $readyQ->dequeue();
    chomp( my $fastaComment = <$SEED_FILE> );
    chomp( my $readSequence = <$SEED_FILE> );
    my %item = (
        'lot'   => $curLot,
        'batch' => $curBatch,
        'part'  => $curPart,
        'fasta' => $fastaComment,
        'read'  => $readSequence,
    );
    $inQueues[ $nextThread ]->enqueue( \%item );

    # get the next batch/part number
    $curPart = ($curPart + 1) % $NUM_THREADS;
    if ( $curPart == 0 ) {
        ++$curBatch;
        $curBatch %= $BUFFER_SIZE;
    }
    if ( $curPart == 0 && $curBatch == 0 ) {
        ++$curLot;
    }
}

# let threads know that input is done
for ( my $i = 0; $i < $NUM_THREADS; $i++ ) {
    $inQueues[ $i ]->enqueue( undef );
}

# wait for threads to finish
foreach my $thr ( @threads ) {
    $thr->join();
}
$writerThr->join();
```

Worker Threads
Every worker thread is given a unique ID. It blocks on its input queue inQ until the main process sends it a job, processes that job using REFERENCE_FILE, and puts the result into the queue outQ. Each thread stamps its result with the serial number of the input item, and it exits when it dequeues the undef that the main process sends once the input is exhausted.
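A rough single-threaded sketch of the worker protocol: keep taking items until the undef end-of-input marker, and copy each item's serial number into the result unchanged. Plain arrays stand in for the Thread::Queue objects, the real work against REFERENCE_FILE is replaced by counting hits, and the worker ID 3 and the `num_hits` field are invented for illustration.

```perl
use strict;
use warnings;

# Stand-in for one worker's inQ: a plain array whose last element is the
# undef end-of-input marker that the main process enqueues.
my @inQ = (
    { lot => 0, batch => 0, part => 2, fasta => 'bead17,hitA,hitB', read => 'ACGT' },
    { lot => 1, batch => 0, part => 0, fasta => 'bead42',           read => 'TTGA' },
    undef,
);
my @outQ;
my $threadId = 3;    # hypothetical worker ID

while ( defined( my $item = shift @inQ ) ) {
    my @hits   = split /,/, $item->{fasta};
    my $beadID = shift @hits;

    # The real work against REFERENCE_FILE is elided; counting hits just
    # gives the sketch something to report (num_hits is a made-up field).
    push @outQ, {
        lot      => $item->{lot},      # serial number copied through
        batch    => $item->{batch},    # unchanged so the writer can
        part     => $item->{part},     # restore input order
        thread   => $threadId,
        beadid   => $beadID,
        num_hits => scalar @hits,
        read     => $item->{read},
    };
}

printf "%d.%d.%d from thread %d: %s (%d hits)\n",
    @{$_}{qw(lot batch part thread beadid num_hits)} for @outQ;
```

The `defined(...)` test matters: without it, a falsy item would also end the loop, whereas here only the undef marker does.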
```perl
my $thr = threads->create( sub {
    my $threadId = $i;
    open my $REFERENCE_FILE, '<', "$refFileName"
        or die "Can't open $refFileName: $!";

    while ( defined( my $item = $inQ->dequeue() ) ) {
        my $lot          = $item->{ 'lot' };
        my $batch        = $item->{ 'batch' };
        my $part         = $item->{ 'part' };
        my $fastaComment = $item->{ 'fasta' };
        my $readSequence = $item->{ 'read' };

        my @hits   = split /[,]/, $fastaComment;
        my $beadID = shift( @hits );

        # do some stuff to the current item that makes use of
        # REFERENCE_FILE
        my $bestSeedOverallMismatches = $INF;
        my @bestSeedExtensions        = ();
        foreach my $hitString ( @hits ) {
            ...
        }

        my %result = (
            'lot'        => $lot,
            'batch'      => $batch,
            'part'       => $part,
            'thread'     => $threadId,
            'extensions' => \@bestSeedExtensions,
            'num_mm'     => $bestSeedOverallMismatches,
            'beadid'     => $beadID,
            'read'       => $readSequence,
        );
        $outQ->enqueue( \%result );
    }
    close $REFERENCE_FILE;
} );#end sub
```

Writer Thread
This is the more complex of the threads, and it holds some important state. First, the writer knows how many seeds ($numSeeds) are in SEED_FILE, and it runs until it has written every one of them. Second, it keeps a buffer for results that arrive out of order with respect to their serial numbers. The buffer is effectively a $BUFFER_SIZE-row by $NUM_THREADS-column array indexed by batch and part; here that is 1 row by 5 columns.
Using this state, the writer works as follows. It dequeues results from outQ and checks whether each one's serial number is the next one in line. If it is, the worker's ID is put back into readyQ and its result is printed to either EXTENSION_FILE or UNMATCHED_FILE. The writer then prints some progress information and prints any buffered results that are now in sequence. If a result arrives that is not next in line, one of two things happens. If the buffer is full (i.e. the result's lot number differs from the current lot number), the worker is suspended: its result is held in an array called suspendedThreads and its ID is not returned to readyQ. Otherwise, its result is added to the buffer and the worker's ID is put back into readyQ. Suspended workers are put back into readyQ once the writer has worked through the entire buffer and wrapped around to the start of the next lot; at that point their held results are moved into the buffer.
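To make the reordering rules concrete, here is a sketch of the writer's decision logic with the threads, queues and file output stripped away. It is a hypothetical simulation (`simulate_writer` is an invented name) that follows the writer code's branches: write in-order results, buffer same-lot early results, suspend later-lot results, and release suspended workers when the lot advances. It reports the order in which serials would be written and the order in which worker IDs would go back into readyQ.

```perl
use strict;
use warnings;

# Each arrival is [lot, batch, part, threadId] in the order results come
# off outQ. Returns (written serials, worker IDs put back into readyQ).
sub simulate_writer {
    my ( $num_threads, $buffer_size, @arrivals ) = @_;
    my ( $curLot, $curBatch, $curPart ) = ( 0, 0, 0 );
    my ( %buffer, @suspended, @written, @ready );

    for my $arrival (@arrivals) {
        my ( $lot, $batch, $part, $threadId ) = @$arrival;

        if ( $lot == $curLot && $batch == $curBatch && $part == $curPart ) {
            push @ready, $threadId;    # next in line: worker is free to go
            my $res = $arrival;
            for ( ;; ) {
                push @written, join '.', @$res[ 0 .. 2 ];
                $buffer{ $res->[1] }[ $res->[2] ] = undef;

                # advance to the next needed serial number
                $curPart = ( $curPart + 1 ) % $num_threads;
                $curBatch = ( $curBatch + 1 ) % $buffer_size if $curPart == 0;
                $curLot++ if $curPart == 0 && $curBatch == 0;

                # back at the start of the buffer: release suspended workers
                if ( @suspended && $curBatch == 0 && $curPart == 0 ) {
                    for my $s (@suspended) {
                        $buffer{ $s->[1] }[ $s->[2] ] = $s;
                        push @ready, $s->[3];
                    }
                    @suspended = ();
                }

                # keep writing while the next needed result is buffered
                $res = $buffer{$curBatch}[$curPart];
                last unless defined $res;
            }
        }
        elsif ( $lot == $curLot ) {
            $buffer{$batch}[$part] = $arrival;    # buffer it, free the worker
            push @ready, $threadId;
        }
        else {
            push @suspended, $arrival;            # later lot: hold the worker
        }
    }
    return ( \@written, \@ready );
}

# Five workers, one buffer row; part 2 of lot 0 (thread 2) is slow, so
# threads 0 and 1 finish their lot-1 jobs before it and get suspended.
my ( $written, $ready ) = simulate_writer(
    5, 1,
    [ 0, 0, 0, 0 ], [ 0, 0, 1, 1 ], [ 0, 0, 3, 3 ], [ 0, 0, 4, 4 ],
    [ 1, 0, 0, 0 ], [ 1, 0, 1, 1 ], [ 0, 0, 2, 2 ],
);
print "written: @$written\n";   # written: 0.0.0 0.0.1 0.0.2 0.0.3 0.0.4 1.0.0 1.0.1
print "readyQ:  @$ready\n";     # readyQ:  0 1 3 4 2 0 1
```

In this trace the slow job holds up writing, but the suspended workers are only held until the lot advances, after which their buffered results are written immediately.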
```perl
my $writerThr = threads->create( sub {
    my $numSeedsInv = 1.0 / $numSeeds;
    open my $EXTENSION_FILE, ">", $extendFileName
        or die "ERROR: could not write to $extendFileName: $!";
    open my $UNMATCHED_FILE, ">", $unmatchedFileName
        or die "ERROR: could not write to $unmatchedFileName: $!";
    print STDOUT "INFO: 0% complete\n";

    # print out the best extension(s) for this seed; otherwise, print
    # this read out to the unmatchable file
    my $numSeedsExtended = 0;
    my $nextTarget       = 10;
    my %buffer           = ();
    my @suspendedThreads = ();
    while ( $numSeedsExtended < $numSeeds ) {
        my $resultRef = $outQ->dequeue();
        my $lot      = $resultRef->{ 'lot' };
        my $batch    = $resultRef->{ 'batch' };
        my $part     = $resultRef->{ 'part' };
        my $threadId = $resultRef->{ 'thread' };

        if ( $lot == $curLot && $batch == $curBatch && $part == $curPart ) {
            # thread is free to go
            $readyQ->enqueue( $threadId );

            # print its results and check the buffer for the next set
            # of results
            for ( ;; ) {
                my $bestSeedExtensions        = $resultRef->{ 'extensions' };
                my $bestSeedOverallMismatches = $resultRef->{ 'num_mm' };
                my $beadID                    = $resultRef->{ 'beadid' };
                my $readSequence              = $resultRef->{ 'read' };

                my $readIsExtendable = @$bestSeedExtensions;
                if ( $readIsExtendable ) {
                    printBestExtensions( $beadID, $EXTENSION_FILE, $maxNumHits,
                        $bestSeedExtensions, $bestSeedOverallMismatches );
                    print $EXTENSION_FILE "\n$readSequence\n";
                }
                else {
                    print $UNMATCHED_FILE "$beadID\n";
                    print $UNMATCHED_FILE "$readSequence\n";
                }

                # print progress
                ++$numSeedsExtended;
                my $percentComplete = $numSeedsExtended * $numSeedsInv * 100;
                if ( $percentComplete >= $nextTarget ) {
                    $percentComplete = int( $percentComplete * 0.1 ) * 10;
                    print STDOUT "INFO: $percentComplete% complete\n";
                    $nextTarget = $percentComplete + 10;
                }

                $buffer{ $batch }->[ $part ] = undef;

                # go to the next needed result
                $curPart = ($curPart + 1) % $NUM_THREADS;
                if ( $curPart == 0 ) {
                    ++$curBatch;
                    $curBatch %= $BUFFER_SIZE;
                }
                if ( $curPart == 0 && $curBatch == 0 ) {
                    ++$curLot;
                }

                # release suspended threads, if we're back at the
                # start of the buffer
                if ( @suspendedThreads && $curBatch == 0 && $curPart == 0 ) {
                    foreach my $resultRef_s ( @suspendedThreads ) {
                        my $batch_s    = $resultRef_s->{ 'batch' };
                        my $part_s     = $resultRef_s->{ 'part' };
                        my $threadId_s = $resultRef_s->{ 'thread' };
                        $buffer{ $batch_s }->[ $part_s ] = $resultRef_s;
                        $readyQ->enqueue( $threadId_s );
                    }
                    @suspendedThreads = ();
                }

                # check if the result for the next needed result is
                # already in the buffer
                $resultRef = $buffer{ $curBatch }->[ $curPart ];
                last unless ( defined $resultRef );
                $batch = $curBatch;
                $part  = $curPart;
            }#for ever
        }
        else {
            if ( $lot == $curLot ) {
                $buffer{ $batch }->[ $part ] = $resultRef;
                $readyQ->enqueue( $threadId );
            }
            else {
                push @suspendedThreads, $resultRef;
            }
        }
    }# while numSeedsExtended

    close $EXTENSION_FILE;
    close $UNMATCHED_FILE;
} );
```