fasta.core

Written by Lucas Sinclair. MIT Licensed. Contact at www.sinclair.bio

  1#!/usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3
  4"""
  5Written by Lucas Sinclair.
  6MIT Licensed.
  7Contact at www.sinclair.bio
  8"""
  9
 10# Built-in modules #
 11import os, sys, io, gzip, shutil, itertools, platform
 12from collections import Counter, OrderedDict
 13from six import string_types
 14
 15# Internal modules #
 16from fasta import graphs
 17
 18# First party modules #
 19from plumbing.common     import isubsample
 20from plumbing.color      import Color
 21from plumbing.cache      import property_cached
 22from autopaths.file_path import FilePath
 23from autopaths.tmp_path  import new_temp_path
 24
 25# Third party modules #
 26from tqdm import tqdm
 27if platform.system() == 'Windows': import pbs3 as sh
 28else: import sh
 29
 30# Constants #
 31class Dummy: pass
 32
 33###############################################################################
 34class FASTA(FilePath):
 35    """
 36    A single FASTA file somewhere in the filesystem. You can read from it in
 37    several convenient ways. You can write to it in a automatically buffered
 38    way. There are several other things you can do with a FASTA file.
 39    """
 40
 41    format      = 'fasta'
 42    ext         = 'fasta'
 43    buffer_size = 1000
 44
 45    def __len__(self): return self.count
 46
 47    def __repr__(self):
 48        return '<%s object on "%s">' % (self.__class__.__name__, self.path)
 49
 50    def __contains__(self, other): return other in self.ids
 51
 52    def __enter__(self): return self.create()
 53
 54    def __exit__(self, exc_type, exc_value, traceback): self.close()
 55
 56    def __iter__(self):
 57        for seq in self.parse(): yield seq
 58        self.close()
 59
 60    def __getitem__(self, key):
 61        if   isinstance(key, string_types): return self.sequences[key]
 62        elif isinstance(key, int):          return self.sequences.items()[key]
 63        elif isinstance(key, slice):
 64            return itertools.islice(self, key.start, key.stop, key.step)
 65
 66    #----------------------------- Properties --------------------------------#
 67    @property
 68    def gzipped(self): return True if self.path.endswith('gz') else False
 69
 70    @property
 71    def first(self):
 72        """Just the first sequence."""
 73        from Bio import SeqIO
 74        self.open()
 75        seq = next(SeqIO.parse(self.handle, self.format))
 76        self.close()
 77        return seq
 78
 79    @property_cached
 80    def count(self):
 81        """
 82        Should probably check for file size changes instead of just
 83        caching once TODO.
 84        """
 85        # For debugging purposes #
 86        if False: print("-> counting reads in `%s`" % self.path)
 87        # If we are gzipped we can just use zgrep #
 88        if self.gzipped:
 89            return int(sh.zgrep('-c', "^>", self.path, _ok_code=[0,1]))
 90        else:
 91            return int(sh.grep('-c', "^>", self.path, _ok_code=[0,1]))
 92
 93    @property
 94    def lengths(self):
 95        """All the lengths, one by one, in a list."""
 96        return map(len, self)
 97
 98    @property_cached
 99    def lengths_counter(self):
100        """A Counter() object with all the lengths inside."""
101        return Counter((len(s) for s in self.parse()))
102
103    #-------------------------- Basic IO methods -----------------------------#
104    def open(self, mode='r'):
105        # Two cases #
106        if self.gzipped:
107            self.handle = gzip.open(self.path, mode)
108            self.handle = io.TextIOWrapper(self.handle, encoding='utf8')
109        else:
110            self.handle = open(self.path, mode)
111        # For convenience #
112        return self.handle
113
114    def close(self):
115        # Case we were writing to the file #
116        if hasattr(self, 'buffer'):
117            self.flush()
118            del self.buffer
119        # Standard case #
120        self.handle.close()
121        # For pickling purposes (can't use dill on gzip handles) #
122        del self.handle
123
124    def parse(self):
125        self.open()
126        from Bio import SeqIO
127        return SeqIO.parse(self.handle, self.format)
128
129    @property
130    def progress(self):
131        """Just like self.parse() but will display a progress bar."""
132        return tqdm(self, total=len(self))
133
134    def create(self):
135        """Create the file on the file system."""
136        self.buffer = []
137        self.buf_count = 0
138        if not self.directory.exists: self.directory.create()
139        self.open('w')
140        return self
141
142    def add(self, seqs):
143        """Use this method to add a bunch of SeqRecords at once."""
144        for seq in seqs: self.add_seq(seq)
145
146    def add_seq(self, seq):
147        """Use this method to add a SeqRecord object to this fasta."""
148        self.buffer.append(seq)
149        self.buf_count += 1
150        if self.buf_count % self.buffer_size == 0: self.flush()
151
152    def add_str(self, seq, name=None, description=""):
153        """Use this method to add a sequence as a string to this fasta."""
154        from Bio.SeqRecord import SeqRecord
155        from Bio.Seq import Seq
156        self.add_seq(SeqRecord(Seq(seq), id=name, description=description))
157
158    def add_fasta(self, path):
159        """Use this method to add an other fasta to this fasta."""
160        path = FASTA(path)
161        self.add(path)
162
163    def add_fastas(self, paths):
164        """Use this method to add a bunch of fastas to this fasta."""
165        for p in paths: self.add_fasta(p)
166
167    def flush(self):
168        """Empty the buffer."""
169        from Bio import SeqIO
170        for seq in self.buffer:
171            SeqIO.write(seq, self.handle, self.format)
172        self.buffer = []
173
174    def write(self, reads):
175        from Bio import SeqIO
176        if not self.directory.exists: self.directory.create()
177        self.open('w')
178        SeqIO.write(reads, self.handle, self.format)
179        self.close()
180        return self
181
182    #-------------------------- Compressing the data -------------------------#
183    def compress(self, new_path=None, remove_orig=False, method='slow'):
184        """Turn this FASTA file into a gzipped FASTA file."""
185        # Check we are not compressed already #
186        if self.gzipped:
187            msg = "The fasta file '%s' is already compressed."
188            raise Exception(msg % self.path)
189        # Pick the new path #
190        if new_path is None: new_path = self.path + '.gz'
191        # Do it the fast way or the slow way #
192        if method == 'fast': self.compress_fast(new_path)
193        else:                self.compress_slow(new_path)
194        # Optionally remove the original uncompressed file #
195        if remove_orig: self.remove()
196        # Update the internal path #
197        self.path = new_path
198        # Return #
199        return self.path
200
201    def compress_slow(self, new_path):
202        """Do the compression internally via python."""
203        with gzip.open(new_path, 'wb') as handle:
204            shutil.copyfileobj(self.open('rb'), handle)
205
206    def compress_fast(self, new_path):
207        """Do the compression with an external shell command call."""
208        # We don't want python to be buffering the text for speed #
209        from shell_command import shell_output
210        cmd = 'gzip --stdout %s > %s' % (self.path, new_path)
211        return shell_output(cmd)
212
213    #------------------------- When IDs are important ------------------------#
214    @property_cached
215    def ids(self):
216        """A frozen set of all unique IDs in the file."""
217        as_list = [seq.description.split()[0] for seq in self]
218        as_set = frozenset(as_list)
219        assert len(as_set) == len(as_list)
220        return as_set
221
222    def get_id(self, id_num):
223        """
224        Extract one sequence from the file based on its ID.
225        This is highly ineffective.
226        Consider using the SQLite API instead or memory map the file.
227        """
228        for seq in self:
229            if seq.id == id_num: return seq
230
231    @property_cached
232    def sequences(self):
233        """
234        Another way of easily retrieving sequences. Also highly ineffective.
235        Consider using the SQLite API instead.
236        """
237        return OrderedDict(((seq.id, seq) for seq in self))
238
239    @property_cached
240    def sql(self):
241        """
242        If you access this attribute, we will build an SQLite database
243        out of the FASTA file and you will be able access everything in an
244        indexed fashion, and use the blaze library via sql.frame
245        """
246        from fasta.indexed import DatabaseFASTA, fasta_to_sql
247        db = DatabaseFASTA(self.prefix_path + ".db")
248        if not db.exists: fasta_to_sql(self.path, db.path)
249        return db
250
251    @property_cached
252    def length_by_id(self):
253        """
254        In some use cases you just need the sequence lengths in an indexed
255        fashion. If you access this attribute, we will make a hash map in
256        memory.
257        """
258        hash_map = dict((seq.id, len(seq)) for seq in self)
259        tmp = hash_map.copy()
260        hash_map.update(tmp)
261        return hash_map
262
263    #----------------- Ways of interacting with the data --------------------#
264    def subsample(self, down_to=1, new_path=None, verbose=True):
265        """Pick a given number of sequences from the file pseudo-randomly."""
266        # Pick the destination path #
267        if new_path is None:
268            subsampled = self.__class__(new_temp_path())
269        elif isinstance(new_path, FASTA):
270            subsampled = new_path
271        else:
272            subsampled = self.__class__(new_path)
273        # Check size #
274        if down_to > len(self):
275            message = "Can't subsample %s down to %i. Only down to %i."
276            print(Color.ylw + message % (self, down_to, len(self)) + Color.end)
277            self.copy(new_path)
278            return
279        # Select verbosity #
280        import tqdm
281        if verbose: wrapper = lambda x: tqdm.tqdm(x, total=self.count)
282        else: wrapper = lambda x: x
283        # Generator #
284        def iterator():
285            for read in wrapper(isubsample(self, down_to)):
286                yield read
287        # Do it #
288        subsampled.write(iterator())
289        # Did it work #
290        assert len(subsampled) == down_to
291        # Return #
292        return subsampled
293
294    def rename_with_num(self, prefix="", new_path=None, remove_desc=True):
295        """Rename every sequence based on a prefix and a number."""
296        # Temporary path #
297        if new_path is None: numbered = self.__class__(new_temp_path())
298        else:                numbered = self.__class__(new_path)
299        # Generator #
300        def numbered_iterator():
301            for i,read in enumerate(self):
302                read.id  = prefix + str(i)
303                read.seq = read.seq.upper()
304                if remove_desc: read.description = ""
305                yield read
306        # Do it #
307        numbered.write(numbered_iterator())
308        # Replace it #
309        if new_path is None:
310            os.remove(self.path)
311            shutil.move(numbered, self.path)
312        # Return #
313        return numbered
314
315    def rename_with_prefix(self, prefix="", new_path=None, in_place=True,
316                           remove_desc=True):
317        """Rename every sequence based on a prefix."""
318        # Temporary path #
319        if new_path is None: prefixed = self.__class__(new_temp_path())
320        else:                prefixed = self.__class__(new_path)
321        # Generator #
322        def prefixed_iterator():
323            for i,read in enumerate(self):
324                read.id = prefix + read.id
325                if remove_desc: read.description = ""
326                yield read
327        # Do it #
328        prefixed.write(prefixed_iterator())
329        # Replace it #
330        if in_place:
331            os.remove(self.path)
332            shutil.move(prefixed, self.path)
333        # Return #
334        return prefixed
335
336    def rename_sequences(self, mapping, new_path=None, in_place=False):
337        """
338        Will rename all sequences in the current fasta file using
339        the mapping dictionary also provided. In place or at a new path.
340        """
341        # Where is the new file #
342        if new_path is None: new_fasta = self.__class__(new_temp_path())
343        else:                new_fasta = self.__class__(new_path)
344        # Do it #
345        new_fasta.create()
346        for seq in self:
347            new_name = mapping[seq.description]
348            nucleotides = str(seq.seq)
349            new_fasta.add_str(nucleotides, new_name)
350        new_fasta.close()
351        # Return #
352        if in_place:
353            os.remove(self.path)
354            shutil.move(new_fasta, self.path)
355            return self
356        else: return new_fasta
357
358    def extract_length(self, lower_bound=None,
359                             upper_bound=None,
360                             new_path=None):
361        """Extract a certain length fraction and place them in a new file."""
362        # Temporary path #
363        if new_path is None: fraction = self.__class__(new_temp_path())
364        elif isinstance(new_path, FASTA): fraction = new_path
365        else:                fraction = self.__class__(new_path)
366        # Generator #
367        if lower_bound is None: lower_bound = 0
368        if upper_bound is None: upper_bound = sys.maxsize
369        def fraction_iterator():
370            for read in self:
371                if lower_bound <= len(read) <= upper_bound:
372                    yield read
373        # Do it #
374        fraction.write(fraction_iterator())
375        # Return #
376        return fraction
377
378    def extract_sequences(self, ids,
379                          new_path = None,
380                          in_place = False,
381                          verbose  = False):
382        """
383        Will take all the sequences from the current file who's id appears in
384        the ids given and place them in a new file.
385        If no path is given, a new temporary path is created and returned.
386        If `in_place` is set to True, the original file is removed and replaced
387        with the result of the extraction.
388        Optionally, the argument `ids` can be a function which has to take
389        one string as only input and return True for keeping the sequence and
390        False for discarding the sequence.
391        """
392        # Temporary path #
393        if new_path is None: new_fasta = self.__class__(new_temp_path())
394        elif isinstance(new_path, FASTA): new_fasta = new_path
395        else:                new_fasta = self.__class__(new_path)
396        # Select verbosity #
397        import tqdm
398        wrapper = tqdm.tqdm if verbose else lambda x: x
399        # Simple generator #
400        def simple_match(reads):
401            for r in wrapper(reads):
402                if r.id in ids: yield r
403        # Generator with function #
404        def function_match(reads):
405            for r in wrapper(reads):
406                if ids(r.id): yield r
407        # Do it #
408        if callable(ids):
409            new_fasta.write(function_match(self))
410        else:
411            new_fasta.write(simple_match(self))
412        # Return #
413        if in_place:
414            os.remove(self.path)
415            shutil.move(new_fasta, self.path)
416            return self
417        else: return new_fasta
418
419    def remove_trailing_stars(self, new_path=None, in_place=True, check=False):
420        """
421        Remove the bad character that can be inserted by some programs at the
422        end of sequences.
423        """
424        # Optional check #
425        if check and int(sh.grep('-c', '\\*', self.path, _ok_code=[0,1])) == 0:
426            return self
427        # Faster with bash utilities #
428        if in_place is True:
429            sh.sed('-i', 's/\\*$//g', self.path)
430            return self
431        # Standard way #
432        if new_path is None: new_fasta = self.__class__(new_temp_path())
433        else:                new_fasta = self.__class__(new_path)
434        new_fasta.create()
435        for seq in self: new_fasta.add_str(str(seq.seq).rstrip('*'), seq.id)
436        new_fasta.close()
437        # Return #
438        return new_fasta
439
440    def _generator_mod(self, generator, new_path=None, in_place=True):
441        """
442        Generic way of modifying the current fasta either in place or
443        with a new destination pass.
444        Simply, pass a generator function that will yield the new sequences
445        given the current ones.
446        """
447        # Temporary path #
448        if new_path is None: new_fasta = self.__class__(new_temp_path())
449        elif isinstance(new_path, FASTA): new_fasta = new_path
450        else: new_fasta = self.__class__(new_path)
451        # Do it #
452        new_fasta.write(generator())
453        # Return #
454        if in_place:
455            os.remove(self.path)
456            shutil.move(new_fasta, self.path)
457            return self
458        else: return new_fasta
459
460    def remove_duplicates(self, new_path=None, in_place=True):
461        """
462        If several entries have the same ID in the FASTA file, keep only the
463        first appearance and remove all the others.
464        """
465        # Generator #
466        def unique_entries():
467            seen = set()
468            for i, read in enumerate(self):
469                if read.id in seen: continue
470                else:
471                    seen.add(read.id)
472                    yield read
473        # Return #
474        return self._generator_mod(unique_entries, new_path, in_place)
475
476    def convert_U_to_T(self, new_path=None, in_place=True):
477        # Generator #
478        def all_U_to_T():
479            for i, read in enumerate(self):
480                read.seq = read.seq.back_transcribe()
481                yield read
482        # Return #
483        return self._generator_mod(all_U_to_T, new_path, in_place)
484
485    #---------------------------- Third party programs -----------------------#
486    def align(self, out_path=None):
487        """We align the sequences in the fasta file with muscle."""
488        if out_path is None: out_path = self.prefix_path + '.aln'
489        sh.muscle38("-in", self.path, "-out", out_path)
490        from fasta.aligned import AlignedFASTA
491        return AlignedFASTA(out_path)
492
493    def template_align(self, ref_path):
494        """
495        We align the sequences in the fasta file with mothur and a template.
496        """
497        # Run it #
498        msg = "#align.seqs(candidate=%s, template=%s, search=blast," \
499              "flip=false, processors=8);"
500        sh.mothur(msg % (self.path, ref_path))
501        # Move things #
502        shutil.move(self.path[:-6] + '.align',        self.p.aligned)
503        shutil.move(self.path[:-6] + '.align.report', self.p.report)
504        shutil.move(self.path[:-6] + '.flip.accnos',  self.p.accnos)
505        # Clean up #
506        if os.path.exists('formatdb.log'):
507            os.remove('formatdb.log')
508        if os.path.exists('error.log') and os.path.getsize('error.log') == 0:
509            os.remove('error.log')
510        for path in sh.glob('mothur.*.logfile'):
511            os.remove(path)
512        # Return #
513        return self.p.aligned
514
515    def index_bowtie(self):
516        """Create an index on the fasta file compatible with bowtie2."""
517        # It returns exit code 1 if the fasta is empty #
518        assert self
519        # Call the bowtie executable #
520        sh.bowtie2_build(self.path, self.path)
521        return FilePath(self.path + '.1.bt2')
522
523    def index_samtools(self):
524        """Create an index on the fasta file compatible with samtools."""
525        sh.samtools('faidx', self.path)
526        return FilePath(self.path + '.fai')
527
528    #--------------------------------- Graphs --------------------------------#
529    @property_cached
530    def graphs(self):
531        """
532        Sorry for the black magic. The result is an object whose attributes
533        are all the graphs found in `./graphs.py` initialized with this
534        instance as only argument.
535        """
536        # Make a dummy object #
537        result = Dummy()
538        # Loop over graphs #
539        for graph in graphs.__all__:
540            cls = getattr(graphs, graph)
541            setattr(result, cls.short_name, cls(self))
542        # Return #
543        return result
544
545    #-------------------------------- Primers -------------------------------#
546    def parse_primers(self, primers, mismatches=None):
547        """
548        Takes care of identifying primers inside every sequence.
549        Instead of yielding Seq objects now we yield ReadWithPrimers objects.
550        These have extra properties that show the start and end positions
551        of all primers found.
552        """
553        # Default is zero #
554        if mismatches is None: mismatches = 0
555        # Get the search expressions with mismatches #
556        from fasta.primers import PrimersRegexes
557        regexes = PrimersRegexes(primers, mismatches)
558        # Generate a new special object for every read #
559        from fasta.primers import ReadWithPrimers
560        read_with_primer = lambda read: ReadWithPrimers(read, regexes)
561        generator = (read_with_primer(r) for r in self.parse())
562        # Add the length to the generator #
563        from plumbing.common import GenWithLength
564        generator = GenWithLength(generator, len(self))
565        # Return #
566        return generator
class Dummy:
32class Dummy: pass
class FASTA(autopaths.file_path.FilePath):
 35class FASTA(FilePath):
 36    """
 37    A single FASTA file somewhere in the filesystem. You can read from it in
 38    several convenient ways. You can write to it in a automatically buffered
 39    way. There are several other things you can do with a FASTA file.
 40    """
 41
 42    format      = 'fasta'
 43    ext         = 'fasta'
 44    buffer_size = 1000
 45
 46    def __len__(self): return self.count
 47
 48    def __repr__(self):
 49        return '<%s object on "%s">' % (self.__class__.__name__, self.path)
 50
 51    def __contains__(self, other): return other in self.ids
 52
 53    def __enter__(self): return self.create()
 54
 55    def __exit__(self, exc_type, exc_value, traceback): self.close()
 56
 57    def __iter__(self):
 58        for seq in self.parse(): yield seq
 59        self.close()
 60
 61    def __getitem__(self, key):
 62        if   isinstance(key, string_types): return self.sequences[key]
 63        elif isinstance(key, int):          return self.sequences.items()[key]
 64        elif isinstance(key, slice):
 65            return itertools.islice(self, key.start, key.stop, key.step)
 66
 67    #----------------------------- Properties --------------------------------#
 68    @property
 69    def gzipped(self): return True if self.path.endswith('gz') else False
 70
 71    @property
 72    def first(self):
 73        """Just the first sequence."""
 74        from Bio import SeqIO
 75        self.open()
 76        seq = next(SeqIO.parse(self.handle, self.format))
 77        self.close()
 78        return seq
 79
 80    @property_cached
 81    def count(self):
 82        """
 83        Should probably check for file size changes instead of just
 84        caching once TODO.
 85        """
 86        # For debugging purposes #
 87        if False: print("-> counting reads in `%s`" % self.path)
 88        # If we are gzipped we can just use zgrep #
 89        if self.gzipped:
 90            return int(sh.zgrep('-c', "^>", self.path, _ok_code=[0,1]))
 91        else:
 92            return int(sh.grep('-c', "^>", self.path, _ok_code=[0,1]))
 93
 94    @property
 95    def lengths(self):
 96        """All the lengths, one by one, in a list."""
 97        return map(len, self)
 98
 99    @property_cached
100    def lengths_counter(self):
101        """A Counter() object with all the lengths inside."""
102        return Counter((len(s) for s in self.parse()))
103
104    #-------------------------- Basic IO methods -----------------------------#
105    def open(self, mode='r'):
106        # Two cases #
107        if self.gzipped:
108            self.handle = gzip.open(self.path, mode)
109            self.handle = io.TextIOWrapper(self.handle, encoding='utf8')
110        else:
111            self.handle = open(self.path, mode)
112        # For convenience #
113        return self.handle
114
115    def close(self):
116        # Case we were writing to the file #
117        if hasattr(self, 'buffer'):
118            self.flush()
119            del self.buffer
120        # Standard case #
121        self.handle.close()
122        # For pickling purposes (can't use dill on gzip handles) #
123        del self.handle
124
125    def parse(self):
126        self.open()
127        from Bio import SeqIO
128        return SeqIO.parse(self.handle, self.format)
129
130    @property
131    def progress(self):
132        """Just like self.parse() but will display a progress bar."""
133        return tqdm(self, total=len(self))
134
135    def create(self):
136        """Create the file on the file system."""
137        self.buffer = []
138        self.buf_count = 0
139        if not self.directory.exists: self.directory.create()
140        self.open('w')
141        return self
142
143    def add(self, seqs):
144        """Use this method to add a bunch of SeqRecords at once."""
145        for seq in seqs: self.add_seq(seq)
146
147    def add_seq(self, seq):
148        """Use this method to add a SeqRecord object to this fasta."""
149        self.buffer.append(seq)
150        self.buf_count += 1
151        if self.buf_count % self.buffer_size == 0: self.flush()
152
153    def add_str(self, seq, name=None, description=""):
154        """Use this method to add a sequence as a string to this fasta."""
155        from Bio.SeqRecord import SeqRecord
156        from Bio.Seq import Seq
157        self.add_seq(SeqRecord(Seq(seq), id=name, description=description))
158
159    def add_fasta(self, path):
160        """Use this method to add an other fasta to this fasta."""
161        path = FASTA(path)
162        self.add(path)
163
164    def add_fastas(self, paths):
165        """Use this method to add a bunch of fastas to this fasta."""
166        for p in paths: self.add_fasta(p)
167
168    def flush(self):
169        """Empty the buffer."""
170        from Bio import SeqIO
171        for seq in self.buffer:
172            SeqIO.write(seq, self.handle, self.format)
173        self.buffer = []
174
175    def write(self, reads):
176        from Bio import SeqIO
177        if not self.directory.exists: self.directory.create()
178        self.open('w')
179        SeqIO.write(reads, self.handle, self.format)
180        self.close()
181        return self
182
183    #-------------------------- Compressing the data -------------------------#
184    def compress(self, new_path=None, remove_orig=False, method='slow'):
185        """Turn this FASTA file into a gzipped FASTA file."""
186        # Check we are not compressed already #
187        if self.gzipped:
188            msg = "The fasta file '%s' is already compressed."
189            raise Exception(msg % self.path)
190        # Pick the new path #
191        if new_path is None: new_path = self.path + '.gz'
192        # Do it the fast way or the slow way #
193        if method == 'fast': self.compress_fast(new_path)
194        else:                self.compress_slow(new_path)
195        # Optionally remove the original uncompressed file #
196        if remove_orig: self.remove()
197        # Update the internal path #
198        self.path = new_path
199        # Return #
200        return self.path
201
202    def compress_slow(self, new_path):
203        """Do the compression internally via python."""
204        with gzip.open(new_path, 'wb') as handle:
205            shutil.copyfileobj(self.open('rb'), handle)
206
207    def compress_fast(self, new_path):
208        """Do the compression with an external shell command call."""
209        # We don't want python to be buffering the text for speed #
210        from shell_command import shell_output
211        cmd = 'gzip --stdout %s > %s' % (self.path, new_path)
212        return shell_output(cmd)
213
214    #------------------------- When IDs are important ------------------------#
215    @property_cached
216    def ids(self):
217        """A frozen set of all unique IDs in the file."""
218        as_list = [seq.description.split()[0] for seq in self]
219        as_set = frozenset(as_list)
220        assert len(as_set) == len(as_list)
221        return as_set
222
223    def get_id(self, id_num):
224        """
225        Extract one sequence from the file based on its ID.
226        This is highly ineffective.
227        Consider using the SQLite API instead or memory map the file.
228        """
229        for seq in self:
230            if seq.id == id_num: return seq
231
232    @property_cached
233    def sequences(self):
234        """
235        Another way of easily retrieving sequences. Also highly ineffective.
236        Consider using the SQLite API instead.
237        """
238        return OrderedDict(((seq.id, seq) for seq in self))
239
240    @property_cached
241    def sql(self):
242        """
243        If you access this attribute, we will build an SQLite database
244        out of the FASTA file and you will be able access everything in an
245        indexed fashion, and use the blaze library via sql.frame
246        """
247        from fasta.indexed import DatabaseFASTA, fasta_to_sql
248        db = DatabaseFASTA(self.prefix_path + ".db")
249        if not db.exists: fasta_to_sql(self.path, db.path)
250        return db
251
252    @property_cached
253    def length_by_id(self):
254        """
255        In some use cases you just need the sequence lengths in an indexed
256        fashion. If you access this attribute, we will make a hash map in
257        memory.
258        """
259        hash_map = dict((seq.id, len(seq)) for seq in self)
260        tmp = hash_map.copy()
261        hash_map.update(tmp)
262        return hash_map
263
264    #----------------- Ways of interacting with the data --------------------#
265    def subsample(self, down_to=1, new_path=None, verbose=True):
266        """Pick a given number of sequences from the file pseudo-randomly."""
267        # Pick the destination path #
268        if new_path is None:
269            subsampled = self.__class__(new_temp_path())
270        elif isinstance(new_path, FASTA):
271            subsampled = new_path
272        else:
273            subsampled = self.__class__(new_path)
274        # Check size #
275        if down_to > len(self):
276            message = "Can't subsample %s down to %i. Only down to %i."
277            print(Color.ylw + message % (self, down_to, len(self)) + Color.end)
278            self.copy(new_path)
279            return
280        # Select verbosity #
281        import tqdm
282        if verbose: wrapper = lambda x: tqdm.tqdm(x, total=self.count)
283        else: wrapper = lambda x: x
284        # Generator #
285        def iterator():
286            for read in wrapper(isubsample(self, down_to)):
287                yield read
288        # Do it #
289        subsampled.write(iterator())
290        # Did it work #
291        assert len(subsampled) == down_to
292        # Return #
293        return subsampled
294
295    def rename_with_num(self, prefix="", new_path=None, remove_desc=True):
296        """Rename every sequence based on a prefix and a number."""
297        # Temporary path #
298        if new_path is None: numbered = self.__class__(new_temp_path())
299        else:                numbered = self.__class__(new_path)
300        # Generator #
301        def numbered_iterator():
302            for i,read in enumerate(self):
303                read.id  = prefix + str(i)
304                read.seq = read.seq.upper()
305                if remove_desc: read.description = ""
306                yield read
307        # Do it #
308        numbered.write(numbered_iterator())
309        # Replace it #
310        if new_path is None:
311            os.remove(self.path)
312            shutil.move(numbered, self.path)
313        # Return #
314        return numbered
315
316    def rename_with_prefix(self, prefix="", new_path=None, in_place=True,
317                           remove_desc=True):
318        """Rename every sequence based on a prefix."""
319        # Temporary path #
320        if new_path is None: prefixed = self.__class__(new_temp_path())
321        else:                prefixed = self.__class__(new_path)
322        # Generator #
323        def prefixed_iterator():
324            for i,read in enumerate(self):
325                read.id = prefix + read.id
326                if remove_desc: read.description = ""
327                yield read
328        # Do it #
329        prefixed.write(prefixed_iterator())
330        # Replace it #
331        if in_place:
332            os.remove(self.path)
333            shutil.move(prefixed, self.path)
334        # Return #
335        return prefixed
336
337    def rename_sequences(self, mapping, new_path=None, in_place=False):
338        """
339        Will rename all sequences in the current fasta file using
340        the mapping dictionary also provided. In place or at a new path.
341        """
342        # Where is the new file #
343        if new_path is None: new_fasta = self.__class__(new_temp_path())
344        else:                new_fasta = self.__class__(new_path)
345        # Do it #
346        new_fasta.create()
347        for seq in self:
348            new_name = mapping[seq.description]
349            nucleotides = str(seq.seq)
350            new_fasta.add_str(nucleotides, new_name)
351        new_fasta.close()
352        # Return #
353        if in_place:
354            os.remove(self.path)
355            shutil.move(new_fasta, self.path)
356            return self
357        else: return new_fasta
358
359    def extract_length(self, lower_bound=None,
360                             upper_bound=None,
361                             new_path=None):
362        """Extract a certain length fraction and place them in a new file."""
363        # Temporary path #
364        if new_path is None: fraction = self.__class__(new_temp_path())
365        elif isinstance(new_path, FASTA): fraction = new_path
366        else:                fraction = self.__class__(new_path)
367        # Generator #
368        if lower_bound is None: lower_bound = 0
369        if upper_bound is None: upper_bound = sys.maxsize
370        def fraction_iterator():
371            for read in self:
372                if lower_bound <= len(read) <= upper_bound:
373                    yield read
374        # Do it #
375        fraction.write(fraction_iterator())
376        # Return #
377        return fraction
378
379    def extract_sequences(self, ids,
380                          new_path = None,
381                          in_place = False,
382                          verbose  = False):
383        """
384        Will take all the sequences from the current file who's id appears in
385        the ids given and place them in a new file.
386        If no path is given, a new temporary path is created and returned.
387        If `in_place` is set to True, the original file is removed and replaced
388        with the result of the extraction.
389        Optionally, the argument `ids` can be a function which has to take
390        one string as only input and return True for keeping the sequence and
391        False for discarding the sequence.
392        """
393        # Temporary path #
394        if new_path is None: new_fasta = self.__class__(new_temp_path())
395        elif isinstance(new_path, FASTA): new_fasta = new_path
396        else:                new_fasta = self.__class__(new_path)
397        # Select verbosity #
398        import tqdm
399        wrapper = tqdm.tqdm if verbose else lambda x: x
400        # Simple generator #
401        def simple_match(reads):
402            for r in wrapper(reads):
403                if r.id in ids: yield r
404        # Generator with function #
405        def function_match(reads):
406            for r in wrapper(reads):
407                if ids(r.id): yield r
408        # Do it #
409        if callable(ids):
410            new_fasta.write(function_match(self))
411        else:
412            new_fasta.write(simple_match(self))
413        # Return #
414        if in_place:
415            os.remove(self.path)
416            shutil.move(new_fasta, self.path)
417            return self
418        else: return new_fasta
419
420    def remove_trailing_stars(self, new_path=None, in_place=True, check=False):
421        """
422        Remove the bad character that can be inserted by some programs at the
423        end of sequences.
424        """
425        # Optional check #
426        if check and int(sh.grep('-c', '\\*', self.path, _ok_code=[0,1])) == 0:
427            return self
428        # Faster with bash utilities #
429        if in_place is True:
430            sh.sed('-i', 's/\\*$//g', self.path)
431            return self
432        # Standard way #
433        if new_path is None: new_fasta = self.__class__(new_temp_path())
434        else:                new_fasta = self.__class__(new_path)
435        new_fasta.create()
436        for seq in self: new_fasta.add_str(str(seq.seq).rstrip('*'), seq.id)
437        new_fasta.close()
438        # Return #
439        return new_fasta
440
441    def _generator_mod(self, generator, new_path=None, in_place=True):
442        """
443        Generic way of modifying the current fasta either in place or
444        with a new destination pass.
445        Simply, pass a generator function that will yield the new sequences
446        given the current ones.
447        """
448        # Temporary path #
449        if new_path is None: new_fasta = self.__class__(new_temp_path())
450        elif isinstance(new_path, FASTA): new_fasta = new_path
451        else: new_fasta = self.__class__(new_path)
452        # Do it #
453        new_fasta.write(generator())
454        # Return #
455        if in_place:
456            os.remove(self.path)
457            shutil.move(new_fasta, self.path)
458            return self
459        else: return new_fasta
460
461    def remove_duplicates(self, new_path=None, in_place=True):
462        """
463        If several entries have the same ID in the FASTA file, keep only the
464        first appearance and remove all the others.
465        """
466        # Generator #
467        def unique_entries():
468            seen = set()
469            for i, read in enumerate(self):
470                if read.id in seen: continue
471                else:
472                    seen.add(read.id)
473                    yield read
474        # Return #
475        return self._generator_mod(unique_entries, new_path, in_place)
476
477    def convert_U_to_T(self, new_path=None, in_place=True):
478        # Generator #
479        def all_U_to_T():
480            for i, read in enumerate(self):
481                read.seq = read.seq.back_transcribe()
482                yield read
483        # Return #
484        return self._generator_mod(all_U_to_T, new_path, in_place)
485
486    #---------------------------- Third party programs -----------------------#
487    def align(self, out_path=None):
488        """We align the sequences in the fasta file with muscle."""
489        if out_path is None: out_path = self.prefix_path + '.aln'
490        sh.muscle38("-in", self.path, "-out", out_path)
491        from fasta.aligned import AlignedFASTA
492        return AlignedFASTA(out_path)
493
494    def template_align(self, ref_path):
495        """
496        We align the sequences in the fasta file with mothur and a template.
497        """
498        # Run it #
499        msg = "#align.seqs(candidate=%s, template=%s, search=blast," \
500              "flip=false, processors=8);"
501        sh.mothur(msg % (self.path, ref_path))
502        # Move things #
503        shutil.move(self.path[:-6] + '.align',        self.p.aligned)
504        shutil.move(self.path[:-6] + '.align.report', self.p.report)
505        shutil.move(self.path[:-6] + '.flip.accnos',  self.p.accnos)
506        # Clean up #
507        if os.path.exists('formatdb.log'):
508            os.remove('formatdb.log')
509        if os.path.exists('error.log') and os.path.getsize('error.log') == 0:
510            os.remove('error.log')
511        for path in sh.glob('mothur.*.logfile'):
512            os.remove(path)
513        # Return #
514        return self.p.aligned
515
516    def index_bowtie(self):
517        """Create an index on the fasta file compatible with bowtie2."""
518        # It returns exit code 1 if the fasta is empty #
519        assert self
520        # Call the bowtie executable #
521        sh.bowtie2_build(self.path, self.path)
522        return FilePath(self.path + '.1.bt2')
523
524    def index_samtools(self):
525        """Create an index on the fasta file compatible with samtools."""
526        sh.samtools('faidx', self.path)
527        return FilePath(self.path + '.fai')
528
529    #--------------------------------- Graphs --------------------------------#
530    @property_cached
531    def graphs(self):
532        """
533        Sorry for the black magic. The result is an object whose attributes
534        are all the graphs found in `./graphs.py` initialized with this
535        instance as only argument.
536        """
537        # Make a dummy object #
538        result = Dummy()
539        # Loop over graphs #
540        for graph in graphs.__all__:
541            cls = getattr(graphs, graph)
542            setattr(result, cls.short_name, cls(self))
543        # Return #
544        return result
545
546    #-------------------------------- Primers -------------------------------#
547    def parse_primers(self, primers, mismatches=None):
548        """
549        Takes care of identifying primers inside every sequence.
550        Instead of yielding Seq objects now we yield ReadWithPrimers objects.
551        These have extra properties that show the start and end positions
552        of all primers found.
553        """
554        # Default is zero #
555        if mismatches is None: mismatches = 0
556        # Get the search expressions with mismatches #
557        from fasta.primers import PrimersRegexes
558        regexes = PrimersRegexes(primers, mismatches)
559        # Generate a new special object for every read #
560        from fasta.primers import ReadWithPrimers
561        read_with_primer = lambda read: ReadWithPrimers(read, regexes)
562        generator = (read_with_primer(r) for r in self.parse())
563        # Add the length to the generator #
564        from plumbing.common import GenWithLength
565        generator = GenWithLength(generator, len(self))
566        # Return #
567        return generator

A single FASTA file somewhere in the filesystem. You can read from it in several convenient ways. You can write to it in a automatically buffered way. There are several other things you can do with a FASTA file.

format = 'fasta'
ext = 'fasta'
buffer_size = 1000
gzipped
first

Just the first sequence.

count

Should probably check for file size changes instead of just caching once TODO.

lengths

All the lengths, one by one, in a list.

lengths_counter

A Counter() object with all the lengths inside.

def open(self, mode='r'):
105    def open(self, mode='r'):
106        # Two cases #
107        if self.gzipped:
108            self.handle = gzip.open(self.path, mode)
109            self.handle = io.TextIOWrapper(self.handle, encoding='utf8')
110        else:
111            self.handle = open(self.path, mode)
112        # For convenience #
113        return self.handle
def close(self):
115    def close(self):
116        # Case we were writing to the file #
117        if hasattr(self, 'buffer'):
118            self.flush()
119            del self.buffer
120        # Standard case #
121        self.handle.close()
122        # For pickling purposes (can't use dill on gzip handles) #
123        del self.handle
def parse(self):
125    def parse(self):
126        self.open()
127        from Bio import SeqIO
128        return SeqIO.parse(self.handle, self.format)
progress

Just like self.parse() but will display a progress bar.

def create(self):
135    def create(self):
136        """Create the file on the file system."""
137        self.buffer = []
138        self.buf_count = 0
139        if not self.directory.exists: self.directory.create()
140        self.open('w')
141        return self

Create the file on the file system.

def add(self, seqs):
143    def add(self, seqs):
144        """Use this method to add a bunch of SeqRecords at once."""
145        for seq in seqs: self.add_seq(seq)

Use this method to add a bunch of SeqRecords at once.

def add_seq(self, seq):
147    def add_seq(self, seq):
148        """Use this method to add a SeqRecord object to this fasta."""
149        self.buffer.append(seq)
150        self.buf_count += 1
151        if self.buf_count % self.buffer_size == 0: self.flush()

Use this method to add a SeqRecord object to this fasta.

def add_str(self, seq, name=None, description=''):
153    def add_str(self, seq, name=None, description=""):
154        """Use this method to add a sequence as a string to this fasta."""
155        from Bio.SeqRecord import SeqRecord
156        from Bio.Seq import Seq
157        self.add_seq(SeqRecord(Seq(seq), id=name, description=description))

Use this method to add a sequence as a string to this fasta.

def add_fasta(self, path):
159    def add_fasta(self, path):
160        """Use this method to add an other fasta to this fasta."""
161        path = FASTA(path)
162        self.add(path)

Use this method to add an other fasta to this fasta.

def add_fastas(self, paths):
164    def add_fastas(self, paths):
165        """Use this method to add a bunch of fastas to this fasta."""
166        for p in paths: self.add_fasta(p)

Use this method to add a bunch of fastas to this fasta.

def flush(self):
168    def flush(self):
169        """Empty the buffer."""
170        from Bio import SeqIO
171        for seq in self.buffer:
172            SeqIO.write(seq, self.handle, self.format)
173        self.buffer = []

Empty the buffer.

def write(self, reads):
175    def write(self, reads):
176        from Bio import SeqIO
177        if not self.directory.exists: self.directory.create()
178        self.open('w')
179        SeqIO.write(reads, self.handle, self.format)
180        self.close()
181        return self
def compress(self, new_path=None, remove_orig=False, method='slow'):
184    def compress(self, new_path=None, remove_orig=False, method='slow'):
185        """Turn this FASTA file into a gzipped FASTA file."""
186        # Check we are not compressed already #
187        if self.gzipped:
188            msg = "The fasta file '%s' is already compressed."
189            raise Exception(msg % self.path)
190        # Pick the new path #
191        if new_path is None: new_path = self.path + '.gz'
192        # Do it the fast way or the slow way #
193        if method == 'fast': self.compress_fast(new_path)
194        else:                self.compress_slow(new_path)
195        # Optionally remove the original uncompressed file #
196        if remove_orig: self.remove()
197        # Update the internal path #
198        self.path = new_path
199        # Return #
200        return self.path

Turn this FASTA file into a gzipped FASTA file.

def compress_slow(self, new_path):
202    def compress_slow(self, new_path):
203        """Do the compression internally via python."""
204        with gzip.open(new_path, 'wb') as handle:
205            shutil.copyfileobj(self.open('rb'), handle)

Do the compression internally via python.

def compress_fast(self, new_path):
207    def compress_fast(self, new_path):
208        """Do the compression with an external shell command call."""
209        # We don't want python to be buffering the text for speed #
210        from shell_command import shell_output
211        cmd = 'gzip --stdout %s > %s' % (self.path, new_path)
212        return shell_output(cmd)

Do the compression with an external shell command call.

ids

A frozen set of all unique IDs in the file.

def get_id(self, id_num):
223    def get_id(self, id_num):
224        """
225        Extract one sequence from the file based on its ID.
226        This is highly ineffective.
227        Consider using the SQLite API instead or memory map the file.
228        """
229        for seq in self:
230            if seq.id == id_num: return seq

Extract one sequence from the file based on its ID. This is highly ineffective. Consider using the SQLite API instead or memory map the file.

sequences

Another way of easily retrieving sequences. Also highly ineffective. Consider using the SQLite API instead.

sql

If you access this attribute, we will build an SQLite database out of the FASTA file and you will be able access everything in an indexed fashion, and use the blaze library via sql.frame

length_by_id

In some use cases you just need the sequence lengths in an indexed fashion. If you access this attribute, we will make a hash map in memory.

def subsample(self, down_to=1, new_path=None, verbose=True):
265    def subsample(self, down_to=1, new_path=None, verbose=True):
266        """Pick a given number of sequences from the file pseudo-randomly."""
267        # Pick the destination path #
268        if new_path is None:
269            subsampled = self.__class__(new_temp_path())
270        elif isinstance(new_path, FASTA):
271            subsampled = new_path
272        else:
273            subsampled = self.__class__(new_path)
274        # Check size #
275        if down_to > len(self):
276            message = "Can't subsample %s down to %i. Only down to %i."
277            print(Color.ylw + message % (self, down_to, len(self)) + Color.end)
278            self.copy(new_path)
279            return
280        # Select verbosity #
281        import tqdm
282        if verbose: wrapper = lambda x: tqdm.tqdm(x, total=self.count)
283        else: wrapper = lambda x: x
284        # Generator #
285        def iterator():
286            for read in wrapper(isubsample(self, down_to)):
287                yield read
288        # Do it #
289        subsampled.write(iterator())
290        # Did it work #
291        assert len(subsampled) == down_to
292        # Return #
293        return subsampled

Pick a given number of sequences from the file pseudo-randomly.

def rename_with_num(self, prefix='', new_path=None, remove_desc=True):
295    def rename_with_num(self, prefix="", new_path=None, remove_desc=True):
296        """Rename every sequence based on a prefix and a number."""
297        # Temporary path #
298        if new_path is None: numbered = self.__class__(new_temp_path())
299        else:                numbered = self.__class__(new_path)
300        # Generator #
301        def numbered_iterator():
302            for i,read in enumerate(self):
303                read.id  = prefix + str(i)
304                read.seq = read.seq.upper()
305                if remove_desc: read.description = ""
306                yield read
307        # Do it #
308        numbered.write(numbered_iterator())
309        # Replace it #
310        if new_path is None:
311            os.remove(self.path)
312            shutil.move(numbered, self.path)
313        # Return #
314        return numbered

Rename every sequence based on a prefix and a number.

def rename_with_prefix(self, prefix='', new_path=None, in_place=True, remove_desc=True):
316    def rename_with_prefix(self, prefix="", new_path=None, in_place=True,
317                           remove_desc=True):
318        """Rename every sequence based on a prefix."""
319        # Temporary path #
320        if new_path is None: prefixed = self.__class__(new_temp_path())
321        else:                prefixed = self.__class__(new_path)
322        # Generator #
323        def prefixed_iterator():
324            for i,read in enumerate(self):
325                read.id = prefix + read.id
326                if remove_desc: read.description = ""
327                yield read
328        # Do it #
329        prefixed.write(prefixed_iterator())
330        # Replace it #
331        if in_place:
332            os.remove(self.path)
333            shutil.move(prefixed, self.path)
334        # Return #
335        return prefixed

Rename every sequence based on a prefix.

def rename_sequences(self, mapping, new_path=None, in_place=False):
337    def rename_sequences(self, mapping, new_path=None, in_place=False):
338        """
339        Will rename all sequences in the current fasta file using
340        the mapping dictionary also provided. In place or at a new path.
341        """
342        # Where is the new file #
343        if new_path is None: new_fasta = self.__class__(new_temp_path())
344        else:                new_fasta = self.__class__(new_path)
345        # Do it #
346        new_fasta.create()
347        for seq in self:
348            new_name = mapping[seq.description]
349            nucleotides = str(seq.seq)
350            new_fasta.add_str(nucleotides, new_name)
351        new_fasta.close()
352        # Return #
353        if in_place:
354            os.remove(self.path)
355            shutil.move(new_fasta, self.path)
356            return self
357        else: return new_fasta

Will rename all sequences in the current fasta file using the mapping dictionary also provided. In place or at a new path.

def extract_length(self, lower_bound=None, upper_bound=None, new_path=None):
359    def extract_length(self, lower_bound=None,
360                             upper_bound=None,
361                             new_path=None):
362        """Extract a certain length fraction and place them in a new file."""
363        # Temporary path #
364        if new_path is None: fraction = self.__class__(new_temp_path())
365        elif isinstance(new_path, FASTA): fraction = new_path
366        else:                fraction = self.__class__(new_path)
367        # Generator #
368        if lower_bound is None: lower_bound = 0
369        if upper_bound is None: upper_bound = sys.maxsize
370        def fraction_iterator():
371            for read in self:
372                if lower_bound <= len(read) <= upper_bound:
373                    yield read
374        # Do it #
375        fraction.write(fraction_iterator())
376        # Return #
377        return fraction

Extract a certain length fraction and place them in a new file.

def extract_sequences(self, ids, new_path=None, in_place=False, verbose=False):
379    def extract_sequences(self, ids,
380                          new_path = None,
381                          in_place = False,
382                          verbose  = False):
383        """
384        Will take all the sequences from the current file who's id appears in
385        the ids given and place them in a new file.
386        If no path is given, a new temporary path is created and returned.
387        If `in_place` is set to True, the original file is removed and replaced
388        with the result of the extraction.
389        Optionally, the argument `ids` can be a function which has to take
390        one string as only input and return True for keeping the sequence and
391        False for discarding the sequence.
392        """
393        # Temporary path #
394        if new_path is None: new_fasta = self.__class__(new_temp_path())
395        elif isinstance(new_path, FASTA): new_fasta = new_path
396        else:                new_fasta = self.__class__(new_path)
397        # Select verbosity #
398        import tqdm
399        wrapper = tqdm.tqdm if verbose else lambda x: x
400        # Simple generator #
401        def simple_match(reads):
402            for r in wrapper(reads):
403                if r.id in ids: yield r
404        # Generator with function #
405        def function_match(reads):
406            for r in wrapper(reads):
407                if ids(r.id): yield r
408        # Do it #
409        if callable(ids):
410            new_fasta.write(function_match(self))
411        else:
412            new_fasta.write(simple_match(self))
413        # Return #
414        if in_place:
415            os.remove(self.path)
416            shutil.move(new_fasta, self.path)
417            return self
418        else: return new_fasta

Will take all the sequences from the current file who's id appears in the ids given and place them in a new file. If no path is given, a new temporary path is created and returned. If in_place is set to True, the original file is removed and replaced with the result of the extraction. Optionally, the argument ids can be a function which has to take one string as only input and return True for keeping the sequence and False for discarding the sequence.

def remove_trailing_stars(self, new_path=None, in_place=True, check=False):
420    def remove_trailing_stars(self, new_path=None, in_place=True, check=False):
421        """
422        Remove the bad character that can be inserted by some programs at the
423        end of sequences.
424        """
425        # Optional check #
426        if check and int(sh.grep('-c', '\\*', self.path, _ok_code=[0,1])) == 0:
427            return self
428        # Faster with bash utilities #
429        if in_place is True:
430            sh.sed('-i', 's/\\*$//g', self.path)
431            return self
432        # Standard way #
433        if new_path is None: new_fasta = self.__class__(new_temp_path())
434        else:                new_fasta = self.__class__(new_path)
435        new_fasta.create()
436        for seq in self: new_fasta.add_str(str(seq.seq).rstrip('*'), seq.id)
437        new_fasta.close()
438        # Return #
439        return new_fasta

Remove the bad character that can be inserted by some programs at the end of sequences.

def remove_duplicates(self, new_path=None, in_place=True):
461    def remove_duplicates(self, new_path=None, in_place=True):
462        """
463        If several entries have the same ID in the FASTA file, keep only the
464        first appearance and remove all the others.
465        """
466        # Generator #
467        def unique_entries():
468            seen = set()
469            for i, read in enumerate(self):
470                if read.id in seen: continue
471                else:
472                    seen.add(read.id)
473                    yield read
474        # Return #
475        return self._generator_mod(unique_entries, new_path, in_place)

If several entries have the same ID in the FASTA file, keep only the first appearance and remove all the others.

def convert_U_to_T(self, new_path=None, in_place=True):
477    def convert_U_to_T(self, new_path=None, in_place=True):
478        # Generator #
479        def all_U_to_T():
480            for i, read in enumerate(self):
481                read.seq = read.seq.back_transcribe()
482                yield read
483        # Return #
484        return self._generator_mod(all_U_to_T, new_path, in_place)
def align(self, out_path=None):
487    def align(self, out_path=None):
488        """We align the sequences in the fasta file with muscle."""
489        if out_path is None: out_path = self.prefix_path + '.aln'
490        sh.muscle38("-in", self.path, "-out", out_path)
491        from fasta.aligned import AlignedFASTA
492        return AlignedFASTA(out_path)

We align the sequences in the fasta file with muscle.

def template_align(self, ref_path):
494    def template_align(self, ref_path):
495        """
496        We align the sequences in the fasta file with mothur and a template.
497        """
498        # Run it #
499        msg = "#align.seqs(candidate=%s, template=%s, search=blast," \
500              "flip=false, processors=8);"
501        sh.mothur(msg % (self.path, ref_path))
502        # Move things #
503        shutil.move(self.path[:-6] + '.align',        self.p.aligned)
504        shutil.move(self.path[:-6] + '.align.report', self.p.report)
505        shutil.move(self.path[:-6] + '.flip.accnos',  self.p.accnos)
506        # Clean up #
507        if os.path.exists('formatdb.log'):
508            os.remove('formatdb.log')
509        if os.path.exists('error.log') and os.path.getsize('error.log') == 0:
510            os.remove('error.log')
511        for path in sh.glob('mothur.*.logfile'):
512            os.remove(path)
513        # Return #
514        return self.p.aligned

We align the sequences in the fasta file with mothur and a template.

def index_bowtie(self):
516    def index_bowtie(self):
517        """Create an index on the fasta file compatible with bowtie2."""
518        # It returns exit code 1 if the fasta is empty #
519        assert self
520        # Call the bowtie executable #
521        sh.bowtie2_build(self.path, self.path)
522        return FilePath(self.path + '.1.bt2')

Create an index on the fasta file compatible with bowtie2.

def index_samtools(self):
524    def index_samtools(self):
525        """Create an index on the fasta file compatible with samtools."""
526        sh.samtools('faidx', self.path)
527        return FilePath(self.path + '.fai')

Create an index on the fasta file compatible with samtools.

graphs

Sorry for the black magic. The result is an object whose attributes are all the graphs found in ./graphs.py initialized with this instance as only argument.

def parse_primers(self, primers, mismatches=None):
547    def parse_primers(self, primers, mismatches=None):
548        """
549        Takes care of identifying primers inside every sequence.
550        Instead of yielding Seq objects now we yield ReadWithPrimers objects.
551        These have extra properties that show the start and end positions
552        of all primers found.
553        """
554        # Default is zero #
555        if mismatches is None: mismatches = 0
556        # Get the search expressions with mismatches #
557        from fasta.primers import PrimersRegexes
558        regexes = PrimersRegexes(primers, mismatches)
559        # Generate a new special object for every read #
560        from fasta.primers import ReadWithPrimers
561        read_with_primer = lambda read: ReadWithPrimers(read, regexes)
562        generator = (read_with_primer(r) for r in self.parse())
563        # Add the length to the generator #
564        from plumbing.common import GenWithLength
565        generator = GenWithLength(generator, len(self))
566        # Return #
567        return generator

Takes care of identifying primers inside every sequence. Instead of yielding Seq objects now we yield ReadWithPrimers objects. These have extra properties that show the start and end positions of all primers found.

Inherited Members
autopaths.base_path.BasePath
BasePath
clean_path
path
short_prefix
extension
escaped
absolute_path
physical_path
relative_path
rel_path_from
exists
permissions
mdate
mdate_iso
cdate
cdate_iso
unix_style
wsl_style
win_style
with_tilda
with_home
autopaths.file_path.FilePath
prefix_path
prefix
name
filename
directory
count_bytes
size
contents
contents_utf8
md5
might_be_binary
contains_binary
magic_number
lines
read
touch
writelines
remove
copy
execute
replace_extension
new_name_insert
make_directory
must_exist
head
pretty_head
tail
pretty_tail
move_to
rename
gzip_to
gzip_internal
gzip_external
gzip_pigz
ungzip_to
ungzip_internal
ungzip_external
zip_to
unzip_to
untar_to
untargz_to
untargz_to_internal
untargz_to_external
append
prepend
remove_line
remove_first_line
replace_line
replace_word
sed_replace
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format_map
maketrans