lskatz has asked for the wisdom of the Perl Monks concerning the following question:

Hi Monks, I am using the program BLAST inside of perl threads. I have one thread per cpu, and in each thread, I run BLAST with a single cpu. The input for BLAST is given by dequeuing from a queue that all threads share. The threads do not share anything else.

However, each thread waits for all other threads, and I cannot figure out why. In other words, only one instance of BLAST runs at once. Sometimes I catch 2 instances at once when I use the top command. When I remove the system() call, the loop executes almost instantaneously. When I introduce it, it will take a day.

What are some potential causes? Thank you Perl Monks!

  • Comment on Multithreads with an executable in the worker subroutine

Replies are listed 'Best First'.
Re: Multithreads with an executable in the worker subroutine
by BrowserUk (Patriarch) on May 31, 2012 at 00:07 UTC

    Show us the code. It's the only way to resolve the issue.


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

    The start of some sanity?

      The code is really, really lengthy (sorry), but maybe you can be entertained by my comments. Start with getBLASTGenePredictions().

      sub getBLASTGenePredictions($$) { my ($input_seqs, $settings) = @_; $$settings{num_cpus}||=AKUtils::getNumCPUs(); my $orfs = AKUtils::findOrfs2($input_seqs, {orftype=>'start2stop', n +eed_seq=>1, need_aa_seq=>1}); my $numOrfs; foreach my $seqname(keys %$orfs){ foreach my $frame(keys %{$$orfs{$seqname}}){ foreach my $stop (keys %{$$orfs{$seqname}->{$frame}}){ $numOrfs++; } } } logmsg "$numOrfs ORFs found."; my $reportEvery=int($numOrfs/100); # send out 100 total updates $reportEvery=10 if($reportEvery<10); $$settings{blast_reportEvery}=$reportEvery; my $orfCount=0; $$settings{min_default_db_coverage} ||= 0.7; $$settings{min_reference_coverage} ||= 0.85; $$settings{ignore_blast_errors} ||= 1; $$settings{min_default_db_evalue} ||= 1e-8; $$settings{min_default_db_identity} ||= 0; my ($total, $good_cov, $rc_best); my %cov_hist; $$settings{blast_db} ||= $$settings{prediction_blast_db}; # $$settings{blast_db} ||= $$settings{default_blast_db}; $$settings{blast_xopts} = " -e $$settings{min_default_db_evalue}"; $$settings{quiet} = 1; my %blast_preds; my %blast_settings=%$settings; #$blast_settings{num_cpus}=1; #TODO (?) blastx multithreaded here; parse results multithreaded my $blastQueue=Thread::Queue->new; my @thread; my %tmpOrfs=%$orfs; # pass a new copy of orfs to each thread to avoi +d sharing/freezing issues push(@thread, threads->new(\&blastGenePredictionWorker,\%tmpOrfs,$bl +astQueue,\%blast_settings)) for(1..$$settings{num_cpus}); logmsg "Using database $$settings{blast_db}"; logmsg "Running BLAST gene prediction on ".keys(%$input_seqs)." sequ +ences..."; foreach my $seqname (sort keys %$orfs) { $blastQueue->enqueue($seqname); # sends the seqname to &blastGeneP +redictionWorker() #$|++;print ".";$|--; } $blastQueue->enqueue(undef) for(@thread); while($blastQueue->pending>$$settings{num_cpus} && !$thread[0]->is_j +oinable){ logmsg "Waiting on ".$blastQueue->pending." contigs."; sleep 60; } for(@thread){ logmsg "Joining TID".$_->tid."..."; my $tmp=$_->join; %blast_preds=(%blast_preds,%$tmp); } logmsg "Finished BLASTing $orfCount BLASTs"; return \%blast_preds; } # TODO how can I have multiple blast queries happening against # the same blast database? I need to get past this issue. sub blastGenePredictionWorker{ my($orfs,$queue,$settings)=@_; my $tid="TID".threads->tid; my %settings=(%$settings,(tempdir=>AKUtils::mktempdir)); $settings{num_cpus}=1; my $orfCount=0; my $reportEvery=$settings{blast_reportEvery}; my %blast_preds=(); logmsg "$tid: using tempdir $settings{tempdir}"; while(defined(my $seqname=$queue->dequeue)){ #logmsg "Checking contig $seqname. ".scalar(keys(%orfs))." ORFs to + check."; my %contigOrfs=%{$$orfs{$seqname}}; my @contigOrfKeys= sort keys %contigOrfs; foreach my $frame (@contigOrfKeys) { my $frameHash=$contigOrfs{$frame}; my @frameHashKeys=sort keys %$frameHash; foreach my $stop (@frameHashKeys) { my $orf=$$frameHash{$stop}; my $aa_seq = $orf->{aa_seq}; #print "START BLAST $tid $stop $aa_seq $orf\n"; #print "START BLAST $tid \n"; #my $blsinfile="$$settings{tempdir}/blast$tid.in"; #open(IN,">$blsinfile");print IN ">seq$tid\n$aa_seq\n";close I +N; #my $blast_qs = "blastall -i $blsinfile -p blastp -d $settings +{blast_db} -m 8 -a $settings{num_cpus} "; #$blast_qs .= $$settings{blast_xopts}; #`$blast_qs`; #print "DEBUG $tid\n";next; my $blast_hits = AKUtils::blastSeqs({seq1 => $aa_seq}, 'blastp +', \%settings); #print "STOP BLAST $tid \n"; my ($best_hit, $best_hit_coverage); # TODO: treat truncated orfs separately, relax metrics for the +m and remove irrelevant sanity checks for them # TODO: this selects best hit by coverage, but for purposes of + homolog finding max e-value may be better foreach my $hit (@$blast_hits) { next unless $$hit{start1}; $$hit{coverage} = $$hit{al_len} / length($aa_seq); # query c +overage (not db coverage) next if $$hit{coverage} < $settings{min_default_db_coverage} +; next if $$hit{percent_id} < $settings{min_default_db_identit +y} * 100; $best_hit_coverage = $$hit{coverage} if $best_hit_coverage < + $$hit{coverage} or not defined $best_hit_coverage; $best_hit = $hit if $best_hit_coverage == $$hit{coverage}; } if ($best_hit) { my ($lo_in_seq, $hi_in_seq); if ($$orf{strand} eq '+') { $lo_in_seq = $$orf{start} + ($$best_hit{start1}-1)*3; $hi_in_seq = $$orf{start} + ($$best_hit{end1}-1)*3; } else { $lo_in_seq = $$orf{start} - ($$best_hit{end1}-1)*3; $hi_in_seq = $$orf{start} - ($$best_hit{start1}-1)*3; } # print "Hit:\n"; foreach my $k(sort keys %$best_hit) { print +"\t$k\t$$best_hit{$k}\n"; } # print "PRED: $lo_in_seq..$hi_in_seq ($$orf{strand})\n"; # TODO: scan up to upstream Met from hit location, but not i +f truncated my $pred = { seqname => $seqname, lo => ($$orf{strand} eq '+' ? $lo_in_seq : $stop), # snap +to stop hi => ($$orf{strand} eq '+' ? $stop : $hi_in_seq), # snap +to stop strand => $$orf{strand}, blast_score => $$best_hit{bitscore}, type => 'CDS', start => ($$orf{strand} eq '+' ? $lo_in_seq : $hi_in_seq), stop => $stop, predictor => 'BLAST', }; push(@{$blast_preds{$seqname}}, $pred); # print "\@stop: $stop\n"; foreach my $k(sort keys %{$blast_pr +eds{$seqname}->{$$orf{strand}}->{$stop}}) { print "\t$k\t$blast_preds +{$seqname}->{$$orf{strand}}->{$stop}->{$k}\n"; } die("bad orf $$orf{lo}..$$orf{hi}") if ($$orf{hi}-$$orf{lo}+ +1) % 3 != 0; die("bad pred $$pred{lo} .. $$pred{hi} (orf $$orf{lo}..$$orf +{hi}, in-hit coords $$best_hit{start1}..$$best_hit{end1})") if ($$pre +d{hi} - $$pred{lo} + 1 ) % 3 != 0; } if(++$orfCount % $reportEvery==0){ my $percent_done=int($orfCount*AKUtils::getNumCPUs()/$report +Every); logmsg "$tid: Finished with $orfCount blasts ($percent_done% + done)"; } } } #logmsg "DEBUGGING by returning everything from this seqname ($tid +)";return \%blast_preds; } return \%blast_preds; } # BLAST all sequences in %$seqs against a supplied database. Return re +sults in tabulated format. # mode = blastp|blastn|blastx|psitblastn|tblastn|tblastx (required) # settings required: blast_db # settings optional: blastseqs_xopts, num_cpus, tempdir sub blastSeqs($$$) { my ($seqs, $mode, $settings) = @_; die("Internal error: invalid mode") if $mode !~ /^(blastp|blastn|bla +stx|psitblastn|tblastn|tblastx)$/; die("No BLAST database name supplied") unless $$settings{blast_db}; $$settings{tempdir} ||= AKUtils::mktempdir($settings); $$settings{num_cpus} ||= getNumCPUs(); my $blast_infile = "$$settings{tempdir}/blastseqs.in"; my $blast_outfile = "$$settings{tempdir}/blastseqs.out"; printSeqsToFile($seqs, $blast_infile); my $blast_qs = "legacy_blast.pl blastall -p $mode -d $$settings{blas +t_db} -m 8 -a $$settings{num_cpus} -i $blast_infile "; $blast_qs .= "-o $blast_outfile "; $blast_qs .= $$settings{blast_xopts}; logmsg "Running $blast_qs" unless $$settings{quiet}; #system($blast_qs); my $blsOut=`$blast_qs`; if ($?) { my $er_str = "Error running \"$blast_qs\": $!"; $$settings{ignore_blast_errors} ? warn($er_str) : die($er_str); } #logmsg "DEBUG";$$settings{quiet}=0;return []; # open(BLAST_OUT, "$blast_qs |") or die "Unable to run \"$blast_qs\": +$!"; return loadBLAST8($blast_outfile); } sub loadBLAST8($) { my ($blast_outfile) = @_; my @hits; open(BLAST_OUT, '<', $blast_outfile) or die "Unable to open $blast_o +utfile for reading: $!"; while (<BLAST_OUT>) { chomp; my @line = split /\t/; my %hit; for (qw(name1 name2 percent_id al_len mismatch_bp gap_openings sta +rt1 end1 start2 end2 Evalue bitscore)) { $hit{$_} = shift @line; } push(@hits, \%hit); } close BLAST_OUT; return \@hits; }

        I'll take a proper look tomorrow, but as a first pass guess, try commenting out all the logmsg code and see if makes any difference.


        With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
        Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
        "Science is about questioning the status quo. Questioning authority".
        In the absence of evidence, opinion is indistinguishable from prejudice.

        The start of some sanity?

        Okay. Looking at the code I see nothing -- other than perhaps the implementation of logmsg which you haven't supplied -- to suggest that it is your code that is preventing concurrency.

        Having scanned a little of the information I can find about the blast program, I strongly suspect that it is serialising access to it database file -- probably through file locking. So the problem would also exist if you were using fork.

        The solution I would try is to use different databases for the concurrent runs. If the nature of your workload means that you only want to use a single database, then you could make N (where N is the number of cores) copies of that db file and have each thread operate against a different copy. I am assuming that the blast program is using the db for reference only, not updating it.


        With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
        Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
        "Science is about questioning the status quo. Questioning authority".
        In the absence of evidence, opinion is indistinguishable from prejudice.

        The start of some sanity?

Re: Multithreads with an executable in the worker subroutine
by zentara (Cardinal) on May 31, 2012 at 11:07 UTC
    The input for BLAST is given by dequeuing from a queue that all threads share. The threads do not share anything else.

    If you are not needing shared variables, why not just use fork?


    I'm not really a human, but I play one on earth.
    Old Perl Programmer Haiku ................... flash japh
      Last I used ithreads on windows (OP is unix I think) 2 years ago ithread interps never clean up after themselves after exiting and memleak. I believe there is a warning in the POD to reuse interps and not fork.

      To OP, system is a blocking command. It won't return control until the command system runs exists. Not sure if this is your problem.

        Copying the entire database for each thread is about 500MB of space per thread, but it seems to be working. There is definitely some kind of block, after seeing this.

        However, I don't think it is a system block because I was able to put 4 blast queries into a loop in the bash shell and fork them, and top showed all four running at the same time.

        ithread interps never clean up after themselves after exiting and memleak. I believe there is a warning in the POD to reuse interps and not fork.

        Maybe the Windows emulation of fork with threads does that, but on Linux, it's the exact opposite. Forking releases memory reliably, whereas threads don't. Reusing threads is a good idea though.


        I'm not really a human, but I play one on earth.
        Old Perl Programmer Haiku ................... flash japh