Source code for tral.sequence.repeat_detection_io

# (C) 2011, Alexander Korsunsky
# (C) 2011-2015 Elke Schaper

"""
    :synopsis: Parsing repeat detection algorithm output

    .. moduleauthor:: Elke Schaper <elke.schaper@isb-sib.ch>
"""

import logging
import re
import csv

LOG = logging.getLogger(__name__)


class RepeatRegion:

    def __init__(self, protein_id="", begin=None, msa=None):
        self.protein_id = protein_id
        self.begin = begin  # 1-based
        if msa is None:
            msa = []
        self.msa = msa


[docs]def tred_get_repeats(infile): r"""Read repeats from a TRED standard output (stdout) file stream successively. Read repeats from a TRED standard output (stdout) file stream successively. Postcondition: infile points to EOF. Layout of TRED output file:: Start: start End: \d+ Length: \d+ ( \d repeat_unit \d ( alignment_indicator )?)* Args: infile (file stream): File stream of output1 from tred1 myDNA.faa intermediate_output tred2 myDNA.faa intermediate_output output1 output2 Returns: (Repeat): A generator function is returned that, when called in a loop, yields one repeat per iteration. .. todo:: Layout TRED output syntax. """ # pattern for start index pat_start = re.compile(r"Start: (\d+)") pat_repeat_unit = re.compile(r"\d+\s+([ACGT-]+)") pat_alignment_indicator = re.compile(r"\s+([ACGT-]+)") # Our possible parser states: # # 1: searching for start # 2: searching for first repeat unit # 3: searching for all further repeat units identifier = '' state = 1 repeat_units = [] for i, line in enumerate(infile): LOG.debug("Line %d: %s", i, line[0:-1]) if 1 == state: region = RepeatRegion(protein_id=identifier, msa=[]) match = pat_start.match(line) if match: LOG.debug(" * (1->2) Found start") LOG.debug("Start: %s", match.group(1)) region.begin = int(match.group(1)) state = 2 if 2 == state: match = pat_repeat_unit.match(line) if match: LOG.debug(" * (2->3) Found first repeat_unit") repeat_units.append((match.group(1), True)) state = 3 continue if 3 == state: match = pat_repeat_unit.match(line) if match: LOG.debug(" * (3->3) Found another repeat_unit") repeat_units.append((match.group(1), True)) else: match = pat_alignment_indicator.match(line) if match: LOG.debug(" * (3->3) Found an alignment_indicator unit") repeat_units.append((match.group(1), False)) else: LOG.debug(" * (3->1) Found end of repeat (yielding)") state = 1 region.msa = tred_msa_from_pairwise(repeat_units) yield region
[docs]def tred_msa_from_pairwise(repeat_units): """ Construct a MSA from pairwise alignments. Construct a MSA from pairwise alignments. At the moment, gaps following the repeat are not added. However, these gaps are added automatically when a ``Repeat`` instance is created. Args: repeat_units (list of str): Read in from TRED output files Returns: (list of str) .. todo:: Is the Args format mentioned correctly? """ pat_gap = re.compile(r"(-*)") result = [] index = 0 for iR in range(len(repeat_units)): ru = repeat_units[iR] # The next repeat unit if ru[1]: result.append('-' * index + ru[0]) # How many gaps in the beginning of this repeat unit? index += len(pat_gap.match(ru[0]).group()) # The next alignment indicator else: for iL in range(len(ru[0])): if ru[0][iL] == '-': # enter a gap between # the index + iL and index + iL + 1 character # in each repeat unit in result so far: result = [iRU[:index + iL] + '-' + iRU[index + iL:] for iRU in result] return result
[docs]def treks_get_repeats(infile): """ Read repeats from a T-REKS standard output (stdout) file stream successively. Read repeats from a T-REKS standard output (stdout) file stream successively. Postcondition: infile points to EOF. Layout of T-REKS output file:: protein ::= ">" identifier repeat* # repeat ::= repeat_header sequence* "*"+ # repeat_header ::= "Length:" integer "residues - nb:" integer "from" integer "to" integer "- Psim:"float "region Length:"integer Args: infile (file stream): File stream to the file of the standard output of T-Reks Returns: (Repeat): A generator function is returned that, when called in a loop, yields one repeat per iteration. .. todo:: Layout T-REKS output syntax. """ # pattern for protein identifier pat_identifier = re.compile(r">(\S+)") # pattern for repeat properties pat_repeat_header = re.compile( r"Length:\s*\d+\s+" r"residues - nb:\s*(\d+)\s+" # 1. number of repeats r"from\s+(\d+)\s+to\s+(\d+)\s*" # 2-3. start and end of repeat region (1-based) r"-\s*Psim:\s*(-?[\d\.]+)\s+" # 4. probability (accept negative for (malformed) log10 values) r"region Length:\s*(\d+)") # 5. length pat_repeat_end = re.compile(r"\*+") # pattern for repeat sequence # FIXME stuff that occurs here is not just A-Z but a set of possible amino # acid symbols pat_sequence = re.compile(r"([A-Z-]+)") # Our possible parser states: # # 1: state between repeats # entry: reset repeat # expect repeat_header(goto 2) OR identifier(store identifier, goto 1) # 2: state for multiple sequence alignment line # expect sequence(goto 3, append sequence) OR repeat_end(return repeat, # goto 1) state = 1 identifier = "" for i, line in enumerate(infile): LOG.debug("Line %d: %s", i, line[0:-1]) if 1 == state: # Parsing header region = RepeatRegion(protein_id=identifier, msa=[]) LOG.debug("msa: %s", "\n".join(region.msa)) match = pat_repeat_header.match(line) if match: LOG.debug(" * (1->2) Found properties") region.begin = int(match.group(2)) # 1-based in T-Reks output end = int(match.group(3)) state = 2 match = pat_identifier.match(line) if match: LOG.debug(" * (1->1) Found identifier") identifier = match.group(1) state = 1 elif 2 == state: # Parsing MSA match = pat_sequence.match(line) if match: LOG.debug(" * (2->2) Found MSA line (appending)") region.msa.append(match.group(1)) state = 2 match = pat_repeat_end.match(line) if match: LOG.debug(" * (2->1) Found end of repeat (yielding)") state = 1 # Perform some checks before yielding msalen = sum(1 for row in region.msa for c in row if c != '-') if region.begin + msalen != end + 1: logging.warn("Inconsistency in T-Reks file: MSA length doesnt match start and end") yield region else: raise AssertionError("Huh? Unknown parser state " + str(state))
[docs]def xstream_get_repeats(infile): """ Read repeats from a XSTREAM output xls chart Read repeats from a XSTREAM output xls chart Postcondition: infile points to EOF. Args: infile (file stream): File stream to read XSTREAM output from a xls chart. Returns: (Repeat): A generator function is returned that, when called in a loop, yields one repeat per iteration. """ # The infile is luckily enough in csv format: reader = csv.reader(infile, dialect='excel-tab') # read first row with the fieldnames header = next(reader) LOG.debug("Header has %d fields.", len(header)) if len(header) != 8 and len(header) != 10: raise ValueError( "XStream output file seems to be malformed. " "Make sure to open this function with the generated xls chart.") # when XSTREAM is fed files with multiple sequences, first field is "identifier", # otherwise it is "seq length" if header[0] == "identifier": field_offset = 2 else: field_offset = 0 for row in reader: region = RepeatRegion() if field_offset: region.protein_id = row[0] region.begin = int(row[0 + field_offset]) region.msa = row[4 + field_offset].split() LOG.debug("Found repeat, yielding.") if len(region.msa) >= 2: yield region
[docs]def trust_fill_repeats(msa, begin, sequence, maximal_gap_length=20): ''' return a trust msa that has no longer indels than maximal_gap_length, that contains the indel characters even when not part of the trust output file. Background trust returns tandem repeats, but also distant repeats. ''' gapless_msa = [repeat_unit.replace('-', '').upper() for repeat_unit in msa] sequence = sequence.upper() # Find the start and end positions of the predicted repeat units position = [(begin - 1, begin + len(gapless_msa[0]) - 2)] for repeat_unit in gapless_msa[1:]: find_index = sequence[position[-1][1] + 1:].find(repeat_unit) # Repeat unit could not be found in sequence -> Discard the repeat if find_index == -1: return None, None repeat_unit_begin = find_index + position[-1][1] + 1 position.append( (repeat_unit_begin, repeat_unit_begin + len(repeat_unit) - 1)) # Derive the start and end positions of the gaps gap_position = [(i[1] + 1, j[0] - 1) for i, j in zip(position[:-1], position[1:])] gaps = [i[1] - i[0] + 1 for i in gap_position] # Filter out repeat units that are further apart than maximal_gap_length gap_valid = ''.join( ['1' if i_gap <= maximal_gap_length else '0' for i_gap in gaps]) count_valid_pairs = [len(m.group()) for m in re.finditer(re.compile(r'1+'), gap_valid)] # All repeat units are further apart than maximal_gap_length? -> Discard # the repeat if len(count_valid_pairs) == 0: return None, None # Choose the sequence of pairs closer then maximal_gap_length that is # longest valid_index = gap_valid.find('1' * max(count_valid_pairs)) # Shorten the predicted msa accordingly. msa = msa[valid_index:valid_index + max(count_valid_pairs) + 1] gaps = gaps[valid_index:valid_index + max(count_valid_pairs) + 1] gap_position = gap_position[ valid_index:valid_index + max(count_valid_pairs) + 1] position = position[valid_index:valid_index + max(count_valid_pairs) + 1] # Add missing sequence to the repeat units gap_count_before = 0 for i, i_gap in enumerate(gaps): gap_count_after = gap_count_before + i_gap msa[i] += '-' * (gap_count_before) + sequence[gap_position[i][0]:gap_position[i][1] + 1] + '-' * (sum(gaps) - gap_count_after) gap_count_before = gap_count_after msa[-1] += '-' * sum(gaps) return msa, position[0][0] + 1
[docs]def trust_get_repeats(infile): """ Read repeats from a TRUST standard output (stdout) file stream successively. Read repeats from a TRUST standard output (stdout) file stream successively. Postcondition: infile points to EOF. Layout of TRUST standard output:: protein ::= ">" identifier (repeat_types)* "//" # repeat_types ::= "REPEAT_TYPE" integer "REPEAT_LENGTH" integer (repeat_info)* (">Repeat " integer sequence)* # repeat_info ::= integer integer [integer] [integer] Args: infile (file stream): File stream from TRUST standard output. Returns: (Repeat): A generator function is returned that, when called in a loop, yields one repeat per iteration. .. todo:: Layout TRUST output syntax. """ pat_identifier = re.compile(r">(\S+)") pat_repeat_type = re.compile(r"REPEAT_TYPE \d+") pat_repeat_length = re.compile(r"REPEAT_LENGTH \d+") pat_repeat_info = re.compile(r"(\d+) (\d+).*$") pat_repeat_header = re.compile(r">Repeat \d+") # FIXME find proper character set here pat_repeat_sequence = re.compile(r"([A-Za-z-]+)") pat_protein_end = re.compile(r"//") # Our possible parser states: # # 0: initial state # expect: identifier(store identifier, goto 1) # 1: before the beginning of a repeat # expect: "REPEAT_TYPE"(goto 2) or "//"(goto 0) # 2: state after having found "REPEAT_TYPE" # entry: reset state of return value (repeat) # expect: "REPEAT_LENGTH"(goto 3) # 3: first line of repeat info # expect: repeat_info(store begin, goto 4) or ">Repeat"(goto 5) # 4: continuing to read repeat info # expect: repeat_info(goto 4) or ">Repeat"(goto 5) # 5: part of MSA sequence # expect: sequence(goto 6) # 6: After first sequence # expect: ">Repeat"(goto 5) or "REPEAT_TYPE"(return repeat, goto 2) or # "//"(return repeat, goto 0) state = 0 region = RepeatRegion() identifier = "" def strip_comments(line): pat_comment = re.compile(r"\s*#.*$") return pat_comment.sub("", line) for i, line in enumerate(infile): line = strip_comments(line) if not line or line == '\n': continue LOG.debug("Line %d: %s", i, line[0:-1]) if 0 == state: match = pat_identifier.match(line) if match: LOG.debug( " *(0->1) Found identifier (storing \"%s\")", match.group(1)) identifier = match.group(1) state = 1 continue elif 1 == state: match = pat_repeat_type.match(line) if match: LOG.debug(" *(1->2) Found REPEAT_TYPE") state = 2 continue match = pat_protein_end.match(line) if match: LOG.debug(" *(1->0) Found protein end") state = 0 continue elif 2 == state: # Entry Action: clear msa cache region = RepeatRegion(protein_id=identifier) match = pat_repeat_length.match(line) if match: LOG.debug(" *(2->3) Found REPEAT_LENGTH") state = 3 continue elif 3 == state: match = pat_repeat_info.match(line) if match: LOG.debug( " *(3->4) Found repeat_info (storing begin: \"%s\")", match.group(1) ) region.begin = int(match.group(1)) state = 4 continue match = pat_repeat_header.match(line) if match: LOG.debug(" *(3->5) Found repeat_header") state = 5 continue elif 4 == state: match = pat_repeat_info.match(line) if match: LOG.debug(" *(4->4) Found repeat_info") state = 4 continue match = pat_repeat_header.match(line) if match: LOG.debug(" *(4->5) Found repeat_header") state = 5 continue elif 5 == state: match = pat_repeat_sequence.match(line) if match: LOG.debug( " *(5->6) Found sequence (storing \"%s\")", match.group(1)) region.msa.append(match.group(1)) state = 6 continue elif 6 == state: match = pat_repeat_header.match(line) if match: LOG.debug(" *(6->5) Found repeat_header") state = 5 continue match = pat_repeat_type.match(line) if match: LOG.debug(" *(6->2) Found REPEAT_TYPE, yielding.") state = 2 if len(region.msa) >= 2: yield region continue match = pat_protein_end.match(line) if match: LOG.debug(" *(6->0) Found protein end, yielding.") state = 0 if len(region.msa) >= 2: yield region continue else: raise AssertionError("Huh? Unknown parser state " + str(state))
################################## TRF - Benson ##########################
[docs]def trf_get_repeats(infile): r"""Read repeats from a TRF txt.html file stream file stream successively. Read repeats from a TRF txt.html file stream file stream successively. Postcondition: infile points to EOF. TRF output file syntax:: Sequence: ``identifier`` Indices: ``begin``--``end`` \d [a-zA-Z]+ # begin (repeat)* 1 (consensus)* # (( \d (repeat)* \d (consensus)* )? \d (repeat)* 1 (consensus)* )+ \d [a-zA-Z]+ # ``Statistics`` Args: infile (file stream): File stream from TRF output txt.html. (generated via e.g. ./trf404.linux64.exe FASTAFILE 2 7 7 80 10 50 500 -d > /dev/null If the -h flag is set, no .txt.html output is produced) Returns: (Repeat): A generator function is returned that, when called in a loop, yields one repeat per iteration. .. todo:: Layout TRF output syntax. .. todo:: Does not search for the sequence identifier at current! """ # find the name of the sequence ## CURRENTLY NOT IMPLEMENTED #pat_identifier = re.compile(r"Sequence: (\S+)") # find coordinates of the repeat region in the protein pat_coordinates = re.compile(r"Indices: (\d+)--(\d+)") # find a part of a repeat unit and its coordinate pattern_seq = re.compile(r"\s+(\d+) ([ACGT\- ]+)") # find the final tag 'Statistics' pat_statistics = re.compile(r"Statistics") # Our possible parser states: # # state 0: searching for identifier -> 1 # not necessary when sequence identifier is known # state 1: searching for repeat region coordinates -> 2 # state 2: searching for beginning of MSA & save sequence to tmpMSA-> 4 # state 3: new repeat unit: save sequence to tmpMSA -> 6 # state 4: new repeat unit: save sequence to tmp_consensus -> 5 # state 5: # if sequence: save sequence to tmpMSA -> 6 # if end: save tmpMSA to preMSA; save tmp_consensus to consensus; Yield repeat region -> 1 # state 6: check: new repeat unit? save tmp_consensus # 1: use last tmpMSA entry for new tmpMSA; # save tmpMSA to preMSA; # save tmp_consensus to consensus; # save sequence to new tmp_consensus -> 3 # 0: save sequence to tmp_consensus -> 5 state = 1 # identifier = "" # Currently not implemented. preMSA = [] consensus = [] tmp_consensus = None for i, line in enumerate(infile): LOG.debug("Line %d: %s", i, line[0:-1]) # CURRENTLY NOT IMPLEMENTED # if state == 0: #searching for sequenceMSA identifier # tmp = pat_identifier.search(line) # if tmp: # state = 1 # identifier = tmp.group() # continue # elif state == 1: #searching for TR boundaries (indices) if 1 == state: # searching for repeat region coordinates search = pat_coordinates.search(line) if search: LOG.debug(" * (1->2) Found coordinates") state = 2 region = RepeatRegion() region.begin = int(search.group(1)) #region_end = search.group(2) short = False # searching for beginning of MSA & save sequence to tmpMSA-> 4 elif state == 2: match = pattern_seq.match(line) if match and match.group(1) == str(region.begin): LOG.debug(" *(2->4) Found first row of first MSA repeat unit") state = 4 if len(match.group(2).strip().split(" ")) > 1: short = True preMSA = match.group(2).strip().split(" ") LOG.debug("Repeat unit is short; preMSA: %s", str(preMSA)) else: tmpMSA = [match.group(2).strip().split(" ")[0]] LOG.debug(" tmpMSA: %s", str(tmpMSA)) elif state == 3: # new repeat unit: save sequence to tmpMSA -> 4 match = pattern_seq.match(line) if match: LOG.debug(" *(3->5) Found first row of new repeat unit") state = 6 if short: preMSA += match.group(2).strip().split(" ") LOG.debug("Repeat unit is short; preMSA: %s", str(preMSA)) else: tmpMSA.append(match.group(2).strip().split(" ")[0]) LOG.debug(" tmpMSA: %s", str(tmpMSA)) # if end: save tmpMSA to preMSA; save tmp_consensus to consensus; # Yield repeat region -> 1 if pat_statistics.search(line): LOG.debug( " *(5->1) Encountered 'Statistics': No more repeats, yielding.") state = 1 if not short: preMSA.append("".join(tmpMSA)) consensus.append("".join(tmp_consensus)) region.msa = getMSA(preMSA, consensus) if len(region.msa) >= 2: yield region preMSA = [] consensus = [] elif state == 4: # new repeat unit: save sequence to tmp_consensus -> 5 match = pattern_seq.match(line) if match: LOG.debug( " *(4->5) Found first consensus row of the repeat unit") state = 5 if short: consensus = match.group(2).strip().split(" ") LOG.debug( "Repeat unit is short; consensus: %s", str(consensus)) else: tmp_consensus = [match.group(2).strip().split(" ")[0]] LOG.debug(" tmp_consensus: %s", str(tmp_consensus)) elif state == 5: # SEARCHING FOR MSA ROW # if sequence: save sequence to tmpMSA -> 6 match = pattern_seq.match(line) if match: LOG.debug(" *(5->6) Found a MSA row") state = 6 if short: preMSA += match.group(2).strip().split(" ") LOG.debug("Repeat unit is short; preMSA: %s", str(preMSA)) else: tmpMSA.append(match.group(2).strip().split(" ")[0]) LOG.debug(" tmpMSA: %s", str(tmpMSA)) # if end: save tmpMSA to preMSA; save tmp_consensus to consensus; # Yield repeat region -> 1 if pat_statistics.search(line): LOG.debug( " *(5->1) Encountered 'Statistics': No more repeats, yielding.") state = 1 if not short: preMSA.append("".join(tmpMSA)) consensus.append("".join(tmp_consensus)) region.msa = getMSA(preMSA, consensus) if len(region.msa) >= 2: yield region preMSA = [] consensus = [] elif state == 6: # new repeat unit? ## SEARCHING FOR CONSENSUS ROW match = pattern_seq.match(line) # 1: save tmp_consensus -> 3 if match and match.group(1) == '1': LOG.debug( " *(6->3) Found a consensus row of a new repeat unit") state = 3 if short: # NEEDS TO BE CODED consensus += match.group(2).strip().split(" ") LOG.debug( "Repeat unit is short; consensus: %s", str(consensus)) else: newMSA = tmpMSA.pop() preMSA.append("".join(tmpMSA)) consensus.append("".join(tmp_consensus)) tmpMSA = [newMSA] tmp_consensus = [match.group(2).strip().split(" ")[0]] # 0: save sequence to tmp_consensus -> 5 elif match: LOG.debug(" *(6->5) Found a consensus row") state = 5 tmp_consensus.append(match.group(2).strip().split(" ")[0]) # YIELD # aha! there should have been a consensus sequence, but there is # not. Hence we are finished with this repeat! else: LOG.debug(' *(6->1) No consensus row: repeat finished') state = 1 if short: preMSA = preMSA[:-1] else: preMSA.append( "".join( tmpMSA[ 0:- 1])) # The last tmpMSA entry was not a repeat unit consensus.append("".join(tmp_consensus)) LOG.debug(" preMSA: %s", str(preMSA)) LOG.debug(" consensus: %s", str(consensus)) region.msa = getMSA(preMSA, consensus) if len(region.msa) >= 2: yield region preMSA = [] consensus = []
[docs]def getMSA(sequenceMSA, consensusMSA): """ Derive the MSA from a strange combination of consensusMSA and sequenceMSA in TRF (Benson) txt.html output files Args: sequenceMSA (?): consensusMSA (?): Returns: msa (list of str): The multiple sequence alignment predicted by TRF. """ msa = [""] * len(sequenceMSA) while consensusMSA: # CHECK for insertions insertion = 1 while insertion and consensusMSA: insertion = 0 for i_con in consensusMSA: if i_con and i_con[0] == "-": insertion = 1 break # INCLUDE insertions into the msa if insertion: for i in range(len(consensusMSA)): if consensusMSA[i] and consensusMSA[i][0] == "-": msa[i] += sequenceMSA[i][0] sequenceMSA[i] = sequenceMSA[i][1:] consensusMSA[i] = consensusMSA[i][1:] else: msa[i] += "-" # CHECK for deletions and normal sequence if not consensusMSA[0]: break for i in range(len(consensusMSA)): # The last repeat unit can be shorter than the ones before if not sequenceMSA[i]: break msa[i] += sequenceMSA[i][0] sequenceMSA[i] = sequenceMSA[i][1:] consensusMSA[i] = consensusMSA[i][1:] return msa
# ############## HHrepID - Soeding ###########################################
[docs]def hhpredid_get_repeats(infile): r"""Read repeats from a HHREPID standard output (stdout) file stream successively. Read repeats from a HHREPID standard output (stdout) file stream successively. Postcondition: infile points to EOF. Layout of HHREPID standard output:: protein ::= begin"-"\d "+"\d repeatUnit ( \d"-"\d "+"\d repeatUnit )+ Args: infile (file stream): File stream from HHREPID standard output. [Generated by e.g.: ./hhrepid_32 -i FASTAFILE -v 0 -d cal.hhm -o INFILE] Returns: (Repeat): A generator function is returned that, when called in a loop, yields one repeat per iteration. .. todo:: Layout HHREPID output syntax. """ # find a part of a repeat unit and its first coordinate # minus or \minus? pattern_repeat_unit_count = re.compile(r"Repeats\s+(\d+)") pattern_seq = re.compile(r"[A-Z]+(\d+).*(\d+)\-.*\+[\d]+ ([\-a-zA-Z.]+)") # Our possible parser states: # state1: Find number of repeat units n # state2: Find first (partial) row of the MSA # state3: Find all other (partial) rows of the MSA region = None state = 1 for i, line in enumerate(infile): LOG.debug("Line %d: %s", i, line[0:-1]) if 1 == state: # Find 'Repeats' marker of new repeat search = pattern_repeat_unit_count.search(line) if search: LOG.debug(" *(1->2) Found repeat") state = 2 n = int(search.group(1)) elif 2 == state: # Find first (partial) row of the MSA search = pattern_seq.search(line) if search: LOG.debug(" *(2->3) Found first repeat unit (part)") state = 3 region = RepeatRegion() region.begin = int(search.group(2)) region.msa = [""] * n region.msa[int(search.group(1)) - 1] = search.group(3).replace('.', '-').upper() elif 3 == state: # Find all other (partial) rows of the MSA search = pattern_seq.search(line) if search: LOG.debug(" *(3->3) Found other repeat unit (part)") region.msa[int(search.group(1)) - 1] += search.group(3).replace('.', '-').upper() else: search = pattern_repeat_unit_count.search(line) if search: LOG.debug(" *(3->2) Yield Repeat, begin next") state = 2 n = int(search.group(1)) if len(region.msa) >= 2: yield region region = None else: LOG.warning( "HHPREDID: Msa too short %s", str( region.msa)) # Yield final repeat region. if region is not None: if len(region.msa) >= 2: yield region else: LOG.warning("HHPREDID: Msa too short %s", str(region.msa))
####################################### Phobos TRF ######################
[docs]def phobos_get_repeats(infile): """ Read repeats from a PHOBOS output file stream successively. Read repeats from a PHOBOS output file stream successively. Postcondition: infile points to EOF. Args: infile (file stream): File stream from PHOBOS output. Returns: (Repeat): A generator function is returned that, when called in a loop, yields one repeat per iteration. .. todo:: Show PHOBOS output syntax. """ pattern_begin = re.compile(r"(\d+) :\s+\d") pattern_seq = re.compile(r"([\-ACGT]+)") # Our possible parser states: # # state 1: Find TR begin # state 2: Find first repeat unit # state 3: Find repeat units state = 1 for i, line in enumerate(infile): LOG.debug("Line %d: %s", i, line[0:-1]) if 1 == state: # Find TR offset search = pattern_begin.search(line) if search and search.groups()[0] is not None: LOG.debug(" *(1->2) Found tandem repeat begin") state = 2 region = RepeatRegion() region.begin = int(search.groups()[0]) region.msa = [] elif 2 == state: # Find all other repeat units match = pattern_seq.search(line) if match and match.groups()[0] is not None: LOG.debug(" *(2->3) Found first repeat unit") region.msa.append(match.groups()[0]) state = 3 elif 3 == state: # Find all other repeat units match = pattern_seq.search(line) if match and match.groups()[0] is not None: LOG.debug(" *(3->3) Found a repeat unit") region.msa.append(match.groups()[0]) else: LOG.debug(" *(3->1) repeat region finished, yielding.") state = 1 if len(region.msa) >= 2: yield region else: LOG.warning("phobos: Msa too short %s", str(region.msa))