fasta.aligned

Written by Lucas Sinclair. MIT Licensed. Contact at www.sinclair.bio

  1#!/usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3
  4"""
  5Written by Lucas Sinclair.
  6MIT Licensed.
  7Contact at www.sinclair.bio
  8"""
  9
 10# Built-in modules #
 11import os, multiprocessing, platform
 12from collections import OrderedDict
 13
 14# Internal modules #
 15from fasta import FASTA
 16from plumbing.cache import property_cached
 17from autopaths.file_path import FilePath
 18from autopaths.tmp_path  import new_temp_path, new_temp_dir
 19
 20# Third party modules #
 21import shutil
 22if platform.system() == 'Windows': import pbs3 as sh
 23else: import sh
 24
 25################################################################################
 26class AlignedFASTA(FASTA):
 27    """Also a FASTA file technically, but contains an alignment."""
 28    ext = 'aln'
 29
 30    def __iter__(self): return iter(self.parse())
 31
 32    def parse(self):
 33        self.open()
 34        from Bio import AlignIO
 35        return AlignIO.read(self.handle, self.format)
 36
 37    def add_seq(self, seq):
 38        self.buffer.append(seq)
 39
 40    def flush(self):
 41        from Bio import AlignIO
 42        from Bio.Align import MultipleSeqAlignment
 43        AlignIO.write(MultipleSeqAlignment(self.buffer), self.handle, self.format)
 44
 45    def write(self, reads):
 46        from Bio import AlignIO
 47        if not self.directory.exists: self.directory.create()
 48        self.open('w')
 49        AlignIO.write(reads, self.handle, self.format)
 50        self.close()
 51
 52    @property_cached
 53    def sequences(self):
 54        return OrderedDict(((seq.id, seq) for seq in self))
 55
 56    def gblocks(self,
 57                new_path = None,
 58                seq_type = 'nucl' or 'prot'):
 59        """
 60        Apply the gblocks filtering algorithm to the alignment.
 61        See this page:
 62        http://molevol.cmima.csic.es/castresana/Gblocks/Gblocks_documentation.html
 63        Need to rename all sequences, because it will complain with long names.
 64        """
 65        # Temporary path #
 66        if new_path is None: final = self.__class__(new_temp_path())
 67        else:                final = self.__class__(new_path)
 68        # Mapping every sequence name with a random name #
 69        orig_name_to_temp = {seq.description: 'name' + str(i)
 70                             for i,seq in enumerate(self)}
 71        temp_name_to_orig = {v: k for k, v in orig_name_to_temp.items()}
 72        # Rename every sequence with a random name #
 73        temp_fasta = self.rename_sequences(orig_name_to_temp)
 74        # Options #
 75        if seq_type == 'nucl': t_option = "-t=d"
 76        if seq_type == 'prot': t_option = "-t=p"
 77        # Run it #
 78        result = sh.gblocks91(temp_fasta.path, t_option, '-p=n', "-b4=3",
 79                              "-b3=20", "-b5=a", _ok_code=[0,1])
 80        created_file = temp_fasta.path + '-gb'
 81        assert os.path.exists(created_file)
 82        # Check errors #
 83        if "Execution terminated" in result.stdout:
 84            raise Exception("gblocks crashed again.")
 85        # Back #
 86        temp_fasta.rename_sequences(temp_name_to_orig, final)
 87        # Return #
 88        return final
 89
 90    def build_tree(self, *args, **kwargs):
 91        """
 92        Dispatch a tree build call. Note that you need at least four
 93        taxa to express some evolutionary history on an unrooted tree.
 94        """
 95        # Check length #
 96        assert len(self) > 3
 97        # Default option #
 98        algorithm = kwargs.pop(kwargs, None)
 99        if algorithm is None: algorithm = 'raxml'
100        # Dispatch #
101        if algorithm == 'raxml':
102            return self.build_tree_raxml(*args, **kwargs)
103        if algorithm == 'fasttree':
104            return self.build_tree_fast(*args, **kwargs)
105
106    def build_tree_raxml(self,
107                         new_path    = None,
108                         seq_type    = 'nucl' or 'prot',
109                         num_threads = None,
110                         free_cores  = 2,
111                         keep_dir    = False):
112        """Make a tree with RAxML."""
113        # Check output #
114        if new_path is None: new_path = self.prefix_path + '.tree'
115        # What model to choose #
116        if seq_type == 'nucl': model = "GTRGAMMA"
117        if seq_type == 'prot': model = "PROTGAMMAJTTF"
118        # Threads #
119        if num_threads is None:
120            num_threads = multiprocessing.cpu_count() - free_cores
121        else:
122            num_threads = int(num_threads) - free_cores
123        num_threads = max(1, num_threads)
124        # Run it #
125        temp_dir = new_temp_dir()
126        sh.raxml811('-m', model, "-T", num_threads, '-p', 1, '-s', self.path,
127                    '-n', 'tree', '-w', temp_dir, '-f', 'a', '-x', 1, '-N',
128                    'autoMR')
129        # Move into place #
130        if keep_dir:
131            shutil.rmtree(new_path)
132            shutil.move(temp_dir, new_path)
133        if not keep_dir:
134            shutil.move(temp_dir + 'RAxML_bestTree.tree', new_path)
135        # Return #
136        return FilePath(new_path)
137
138    def build_tree_fast(self, new_path=None, seq_type='nucl' or 'prot'):
139        """Make a tree with FastTree. Names will be truncated however."""
140        # Check output #
141        if new_path is None: new_path = self.prefix_path + '.tree'
142        # Command #
143        command_args = []
144        if seq_type == 'nucl': command_args += ['-nt']
145        command_args += ['-gamma']
146        command_args += ['-out', new_path]
147        command_args += [self.path]
148        # Run it #
149        sh.FastTree(*command_args)
150        # Return #
151        return FilePath(new_path)
class AlignedFASTA(fasta.core.FASTA):
 27class AlignedFASTA(FASTA):
 28    """Also a FASTA file technically, but contains an alignment."""
 29    ext = 'aln'
 30
 31    def __iter__(self): return iter(self.parse())
 32
 33    def parse(self):
 34        self.open()
 35        from Bio import AlignIO
 36        return AlignIO.read(self.handle, self.format)
 37
 38    def add_seq(self, seq):
 39        self.buffer.append(seq)
 40
 41    def flush(self):
 42        from Bio import AlignIO
 43        from Bio.Align import MultipleSeqAlignment
 44        AlignIO.write(MultipleSeqAlignment(self.buffer), self.handle, self.format)
 45
 46    def write(self, reads):
 47        from Bio import AlignIO
 48        if not self.directory.exists: self.directory.create()
 49        self.open('w')
 50        AlignIO.write(reads, self.handle, self.format)
 51        self.close()
 52
 53    @property_cached
 54    def sequences(self):
 55        return OrderedDict(((seq.id, seq) for seq in self))
 56
 57    def gblocks(self,
 58                new_path = None,
 59                seq_type = 'nucl' or 'prot'):
 60        """
 61        Apply the gblocks filtering algorithm to the alignment.
 62        See this page:
 63        http://molevol.cmima.csic.es/castresana/Gblocks/Gblocks_documentation.html
 64        Need to rename all sequences, because it will complain with long names.
 65        """
 66        # Temporary path #
 67        if new_path is None: final = self.__class__(new_temp_path())
 68        else:                final = self.__class__(new_path)
 69        # Mapping every sequence name with a random name #
 70        orig_name_to_temp = {seq.description: 'name' + str(i)
 71                             for i,seq in enumerate(self)}
 72        temp_name_to_orig = {v: k for k, v in orig_name_to_temp.items()}
 73        # Rename every sequence with a random name #
 74        temp_fasta = self.rename_sequences(orig_name_to_temp)
 75        # Options #
 76        if seq_type == 'nucl': t_option = "-t=d"
 77        if seq_type == 'prot': t_option = "-t=p"
 78        # Run it #
 79        result = sh.gblocks91(temp_fasta.path, t_option, '-p=n', "-b4=3",
 80                              "-b3=20", "-b5=a", _ok_code=[0,1])
 81        created_file = temp_fasta.path + '-gb'
 82        assert os.path.exists(created_file)
 83        # Check errors #
 84        if "Execution terminated" in result.stdout:
 85            raise Exception("gblocks crashed again.")
 86        # Back #
 87        temp_fasta.rename_sequences(temp_name_to_orig, final)
 88        # Return #
 89        return final
 90
 91    def build_tree(self, *args, **kwargs):
 92        """
 93        Dispatch a tree build call. Note that you need at least four
 94        taxa to express some evolutionary history on an unrooted tree.
 95        """
 96        # Check length #
 97        assert len(self) > 3
 98        # Default option #
 99        algorithm = kwargs.pop(kwargs, None)
100        if algorithm is None: algorithm = 'raxml'
101        # Dispatch #
102        if algorithm == 'raxml':
103            return self.build_tree_raxml(*args, **kwargs)
104        if algorithm == 'fasttree':
105            return self.build_tree_fast(*args, **kwargs)
106
107    def build_tree_raxml(self,
108                         new_path    = None,
109                         seq_type    = 'nucl' or 'prot',
110                         num_threads = None,
111                         free_cores  = 2,
112                         keep_dir    = False):
113        """Make a tree with RAxML."""
114        # Check output #
115        if new_path is None: new_path = self.prefix_path + '.tree'
116        # What model to choose #
117        if seq_type == 'nucl': model = "GTRGAMMA"
118        if seq_type == 'prot': model = "PROTGAMMAJTTF"
119        # Threads #
120        if num_threads is None:
121            num_threads = multiprocessing.cpu_count() - free_cores
122        else:
123            num_threads = int(num_threads) - free_cores
124        num_threads = max(1, num_threads)
125        # Run it #
126        temp_dir = new_temp_dir()
127        sh.raxml811('-m', model, "-T", num_threads, '-p', 1, '-s', self.path,
128                    '-n', 'tree', '-w', temp_dir, '-f', 'a', '-x', 1, '-N',
129                    'autoMR')
130        # Move into place #
131        if keep_dir:
132            shutil.rmtree(new_path)
133            shutil.move(temp_dir, new_path)
134        if not keep_dir:
135            shutil.move(temp_dir + 'RAxML_bestTree.tree', new_path)
136        # Return #
137        return FilePath(new_path)
138
139    def build_tree_fast(self, new_path=None, seq_type='nucl' or 'prot'):
140        """Make a tree with FastTree. Names will be truncated however."""
141        # Check output #
142        if new_path is None: new_path = self.prefix_path + '.tree'
143        # Command #
144        command_args = []
145        if seq_type == 'nucl': command_args += ['-nt']
146        command_args += ['-gamma']
147        command_args += ['-out', new_path]
148        command_args += [self.path]
149        # Run it #
150        sh.FastTree(*command_args)
151        # Return #
152        return FilePath(new_path)

Also a FASTA file technically, but contains an alignment.

ext = 'aln'
def parse(self):
33    def parse(self):
34        self.open()
35        from Bio import AlignIO
36        return AlignIO.read(self.handle, self.format)
def add_seq(self, seq):
38    def add_seq(self, seq):
39        self.buffer.append(seq)

Use this method to add a SeqRecord object to this fasta.

def flush(self):
41    def flush(self):
42        from Bio import AlignIO
43        from Bio.Align import MultipleSeqAlignment
44        AlignIO.write(MultipleSeqAlignment(self.buffer), self.handle, self.format)

Empty the buffer.

def write(self, reads):
46    def write(self, reads):
47        from Bio import AlignIO
48        if not self.directory.exists: self.directory.create()
49        self.open('w')
50        AlignIO.write(reads, self.handle, self.format)
51        self.close()
sequences

Another way of easily retrieving sequences. Also highly ineffective. Consider using the SQLite API instead.

def gblocks(self, new_path=None, seq_type='nucl'):
57    def gblocks(self,
58                new_path = None,
59                seq_type = 'nucl' or 'prot'):
60        """
61        Apply the gblocks filtering algorithm to the alignment.
62        See this page:
63        http://molevol.cmima.csic.es/castresana/Gblocks/Gblocks_documentation.html
64        Need to rename all sequences, because it will complain with long names.
65        """
66        # Temporary path #
67        if new_path is None: final = self.__class__(new_temp_path())
68        else:                final = self.__class__(new_path)
69        # Mapping every sequence name with a random name #
70        orig_name_to_temp = {seq.description: 'name' + str(i)
71                             for i,seq in enumerate(self)}
72        temp_name_to_orig = {v: k for k, v in orig_name_to_temp.items()}
73        # Rename every sequence with a random name #
74        temp_fasta = self.rename_sequences(orig_name_to_temp)
75        # Options #
76        if seq_type == 'nucl': t_option = "-t=d"
77        if seq_type == 'prot': t_option = "-t=p"
78        # Run it #
79        result = sh.gblocks91(temp_fasta.path, t_option, '-p=n', "-b4=3",
80                              "-b3=20", "-b5=a", _ok_code=[0,1])
81        created_file = temp_fasta.path + '-gb'
82        assert os.path.exists(created_file)
83        # Check errors #
84        if "Execution terminated" in result.stdout:
85            raise Exception("gblocks crashed again.")
86        # Back #
87        temp_fasta.rename_sequences(temp_name_to_orig, final)
88        # Return #
89        return final

Apply the gblocks filtering algorithm to the alignment. See this page: http://molevol.cmima.csic.es/castresana/Gblocks/Gblocks_documentation.html Need to rename all sequences, because it will complain with long names.

def build_tree(self, *args, **kwargs):
 91    def build_tree(self, *args, **kwargs):
 92        """
 93        Dispatch a tree build call. Note that you need at least four
 94        taxa to express some evolutionary history on an unrooted tree.
 95        """
 96        # Check length #
 97        assert len(self) > 3
 98        # Default option #
 99        algorithm = kwargs.pop(kwargs, None)
100        if algorithm is None: algorithm = 'raxml'
101        # Dispatch #
102        if algorithm == 'raxml':
103            return self.build_tree_raxml(*args, **kwargs)
104        if algorithm == 'fasttree':
105            return self.build_tree_fast(*args, **kwargs)

Dispatch a tree build call. Note that you need at least four taxa to express some evolutionary history on an unrooted tree.

def build_tree_raxml( self, new_path=None, seq_type='nucl', num_threads=None, free_cores=2, keep_dir=False):
107    def build_tree_raxml(self,
108                         new_path    = None,
109                         seq_type    = 'nucl' or 'prot',
110                         num_threads = None,
111                         free_cores  = 2,
112                         keep_dir    = False):
113        """Make a tree with RAxML."""
114        # Check output #
115        if new_path is None: new_path = self.prefix_path + '.tree'
116        # What model to choose #
117        if seq_type == 'nucl': model = "GTRGAMMA"
118        if seq_type == 'prot': model = "PROTGAMMAJTTF"
119        # Threads #
120        if num_threads is None:
121            num_threads = multiprocessing.cpu_count() - free_cores
122        else:
123            num_threads = int(num_threads) - free_cores
124        num_threads = max(1, num_threads)
125        # Run it #
126        temp_dir = new_temp_dir()
127        sh.raxml811('-m', model, "-T", num_threads, '-p', 1, '-s', self.path,
128                    '-n', 'tree', '-w', temp_dir, '-f', 'a', '-x', 1, '-N',
129                    'autoMR')
130        # Move into place #
131        if keep_dir:
132            shutil.rmtree(new_path)
133            shutil.move(temp_dir, new_path)
134        if not keep_dir:
135            shutil.move(temp_dir + 'RAxML_bestTree.tree', new_path)
136        # Return #
137        return FilePath(new_path)

Make a tree with RAxML.

def build_tree_fast(self, new_path=None, seq_type='nucl'):
139    def build_tree_fast(self, new_path=None, seq_type='nucl' or 'prot'):
140        """Make a tree with FastTree. Names will be truncated however."""
141        # Check output #
142        if new_path is None: new_path = self.prefix_path + '.tree'
143        # Command #
144        command_args = []
145        if seq_type == 'nucl': command_args += ['-nt']
146        command_args += ['-gamma']
147        command_args += ['-out', new_path]
148        command_args += [self.path]
149        # Run it #
150        sh.FastTree(*command_args)
151        # Return #
152        return FilePath(new_path)

Make a tree with FastTree. Names will be truncated however.

Inherited Members
autopaths.base_path.BasePath
BasePath
clean_path
path
short_prefix
extension
escaped
absolute_path
physical_path
relative_path
rel_path_from
exists
permissions
mdate
mdate_iso
cdate
cdate_iso
unix_style
wsl_style
win_style
with_tilda
with_home
fasta.core.FASTA
format
buffer_size
gzipped
first
count
lengths
lengths_counter
open
close
progress
create
add
add_str
add_fasta
add_fastas
compress
compress_slow
compress_fast
ids
get_id
sql
length_by_id
subsample
rename_with_num
rename_with_prefix
rename_sequences
extract_length
extract_sequences
remove_trailing_stars
remove_duplicates
convert_U_to_T
align
template_align
index_bowtie
index_samtools
graphs
parse_primers
autopaths.file_path.FilePath
prefix_path
prefix
name
filename
directory
count_bytes
size
contents
contents_utf8
md5
might_be_binary
contains_binary
magic_number
lines
read
touch
writelines
remove
copy
execute
replace_extension
new_name_insert
make_directory
must_exist
head
pretty_head
tail
pretty_tail
move_to
rename
gzip_to
gzip_internal
gzip_external
gzip_pigz
ungzip_to
ungzip_internal
ungzip_external
zip_to
unzip_to
untar_to
untargz_to
untargz_to_internal
untargz_to_external
append
prepend
remove_line
remove_first_line
replace_line
replace_word
sed_replace
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format_map
maketrans