Source code for tral.repeat_list.repeat_list

# (C) 2015 Elke Schaper

from tral.repeat_list import repeat_list_io
"""
    :synopsis: The RepeatList Class.

    .. moduleauthor:: Elke Schaper <elke.schaper@isb-sib.ch>
"""

import logging
import pickle
import sys

LOG = logging.getLogger(__name__)


[docs]class RepeatList: """ A `RepeatList` contains list of repeats that belong to the same sequence, or set of sequences. `RepeatList` contains methods that act on several tandem repeats. For example, methods to * detect overlapping tandem repeats * identify the tandem repeat with highest statistical significance * filter a set of tandem repeats according to different filtering procedures. Attributes: repeats (list of Repeat): The list of tandem repats. """ def __str__(self): """ Create string for RepeatList instance. """ return str(self.repeats) def __init__(self, repeats): self.repeats = repeats def __add__(self, rl): if rl: return RepeatList(self.repeats + rl.repeats) else: return self def intersection(self, rl): if rl: return RepeatList([i for i in self.repeats if i in rl.repeats]) else: return self
[docs] def create(file, input_format): """ Read ``RepeatList`` from file. Read ``RepeatList`` from file (currently, only pickle is supported) Args: input_format (str): Currently only "pickle" file (str): Path to output file .. todo:: Write checks for ``input_format`` and ``file``. """ if input_format == 'pickle': with open(file, 'rb') as fh: return pickle.load(fh) else: raise Exception('input_format is unknown: {}.'.format(input_format))
[docs] def write(self, output_format, file=None, return_string=None, *args): """ Serialize and write ``RepeatList`` instances. Serialize ``RepeatList`` instance using the stated ``output_format``. If a ``file`` is specified, save the String. If ``str`` is specified, give back the String (not possible for pickles). Args: output_format (str): The output format: Either "pickle" or "tsv" file (str): Path to output file .. todo:: Write checks for ``output_format`` and ``file``. """ if output_format == 'pickle': with open(file, 'wb') as fh: pickle.dump(self, fh) file = False elif output_format == 'tsv': output = repeat_list_io.serialize_repeat_list_tsv(self) else: raise Exception('output_format is unknown: {}'.format(output_format)) if file: with open(file, 'w') as fh: fh.write(output) if return_string: return output
[docs] def filter(self, func_name, *args, **kwargs): """ Filter ``repeats`` according to ``func_name``. Filter ``repeats`` according to ``func_name`` Args: func_name (str): The name of a local filtering method. """ # Check: is func_name a str, or is it a method? if it is a method, it # must be in dir(): ``funcname in dir`` == TRUE? func = getattr(sys.modules[__name__], func_name) return RepeatList(func(self, *args, **kwargs))
[docs] def cluster(self, overlap_type, *args): """ Cluster ``repeats`` according to ``overlap_type``. Cluster ``repeats`` according to ``overlap_type``. We assume that overlap of repeats is transitive: If A overlaps with B, and B overlaps with C, (A,B,C) form one cluster. The attribute ``cluster`` is initiated to a dict: self.cluster = {"overlap_type1": [(0,2),(1)], "overlap_type2": [(0),(1), (2)]} In this toy example, the first and the third repeat in ``repeats`` cluster according to "overlap_type1", whereas no repeats cluster according to "overlap_type2". Args: overlap_type (str): The name of a local pairwise repeat overlapping method. """ if not hasattr(self, 'd_cluster'): self.d_cluster = {} l_cluster = [] l_repeat_indices = list(range(len(self.repeats))) while l_repeat_indices: i_cluster = [l_repeat_indices.pop()] l_repeat_in_cluster_check = i_cluster[:] while l_repeat_in_cluster_check: check = l_repeat_in_cluster_check.pop() remaining_indices = l_repeat_indices[:] for i in remaining_indices: if two_repeats_overlap( overlap_type, self.repeats[check], self.repeats[i]): i_cluster.append(i) l_repeat_in_cluster_check.append(i) l_repeat_indices.remove(i) l_cluster.append(i_cluster) l_cluster = [set(i) for i in l_cluster] self.d_cluster[overlap_type] = l_cluster
# FILTERING # Filter methods (func) are all defined in the same way: They return a subset # of self.repeats, also as a list. # Return is a (list of `Repeat`).
[docs]def pvalue(rl, score, threshold): """ Returns all repeats in ``rl`` with a p-Value below a certain threshold. Returns all repeats in ``rl`` with a p-Value below a certain threshold. Args: rl (RepeatList): An instance of the RepeatList class. score (str): The type of score defines the pvalue that is used for filtering threshold (float): All repeats with a pvalue of type `score` above this threshold are filtered out. """ threshold = float(threshold) res = [] for i_repeat in rl.repeats: if i_repeat.pvalue(score) <= threshold: res.append(i_repeat) return res
[docs]def divergence(rl, score, threshold): """ Returns all repeats in ``rl`` with a divergence below a certain threshold. Returns all repeats in ``rl`` with a divergence below a certain threshold. Args: rl (RepeatList): An instance of the RepeatList class. score (str): The type of score defines the divergence that is used for filtering threshold (float): All repeats with a divergence of type `score` above this threshold are filtered out. """ res = [] for i_repeat in rl.repeats: if i_repeat.divergence(score) <= threshold: res.append(i_repeat) return res
[docs]def attribute(rl, attribute, type, threshold): """ Returns all repeats in ``rl`` with a attribute below (above) a certain threshold. Returns all repeats in ``rl`` with a attribute below (above) a certain threshold. Args: rl (RepeatList): An instance of the RepeatList class. attribute (str): The attribute of the Repeat instance type (str): Either "min" or "max" threshold (float): All repeats with an attribute value below (above) this threshold are filtered out. """ threshold = float(threshold) res = [] for i_repeat in rl.repeats: value = getattr(i_repeat, attribute) if type == "min": if value >= threshold: res.append(i_repeat) elif type == "max": if value <= threshold: res.append(i_repeat) else: raise Exception( "type must either be 'min' or 'max'. Instead, it is {}.".format(type)) return res
[docs]def none_overlapping_fixed_repeats(rl, rl_fixed, overlap_type): """ Returns all repeats in ``rl`` none-overlapping with ``rl_fixed``. Returns all repeats in ``rl`` none-overlapping with ``rl_fixed`` according to ``overlap``. Args: rl (RepeatList): An instance of the RepeatList class. rl (rl_fixed): A second instance of the RepeatList class. overlap (list): First list element: Name (str) of an overlap method in repeat_list. All remaining elements are additional arguments for this class. """ res = [] for i_repeat in rl.repeats: for i_repeat_fixed in rl_fixed.repeats: if two_repeats_overlap( overlap_type, repeat1=i_repeat, repeat2=i_repeat_fixed): break else: res.append(i_repeat) return res
[docs]def none_overlapping(rl, overlap, l_criterion): """ Returns all none-overlapping repeats in ``rl``. Returns all none-overlapping repeats in ``rl``. Repeats are clustered according to ``overlap``. Of each cluster, only the best repeat is returned according to ``dCriterion``. Args: rl (RepeatList): An instance of the RepeatList class. overlap (tuple): First element: Name (str) of an overlap method in `repeat_list`. Second element: ``**kwargs``. All remaining elements are additional arguments for this class. l_criterion (list): list of (criterion (str), criterion arguments) tuples. Until only one repeat is remainining in a cluster, the criteria are applied in order. """ overlap_type = overlap[0] if len(overlap) > 1: # has to be adapted to a more convenient way overlap_args = overlap[1] if not (hasattr(rl, 'd_cluster') and overlap_type in rl.d_cluster): if len(overlap) > 1: rl.cluster(overlap_type, overlap_args) else: rl.cluster(overlap_type) res = [] for i_cluster in rl.d_cluster[overlap_type]: i_repeat = [rl.repeats[i] for i in i_cluster] for ctype, cvalue in l_criterion: criterion_type, criterion_value = ctype, cvalue if len(i_repeat) == 1: res.append(i_repeat[0]) break if criterion_type == 'pvalue': min_value = min(i.pvalue(criterion_value) for i in i_repeat) i_repeat = [ i for i in i_repeat if i.pvalue(criterion_value) == min_value] elif criterion_type == 'divergence': min_value = min(i.divergence(criterion_value) for i in i_repeat) i_repeat = [ i for i in i_repeat if i.divergence(criterion_value) == min_value] else: LOG.debug( "repeat_list.none_overlapping(): > 1 Repeats have the same values...") res.append(i_repeat[0]) return res
# OVERLAP DETECTION METHODS
[docs]def two_repeats_overlap(overlap_type, repeat1, repeat2): """ Helper method to test the overlap of ``repeat1`` and ``repeat2``. Helper method to test the overlap of ``repeat1`` and ``repeat2``. The overlap is calculated by the local method ``overlap_type``. Args: overlap_type (str): The name of a local pairwise repeat overlapping method. repeat1 (Repeat): An instance of the Repeat class repeat2 (Repeat): A second instance of the Repeat class Returns: Forwards method output """ is_overlapping = getattr(sys.modules[__name__], overlap_type) # Alternative implementation: # is_overlapping = globals()[overlap_type] return is_overlapping(repeat1, repeat2)
[docs]def shared_char(repeat1, repeat2): """ Do two TRs share at least one char? Return 1 if the two TRs share at least one char (amino acids or nucleotides); else 0. Args: repeat1 (Repeat): An instance of the Repeat class repeat2 (Repeat): A second instance of the Repeat class Returns: Bool: 1 if the repeats share >= 1 char, else 0. """ if (repeat1.begin + repeat1.repeat_region_length - 1 < repeat2.begin) or \ (repeat2.begin + repeat2.repeat_region_length - 1 < repeat1.begin): return False else: return True
[docs]def common_ancestry(repeat1, repeat2): """ Do two TRs share at least one pair of chars with common ancestry? Return 1 if the two TRs share at least one pair of chars (amino acids or nucleotides) with common ancestry; else 0. Args: repeat1 (Repeat): An instance of the Repeat class repeat2 (Repeat): A second instance of the Repeat class Returns: Bool: 1 if the repeats share >= 1 pair of chars with common ancestry, else 0. """ if not hasattr(repeat1, 'msaIT'): repeat1.calc_index_msa() if not hasattr(repeat2, 'msaIT'): repeat2.calc_index_msa() original = repeat2.msaIT potential = repeat1.msaIT try: for p in potential: iP = 0 for iP in range(len(p) - 1): i = 0 j = 0 while original[i][j] != p[iP]: if original[i][j] > p[iP]: i += 1 j = 0 else: j += 1 if len(original) <= i or len(original[i]) <= j: break else: for iPRest in range(iP + 1, len(p)): if p[iPRest] in original[i][j + 1:]: #coverage = repeat1.repeat_region_length/repeat2.repeat_region_length #greediness = repeat1.l_effective/repeat2.l_effective return True except: LOG.warning('error in shared_char with original %s and potential %s', str(repeat1.msaIT), str(repeat2.msaIT)) LOG.warning('original:', str(repeat2.msa)) LOG.warning('original:', str(repeat2.begin)) LOG.warning('potential:', str(repeat1.msa)) LOG.warning('potential:', str(repeat1.begin)) return False return False