Source code for tral.hmm.hmm_io

# (C) 2015 Elke Schaper

"""
:synopsis: Input/output for hmms.

.. moduleauthor:: Elke Schaper <elke.schaper@isb-sib.ch>
"""

import logging
import os
import re
import math
import tral

LOG = logging.getLogger(__name__)

# ################################ READ HMMMER3  #########################


[docs]def read(hmm_filename, id=None): """Read HMM file in HMMER3 format. HMMER3 file format is described in detail in ftp://selab.janelia.org/pub/software/hmmer3/3.0/Userguide.pdf, section 8. The general file format is:: HMMER3/b [3.0b2 | June 2009] NAME fn3 ACC PF00041.12 DESC Fibronectin type III domain LENG 86 ALPH amino (...) HMM A C D E F G H I (...) Y m->m m->i m->d i->m i->i d->m d->d COMPO 2.70271 4.89246 3.02314 2.64362 3.59817 2.82566 3.74147 3.08574 (...) 3.22607 2.68618 4.42225 2.77519 2.73123 3.46354 2.40513 3.72494 3.29354 (...) 3.61503 0.00338 6.08833 6.81068 0.61958 0.77255 0.00000 * 1 3.16986 5.21447 4.52134 3.29953 4.34285 4.18764 4.30886 3.35801 (...) 3.93889 1 - - 2.68629 4.42236 2.77530 2.73088 3.46365 2.40512 3.72505 3.29365 (...) 3.61514 0.09796 2.38361 6.81068 0.10064 2.34607 0.48576 0.95510 2 2.70230 5.97353 2.24744 2.62947 5.31433 2.60356 4.43584 4.79731 (...) 4.25623 3 - - 2.68618 4.42225 2.77519 2.73123 3.46354 2.40513 3.72494 3.29354 (...) 3.61503 0.00338 6.08833 6.81068 0.61958 0.77255 0.48576 0.95510 (...) 86 3.03720 5.94099 3.75455 2.96917 5.26587 2.91682 3.66571 4.11840 (...) 4.99111 121 - E 2.68618 4.42225 2.77519 2.73123 3.46354 2.40513 3.72494 3.29354 (...) 3.61503 0.00227 6.08723 * 0.61958 0.77255 0.00000 * // .. important:: The probabilities in this format are stored as negative natural log probabilities, e.g. -ln(0.25) = 1.38629. The special case of 0 probability is stored as ``*`` which in fact means -∞ (minus infinity). Args: hmm_filename (str): Path to the file with model data in the HMMER3 file format. id (str, optional): The identifier for the model to be returned. E.g. for Pfam the ``id`` can look like this: 'PF00560'. If defined, the function returns only the HMM with an identifier that matches the provided id. If a model in the file does not have an identifier to check against, it is skipped. Returns: dict: A dictionary of parameters required to initialise the HMM including the id, alphabet, emission probabilities and transition probabilities. Output format:: { 'id': 'PF08261.7', 'letters': ['A', 'C', 'D', ..., 'Y'], 'COMPO': {'insertion_emissions': [2.68618, 4.42225, ..., 3.61503], 'emissions': [2.28205, 5.14899, ..., 1.92022], 'transitions': [0.01467, 4.62483, ..., -inf]}, '1': {'insertion_emissions': [2.68618, 4.42225, ..., 3.61503], 'emissions': [1.00089, 4.54999, ..., 5.23581], 'transitions': [0.01467, 4.62483, ..., 0.95510]}, ... '8': {'insertion_emissions': [2.68618, 4.42225, ..., 3.61503], 'emissions': [4.12723, 5.39816, ..., 4.58094], 'transitions': [0.00990, 4.62006, ..., -inf]} } """ pat_start_HMMER3 = re.compile(r"HMMER3") pat_accession = re.compile(r"ACC\s+([\w\.]+)") pat_HMM = re.compile(r"HMM") pat_letters = re.compile(r"\w") pat_emissions = re.compile(r"[\w\.]+") pat_insertions = re.compile(r"[\d\.]+") pat_transition = re.compile(r"[\d\.\*]+") pat_end = re.compile(r"//") # Our possible parser states: # # 0: searching for HMMER3 Tag # 0.1 : searching for identifier Tag # 1: searching for HMM Tag # 2: Skipping line "m->m m->i m->d i->m i->i d->m d->d" # 3: searching for emission probabilities OR end of HMM # 4: searching for insertion emission probabilities # 5: searching for transition probabilities state = 0 with open(hmm_filename, "rt") as infile: for i, line in enumerate(infile): LOG.debug("Line %s: %s", i, line[0:-1]) if 0 == state: match = pat_start_HMMER3.match(line) if match: LOG.debug(" * (0->0.1) Found model start.") state = 0.1 elif 0.1 == state: hmm = {'id': None} match = pat_accession.match(line) if match: iID = match.group(1) if id: if id in iID: LOG.debug(" * (0.1->1) Found matching" " identifier.") hmm['id'] = iID state = 1 else: LOG.debug(" * (0.1->0) Found identifier which " "does not the one provided, skipping " "model.") state = 0 else: hmm['id'] = iID state = 1 # This is done in order to be able to parse HMMs without an # accession number, e.g. when created from a Repeat. match2 = pat_HMM.match(line) if match2: letters = pat_letters.findall(line[3:]) if id: LOG.debug(" * (0.1->0) No identifier found to " "compare to, skipping model.") state = 0 else: LOG.debug(" * (0.1->2) No identifier, found" " HMM start") LOG.debug("Letters: %s", letters) hmm['letters'] = letters state = 2 elif 1 == state: match = pat_HMM.match(line) if match: letters = pat_letters.findall(line[3:]) LOG.debug(" * (1->2) Found HMM start") LOG.debug("Letters: %s", letters) hmm['letters'] = letters state = 2 elif 2 == state: LOG.debug(" * (2->3) Skipping line") state = 3 elif 3 == state: findall = pat_emissions.findall(line) if findall: current_hmm_state = findall[0] if current_hmm_state == 'COMPO': string_emissions = findall[1:] else: string_emissions = findall[1:1 + len(hmm['letters'])] LOG.debug(" * (3->4) Found emission probabilities") LOG.debug("Current HMM state: %s", current_hmm_state) emissions = [float(i) if i != '*' else -float('inf') for i in string_emissions] LOG.debug("Emission probabilities: %s", emissions) hmm[current_hmm_state] = {'emissions': emissions} state = 4 elif pat_end.match(line): if id: LOG.debug(" * (3->TERMINAL) HMM Found and compiled," " return HMM.") LOG.info("Yielding {}, stopping".format(hmm['id'])) yield hmm else: LOG.debug(" * (3->0) Finished HMM compilation") LOG.info("Yielding {}".format(hmm['id'])) yield hmm state = 0 else: LOG.debug(" * (3->0) Error: No emission line") state = 0 elif 4 == state: findall = pat_insertions.findall(line) if findall: LOG.debug(" * (4->5) Found insertion emission" " probabilities") emissions = [float(i) if i != '*' else -float('inf') for i in findall] LOG.debug("Insertion emission probabilities: %s", emissions) hmm[current_hmm_state]['insertion_emissions'] = emissions state = 5 else: LOG.debug(" * (4->0) Error: No insertion emission line") state = 0 elif 5 == state: findall = pat_transition.findall(line) if findall: LOG.debug(" * (5->3) Found transition probabilities") transitions = [float(i) if i != '*' else -float('inf') for i in findall] LOG.debug("Transition probabilities: %s", transitions) hmm[current_hmm_state]['transition'] = transitions state = 3 else: LOG.debug(" * (5->0) Error: No transition line") state = 0
[docs]def split_HMMER3_file(hmm_filename, resultdir): """Split HMMER3 models from a single file ``hmm_filename`` into many files in ``resultdir``. Helper function: split HMMER3 models from a single file ``hmm_filename`` into many files in ``resultdir``. The models are named after the HMM accession. Args: hmm_filename (str): Path to HMMER3 file. resultdir (str): Path to directory where result files are stored. """ pat_start_HMMER3 = re.compile(r"HMMER3") pat_accession = re.compile(r"ACC\s+([\w\.]+)") tmp_file = os.path.join(resultdir, "tmp.hmm") state = 0 fh = None acc = None with open(hmm_filename, "rt") as infile: for i, line in enumerate(infile): LOG.debug("Line %s: %s", i, line[0:-1]) if 0 == state: match = pat_start_HMMER3.match(line) if match: if fh: fh.close() os.rename( tmp_file, os.path.join( resultdir, acc + ".hmm")) LOG.debug(" * (0->1) Found HMM start") state = 1 fh = open(tmp_file, "w") fh.write(line) elif 1 == state: fh.write(line) match = pat_accession.match(line) if match: acc = match.group(1).split(".")[0] LOG.debug(" * (1->0) Found accession") state = 0
[docs]def read_HMMER_acc_lengths(hmm_filename): """Read HMM file in HMMER3 format. Return the PFAM ID and the lengths of each model. Read HMM file in HMMER3 format. Return the PFAM ID and the lengths of each model. Args: hmm_filename (str): Path to HMMER3 file. Returns: (dict of str: int): The number of match states for all HMM models in ``hmm_filename``. .. todo:: Decide whether this function is needed. """ pat_accession = re.compile(r"ACC\s+([\w\.]+)") pat_length = re.compile(r"LENG\s+([\w\.]+)") state = 0 data = {} with open(hmm_filename, "rt") as infile: for i, line in enumerate(infile): LOG.debug("Line %s: %s", i, line[0:-1]) if 0 == state: match = pat_accession.match(line) if match: acc = match.group(1) LOG.debug(" * (0->1) Found name") state = 1 elif 1 == state: match = pat_length.match(line) if match: data[acc] = int(match.group(1)) LOG.debug(" * (1->0) Found length") state = 0 return data
[docs]def to_fixed_width(n, max_width, allow_overflow=True, do_round=True): """Format a float to a fixed-width string. Args: - n (float): number to format - max_width (int): width of output string - allow_overflow (bool): allow the result to be larger than max_width for large n? Raises OverflowError if False - do_round (bool): Round final digit? Returns: (str) string representation of n, either padded or rounded to have max_width characters """ # Adapted from https://stackoverflow.com/a/43397325/81658 if do_round: for i in range(max_width - 2, -1, -1): str0 = '{:.{}f}'.format(n, i) if len(str0) <= max_width: break else: str0 = '{:.42f}'.format(n) int_part_len = str0.index('.') if int_part_len <= max_width - 2: str0 = str0[:max_width] else: str0 = str0[:int_part_len] if (not allow_overflow) and (len(str0) > max_width): raise OverflowError("Impossible to represent in fixed-width non-scientific format") return str0
[docs]def write_HMMER(hmm, hmm_file): """Write <hmm> too <hmm_filename> in HMMER3 format. See Section 8 (File Formats) of ftp://selab.janelia.org/pub/software/hmmer3/3.0/Userguide.pdf for the file format specification. Args: hmm (hmm.HMM): HMM object to write hmm_file (str or file): filename or file-like object to write to """ close = False out = None try: # file or filename if hasattr(hmm_file, 'write'): out = hmm_file else: close = True out = open(hmm_file, 'w') # Header section out.write("HMMER3/f [TRAL {}]\n".format(tral.__version__)) if hmm.id: name = hmm.id elif type(hmm_file) == str: # use filename name = os.path.splitext(os.path.basename(hmm_file))[0] elif hasattr(hmm_file, "name"): name = os.path.splitext(os.path.basename(hmm_file.name))[0] else: # final fallback name = "TRAL" out.write("NAME {}\n".format(name)) if hmm.id: out.write("ACC {}\n".format(hmm.id)) out.write("LENG {}\n".format(hmm.l_effective)) if len(hmm.alphabet) == 20: alphabet = "amino" elif len(hmm.alphabet) == 4: if set(hmm.alphabet) == {"A", "G", "C", "T"}: alphabet = "DNA" elif set(hmm.alphabet) == {"A", "G", "C", "U"}: alphabet = "RNA" else: alphabet = "custom" else: alphabet = "custom" out.write("ALPH {}\n".format(alphabet)) out.write("MAP yes\n") out.write("CONS no\n") out.write("RF no\n") out.write("MM no\n") out.write("CS no\n") # HMM header line out.write("HMM ") out.write(" ".join([l.center(7) for l in hmm.alphabet])) out.write("\n") out.write(" " * 10) transitions = ["m->m", "m->i", "m->d", "i->m", "i->i", "d->m", "d->d"] out.write(" ".join([l.center(7) for l in transitions])) out.write("\n") # Model section # TODO is hmm.hmmer always consistent with hmm.p_0/p_e/p_t? if not hmm.hmmer: raise Exception("Unsupported: HMM lacks hmmer-style states") # Composition if "COMPO" not in hmm.hmmer: # TODO Read BEGIN probabilites independently of COMPO line raise KeyError("COMPO currently required") def fmt_prob(p): if p > 1e8 or math.isinf(p): return " *" else: return to_fixed_width(p, 7) out.write(" COMPO ") out.write(" ".join([fmt_prob(p) for p in hmm.hmmer['COMPO']['emissions']])) out.write("\n") # BEGIN probabilities out.write(" " * 10) out.write(" ".join([fmt_prob(p) for p in hmm.hmmer['COMPO']['insertion_emissions']])) out.write("\n") out.write(" " * 10) out.write(" ".join([fmt_prob(p) for p in hmm.hmmer['COMPO']['transition']])) out.write("\n") # Main states for i in range(1, hmm.l_effective + 1): out.write("{:>7} ".format(i)) out.write(" ".join([fmt_prob(p) for p in hmm.hmmer[str(i)]['emissions']])) out.write(" {map} {cons} {rf} {mm} {cs}\n".format(map=i, cons="-", rf="-", mm="-", cs="-")) out.write(" " * 10) out.write(" ".join([fmt_prob(p) for p in hmm.hmmer[str(i)]['insertion_emissions']])) out.write("\n") out.write(" " * 10) out.write(" ".join([fmt_prob(p) for p in hmm.hmmer[str(i)]['transition']])) out.write("\n") out.write("//") finally: if close and out: out.close()
# # # def read_HMM_smart(hmm_filename, id = None, type = "ACC"): # # ''' Write <hmm> too <hmm_filename> in smart format. ''' # # Format not decided yet.