fasta.aligned
Written by Lucas Sinclair. MIT Licensed. Contact at www.sinclair.bio
1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4""" 5Written by Lucas Sinclair. 6MIT Licensed. 7Contact at www.sinclair.bio 8""" 9 10# Built-in modules # 11import os, multiprocessing, platform 12from collections import OrderedDict 13 14# Internal modules # 15from fasta import FASTA 16from plumbing.cache import property_cached 17from autopaths.file_path import FilePath 18from autopaths.tmp_path import new_temp_path, new_temp_dir 19 20# Third party modules # 21import shutil 22if platform.system() == 'Windows': import pbs3 as sh 23else: import sh 24 25################################################################################ 26class AlignedFASTA(FASTA): 27 """Also a FASTA file technically, but contains an alignment.""" 28 ext = 'aln' 29 30 def __iter__(self): return iter(self.parse()) 31 32 def parse(self): 33 self.open() 34 from Bio import AlignIO 35 return AlignIO.read(self.handle, self.format) 36 37 def add_seq(self, seq): 38 self.buffer.append(seq) 39 40 def flush(self): 41 from Bio import AlignIO 42 from Bio.Align import MultipleSeqAlignment 43 AlignIO.write(MultipleSeqAlignment(self.buffer), self.handle, self.format) 44 45 def write(self, reads): 46 from Bio import AlignIO 47 if not self.directory.exists: self.directory.create() 48 self.open('w') 49 AlignIO.write(reads, self.handle, self.format) 50 self.close() 51 52 @property_cached 53 def sequences(self): 54 return OrderedDict(((seq.id, seq) for seq in self)) 55 56 def gblocks(self, 57 new_path = None, 58 seq_type = 'nucl' or 'prot'): 59 """ 60 Apply the gblocks filtering algorithm to the alignment. 61 See this page: 62 http://molevol.cmima.csic.es/castresana/Gblocks/Gblocks_documentation.html 63 Need to rename all sequences, because it will complain with long names. 64 """ 65 # Temporary path # 66 if new_path is None: final = self.__class__(new_temp_path()) 67 else: final = self.__class__(new_path) 68 # Mapping every sequence name with a random name # 69 orig_name_to_temp = {seq.description: 'name' + str(i) 70 for i,seq in enumerate(self)} 71 temp_name_to_orig = {v: k for k, v in orig_name_to_temp.items()} 72 # Rename every sequence with a random name # 73 temp_fasta = self.rename_sequences(orig_name_to_temp) 74 # Options # 75 if seq_type == 'nucl': t_option = "-t=d" 76 if seq_type == 'prot': t_option = "-t=p" 77 # Run it # 78 result = sh.gblocks91(temp_fasta.path, t_option, '-p=n', "-b4=3", 79 "-b3=20", "-b5=a", _ok_code=[0,1]) 80 created_file = temp_fasta.path + '-gb' 81 assert os.path.exists(created_file) 82 # Check errors # 83 if "Execution terminated" in result.stdout: 84 raise Exception("gblocks crashed again.") 85 # Back # 86 temp_fasta.rename_sequences(temp_name_to_orig, final) 87 # Return # 88 return final 89 90 def build_tree(self, *args, **kwargs): 91 """ 92 Dispatch a tree build call. Note that you need at least four 93 taxa to express some evolutionary history on an unrooted tree. 94 """ 95 # Check length # 96 assert len(self) > 3 97 # Default option # 98 algorithm = kwargs.pop(kwargs, None) 99 if algorithm is None: algorithm = 'raxml' 100 # Dispatch # 101 if algorithm == 'raxml': 102 return self.build_tree_raxml(*args, **kwargs) 103 if algorithm == 'fasttree': 104 return self.build_tree_fast(*args, **kwargs) 105 106 def build_tree_raxml(self, 107 new_path = None, 108 seq_type = 'nucl' or 'prot', 109 num_threads = None, 110 free_cores = 2, 111 keep_dir = False): 112 """Make a tree with RAxML.""" 113 # Check output # 114 if new_path is None: new_path = self.prefix_path + '.tree' 115 # What model to choose # 116 if seq_type == 'nucl': model = "GTRGAMMA" 117 if seq_type == 'prot': model = "PROTGAMMAJTTF" 118 # Threads # 119 if num_threads is None: 120 num_threads = multiprocessing.cpu_count() - free_cores 121 else: 122 num_threads = int(num_threads) - free_cores 123 num_threads = max(1, num_threads) 124 # Run it # 125 temp_dir = new_temp_dir() 126 sh.raxml811('-m', model, "-T", num_threads, '-p', 1, '-s', self.path, 127 '-n', 'tree', '-w', temp_dir, '-f', 'a', '-x', 1, '-N', 128 'autoMR') 129 # Move into place # 130 if keep_dir: 131 shutil.rmtree(new_path) 132 shutil.move(temp_dir, new_path) 133 if not keep_dir: 134 shutil.move(temp_dir + 'RAxML_bestTree.tree', new_path) 135 # Return # 136 return FilePath(new_path) 137 138 def build_tree_fast(self, new_path=None, seq_type='nucl' or 'prot'): 139 """Make a tree with FastTree. Names will be truncated however.""" 140 # Check output # 141 if new_path is None: new_path = self.prefix_path + '.tree' 142 # Command # 143 command_args = [] 144 if seq_type == 'nucl': command_args += ['-nt'] 145 command_args += ['-gamma'] 146 command_args += ['-out', new_path] 147 command_args += [self.path] 148 # Run it # 149 sh.FastTree(*command_args) 150 # Return # 151 return FilePath(new_path)
27class AlignedFASTA(FASTA): 28 """Also a FASTA file technically, but contains an alignment.""" 29 ext = 'aln' 30 31 def __iter__(self): return iter(self.parse()) 32 33 def parse(self): 34 self.open() 35 from Bio import AlignIO 36 return AlignIO.read(self.handle, self.format) 37 38 def add_seq(self, seq): 39 self.buffer.append(seq) 40 41 def flush(self): 42 from Bio import AlignIO 43 from Bio.Align import MultipleSeqAlignment 44 AlignIO.write(MultipleSeqAlignment(self.buffer), self.handle, self.format) 45 46 def write(self, reads): 47 from Bio import AlignIO 48 if not self.directory.exists: self.directory.create() 49 self.open('w') 50 AlignIO.write(reads, self.handle, self.format) 51 self.close() 52 53 @property_cached 54 def sequences(self): 55 return OrderedDict(((seq.id, seq) for seq in self)) 56 57 def gblocks(self, 58 new_path = None, 59 seq_type = 'nucl' or 'prot'): 60 """ 61 Apply the gblocks filtering algorithm to the alignment. 62 See this page: 63 http://molevol.cmima.csic.es/castresana/Gblocks/Gblocks_documentation.html 64 Need to rename all sequences, because it will complain with long names. 65 """ 66 # Temporary path # 67 if new_path is None: final = self.__class__(new_temp_path()) 68 else: final = self.__class__(new_path) 69 # Mapping every sequence name with a random name # 70 orig_name_to_temp = {seq.description: 'name' + str(i) 71 for i,seq in enumerate(self)} 72 temp_name_to_orig = {v: k for k, v in orig_name_to_temp.items()} 73 # Rename every sequence with a random name # 74 temp_fasta = self.rename_sequences(orig_name_to_temp) 75 # Options # 76 if seq_type == 'nucl': t_option = "-t=d" 77 if seq_type == 'prot': t_option = "-t=p" 78 # Run it # 79 result = sh.gblocks91(temp_fasta.path, t_option, '-p=n', "-b4=3", 80 "-b3=20", "-b5=a", _ok_code=[0,1]) 81 created_file = temp_fasta.path + '-gb' 82 assert os.path.exists(created_file) 83 # Check errors # 84 if "Execution terminated" in result.stdout: 85 raise Exception("gblocks crashed again.") 86 # Back # 87 temp_fasta.rename_sequences(temp_name_to_orig, final) 88 # Return # 89 return final 90 91 def build_tree(self, *args, **kwargs): 92 """ 93 Dispatch a tree build call. Note that you need at least four 94 taxa to express some evolutionary history on an unrooted tree. 95 """ 96 # Check length # 97 assert len(self) > 3 98 # Default option # 99 algorithm = kwargs.pop(kwargs, None) 100 if algorithm is None: algorithm = 'raxml' 101 # Dispatch # 102 if algorithm == 'raxml': 103 return self.build_tree_raxml(*args, **kwargs) 104 if algorithm == 'fasttree': 105 return self.build_tree_fast(*args, **kwargs) 106 107 def build_tree_raxml(self, 108 new_path = None, 109 seq_type = 'nucl' or 'prot', 110 num_threads = None, 111 free_cores = 2, 112 keep_dir = False): 113 """Make a tree with RAxML.""" 114 # Check output # 115 if new_path is None: new_path = self.prefix_path + '.tree' 116 # What model to choose # 117 if seq_type == 'nucl': model = "GTRGAMMA" 118 if seq_type == 'prot': model = "PROTGAMMAJTTF" 119 # Threads # 120 if num_threads is None: 121 num_threads = multiprocessing.cpu_count() - free_cores 122 else: 123 num_threads = int(num_threads) - free_cores 124 num_threads = max(1, num_threads) 125 # Run it # 126 temp_dir = new_temp_dir() 127 sh.raxml811('-m', model, "-T", num_threads, '-p', 1, '-s', self.path, 128 '-n', 'tree', '-w', temp_dir, '-f', 'a', '-x', 1, '-N', 129 'autoMR') 130 # Move into place # 131 if keep_dir: 132 shutil.rmtree(new_path) 133 shutil.move(temp_dir, new_path) 134 if not keep_dir: 135 shutil.move(temp_dir + 'RAxML_bestTree.tree', new_path) 136 # Return # 137 return FilePath(new_path) 138 139 def build_tree_fast(self, new_path=None, seq_type='nucl' or 'prot'): 140 """Make a tree with FastTree. Names will be truncated however.""" 141 # Check output # 142 if new_path is None: new_path = self.prefix_path + '.tree' 143 # Command # 144 command_args = [] 145 if seq_type == 'nucl': command_args += ['-nt'] 146 command_args += ['-gamma'] 147 command_args += ['-out', new_path] 148 command_args += [self.path] 149 # Run it # 150 sh.FastTree(*command_args) 151 # Return # 152 return FilePath(new_path)
Also a FASTA file technically, but contains an alignment.
def
flush(self):
41 def flush(self): 42 from Bio import AlignIO 43 from Bio.Align import MultipleSeqAlignment 44 AlignIO.write(MultipleSeqAlignment(self.buffer), self.handle, self.format)
Empty the buffer.
sequences
Another way of easily retrieving sequences. Also highly ineffective. Consider using the SQLite API instead.
def
gblocks(self, new_path=None, seq_type='nucl'):
57 def gblocks(self, 58 new_path = None, 59 seq_type = 'nucl' or 'prot'): 60 """ 61 Apply the gblocks filtering algorithm to the alignment. 62 See this page: 63 http://molevol.cmima.csic.es/castresana/Gblocks/Gblocks_documentation.html 64 Need to rename all sequences, because it will complain with long names. 65 """ 66 # Temporary path # 67 if new_path is None: final = self.__class__(new_temp_path()) 68 else: final = self.__class__(new_path) 69 # Mapping every sequence name with a random name # 70 orig_name_to_temp = {seq.description: 'name' + str(i) 71 for i,seq in enumerate(self)} 72 temp_name_to_orig = {v: k for k, v in orig_name_to_temp.items()} 73 # Rename every sequence with a random name # 74 temp_fasta = self.rename_sequences(orig_name_to_temp) 75 # Options # 76 if seq_type == 'nucl': t_option = "-t=d" 77 if seq_type == 'prot': t_option = "-t=p" 78 # Run it # 79 result = sh.gblocks91(temp_fasta.path, t_option, '-p=n', "-b4=3", 80 "-b3=20", "-b5=a", _ok_code=[0,1]) 81 created_file = temp_fasta.path + '-gb' 82 assert os.path.exists(created_file) 83 # Check errors # 84 if "Execution terminated" in result.stdout: 85 raise Exception("gblocks crashed again.") 86 # Back # 87 temp_fasta.rename_sequences(temp_name_to_orig, final) 88 # Return # 89 return final
Apply the gblocks filtering algorithm to the alignment. See this page: http://molevol.cmima.csic.es/castresana/Gblocks/Gblocks_documentation.html Need to rename all sequences, because it will complain with long names.
def
build_tree(self, *args, **kwargs):
91 def build_tree(self, *args, **kwargs): 92 """ 93 Dispatch a tree build call. Note that you need at least four 94 taxa to express some evolutionary history on an unrooted tree. 95 """ 96 # Check length # 97 assert len(self) > 3 98 # Default option # 99 algorithm = kwargs.pop(kwargs, None) 100 if algorithm is None: algorithm = 'raxml' 101 # Dispatch # 102 if algorithm == 'raxml': 103 return self.build_tree_raxml(*args, **kwargs) 104 if algorithm == 'fasttree': 105 return self.build_tree_fast(*args, **kwargs)
Dispatch a tree build call. Note that you need at least four taxa to express some evolutionary history on an unrooted tree.
def
build_tree_raxml( self, new_path=None, seq_type='nucl', num_threads=None, free_cores=2, keep_dir=False):
107 def build_tree_raxml(self, 108 new_path = None, 109 seq_type = 'nucl' or 'prot', 110 num_threads = None, 111 free_cores = 2, 112 keep_dir = False): 113 """Make a tree with RAxML.""" 114 # Check output # 115 if new_path is None: new_path = self.prefix_path + '.tree' 116 # What model to choose # 117 if seq_type == 'nucl': model = "GTRGAMMA" 118 if seq_type == 'prot': model = "PROTGAMMAJTTF" 119 # Threads # 120 if num_threads is None: 121 num_threads = multiprocessing.cpu_count() - free_cores 122 else: 123 num_threads = int(num_threads) - free_cores 124 num_threads = max(1, num_threads) 125 # Run it # 126 temp_dir = new_temp_dir() 127 sh.raxml811('-m', model, "-T", num_threads, '-p', 1, '-s', self.path, 128 '-n', 'tree', '-w', temp_dir, '-f', 'a', '-x', 1, '-N', 129 'autoMR') 130 # Move into place # 131 if keep_dir: 132 shutil.rmtree(new_path) 133 shutil.move(temp_dir, new_path) 134 if not keep_dir: 135 shutil.move(temp_dir + 'RAxML_bestTree.tree', new_path) 136 # Return # 137 return FilePath(new_path)
Make a tree with RAxML.
def
build_tree_fast(self, new_path=None, seq_type='nucl'):
139 def build_tree_fast(self, new_path=None, seq_type='nucl' or 'prot'): 140 """Make a tree with FastTree. Names will be truncated however.""" 141 # Check output # 142 if new_path is None: new_path = self.prefix_path + '.tree' 143 # Command # 144 command_args = [] 145 if seq_type == 'nucl': command_args += ['-nt'] 146 command_args += ['-gamma'] 147 command_args += ['-out', new_path] 148 command_args += [self.path] 149 # Run it # 150 sh.FastTree(*command_args) 151 # Return # 152 return FilePath(new_path)
Make a tree with FastTree. Names will be truncated however.
Inherited Members
- autopaths.base_path.BasePath
- BasePath
- clean_path
- path
- short_prefix
- extension
- escaped
- absolute_path
- physical_path
- relative_path
- rel_path_from
- exists
- is_symlink
- permissions
- mdate
- mdate_iso
- cdate
- cdate_iso
- unix_style
- wsl_style
- win_style
- with_tilda
- with_home
- link_from
- link_to
- symlinks_on_linux
- symlinks_on_windows
- hard_link_win_to
- fasta.core.FASTA
- format
- buffer_size
- gzipped
- first
- count
- lengths
- lengths_counter
- open
- close
- progress
- create
- add
- add_str
- add_fasta
- add_fastas
- compress
- compress_slow
- compress_fast
- ids
- get_id
- sql
- length_by_id
- subsample
- rename_with_num
- rename_with_prefix
- rename_sequences
- extract_length
- extract_sequences
- remove_trailing_stars
- remove_duplicates
- convert_U_to_T
- align
- template_align
- index_bowtie
- index_samtools
- graphs
- parse_primers
- autopaths.file_path.FilePath
- prefix_path
- prefix
- name
- filename
- directory
- count_bytes
- size
- contents
- contents_utf8
- md5
- might_be_binary
- contains_binary
- magic_number
- lines
- read
- touch
- writelines
- remove
- copy
- execute
- replace_extension
- new_name_insert
- make_directory
- must_exist
- head
- pretty_head
- tail
- pretty_tail
- move_to
- rename
- gzip_to
- gzip_internal
- gzip_external
- gzip_pigz
- ungzip_to
- ungzip_internal
- ungzip_external
- zip_to
- unzip_to
- untar_to
- untargz_to
- untargz_to_internal
- untargz_to_external
- append
- prepend
- remove_line
- remove_first_line
- replace_line
- replace_word
- sed_replace
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format_map
- maketrans