seqsearch.search.parallel

Written by Lucas Sinclair. MIT Licensed. Contact at www.sinclair.bio

  1#!/usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3
  4"""
  5Written by Lucas Sinclair.
  6MIT Licensed.
  7Contact at www.sinclair.bio
  8"""
  9
 10# Built-in modules #
 11import math, subprocess, multiprocessing
 12
 13# First party modules #
 14from seqsearch.search         import SeqSearch
 15from seqsearch.search.blast   import BLASTquery
 16from seqsearch.search.vsearch import VSEARCHquery
 17from plumbing.cache           import property_cached
 18from fasta.splitable          import SplitableFASTA
 19
 20################################################################################
 21class ParallelSeqSearch(SeqSearch):
 22    """
 23    The same thing as a SeqSearch but operates by chopping in the input up into
 24    smaller pieces and running the algorithm on each piece separately, finally
 25    joining the outputs.
 26
 27    You can specify the number of parts, the size in MB or GB that each part
 28    should approximately have, or even how many sequences should be in each
 29    part. Specify only one of the three options.
 30
 31    You can place the pieces in a specific directory.
 32    """
 33
 34    def __init__(self,
 35                 input_fasta,
 36                 database,
 37                 num_parts     = None,   # How many fasta pieces should we make
 38                 part_size     = None,   # What size in MB should a fasta piece be
 39                 seqs_per_part = None,   # How many sequences in one fasta piece
 40                 parts_dir     = None,   # If you want a special directory for the fasta pieces
 41                 **kwargs):
 42        # Determine number of parts #
 43        self.num_parts = None
 44        # Three possible options #
 45        if num_parts:
 46            self.num_parts = num_parts
 47        if part_size:
 48            import humanfriendly
 49            self.bytes_target = humanfriendly.parse_size(part_size)
 50            self.num_parts = int(math.ceil(input_fasta.count_bytes / self.bytes_target))
 51        if seqs_per_part:
 52            self.num_parts = int(math.ceil(input_fasta.count / seqs_per_part))
 53        # Default case #
 54        if self.num_parts is None:
 55            default = min(multiprocessing.cpu_count(), 32)
 56            self.num_parts = kwargs.get('num_threads', default)
 57            if self.num_parts is True: self.num_parts = default
 58        # In case the user wants a special parts directory #
 59        self.parts_dir = parts_dir
 60        # Super #
 61        SeqSearch.__init__(self, input_fasta, database, **kwargs)
 62
 63    @property_cached
 64    def splitable(self):
 65        """The input fasta file as it is, but with the ability to split it."""
 66        return SplitableFASTA(self.input_fasta,
 67                              self.num_parts,
 68                              base_dir = self.parts_dir)
 69
 70    @property
 71    def queries(self):
 72        """A list of all the queries to run."""
 73        if self.algorithm == 'blast':   return self.blast_queries
 74        if self.algorithm == 'vsearch': return self.vsearch_queries
 75        raise NotImplemented(self.algorithm)
 76
 77    def join_outputs(self):
 78        """Join the outputs."""
 79        all_files = ' '.join(q.out_path for q in self.queries)
 80        subprocess.check_call('cat %s > %s' % (all_files, self.out_path))
 81
 82    #-------------------------------- RUNNING --------------------------------#
 83    def run_local(self):
 84        """Run the search locally."""
 85        # Chop up the FASTA #
 86        self.splitable.split()
 87        # Case only one query #
 88        if len(self.queries) == 1: self.queries[0].run()
 89        # Case many queries #
 90        else:
 91            for query in self.queries: query.non_block_run()
 92            for query in self.queries: query.wait()
 93        # Join the results #
 94        self.join_outputs()
 95
 96    #-------------------------- BLAST IMPLEMENTATION -------------------------#
 97    @property_cached
 98    def blast_queries(self):
 99        """Make all BLAST search objects."""
100        # Select the right BLAST #
101        blast_algo = self.select_blast_algo()
102        # Create a list #
103        return [BLASTquery(query_path   = p,
104                           db_path      = self.database,
105                           seq_type     = self.seq_type,
106                           params       = self.blast_params,
107                           algorithm    = blast_algo,
108                           cpus         = 1,
109                           num          = p.num) for p in self.splitable.parts]
110
111    #-------------------------- VSEARCH IMPLEMENTATION -------------------------#
112    @property_cached
113    def vsearch_queries(self):
114        """Make all VSEARCH search objects."""
115        return [VSEARCHquery(p, self.database, self.vsearch_params)
116                for p in self.splitable.parts]
class ParallelSeqSearch(seqsearch.search.SeqSearch):
 22class ParallelSeqSearch(SeqSearch):
 23    """
 24    The same thing as a SeqSearch but operates by chopping in the input up into
 25    smaller pieces and running the algorithm on each piece separately, finally
 26    joining the outputs.
 27
 28    You can specify the number of parts, the size in MB or GB that each part
 29    should approximately have, or even how many sequences should be in each
 30    part. Specify only one of the three options.
 31
 32    You can place the pieces in a specific directory.
 33    """
 34
 35    def __init__(self,
 36                 input_fasta,
 37                 database,
 38                 num_parts     = None,   # How many fasta pieces should we make
 39                 part_size     = None,   # What size in MB should a fasta piece be
 40                 seqs_per_part = None,   # How many sequences in one fasta piece
 41                 parts_dir     = None,   # If you want a special directory for the fasta pieces
 42                 **kwargs):
 43        # Determine number of parts #
 44        self.num_parts = None
 45        # Three possible options #
 46        if num_parts:
 47            self.num_parts = num_parts
 48        if part_size:
 49            import humanfriendly
 50            self.bytes_target = humanfriendly.parse_size(part_size)
 51            self.num_parts = int(math.ceil(input_fasta.count_bytes / self.bytes_target))
 52        if seqs_per_part:
 53            self.num_parts = int(math.ceil(input_fasta.count / seqs_per_part))
 54        # Default case #
 55        if self.num_parts is None:
 56            default = min(multiprocessing.cpu_count(), 32)
 57            self.num_parts = kwargs.get('num_threads', default)
 58            if self.num_parts is True: self.num_parts = default
 59        # In case the user wants a special parts directory #
 60        self.parts_dir = parts_dir
 61        # Super #
 62        SeqSearch.__init__(self, input_fasta, database, **kwargs)
 63
 64    @property_cached
 65    def splitable(self):
 66        """The input fasta file as it is, but with the ability to split it."""
 67        return SplitableFASTA(self.input_fasta,
 68                              self.num_parts,
 69                              base_dir = self.parts_dir)
 70
 71    @property
 72    def queries(self):
 73        """A list of all the queries to run."""
 74        if self.algorithm == 'blast':   return self.blast_queries
 75        if self.algorithm == 'vsearch': return self.vsearch_queries
 76        raise NotImplemented(self.algorithm)
 77
 78    def join_outputs(self):
 79        """Join the outputs."""
 80        all_files = ' '.join(q.out_path for q in self.queries)
 81        subprocess.check_call('cat %s > %s' % (all_files, self.out_path))
 82
 83    #-------------------------------- RUNNING --------------------------------#
 84    def run_local(self):
 85        """Run the search locally."""
 86        # Chop up the FASTA #
 87        self.splitable.split()
 88        # Case only one query #
 89        if len(self.queries) == 1: self.queries[0].run()
 90        # Case many queries #
 91        else:
 92            for query in self.queries: query.non_block_run()
 93            for query in self.queries: query.wait()
 94        # Join the results #
 95        self.join_outputs()
 96
 97    #-------------------------- BLAST IMPLEMENTATION -------------------------#
 98    @property_cached
 99    def blast_queries(self):
100        """Make all BLAST search objects."""
101        # Select the right BLAST #
102        blast_algo = self.select_blast_algo()
103        # Create a list #
104        return [BLASTquery(query_path   = p,
105                           db_path      = self.database,
106                           seq_type     = self.seq_type,
107                           params       = self.blast_params,
108                           algorithm    = blast_algo,
109                           cpus         = 1,
110                           num          = p.num) for p in self.splitable.parts]
111
112    #-------------------------- VSEARCH IMPLEMENTATION -------------------------#
113    @property_cached
114    def vsearch_queries(self):
115        """Make all VSEARCH search objects."""
116        return [VSEARCHquery(p, self.database, self.vsearch_params)
117                for p in self.splitable.parts]

The same thing as a SeqSearch but operates by chopping the input up into smaller pieces and running the algorithm on each piece separately, finally joining the outputs.

You can specify the number of parts, the size in MB or GB that each part should approximately have, or even how many sequences should be in each part. Specify only one of the three options.

You can place the pieces in a specific directory.

ParallelSeqSearch( input_fasta, database, num_parts=None, part_size=None, seqs_per_part=None, parts_dir=None, **kwargs)
35    def __init__(self,
36                 input_fasta,
37                 database,
38                 num_parts     = None,   # How many fasta pieces should we make
39                 part_size     = None,   # What size in MB should a fasta piece be
40                 seqs_per_part = None,   # How many sequences in one fasta piece
41                 parts_dir     = None,   # If you want a special directory for the fasta pieces
42                 **kwargs):
43        # Determine number of parts #
44        self.num_parts = None
45        # Three possible options #
46        if num_parts:
47            self.num_parts = num_parts
48        if part_size:
49            import humanfriendly
50            self.bytes_target = humanfriendly.parse_size(part_size)
51            self.num_parts = int(math.ceil(input_fasta.count_bytes / self.bytes_target))
52        if seqs_per_part:
53            self.num_parts = int(math.ceil(input_fasta.count / seqs_per_part))
54        # Default case #
55        if self.num_parts is None:
56            default = min(multiprocessing.cpu_count(), 32)
57            self.num_parts = kwargs.get('num_threads', default)
58            if self.num_parts is True: self.num_parts = default
59        # In case the user wants a special parts directory #
60        self.parts_dir = parts_dir
61        # Super #
62        SeqSearch.__init__(self, input_fasta, database, **kwargs)
splitable

The input fasta file as it is, but with the ability to split it.

queries

A list of all the queries to run.

def join_outputs(self):
78    def join_outputs(self):
79        """Join the outputs."""
80        all_files = ' '.join(q.out_path for q in self.queries)
81        subprocess.check_call('cat %s > %s' % (all_files, self.out_path))

Join the outputs.

def run_local(self):
84    def run_local(self):
85        """Run the search locally."""
86        # Chop up the FASTA #
87        self.splitable.split()
88        # Case only one query #
89        if len(self.queries) == 1: self.queries[0].run()
90        # Case many queries #
91        else:
92            for query in self.queries: query.non_block_run()
93            for query in self.queries: query.wait()
94        # Join the results #
95        self.join_outputs()

Run the search locally.

blast_queries

Make all BLAST search objects.

vsearch_queries

Make all VSEARCH search objects.