seqsearch.search.blast
Written by Lucas Sinclair. MIT Licensed. Contact at www.sinclair.bio
1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4""" 5Written by Lucas Sinclair. 6MIT Licensed. 7Contact at www.sinclair.bio 8""" 9 10# Built-in modules # 11import os, shutil 12 13# First party modules # 14from fasta import FASTA 15from autopaths.tmp_path import new_temp_path 16 17# Internal modules # 18from seqsearch.search.core import CoreSearch 19 20# Module for launching shell commands # 21from seqsearch import sh 22 23############################################################################### 24class BLASTquery(CoreSearch): 25 """ 26 A blast job. Possibly the standard BLAST algorithm or 27 BLASTP or BLASTX etc. Typically you could use it like this: 28 29 import sys, os 30 records_path = os.path.expanduser(sys.argv[1]) 31 centers_path = 'centers.fasta' 32 db = seqsearch.BLASTdb(centers_path) 33 db.makeblastdb() 34 params = {'executable': "~/share/blastplus/blastn", 35 '-outfmt': 0, 36 '-evalue': 1e-2, 37 '-perc_identity': 97, 38 '-num_threads': 16} 39 search = seqsearch.BLASTquery(records_path, db, params) 40 search.run() 41 42 You can also call search.non_block_run() to run many searches in parallel. 43 """ 44 45 extension = 'blastout' 46 47 def __init__(self, *args, **kwargs): 48 # Parent constructor # 49 super(BLASTquery, self).__init__(*args, **kwargs) 50 # Auto detect XML output # 51 if self.out_path.extension == 'xml': self.params['-outfmt'] = '5' 52 # The database to search against # 53 self.db = BLASTdb(self.db, self.seq_type) 54 55 @property 56 def command(self): 57 # Executable # 58 if self.executable: cmd = [str(self.executable)] 59 else: cmd = [self.algorithm] 60 # Other parameters 61 cmd += ['-db', self.db, 62 '-query', self.query, 63 '-out', self.out_path, 64 '-num_threads', self.cpus] 65 # Options # 66 for k,v in self.params.items(): cmd += [k, v] 67 # Return # 68 return list(map(str, cmd)) 69 70 #-------------------------------- RUNNING --------------------------------# 71 def run(self, verbose=False): 72 """Simply run the BLAST search locally.""" 73 # Check the executable is available # 74 if self.executable: 75 self.executable.must_exist() 76 else: 77 from plumbing.check_cmd_found import check_cmd 78 check_cmd(self.command[0]) 79 # Create the output directory if it doesn't exist # 80 self.out_path.directory.create_if_not_exists() 81 # Optionally print the command # 82 if verbose: 83 print("Running BLAST command:\n %s" % ' '.join(self.command)) 84 # Run it # 85 cmd = sh.Command(self.command[0]) 86 result = cmd(self.command[1:], _out=self._out, _err=self._err) 87 # Clean up # 88 if os.path.exists("error.log") and os.path.getsize("error.log") == 0: 89 os.remove("error.log") 90 # Return # 91 return result 92 93 #------------------------------- FILTERING -------------------------------# 94 def filter(self, filtering): 95 """ 96 We can do some special filtering on the results. 97 For the moment only minimum coverage and minimum identity. 98 """ 99 # Conditions # 100 if 'min_coverage' in filtering and \ 101 'qcovs' not in self.params['-outfmt']: 102 msg = "Can't filter on minimum coverage because it wasn't included." 103 raise Exception(msg) 104 if 'min_identity' in filtering and \ 105 'pident' not in self.params['-outfmt']: 106 msg = "Can't filter on minimum identity because it wasn't included." 107 raise Exception(msg) 108 # Iterator # 109 def filter_lines(blastout): 110 cov_threshold = filtering.get('min_coverage', 0.0) * 100 111 idy_threshold = filtering.get('min_identity', 0.0) * 100 112 outfmt_str = self.params['-outfmt'].strip('"').split() 113 cov_position = outfmt_str.index('qcovs') - 1 114 idy_position = outfmt_str.index('pident') - 1 115 for line in blastout: 116 coverage = float(line.split()[cov_position]) 117 identity = float(line.split()[idy_position]) 118 if coverage < cov_threshold: continue 119 if identity < idy_threshold: continue 120 else:yield line 121 # Do it # 122 temp_path = new_temp_path() 123 with open(temp_path, 'w') as handle: 124 handle.writelines(filter_lines(self.out_path)) 125 os.remove(self.out_path) 126 shutil.move(temp_path, self.out_path) 127 128 #----------------------------- PARSE RESULTS -----------------------------# 129 @property 130 def results(self): 131 """Parse the results and yield biopython SearchIO entries.""" 132 # Import parsing library # 133 from Bio import SearchIO 134 import Bio.Blast.NCBIXML 135 # Avoid the warning # 136 import warnings 137 warnings.filterwarnings("ignore", 'BiopythonDeprecationWarning') 138 # Get the first number of the outfmt # 139 outfmt_str = self.params.get('-outfmt', '0').strip('"').split() 140 number = outfmt_str[0] 141 # Check for XML # 142 if number == '5': 143 with open(self.out_path, 'rb') as handle: 144 for entry in Bio.Blast.NCBIXML.parse(handle): 145 yield entry 146 # Check for tabular # 147 elif number == '6': 148 with open(self.out_path, 'rt') as handle: 149 for entry in SearchIO.parse(handle, 'blast-tab'): 150 yield entry 151 # Check for tabular with comments # 152 elif number == '7': 153 with open(self.out_path, 'rt') as handle: 154 for entry in SearchIO.parse(handle, 'blast-tab', comments=True): 155 yield entry 156 # Default case # 157 else: 158 for line in self.out_path: 159 yield line.split() 160 161############################################################################### 162class BLASTdb(FASTA): 163 """A BLAST database one can search against.""" 164 165 def __repr__(self): 166 return '<%s on "%s">' % (self.__class__.__name__, self.path) 167 168 def __init__(self, fasta_path, seq_type='nucl' or 'prot'): 169 # Check if the FASTA already has the seq_type set # 170 if hasattr(fasta_path, 'seq_type'): self.seq_type = fasta_path.seq_type 171 else: self.seq_type = seq_type 172 # Call parent constructor # 173 FASTA.__init__(self, fasta_path) 174 175 def __bool__(self): 176 """Does the indexed database actually exist?""" 177 return bool(self + '.nsq') 178 179 def create_if_not_exists(self, *args, **kwargs): 180 """If the indexed database has not been generated, generate it.""" 181 if not self: return self.makedb(*args, **kwargs) 182 183 def makedb(self, logfile=None, stdout=None, verbose=False): 184 # Message # 185 if verbose: print("Calling `makeblastdb` on '%s'..." % self) 186 # Options # 187 options = ['-in', self.path, '-dbtype', self.seq_type] 188 # Add a log file # 189 if logfile is not None: options += ['-logfile', logfile] 190 # Call the program # 191 sh.makeblastdb(*options, _out=stdout)
25class BLASTquery(CoreSearch): 26 """ 27 A blast job. Possibly the standard BLAST algorithm or 28 BLASTP or BLASTX etc. Typically you could use it like this: 29 30 import sys, os 31 records_path = os.path.expanduser(sys.argv[1]) 32 centers_path = 'centers.fasta' 33 db = seqsearch.BLASTdb(centers_path) 34 db.makeblastdb() 35 params = {'executable': "~/share/blastplus/blastn", 36 '-outfmt': 0, 37 '-evalue': 1e-2, 38 '-perc_identity': 97, 39 '-num_threads': 16} 40 search = seqsearch.BLASTquery(records_path, db, params) 41 search.run() 42 43 You can also call search.non_block_run() to run many searches in parallel. 44 """ 45 46 extension = 'blastout' 47 48 def __init__(self, *args, **kwargs): 49 # Parent constructor # 50 super(BLASTquery, self).__init__(*args, **kwargs) 51 # Auto detect XML output # 52 if self.out_path.extension == 'xml': self.params['-outfmt'] = '5' 53 # The database to search against # 54 self.db = BLASTdb(self.db, self.seq_type) 55 56 @property 57 def command(self): 58 # Executable # 59 if self.executable: cmd = [str(self.executable)] 60 else: cmd = [self.algorithm] 61 # Other parameters 62 cmd += ['-db', self.db, 63 '-query', self.query, 64 '-out', self.out_path, 65 '-num_threads', self.cpus] 66 # Options # 67 for k,v in self.params.items(): cmd += [k, v] 68 # Return # 69 return list(map(str, cmd)) 70 71 #-------------------------------- RUNNING --------------------------------# 72 def run(self, verbose=False): 73 """Simply run the BLAST search locally.""" 74 # Check the executable is available # 75 if self.executable: 76 self.executable.must_exist() 77 else: 78 from plumbing.check_cmd_found import check_cmd 79 check_cmd(self.command[0]) 80 # Create the output directory if it doesn't exist # 81 self.out_path.directory.create_if_not_exists() 82 # Optionally print the command # 83 if verbose: 84 print("Running BLAST command:\n %s" % ' '.join(self.command)) 85 # Run it # 86 cmd = sh.Command(self.command[0]) 87 result = cmd(self.command[1:], _out=self._out, _err=self._err) 88 # Clean up # 89 if os.path.exists("error.log") and os.path.getsize("error.log") == 0: 90 os.remove("error.log") 91 # Return # 92 return result 93 94 #------------------------------- FILTERING -------------------------------# 95 def filter(self, filtering): 96 """ 97 We can do some special filtering on the results. 98 For the moment only minimum coverage and minimum identity. 99 """ 100 # Conditions # 101 if 'min_coverage' in filtering and \ 102 'qcovs' not in self.params['-outfmt']: 103 msg = "Can't filter on minimum coverage because it wasn't included." 104 raise Exception(msg) 105 if 'min_identity' in filtering and \ 106 'pident' not in self.params['-outfmt']: 107 msg = "Can't filter on minimum identity because it wasn't included." 108 raise Exception(msg) 109 # Iterator # 110 def filter_lines(blastout): 111 cov_threshold = filtering.get('min_coverage', 0.0) * 100 112 idy_threshold = filtering.get('min_identity', 0.0) * 100 113 outfmt_str = self.params['-outfmt'].strip('"').split() 114 cov_position = outfmt_str.index('qcovs') - 1 115 idy_position = outfmt_str.index('pident') - 1 116 for line in blastout: 117 coverage = float(line.split()[cov_position]) 118 identity = float(line.split()[idy_position]) 119 if coverage < cov_threshold: continue 120 if identity < idy_threshold: continue 121 else:yield line 122 # Do it # 123 temp_path = new_temp_path() 124 with open(temp_path, 'w') as handle: 125 handle.writelines(filter_lines(self.out_path)) 126 os.remove(self.out_path) 127 shutil.move(temp_path, self.out_path) 128 129 #----------------------------- PARSE RESULTS -----------------------------# 130 @property 131 def results(self): 132 """Parse the results and yield biopython SearchIO entries.""" 133 # Import parsing library # 134 from Bio import SearchIO 135 import Bio.Blast.NCBIXML 136 # Avoid the warning # 137 import warnings 138 warnings.filterwarnings("ignore", 'BiopythonDeprecationWarning') 139 # Get the first number of the outfmt # 140 outfmt_str = self.params.get('-outfmt', '0').strip('"').split() 141 number = outfmt_str[0] 142 # Check for XML # 143 if number == '5': 144 with open(self.out_path, 'rb') as handle: 145 for entry in Bio.Blast.NCBIXML.parse(handle): 146 yield entry 147 # Check for tabular # 148 elif number == '6': 149 with open(self.out_path, 'rt') as handle: 150 for entry in SearchIO.parse(handle, 'blast-tab'): 151 yield entry 152 # Check for tabular with comments # 153 elif number == '7': 154 with open(self.out_path, 'rt') as handle: 155 for entry in SearchIO.parse(handle, 'blast-tab', comments=True): 156 yield entry 157 # Default case # 158 else: 159 for line in self.out_path: 160 yield line.split()
A blast job. Possibly the standard BLAST algorithm or BLASTP or BLASTX etc. Typically you could use it like this:
import sys, os
records_path = os.path.expanduser(sys.argv[1])
centers_path = 'centers.fasta'
db = seqsearch.BLASTdb(centers_path)
db.makeblastdb()
params = {'executable': "~/share/blastplus/blastn",
'-outfmt': 0,
'-evalue': 1e-2,
'-perc_identity': 97,
'-num_threads': 16}
search = seqsearch.BLASTquery(records_path, db, params)
search.run()
You can also call search.non_block_run() to run many searches in parallel.
BLASTquery(*args, **kwargs)
48 def __init__(self, *args, **kwargs): 49 # Parent constructor # 50 super(BLASTquery, self).__init__(*args, **kwargs) 51 # Auto detect XML output # 52 if self.out_path.extension == 'xml': self.params['-outfmt'] = '5' 53 # The database to search against # 54 self.db = BLASTdb(self.db, self.seq_type)
def
run(self, verbose=False):
72 def run(self, verbose=False): 73 """Simply run the BLAST search locally.""" 74 # Check the executable is available # 75 if self.executable: 76 self.executable.must_exist() 77 else: 78 from plumbing.check_cmd_found import check_cmd 79 check_cmd(self.command[0]) 80 # Create the output directory if it doesn't exist # 81 self.out_path.directory.create_if_not_exists() 82 # Optionally print the command # 83 if verbose: 84 print("Running BLAST command:\n %s" % ' '.join(self.command)) 85 # Run it # 86 cmd = sh.Command(self.command[0]) 87 result = cmd(self.command[1:], _out=self._out, _err=self._err) 88 # Clean up # 89 if os.path.exists("error.log") and os.path.getsize("error.log") == 0: 90 os.remove("error.log") 91 # Return # 92 return result
Simply run the BLAST search locally.
def
filter(self, filtering):
95 def filter(self, filtering): 96 """ 97 We can do some special filtering on the results. 98 For the moment only minimum coverage and minimum identity. 99 """ 100 # Conditions # 101 if 'min_coverage' in filtering and \ 102 'qcovs' not in self.params['-outfmt']: 103 msg = "Can't filter on minimum coverage because it wasn't included." 104 raise Exception(msg) 105 if 'min_identity' in filtering and \ 106 'pident' not in self.params['-outfmt']: 107 msg = "Can't filter on minimum identity because it wasn't included." 108 raise Exception(msg) 109 # Iterator # 110 def filter_lines(blastout): 111 cov_threshold = filtering.get('min_coverage', 0.0) * 100 112 idy_threshold = filtering.get('min_identity', 0.0) * 100 113 outfmt_str = self.params['-outfmt'].strip('"').split() 114 cov_position = outfmt_str.index('qcovs') - 1 115 idy_position = outfmt_str.index('pident') - 1 116 for line in blastout: 117 coverage = float(line.split()[cov_position]) 118 identity = float(line.split()[idy_position]) 119 if coverage < cov_threshold: continue 120 if identity < idy_threshold: continue 121 else:yield line 122 # Do it # 123 temp_path = new_temp_path() 124 with open(temp_path, 'w') as handle: 125 handle.writelines(filter_lines(self.out_path)) 126 os.remove(self.out_path) 127 shutil.move(temp_path, self.out_path)
We can do some special filtering on the results. For the moment only minimum coverage and minimum identity.
Inherited Members
class
BLASTdb(fasta.core.FASTA):
163class BLASTdb(FASTA): 164 """A BLAST database one can search against.""" 165 166 def __repr__(self): 167 return '<%s on "%s">' % (self.__class__.__name__, self.path) 168 169 def __init__(self, fasta_path, seq_type='nucl' or 'prot'): 170 # Check if the FASTA already has the seq_type set # 171 if hasattr(fasta_path, 'seq_type'): self.seq_type = fasta_path.seq_type 172 else: self.seq_type = seq_type 173 # Call parent constructor # 174 FASTA.__init__(self, fasta_path) 175 176 def __bool__(self): 177 """Does the indexed database actually exist?""" 178 return bool(self + '.nsq') 179 180 def create_if_not_exists(self, *args, **kwargs): 181 """If the indexed database has not been generated, generate it.""" 182 if not self: return self.makedb(*args, **kwargs) 183 184 def makedb(self, logfile=None, stdout=None, verbose=False): 185 # Message # 186 if verbose: print("Calling `makeblastdb` on '%s'..." % self) 187 # Options # 188 options = ['-in', self.path, '-dbtype', self.seq_type] 189 # Add a log file # 190 if logfile is not None: options += ['-logfile', logfile] 191 # Call the program # 192 sh.makeblastdb(*options, _out=stdout)
A BLAST database one can search against.
BLASTdb(path, *args, **kwargs)
59 def __new__(cls, path, *args, **kwargs): 60 """A Path object is in fact a string.""" 61 return str.__new__(cls, cls.clean_path(path))
A Path object is in fact a string.
def
create_if_not_exists(self, *args, **kwargs):
180 def create_if_not_exists(self, *args, **kwargs): 181 """If the indexed database has not been generated, generate it.""" 182 if not self: return self.makedb(*args, **kwargs)
If the indexed database has not been generated, generate it.
def
makedb(self, logfile=None, stdout=None, verbose=False):
184 def makedb(self, logfile=None, stdout=None, verbose=False): 185 # Message # 186 if verbose: print("Calling `makeblastdb` on '%s'..." % self) 187 # Options # 188 options = ['-in', self.path, '-dbtype', self.seq_type] 189 # Add a log file # 190 if logfile is not None: options += ['-logfile', logfile] 191 # Call the program # 192 sh.makeblastdb(*options, _out=stdout)
Inherited Members
- fasta.core.FASTA
- first
- count
- lengths
- lengths_counter
- open
- close
- parse
- progress
- create
- add
- add_seq
- add_str
- add_fasta
- add_fastas
- flush
- write
- compress
- compress_slow
- compress_fast
- ids
- get_id
- sequences
- sql
- length_by_id
- subsample
- rename_with_num
- rename_with_prefix
- rename_sequences
- extract_length
- extract_sequences
- remove_trailing_stars
- remove_duplicates
- convert_U_to_T
- align
- template_align
- index_bowtie
- index_samtools
- graphs
- parse_primers
- autopaths.file_path.FilePath
- prefix_path
- prefix
- name
- filename
- directory
- count_bytes
- size
- contents
- contents_utf8
- md5
- might_be_binary
- contains_binary
- magic_number
- lines
- read
- touch
- writelines
- remove
- copy
- execute
- replace_extension
- new_name_insert
- make_directory
- must_exist
- head
- tail
- move_to
- rename
- gzip_to
- gzip_internal
- gzip_external
- gzip_pigz
- ungzip_to
- ungzip_internal
- ungzip_external
- zip_to
- unzip_to
- untar_to
- untargz_to
- untargz_to_internal
- untargz_to_external
- append
- prepend
- remove_line
- remove_first_line
- replace_line
- replace_word
- sed_replace
- autopaths.base_path.BasePath
- clean_path
- short_prefix
- extension
- escaped
- absolute_path
- physical_path
- relative_path
- rel_path_from
- exists
- is_symlink
- permissions
- mdate
- mdate_iso
- cdate
- cdate_iso
- unix_style
- wsl_style
- win_style
- with_tilda
- with_home
- link_from
- link_to
- symlinks_on_linux
- symlinks_on_windows
- hard_link_win_to
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format_map
- maketrans