sivert has asked for the wisdom of the Perl Monks concerning the following question:

Hello,

I am having a strange concurrency issue with Thread::Queue. Specifically, in some code that uses threads, the program runs fine for a while, but eventually all of its threads are put to sleep. The goal of the program is to read lines from text file A, have each thread perform an operation on a line using another text file B, and then write each result to a third text file C. The threads are a main process that reads file A, five worker threads, and a writer thread that writes the results; a fuller description of each is given in my follow-up below.

My problem is that somehow the 5 workers become blocked, which blocks the main process (since there are no workers to work) and then the writer thread (since there are no results to write). The code for the workers looks as follows:

use threads;
use threads::shared;
use Thread::Queue;

...

my $thr = threads->create( sub {
    my $threadId = $i;

    open my $REFERENCE_FILE, '<', "$refFileName"
        or die "Can't open $refFileName: $!";

    while ( defined( my $item = $inQ->dequeue() ) ) {
        # get info from the current item
        my $lot   = $item->{ 'lot' };
        my $batch = $item->{ 'batch' };
        my $part  = $item->{ 'part' };
        ...

        # perform an operation on the item
        ...

        my %result = (
            'lot'        => $lot,
            'batch'      => $batch,
            'part'       => $part,
            'thread'     => $threadId,
            'extensions' => \@bestSeedExtensions,
            'num_mm'     => $bestSeedOverallMismatches,
            'beadid'     => $beadID,
            'read'       => $readSequence,
        );
        $outQ->enqueue( \%result );    # <------ I AM BROKEN :(
    }
} );    # end sub

Using print statements, I've traced the problem to the $outQ->enqueue() call. The strange part is that everything runs fine for millions of lines and then suddenly all progress stops. Can anyone tell me why this might be happening? I have tried both perl v5.8.5 and v5.10.1.

Re: Thread::Queue enqueue blocks all of my threads
by BrowserUk (Patriarch) on Nov 09, 2009 at 19:16 UTC

    There is nothing obviously wrong in the code you've posted.

    I suspect that you are running out of memory because the input queue is being filled faster than it can be processed. The process doesn't actually stop, but rather stalls whilst it attempts to free up enough memory to grow the input queue.

    You will have to post the complete program, preferably cut down, but still demonstrating the problem.
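
    One way to test that theory is to throttle the reader so the input queue can never grow without bound. The following is only a minimal sketch under assumptions of mine (it reuses the $inQ and $SEED_FILE names from the code you posted and picks an arbitrary cap of 1000 pending items), relying on nothing beyond Thread::Queue's pending() method and the yield() that threads can export:

    use threads qw[ yield ];
    use Thread::Queue;

    my $MAX_PENDING = 1_000;    # arbitrary cap on queued-but-unprocessed lines

    while ( my $line = <$SEED_FILE> ) {
        # Stall the reader until the workers have drained the queue
        # below the cap, so memory use stays bounded.
        yield() while $inQ->pending() > $MAX_PENDING;
        $inQ->enqueue( $line );
    }

    If memory use flattens out with a cap like that in place, the stall is coming from queue growth rather than from enqueue itself.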


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

      Hm, I don't think the problem can be that the queue is being filled too quickly, since at most 5 items should be in the queue at any time. The reason is that I only allow threads to pick up a new job once their results have been collected by the writer thread. I could be wrong, though.

      Here's a more complete listing of the code:

      Main process

      The main process starts the worker threads and the writer thread, then reads from a file called SEED_FILE. Since it's important that the results are written in the same order as SEED_FILE, every item is stamped with a unique serial number (the curLot, curBatch, and curPart variables). The main process creates a new job only when a worker is ready, i.e. when the writer has put that worker back into the queue readyQ after processing its results (see the Writer Thread section). This is meant to make the worker threads behave like a thread pool.

      my $NUM_THREADS = 5;
      my $BUFFER_SIZE = 1;

      my @inQueues = ();
      my @threads  = ();
      my $outQ     = Thread::Queue->new();
      my $readyQ   = Thread::Queue->new();

      # create worker threads
      for ( my $i = 0; $i < $NUM_THREADS; $i++ ) {
          my $inQ = Thread::Queue->new();
          my $thr = threads->create( sub {
              # see the Worker Threads section for this
              ...
          } );
          push @inQueues, $inQ;
          push @threads, $thr;

          # start the threads
          $readyQ->enqueue( $i );
      }

      # create writer thread
      my $curLot   = 0;
      my $curBatch = 0;
      my $curPart  = 0;
      my $writerThr = threads->create( sub {
          # see the Writer Thread section for this
          ...
      } );

      # give a line from SEED_FILE to each thread
      for ( ;; ) {
          last if eof( $SEED_FILE );

          my $nextThread = $readyQ->dequeue();

          chomp( my $fastaComment = <$SEED_FILE> );
          chomp( my $readSequence = <$SEED_FILE> );

          my %item = (
              'lot'   => $curLot,
              'batch' => $curBatch,
              'part'  => $curPart,
              'fasta' => $fastaComment,
              'read'  => $readSequence,
          );
          $inQueues[ $nextThread ]->enqueue( \%item );

          # get the next batch/part number
          $curPart = ($curPart + 1) % $NUM_THREADS;
          if ( $curPart == 0 ) {
              ++$curBatch;
              $curBatch %= $BUFFER_SIZE;
          }
          if ( $curPart == 0 && $curBatch == 0 ) {
              ++$curLot;
          }
      }

      # let threads know that input is done
      for ( my $i = 0; $i < $NUM_THREADS; $i++ ) {
          $inQueues[ $i ]->enqueue( undef );
      }

      # wait for threads to finish
      foreach my $thr ( @threads ) {
          $thr->join();
      }
      $writerThr->join();
      Worker Threads

      Every worker thread is given a unique ID. It waits on its input queue inQ for new data from the main process, performs an operation on that data using REFERENCE_FILE, and puts its result into the queue outQ. Each thread stamps its result with the serial number of the input item.

      my $thr = threads->create( sub {
          my $threadId = $i;

          open my $REFERENCE_FILE, '<', "$refFileName"
              or die "Can't open $refFileName: $!";

          while ( defined( my $item = $inQ->dequeue() ) ) {
              my $lot          = $item->{ 'lot' };
              my $batch        = $item->{ 'batch' };
              my $part         = $item->{ 'part' };
              my $fastaComment = $item->{ 'fasta' };
              my $readSequence = $item->{ 'read' };

              my @hits   = split /[,]/, $fastaComment;
              my $beadID = shift( @hits );

              # do some stuff to the current item that makes use of
              # REFERENCE_FILE
              my $bestSeedOverallMismatches = $INF;
              my @bestSeedExtensions        = ();
              foreach my $hitString ( @hits ) {
                  ...
              }

              my %result = (
                  'lot'        => $lot,
                  'batch'      => $batch,
                  'part'       => $part,
                  'thread'     => $threadId,
                  'extensions' => \@bestSeedExtensions,
                  'num_mm'     => $bestSeedOverallMismatches,
                  'beadid'     => $beadID,
                  'read'       => $readSequence,
              );
              $outQ->enqueue( \%result );
          }

          close $REFERENCE_FILE;
      } );    # end sub
      Writer Thread

      This is the most complex of the threads, and it relies on a couple of important variables. First, the writer knows how many seeds (reads) SEED_FILE contains and runs until it has seen a result for every one of them. Second, it keeps a data buffer to hold results that arrive out of serial-number order. The buffer is laid out as a $BUFFER_SIZE-row by $NUM_THREADS-column array indexed by batch and part, i.e. 1 row by 5 columns in this case.

      Using this data, the writer works as follows: it dequeues results from outQ and checks whether each one's serial number is the next in line. If it is, the worker is put back into readyQ and the result is printed to either EXTENSION_FILE or UNMATCHED_FILE; the writer then prints some progress information and flushes any buffered results that are now in sequence. If a result is not the next in line, one of two things happens. If the buffer is full (i.e. the result's lot number differs from the current lot number), the worker is suspended: its result is pushed onto an array called suspendedThreads and the worker is not yet returned to readyQ. Otherwise, the result is added to the buffer and the associated worker is put back into readyQ. Suspended workers are released back into readyQ once the entire buffer has been cleared.

      my $writerThr = threads->create( sub {
          my $numSeedsInv = 1.0 / $numSeeds;

          open my $EXTENSION_FILE, ">", $extendFileName
              or die "ERROR: could not write to $extendFileName: $!";
          open my $UNMATCHED_FILE, ">", $unmatchedFileName
              or die "ERROR: could not write to $unmatchedFileName: $!";

          print STDOUT "INFO: 0% complete\n";

          # print out the best extension(s) for this seed; otherwise, print
          # this read out to the unmatchable file
          my $numSeedsExtended = 0;
          my $nextTarget       = 10;
          my %buffer           = ();
          my @suspendedThreads = ();

          while ( $numSeedsExtended < $numSeeds ) {
              my $resultRef = $outQ->dequeue();

              my $lot      = $resultRef->{ 'lot' };
              my $batch    = $resultRef->{ 'batch' };
              my $part     = $resultRef->{ 'part' };
              my $threadId = $resultRef->{ 'thread' };

              if ( $lot == $curLot && $batch == $curBatch && $part == $curPart ) {
                  # thread is free to go
                  $readyQ->enqueue( $threadId );

                  # print its results and check the buffer for the next set
                  # of results
                  for ( ;; ) {
                      my $bestSeedExtensions        = $resultRef->{ 'extensions' };
                      my $bestSeedOverallMismatches = $resultRef->{ 'num_mm' };
                      my $beadID                    = $resultRef->{ 'beadid' };
                      my $readSequence              = $resultRef->{ 'read' };

                      my $readIsExtendable = @$bestSeedExtensions;
                      if ( $readIsExtendable ) {
                          printBestExtensions( $beadID, $EXTENSION_FILE, $maxNumHits,
                                               $bestSeedExtensions,
                                               $bestSeedOverallMismatches );
                          print $EXTENSION_FILE "\n$readSequence\n";
                      }
                      else {
                          print $UNMATCHED_FILE "$beadID\n";
                          print $UNMATCHED_FILE "$readSequence\n";
                      }

                      # print progress
                      ++$numSeedsExtended;
                      my $percentComplete = $numSeedsExtended * $numSeedsInv * 100;
                      if ( $percentComplete >= $nextTarget ) {
                          $percentComplete = int( $percentComplete * 0.1 ) * 10;
                          print STDOUT "INFO: $percentComplete% complete\n";
                          $nextTarget = $percentComplete + 10;
                      }

                      $buffer{ $batch }->[ $part ] = undef;

                      # go to the next needed result
                      $curPart = ($curPart + 1) % $NUM_THREADS;
                      if ( $curPart == 0 ) {
                          ++$curBatch;
                          $curBatch %= $BUFFER_SIZE;
                      }
                      if ( $curPart == 0 && $curBatch == 0 ) {
                          ++$curLot;
                      }

                      # release suspended threads, if we're back at the
                      # start of the buffer
                      if ( @suspendedThreads && $curBatch == 0 && $curPart == 0 ) {
                          foreach my $resultRef_s ( @suspendedThreads ) {
                              my $batch_s    = $resultRef_s->{ 'batch' };
                              my $part_s     = $resultRef_s->{ 'part' };
                              my $threadId_s = $resultRef_s->{ 'thread' };
                              $buffer{ $batch_s }->[ $part_s ] = $resultRef_s;
                              $readyQ->enqueue( $threadId_s );
                          }
                          @suspendedThreads = ();
                      }

                      # check if the next needed result is already in the buffer
                      $resultRef = $buffer{ $curBatch }->[ $curPart ];
                      last unless ( defined $resultRef );
                      $batch = $curBatch;
                      $part  = $curPart;
                  } # for ever
              }
              else {
                  if ( $lot == $curLot ) {
                      $buffer{ $batch }->[ $part ] = $resultRef;
                      $readyQ->enqueue( $threadId );
                  }
                  else {
                      push @suspendedThreads, $resultRef;
                  }
              }
          } # while numSeedsExtended

          close $EXTENSION_FILE;
          close $UNMATCHED_FILE;
      } );
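
      For reference, here is the reordering idea from the description above in isolation: a buffer keyed by a single serial number, flushed whenever the next expected result has arrived. This is only an illustrative sketch with made-up names ($expected, %pendingResults, writeResult) that collapses lot/batch/part into one counter and leaves out the readyQ bookkeeping, so it is not a drop-in replacement for the code above:

      my %pendingResults;    # out-of-order results, keyed by serial number
      my $expected = 0;      # serial number of the next result to write

      while ( defined( my $result = $outQ->dequeue() ) ) {
          $pendingResults{ $result->{ 'serial' } } = $result;

          # Flush every buffered result that is now in sequence.
          while ( exists $pendingResults{ $expected } ) {
              writeResult( delete $pendingResults{ $expected } );    # hypothetical output sub
              ++$expected;
          }
      }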

        Indeed, Thread::Queue's relatively recently acquired ability to convey unshared lexical structures, by automatically sharing them, leaks like the proverbial sieve. As demonstrated by this:

        #! perl -slw
        use strict;

        use Data::Dump qw[ pp ];
        use threads;
        use threads::shared;
        use Thread::Queue;

        our $THREADS ||= 5;
        --$THREADS;

        my @Qins = map Thread::Queue->new(), 0 .. $THREADS;
        my $Qout = Thread::Queue->new;

        sub worker {
            my $tid = threads->tid;
            warn "$tid: starting\n";
            my( $Qin, $Qout ) = @_;
            while( my $work = $Qin->dequeue ) {
                my %h = ( 'a' .. 'z' );
                $Qout->enqueue( \%h );
            }
            warn "$tid: ending\n";
        }

        my @threads = map threads->new( \&worker, $Qins[ $_ ], $Qout ), 0 .. $THREADS;

        my $tOut = async {
            while( my $out = $Qout->dequeue ) {
                pp $out;
            }
            warn "output thread finished\n";
        };

        my $i = 0;
        while( <> ) {
            $Qins[ $i ]->enqueue( $_ );
            $i = ( $i + 1 ) % @Qins;    # round-robin over the input queues
        }
        warn "Finished reading & queueing file\n";

        $_->enqueue( undef ) for @Qins;
        $_->join for @threads;

        $Qout->enqueue( undef );
        $tOut->join;

        warn "main finished\n";

        Contrast that with this version, which queues simple scalars and doesn't leak at all:

        #! perl -slw
        use strict;

        use Data::Dump qw[ pp ];
        use threads qw[ yield ];
        use threads::shared;
        use Thread::Queue;

        our $THREADS ||= 5;
        our $MAX     ||= 1e3;
        --$THREADS;

        my $Qin  = Thread::Queue->new;
        my $Qout = Thread::Queue->new;

        sub worker {
            my $tid = threads->tid;
            warn "$tid: starting\n";
            my( $Qin, $Qout ) = @_;
            while( my $work = $Qin->dequeue ) {
                $Qout->enqueue( "$work - processed by thread $tid" );
            }
            warn "$tid: ending\n";
        }

        my @threads = map threads->new( \&worker, $Qin, $Qout ), 0 .. $THREADS;

        my $tOut = async {
            while( my $out = $Qout->dequeue ) {
                print $out;
            }
            warn "output thread finished\n";
        };

        my $i = 0;
        while( <> ) {
            printf STDERR "\rPending: %d (%d)", $Qin->pending, $i++;
            $Qin->enqueue( $_ );
            # yield while $Qin->pending > $MAX;
        }
        warn "Finished reading & queueing file\n";

        $Qin->enqueue( undef ) for @threads;
        $_->join for @threads;

        $Qout->enqueue( undef );
        $tOut->join;

        warn "main finished\n";

        You will have to raise a bug report against the module and hope that it can be fixed. That said, from my brief reading of the code, I wouldn't get my hopes up for a quick fix, if one comes at all.

        The only alternative I can offer you is to avoid queueing hashes. If I need to pass more than one piece of information between threads, it has been my habit to join the fields into a single scalar before enqueueing them and to split them apart again on dequeue. I've found this to be far more efficient than either Storable freeze/thaw or the automatic sharing of structures as currently implemented. Most importantly, it doesn't leak!
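
        A minimal sketch of that join/split habit, assuming a separator (a tab here) that never occurs in the data and the same $outQ as in your code; the list of extensions would need to be flattened first with its own, different separator:

        # Worker side: pack the fields into one plain scalar before enqueueing.
        my $packed = join "\t", $lot, $batch, $part, $threadId,
                                $beadID, $readSequence;
        $outQ->enqueue( $packed );

        # Writer side: unpack the fields again on dequeue.
        while ( defined( my $packed = $outQ->dequeue() ) ) {
            my( $lot, $batch, $part, $threadId, $beadID, $readSequence )
                = split /\t/, $packed, -1;
            # ... write the result out as before ...
        }

        Because only a plain scalar crosses the queue, nothing needs to be auto-shared, which is what sidesteps the leak.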


        Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
        "Science is about questioning the status quo. Questioning authority".
        In the absence of evidence, opinion is indistinguishable from prejudice.