# Ordered, multi-threaded processing of SEED_FILE records.
#
# Every record is tagged with a position (lot, batch, part):
#   part  cycles 0 .. $NUM_THREADS - 1,
#   batch advances each time part wraps and cycles 0 .. $BUFFER_SIZE - 1,
#   lot   advances each time both part and batch wrap.
# The dispatcher (main thread) and the writer thread walk this sequence
# the same way, so the writer can emit results in the order the records
# were read, whichever worker happens to finish first.
#
# Flow control: worker ids are handed out through $readyQ.
#   - The writer puts a worker's id back on $readyQ once it has printed or
#     buffered that worker's result.
#   - Results from a *later* lot are parked in @suspendedThreads. Their
#     worker is not released until the writer reaches that lot, which
#     limits how far the dispatcher can run ahead.

my $NUM_THREADS = 5;
my $BUFFER_SIZE = 1;

my @inQueues = ();
my @threads  = ();
my $outQ     = Thread::Queue->new();
my $readyQ   = Thread::Queue->new();

# Create the worker threads, each with its own input queue.
# A foreach loop gives every iteration a fresh $i, so a closure never
# sees a later value of the loop variable.
for my $i ( 0 .. $NUM_THREADS - 1 ) {
    my $inQ = Thread::Queue->new();
    my $thr = threads->create(
        sub {
            # see the Worker Threads section for this
            ...
        }
    );
    push @inQueues, $inQ;
    push @threads,  $thr;

    # start the threads: worker $i is ready for its first item
    $readyQ->enqueue( $i );
}

# Create the writer thread. The position counters are declared first
# because the writer closure uses them as its own running position.
my $curLot   = 0;
my $curBatch = 0;
my $curPart  = 0;
my $writerThr = threads->create(
    sub {
        # see the Writer Thread section for this
        ...
    }
);

# Give one record (a comment line plus a sequence line) from SEED_FILE to
# the next ready worker.
# NOTE(review): assumes SEED_FILE holds complete comment/sequence line
# pairs; a dangling final comment line yields an undef 'read' value.
until ( eof( $SEED_FILE ) ) {
    my $nextThread = $readyQ->dequeue();
    chomp( my $fastaComment = <$SEED_FILE> );
    chomp( my $readSequence = <$SEED_FILE> );

    my %item = (
        'lot'   => $curLot,
        'batch' => $curBatch,
        'part'  => $curPart,
        'fasta' => $fastaComment,
        'read'  => $readSequence,
    );
    $inQueues[ $nextThread ]->enqueue( \%item );

    # advance to the next (lot, batch, part) position
    $curPart = ( $curPart + 1 ) % $NUM_THREADS;
    if ( $curPart == 0 ) {
        $curBatch = ( $curBatch + 1 ) % $BUFFER_SIZE;
        ++$curLot if $curBatch == 0;
    }
}

# Let the workers know that input is done: an undef item ends each
# worker's dequeue loop.
for my $inQ ( @inQueues ) {
    $inQ->enqueue( undef );
}

# wait for the workers, then the writer, to finish
# NOTE(review): if a worker dies (e.g. cannot open REFERENCE_FILE), its id
# never returns to $readyQ and the pipeline can block; confirm acceptable.
foreach my $thr ( @threads ) {
    $thr->join();
}
$writerThr->join();

#### Worker Threads section: body of the worker sub created above.
#### Uses $i, $inQ, $outQ and $refFileName from the enclosing scope.

my $thr = threads->create(
    sub {
        my $threadId = $i;
        open my $REFERENCE_FILE, '<', $refFileName
            or die "Can't open $refFileName: $!";

        # an undef item is the end-of-input marker
        while ( defined( my $item = $inQ->dequeue() ) ) {
            my $lot          = $item->{ 'lot' };
            my $batch        = $item->{ 'batch' };
            my $part         = $item->{ 'part' };
            my $fastaComment = $item->{ 'fasta' };
            my $readSequence = $item->{ 'read' };

            # the comment is "beadID,hit,hit,..."
            my @hits   = split /,/, $fastaComment;
            my $beadID = shift( @hits );

            # do some stuff to the current item that makes use of
            # REFERENCE_FILE
            my $bestSeedOverallMismatches = $INF;
            my @bestSeedExtensions        = ();
            foreach my $hitString ( @hits ) {
                ...
            }

            # echo the position back so the writer can restore input order
            my %result = (
                'lot'        => $lot,
                'batch'      => $batch,
                'part'       => $part,
                'thread'     => $threadId,
                'extensions' => \@bestSeedExtensions,
                'num_mm'     => $bestSeedOverallMismatches,
                'beadid'     => $beadID,
                'read'       => $readSequence,
            );
            $outQ->enqueue( \%result );
        }
        close $REFERENCE_FILE;
    }
);    # end sub

#### Writer Thread section: body of the writer sub created above.
#### Uses $outQ, $readyQ, $NUM_THREADS, $BUFFER_SIZE and the position
#### counters $curLot/$curBatch/$curPart from the enclosing scope.

my $writerThr = threads->create(
    sub {
        # guard the empty-input case: 1.0 / 0 would kill this thread
        my $numSeedsInv = $numSeeds ? 1.0 / $numSeeds : 0;
        open my $EXTENSION_FILE, '>', $extendFileName
            or die "ERROR: could not write to $extendFileName: $!";
        open my $UNMATCHED_FILE, '>', $unmatchedFileName
            or die "ERROR: could not write to $unmatchedFileName: $!";
        print STDOUT "INFO: 0% complete\n";

        # Print the best extension(s) for each seed; otherwise print the
        # read to the unmatched file. Results are written strictly in
        # (lot, batch, part) order.
        my $numSeedsExtended = 0;
        my $nextTarget       = 10;
        my %buffer           = ();    # batch => [ part => result ], current lot
        my @suspendedThreads = ();    # results from a later lot; worker held
        while ( $numSeedsExtended < $numSeeds ) {
            my $resultRef = $outQ->dequeue();
            my $lot       = $resultRef->{ 'lot' };
            my $batch     = $resultRef->{ 'batch' };
            my $part      = $resultRef->{ 'part' };
            my $threadId  = $resultRef->{ 'thread' };

            if ( $lot == $curLot && $batch == $curBatch && $part == $curPart ) {
                # this is the next result needed: the thread is free to go
                $readyQ->enqueue( $threadId );

                # print its results, then keep draining the buffer for as
                # long as the next needed result is already there
                for ( ;; ) {
                    my $bestSeedExtensions        = $resultRef->{ 'extensions' };
                    my $bestSeedOverallMismatches = $resultRef->{ 'num_mm' };
                    my $beadID                    = $resultRef->{ 'beadid' };
                    my $readSequence              = $resultRef->{ 'read' };

                    my $readIsExtendable = @$bestSeedExtensions;
                    if ( $readIsExtendable ) {
                        printBestExtensions( $beadID, $EXTENSION_FILE,
                            $maxNumHits, $bestSeedExtensions,
                            $bestSeedOverallMismatches );
                        print {$EXTENSION_FILE} "\n$readSequence\n";
                    }
                    else {
                        print {$UNMATCHED_FILE} "$beadID\n";
                        print {$UNMATCHED_FILE} "$readSequence\n";
                    }

                    # print progress in 10% steps
                    ++$numSeedsExtended;
                    my $percentComplete = $numSeedsExtended * $numSeedsInv * 100;
                    if ( $percentComplete >= $nextTarget ) {
                        $percentComplete = int( $percentComplete * 0.1 ) * 10;
                        print STDOUT "INFO: $percentComplete% complete\n";
                        $nextTarget = $percentComplete + 10;
                    }
                    $buffer{ $batch }->[ $part ] = undef;

                    # go to the next needed position (same walk as the
                    # dispatcher)
                    $curPart = ( $curPart + 1 ) % $NUM_THREADS;
                    if ( $curPart == 0 ) {
                        $curBatch = ( $curBatch + 1 ) % $BUFFER_SIZE;
                        ++$curLot if $curBatch == 0;
                    }

                    # back at the start of the buffer, i.e. a new lot:
                    # move parked results into the buffer and release their
                    # workers
                    if ( @suspendedThreads && $curBatch == 0 && $curPart == 0 ) {
                        foreach my $resultRef_s ( @suspendedThreads ) {
                            my $batch_s    = $resultRef_s->{ 'batch' };
                            my $part_s     = $resultRef_s->{ 'part' };
                            my $threadId_s = $resultRef_s->{ 'thread' };
                            $buffer{ $batch_s }->[ $part_s ] = $resultRef_s;
                            $readyQ->enqueue( $threadId_s );
                        }
                        @suspendedThreads = ();
                    }

                    # stop once the next needed result has not arrived yet
                    $resultRef = $buffer{ $curBatch }->[ $curPart ];
                    last unless defined $resultRef;
                    $batch = $curBatch;
                    $part  = $curPart;
                }    # for ever
            }
            elsif ( $lot == $curLot ) {
                # out of order but in the current lot: buffer it, free worker
                $buffer{ $batch }->[ $part ] = $resultRef;
                $readyQ->enqueue( $threadId );
            }
            else {
                # from a later lot: park it and hold the worker (throttling)
                push @suspendedThreads, $resultRef;
            }
        }    # while numSeedsExtended

        # check close on write handles so buffered write errors surface
        close $EXTENSION_FILE
            or die "ERROR: could not close $extendFileName: $!";
        close $UNMATCHED_FILE
            or die "ERROR: could not close $unmatchedFileName: $!";
    }
);