The code is really, really lengthy (sorry), but maybe you can be entertained by my comments. Start with getBLASTGenePredictions().

sub getBLASTGenePredictions($$) { my ($input_seqs, $settings) = @_; $$settings{num_cpus}||=AKUtils::getNumCPUs(); my $orfs = AKUtils::findOrfs2($input_seqs, {orftype=>'start2stop', n +eed_seq=>1, need_aa_seq=>1}); my $numOrfs; foreach my $seqname(keys %$orfs){ foreach my $frame(keys %{$$orfs{$seqname}}){ foreach my $stop (keys %{$$orfs{$seqname}->{$frame}}){ $numOrfs++; } } } logmsg "$numOrfs ORFs found."; my $reportEvery=int($numOrfs/100); # send out 100 total updates $reportEvery=10 if($reportEvery<10); $$settings{blast_reportEvery}=$reportEvery; my $orfCount=0; $$settings{min_default_db_coverage} ||= 0.7; $$settings{min_reference_coverage} ||= 0.85; $$settings{ignore_blast_errors} ||= 1; $$settings{min_default_db_evalue} ||= 1e-8; $$settings{min_default_db_identity} ||= 0; my ($total, $good_cov, $rc_best); my %cov_hist; $$settings{blast_db} ||= $$settings{prediction_blast_db}; # $$settings{blast_db} ||= $$settings{default_blast_db}; $$settings{blast_xopts} = " -e $$settings{min_default_db_evalue}"; $$settings{quiet} = 1; my %blast_preds; my %blast_settings=%$settings; #$blast_settings{num_cpus}=1; #TODO (?) blastx multithreaded here; parse results multithreaded my $blastQueue=Thread::Queue->new; my @thread; my %tmpOrfs=%$orfs; # pass a new copy of orfs to each thread to avoi +d sharing/freezing issues push(@thread, threads->new(\&blastGenePredictionWorker,\%tmpOrfs,$bl +astQueue,\%blast_settings)) for(1..$$settings{num_cpus}); logmsg "Using database $$settings{blast_db}"; logmsg "Running BLAST gene prediction on ".keys(%$input_seqs)." sequ +ences..."; foreach my $seqname (sort keys %$orfs) { $blastQueue->enqueue($seqname); # sends the seqname to &blastGeneP +redictionWorker() #$|++;print ".";$|--; } $blastQueue->enqueue(undef) for(@thread); while($blastQueue->pending>$$settings{num_cpus} && !$thread[0]->is_j +oinable){ logmsg "Waiting on ".$blastQueue->pending." contigs."; sleep 60; } for(@thread){ logmsg "Joining TID".$_->tid."..."; my $tmp=$_->join; %blast_preds=(%blast_preds,%$tmp); } logmsg "Finished BLASTing $orfCount BLASTs"; return \%blast_preds; } # TODO how can I have multiple blast queries happening against # the same blast database? I need to get past this issue. sub blastGenePredictionWorker{ my($orfs,$queue,$settings)=@_; my $tid="TID".threads->tid; my %settings=(%$settings,(tempdir=>AKUtils::mktempdir)); $settings{num_cpus}=1; my $orfCount=0; my $reportEvery=$settings{blast_reportEvery}; my %blast_preds=(); logmsg "$tid: using tempdir $settings{tempdir}"; while(defined(my $seqname=$queue->dequeue)){ #logmsg "Checking contig $seqname. ".scalar(keys(%orfs))." ORFs to + check."; my %contigOrfs=%{$$orfs{$seqname}}; my @contigOrfKeys= sort keys %contigOrfs; foreach my $frame (@contigOrfKeys) { my $frameHash=$contigOrfs{$frame}; my @frameHashKeys=sort keys %$frameHash; foreach my $stop (@frameHashKeys) { my $orf=$$frameHash{$stop}; my $aa_seq = $orf->{aa_seq}; #print "START BLAST $tid $stop $aa_seq $orf\n"; #print "START BLAST $tid \n"; #my $blsinfile="$$settings{tempdir}/blast$tid.in"; #open(IN,">$blsinfile");print IN ">seq$tid\n$aa_seq\n";close I +N; #my $blast_qs = "blastall -i $blsinfile -p blastp -d $settings +{blast_db} -m 8 -a $settings{num_cpus} "; #$blast_qs .= $$settings{blast_xopts}; #`$blast_qs`; #print "DEBUG $tid\n";next; my $blast_hits = AKUtils::blastSeqs({seq1 => $aa_seq}, 'blastp +', \%settings); #print "STOP BLAST $tid \n"; my ($best_hit, $best_hit_coverage); # TODO: treat truncated orfs separately, relax metrics for the +m and remove irrelevant sanity checks for them # TODO: this selects best hit by coverage, but for purposes of + homolog finding max e-value may be better foreach my $hit (@$blast_hits) { next unless $$hit{start1}; $$hit{coverage} = $$hit{al_len} / length($aa_seq); # query c +overage (not db coverage) next if $$hit{coverage} < $settings{min_default_db_coverage} +; next if $$hit{percent_id} < $settings{min_default_db_identit +y} * 100; $best_hit_coverage = $$hit{coverage} if $best_hit_coverage < + $$hit{coverage} or not defined $best_hit_coverage; $best_hit = $hit if $best_hit_coverage == $$hit{coverage}; } if ($best_hit) { my ($lo_in_seq, $hi_in_seq); if ($$orf{strand} eq '+') { $lo_in_seq = $$orf{start} + ($$best_hit{start1}-1)*3; $hi_in_seq = $$orf{start} + ($$best_hit{end1}-1)*3; } else { $lo_in_seq = $$orf{start} - ($$best_hit{end1}-1)*3; $hi_in_seq = $$orf{start} - ($$best_hit{start1}-1)*3; } # print "Hit:\n"; foreach my $k(sort keys %$best_hit) { print +"\t$k\t$$best_hit{$k}\n"; } # print "PRED: $lo_in_seq..$hi_in_seq ($$orf{strand})\n"; # TODO: scan up to upstream Met from hit location, but not i +f truncated my $pred = { seqname => $seqname, lo => ($$orf{strand} eq '+' ? $lo_in_seq : $stop), # snap +to stop hi => ($$orf{strand} eq '+' ? $stop : $hi_in_seq), # snap +to stop strand => $$orf{strand}, blast_score => $$best_hit{bitscore}, type => 'CDS', start => ($$orf{strand} eq '+' ? $lo_in_seq : $hi_in_seq), stop => $stop, predictor => 'BLAST', }; push(@{$blast_preds{$seqname}}, $pred); # print "\@stop: $stop\n"; foreach my $k(sort keys %{$blast_pr +eds{$seqname}->{$$orf{strand}}->{$stop}}) { print "\t$k\t$blast_preds +{$seqname}->{$$orf{strand}}->{$stop}->{$k}\n"; } die("bad orf $$orf{lo}..$$orf{hi}") if ($$orf{hi}-$$orf{lo}+ +1) % 3 != 0; die("bad pred $$pred{lo} .. $$pred{hi} (orf $$orf{lo}..$$orf +{hi}, in-hit coords $$best_hit{start1}..$$best_hit{end1})") if ($$pre +d{hi} - $$pred{lo} + 1 ) % 3 != 0; } if(++$orfCount % $reportEvery==0){ my $percent_done=int($orfCount*AKUtils::getNumCPUs()/$report +Every); logmsg "$tid: Finished with $orfCount blasts ($percent_done% + done)"; } } } #logmsg "DEBUGGING by returning everything from this seqname ($tid +)";return \%blast_preds; } return \%blast_preds; } # BLAST all sequences in %$seqs against a supplied database. Return re +sults in tabulated format. # mode = blastp|blastn|blastx|psitblastn|tblastn|tblastx (required) # settings required: blast_db # settings optional: blastseqs_xopts, num_cpus, tempdir sub blastSeqs($$$) { my ($seqs, $mode, $settings) = @_; die("Internal error: invalid mode") if $mode !~ /^(blastp|blastn|bla +stx|psitblastn|tblastn|tblastx)$/; die("No BLAST database name supplied") unless $$settings{blast_db}; $$settings{tempdir} ||= AKUtils::mktempdir($settings); $$settings{num_cpus} ||= getNumCPUs(); my $blast_infile = "$$settings{tempdir}/blastseqs.in"; my $blast_outfile = "$$settings{tempdir}/blastseqs.out"; printSeqsToFile($seqs, $blast_infile); my $blast_qs = "legacy_blast.pl blastall -p $mode -d $$settings{blas +t_db} -m 8 -a $$settings{num_cpus} -i $blast_infile "; $blast_qs .= "-o $blast_outfile "; $blast_qs .= $$settings{blast_xopts}; logmsg "Running $blast_qs" unless $$settings{quiet}; #system($blast_qs); my $blsOut=`$blast_qs`; if ($?) { my $er_str = "Error running \"$blast_qs\": $!"; $$settings{ignore_blast_errors} ? warn($er_str) : die($er_str); } #logmsg "DEBUG";$$settings{quiet}=0;return []; # open(BLAST_OUT, "$blast_qs |") or die "Unable to run \"$blast_qs\": +$!"; return loadBLAST8($blast_outfile); } sub loadBLAST8($) { my ($blast_outfile) = @_; my @hits; open(BLAST_OUT, '<', $blast_outfile) or die "Unable to open $blast_o +utfile for reading: $!"; while (<BLAST_OUT>) { chomp; my @line = split /\t/; my %hit; for (qw(name1 name2 percent_id al_len mismatch_bp gap_openings sta +rt1 end1 start2 end2 Evalue bitscore)) { $hit{$_} = shift @line; } push(@hits, \%hit); } close BLAST_OUT; return \@hits; }

In reply to Re^2: Multithreads with an executable in the worker subroutine by lskatz
in thread Multithreads with an executable in the worker subroutine by lskatz

Title:
Use:  <p> text here (a paragraph) </p>
and:  <code> code here </code>
to format your post, it's "PerlMonks-approved HTML":



  • Posts are HTML formatted. Put <p> </p> tags around your paragraphs. Put <code> </code> tags around your code and data!
  • Titles consisting of a single word are discouraged, and in most cases are disallowed outright.
  • Read Where should I post X? if you're not absolutely sure you're posting in the right place.
  • Please read these before you post! —
  • Posts may use any of the Perl Monks Approved HTML tags:
    a, abbr, b, big, blockquote, br, caption, center, col, colgroup, dd, del, details, div, dl, dt, em, font, h1, h2, h3, h4, h5, h6, hr, i, ins, li, ol, p, pre, readmore, small, span, spoiler, strike, strong, sub, summary, sup, table, tbody, td, tfoot, th, thead, tr, tt, u, ul, wbr
  • You may need to use entities for some characters, as follows. (Exception: Within code tags, you can put the characters literally.)
            For:     Use:
    & &amp;
    < &lt;
    > &gt;
    [ &#91;
    ] &#93;
  • Link using PerlMonks shortcuts! What shortcuts can I use for linking?
  • See Writeup Formatting Tips and other pages linked from there for more info.