in reply to Re: Multithreads with an executable in the worker subroutine
in thread Multithreads with an executable in the worker subroutine
The code is really, really lengthy (sorry), but maybe you can be entertained by my comments. Start with getBLASTGenePredictions().
sub getBLASTGenePredictions($$) { my ($input_seqs, $settings) = @_; $$settings{num_cpus}||=AKUtils::getNumCPUs(); my $orfs = AKUtils::findOrfs2($input_seqs, {orftype=>'start2stop', n +eed_seq=>1, need_aa_seq=>1}); my $numOrfs; foreach my $seqname(keys %$orfs){ foreach my $frame(keys %{$$orfs{$seqname}}){ foreach my $stop (keys %{$$orfs{$seqname}->{$frame}}){ $numOrfs++; } } } logmsg "$numOrfs ORFs found."; my $reportEvery=int($numOrfs/100); # send out 100 total updates $reportEvery=10 if($reportEvery<10); $$settings{blast_reportEvery}=$reportEvery; my $orfCount=0; $$settings{min_default_db_coverage} ||= 0.7; $$settings{min_reference_coverage} ||= 0.85; $$settings{ignore_blast_errors} ||= 1; $$settings{min_default_db_evalue} ||= 1e-8; $$settings{min_default_db_identity} ||= 0; my ($total, $good_cov, $rc_best); my %cov_hist; $$settings{blast_db} ||= $$settings{prediction_blast_db}; # $$settings{blast_db} ||= $$settings{default_blast_db}; $$settings{blast_xopts} = " -e $$settings{min_default_db_evalue}"; $$settings{quiet} = 1; my %blast_preds; my %blast_settings=%$settings; #$blast_settings{num_cpus}=1; #TODO (?) blastx multithreaded here; parse results multithreaded my $blastQueue=Thread::Queue->new; my @thread; my %tmpOrfs=%$orfs; # pass a new copy of orfs to each thread to avoi +d sharing/freezing issues push(@thread, threads->new(\&blastGenePredictionWorker,\%tmpOrfs,$bl +astQueue,\%blast_settings)) for(1..$$settings{num_cpus}); logmsg "Using database $$settings{blast_db}"; logmsg "Running BLAST gene prediction on ".keys(%$input_seqs)." sequ +ences..."; foreach my $seqname (sort keys %$orfs) { $blastQueue->enqueue($seqname); # sends the seqname to &blastGeneP +redictionWorker() #$|++;print ".";$|--; } $blastQueue->enqueue(undef) for(@thread); while($blastQueue->pending>$$settings{num_cpus} && !$thread[0]->is_j +oinable){ logmsg "Waiting on ".$blastQueue->pending." contigs."; sleep 60; } for(@thread){ logmsg "Joining TID".$_->tid."..."; my $tmp=$_->join; %blast_preds=(%blast_preds,%$tmp); } logmsg "Finished BLASTing $orfCount BLASTs"; return \%blast_preds; } # TODO how can I have multiple blast queries happening against # the same blast database? I need to get past this issue. sub blastGenePredictionWorker{ my($orfs,$queue,$settings)=@_; my $tid="TID".threads->tid; my %settings=(%$settings,(tempdir=>AKUtils::mktempdir)); $settings{num_cpus}=1; my $orfCount=0; my $reportEvery=$settings{blast_reportEvery}; my %blast_preds=(); logmsg "$tid: using tempdir $settings{tempdir}"; while(defined(my $seqname=$queue->dequeue)){ #logmsg "Checking contig $seqname. ".scalar(keys(%orfs))." ORFs to + check."; my %contigOrfs=%{$$orfs{$seqname}}; my @contigOrfKeys= sort keys %contigOrfs; foreach my $frame (@contigOrfKeys) { my $frameHash=$contigOrfs{$frame}; my @frameHashKeys=sort keys %$frameHash; foreach my $stop (@frameHashKeys) { my $orf=$$frameHash{$stop}; my $aa_seq = $orf->{aa_seq}; #print "START BLAST $tid $stop $aa_seq $orf\n"; #print "START BLAST $tid \n"; #my $blsinfile="$$settings{tempdir}/blast$tid.in"; #open(IN,">$blsinfile");print IN ">seq$tid\n$aa_seq\n";close I +N; #my $blast_qs = "blastall -i $blsinfile -p blastp -d $settings +{blast_db} -m 8 -a $settings{num_cpus} "; #$blast_qs .= $$settings{blast_xopts}; #`$blast_qs`; #print "DEBUG $tid\n";next; my $blast_hits = AKUtils::blastSeqs({seq1 => $aa_seq}, 'blastp +', \%settings); #print "STOP BLAST $tid \n"; my ($best_hit, $best_hit_coverage); # TODO: treat truncated orfs separately, relax metrics for the +m and remove irrelevant sanity checks for them # TODO: this selects best hit by coverage, but for purposes of + homolog finding max e-value may be better foreach my $hit (@$blast_hits) { next unless $$hit{start1}; $$hit{coverage} = $$hit{al_len} / length($aa_seq); # query c +overage (not db coverage) next if $$hit{coverage} < $settings{min_default_db_coverage} +; next if $$hit{percent_id} < $settings{min_default_db_identit +y} * 100; $best_hit_coverage = $$hit{coverage} if $best_hit_coverage < + $$hit{coverage} or not defined $best_hit_coverage; $best_hit = $hit if $best_hit_coverage == $$hit{coverage}; } if ($best_hit) { my ($lo_in_seq, $hi_in_seq); if ($$orf{strand} eq '+') { $lo_in_seq = $$orf{start} + ($$best_hit{start1}-1)*3; $hi_in_seq = $$orf{start} + ($$best_hit{end1}-1)*3; } else { $lo_in_seq = $$orf{start} - ($$best_hit{end1}-1)*3; $hi_in_seq = $$orf{start} - ($$best_hit{start1}-1)*3; } # print "Hit:\n"; foreach my $k(sort keys %$best_hit) { print +"\t$k\t$$best_hit{$k}\n"; } # print "PRED: $lo_in_seq..$hi_in_seq ($$orf{strand})\n"; # TODO: scan up to upstream Met from hit location, but not i +f truncated my $pred = { seqname => $seqname, lo => ($$orf{strand} eq '+' ? $lo_in_seq : $stop), # snap +to stop hi => ($$orf{strand} eq '+' ? $stop : $hi_in_seq), # snap +to stop strand => $$orf{strand}, blast_score => $$best_hit{bitscore}, type => 'CDS', start => ($$orf{strand} eq '+' ? $lo_in_seq : $hi_in_seq), stop => $stop, predictor => 'BLAST', }; push(@{$blast_preds{$seqname}}, $pred); # print "\@stop: $stop\n"; foreach my $k(sort keys %{$blast_pr +eds{$seqname}->{$$orf{strand}}->{$stop}}) { print "\t$k\t$blast_preds +{$seqname}->{$$orf{strand}}->{$stop}->{$k}\n"; } die("bad orf $$orf{lo}..$$orf{hi}") if ($$orf{hi}-$$orf{lo}+ +1) % 3 != 0; die("bad pred $$pred{lo} .. $$pred{hi} (orf $$orf{lo}..$$orf +{hi}, in-hit coords $$best_hit{start1}..$$best_hit{end1})") if ($$pre +d{hi} - $$pred{lo} + 1 ) % 3 != 0; } if(++$orfCount % $reportEvery==0){ my $percent_done=int($orfCount*AKUtils::getNumCPUs()/$report +Every); logmsg "$tid: Finished with $orfCount blasts ($percent_done% + done)"; } } } #logmsg "DEBUGGING by returning everything from this seqname ($tid +)";return \%blast_preds; } return \%blast_preds; } # BLAST all sequences in %$seqs against a supplied database. Return re +sults in tabulated format. # mode = blastp|blastn|blastx|psitblastn|tblastn|tblastx (required) # settings required: blast_db # settings optional: blastseqs_xopts, num_cpus, tempdir sub blastSeqs($$$) { my ($seqs, $mode, $settings) = @_; die("Internal error: invalid mode") if $mode !~ /^(blastp|blastn|bla +stx|psitblastn|tblastn|tblastx)$/; die("No BLAST database name supplied") unless $$settings{blast_db}; $$settings{tempdir} ||= AKUtils::mktempdir($settings); $$settings{num_cpus} ||= getNumCPUs(); my $blast_infile = "$$settings{tempdir}/blastseqs.in"; my $blast_outfile = "$$settings{tempdir}/blastseqs.out"; printSeqsToFile($seqs, $blast_infile); my $blast_qs = "legacy_blast.pl blastall -p $mode -d $$settings{blas +t_db} -m 8 -a $$settings{num_cpus} -i $blast_infile "; $blast_qs .= "-o $blast_outfile "; $blast_qs .= $$settings{blast_xopts}; logmsg "Running $blast_qs" unless $$settings{quiet}; #system($blast_qs); my $blsOut=`$blast_qs`; if ($?) { my $er_str = "Error running \"$blast_qs\": $!"; $$settings{ignore_blast_errors} ? warn($er_str) : die($er_str); } #logmsg "DEBUG";$$settings{quiet}=0;return []; # open(BLAST_OUT, "$blast_qs |") or die "Unable to run \"$blast_qs\": +$!"; return loadBLAST8($blast_outfile); } sub loadBLAST8($) { my ($blast_outfile) = @_; my @hits; open(BLAST_OUT, '<', $blast_outfile) or die "Unable to open $blast_o +utfile for reading: $!"; while (<BLAST_OUT>) { chomp; my @line = split /\t/; my %hit; for (qw(name1 name2 percent_id al_len mismatch_bp gap_openings sta +rt1 end1 start2 end2 Evalue bitscore)) { $hit{$_} = shift @line; } push(@hits, \%hit); } close BLAST_OUT; return \@hits; }
|
|---|
| Replies are listed 'Best First'. | |
|---|---|
|
Re^3: Multithreads with an executable in the worker subroutine
by BrowserUk (Patriarch) on May 31, 2012 at 00:41 UTC | |
|
Re^3: Multithreads with an executable in the worker subroutine (fork (probably) won't help)
by BrowserUk (Patriarch) on May 31, 2012 at 12:30 UTC | |
by Anonymous Monk on May 31, 2012 at 13:24 UTC | |
by lskatz (Novice) on May 31, 2012 at 13:27 UTC |