my $writerThr = threads->create( sub { my $numSeedsInv = 1.0 / $numSeeds; open my $EXTENSION_FILE, ">", $extendFileName or die "ERROR: could not write to $extendFileName: $!"; open my $UNMATCHED_FILE, ">", $unmatchedFileName or die "ERROR: could not write to $unmatchedFileName: $!"; print STDOUT "INFO: 0% complete\n"; # print out the best extension(s) for this seed; otherwise, print # this read out to the unmatchable file my $numSeedsExtended = 0; my $nextTarget = 10; my %buffer = (); my @suspendedThreads = (); while ( $numSeedsExtended < $numSeeds ) { my $resultRef = $outQ->dequeue(); my $lot = $resultRef->{ 'lot' }; my $batch = $resultRef->{ 'batch' }; my $part = $resultRef->{ 'part' }; my $threadId = $resultRef->{ 'thread' }; if ( $lot == $curLot && $batch == $curBatch && $part == $curPart ) { # thread is free to go $readyQ->enqueue( $threadId ); # print its results and check the buffer for the next set # of results for ( ;; ) { my $bestSeedExtensions = $resultRef->{ 'extensions' }; my $bestSeedOverallMismatches = $resultRef->{ 'num_mm' }; my $beadID = $resultRef->{ 'beadid' }; my $readSequence = $resultRef->{ 'read' }; my $readIsExtendable = @$bestSeedExtensions; if ( $readIsExtendable ) { printBestExtensions( $beadID, $EXTENSION_FILE, $maxNumHits, $bestSeedExtensions, $bestSeedOverallMismatches ); print $EXTENSION_FILE "\n$readSequence\n"; } else { print $UNMATCHED_FILE "$beadID\n"; print $UNMATCHED_FILE "$readSequence\n"; } # print progress ++$numSeedsExtended; my $percentComplete = $numSeedsExtended * $numSeedsInv * 100; if ( $percentComplete >= $nextTarget ) { $percentComplete = int( $percentComplete * 0.1 ) * 10; print STDOUT "INFO: $percentComplete% complete\n"; $nextTarget = $percentComplete + 10; } $buffer{ $batch }->[ $part ] = undef; # go to the next needed result $curPart = ($curPart + 1) % $NUM_THREADS; if ( $curPart == 0 ) { ++$curBatch; $curBatch %= $BUFFER_SIZE; } if ( $curPart == 0 && $curBatch == 0 ) { ++$curLot; } # release suspended threads, if we're back at the # start of the buffer if ( @suspendedThreads && $curBatch == 0 && $curPart == 0 ) { foreach my $resultRef_s ( @suspendedThreads ) { my $batch_s = $resultRef_s->{ 'batch' }; my $part_s = $resultRef_s->{ 'part' }; my $threadId_s = $resultRef_s->{ 'thread' }; $buffer{ $batch_s }->[ $part_s ] = $resultRef_s; $readyQ->enqueue( $threadId_s ); } @suspendedThreads = (); } # check if the result for the next needed result is # already in the buffer $resultRef = $buffer{ $curBatch }->[ $curPart ]; last unless( defined $resultRef ); $batch = $curBatch; $part = $curPart; }#for ever } else { if ( $lot == $curLot ) { $buffer{ $batch }->[ $part ] = $resultRef; $readyQ->enqueue( $threadId ); } else { push @suspendedThreads, $resultRef; } } }# while numSeedsExtended close $EXTENSION_FILE; close $UNMATCHED_FILE; } );