Source code for seqenv.seqsearch.blast
# Futures #
from __future__ import division
# Built-in modules #
import os, multiprocessing, threading, shutil
# Internal modules #
from seqenv.fasta import FASTA
from seqenv.common import new_temp_path
from seqenv.common.autopaths import FilePath
# Third party modules #
import sh
###############################################################################
[docs]class BLASTquery(object):
"""A blast job. Possibly the standard BLAST algorithm or
BLASTP or BLASTX etc. Typically you could use it like this:
import sys, os
records_path = os.path.expanduser(sys.argv[1])
centers_path = 'centers.fasta'
db = parallelblast.BLASTdb(centers_path)
db.makeblastdb()
params = {'executable': "~/share/blastplus/blastn",
'-outfmt': 0,
'-evalue': 1e-2,
'-perc_identity': 97,
'-num_threads': 16}
search = parallelblast.BLASTquery(records_path, db, params)
search.run()
You can also call search.non_block_run() to run maybe searches in parallel.
"""
def __init__(self, query_path, db_path,
params = None,
algorithm = "blastn",
version = "plus" or "legacy",
out_path = None,
executable = None):
# Save attributes #
self.path = query_path
self.query = FASTA(query_path)
self.db = FilePath(db_path)
self.version = version
self.algorithm = algorithm
self.params = params if params else {}
self.executable = FilePath(executable)
# Output #
if out_path is None: self.out_path = self.query.prefix_path + '.blastout'
elif out_path.endswith('/'): self.out_path = out_path + self.query.prefix + '.blastout'
else: self.out_path = out_path
self.out_path = FilePath(self.out_path)
# Defaults #
self.cpus = multiprocessing.cpu_count()
if self.version == 'plus':
if '-num_threads' not in self.params: self.params['-num_threads'] = self.cpus
if self.version == 'legacy':
if '-a' not in self.params: self.params['-a'] = self.cpus
@property
def command(self):
# Executable #
if self.executable: cmd = [self.executable.path]
elif self.version == 'legacy': cmd = ["blastall", '-p', self.algorithm]
else: cmd = [self.algorithm]
# Essentials #
if self.version == 'legacy': cmd += ['-d', self.db, '-i', self.query, '-o', self.out_path]
if self.version == 'plus': cmd += ['-db', self.db, '-query', self.query, '-out', self.out_path]
# Options #
for k,v in self.params.items(): cmd += [k, v]
# Return #
return map(str, cmd)
[docs] def run(self):
sh.Command(self.command[0])(self.command[1:])
if os.path.exists("error.log") and os.path.getsize("error.log") == 0: os.remove("error.log")
[docs] def non_block_run(self):
"""Special method to run the query in a thread without blocking."""
self.thread = threading.Thread(target=self.run)
self.thread.start()
[docs] def wait(self):
"""If you have run the query in a non-blocking way, call this method to pause
until the query is finished."""
self.thread.join()
[docs] def filter(self, filtering):
"""We can do some special filtering on the results. For the moment only minimum coverage."""
# Conditions #
if 'min_coverage' not in filtering: return
if 'qcovs' not in self.params['-outfmt']: raise Exception()
# Iterator #
def filter_lines(blastout):
threshold = filtering['min_coverage'] * 100
position = self.params['-outfmt'].strip('"').split().index('qcovs') - 1
for line in blastout:
coverage = line.split()[position]
if coverage < threshold: continue
else: yield line
# Do it #
temp_path = new_temp_path()
with open(temp_path, 'w') as handle: handle.writelines(filter_lines(self.out_path))
os.remove(self.out_path)
shutil.move(temp_path, self.out_path)
@property
def results(self):
"""Parse the results."""
for line in self.out_path: yield line.split()