in reply to Re: Multithreads with an executable in the worker subroutine
in thread Multithreads with an executable in the worker subroutine

The code is really, really lengthy (sorry), but maybe you can be entertained by my comments. Start with getBLASTGenePredictions().

sub getBLASTGenePredictions($$) { my ($input_seqs, $settings) = @_; $$settings{num_cpus}||=AKUtils::getNumCPUs(); my $orfs = AKUtils::findOrfs2($input_seqs, {orftype=>'start2stop', n +eed_seq=>1, need_aa_seq=>1}); my $numOrfs; foreach my $seqname(keys %$orfs){ foreach my $frame(keys %{$$orfs{$seqname}}){ foreach my $stop (keys %{$$orfs{$seqname}->{$frame}}){ $numOrfs++; } } } logmsg "$numOrfs ORFs found."; my $reportEvery=int($numOrfs/100); # send out 100 total updates $reportEvery=10 if($reportEvery<10); $$settings{blast_reportEvery}=$reportEvery; my $orfCount=0; $$settings{min_default_db_coverage} ||= 0.7; $$settings{min_reference_coverage} ||= 0.85; $$settings{ignore_blast_errors} ||= 1; $$settings{min_default_db_evalue} ||= 1e-8; $$settings{min_default_db_identity} ||= 0; my ($total, $good_cov, $rc_best); my %cov_hist; $$settings{blast_db} ||= $$settings{prediction_blast_db}; # $$settings{blast_db} ||= $$settings{default_blast_db}; $$settings{blast_xopts} = " -e $$settings{min_default_db_evalue}"; $$settings{quiet} = 1; my %blast_preds; my %blast_settings=%$settings; #$blast_settings{num_cpus}=1; #TODO (?) blastx multithreaded here; parse results multithreaded my $blastQueue=Thread::Queue->new; my @thread; my %tmpOrfs=%$orfs; # pass a new copy of orfs to each thread to avoi +d sharing/freezing issues push(@thread, threads->new(\&blastGenePredictionWorker,\%tmpOrfs,$bl +astQueue,\%blast_settings)) for(1..$$settings{num_cpus}); logmsg "Using database $$settings{blast_db}"; logmsg "Running BLAST gene prediction on ".keys(%$input_seqs)." sequ +ences..."; foreach my $seqname (sort keys %$orfs) { $blastQueue->enqueue($seqname); # sends the seqname to &blastGeneP +redictionWorker() #$|++;print ".";$|--; } $blastQueue->enqueue(undef) for(@thread); while($blastQueue->pending>$$settings{num_cpus} && !$thread[0]->is_j +oinable){ logmsg "Waiting on ".$blastQueue->pending." contigs."; sleep 60; } for(@thread){ logmsg "Joining TID".$_->tid."..."; my $tmp=$_->join; %blast_preds=(%blast_preds,%$tmp); } logmsg "Finished BLASTing $orfCount BLASTs"; return \%blast_preds; } # TODO how can I have multiple blast queries happening against # the same blast database? I need to get past this issue. sub blastGenePredictionWorker{ my($orfs,$queue,$settings)=@_; my $tid="TID".threads->tid; my %settings=(%$settings,(tempdir=>AKUtils::mktempdir)); $settings{num_cpus}=1; my $orfCount=0; my $reportEvery=$settings{blast_reportEvery}; my %blast_preds=(); logmsg "$tid: using tempdir $settings{tempdir}"; while(defined(my $seqname=$queue->dequeue)){ #logmsg "Checking contig $seqname. ".scalar(keys(%orfs))." ORFs to + check."; my %contigOrfs=%{$$orfs{$seqname}}; my @contigOrfKeys= sort keys %contigOrfs; foreach my $frame (@contigOrfKeys) { my $frameHash=$contigOrfs{$frame}; my @frameHashKeys=sort keys %$frameHash; foreach my $stop (@frameHashKeys) { my $orf=$$frameHash{$stop}; my $aa_seq = $orf->{aa_seq}; #print "START BLAST $tid $stop $aa_seq $orf\n"; #print "START BLAST $tid \n"; #my $blsinfile="$$settings{tempdir}/blast$tid.in"; #open(IN,">$blsinfile");print IN ">seq$tid\n$aa_seq\n";close I +N; #my $blast_qs = "blastall -i $blsinfile -p blastp -d $settings +{blast_db} -m 8 -a $settings{num_cpus} "; #$blast_qs .= $$settings{blast_xopts}; #`$blast_qs`; #print "DEBUG $tid\n";next; my $blast_hits = AKUtils::blastSeqs({seq1 => $aa_seq}, 'blastp +', \%settings); #print "STOP BLAST $tid \n"; my ($best_hit, $best_hit_coverage); # TODO: treat truncated orfs separately, relax metrics for the +m and remove irrelevant sanity checks for them # TODO: this selects best hit by coverage, but for purposes of + homolog finding max e-value may be better foreach my $hit (@$blast_hits) { next unless $$hit{start1}; $$hit{coverage} = $$hit{al_len} / length($aa_seq); # query c +overage (not db coverage) next if $$hit{coverage} < $settings{min_default_db_coverage} +; next if $$hit{percent_id} < $settings{min_default_db_identit +y} * 100; $best_hit_coverage = $$hit{coverage} if $best_hit_coverage < + $$hit{coverage} or not defined $best_hit_coverage; $best_hit = $hit if $best_hit_coverage == $$hit{coverage}; } if ($best_hit) { my ($lo_in_seq, $hi_in_seq); if ($$orf{strand} eq '+') { $lo_in_seq = $$orf{start} + ($$best_hit{start1}-1)*3; $hi_in_seq = $$orf{start} + ($$best_hit{end1}-1)*3; } else { $lo_in_seq = $$orf{start} - ($$best_hit{end1}-1)*3; $hi_in_seq = $$orf{start} - ($$best_hit{start1}-1)*3; } # print "Hit:\n"; foreach my $k(sort keys %$best_hit) { print +"\t$k\t$$best_hit{$k}\n"; } # print "PRED: $lo_in_seq..$hi_in_seq ($$orf{strand})\n"; # TODO: scan up to upstream Met from hit location, but not i +f truncated my $pred = { seqname => $seqname, lo => ($$orf{strand} eq '+' ? $lo_in_seq : $stop), # snap +to stop hi => ($$orf{strand} eq '+' ? $stop : $hi_in_seq), # snap +to stop strand => $$orf{strand}, blast_score => $$best_hit{bitscore}, type => 'CDS', start => ($$orf{strand} eq '+' ? $lo_in_seq : $hi_in_seq), stop => $stop, predictor => 'BLAST', }; push(@{$blast_preds{$seqname}}, $pred); # print "\@stop: $stop\n"; foreach my $k(sort keys %{$blast_pr +eds{$seqname}->{$$orf{strand}}->{$stop}}) { print "\t$k\t$blast_preds +{$seqname}->{$$orf{strand}}->{$stop}->{$k}\n"; } die("bad orf $$orf{lo}..$$orf{hi}") if ($$orf{hi}-$$orf{lo}+ +1) % 3 != 0; die("bad pred $$pred{lo} .. $$pred{hi} (orf $$orf{lo}..$$orf +{hi}, in-hit coords $$best_hit{start1}..$$best_hit{end1})") if ($$pre +d{hi} - $$pred{lo} + 1 ) % 3 != 0; } if(++$orfCount % $reportEvery==0){ my $percent_done=int($orfCount*AKUtils::getNumCPUs()/$report +Every); logmsg "$tid: Finished with $orfCount blasts ($percent_done% + done)"; } } } #logmsg "DEBUGGING by returning everything from this seqname ($tid +)";return \%blast_preds; } return \%blast_preds; } # BLAST all sequences in %$seqs against a supplied database. Return re +sults in tabulated format. # mode = blastp|blastn|blastx|psitblastn|tblastn|tblastx (required) # settings required: blast_db # settings optional: blastseqs_xopts, num_cpus, tempdir sub blastSeqs($$$) { my ($seqs, $mode, $settings) = @_; die("Internal error: invalid mode") if $mode !~ /^(blastp|blastn|bla +stx|psitblastn|tblastn|tblastx)$/; die("No BLAST database name supplied") unless $$settings{blast_db}; $$settings{tempdir} ||= AKUtils::mktempdir($settings); $$settings{num_cpus} ||= getNumCPUs(); my $blast_infile = "$$settings{tempdir}/blastseqs.in"; my $blast_outfile = "$$settings{tempdir}/blastseqs.out"; printSeqsToFile($seqs, $blast_infile); my $blast_qs = "legacy_blast.pl blastall -p $mode -d $$settings{blas +t_db} -m 8 -a $$settings{num_cpus} -i $blast_infile "; $blast_qs .= "-o $blast_outfile "; $blast_qs .= $$settings{blast_xopts}; logmsg "Running $blast_qs" unless $$settings{quiet}; #system($blast_qs); my $blsOut=`$blast_qs`; if ($?) { my $er_str = "Error running \"$blast_qs\": $!"; $$settings{ignore_blast_errors} ? warn($er_str) : die($er_str); } #logmsg "DEBUG";$$settings{quiet}=0;return []; # open(BLAST_OUT, "$blast_qs |") or die "Unable to run \"$blast_qs\": +$!"; return loadBLAST8($blast_outfile); } sub loadBLAST8($) { my ($blast_outfile) = @_; my @hits; open(BLAST_OUT, '<', $blast_outfile) or die "Unable to open $blast_o +utfile for reading: $!"; while (<BLAST_OUT>) { chomp; my @line = split /\t/; my %hit; for (qw(name1 name2 percent_id al_len mismatch_bp gap_openings sta +rt1 end1 start2 end2 Evalue bitscore)) { $hit{$_} = shift @line; } push(@hits, \%hit); } close BLAST_OUT; return \@hits; }

Replies are listed 'Best First'.
Re^3: Multithreads with an executable in the worker subroutine
by BrowserUk (Patriarch) on May 31, 2012 at 00:41 UTC

    I'll take a proper look tomorrow, but as a first pass guess, try commenting out all the logmsg code and see if makes any difference.


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

    The start of some sanity?

Re^3: Multithreads with an executable in the worker subroutine (fork (probably) won't help)
by BrowserUk (Patriarch) on May 31, 2012 at 12:30 UTC

    Okay. Looking at the code I see nothing -- other than perhaps the implementation of logmsg which you haven't supplied -- to suggest that it is your code that is preventing concurrency.

    Having scanned a little of the information I can find about the blast program, I strongly suspect that it is serialising access to it database file -- probably through file locking. So the problem would also exist if you were using fork.

    The solution I would try is to use different databases for the concurrent runs. If the nature of your workload means that you only want to use a single database, then you could make N (where N is the number of cores) copies of that db file and have each thread operate against a different copy. I am assuming that the blast program is using the db for reference only, not updating it.


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

    The start of some sanity?

      This is logmsg and I don't think it's blocking anything.
      sub logmsg {my $FH = $AKUtils::LOG || *STDOUT; print $FH "$0: ".(caller(1))[3].": @_\n";}
        Sorry, I replied anonymously, but that is indeed the logmsg subroutine. sub logmsg {my $FH = $AKUtils::LOG || *STDOUT; print $FH "$0: ".(caller(1))[3].": @_\n";}