in reply to Re: Thread::Queue enqueue blocks all of my threads
in thread Thread::Queue enqueue blocks all of my threads

Hm, I don't think the problem can be that the queue is being filled too quickly, since at most 5 items should be in the queue at any time. The reason is that I only allow threads to pick up a new job once their results have been collected by the writer thread. I could be wrong, though.

Here's a more complete listing of the code:

Main process

The main process starts the worker threads and the writer thread. It then reads from a file called SEED_FILE. Since it's important that the results are written in the same order as SEED_FILE, every job is stamped with a unique serial number (the curLot, curBatch, and curPart variables). The main process creates a new job only when a worker is ready, i.e. after the writer has put that worker's ID back into the queue readyQ once its results have been handled (see the Writer Thread section). This is meant to ensure that the worker threads act like a thread pool.

my $NUM_THREADS = 5;
my $BUFFER_SIZE = 1;
my @inQueues = ();
my @threads  = ();
my $outQ   = Thread::Queue->new();
my $readyQ = Thread::Queue->new();

# create worker threads
for ( my $i = 0; $i < $NUM_THREADS; $i++ ) {
    my $inQ = Thread::Queue->new();
    my $thr = threads->create( sub {
        # see the Worker Threads section for this
        ...
    } );
    push @inQueues, $inQ;
    push @threads, $thr;

    # start the threads
    $readyQ->enqueue( $i );
}

# create writer thread
my $curLot   = 0;
my $curBatch = 0;
my $curPart  = 0;
my $writerThr = threads->create( sub {
    # see the Writer Thread section for this
    ...
} );

# give a line from SEED_FILE to each thread
for ( ;; ) {
    last if eof( $SEED_FILE );
    my $nextThread = $readyQ->dequeue();
    chomp( my $fastaComment = <$SEED_FILE> );
    chomp( my $readSequence = <$SEED_FILE> );
    my %item = (
        'lot'   => $curLot,
        'batch' => $curBatch,
        'part'  => $curPart,
        'fasta' => $fastaComment,
        'read'  => $readSequence,
    );
    $inQueues[ $nextThread ]->enqueue( \%item );

    # get the next batch/part number
    $curPart = ($curPart + 1) % $NUM_THREADS;
    if ( $curPart == 0 ) {
        ++$curBatch;
        $curBatch %= $BUFFER_SIZE;
    }
    if ( $curPart == 0 && $curBatch == 0 ) {
        ++$curLot;
    }
}

# let threads know that input is done
for ( my $i = 0; $i < $NUM_THREADS; $i++ ) {
    $inQueues[ $i ]->enqueue( undef );
}

# wait for threads to finish
foreach my $thr ( @threads ) {
    $thr->join();
}
$writerThr->join();

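
The serial-number bookkeeping at the end of the read loop can be checked in isolation. The following is a minimal sketch, not part of the original program: the helper name next_serial is hypothetical, and it simply repeats the same arithmetic on values passed in as arguments.

```perl
use strict;
use warnings;

# Hypothetical helper (not in the original program): advance a
# (lot, batch, part) serial number the same way the read loop does.
sub next_serial {
    my ( $lot, $batch, $part, $num_threads, $buffer_size ) = @_;
    $part = ( $part + 1 ) % $num_threads;
    if ( $part == 0 ) {
        $batch = ( $batch + 1 ) % $buffer_size;
    }
    if ( $part == 0 && $batch == 0 ) {
        ++$lot;
    }
    return ( $lot, $batch, $part );
}

# With 5 threads and a buffer of 1 batch, the part cycles through 0..4
# and the lot increases each time the part wraps around.
my @serial = ( 0, 0, 0 );
my @seen;
for ( 1 .. 7 ) {
    push @seen, join '.', @serial;
    @serial = next_serial( @serial, 5, 1 );
}
print "@seen\n";    # prints "0.0.0 0.0.1 0.0.2 0.0.3 0.0.4 1.0.0 1.0.1"
```

With $BUFFER_SIZE set to 1 the batch number is always 0, so only the lot and part actually distinguish jobs.
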
Worker Threads

Every worker thread is given a unique ID. It waits on its input queue inQ for new data from the main process (dequeue blocks until an item arrives), does an operation on that data using the REFERENCE_FILE, and puts its result into the queue outQ. Each thread stamps its result with the serial number of the input item.

my $thr = threads->create( sub {
    my $threadId = $i;
    open my $REFERENCE_FILE, '<', "$refFileName"
        or die "Can't open $refFileName: $!";
    while ( defined( my $item = $inQ->dequeue() ) ) {
        my $lot          = $item->{ 'lot' };
        my $batch        = $item->{ 'batch' };
        my $part         = $item->{ 'part' };
        my $fastaComment = $item->{ 'fasta' };
        my $readSequence = $item->{ 'read' };
        my @hits = split /[,]/, $fastaComment;
        my $beadID = shift( @hits );

        # do some stuff to the current item that makes use of
        # REFERENCE_FILE
        my $bestSeedOverallMismatches = $INF;
        my @bestSeedExtensions = ();
        foreach my $hitString ( @hits ) {
            ...
        }
        my %result = (
            'lot'        => $lot,
            'batch'      => $batch,
            'part'       => $part,
            'thread'     => $threadId,
            'extensions' => \@bestSeedExtensions,
            'num_mm'     => $bestSeedOverallMismatches,
            'beadid'     => $beadID,
            'read'       => $readSequence,
        );
        $outQ->enqueue( \%result );
    }
    close $REFERENCE_FILE;
} );#end sub

Writer Thread

This is the most complex of the threads. It also contains some important variables. First, the writer knows how many lines are in SEED_FILE and it runs until it has seen every one of them. Second, it has a data buffer to hold results that have arrived out of order according to the serial numbers. Specifically, the buffer is like an array with $BUFFER_SIZE rows (batches) and $NUM_THREADS columns (parts), i.e. 1 by 5 in this case.

Using this data, the writer works in the following way: it dequeues results from outQ and checks whether the serial number is the next in line. If it is, the worker is put back into readyQ and its result is printed to either EXTENSION_FILE or UNMATCHED_FILE. The writer then prints some progress information and prints any results from the buffer that are now in the proper sequence. If a result has arrived and it's not the next in line, one of two things will happen. If the buffer is full (i.e. the lot number of the result is different from the current lot number), the worker is suspended: its result is pushed onto an array called suspendedThreads and the worker is not returned to readyQ. Otherwise, its result is added to the buffer and the associated worker is put back into readyQ. Suspended workers are put back into readyQ, and their results moved into the buffer, when the entire buffer has been cleared.

my $writerThr = threads->create( sub {
    my $numSeedsInv = 1.0 / $numSeeds;
    open my $EXTENSION_FILE, ">", $extendFileName
        or die "ERROR: could not write to $extendFileName: $!";
    open my $UNMATCHED_FILE, ">", $unmatchedFileName
        or die "ERROR: could not write to $unmatchedFileName: $!";
    print STDOUT "INFO: 0% complete\n";

    # print out the best extension(s) for this seed; otherwise, print
    # this read out to the unmatchable file
    my $numSeedsExtended = 0;
    my $nextTarget = 10;
    my %buffer = ();
    my @suspendedThreads = ();
    while ( $numSeedsExtended < $numSeeds ) {
        my $resultRef = $outQ->dequeue();
        my $lot      = $resultRef->{ 'lot' };
        my $batch    = $resultRef->{ 'batch' };
        my $part     = $resultRef->{ 'part' };
        my $threadId = $resultRef->{ 'thread' };
        if ( $lot == $curLot && $batch == $curBatch && $part == $curPart ) {
            # thread is free to go
            $readyQ->enqueue( $threadId );

            # print its results and check the buffer for the next set
            # of results
            for ( ;; ) {
                my $bestSeedExtensions        = $resultRef->{ 'extensions' };
                my $bestSeedOverallMismatches = $resultRef->{ 'num_mm' };
                my $beadID                    = $resultRef->{ 'beadid' };
                my $readSequence              = $resultRef->{ 'read' };
                my $readIsExtendable = @$bestSeedExtensions;
                if ( $readIsExtendable ) {
                    printBestExtensions( $beadID, $EXTENSION_FILE,
                                         $maxNumHits, $bestSeedExtensions,
                                         $bestSeedOverallMismatches );
                    print $EXTENSION_FILE "\n$readSequence\n";
                }
                else {
                    print $UNMATCHED_FILE "$beadID\n";
                    print $UNMATCHED_FILE "$readSequence\n";
                }

                # print progress
                ++$numSeedsExtended;
                my $percentComplete = $numSeedsExtended * $numSeedsInv * 100;
                if ( $percentComplete >= $nextTarget ) {
                    $percentComplete = int( $percentComplete * 0.1 ) * 10;
                    print STDOUT "INFO: $percentComplete% complete\n";
                    $nextTarget = $percentComplete + 10;
                }
                $buffer{ $batch }->[ $part ] = undef;

                # go to the next needed result
                $curPart = ($curPart + 1) % $NUM_THREADS;
                if ( $curPart == 0 ) {
                    ++$curBatch;
                    $curBatch %= $BUFFER_SIZE;
                }
                if ( $curPart == 0 && $curBatch == 0 ) {
                    ++$curLot;
                }

                # release suspended threads, if we're back at the
                # start of the buffer
                if ( @suspendedThreads && $curBatch == 0 && $curPart == 0 ) {
                    foreach my $resultRef_s ( @suspendedThreads ) {
                        my $batch_s    = $resultRef_s->{ 'batch' };
                        my $part_s     = $resultRef_s->{ 'part' };
                        my $threadId_s = $resultRef_s->{ 'thread' };
                        $buffer{ $batch_s }->[ $part_s ] = $resultRef_s;
                        $readyQ->enqueue( $threadId_s );
                    }
                    @suspendedThreads = ();
                }

                # check if the result for the next needed result is
                # already in the buffer
                $resultRef = $buffer{ $curBatch }->[ $curPart ];
                last unless( defined $resultRef );
                $batch = $curBatch;
                $part  = $curPart;
            }#for ever
        }
        else {
            if ( $lot == $curLot ) {
                $buffer{ $batch }->[ $part ] = $resultRef;
                $readyQ->enqueue( $threadId );
            }
            else {
                push @suspendedThreads, $resultRef;
            }
        }
    }# while numSeedsExtended
    close $EXTENSION_FILE;
    close $UNMATCHED_FILE;
} );
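
The core idea of the writer, holding back results that arrive early and flushing them once the next expected serial number shows up, can be shown without any threads. This is a simplified sketch only: it assumes a single integer sequence number in place of lot/batch/part and an unbounded buffer (so there is no suspension logic), and the name make_reorderer is hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: returns a closure that accepts ( $seq, $value )
# pairs in any order and returns the values that can now be emitted in
# sequence order.
sub make_reorderer {
    my $next = 0;    # next sequence number that may be emitted
    my %pending;     # results that arrived early, keyed by sequence number
    return sub {
        my ( $seq, $value ) = @_;
        $pending{ $seq } = $value;
        my @ready;
        while ( exists $pending{ $next } ) {
            push @ready, delete $pending{ $next };
            ++$next;
        }
        return @ready;
    };
}

my $accept = make_reorderer();
my @written;

# Results arrive out of order, as they might from several workers.
for my $arrival ( [ 1, 'b' ], [ 0, 'a' ], [ 3, 'd' ], [ 2, 'c' ] ) {
    push @written, $accept->( @$arrival );
}
print "@written\n";    # prints "a b c d"
```

The lot/batch/part scheme in the listing above adds a bound on how far ahead a worker may run, which this sketch deliberately leaves out.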

Re^3: Thread::Queue enqueue blocks all of my threads
by BrowserUk (Patriarch) on Nov 10, 2009 at 19:43 UTC

    Indeed, Thread::Queue's relatively recently acquired ability to convey unshared lexical structures by automatically sharing them leaks like the proverbial sieve, as demonstrated by this:

    #! perl -slw
    use strict;
    use Data::Dump qw[ pp ];
    use threads;
    use threads::shared;
    use Thread::Queue;

    our $THREADS ||= 5;
    --$THREADS;

    my @Qins = map Thread::Queue->new(), 0 .. $THREADS;
    my $Qout = Thread::Queue->new;

    sub worker {
        my $tid = threads->tid;
        warn "$tid: starting\n";
        my( $Qin, $Qout ) = @_;
        while( my $work = $Qin->dequeue ) {
            # queue a reference to an unshared hash; this is what leaks
            my %h = ( 'a'..'z' );
            $Qout->enqueue( \%h );
        }
        warn "$tid: ending\n";
    }

    my @threads = map threads->new( \&worker, $Qins[ $_ ], $Qout ), 0 .. $THREADS;

    my $tOut = async{
        while( my $out = $Qout->dequeue ) {
            pp $out;
        }
        warn "output thread finished\n";
    };

    my $i = 0;
    while( <> ) {
        # round-robin over all input queues
        $Qins[ $i++ ]->enqueue( $_ );
        $i %= @Qins;
    }
    warn "Finished reading & queuing file\n";

    $_->enqueue( undef ) for @Qins;
    $_->join for @threads;
    $Qout->enqueue( undef );
    $tOut->join;
    warn "main finished\n";

    Contrast that with this version, which queues simple scalars and doesn't leak at all:

    #! perl -slw
    use strict;
    use Data::Dump qw[ pp ];
    use threads qw[ yield ];
    use threads::shared;
    use Thread::Queue;

    our $THREADS ||= 5;
    our $MAX ||= 1e3;
    --$THREADS;

    my $Qin  = Thread::Queue->new;
    my $Qout = Thread::Queue->new;

    sub worker {
        my $tid = threads->tid;
        warn "$tid: starting\n";
        my( $Qin, $Qout ) = @_;
        while( my $work = $Qin->dequeue ) {
            # queue a plain scalar rather than a reference
            $Qout->enqueue( "$work - processed by thread $tid" );
        }
        warn "$tid: ending\n";
    }

    my @threads = map threads->new( \&worker, $Qin, $Qout ), 0 .. $THREADS;

    my $tOut = async{
        while( my $out = $Qout->dequeue ) {
            print $out;
        }
        warn "output thread finished\n";
    };

    my $i = 0;
    while( <> ) {
        printf STDERR "\rPending: %d (%d)", $Qin->pending, $i++;
        $Qin->enqueue( $_ );
        # yield while $Qin->pending > $MAX;
    }
    warn "Finished reading & queuing file\n";

    $Qin->enqueue( undef ) for @threads;
    $_->join for @threads;
    $Qout->enqueue( undef );
    $tOut->join;
    warn "main finished\n";

    You will have to raise a bug report against the module and hope that it can be fixed. That said, from my brief reading of the code, I wouldn't get my hopes up for a quick fix. If at all.

    The only alternative I can offer you is to avoid queueing hashes. If I need to pass more than one piece of information between threads, it has been my habit to join them into a scalar before enqueuing them and to split them on dequeue. I've found this to be far more efficient than either Storable freeze/thaw or the automatic sharing of structures as currently implemented. Most importantly, it doesn't leak!
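
    The join-before-enqueue, split-on-dequeue habit described above might look like the following. It is only a sketch under stated assumptions: the tab separator, the three field names, and the upper-casing "work" are all made up for illustration, and it assumes the separator never occurs inside a field.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Assumption: a tab never occurs inside a field, so it is safe to use
# as the separator.
my $SEP = "\t";

my $Qin  = Thread::Queue->new;
my $Qout = Thread::Queue->new;

my $worker = threads->create( sub {
    while ( defined( my $packed = $Qin->dequeue ) ) {
        # split on dequeue; the -1 limit keeps trailing empty fields
        my ( $lot, $part, $read ) = split /$SEP/, $packed, -1;

        # hypothetical "work": upper-case the read
        $Qout->enqueue( join $SEP, $lot, $part, uc $read );
    }
    $Qout->enqueue( undef );    # tell the consumer we are done
} );

# join before enqueuing: plain scalars only, no hash references
$Qin->enqueue( join $SEP, 0, $_, "acgt$_" ) for 0 .. 2;
$Qin->enqueue( undef );

my @results;
while ( defined( my $packed = $Qout->dequeue ) ) {
    push @results, [ split /$SEP/, $packed, -1 ];
}
$worker->join;

print join( ',', @$_ ), "\n" for @results;
```

    For the program in the original post, the same idea would mean joining lot, batch, part and the other fields into one string instead of enqueueing \%item and \%result; a list-valued field such as the extensions would need a second separator of its own.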


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

      That was really helpful, thanks! My computer's tied up running this program on a large file at the moment, but I'll try the modified version out soon and see if it works.