seqsearch.search.parallel
Written by Lucas Sinclair. MIT Licensed. Contact at www.sinclair.bio
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Written by Lucas Sinclair.
MIT Licensed.
Contact at www.sinclair.bio
"""

# Built-in modules #
import math
import multiprocessing
import shutil
import subprocess

# First party modules #
from seqsearch.search import SeqSearch
from seqsearch.search.blast import BLASTquery
from seqsearch.search.vsearch import VSEARCHquery
from plumbing.cache import property_cached
from fasta.splitable import SplitableFASTA

################################################################################
class ParallelSeqSearch(SeqSearch):
    """
    The same thing as a SeqSearch but operates by chopping the input up into
    smaller pieces and running the algorithm on each piece separately, finally
    joining the outputs.

    You can specify the number of parts, the size in MB or GB that each part
    should approximately have, or even how many sequences should be in each
    part. Specify only one of the three options.

    You can place the pieces in a specific directory.
    """

    def __init__(self,
                 input_fasta,
                 database,
                 num_parts     = None,   # How many fasta pieces should we make
                 part_size     = None,   # What size should a fasta piece be (parsed by humanfriendly)
                 seqs_per_part = None,   # How many sequences in one fasta piece
                 parts_dir     = None,   # If you want a special directory for the fasta pieces
                 **kwargs):
        """
        Decide into how many pieces the input will be cut, then hand over
        to `SeqSearch.__init__` with the remaining keyword arguments.

        The three sizing options are examined in the order `num_parts`,
        `part_size`, `seqs_per_part`; if several are given, the last truthy
        one wins. When none is given, the value of the `num_threads` keyword
        argument is used, and failing that the CPU count capped at 32.
        """
        # Determine number of parts #
        self.num_parts = None
        # Three possible options #
        if num_parts:
            self.num_parts = num_parts
        if part_size:
            # Only needed for this option, hence the late import #
            import humanfriendly
            self.bytes_target = humanfriendly.parse_size(part_size)
            self.num_parts = int(math.ceil(input_fasta.count_bytes / self.bytes_target))
        if seqs_per_part:
            self.num_parts = int(math.ceil(input_fasta.count / seqs_per_part))
        # Default case #
        if self.num_parts is None:
            default = min(multiprocessing.cpu_count(), 32)
            self.num_parts = kwargs.get('num_threads', default)
            # A literal `True` is taken to mean "pick the default" #
            if self.num_parts is True: self.num_parts = default
        # In case the user wants a special parts directory #
        self.parts_dir = parts_dir
        # Super #
        SeqSearch.__init__(self, input_fasta, database, **kwargs)

    @property_cached
    def splitable(self):
        """The input fasta file as it is, but with the ability to split it."""
        return SplitableFASTA(self.input_fasta,
                              self.num_parts,
                              base_dir = self.parts_dir)

    @property
    def queries(self):
        """
        A list of all the queries to run.

        Raises `NotImplementedError` when `self.algorithm` is neither
        'blast' nor 'vsearch'.
        """
        if self.algorithm == 'blast':   return self.blast_queries
        if self.algorithm == 'vsearch': return self.vsearch_queries
        # `NotImplemented` is a constant and cannot be called or raised #
        raise NotImplementedError(self.algorithm)

    def join_outputs(self):
        """
        Join the outputs.

        The output file of every query is appended, in the order given by
        `self.queries`, to a freshly created file at `self.out_path`.
        """
        # Done in pure python: a command string containing a `>` redirection
        # cannot be executed by `subprocess.check_call` without a shell #
        with open(str(self.out_path), 'wb') as destination:
            for query in self.queries:
                with open(str(query.out_path), 'rb') as source:
                    shutil.copyfileobj(source, destination)

    #-------------------------------- RUNNING --------------------------------#
    def run_local(self):
        """Run the search locally."""
        # Chop up the FASTA #
        self.splitable.split()
        # Case only one query #
        if len(self.queries) == 1: self.queries[0].run()
        # Case many queries #
        else:
            # Start every query before waiting on any of them #
            for query in self.queries: query.non_block_run()
            for query in self.queries: query.wait()
        # Join the results #
        self.join_outputs()

    #-------------------------- BLAST IMPLEMENTATION -------------------------#
    @property_cached
    def blast_queries(self):
        """Make all BLAST search objects, one per piece of the input."""
        # Select the right BLAST #
        blast_algo = self.select_blast_algo()
        # Create a list (each query is given a single CPU) #
        return [BLASTquery(query_path = p,
                           db_path    = self.database,
                           seq_type   = self.seq_type,
                           params     = self.blast_params,
                           algorithm  = blast_algo,
                           cpus       = 1,
                           num        = p.num) for p in self.splitable.parts]

    #-------------------------- VSEARCH IMPLEMENTATION -------------------------#
    @property_cached
    def vsearch_queries(self):
        """Make all VSEARCH search objects, one per piece of the input."""
        return [VSEARCHquery(p, self.database, self.vsearch_params)
                for p in self.splitable.parts]
class ParallelSeqSearch(SeqSearch):
    """
    The same thing as a SeqSearch but operates by chopping the input up into
    smaller pieces and running the algorithm on each piece separately, finally
    joining the outputs.

    You can specify the number of parts, the size in MB or GB that each part
    should approximately have, or even how many sequences should be in each
    part. Specify only one of the three options.

    You can place the pieces in a specific directory.
    """

    def __init__(self,
                 input_fasta,
                 database,
                 num_parts     = None,   # How many fasta pieces should we make
                 part_size     = None,   # What size should a fasta piece be (parsed by humanfriendly)
                 seqs_per_part = None,   # How many sequences in one fasta piece
                 parts_dir     = None,   # If you want a special directory for the fasta pieces
                 **kwargs):
        """
        Decide into how many pieces the input will be cut, then hand over
        to `SeqSearch.__init__` with the remaining keyword arguments.

        The three sizing options are examined in the order `num_parts`,
        `part_size`, `seqs_per_part`; if several are given, the last truthy
        one wins. When none is given, the value of the `num_threads` keyword
        argument is used, and failing that the CPU count capped at 32.
        """
        # Determine number of parts #
        self.num_parts = None
        # Three possible options #
        if num_parts:
            self.num_parts = num_parts
        if part_size:
            # Only needed for this option, hence the late import #
            import humanfriendly
            self.bytes_target = humanfriendly.parse_size(part_size)
            self.num_parts = int(math.ceil(input_fasta.count_bytes / self.bytes_target))
        if seqs_per_part:
            self.num_parts = int(math.ceil(input_fasta.count / seqs_per_part))
        # Default case #
        if self.num_parts is None:
            default = min(multiprocessing.cpu_count(), 32)
            self.num_parts = kwargs.get('num_threads', default)
            # A literal `True` is taken to mean "pick the default" #
            if self.num_parts is True: self.num_parts = default
        # In case the user wants a special parts directory #
        self.parts_dir = parts_dir
        # Super #
        SeqSearch.__init__(self, input_fasta, database, **kwargs)

    @property_cached
    def splitable(self):
        """The input fasta file as it is, but with the ability to split it."""
        return SplitableFASTA(self.input_fasta,
                              self.num_parts,
                              base_dir = self.parts_dir)

    @property
    def queries(self):
        """
        A list of all the queries to run.

        Raises `NotImplementedError` when `self.algorithm` is neither
        'blast' nor 'vsearch'.
        """
        if self.algorithm == 'blast':   return self.blast_queries
        if self.algorithm == 'vsearch': return self.vsearch_queries
        # `NotImplemented` is a constant and cannot be called or raised #
        raise NotImplementedError(self.algorithm)

    def join_outputs(self):
        """
        Join the outputs.

        The output file of every query is appended, in the order given by
        `self.queries`, to a freshly created file at `self.out_path`.
        """
        # Imported here so this class does not depend on the module header #
        import shutil
        # Done in pure python: a command string containing a `>` redirection
        # cannot be executed by `subprocess.check_call` without a shell #
        with open(str(self.out_path), 'wb') as destination:
            for query in self.queries:
                with open(str(query.out_path), 'rb') as source:
                    shutil.copyfileobj(source, destination)

    #-------------------------------- RUNNING --------------------------------#
    def run_local(self):
        """Run the search locally."""
        # Chop up the FASTA #
        self.splitable.split()
        # Case only one query #
        if len(self.queries) == 1: self.queries[0].run()
        # Case many queries #
        else:
            # Start every query before waiting on any of them #
            for query in self.queries: query.non_block_run()
            for query in self.queries: query.wait()
        # Join the results #
        self.join_outputs()

    #-------------------------- BLAST IMPLEMENTATION -------------------------#
    @property_cached
    def blast_queries(self):
        """Make all BLAST search objects, one per piece of the input."""
        # Select the right BLAST #
        blast_algo = self.select_blast_algo()
        # Create a list (each query is given a single CPU) #
        return [BLASTquery(query_path = p,
                           db_path    = self.database,
                           seq_type   = self.seq_type,
                           params     = self.blast_params,
                           algorithm  = blast_algo,
                           cpus       = 1,
                           num        = p.num) for p in self.splitable.parts]

    #-------------------------- VSEARCH IMPLEMENTATION -------------------------#
    @property_cached
    def vsearch_queries(self):
        """Make all VSEARCH search objects, one per piece of the input."""
        return [VSEARCHquery(p, self.database, self.vsearch_params)
                for p in self.splitable.parts]
The same thing as a SeqSearch but operates by chopping the input up into smaller pieces and running the algorithm on each piece separately, finally joining the outputs.
You can specify the number of parts, the size in MB or GB that each part should approximately have, or even how many sequences should be in each part. Specify only one of the three options.
You can place the pieces in a specific directory.
ParallelSeqSearch( input_fasta, database, num_parts=None, part_size=None, seqs_per_part=None, parts_dir=None, **kwargs)
def __init__(self,
             input_fasta,
             database,
             num_parts     = None,   # How many fasta pieces should we make
             part_size     = None,   # What size should a fasta piece be (parsed by humanfriendly)
             seqs_per_part = None,   # How many sequences in one fasta piece
             parts_dir     = None,   # If you want a special directory for the fasta pieces
             **kwargs):
    """
    Work out into how many pieces the input should be cut, remember the
    optional directory for the pieces, then defer to `SeqSearch.__init__`
    with the input, the database and the remaining keyword arguments.

    The sizing options are looked at in the order `num_parts`, `part_size`,
    `seqs_per_part`, and each truthy one overrides the previous result.
    If none applies, the `num_threads` keyword argument is used, and in its
    absence (or if it is literally `True`) the CPU count capped at 32.
    """
    # Option one: an explicit number of pieces #
    chosen = num_parts if num_parts else None
    # Option two: a target size per piece #
    if part_size:
        import humanfriendly
        self.bytes_target = humanfriendly.parse_size(part_size)
        size_ratio = input_fasta.count_bytes / self.bytes_target
        chosen = int(math.ceil(size_ratio))
    # Option three: a target number of sequences per piece #
    if seqs_per_part:
        seqs_ratio = input_fasta.count / seqs_per_part
        chosen = int(math.ceil(seqs_ratio))
    # No option was given: fall back on the thread count #
    if chosen is None:
        fallback = min(multiprocessing.cpu_count(), 32)
        chosen = kwargs.get('num_threads', fallback)
        if chosen is True:
            chosen = fallback
    # Record the decisions #
    self.num_parts = chosen
    self.parts_dir = parts_dir
    # Let the parent class do the rest #
    SeqSearch.__init__(self, input_fasta, database, **kwargs)
def
join_outputs(self):
def join_outputs(self):
    """
    Join the outputs.

    The output file of every query is appended, in the order given by
    `self.queries`, to a freshly created file at `self.out_path`.
    """
    # Imported here so this method does not depend on the module header #
    import shutil
    # Done in pure python: a command string containing a `>` redirection
    # cannot be executed by `subprocess.check_call` without a shell, and
    # building a shell command would break on paths containing spaces #
    with open(str(self.out_path), 'wb') as destination:
        for query in self.queries:
            with open(str(query.out_path), 'rb') as source:
                shutil.copyfileobj(source, destination)
Join the outputs.
def
run_local(self):
def run_local(self):
    """
    Run the search locally.

    The input is split first, then every query is executed, and finally
    the individual outputs are joined via `self.join_outputs()`.
    """
    # The pieces have to be made before the queries are looked up #
    self.splitable.split()
    jobs = self.queries
    if len(jobs) != 1:
        # Several (or zero) queries: start them all, then wait on each #
        for job in jobs:
            job.non_block_run()
        for job in jobs:
            job.wait()
    else:
        # A single query is simply run directly #
        jobs[0].run()
    # Merge the individual results #
    self.join_outputs()
Run the search locally.