fasta.core
Written by Lucas Sinclair. MIT Licensed. Contact at www.sinclair.bio
1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3 4""" 5Written by Lucas Sinclair. 6MIT Licensed. 7Contact at www.sinclair.bio 8""" 9 10# Built-in modules # 11import os, sys, io, gzip, shutil, itertools, platform 12from collections import Counter, OrderedDict 13from six import string_types 14 15# Internal modules # 16from fasta import graphs 17 18# First party modules # 19from plumbing.common import isubsample 20from plumbing.color import Color 21from plumbing.cache import property_cached 22from autopaths.file_path import FilePath 23from autopaths.tmp_path import new_temp_path 24 25# Third party modules # 26from tqdm import tqdm 27if platform.system() == 'Windows': import pbs3 as sh 28else: import sh 29 30# Constants # 31class Dummy: pass 32 33############################################################################### 34class FASTA(FilePath): 35 """ 36 A single FASTA file somewhere in the filesystem. You can read from it in 37 several convenient ways. You can write to it in a automatically buffered 38 way. There are several other things you can do with a FASTA file. 39 """ 40 41 format = 'fasta' 42 ext = 'fasta' 43 buffer_size = 1000 44 45 def __len__(self): return self.count 46 47 def __repr__(self): 48 return '<%s object on "%s">' % (self.__class__.__name__, self.path) 49 50 def __contains__(self, other): return other in self.ids 51 52 def __enter__(self): return self.create() 53 54 def __exit__(self, exc_type, exc_value, traceback): self.close() 55 56 def __iter__(self): 57 for seq in self.parse(): yield seq 58 self.close() 59 60 def __getitem__(self, key): 61 if isinstance(key, string_types): return self.sequences[key] 62 elif isinstance(key, int): return self.sequences.items()[key] 63 elif isinstance(key, slice): 64 return itertools.islice(self, key.start, key.stop, key.step) 65 66 #----------------------------- Properties --------------------------------# 67 @property 68 def gzipped(self): return True if self.path.endswith('gz') else False 69 70 @property 71 def first(self): 72 """Just the first sequence.""" 73 from Bio import SeqIO 74 self.open() 75 seq = next(SeqIO.parse(self.handle, self.format)) 76 self.close() 77 return seq 78 79 @property_cached 80 def count(self): 81 """ 82 Should probably check for file size changes instead of just 83 caching once TODO. 84 """ 85 # For debugging purposes # 86 if False: print("-> counting reads in `%s`" % self.path) 87 # If we are gzipped we can just use zgrep # 88 if self.gzipped: 89 return int(sh.zgrep('-c', "^>", self.path, _ok_code=[0,1])) 90 else: 91 return int(sh.grep('-c', "^>", self.path, _ok_code=[0,1])) 92 93 @property 94 def lengths(self): 95 """All the lengths, one by one, in a list.""" 96 return map(len, self) 97 98 @property_cached 99 def lengths_counter(self): 100 """A Counter() object with all the lengths inside.""" 101 return Counter((len(s) for s in self.parse())) 102 103 #-------------------------- Basic IO methods -----------------------------# 104 def open(self, mode='r'): 105 # Two cases # 106 if self.gzipped: 107 self.handle = gzip.open(self.path, mode) 108 self.handle = io.TextIOWrapper(self.handle, encoding='utf8') 109 else: 110 self.handle = open(self.path, mode) 111 # For convenience # 112 return self.handle 113 114 def close(self): 115 # Case we were writing to the file # 116 if hasattr(self, 'buffer'): 117 self.flush() 118 del self.buffer 119 # Standard case # 120 self.handle.close() 121 # For pickling purposes (can't use dill on gzip handles) # 122 del self.handle 123 124 def parse(self): 125 self.open() 126 from Bio import SeqIO 127 return SeqIO.parse(self.handle, self.format) 128 129 @property 130 def progress(self): 131 """Just like self.parse() but will display a progress bar.""" 132 return tqdm(self, total=len(self)) 133 134 def create(self): 135 """Create the file on the file system.""" 136 self.buffer = [] 137 self.buf_count = 0 138 if not self.directory.exists: self.directory.create() 139 self.open('w') 140 return self 141 142 def add(self, seqs): 143 """Use this method to add a bunch of SeqRecords at once.""" 144 for seq in seqs: self.add_seq(seq) 145 146 def add_seq(self, seq): 147 """Use this method to add a SeqRecord object to this fasta.""" 148 self.buffer.append(seq) 149 self.buf_count += 1 150 if self.buf_count % self.buffer_size == 0: self.flush() 151 152 def add_str(self, seq, name=None, description=""): 153 """Use this method to add a sequence as a string to this fasta.""" 154 from Bio.SeqRecord import SeqRecord 155 from Bio.Seq import Seq 156 self.add_seq(SeqRecord(Seq(seq), id=name, description=description)) 157 158 def add_fasta(self, path): 159 """Use this method to add an other fasta to this fasta.""" 160 path = FASTA(path) 161 self.add(path) 162 163 def add_fastas(self, paths): 164 """Use this method to add a bunch of fastas to this fasta.""" 165 for p in paths: self.add_fasta(p) 166 167 def flush(self): 168 """Empty the buffer.""" 169 from Bio import SeqIO 170 for seq in self.buffer: 171 SeqIO.write(seq, self.handle, self.format) 172 self.buffer = [] 173 174 def write(self, reads): 175 from Bio import SeqIO 176 if not self.directory.exists: self.directory.create() 177 self.open('w') 178 SeqIO.write(reads, self.handle, self.format) 179 self.close() 180 return self 181 182 #-------------------------- Compressing the data -------------------------# 183 def compress(self, new_path=None, remove_orig=False, method='slow'): 184 """Turn this FASTA file into a gzipped FASTA file.""" 185 # Check we are not compressed already # 186 if self.gzipped: 187 msg = "The fasta file '%s' is already compressed." 188 raise Exception(msg % self.path) 189 # Pick the new path # 190 if new_path is None: new_path = self.path + '.gz' 191 # Do it the fast way or the slow way # 192 if method == 'fast': self.compress_fast(new_path) 193 else: self.compress_slow(new_path) 194 # Optionally remove the original uncompressed file # 195 if remove_orig: self.remove() 196 # Update the internal path # 197 self.path = new_path 198 # Return # 199 return self.path 200 201 def compress_slow(self, new_path): 202 """Do the compression internally via python.""" 203 with gzip.open(new_path, 'wb') as handle: 204 shutil.copyfileobj(self.open('rb'), handle) 205 206 def compress_fast(self, new_path): 207 """Do the compression with an external shell command call.""" 208 # We don't want python to be buffering the text for speed # 209 from shell_command import shell_output 210 cmd = 'gzip --stdout %s > %s' % (self.path, new_path) 211 return shell_output(cmd) 212 213 #------------------------- When IDs are important ------------------------# 214 @property_cached 215 def ids(self): 216 """A frozen set of all unique IDs in the file.""" 217 as_list = [seq.description.split()[0] for seq in self] 218 as_set = frozenset(as_list) 219 assert len(as_set) == len(as_list) 220 return as_set 221 222 def get_id(self, id_num): 223 """ 224 Extract one sequence from the file based on its ID. 225 This is highly ineffective. 226 Consider using the SQLite API instead or memory map the file. 227 """ 228 for seq in self: 229 if seq.id == id_num: return seq 230 231 @property_cached 232 def sequences(self): 233 """ 234 Another way of easily retrieving sequences. Also highly ineffective. 235 Consider using the SQLite API instead. 236 """ 237 return OrderedDict(((seq.id, seq) for seq in self)) 238 239 @property_cached 240 def sql(self): 241 """ 242 If you access this attribute, we will build an SQLite database 243 out of the FASTA file and you will be able access everything in an 244 indexed fashion, and use the blaze library via sql.frame 245 """ 246 from fasta.indexed import DatabaseFASTA, fasta_to_sql 247 db = DatabaseFASTA(self.prefix_path + ".db") 248 if not db.exists: fasta_to_sql(self.path, db.path) 249 return db 250 251 @property_cached 252 def length_by_id(self): 253 """ 254 In some use cases you just need the sequence lengths in an indexed 255 fashion. If you access this attribute, we will make a hash map in 256 memory. 257 """ 258 hash_map = dict((seq.id, len(seq)) for seq in self) 259 tmp = hash_map.copy() 260 hash_map.update(tmp) 261 return hash_map 262 263 #----------------- Ways of interacting with the data --------------------# 264 def subsample(self, down_to=1, new_path=None, verbose=True): 265 """Pick a given number of sequences from the file pseudo-randomly.""" 266 # Pick the destination path # 267 if new_path is None: 268 subsampled = self.__class__(new_temp_path()) 269 elif isinstance(new_path, FASTA): 270 subsampled = new_path 271 else: 272 subsampled = self.__class__(new_path) 273 # Check size # 274 if down_to > len(self): 275 message = "Can't subsample %s down to %i. Only down to %i." 276 print(Color.ylw + message % (self, down_to, len(self)) + Color.end) 277 self.copy(new_path) 278 return 279 # Select verbosity # 280 import tqdm 281 if verbose: wrapper = lambda x: tqdm.tqdm(x, total=self.count) 282 else: wrapper = lambda x: x 283 # Generator # 284 def iterator(): 285 for read in wrapper(isubsample(self, down_to)): 286 yield read 287 # Do it # 288 subsampled.write(iterator()) 289 # Did it work # 290 assert len(subsampled) == down_to 291 # Return # 292 return subsampled 293 294 def rename_with_num(self, prefix="", new_path=None, remove_desc=True): 295 """Rename every sequence based on a prefix and a number.""" 296 # Temporary path # 297 if new_path is None: numbered = self.__class__(new_temp_path()) 298 else: numbered = self.__class__(new_path) 299 # Generator # 300 def numbered_iterator(): 301 for i,read in enumerate(self): 302 read.id = prefix + str(i) 303 read.seq = read.seq.upper() 304 if remove_desc: read.description = "" 305 yield read 306 # Do it # 307 numbered.write(numbered_iterator()) 308 # Replace it # 309 if new_path is None: 310 os.remove(self.path) 311 shutil.move(numbered, self.path) 312 # Return # 313 return numbered 314 315 def rename_with_prefix(self, prefix="", new_path=None, in_place=True, 316 remove_desc=True): 317 """Rename every sequence based on a prefix.""" 318 # Temporary path # 319 if new_path is None: prefixed = self.__class__(new_temp_path()) 320 else: prefixed = self.__class__(new_path) 321 # Generator # 322 def prefixed_iterator(): 323 for i,read in enumerate(self): 324 read.id = prefix + read.id 325 if remove_desc: read.description = "" 326 yield read 327 # Do it # 328 prefixed.write(prefixed_iterator()) 329 # Replace it # 330 if in_place: 331 os.remove(self.path) 332 shutil.move(prefixed, self.path) 333 # Return # 334 return prefixed 335 336 def rename_sequences(self, mapping, new_path=None, in_place=False): 337 """ 338 Will rename all sequences in the current fasta file using 339 the mapping dictionary also provided. In place or at a new path. 340 """ 341 # Where is the new file # 342 if new_path is None: new_fasta = self.__class__(new_temp_path()) 343 else: new_fasta = self.__class__(new_path) 344 # Do it # 345 new_fasta.create() 346 for seq in self: 347 new_name = mapping[seq.description] 348 nucleotides = str(seq.seq) 349 new_fasta.add_str(nucleotides, new_name) 350 new_fasta.close() 351 # Return # 352 if in_place: 353 os.remove(self.path) 354 shutil.move(new_fasta, self.path) 355 return self 356 else: return new_fasta 357 358 def extract_length(self, lower_bound=None, 359 upper_bound=None, 360 new_path=None): 361 """Extract a certain length fraction and place them in a new file.""" 362 # Temporary path # 363 if new_path is None: fraction = self.__class__(new_temp_path()) 364 elif isinstance(new_path, FASTA): fraction = new_path 365 else: fraction = self.__class__(new_path) 366 # Generator # 367 if lower_bound is None: lower_bound = 0 368 if upper_bound is None: upper_bound = sys.maxsize 369 def fraction_iterator(): 370 for read in self: 371 if lower_bound <= len(read) <= upper_bound: 372 yield read 373 # Do it # 374 fraction.write(fraction_iterator()) 375 # Return # 376 return fraction 377 378 def extract_sequences(self, ids, 379 new_path = None, 380 in_place = False, 381 verbose = False): 382 """ 383 Will take all the sequences from the current file who's id appears in 384 the ids given and place them in a new file. 385 If no path is given, a new temporary path is created and returned. 386 If `in_place` is set to True, the original file is removed and replaced 387 with the result of the extraction. 388 Optionally, the argument `ids` can be a function which has to take 389 one string as only input and return True for keeping the sequence and 390 False for discarding the sequence. 391 """ 392 # Temporary path # 393 if new_path is None: new_fasta = self.__class__(new_temp_path()) 394 elif isinstance(new_path, FASTA): new_fasta = new_path 395 else: new_fasta = self.__class__(new_path) 396 # Select verbosity # 397 import tqdm 398 wrapper = tqdm.tqdm if verbose else lambda x: x 399 # Simple generator # 400 def simple_match(reads): 401 for r in wrapper(reads): 402 if r.id in ids: yield r 403 # Generator with function # 404 def function_match(reads): 405 for r in wrapper(reads): 406 if ids(r.id): yield r 407 # Do it # 408 if callable(ids): 409 new_fasta.write(function_match(self)) 410 else: 411 new_fasta.write(simple_match(self)) 412 # Return # 413 if in_place: 414 os.remove(self.path) 415 shutil.move(new_fasta, self.path) 416 return self 417 else: return new_fasta 418 419 def remove_trailing_stars(self, new_path=None, in_place=True, check=False): 420 """ 421 Remove the bad character that can be inserted by some programs at the 422 end of sequences. 423 """ 424 # Optional check # 425 if check and int(sh.grep('-c', '\\*', self.path, _ok_code=[0,1])) == 0: 426 return self 427 # Faster with bash utilities # 428 if in_place is True: 429 sh.sed('-i', 's/\\*$//g', self.path) 430 return self 431 # Standard way # 432 if new_path is None: new_fasta = self.__class__(new_temp_path()) 433 else: new_fasta = self.__class__(new_path) 434 new_fasta.create() 435 for seq in self: new_fasta.add_str(str(seq.seq).rstrip('*'), seq.id) 436 new_fasta.close() 437 # Return # 438 return new_fasta 439 440 def _generator_mod(self, generator, new_path=None, in_place=True): 441 """ 442 Generic way of modifying the current fasta either in place or 443 with a new destination pass. 444 Simply, pass a generator function that will yield the new sequences 445 given the current ones. 446 """ 447 # Temporary path # 448 if new_path is None: new_fasta = self.__class__(new_temp_path()) 449 elif isinstance(new_path, FASTA): new_fasta = new_path 450 else: new_fasta = self.__class__(new_path) 451 # Do it # 452 new_fasta.write(generator()) 453 # Return # 454 if in_place: 455 os.remove(self.path) 456 shutil.move(new_fasta, self.path) 457 return self 458 else: return new_fasta 459 460 def remove_duplicates(self, new_path=None, in_place=True): 461 """ 462 If several entries have the same ID in the FASTA file, keep only the 463 first appearance and remove all the others. 464 """ 465 # Generator # 466 def unique_entries(): 467 seen = set() 468 for i, read in enumerate(self): 469 if read.id in seen: continue 470 else: 471 seen.add(read.id) 472 yield read 473 # Return # 474 return self._generator_mod(unique_entries, new_path, in_place) 475 476 def convert_U_to_T(self, new_path=None, in_place=True): 477 # Generator # 478 def all_U_to_T(): 479 for i, read in enumerate(self): 480 read.seq = read.seq.back_transcribe() 481 yield read 482 # Return # 483 return self._generator_mod(all_U_to_T, new_path, in_place) 484 485 #---------------------------- Third party programs -----------------------# 486 def align(self, out_path=None): 487 """We align the sequences in the fasta file with muscle.""" 488 if out_path is None: out_path = self.prefix_path + '.aln' 489 sh.muscle38("-in", self.path, "-out", out_path) 490 from fasta.aligned import AlignedFASTA 491 return AlignedFASTA(out_path) 492 493 def template_align(self, ref_path): 494 """ 495 We align the sequences in the fasta file with mothur and a template. 496 """ 497 # Run it # 498 msg = "#align.seqs(candidate=%s, template=%s, search=blast," \ 499 "flip=false, processors=8);" 500 sh.mothur(msg % (self.path, ref_path)) 501 # Move things # 502 shutil.move(self.path[:-6] + '.align', self.p.aligned) 503 shutil.move(self.path[:-6] + '.align.report', self.p.report) 504 shutil.move(self.path[:-6] + '.flip.accnos', self.p.accnos) 505 # Clean up # 506 if os.path.exists('formatdb.log'): 507 os.remove('formatdb.log') 508 if os.path.exists('error.log') and os.path.getsize('error.log') == 0: 509 os.remove('error.log') 510 for path in sh.glob('mothur.*.logfile'): 511 os.remove(path) 512 # Return # 513 return self.p.aligned 514 515 def index_bowtie(self): 516 """Create an index on the fasta file compatible with bowtie2.""" 517 # It returns exit code 1 if the fasta is empty # 518 assert self 519 # Call the bowtie executable # 520 sh.bowtie2_build(self.path, self.path) 521 return FilePath(self.path + '.1.bt2') 522 523 def index_samtools(self): 524 """Create an index on the fasta file compatible with samtools.""" 525 sh.samtools('faidx', self.path) 526 return FilePath(self.path + '.fai') 527 528 #--------------------------------- Graphs --------------------------------# 529 @property_cached 530 def graphs(self): 531 """ 532 Sorry for the black magic. The result is an object whose attributes 533 are all the graphs found in `./graphs.py` initialized with this 534 instance as only argument. 535 """ 536 # Make a dummy object # 537 result = Dummy() 538 # Loop over graphs # 539 for graph in graphs.__all__: 540 cls = getattr(graphs, graph) 541 setattr(result, cls.short_name, cls(self)) 542 # Return # 543 return result 544 545 #-------------------------------- Primers -------------------------------# 546 def parse_primers(self, primers, mismatches=None): 547 """ 548 Takes care of identifying primers inside every sequence. 549 Instead of yielding Seq objects now we yield ReadWithPrimers objects. 550 These have extra properties that show the start and end positions 551 of all primers found. 552 """ 553 # Default is zero # 554 if mismatches is None: mismatches = 0 555 # Get the search expressions with mismatches # 556 from fasta.primers import PrimersRegexes 557 regexes = PrimersRegexes(primers, mismatches) 558 # Generate a new special object for every read # 559 from fasta.primers import ReadWithPrimers 560 read_with_primer = lambda read: ReadWithPrimers(read, regexes) 561 generator = (read_with_primer(r) for r in self.parse()) 562 # Add the length to the generator # 563 from plumbing.common import GenWithLength 564 generator = GenWithLength(generator, len(self)) 565 # Return # 566 return generator
32class Dummy: pass
35class FASTA(FilePath): 36 """ 37 A single FASTA file somewhere in the filesystem. You can read from it in 38 several convenient ways. You can write to it in a automatically buffered 39 way. There are several other things you can do with a FASTA file. 40 """ 41 42 format = 'fasta' 43 ext = 'fasta' 44 buffer_size = 1000 45 46 def __len__(self): return self.count 47 48 def __repr__(self): 49 return '<%s object on "%s">' % (self.__class__.__name__, self.path) 50 51 def __contains__(self, other): return other in self.ids 52 53 def __enter__(self): return self.create() 54 55 def __exit__(self, exc_type, exc_value, traceback): self.close() 56 57 def __iter__(self): 58 for seq in self.parse(): yield seq 59 self.close() 60 61 def __getitem__(self, key): 62 if isinstance(key, string_types): return self.sequences[key] 63 elif isinstance(key, int): return self.sequences.items()[key] 64 elif isinstance(key, slice): 65 return itertools.islice(self, key.start, key.stop, key.step) 66 67 #----------------------------- Properties --------------------------------# 68 @property 69 def gzipped(self): return True if self.path.endswith('gz') else False 70 71 @property 72 def first(self): 73 """Just the first sequence.""" 74 from Bio import SeqIO 75 self.open() 76 seq = next(SeqIO.parse(self.handle, self.format)) 77 self.close() 78 return seq 79 80 @property_cached 81 def count(self): 82 """ 83 Should probably check for file size changes instead of just 84 caching once TODO. 85 """ 86 # For debugging purposes # 87 if False: print("-> counting reads in `%s`" % self.path) 88 # If we are gzipped we can just use zgrep # 89 if self.gzipped: 90 return int(sh.zgrep('-c', "^>", self.path, _ok_code=[0,1])) 91 else: 92 return int(sh.grep('-c', "^>", self.path, _ok_code=[0,1])) 93 94 @property 95 def lengths(self): 96 """All the lengths, one by one, in a list.""" 97 return map(len, self) 98 99 @property_cached 100 def lengths_counter(self): 101 """A Counter() object with all the lengths inside.""" 102 return Counter((len(s) for s in self.parse())) 103 104 #-------------------------- Basic IO methods -----------------------------# 105 def open(self, mode='r'): 106 # Two cases # 107 if self.gzipped: 108 self.handle = gzip.open(self.path, mode) 109 self.handle = io.TextIOWrapper(self.handle, encoding='utf8') 110 else: 111 self.handle = open(self.path, mode) 112 # For convenience # 113 return self.handle 114 115 def close(self): 116 # Case we were writing to the file # 117 if hasattr(self, 'buffer'): 118 self.flush() 119 del self.buffer 120 # Standard case # 121 self.handle.close() 122 # For pickling purposes (can't use dill on gzip handles) # 123 del self.handle 124 125 def parse(self): 126 self.open() 127 from Bio import SeqIO 128 return SeqIO.parse(self.handle, self.format) 129 130 @property 131 def progress(self): 132 """Just like self.parse() but will display a progress bar.""" 133 return tqdm(self, total=len(self)) 134 135 def create(self): 136 """Create the file on the file system.""" 137 self.buffer = [] 138 self.buf_count = 0 139 if not self.directory.exists: self.directory.create() 140 self.open('w') 141 return self 142 143 def add(self, seqs): 144 """Use this method to add a bunch of SeqRecords at once.""" 145 for seq in seqs: self.add_seq(seq) 146 147 def add_seq(self, seq): 148 """Use this method to add a SeqRecord object to this fasta.""" 149 self.buffer.append(seq) 150 self.buf_count += 1 151 if self.buf_count % self.buffer_size == 0: self.flush() 152 153 def add_str(self, seq, name=None, description=""): 154 """Use this method to add a sequence as a string to this fasta.""" 155 from Bio.SeqRecord import SeqRecord 156 from Bio.Seq import Seq 157 self.add_seq(SeqRecord(Seq(seq), id=name, description=description)) 158 159 def add_fasta(self, path): 160 """Use this method to add an other fasta to this fasta.""" 161 path = FASTA(path) 162 self.add(path) 163 164 def add_fastas(self, paths): 165 """Use this method to add a bunch of fastas to this fasta.""" 166 for p in paths: self.add_fasta(p) 167 168 def flush(self): 169 """Empty the buffer.""" 170 from Bio import SeqIO 171 for seq in self.buffer: 172 SeqIO.write(seq, self.handle, self.format) 173 self.buffer = [] 174 175 def write(self, reads): 176 from Bio import SeqIO 177 if not self.directory.exists: self.directory.create() 178 self.open('w') 179 SeqIO.write(reads, self.handle, self.format) 180 self.close() 181 return self 182 183 #-------------------------- Compressing the data -------------------------# 184 def compress(self, new_path=None, remove_orig=False, method='slow'): 185 """Turn this FASTA file into a gzipped FASTA file.""" 186 # Check we are not compressed already # 187 if self.gzipped: 188 msg = "The fasta file '%s' is already compressed." 189 raise Exception(msg % self.path) 190 # Pick the new path # 191 if new_path is None: new_path = self.path + '.gz' 192 # Do it the fast way or the slow way # 193 if method == 'fast': self.compress_fast(new_path) 194 else: self.compress_slow(new_path) 195 # Optionally remove the original uncompressed file # 196 if remove_orig: self.remove() 197 # Update the internal path # 198 self.path = new_path 199 # Return # 200 return self.path 201 202 def compress_slow(self, new_path): 203 """Do the compression internally via python.""" 204 with gzip.open(new_path, 'wb') as handle: 205 shutil.copyfileobj(self.open('rb'), handle) 206 207 def compress_fast(self, new_path): 208 """Do the compression with an external shell command call.""" 209 # We don't want python to be buffering the text for speed # 210 from shell_command import shell_output 211 cmd = 'gzip --stdout %s > %s' % (self.path, new_path) 212 return shell_output(cmd) 213 214 #------------------------- When IDs are important ------------------------# 215 @property_cached 216 def ids(self): 217 """A frozen set of all unique IDs in the file.""" 218 as_list = [seq.description.split()[0] for seq in self] 219 as_set = frozenset(as_list) 220 assert len(as_set) == len(as_list) 221 return as_set 222 223 def get_id(self, id_num): 224 """ 225 Extract one sequence from the file based on its ID. 226 This is highly ineffective. 227 Consider using the SQLite API instead or memory map the file. 228 """ 229 for seq in self: 230 if seq.id == id_num: return seq 231 232 @property_cached 233 def sequences(self): 234 """ 235 Another way of easily retrieving sequences. Also highly ineffective. 236 Consider using the SQLite API instead. 237 """ 238 return OrderedDict(((seq.id, seq) for seq in self)) 239 240 @property_cached 241 def sql(self): 242 """ 243 If you access this attribute, we will build an SQLite database 244 out of the FASTA file and you will be able access everything in an 245 indexed fashion, and use the blaze library via sql.frame 246 """ 247 from fasta.indexed import DatabaseFASTA, fasta_to_sql 248 db = DatabaseFASTA(self.prefix_path + ".db") 249 if not db.exists: fasta_to_sql(self.path, db.path) 250 return db 251 252 @property_cached 253 def length_by_id(self): 254 """ 255 In some use cases you just need the sequence lengths in an indexed 256 fashion. If you access this attribute, we will make a hash map in 257 memory. 258 """ 259 hash_map = dict((seq.id, len(seq)) for seq in self) 260 tmp = hash_map.copy() 261 hash_map.update(tmp) 262 return hash_map 263 264 #----------------- Ways of interacting with the data --------------------# 265 def subsample(self, down_to=1, new_path=None, verbose=True): 266 """Pick a given number of sequences from the file pseudo-randomly.""" 267 # Pick the destination path # 268 if new_path is None: 269 subsampled = self.__class__(new_temp_path()) 270 elif isinstance(new_path, FASTA): 271 subsampled = new_path 272 else: 273 subsampled = self.__class__(new_path) 274 # Check size # 275 if down_to > len(self): 276 message = "Can't subsample %s down to %i. Only down to %i." 277 print(Color.ylw + message % (self, down_to, len(self)) + Color.end) 278 self.copy(new_path) 279 return 280 # Select verbosity # 281 import tqdm 282 if verbose: wrapper = lambda x: tqdm.tqdm(x, total=self.count) 283 else: wrapper = lambda x: x 284 # Generator # 285 def iterator(): 286 for read in wrapper(isubsample(self, down_to)): 287 yield read 288 # Do it # 289 subsampled.write(iterator()) 290 # Did it work # 291 assert len(subsampled) == down_to 292 # Return # 293 return subsampled 294 295 def rename_with_num(self, prefix="", new_path=None, remove_desc=True): 296 """Rename every sequence based on a prefix and a number.""" 297 # Temporary path # 298 if new_path is None: numbered = self.__class__(new_temp_path()) 299 else: numbered = self.__class__(new_path) 300 # Generator # 301 def numbered_iterator(): 302 for i,read in enumerate(self): 303 read.id = prefix + str(i) 304 read.seq = read.seq.upper() 305 if remove_desc: read.description = "" 306 yield read 307 # Do it # 308 numbered.write(numbered_iterator()) 309 # Replace it # 310 if new_path is None: 311 os.remove(self.path) 312 shutil.move(numbered, self.path) 313 # Return # 314 return numbered 315 316 def rename_with_prefix(self, prefix="", new_path=None, in_place=True, 317 remove_desc=True): 318 """Rename every sequence based on a prefix.""" 319 # Temporary path # 320 if new_path is None: prefixed = self.__class__(new_temp_path()) 321 else: prefixed = self.__class__(new_path) 322 # Generator # 323 def prefixed_iterator(): 324 for i,read in enumerate(self): 325 read.id = prefix + read.id 326 if remove_desc: read.description = "" 327 yield read 328 # Do it # 329 prefixed.write(prefixed_iterator()) 330 # Replace it # 331 if in_place: 332 os.remove(self.path) 333 shutil.move(prefixed, self.path) 334 # Return # 335 return prefixed 336 337 def rename_sequences(self, mapping, new_path=None, in_place=False): 338 """ 339 Will rename all sequences in the current fasta file using 340 the mapping dictionary also provided. In place or at a new path. 341 """ 342 # Where is the new file # 343 if new_path is None: new_fasta = self.__class__(new_temp_path()) 344 else: new_fasta = self.__class__(new_path) 345 # Do it # 346 new_fasta.create() 347 for seq in self: 348 new_name = mapping[seq.description] 349 nucleotides = str(seq.seq) 350 new_fasta.add_str(nucleotides, new_name) 351 new_fasta.close() 352 # Return # 353 if in_place: 354 os.remove(self.path) 355 shutil.move(new_fasta, self.path) 356 return self 357 else: return new_fasta 358 359 def extract_length(self, lower_bound=None, 360 upper_bound=None, 361 new_path=None): 362 """Extract a certain length fraction and place them in a new file.""" 363 # Temporary path # 364 if new_path is None: fraction = self.__class__(new_temp_path()) 365 elif isinstance(new_path, FASTA): fraction = new_path 366 else: fraction = self.__class__(new_path) 367 # Generator # 368 if lower_bound is None: lower_bound = 0 369 if upper_bound is None: upper_bound = sys.maxsize 370 def fraction_iterator(): 371 for read in self: 372 if lower_bound <= len(read) <= upper_bound: 373 yield read 374 # Do it # 375 fraction.write(fraction_iterator()) 376 # Return # 377 return fraction 378 379 def extract_sequences(self, ids, 380 new_path = None, 381 in_place = False, 382 verbose = False): 383 """ 384 Will take all the sequences from the current file who's id appears in 385 the ids given and place them in a new file. 386 If no path is given, a new temporary path is created and returned. 387 If `in_place` is set to True, the original file is removed and replaced 388 with the result of the extraction. 389 Optionally, the argument `ids` can be a function which has to take 390 one string as only input and return True for keeping the sequence and 391 False for discarding the sequence. 392 """ 393 # Temporary path # 394 if new_path is None: new_fasta = self.__class__(new_temp_path()) 395 elif isinstance(new_path, FASTA): new_fasta = new_path 396 else: new_fasta = self.__class__(new_path) 397 # Select verbosity # 398 import tqdm 399 wrapper = tqdm.tqdm if verbose else lambda x: x 400 # Simple generator # 401 def simple_match(reads): 402 for r in wrapper(reads): 403 if r.id in ids: yield r 404 # Generator with function # 405 def function_match(reads): 406 for r in wrapper(reads): 407 if ids(r.id): yield r 408 # Do it # 409 if callable(ids): 410 new_fasta.write(function_match(self)) 411 else: 412 new_fasta.write(simple_match(self)) 413 # Return # 414 if in_place: 415 os.remove(self.path) 416 shutil.move(new_fasta, self.path) 417 return self 418 else: return new_fasta 419 420 def remove_trailing_stars(self, new_path=None, in_place=True, check=False): 421 """ 422 Remove the bad character that can be inserted by some programs at the 423 end of sequences. 424 """ 425 # Optional check # 426 if check and int(sh.grep('-c', '\\*', self.path, _ok_code=[0,1])) == 0: 427 return self 428 # Faster with bash utilities # 429 if in_place is True: 430 sh.sed('-i', 's/\\*$//g', self.path) 431 return self 432 # Standard way # 433 if new_path is None: new_fasta = self.__class__(new_temp_path()) 434 else: new_fasta = self.__class__(new_path) 435 new_fasta.create() 436 for seq in self: new_fasta.add_str(str(seq.seq).rstrip('*'), seq.id) 437 new_fasta.close() 438 # Return # 439 return new_fasta 440 441 def _generator_mod(self, generator, new_path=None, in_place=True): 442 """ 443 Generic way of modifying the current fasta either in place or 444 with a new destination pass. 445 Simply, pass a generator function that will yield the new sequences 446 given the current ones. 447 """ 448 # Temporary path # 449 if new_path is None: new_fasta = self.__class__(new_temp_path()) 450 elif isinstance(new_path, FASTA): new_fasta = new_path 451 else: new_fasta = self.__class__(new_path) 452 # Do it # 453 new_fasta.write(generator()) 454 # Return # 455 if in_place: 456 os.remove(self.path) 457 shutil.move(new_fasta, self.path) 458 return self 459 else: return new_fasta 460 461 def remove_duplicates(self, new_path=None, in_place=True): 462 """ 463 If several entries have the same ID in the FASTA file, keep only the 464 first appearance and remove all the others. 465 """ 466 # Generator # 467 def unique_entries(): 468 seen = set() 469 for i, read in enumerate(self): 470 if read.id in seen: continue 471 else: 472 seen.add(read.id) 473 yield read 474 # Return # 475 return self._generator_mod(unique_entries, new_path, in_place) 476 477 def convert_U_to_T(self, new_path=None, in_place=True): 478 # Generator # 479 def all_U_to_T(): 480 for i, read in enumerate(self): 481 read.seq = read.seq.back_transcribe() 482 yield read 483 # Return # 484 return self._generator_mod(all_U_to_T, new_path, in_place) 485 486 #---------------------------- Third party programs -----------------------# 487 def align(self, out_path=None): 488 """We align the sequences in the fasta file with muscle.""" 489 if out_path is None: out_path = self.prefix_path + '.aln' 490 sh.muscle38("-in", self.path, "-out", out_path) 491 from fasta.aligned import AlignedFASTA 492 return AlignedFASTA(out_path) 493 494 def template_align(self, ref_path): 495 """ 496 We align the sequences in the fasta file with mothur and a template. 497 """ 498 # Run it # 499 msg = "#align.seqs(candidate=%s, template=%s, search=blast," \ 500 "flip=false, processors=8);" 501 sh.mothur(msg % (self.path, ref_path)) 502 # Move things # 503 shutil.move(self.path[:-6] + '.align', self.p.aligned) 504 shutil.move(self.path[:-6] + '.align.report', self.p.report) 505 shutil.move(self.path[:-6] + '.flip.accnos', self.p.accnos) 506 # Clean up # 507 if os.path.exists('formatdb.log'): 508 os.remove('formatdb.log') 509 if os.path.exists('error.log') and os.path.getsize('error.log') == 0: 510 os.remove('error.log') 511 for path in sh.glob('mothur.*.logfile'): 512 os.remove(path) 513 # Return # 514 return self.p.aligned 515 516 def index_bowtie(self): 517 """Create an index on the fasta file compatible with bowtie2.""" 518 # It returns exit code 1 if the fasta is empty # 519 assert self 520 # Call the bowtie executable # 521 sh.bowtie2_build(self.path, self.path) 522 return FilePath(self.path + '.1.bt2') 523 524 def index_samtools(self): 525 """Create an index on the fasta file compatible with samtools.""" 526 sh.samtools('faidx', self.path) 527 return FilePath(self.path + '.fai') 528 529 #--------------------------------- Graphs --------------------------------# 530 @property_cached 531 def graphs(self): 532 """ 533 Sorry for the black magic. The result is an object whose attributes 534 are all the graphs found in `./graphs.py` initialized with this 535 instance as only argument. 536 """ 537 # Make a dummy object # 538 result = Dummy() 539 # Loop over graphs # 540 for graph in graphs.__all__: 541 cls = getattr(graphs, graph) 542 setattr(result, cls.short_name, cls(self)) 543 # Return # 544 return result 545 546 #-------------------------------- Primers -------------------------------# 547 def parse_primers(self, primers, mismatches=None): 548 """ 549 Takes care of identifying primers inside every sequence. 550 Instead of yielding Seq objects now we yield ReadWithPrimers objects. 551 These have extra properties that show the start and end positions 552 of all primers found. 553 """ 554 # Default is zero # 555 if mismatches is None: mismatches = 0 556 # Get the search expressions with mismatches # 557 from fasta.primers import PrimersRegexes 558 regexes = PrimersRegexes(primers, mismatches) 559 # Generate a new special object for every read # 560 from fasta.primers import ReadWithPrimers 561 read_with_primer = lambda read: ReadWithPrimers(read, regexes) 562 generator = (read_with_primer(r) for r in self.parse()) 563 # Add the length to the generator # 564 from plumbing.common import GenWithLength 565 generator = GenWithLength(generator, len(self)) 566 # Return # 567 return generator
A single FASTA file somewhere in the filesystem. You can read from it in several convenient ways. You can write to it in a automatically buffered way. There are several other things you can do with a FASTA file.
135 def create(self): 136 """Create the file on the file system.""" 137 self.buffer = [] 138 self.buf_count = 0 139 if not self.directory.exists: self.directory.create() 140 self.open('w') 141 return self
Create the file on the file system.
143 def add(self, seqs): 144 """Use this method to add a bunch of SeqRecords at once.""" 145 for seq in seqs: self.add_seq(seq)
Use this method to add a bunch of SeqRecords at once.
147 def add_seq(self, seq): 148 """Use this method to add a SeqRecord object to this fasta.""" 149 self.buffer.append(seq) 150 self.buf_count += 1 151 if self.buf_count % self.buffer_size == 0: self.flush()
Use this method to add a SeqRecord object to this fasta.
153 def add_str(self, seq, name=None, description=""): 154 """Use this method to add a sequence as a string to this fasta.""" 155 from Bio.SeqRecord import SeqRecord 156 from Bio.Seq import Seq 157 self.add_seq(SeqRecord(Seq(seq), id=name, description=description))
Use this method to add a sequence as a string to this fasta.
159 def add_fasta(self, path): 160 """Use this method to add an other fasta to this fasta.""" 161 path = FASTA(path) 162 self.add(path)
Use this method to add an other fasta to this fasta.
164 def add_fastas(self, paths): 165 """Use this method to add a bunch of fastas to this fasta.""" 166 for p in paths: self.add_fasta(p)
Use this method to add a bunch of fastas to this fasta.
168 def flush(self): 169 """Empty the buffer.""" 170 from Bio import SeqIO 171 for seq in self.buffer: 172 SeqIO.write(seq, self.handle, self.format) 173 self.buffer = []
Empty the buffer.
184 def compress(self, new_path=None, remove_orig=False, method='slow'): 185 """Turn this FASTA file into a gzipped FASTA file.""" 186 # Check we are not compressed already # 187 if self.gzipped: 188 msg = "The fasta file '%s' is already compressed." 189 raise Exception(msg % self.path) 190 # Pick the new path # 191 if new_path is None: new_path = self.path + '.gz' 192 # Do it the fast way or the slow way # 193 if method == 'fast': self.compress_fast(new_path) 194 else: self.compress_slow(new_path) 195 # Optionally remove the original uncompressed file # 196 if remove_orig: self.remove() 197 # Update the internal path # 198 self.path = new_path 199 # Return # 200 return self.path
Turn this FASTA file into a gzipped FASTA file.
202 def compress_slow(self, new_path): 203 """Do the compression internally via python.""" 204 with gzip.open(new_path, 'wb') as handle: 205 shutil.copyfileobj(self.open('rb'), handle)
Do the compression internally via python.
207 def compress_fast(self, new_path): 208 """Do the compression with an external shell command call.""" 209 # We don't want python to be buffering the text for speed # 210 from shell_command import shell_output 211 cmd = 'gzip --stdout %s > %s' % (self.path, new_path) 212 return shell_output(cmd)
Do the compression with an external shell command call.
223 def get_id(self, id_num): 224 """ 225 Extract one sequence from the file based on its ID. 226 This is highly ineffective. 227 Consider using the SQLite API instead or memory map the file. 228 """ 229 for seq in self: 230 if seq.id == id_num: return seq
Extract one sequence from the file based on its ID. This is highly ineffective. Consider using the SQLite API instead or memory map the file.
Another way of easily retrieving sequences. Also highly ineffective. Consider using the SQLite API instead.
If you access this attribute, we will build an SQLite database out of the FASTA file and you will be able access everything in an indexed fashion, and use the blaze library via sql.frame
In some use cases you just need the sequence lengths in an indexed fashion. If you access this attribute, we will make a hash map in memory.
265 def subsample(self, down_to=1, new_path=None, verbose=True): 266 """Pick a given number of sequences from the file pseudo-randomly.""" 267 # Pick the destination path # 268 if new_path is None: 269 subsampled = self.__class__(new_temp_path()) 270 elif isinstance(new_path, FASTA): 271 subsampled = new_path 272 else: 273 subsampled = self.__class__(new_path) 274 # Check size # 275 if down_to > len(self): 276 message = "Can't subsample %s down to %i. Only down to %i." 277 print(Color.ylw + message % (self, down_to, len(self)) + Color.end) 278 self.copy(new_path) 279 return 280 # Select verbosity # 281 import tqdm 282 if verbose: wrapper = lambda x: tqdm.tqdm(x, total=self.count) 283 else: wrapper = lambda x: x 284 # Generator # 285 def iterator(): 286 for read in wrapper(isubsample(self, down_to)): 287 yield read 288 # Do it # 289 subsampled.write(iterator()) 290 # Did it work # 291 assert len(subsampled) == down_to 292 # Return # 293 return subsampled
Pick a given number of sequences from the file pseudo-randomly.
295 def rename_with_num(self, prefix="", new_path=None, remove_desc=True): 296 """Rename every sequence based on a prefix and a number.""" 297 # Temporary path # 298 if new_path is None: numbered = self.__class__(new_temp_path()) 299 else: numbered = self.__class__(new_path) 300 # Generator # 301 def numbered_iterator(): 302 for i,read in enumerate(self): 303 read.id = prefix + str(i) 304 read.seq = read.seq.upper() 305 if remove_desc: read.description = "" 306 yield read 307 # Do it # 308 numbered.write(numbered_iterator()) 309 # Replace it # 310 if new_path is None: 311 os.remove(self.path) 312 shutil.move(numbered, self.path) 313 # Return # 314 return numbered
Rename every sequence based on a prefix and a number.
316 def rename_with_prefix(self, prefix="", new_path=None, in_place=True, 317 remove_desc=True): 318 """Rename every sequence based on a prefix.""" 319 # Temporary path # 320 if new_path is None: prefixed = self.__class__(new_temp_path()) 321 else: prefixed = self.__class__(new_path) 322 # Generator # 323 def prefixed_iterator(): 324 for i,read in enumerate(self): 325 read.id = prefix + read.id 326 if remove_desc: read.description = "" 327 yield read 328 # Do it # 329 prefixed.write(prefixed_iterator()) 330 # Replace it # 331 if in_place: 332 os.remove(self.path) 333 shutil.move(prefixed, self.path) 334 # Return # 335 return prefixed
Rename every sequence based on a prefix.
337 def rename_sequences(self, mapping, new_path=None, in_place=False): 338 """ 339 Will rename all sequences in the current fasta file using 340 the mapping dictionary also provided. In place or at a new path. 341 """ 342 # Where is the new file # 343 if new_path is None: new_fasta = self.__class__(new_temp_path()) 344 else: new_fasta = self.__class__(new_path) 345 # Do it # 346 new_fasta.create() 347 for seq in self: 348 new_name = mapping[seq.description] 349 nucleotides = str(seq.seq) 350 new_fasta.add_str(nucleotides, new_name) 351 new_fasta.close() 352 # Return # 353 if in_place: 354 os.remove(self.path) 355 shutil.move(new_fasta, self.path) 356 return self 357 else: return new_fasta
Will rename all sequences in the current fasta file using the mapping dictionary also provided. In place or at a new path.
359 def extract_length(self, lower_bound=None, 360 upper_bound=None, 361 new_path=None): 362 """Extract a certain length fraction and place them in a new file.""" 363 # Temporary path # 364 if new_path is None: fraction = self.__class__(new_temp_path()) 365 elif isinstance(new_path, FASTA): fraction = new_path 366 else: fraction = self.__class__(new_path) 367 # Generator # 368 if lower_bound is None: lower_bound = 0 369 if upper_bound is None: upper_bound = sys.maxsize 370 def fraction_iterator(): 371 for read in self: 372 if lower_bound <= len(read) <= upper_bound: 373 yield read 374 # Do it # 375 fraction.write(fraction_iterator()) 376 # Return # 377 return fraction
Extract a certain length fraction and place them in a new file.
379 def extract_sequences(self, ids, 380 new_path = None, 381 in_place = False, 382 verbose = False): 383 """ 384 Will take all the sequences from the current file who's id appears in 385 the ids given and place them in a new file. 386 If no path is given, a new temporary path is created and returned. 387 If `in_place` is set to True, the original file is removed and replaced 388 with the result of the extraction. 389 Optionally, the argument `ids` can be a function which has to take 390 one string as only input and return True for keeping the sequence and 391 False for discarding the sequence. 392 """ 393 # Temporary path # 394 if new_path is None: new_fasta = self.__class__(new_temp_path()) 395 elif isinstance(new_path, FASTA): new_fasta = new_path 396 else: new_fasta = self.__class__(new_path) 397 # Select verbosity # 398 import tqdm 399 wrapper = tqdm.tqdm if verbose else lambda x: x 400 # Simple generator # 401 def simple_match(reads): 402 for r in wrapper(reads): 403 if r.id in ids: yield r 404 # Generator with function # 405 def function_match(reads): 406 for r in wrapper(reads): 407 if ids(r.id): yield r 408 # Do it # 409 if callable(ids): 410 new_fasta.write(function_match(self)) 411 else: 412 new_fasta.write(simple_match(self)) 413 # Return # 414 if in_place: 415 os.remove(self.path) 416 shutil.move(new_fasta, self.path) 417 return self 418 else: return new_fasta
Will take all the sequences from the current file who's id appears in
the ids given and place them in a new file.
If no path is given, a new temporary path is created and returned.
If in_place
is set to True, the original file is removed and replaced
with the result of the extraction.
Optionally, the argument ids
can be a function which has to take
one string as only input and return True for keeping the sequence and
False for discarding the sequence.
420 def remove_trailing_stars(self, new_path=None, in_place=True, check=False): 421 """ 422 Remove the bad character that can be inserted by some programs at the 423 end of sequences. 424 """ 425 # Optional check # 426 if check and int(sh.grep('-c', '\\*', self.path, _ok_code=[0,1])) == 0: 427 return self 428 # Faster with bash utilities # 429 if in_place is True: 430 sh.sed('-i', 's/\\*$//g', self.path) 431 return self 432 # Standard way # 433 if new_path is None: new_fasta = self.__class__(new_temp_path()) 434 else: new_fasta = self.__class__(new_path) 435 new_fasta.create() 436 for seq in self: new_fasta.add_str(str(seq.seq).rstrip('*'), seq.id) 437 new_fasta.close() 438 # Return # 439 return new_fasta
Remove the bad character that can be inserted by some programs at the end of sequences.
461 def remove_duplicates(self, new_path=None, in_place=True): 462 """ 463 If several entries have the same ID in the FASTA file, keep only the 464 first appearance and remove all the others. 465 """ 466 # Generator # 467 def unique_entries(): 468 seen = set() 469 for i, read in enumerate(self): 470 if read.id in seen: continue 471 else: 472 seen.add(read.id) 473 yield read 474 # Return # 475 return self._generator_mod(unique_entries, new_path, in_place)
If several entries have the same ID in the FASTA file, keep only the first appearance and remove all the others.
487 def align(self, out_path=None): 488 """We align the sequences in the fasta file with muscle.""" 489 if out_path is None: out_path = self.prefix_path + '.aln' 490 sh.muscle38("-in", self.path, "-out", out_path) 491 from fasta.aligned import AlignedFASTA 492 return AlignedFASTA(out_path)
We align the sequences in the fasta file with muscle.
494 def template_align(self, ref_path): 495 """ 496 We align the sequences in the fasta file with mothur and a template. 497 """ 498 # Run it # 499 msg = "#align.seqs(candidate=%s, template=%s, search=blast," \ 500 "flip=false, processors=8);" 501 sh.mothur(msg % (self.path, ref_path)) 502 # Move things # 503 shutil.move(self.path[:-6] + '.align', self.p.aligned) 504 shutil.move(self.path[:-6] + '.align.report', self.p.report) 505 shutil.move(self.path[:-6] + '.flip.accnos', self.p.accnos) 506 # Clean up # 507 if os.path.exists('formatdb.log'): 508 os.remove('formatdb.log') 509 if os.path.exists('error.log') and os.path.getsize('error.log') == 0: 510 os.remove('error.log') 511 for path in sh.glob('mothur.*.logfile'): 512 os.remove(path) 513 # Return # 514 return self.p.aligned
We align the sequences in the fasta file with mothur and a template.
516 def index_bowtie(self): 517 """Create an index on the fasta file compatible with bowtie2.""" 518 # It returns exit code 1 if the fasta is empty # 519 assert self 520 # Call the bowtie executable # 521 sh.bowtie2_build(self.path, self.path) 522 return FilePath(self.path + '.1.bt2')
Create an index on the fasta file compatible with bowtie2.
524 def index_samtools(self): 525 """Create an index on the fasta file compatible with samtools.""" 526 sh.samtools('faidx', self.path) 527 return FilePath(self.path + '.fai')
Create an index on the fasta file compatible with samtools.
Sorry for the black magic. The result is an object whose attributes
are all the graphs found in ./graphs.py
initialized with this
instance as only argument.
547 def parse_primers(self, primers, mismatches=None): 548 """ 549 Takes care of identifying primers inside every sequence. 550 Instead of yielding Seq objects now we yield ReadWithPrimers objects. 551 These have extra properties that show the start and end positions 552 of all primers found. 553 """ 554 # Default is zero # 555 if mismatches is None: mismatches = 0 556 # Get the search expressions with mismatches # 557 from fasta.primers import PrimersRegexes 558 regexes = PrimersRegexes(primers, mismatches) 559 # Generate a new special object for every read # 560 from fasta.primers import ReadWithPrimers 561 read_with_primer = lambda read: ReadWithPrimers(read, regexes) 562 generator = (read_with_primer(r) for r in self.parse()) 563 # Add the length to the generator # 564 from plumbing.common import GenWithLength 565 generator = GenWithLength(generator, len(self)) 566 # Return # 567 return generator
Takes care of identifying primers inside every sequence. Instead of yielding Seq objects now we yield ReadWithPrimers objects. These have extra properties that show the start and end positions of all primers found.
Inherited Members
- autopaths.base_path.BasePath
- BasePath
- clean_path
- path
- short_prefix
- extension
- escaped
- absolute_path
- physical_path
- relative_path
- rel_path_from
- exists
- is_symlink
- permissions
- mdate
- mdate_iso
- cdate
- cdate_iso
- unix_style
- wsl_style
- win_style
- with_tilda
- with_home
- link_from
- link_to
- symlinks_on_linux
- symlinks_on_windows
- hard_link_win_to
- autopaths.file_path.FilePath
- prefix_path
- prefix
- name
- filename
- directory
- count_bytes
- size
- contents
- contents_utf8
- md5
- might_be_binary
- contains_binary
- magic_number
- lines
- read
- touch
- writelines
- remove
- copy
- execute
- replace_extension
- new_name_insert
- make_directory
- must_exist
- head
- pretty_head
- tail
- pretty_tail
- move_to
- rename
- gzip_to
- gzip_internal
- gzip_external
- gzip_pigz
- ungzip_to
- ungzip_internal
- ungzip_external
- zip_to
- unzip_to
- untar_to
- untargz_to
- untargz_to_internal
- untargz_to_external
- append
- prepend
- remove_line
- remove_first_line
- replace_line
- replace_word
- sed_replace
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format_map
- maketrans