sub getBLASTGenePredictions($$) {
my ($input_seqs, $settings) = @_;
$$settings{num_cpus}||=AKUtils::getNumCPUs();
my $orfs = AKUtils::findOrfs2($input_seqs, {orftype=>'start2stop', n
+eed_seq=>1, need_aa_seq=>1});
my $numOrfs;
foreach my $seqname(keys %$orfs){
foreach my $frame(keys %{$$orfs{$seqname}}){
foreach my $stop (keys %{$$orfs{$seqname}->{$frame}}){
$numOrfs++;
}
}
}
logmsg "$numOrfs ORFs found.";
my $reportEvery=int($numOrfs/100); # send out 100 total updates
$reportEvery=10 if($reportEvery<10);
$$settings{blast_reportEvery}=$reportEvery;
my $orfCount=0;
$$settings{min_default_db_coverage} ||= 0.7;
$$settings{min_reference_coverage} ||= 0.85;
$$settings{ignore_blast_errors} ||= 1;
$$settings{min_default_db_evalue} ||= 1e-8;
$$settings{min_default_db_identity} ||= 0;
my ($total, $good_cov, $rc_best); my %cov_hist;
$$settings{blast_db} ||= $$settings{prediction_blast_db};
# $$settings{blast_db} ||= $$settings{default_blast_db};
$$settings{blast_xopts} = " -e $$settings{min_default_db_evalue}";
$$settings{quiet} = 1;
my %blast_preds;
my %blast_settings=%$settings;
#$blast_settings{num_cpus}=1;
#TODO (?) blastx multithreaded here; parse results multithreaded
my $blastQueue=Thread::Queue->new;
my @thread;
my %tmpOrfs=%$orfs; # pass a new copy of orfs to each thread to avoi
+d sharing/freezing issues
push(@thread, threads->new(\&blastGenePredictionWorker,\%tmpOrfs,$bl
+astQueue,\%blast_settings)) for(1..$$settings{num_cpus});
logmsg "Using database $$settings{blast_db}";
logmsg "Running BLAST gene prediction on ".keys(%$input_seqs)." sequ
+ences...";
foreach my $seqname (sort keys %$orfs) {
$blastQueue->enqueue($seqname); # sends the seqname to &blastGeneP
+redictionWorker()
#$|++;print ".";$|--;
}
$blastQueue->enqueue(undef) for(@thread);
while($blastQueue->pending>$$settings{num_cpus} && !$thread[0]->is_j
+oinable){
logmsg "Waiting on ".$blastQueue->pending." contigs.";
sleep 60;
}
for(@thread){
logmsg "Joining TID".$_->tid."...";
my $tmp=$_->join;
%blast_preds=(%blast_preds,%$tmp);
}
logmsg "Finished BLASTing $orfCount BLASTs";
return \%blast_preds;
}
# TODO how can I have multiple blast queries happening against
# the same blast database? I need to get past this issue.
sub blastGenePredictionWorker{
my($orfs,$queue,$settings)=@_;
my $tid="TID".threads->tid;
my %settings=(%$settings,(tempdir=>AKUtils::mktempdir));
$settings{num_cpus}=1;
my $orfCount=0;
my $reportEvery=$settings{blast_reportEvery};
my %blast_preds=();
logmsg "$tid: using tempdir $settings{tempdir}";
while(defined(my $seqname=$queue->dequeue)){
#logmsg "Checking contig $seqname. ".scalar(keys(%orfs))." ORFs to
+ check.";
my %contigOrfs=%{$$orfs{$seqname}};
my @contigOrfKeys= sort keys %contigOrfs;
foreach my $frame (@contigOrfKeys) {
my $frameHash=$contigOrfs{$frame};
my @frameHashKeys=sort keys %$frameHash;
foreach my $stop (@frameHashKeys) {
my $orf=$$frameHash{$stop};
my $aa_seq = $orf->{aa_seq};
#print "START BLAST $tid $stop $aa_seq $orf\n";
#print "START BLAST $tid \n";
#my $blsinfile="$$settings{tempdir}/blast$tid.in";
#open(IN,">$blsinfile");print IN ">seq$tid\n$aa_seq\n";close I
+N;
#my $blast_qs = "blastall -i $blsinfile -p blastp -d $settings
+{blast_db} -m 8 -a $settings{num_cpus} ";
#$blast_qs .= $$settings{blast_xopts};
#`$blast_qs`;
#print "DEBUG $tid\n";next;
my $blast_hits = AKUtils::blastSeqs({seq1 => $aa_seq}, 'blastp
+', \%settings);
#print "STOP BLAST $tid \n";
my ($best_hit, $best_hit_coverage);
# TODO: treat truncated orfs separately, relax metrics for the
+m and remove irrelevant sanity checks for them
# TODO: this selects best hit by coverage, but for purposes of
+ homolog finding max e-value may be better
foreach my $hit (@$blast_hits) {
next unless $$hit{start1};
$$hit{coverage} = $$hit{al_len} / length($aa_seq); # query c
+overage (not db coverage)
next if $$hit{coverage} < $settings{min_default_db_coverage}
+;
next if $$hit{percent_id} < $settings{min_default_db_identit
+y} * 100;
$best_hit_coverage = $$hit{coverage} if $best_hit_coverage <
+ $$hit{coverage} or not defined $best_hit_coverage;
$best_hit = $hit if $best_hit_coverage == $$hit{coverage};
}
if ($best_hit) {
my ($lo_in_seq, $hi_in_seq);
if ($$orf{strand} eq '+') {
$lo_in_seq = $$orf{start} + ($$best_hit{start1}-1)*3;
$hi_in_seq = $$orf{start} + ($$best_hit{end1}-1)*3;
} else {
$lo_in_seq = $$orf{start} - ($$best_hit{end1}-1)*3;
$hi_in_seq = $$orf{start} - ($$best_hit{start1}-1)*3;
}
# print "Hit:\n"; foreach my $k(sort keys %$best_hit) { print
+"\t$k\t$$best_hit{$k}\n"; }
# print "PRED: $lo_in_seq..$hi_in_seq ($$orf{strand})\n";
# TODO: scan up to upstream Met from hit location, but not i
+f truncated
my $pred = { seqname => $seqname,
lo => ($$orf{strand} eq '+' ? $lo_in_seq : $stop), # snap
+to stop
hi => ($$orf{strand} eq '+' ? $stop : $hi_in_seq), # snap
+to stop
strand => $$orf{strand},
blast_score => $$best_hit{bitscore},
type => 'CDS',
start => ($$orf{strand} eq '+' ? $lo_in_seq : $hi_in_seq),
stop => $stop,
predictor => 'BLAST', };
push(@{$blast_preds{$seqname}}, $pred);
# print "\@stop: $stop\n"; foreach my $k(sort keys %{$blast_pr
+eds{$seqname}->{$$orf{strand}}->{$stop}}) { print "\t$k\t$blast_preds
+{$seqname}->{$$orf{strand}}->{$stop}->{$k}\n"; }
die("bad orf $$orf{lo}..$$orf{hi}") if ($$orf{hi}-$$orf{lo}+
+1) % 3 != 0;
die("bad pred $$pred{lo} .. $$pred{hi} (orf $$orf{lo}..$$orf
+{hi}, in-hit coords $$best_hit{start1}..$$best_hit{end1})") if ($$pre
+d{hi} - $$pred{lo} + 1 ) % 3 != 0;
}
if(++$orfCount % $reportEvery==0){
my $percent_done=int($orfCount*AKUtils::getNumCPUs()/$report
+Every);
logmsg "$tid: Finished with $orfCount blasts ($percent_done%
+ done)";
}
}
}
#logmsg "DEBUGGING by returning everything from this seqname ($tid
+)";return \%blast_preds;
}
return \%blast_preds;
}
# BLAST all sequences in %$seqs against a supplied database. Return re
+sults in tabulated format.
# mode = blastp|blastn|blastx|psitblastn|tblastn|tblastx (required)
# settings required: blast_db
# settings optional: blastseqs_xopts, num_cpus, tempdir
sub blastSeqs($$$) {
my ($seqs, $mode, $settings) = @_;
die("Internal error: invalid mode") if $mode !~ /^(blastp|blastn|bla
+stx|psitblastn|tblastn|tblastx)$/;
die("No BLAST database name supplied") unless $$settings{blast_db};
$$settings{tempdir} ||= AKUtils::mktempdir($settings);
$$settings{num_cpus} ||= getNumCPUs();
my $blast_infile = "$$settings{tempdir}/blastseqs.in";
my $blast_outfile = "$$settings{tempdir}/blastseqs.out";
printSeqsToFile($seqs, $blast_infile);
my $blast_qs = "legacy_blast.pl blastall -p $mode -d $$settings{blas
+t_db} -m 8 -a $$settings{num_cpus} -i $blast_infile ";
$blast_qs .= "-o $blast_outfile ";
$blast_qs .= $$settings{blast_xopts};
logmsg "Running $blast_qs" unless $$settings{quiet};
#system($blast_qs);
my $blsOut=`$blast_qs`;
if ($?) {
my $er_str = "Error running \"$blast_qs\": $!";
$$settings{ignore_blast_errors} ? warn($er_str) : die($er_str);
}
#logmsg "DEBUG";$$settings{quiet}=0;return [];
# open(BLAST_OUT, "$blast_qs |") or die "Unable to run \"$blast_qs\":
+$!";
return loadBLAST8($blast_outfile);
}
sub loadBLAST8($) {
my ($blast_outfile) = @_;
my @hits;
open(BLAST_OUT, '<', $blast_outfile) or die "Unable to open $blast_o
+utfile for reading: $!";
while (<BLAST_OUT>) {
chomp;
my @line = split /\t/;
my %hit;
for (qw(name1 name2 percent_id al_len mismatch_bp gap_openings sta
+rt1 end1 start2 end2 Evalue bitscore)) {
$hit{$_} = shift @line;
}
push(@hits, \%hit);
}
close BLAST_OUT;
return \@hits;
}
|