seqsearch.search.blast

Written by Lucas Sinclair. MIT Licensed. Contact at www.sinclair.bio

  1#!/usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3
  4"""
  5Written by Lucas Sinclair.
  6MIT Licensed.
  7Contact at www.sinclair.bio
  8"""
  9
 10# Built-in modules #
 11import os, shutil
 12
 13# First party modules #
 14from fasta import FASTA
 15from autopaths.tmp_path  import new_temp_path
 16
 17# Internal modules #
 18from seqsearch.search.core import CoreSearch
 19
 20# Module for launching shell commands #
 21from seqsearch import sh
 22
 23###############################################################################
 24class BLASTquery(CoreSearch):
 25    """
 26    A blast job. Possibly the standard BLAST algorithm or
 27    BLASTP or BLASTX etc. Typically you could use it like this:
 28
 29         import sys, os
 30         records_path = os.path.expanduser(sys.argv[1])
 31         centers_path = 'centers.fasta'
 32         db = seqsearch.BLASTdb(centers_path)
 33         db.makeblastdb()
 34         params = {'executable':    "~/share/blastplus/blastn",
 35                   '-outfmt':        0,
 36                   '-evalue':        1e-2,
 37                   '-perc_identity': 97,
 38                   '-num_threads':   16}
 39         search = seqsearch.BLASTquery(records_path, db, params)
 40         search.run()
 41
 42    You can also call search.non_block_run() to run many searches in parallel.
 43    """
 44
 45    extension = 'blastout'
 46
 47    def __init__(self, *args, **kwargs):
 48        # Parent constructor #
 49        super(BLASTquery, self).__init__(*args, **kwargs)
 50        # Auto detect XML output #
 51        if self.out_path.extension == 'xml': self.params['-outfmt'] = '5'
 52        # The database to search against #
 53        self.db = BLASTdb(self.db, self.seq_type)
 54
 55    @property
 56    def command(self):
 57        # Executable #
 58        if self.executable: cmd = [str(self.executable)]
 59        else:               cmd = [self.algorithm]
 60        # Other parameters
 61        cmd += ['-db',          self.db,
 62                '-query',       self.query,
 63                '-out',         self.out_path,
 64                '-num_threads', self.cpus]
 65        # Options #
 66        for k,v in self.params.items(): cmd += [k, v]
 67        # Return #
 68        return list(map(str, cmd))
 69
 70    #-------------------------------- RUNNING --------------------------------#
 71    def run(self, verbose=False):
 72        """Simply run the BLAST search locally."""
 73        # Check the executable is available #
 74        if self.executable:
 75            self.executable.must_exist()
 76        else:
 77            from plumbing.check_cmd_found import check_cmd
 78            check_cmd(self.command[0])
 79        # Create the output directory if it doesn't exist #
 80        self.out_path.directory.create_if_not_exists()
 81        # Optionally print the command #
 82        if verbose:
 83            print("Running BLAST command:\n    %s" % ' '.join(self.command))
 84        # Run it #
 85        cmd    = sh.Command(self.command[0])
 86        result = cmd(self.command[1:], _out=self._out, _err=self._err)
 87        # Clean up #
 88        if os.path.exists("error.log") and os.path.getsize("error.log") == 0:
 89            os.remove("error.log")
 90        # Return #
 91        return result
 92
 93    #------------------------------- FILTERING -------------------------------#
 94    def filter(self, filtering):
 95        """
 96        We can do some special filtering on the results.
 97        For the moment only minimum coverage and minimum identity.
 98        """
 99        # Conditions #
100        if 'min_coverage' in filtering and \
101           'qcovs' not in self.params['-outfmt']:
102            msg = "Can't filter on minimum coverage because it wasn't included."
103            raise Exception(msg)
104        if 'min_identity' in filtering and \
105           'pident' not in self.params['-outfmt']:
106            msg = "Can't filter on minimum identity because it wasn't included."
107            raise Exception(msg)
108        # Iterator #
109        def filter_lines(blastout):
110            cov_threshold = filtering.get('min_coverage', 0.0) * 100
111            idy_threshold = filtering.get('min_identity', 0.0) * 100
112            outfmt_str    = self.params['-outfmt'].strip('"').split()
113            cov_position  = outfmt_str.index('qcovs')  - 1
114            idy_position  = outfmt_str.index('pident') - 1
115            for line in blastout:
116                coverage = float(line.split()[cov_position])
117                identity = float(line.split()[idy_position])
118                if coverage < cov_threshold: continue
119                if identity < idy_threshold: continue
120                else:yield line
121        # Do it #
122        temp_path = new_temp_path()
123        with open(temp_path, 'w') as handle:
124            handle.writelines(filter_lines(self.out_path))
125        os.remove(self.out_path)
126        shutil.move(temp_path, self.out_path)
127
128    #----------------------------- PARSE RESULTS -----------------------------#
129    @property
130    def results(self):
131        """Parse the results and yield biopython SearchIO entries."""
132        # Import parsing library #
133        from Bio import SearchIO
134        import Bio.Blast.NCBIXML
135        # Avoid the warning #
136        import warnings
137        warnings.filterwarnings("ignore", 'BiopythonDeprecationWarning')
138        # Get the first number of the outfmt #
139        outfmt_str = self.params.get('-outfmt', '0').strip('"').split()
140        number = outfmt_str[0]
141        # Check for XML #
142        if number == '5':
143            with open(self.out_path, 'rb') as handle:
144                for entry in Bio.Blast.NCBIXML.parse(handle):
145                    yield entry
146        # Check for tabular #
147        elif number == '6':
148            with open(self.out_path, 'rt') as handle:
149                for entry in SearchIO.parse(handle, 'blast-tab'):
150                    yield entry
151        # Check for tabular with comments #
152        elif number == '7':
153            with open(self.out_path, 'rt') as handle:
154                for entry in SearchIO.parse(handle, 'blast-tab', comments=True):
155                    yield entry
156        # Default case #
157        else:
158            for line in self.out_path:
159                yield line.split()
160
161###############################################################################
162class BLASTdb(FASTA):
163    """A BLAST database one can search against."""
164
165    def __repr__(self):
166        return '<%s on "%s">' % (self.__class__.__name__, self.path)
167
168    def __init__(self, fasta_path, seq_type='nucl' or 'prot'):
169        # Check if the FASTA already has the seq_type set #
170        if hasattr(fasta_path, 'seq_type'): self.seq_type = fasta_path.seq_type
171        else:                               self.seq_type = seq_type
172        # Call parent constructor #
173        FASTA.__init__(self, fasta_path)
174
175    def __bool__(self):
176        """Does the indexed database actually exist?"""
177        return bool(self + '.nsq')
178
179    def create_if_not_exists(self, *args, **kwargs):
180        """If the indexed database has not been generated, generate it."""
181        if not self: return self.makedb(*args, **kwargs)
182
183    def makedb(self, logfile=None, stdout=None, verbose=False):
184        # Message #
185        if verbose: print("Calling `makeblastdb` on '%s'..." % self)
186        # Options #
187        options = ['-in', self.path, '-dbtype', self.seq_type]
188        # Add a log file #
189        if logfile is not None: options += ['-logfile', logfile]
190        # Call the program #
191        sh.makeblastdb(*options, _out=stdout)
class BLASTquery(seqsearch.search.core.CoreSearch):
 25class BLASTquery(CoreSearch):
 26    """
 27    A blast job. Possibly the standard BLAST algorithm or
 28    BLASTP or BLASTX etc. Typically you could use it like this:
 29
 30         import sys, os
 31         records_path = os.path.expanduser(sys.argv[1])
 32         centers_path = 'centers.fasta'
 33         db = seqsearch.BLASTdb(centers_path)
 34         db.makeblastdb()
 35         params = {'executable':    "~/share/blastplus/blastn",
 36                   '-outfmt':        0,
 37                   '-evalue':        1e-2,
 38                   '-perc_identity': 97,
 39                   '-num_threads':   16}
 40         search = seqsearch.BLASTquery(records_path, db, params)
 41         search.run()
 42
 43    You can also call search.non_block_run() to run many searches in parallel.
 44    """
 45
 46    extension = 'blastout'
 47
 48    def __init__(self, *args, **kwargs):
 49        # Parent constructor #
 50        super(BLASTquery, self).__init__(*args, **kwargs)
 51        # Auto detect XML output #
 52        if self.out_path.extension == 'xml': self.params['-outfmt'] = '5'
 53        # The database to search against #
 54        self.db = BLASTdb(self.db, self.seq_type)
 55
 56    @property
 57    def command(self):
 58        # Executable #
 59        if self.executable: cmd = [str(self.executable)]
 60        else:               cmd = [self.algorithm]
 61        # Other parameters
 62        cmd += ['-db',          self.db,
 63                '-query',       self.query,
 64                '-out',         self.out_path,
 65                '-num_threads', self.cpus]
 66        # Options #
 67        for k,v in self.params.items(): cmd += [k, v]
 68        # Return #
 69        return list(map(str, cmd))
 70
 71    #-------------------------------- RUNNING --------------------------------#
 72    def run(self, verbose=False):
 73        """Simply run the BLAST search locally."""
 74        # Check the executable is available #
 75        if self.executable:
 76            self.executable.must_exist()
 77        else:
 78            from plumbing.check_cmd_found import check_cmd
 79            check_cmd(self.command[0])
 80        # Create the output directory if it doesn't exist #
 81        self.out_path.directory.create_if_not_exists()
 82        # Optionally print the command #
 83        if verbose:
 84            print("Running BLAST command:\n    %s" % ' '.join(self.command))
 85        # Run it #
 86        cmd    = sh.Command(self.command[0])
 87        result = cmd(self.command[1:], _out=self._out, _err=self._err)
 88        # Clean up #
 89        if os.path.exists("error.log") and os.path.getsize("error.log") == 0:
 90            os.remove("error.log")
 91        # Return #
 92        return result
 93
 94    #------------------------------- FILTERING -------------------------------#
 95    def filter(self, filtering):
 96        """
 97        We can do some special filtering on the results.
 98        For the moment only minimum coverage and minimum identity.
 99        """
100        # Conditions #
101        if 'min_coverage' in filtering and \
102           'qcovs' not in self.params['-outfmt']:
103            msg = "Can't filter on minimum coverage because it wasn't included."
104            raise Exception(msg)
105        if 'min_identity' in filtering and \
106           'pident' not in self.params['-outfmt']:
107            msg = "Can't filter on minimum identity because it wasn't included."
108            raise Exception(msg)
109        # Iterator #
110        def filter_lines(blastout):
111            cov_threshold = filtering.get('min_coverage', 0.0) * 100
112            idy_threshold = filtering.get('min_identity', 0.0) * 100
113            outfmt_str    = self.params['-outfmt'].strip('"').split()
114            cov_position  = outfmt_str.index('qcovs')  - 1
115            idy_position  = outfmt_str.index('pident') - 1
116            for line in blastout:
117                coverage = float(line.split()[cov_position])
118                identity = float(line.split()[idy_position])
119                if coverage < cov_threshold: continue
120                if identity < idy_threshold: continue
121                else:yield line
122        # Do it #
123        temp_path = new_temp_path()
124        with open(temp_path, 'w') as handle:
125            handle.writelines(filter_lines(self.out_path))
126        os.remove(self.out_path)
127        shutil.move(temp_path, self.out_path)
128
129    #----------------------------- PARSE RESULTS -----------------------------#
130    @property
131    def results(self):
132        """Parse the results and yield biopython SearchIO entries."""
133        # Import parsing library #
134        from Bio import SearchIO
135        import Bio.Blast.NCBIXML
136        # Avoid the warning #
137        import warnings
138        warnings.filterwarnings("ignore", 'BiopythonDeprecationWarning')
139        # Get the first number of the outfmt #
140        outfmt_str = self.params.get('-outfmt', '0').strip('"').split()
141        number = outfmt_str[0]
142        # Check for XML #
143        if number == '5':
144            with open(self.out_path, 'rb') as handle:
145                for entry in Bio.Blast.NCBIXML.parse(handle):
146                    yield entry
147        # Check for tabular #
148        elif number == '6':
149            with open(self.out_path, 'rt') as handle:
150                for entry in SearchIO.parse(handle, 'blast-tab'):
151                    yield entry
152        # Check for tabular with comments #
153        elif number == '7':
154            with open(self.out_path, 'rt') as handle:
155                for entry in SearchIO.parse(handle, 'blast-tab', comments=True):
156                    yield entry
157        # Default case #
158        else:
159            for line in self.out_path:
160                yield line.split()

A blast job. Possibly the standard BLAST algorithm or BLASTP or BLASTX etc. Typically you could use it like this:

 import sys, os
 records_path = os.path.expanduser(sys.argv[1])
 centers_path = 'centers.fasta'
 db = seqsearch.BLASTdb(centers_path)
 db.makeblastdb()
 params = {'executable':    "~/share/blastplus/blastn",
           '-outfmt':        0,
           '-evalue':        1e-2,
           '-perc_identity': 97,
           '-num_threads':   16}
 search = seqsearch.BLASTquery(records_path, db, params)
 search.run()

You can also call search.non_block_run() to run many searches in parallel.

BLASTquery(*args, **kwargs)
48    def __init__(self, *args, **kwargs):
49        # Parent constructor #
50        super(BLASTquery, self).__init__(*args, **kwargs)
51        # Auto detect XML output #
52        if self.out_path.extension == 'xml': self.params['-outfmt'] = '5'
53        # The database to search against #
54        self.db = BLASTdb(self.db, self.seq_type)
def run(self, verbose=False):
72    def run(self, verbose=False):
73        """Simply run the BLAST search locally."""
74        # Check the executable is available #
75        if self.executable:
76            self.executable.must_exist()
77        else:
78            from plumbing.check_cmd_found import check_cmd
79            check_cmd(self.command[0])
80        # Create the output directory if it doesn't exist #
81        self.out_path.directory.create_if_not_exists()
82        # Optionally print the command #
83        if verbose:
84            print("Running BLAST command:\n    %s" % ' '.join(self.command))
85        # Run it #
86        cmd    = sh.Command(self.command[0])
87        result = cmd(self.command[1:], _out=self._out, _err=self._err)
88        # Clean up #
89        if os.path.exists("error.log") and os.path.getsize("error.log") == 0:
90            os.remove("error.log")
91        # Return #
92        return result

Simply run the BLAST search locally.

def filter(self, filtering):
 95    def filter(self, filtering):
 96        """
 97        We can do some special filtering on the results.
 98        For the moment only minimum coverage and minimum identity.
 99        """
100        # Conditions #
101        if 'min_coverage' in filtering and \
102           'qcovs' not in self.params['-outfmt']:
103            msg = "Can't filter on minimum coverage because it wasn't included."
104            raise Exception(msg)
105        if 'min_identity' in filtering and \
106           'pident' not in self.params['-outfmt']:
107            msg = "Can't filter on minimum identity because it wasn't included."
108            raise Exception(msg)
109        # Iterator #
110        def filter_lines(blastout):
111            cov_threshold = filtering.get('min_coverage', 0.0) * 100
112            idy_threshold = filtering.get('min_identity', 0.0) * 100
113            outfmt_str    = self.params['-outfmt'].strip('"').split()
114            cov_position  = outfmt_str.index('qcovs')  - 1
115            idy_position  = outfmt_str.index('pident') - 1
116            for line in blastout:
117                coverage = float(line.split()[cov_position])
118                identity = float(line.split()[idy_position])
119                if coverage < cov_threshold: continue
120                if identity < idy_threshold: continue
121                else:yield line
122        # Do it #
123        temp_path = new_temp_path()
124        with open(temp_path, 'w') as handle:
125            handle.writelines(filter_lines(self.out_path))
126        os.remove(self.out_path)
127        shutil.move(temp_path, self.out_path)

We can do some special filtering on the results. For the moment only minimum coverage and minimum identity.

results

Parse the results and yield biopython SearchIO entries.

class BLASTdb(fasta.core.FASTA):
163class BLASTdb(FASTA):
164    """A BLAST database one can search against."""
165
166    def __repr__(self):
167        return '<%s on "%s">' % (self.__class__.__name__, self.path)
168
169    def __init__(self, fasta_path, seq_type='nucl' or 'prot'):
170        # Check if the FASTA already has the seq_type set #
171        if hasattr(fasta_path, 'seq_type'): self.seq_type = fasta_path.seq_type
172        else:                               self.seq_type = seq_type
173        # Call parent constructor #
174        FASTA.__init__(self, fasta_path)
175
176    def __bool__(self):
177        """Does the indexed database actually exist?"""
178        return bool(self + '.nsq')
179
180    def create_if_not_exists(self, *args, **kwargs):
181        """If the indexed database has not been generated, generate it."""
182        if not self: return self.makedb(*args, **kwargs)
183
184    def makedb(self, logfile=None, stdout=None, verbose=False):
185        # Message #
186        if verbose: print("Calling `makeblastdb` on '%s'..." % self)
187        # Options #
188        options = ['-in', self.path, '-dbtype', self.seq_type]
189        # Add a log file #
190        if logfile is not None: options += ['-logfile', logfile]
191        # Call the program #
192        sh.makeblastdb(*options, _out=stdout)

A BLAST database one can search against.

BLASTdb(path, *args, **kwargs)
59    def __new__(cls, path, *args, **kwargs):
60        """A Path object is in fact a string."""
61        return str.__new__(cls, cls.clean_path(path))

A Path object is in fact a string.

def create_if_not_exists(self, *args, **kwargs):
180    def create_if_not_exists(self, *args, **kwargs):
181        """If the indexed database has not been generated, generate it."""
182        if not self: return self.makedb(*args, **kwargs)

If the indexed database has not been generated, generate it.

def makedb(self, logfile=None, stdout=None, verbose=False):
184    def makedb(self, logfile=None, stdout=None, verbose=False):
185        # Message #
186        if verbose: print("Calling `makeblastdb` on '%s'..." % self)
187        # Options #
188        options = ['-in', self.path, '-dbtype', self.seq_type]
189        # Add a log file #
190        if logfile is not None: options += ['-logfile', logfile]
191        # Call the program #
192        sh.makeblastdb(*options, _out=stdout)
Inherited Members
fasta.core.FASTA
first
count
lengths
lengths_counter
open
close
parse
progress
create
add
add_seq
add_str
add_fasta
add_fastas
flush
write
compress
compress_slow
compress_fast
ids
get_id
sequences
sql
length_by_id
subsample
rename_with_num
rename_with_prefix
rename_sequences
extract_length
extract_sequences
remove_trailing_stars
remove_duplicates
convert_U_to_T
align
template_align
index_bowtie
index_samtools
graphs
parse_primers
autopaths.file_path.FilePath
prefix_path
prefix
name
filename
directory
count_bytes
size
contents
contents_utf8
md5
might_be_binary
contains_binary
magic_number
lines
read
touch
writelines
remove
copy
execute
replace_extension
new_name_insert
make_directory
must_exist
head
tail
move_to
rename
gzip_to
gzip_internal
gzip_external
gzip_pigz
ungzip_to
ungzip_internal
ungzip_external
zip_to
unzip_to
untar_to
untargz_to
untargz_to_internal
untargz_to_external
append
prepend
remove_line
remove_first_line
replace_line
replace_word
sed_replace
autopaths.base_path.BasePath
clean_path
short_prefix
extension
escaped
absolute_path
physical_path
relative_path
rel_path_from
exists
permissions
mdate
mdate_iso
cdate
cdate_iso
unix_style
wsl_style
win_style
with_tilda
with_home
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format_map
maketrans