Source code for tral.sequence.repeat_detection_run

# (C) 2011, Alexander Korsunsky
# (C) 2011-2015 Elke Schaper

"""
    :synopsis: Execution of repeat detection algorithms

    .. moduleauthor:: Elke Schaper <elke.schaper@isb-sib.ch>
"""

from collections import OrderedDict
import distutils
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile

from .. import configuration
from . import repeat_detection_io
from ..paths import CONFIG_DIR, config_file

LOG = logging.getLogger(__name__)

CONFIG_GENERAL = configuration.Configuration.instance().config
REPEAT_DETECTOR_PATH = CONFIG_GENERAL["sequence"]["repeat_detector_path"]


[docs]class BinaryExecutable: """ Contains the executable, and combines executable with parameters. Contains the executable, and combines executable with parameters. Attributes: binary(str): Path to binary """ def __init__(self, binary=None, name=None): """ Construct a BinaryExecutable object. Construct a BinaryExecutable object. Args: binary (str): Path to the binary name(str): Name of the programme. """ if not binary: raise TypeError("A binary executable must be provided.") try: self.binary = shutil.which(binary) except: self.binary = distutils.spawn.find_executable(binary) if not self.binary: raise ValueError( "The executable {} does not exist, although {} was selected \n" "to be executed. Please make sure the executable is in the system path, or \n" "the path to the executable is set correctly in config.ini".format( binary, name))
[docs] def get_execute_tokens(self, *args): """Return the tokens to invoke the program with the arguments args""" return [self.binary] + list(args)
[docs] def get_execute_line(self, *args): """Return the command line to invoke the program with the arguments args""" return " ".join(self.get_execute_tokens(*args))
[docs]def check_java_errors(outfile, errfile, log=LOG, procname=None): """ Check for java problems. Return True if there were problems, else False. Check for these java errors: * Stdout file is empty but stderr file is not. * Java Exception string is indicated in the errfile Return True if there were problems, else False. Args: outfile (file handle): Redirected standard output channel file. errfile (file handle): Redirected standard error channel file If None, no copies are saved. log (?): Name of the log to issue log messages to. If none, no log messages will be issued. .. todo:: Complete docstring """ has_error = False with open(outfile, mode='r') as of, open(errfile, mode='r') as ef: outfile_size = of.seek(0, os.SEEK_END) errfile_str = ef.read() errfile_size = len(errfile_str) if errfile_size != 0 and outfile_size == 0: has_error = True LOG.warning( "Process {} has empty STDOUT but non-empty STDERR.\n" "This error often happens when {} is not installed or\n" "no correct path is given in configfile .tral/config.ini.\n" "Please check.".format( procname, procname)) pat_javaexc = re.compile( r'Exception in thread ".+" \S+:.*$(\s+at .+)+', re.M) m = pat_javaexc.search(errfile_str) if m: has_error = True LOG.warning( "Java Exception probably occured in process \"%s\"! Exception information:\n%s", procname, m.group(0)) return has_error
class TRDetector(object): def __init__(self, executable): """Construct a TRDetector object with executable als executable object""" LOG.debug(executable) self.__executable = executable def run_process(self, working_dir, *args): """Launch a detector process Arguments: working_dir -- Working directory to run process in args -- Arguments to pass to the process Return Value: A tuple with: - The return code - The name to the file with the redirected standard output channel (stdout) - The name to the file with the redirected standard error channel (stderror) The standard output (stdout) and standard error (stderr) channels will be redirected to working_dir/stdout.txt and working_dir/stdout.txt respectively. """ # create working directory in top level working directory, # if it does not yet exist if not os.path.isdir(working_dir): os.makedirs(working_dir) stdoutfname = os.path.join(working_dir, "stdout.txt") stderrfname = os.path.join(working_dir, "stderr.txt") # open stdout and stderr output file __stdout_file = open(stdoutfname, mode='w') __stderr_file = open(stderrfname, mode='w') LOG.debug("Launching process: %s in %s", self.__executable.get_execute_line(*args), working_dir) LOG.debug("Launching process tokens: %s in %s", self.__executable.get_execute_tokens(*args), working_dir) # launch process __process = subprocess.Popen( self.__executable.get_execute_tokens(*args), cwd=working_dir, stdout=__stdout_file, stderr=__stderr_file, close_fds=True, ) __process.wait() __stdout_file.close() __stderr_file.close() return __process.returncode, stdoutfname, stderrfname class DetectorHHrepID(TRDetector): name = 'HHrepID' displayname = "HHrepID" """ Execute vi ./hhrepid_32 -i infile -v 0 -nofilt -d dummyHMM.hmm -o resultfile ./hhrepid -i <query> -d <path to cal.hhm> -tp <path to tp.dat> -fp <path to fp.dat> """ class Configuration: def __init__(self): self.boolopts = { # turn off low-complexity filter (default: filter on) "-nofilt": False, # calibration by shuffling instead of database search (default: # off) "-shuffle": False } dummyhmm = os.path.expanduser(REPEAT_DETECTOR_PATH['HHrepID_dummyhmm']) if not os.path.exists(dummyhmm): # Special case for the data directory try: rel_config = os.path.relpath(dummyhmm, start=CONFIG_DIR) if not rel_config.startswith(".."): dummyhmm = config_file(rel_config) except FileNotFoundError: pass # not in data; error below if not os.path.exists(dummyhmm): raise FileNotFoundError("HHrepID_dummyhmm not found: %s" % dummyhmm) self.valopts = { # <file> input query alignment (fasta/a2m/a3m) or HMM file (.hhm) "-i": None, "-d": dummyhmm, # <path> dummy hmm database file "-o": 'hhrepID.o', # <file> write results and multiple sequence alignment to file (default=none) "-v": 0, # -v: verbose mode (default: show only warnings) ; -v 0: suppress all screen outpu "-P": None, # <float> max p-value of suboptimal alignments in all search rounds but the last one (def=0.1) "-R": None, # <float> max p-value of repeats (def=1) "-T": None, # <float> max total repeat p-value (def=0.001) "-alpha": None, # <float> For calculating repeat p-values: weight of n'th suboptimal alignment vs. (n+1)-st suboptimal alignmen "-k": None, # [0,inf[ For calculating repeat p-values: maximal number of suboptimal alignments considered (def=1) "-cont": None, # <float> probability threshold for masking of inconsistent cells (def=0.001) "-mrgr": None, # [0,inf[ number of merge rounds for achieving consistency (def=3) "-lcon": None, # <float> preserved local context in shuffling (def=2) "-lmin": None, # [0,inf[ minimal length of repeats to be identified (def=7) } def tokens(self, infile=None): """Generate command line tokens based on this configuration. Arguments: infile -- generate tokens to pass infile as input file to detector""" toks = [] if infile: toks += ['-i', infile] for optstring, optvalue in self.valopts.items(): if optvalue is not None: toks += [str(optstring), str(optvalue)] for optstring, optvalue in self.boolopts.items(): if optvalue: toks.append(optstring) return toks def __init__(self, name=name): """Construct DetectorHHrepID object. Arguments: name: The name of the tandem repeat detector. """ self.config = self.Configuration() executable = BinaryExecutable( binary=REPEAT_DETECTOR_PATH[name], name=name) super(DetectorHHrepID, self).__init__(executable) def run_process(self, working_dir, infile): """Run detector process on infile in working_dir and return all repeats found""" wd = os.path.join(working_dir, DetectorHHrepID.name) prog_args = self.config.tokens(infile=infile) # execute detector process retcode, stdoutfname, stderrfname = \ super().run_process(wd, *prog_args) # CHECK WHETHER outfile exists, else give warning and return empty # list. # Process output file, return results resultfile = os.path.join(wd, self.config.valopts["-o"]) if os.path.isfile(resultfile): with open(resultfile, "r") as infilehandle: tmp = list( repeat_detection_io.hhpredid_get_repeats(infilehandle)) return tmp else: return [] class DetectorPhobos(TRDetector): name = 'PHOBOS' displayname = "PHOBOS" # execute via phobos --printRepeatSeqMode 3 DNA.faa phobos.o # or phobos --help to see all options class Configuration: def __init__(self): self.boolopts = { # Phobos will choose the shorter of the two alignments instead # of the longer. Phobos will run faster with this option. "--preferShorterRepeats": False, # dontRemoveMostlyOverlapping. Default: DO remove mostly # overlapping repeats in favour of the one with the higher # internal score "--dontRemoveMostlyOverlapping": False, } self.scriptopts = { "outputfile": 'phobos.o', } self.valopts = { # <int> Display of TR. 0: asIs, 1: Alphabetical normal form, 2: Alphabetical normal form also considering the reverse complement. Default: 2 "--reportUnit": None, "--NPerfectionMode": None, # <int> Treatment of N. 0: asMismatch, 1: asNeutral, 2: asMatch. Default: 0. "--printRepeatSeqMode": '2', # <int> Show the repeat unit alignment "--outputFormat": None, # <int> 0 : Use the Phobos output format. Default: 0 "--minPerfection": None, # <float> Minimum perfection of a satellites. Default: 0. "--maxPerfection": None, # <float> Maximum perfection of a satellites. Default: 100. "--maximum_score_reduction": None, # The maximum amount the score can be reduced before search is aborded. Typical: 6*mismatch-penalty or infinite. Default: infinite "--recursion": None, # The recursion depth used in the search. Values in the range 3 to 7 are recommended. A value of 0 implies a search for perfect repeats only. Default: Not stated "--minUnitLen": None, # <int> Minimum unit length. Default: 1 "--maxUnitLen": None, # <int> Maximum unit length. Default: 10 "--minLength_b": None, # <float> The minimum length of a repeat is determined with: maximum( minLength, minLength_a + minLength_b*(unit-length) ). Default value of minLength_b: 0 "--minLength_a": None, # <int> Default value of minLength_a: 0 "--minLength": None, # <int> Default value of minLength: 0 "--minScore_b": None, # <float> The minimum score of a repeat is determined with: maximum( minScore, minScore_a + minScore_b*(unit-length) ). Default value of minScore_b: 1 "--minScore_a": None, # <int> Default value of minScore_a: 0 "--minScore": None, # <int> Default value of minScore: 6 "--mismatchScore": None, # <int> Score for mismatch - must be negative. Default: -5. Match score is fixed to one. "--indelScore": None, # <int> Score for indels - must be negative. Default: -5. Match score is fixed to one. "--searchMode": 'imperfect' # ['exact','extendExact','imperfect'] } def set_working_dir(self, working_dir): self.scriptopts['outputfile'] = os.path.join( working_dir, self.scriptopts['outputfile']) if not os.path.isdir(working_dir): os.makedirs(working_dir) def tokens(self, infile=None): """Generate command line tokens based on this configuration. Arguments: infile -- generate tokens to pass infile as input file to detector""" toks = [ optstring for optstring, optvalue in self.boolopts.items() if optvalue] for optstring, optvalue in self.valopts.items(): if optvalue is not None: toks.append(optstring) toks.append(str(optvalue)) if infile: toks.append(infile) toks.append(self.scriptopts['outputfile']) return toks def __init__(self, name=name): """Construct DetectorPhobos object. Arguments: name: The name of the tandem repeat detector. """ self.config = self.Configuration() executable = BinaryExecutable( binary=REPEAT_DETECTOR_PATH[name], name=name) super(DetectorPhobos, self).__init__(executable) def run_process(self, working_dir, infile): """Run detector process on infile in working_dir and return all repeats found""" wd = os.path.join(working_dir, DetectorPhobos.name) self.config.set_working_dir(working_dir=wd) # execute detector process retcode, stdoutfname, stderrfname = super( DetectorPhobos, self).run_process( wd, *self.config.tokens(infile)) # alternatively: #prog_args = self.config.tokens(infile=infile) #retcode, stdoutfname, stderrfname = super().run_process(wd, *prog_args) # Process output file, return results If outfile does not exist return # empty list if os.path.isfile(self.config.scriptopts['outputfile']): with open(self.config.scriptopts['outputfile'], "r") as resultfilehandle: tmp = list( repeat_detection_io.phobos_get_repeats(resultfilehandle)) return tmp else: LOG.warning( "Did not find Phobos result file in %s", self.config.scriptopts['outputfile']) return [] class DetectorTRED(TRDetector): name = 'TRED' displayname = "TRED" """ tred is a shell script executing: tred1 myDNA.faa intermediate_output tred2 myDNA.faa intermediate_output output1 output2 At the moment, we do not know how to add parameters to TRED """ class Configuration: def __init__(self): # For now, use Tred with default options only self.boolopts = { } self.valopts = { } def set_result_file(self, result_file=None): self.result_file = result_file def tokens(self, infile=None): """Generate command line tokens based on this configuration. Arguments: infile -- generate tokens to pass infile as input file to detector""" toks = [] if infile: toks = [infile] toks.append(self.result_file) return toks def __init__(self, name=name): """Construct DetectorTRED object. Arguments: name: The name of the tandem repeat detector. """ self.config = self.Configuration() executable = BinaryExecutable( binary=REPEAT_DETECTOR_PATH[name], name=name) super(DetectorTRED, self).__init__(executable) def run_process(self, working_dir, infile): """Run detector process on infile in working_dir and return all repeats found""" wd = os.path.join(working_dir, DetectorTRED.name) self.result_file = os.path.join(wd, 'tred.o') self.config.set_result_file(result_file=self.result_file) prog_args = self.config.tokens(infile=infile) # execute detector process retcode, stdoutfname, stderrfname = \ super().run_process(wd, *prog_args) # CHECK WHETHER outfile exists, else give warning and return empty # list. # Process output file, return results if os.path.isfile(self.result_file): with open(self.result_file, "r") as infilehandle: tmp = list(repeat_detection_io.tred_get_repeats(infilehandle)) return tmp else: LOG.warning( "Did not find TRED result file in %s", self.result_file) return [] class DetectorTREKS(TRDetector): name = 'T-REKS' displayname = "T-REKS" class Configuration: def __init__(self): self.boolopts = { "-overlapfilter": True, "-nosplit": True } self.valopts = { "-msaMode": None, "-clustal": None, "-db": None, "-muscle": None, "-outfile": None, "-similarity": None, "-kmeans": None, "-varIndels": None, "-type": None } def tokens(self, infile=None): """Generate command line tokens based on this configuration. Arguments: infile -- generate tokens to pass infile as input file to detector""" bool_toks = [ optstring for optstring, optvalue in self.boolopts.items() if optvalue] value_toks = ([ optstring + "=" + str(optvalue) for optstring, optvalue in self.valopts.items() if optvalue is not None ]) if infile: value_toks.append("-infile=" + infile) return bool_toks + value_toks def __init__(self, name=name, ): """Construct DetectorTREKS object. Arguments: name: The name of the tandem repeat detector. """ self.config = self.Configuration() executable = BinaryExecutable( binary=REPEAT_DETECTOR_PATH[name], name=name) super(DetectorTREKS, self).__init__(executable) def run_process(self, working_dir, infile): """Run detector process on infile in working_dir and return all repeats found""" wd = os.path.join(working_dir, DetectorTREKS.name) prog_args = self.config.tokens(infile=infile) # execute detector process retcode, stdoutfname, stderrfname = \ super(DetectorTREKS, self).run_process(wd, *prog_args) if check_java_errors(stdoutfname, stderrfname, log=LOG, procname=DetectorTREKS.displayname): return [] # Process output file, return results with open(stdoutfname, "r") as outfile: tmp = list(repeat_detection_io.treks_get_repeats(outfile)) return tmp class DetectorTRF(TRDetector): name = 'TRF' displayname = "TRF_Benson" # Execute via: # ./trf404.linux64.exe inputfile 2 7 7 80 10 50 500 -d # Result file in this case is called inputfile.2.7.7.80.10.50.500.1.txt.html # WARNING: This result file name is only used if there is only one sequence in the input file! ## result_file_name = ".".join([inputfile] + [str(optvalue) for optstring, optvalue in self.valopts.items() if optvalue != None] + ['1.txt.html']) # ./trf404.linux64.exe File Match Mismatch Delta PM PI Minscore MaxPeriod [options] class Configuration: def __init__(self): self.boolopts = { # no redundancy elimination # Already without the -r flag set, # the three best redundant solutions are displayed "-r": False, # "-h" : False, # When -h is set, the output html of interest is not written :( # "-d" : False, # produce data file, not needed at curren # "-m" : False, # masked sequence file # "-f" : False, # flanking sequence } # The order of TRF flags matters. Hence, we use an ordered # dictionary for valopts. valopts_tmp = [ ("File", None), # input file ("Match", 2), # matching weigh ("Mismatch", 7), # mismatching penalty ("Delta", 7), # indel penalty ("PM", 80), # integer match probability ("PI", 10), # integer indel probability ("Minscore", 50), # minimum alignment score to report ("MaxPeriod", 1000) # maximum period size to report ] self.valopts = OrderedDict(valopts_tmp) def tokens(self, infile=None): """Generate command line tokens based on this configuration. Arguments: infile -- generate tokens to pass infile as input file to detector""" toks = [] if infile: toks.append(infile) for optstring, optvalue in self.valopts.items(): if optvalue is not None: toks.append(str(optvalue)) for optstring, optvalue in self.boolopts.items(): if optvalue: toks.append(optstring) return toks def __init__(self, name=name): """Construct DetectorTRF object. Arguments: name: The name of the tandem repeat detector. """ self.config = self.Configuration() executable = BinaryExecutable( binary=REPEAT_DETECTOR_PATH[name], name=name) super(DetectorTRF, self).__init__(executable) def run_process(self, working_dir, infile): """Run detector process on infile in working_dir and return all repeats found""" wd = os.path.join(working_dir, DetectorTRF.name) prog_args = self.config.tokens(infile=infile) # execute detector process retcode, stdoutfname, stderrfname = \ super().run_process(wd, *prog_args) # get the infile name from the infile path infile_name = re.findall(r"/([_\.\w]+)$", infile)[0] # WARNING! TRF is a volatile programme, this result_file_name might not always be true. # Maybe a general search on txt.html elements in the result dir might # be more appropiate. result_file_name = ".".join( [infile_name] + [ str(optvalue) for optstring, optvalue in self.config.valopts.items() if optvalue is not None] + ['1.txt.html']) # Process output file, return results with open(os.path.join(wd, result_file_name), "r") as outfile: tmp = list(repeat_detection_io.trf_get_repeats(outfile)) return tmp class DetectorTrust(TRDetector): name = 'TRUST' displayname = "TRUST" class Configuration: def __init__(self): self.boolopts = { "-noseg": True, "-force": False, } self.valopts = { "-matrix": os.path.expanduser(REPEAT_DETECTOR_PATH['TRUST_substitutionmatrix']), "-gapo": "8", "-gapx": "2", "-procTotal": "1", } # procTotal: When running on a cluster, specify the amount of processors used # procNr cpu_number: When running on a cluster, specify 0-based CPU-number and amount of processors used def tokens(self, infile=None): """Generate command line tokens based on this configuration. Arguments: infile -- generate tokens to pass infile as input file to detector""" toks = [ optstring for optstring, optvalue in self.boolopts.items() if optvalue] for optstring, optvalue in self.valopts.items(): if optvalue is not None: toks.append(optstring) toks.append(str(optvalue)) if infile: toks.append("-fasta") toks.append(infile) return toks def __init__(self, name=name): """Construct DetectorTrust object. Arguments: name: The name of the tandem repeat detector. """ self.config = self.Configuration() executable = BinaryExecutable( binary=REPEAT_DETECTOR_PATH[name], name=name) super(DetectorTrust, self).__init__(executable) def run_process(self, working_dir, infile): """Run detector process on infile in working_dir and return all repeats found""" wd = os.path.join(working_dir, DetectorTrust.name) # execute detector process retcode, stdoutfname, stderrfname = super( DetectorTrust, self).run_process( wd, *self.config.tokens(infile)) if check_java_errors(stdoutfname, stderrfname, log=LOG, procname=DetectorTrust.displayname): return [] # shutil.copyfile(stdoutfname, YOUR FAVOURITE PATH) # Process output file, return results with open(stdoutfname, "r") as outfile: return list(repeat_detection_io.trust_get_repeats(outfile)) class DetectorXStream(TRDetector): name = 'XSTREAM' displayname = "XSTREAM" class Configuration: def __init__(self): self.boolopts = { # "-n" : False, # sequence type: nucleotide "-l": False, # don't look for periods >1000 # "-p" : False, # don't print repeat alignmen "-h": True, # write to console, not HTML # "-C" : False, # don't print any repeat info "-z": True, # generate spreadsheet .xls output # "-Z" : False, # create statistics spreadshee # "-s" : False, # parse output by input sequence "-f": False, # perform divide & conquer on inpu # "-G" : False, # create color-coded TR block diagram # "-G*" : False, # create single color TR block diagram # "-B" : False, # create sequence comparison map PNG # "-O" : False, # print "mismatch/gaps colored" HTML outpu "-N": False, # turn off nesting "-o": False, # don't remove overlapping TR domain # "-S" : False, # input strain into database "-V": True, # don't invoke D & C automatically for long inpu } self.valopts = { "-g": None, # [integer] set gap number "-i": None, # [0-1] set matching threshold for detection "-I": None, # [0-1] set cons error threshold for printing "-D": None, # [0-1] set max % indels for printing "-m": 1, # [integer] set minimum period "-x": None, # [integer] set maximum period "-e": None, # [any number>=2] set minimum copy number # "-a" : None, # [string] add identifier string to output file "-f": None, # [integer] set fragment length # "-d" : None, # [file path] specify output directory # "-A" : None, # [file] import substitution alphabe "-L": 6, # [integer] set minimum TR domain length "-P": 0, # [0-1] set minimum % sequence coverage # "-Q" : None, # [user,pass,db] send output to MySQL TR database } def tokens(self, infile=None): """Generate command line tokens based on this configuration. Arguments: infile -- generate tokens to pass infile as input file to detector""" toks = [ optstring for optstring, optvalue in self.boolopts.items() if optvalue] toks = toks + \ [optstring + str(optvalue) for optstring, optvalue in self.valopts.items() if optvalue is not None] if infile: toks.append(infile) return toks def __init__(self, name=name): """Construct DetectorXStream object. Arguments: name: The name of the tandem repeat detector. """ self.config = self.Configuration() executable = BinaryExecutable( binary=REPEAT_DETECTOR_PATH[name], name=name) super(DetectorXStream, self).__init__(executable) def find_chartfile(self, searchdir): """Look for XSTREAM output xls file and return filepath""" files = os.listdir(searchdir) # look for the first file that matches XSTREAM_something_chart.xl pat_chartfilename = re.compile(r"^XSTREAM_.*_chart\.xls$") for f in files: if pat_chartfilename.match(f): return os.path.join(searchdir, f) else: return None def run_process(self, working_dir, infile): """ Run detector process on infile in working_dir and return all repeats found. Run detector process on infile in working_dir and return all repeats found. Args: working_dir (str): Working directory infile (str): Infile .. ToDo:: Complete docstrings. """ wd = os.path.join(working_dir, DetectorXStream.name) # execute detector process retcode, stdoutfname, stderrfname = super( DetectorXStream, self).run_process( wd, *self.config.tokens(infile)) if check_java_errors(stdoutfname, stderrfname, log=LOG, procname=DetectorXStream.displayname): return [] # Find output file, cry about it if not found #shutil.rmtree(os.path.join(EXECROOT, '..', 'spielwiese')) #shutil.copytree(working_dir, os.path.join(EXECROOT, '..', 'spielwiese')) chartfilename = self.find_chartfile(wd) if not chartfilename: LOG.error("No XSTREAM output chart file found in %s!", wd) return [] # Process output file, return results with open(chartfilename, "r") as infile: return list(repeat_detection_io.xstream_get_repeats(infile))
[docs]def Detectors(lDetector=None, sequence_type=None): """ Define a global dictionary of all used detector functions. Define a global dictionary of all used detector functions. Args: lDetector (list of str): A list of repeat detection algorithm names. sequence_type (str): Either "AA" or "DNA". Raises: Exception: if at least one of the provided detectors in ``lDetector`` does not exist. """ global DETECTORS if not sequence_type: sequence_type = CONFIG_GENERAL["sequence_type"] if not lDetector: lDetector = CONFIG_GENERAL["sequence"]["repeat_detection"][sequence_type] else: if isinstance(lDetector, str): lDetector = [lDetector] elif not isinstance(lDetector, list): raise TypeError(""" lDetector is not of type list. Please supply a list of TR detectors (e.g. lDetector = ['HHrepID']). If you use TR detectors defined in config.ini, make sure the TR detector list (AA or DNA) is correctly defined.""") if any(i not in FINDER_LIST.keys() for i in lDetector): raise Exception( "Unknown TR detector supplied (Supplied: {}. Known TR detectors: {})".format( lDetector, FINDER_LIST.keys())) DETECTORS = {FINDER_LIST[i].name: FINDER_LIST[i]() for i in lDetector}
[docs]def split_sequence(seq_records, working_dir): """ Split a FASTA file with multiple entries to several FASTA files with one entry Arguments: seq_records -- List of Sequence instances. working_dir -- Output directory for splitted file Returns: A list of tuples containing the Protein identifier and the file name .. TODO: Do not append i to outfiles, but a sequence ID """ outfiles = [] for i, seq in enumerate(seq_records): filename = "sequence_{0:03}.faa".format(i + 1) seq.write(file=os.path.join(working_dir, filename), file_format="fasta") outfiles.append((i, filename)) return outfiles
class DetectorJob: def __init__(self, detector, infile, job_id, protein_id): self.detector = detector self.infile = infile self.job_id = job_id self.protein_id = protein_id ################ RUN A SET OF Tandem Repeat Detection algorithms (TRDs) ##
[docs]def run_detector( seq_records, detectors=None, sequence_type='AA', default=True, local_working_dir=None, num_threads=1): """ Run TRD on sequence_records and return the predicted repeats for each ``seq_records`` and for each tandem repeat detector. Run TRD on sequence_records and return the predicted repeats for each ``seq_records`` and for each tandem repeat detector. Example of return type:: [ # record 1 { 'T-REKS' : [ Repeat(), Repeat(), ...], 'XSTREAM' : [ Repeat(), Repeat(), ...], ... }, # record 2 ... ] Args: seq_records (list of Sequence): A list of Sequence instances detectors (list of str): A list tandem repeat detector names sequence_type (str): Either "AA" or "DNA" default (bool): If True, default values for the detection algorithms are used. local_working_dir (str): Directory where data and results are stored. If provided, temporary files are not deleted/ num_threads (int): Run ``num_threads`` detectors on parallel threads. Returns: list of dictionary: A list with a dictionary for each record in seq_records. The dictionary contains a list of repeats for each detector that was used. """ # Create temporary working dir if local_working_dir: working_dir = local_working_dir else: working_dir = tempfile.mkdtemp() LOG.debug( "repeat_detection_run.run_detector: Created tempfile: %s", working_dir) if not os.path.isdir(working_dir): raise IOError("The specified directory \"" + working_dir + "\" does not exist") # Initialise Detectors Detectors(detectors, sequence_type) # Adjust TRD parameters: if not default: if sequence_type == 'AA': DETECTORS['HHrepID'].config = set_hhrepid_config_open() DETECTORS['TRUST'].config = set_trust_config_open() else: DETECTORS['TRF'].config = set_trf_config_open() DETECTORS['PHOBOS'].config = set_phobos_config_open() DETECTORS['T-REKS'].config = set_treks_config_open() DETECTORS['XSTREAM'].config = set_xstream_config_open() if sequence_type == 'DNA': DETECTORS['T-REKS'].config = set_treks_config_DNA() infiles = split_sequence(seq_records, working_dir) # list of dictionaries for each protein containing detectorname : results # entries results = [ {fname: [] for fname, detector in DETECTORS.items()} for i in range(len(infiles)) ] # Create a list for our jobs joblist = [ DetectorJob( detector=detector, infile=os.path.join(working_dir, infile[1]), job_id=job_id, protein_id=infile[0] ) for fname, detector in DETECTORS.items() for job_id, infile in enumerate(infiles) ] LOG.info("Processing %d input files in %d jobs.", len(infiles), len(joblist)) # put joblist into job queue, thus starting actual work for job in joblist: try: wd = os.path.join(working_dir, "{0:03}".format(job.job_id + 1)) LOG.debug("Launching detector \"%s\" in directory %s", job.detector.name, os.path.join(wd, job.detector.name)) result = job.detector.run_process(wd, job.infile) # lock the mutex for results, append result # with result_lock: results[job.job_id][job.detector.name].extend(result) LOG.debug("Detector \"%s\" returned from job %d", job.detector.name, job.job_id) except: LOG.exception( "Exception occured in worker while processing %s with %s", job.infile, job.detector.name) LOG.info("All jobs returned.") # delete temporary directory if not local_working_dir: try: shutil.rmtree(working_dir) # I guess the error type is known, and you could be more precise :) except: LOG.error("Unexpected error: {0}".format(sys.exc_info()[0])) raise return results
######## SET OPEN CONFIGS ######### def set_hhrepid_config_open(): # construct open configuration for HHrepid config = DetectorHHrepID.Configuration() config.boolopts["-nofilt"] = True # <float> max p-value of suboptimal alignments in all search rounds but the last one (def=0.1) config.valopts["-P"] = 0.6 config.valopts["-T"] = 0.2 # <float> max total repeat p-value (def=0.001) # [0,inf[ minimal length of repeats to be identified (def=7) config.valopts["-lmin"] = 0 LOG.debug("%s config tokens: %s", DETECTORS['hhrepid'].displayname, ", ".join(DETECTORS['hhrepid'].config.tokens())) return config def set_phobos_config_open(): # construct open configuration for Phobos config = DetectorPhobos.Configuration() config.valopts["--maxUnitLen"] = 100 config.valopts["--mismatchScore"] = -2 config.valopts["--indelScore"] = -2 return config # Incomplete def set_treks_config_DNA(): # construct DNA configuration for T-Reks config = DetectorTREKS.Configuration() config.valopts["-type"] = 1 LOG.debug("%s config tokens: %s", DETECTORS['t-reks'].displayname, ", ".join(DETECTORS['t-reks'].config.tokens())) return config def set_treks_config_open(): # construct open configuration for T-Reks config = DetectorTREKS.Configuration() config.valopts["-similarity"] = 0.2 LOG.debug("%s config tokens: %s", DETECTORS['t-reks'].displayname, ", ".join(DETECTORS['t-reks'].config.tokens())) return config def set_trf_config_open(): # construct open configuration for TRF config = DetectorTRF.Configuration() config.valopts["Minscore"] = 20 LOG.debug("%s config tokens: %s", DETECTORS['trf'].displayname, ", ".join(DETECTORS['trf'].config.tokens())) return config def set_trust_config_open(): # construct open configuration for Trust config = DetectorTrust.Configuration() config.boolopts['-force'] = True LOG.debug("%s config tokens: %s", DETECTORS['trust'].displayname, ", ".join(DETECTORS['trust'].config.tokens())) return config def set_xstream_config_open(): # construct open configuration for XStream config = DetectorXStream.Configuration() #config.boolopts[optname] = True config.valopts["-I"] = 0.2 config.valopts["-i"] = 0.1 config.valopts["-g"] = 100 LOG.debug("%s config tokens: %s", DETECTORS['xstream'].displayname, ", ".join(DETECTORS['xstream'].config.tokens())) return config ######################## HARDCODED OVERVIEW DICTIONARIES ################# FINDER_LIST = {"HHrepID": DetectorHHrepID, "Phobos": DetectorPhobos, "TRED": DetectorTRED, "T-REKS": DetectorTREKS, "TRF": DetectorTRF, "TRUST": DetectorTrust, "XSTREAM": DetectorXStream }