Source code for tral.hmm.hmm_own_model

# (C) 2012-2015 Elke Schaper
# coding: utf-8

from tral.hmm import hmm_io
from tral.repeat.repeat_score import load_model
import math
import numpy as np
import scipy.stats
import scipy.special
import scipy.linalg
from scipy.optimize import fsolve, minimize_scalar
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.ERROR)


################################### HMM class ############################


[docs]class HMM:
    """ A cyclic HMM applicable to describe sequence Tandem Repeats """

    def __init__(self, tandem_repeat=None, prior_divergence=None,
                 prior_indel_insertion=None, parameters=None):
        if tandem_repeat:
            if isinstance(tandem_repeat, str):
                self.HMM_from_file(
                    tandem_repeat,
                    accession=parameters,
                    prior_indel_insertion=prior_indel_insertion)
            else:
                self.HMM_from_TR(
                    tandem_repeat,
                    prior_divergence=prior_divergence,
                    prior_indel_insertion=prior_indel_insertion,
                    emission_file=parameters)
        else:
            self.HMM_example()

    def HMM_example(self):
        # states = ["N", "B", "M1", "M2", "M3", "E", "C"]
        self.states = ["N", "M1", "M2", "C"]

        # Initialisation
        self.p_t = {iS: {iS2: 0 for iS2 in self.states}
                    for iS in self.states[:-1]}
        # The initial probability is equal for all states
        # (was 1/3, which does not normalise over four states).
        self.p_0 = {iS: 1 / len(self.states) for iS in self.states}

        # Transition probabilities
        # Feed values to p_t
        self.p_t["N"] = {"N": 0.5, "M1": 0.5}
        self.p_t["M1"] = {"M2": 0.5, "C": 0.5}
        self.p_t["M2"] = {"M1": 0.5, "C": 0.5}
        self.p_t["C"] = {"C": 1}

        # Emissions
        self.emissions = ["A", "C", "G", "T"]

        # Emission probabilities
        self.p_e = {iS: {iE: 0.25 for iE in self.emissions}
                    for iS in self.states}
        self.p_e['M1'] = {"A": 0.9, "C": 0.025, "G": 0.025, "T": 0.025}
        self.p_e['M2'] = {"A": 0.025, "C": 0.9, "G": 0.025, "T": 0.025}
[docs]    def HMM_from_TR_One_step(self, tandem_repeat, t):
        """ Build an HMM from a TR alignment, given the ML divergence t.

        One step approach: Calculate the posterior of the ancestral sequence
        and use it as the likelihood of the homologous repeat unit sequence.
        """
        # Not implemented yet. For an implementation, use the first half of
        # self.HMM_from_TR().
        pass
[docs]    def HMM_from_file(self, hmm_file, accession, prior_indel_insertion):
        """ Load an HMM from <hmm_file>.

        Store probabilities as log10arithms.

        Parameters:
        <hmm_file> = 'path/to/file.hmm'
        <accession> = 'PF00560'
        <prior_indel_insertion> = {'mu': 0.5, 'sigma_squared': 0.81}
        """

        bInsertion_state_emission_p_from_file = False
        bTerminal_state_emission_p_from_file = False

        hmmer_probabilities = hmm_io.read_HMMER(hmm_file, id=accession)
        dTranslate_States = {i: str(int(i) - 1)
                             for i in hmmer_probabilities.keys()
                             if i not in ['COMPO', 'letters']}
        hmmer_probabilities = {
            (i if i in ['COMPO', 'letters'] else dTranslate_States[i]): j
            for i, j in hmmer_probabilities.items()}
        # WARNING: The order of keys in hmmer_probabilities might not be what
        # you think. You need to adapt your code at several instances
        # (wherever you assume the states to be ordered, e.g.
        # zip(repeat_states, hmmer_probabilities.keys)).
        if not hmmer_probabilities:
            logger.error("<hmmer_probabilities> were not found.")
            return None
        self.hmmer = hmmer_probabilities

        # Warning! The following line is only appropriate if there are only
        # "letters", "COMPO", and the match state keys in
        # <hmmer_probabilities>.
        l_effective = len(hmmer_probabilities.keys()) - 2

        # Assume: sequence_type = 'AA'
        Q, null_model_emission_p, alphabet = load_model('lg')

        # Initialise all HMM states to default values.
        repeat_states, insert_states = self.initialise_HMM_structure(l_effective)

        # Store indel probabilities.
        self.set_indel_probabilities(prior_indel_insertion, l_effective,
                                     default=True)

        # Store null model insertion probabilities for terminal and insertion
        # states. (Both branches below are disabled by the flags above; the
        # hmmer3_* helper functions they call are not defined in this module.)
        if bTerminal_state_emission_p_from_file:
            null_model_emission_p = hmmer3_null_model_emission_probabilities(
                hmmer_probabilities=hmmer_probabilities)
        self.hmmer_probabilities = hmmer_probabilities

        # Store match state and insertion state emission probabilities.
        match_state_emission_p = hmmer3_emission_probabilities(
            hmmer_probabilities=hmmer_probabilities,
            letters=list(alphabet.keys()),
            lMatch=repeat_states)
        if bInsertion_state_emission_p_from_file:
            insertion_state_emission_p = hmmer3_insertion_probabilities(
                hmmer_probabilities=hmmer_probabilities,
                letters=list(alphabet.keys()),
                lMatch=insert_states)
        else:
            insertion_state_emission_p = None

        self.set_emission_probabilities(
            alphabet,
            null_model_emission_p,
            match_state_emission_p,
            repeat_states,
            insertion_state_emission_p,
            insert_states)
[docs]    def HMM_from_TR(self, tandem_repeat, prior_divergence=None,
                    prior_indel_insertion=None, emission_file=None):
        """ Build an HMM from a TR <tandem_repeat>.

        Two step approach: First, calculate the posterior of the ancestral
        sequence. Then, calculate the likelihood of the homologous repeat
        unit sequence.

        Parameters:
        <prior_divergence> is {'type': 'alpha'/'fixed_value', 'value': 4}
        <prior_indel_insertion> is {'mu': 0.5, 'sigma_squared': 0.81}

        Store probabilities as log10arithms.
        """

        # NOTE: This currently fails, as the divergence attribute is a dict.
        if prior_divergence is None:
            divergence = divergence_from_FP_simulations(tandem_repeat.l_effective)
        elif prior_divergence['type'] == 'alpha':
            divergence = divergence_from_FP_simulations(
                tandem_repeat.l_effective, alpha=prior_divergence['value'])
        else:
            divergence = prior_divergence['value']

        if prior_indel_insertion is not None and \
                prior_indel_insertion['type'] == 'adaptive':
            # Adapt the indel probabilities towards the divergence.
            prior_indel_insertion['mu'] = \
                divergence / prior_indel_insertion['factor']
            prior_indel_insertion['sigma_squared'] = \
                prior_indel_insertion['mu'] / 5

        sequence_type = tandem_repeat.sequence_type
        l_effective = tandem_repeat.l_effective
        n = tandem_repeat.n  # TODO: is this variable really needed?

        repeat_states, insert_states = self.initialise_HMM_structure(l_effective)

        # Transitions incorporating deletion or insertion states.
        # We assume that deletions are not longer than the repeat unit, as
        # longer deletions would not be visible anymore.
        # We also assume that you cannot jump from "I" directly to "C", as it
        # would have been more likely to assume "C" directly from the
        # foregoing state.
        #
        # Apply the modulus in order to determine the true index of any state,
        # as you might have crossed the current tandem repeat unit's border:
        # index(i, l_effective) = i % l_effective
        insertion_lengths, deletion_lengths = tandem_repeat.gap_structure_HMM()
        self.set_indel_probabilities(
            prior_indel_insertion,
            l_effective,
            n=n,
            default=False,
            insertion_lengths=insertion_lengths,
            deletion_lengths=deletion_lengths)

        # Load model of sequence evolution parameters.
        if sequence_type == 'AA':
            Q, eqFreq, alphabet = load_model('lg')
        else:
            Q, eqFreq, alphabet = load_model('tn93')

        # Calculate ML-emission probabilities for all match states.
        # TODO: You might want to pass Q, eqFreq, alphabet to this function.
        if not emission_file:
            likelihood_offspring = calculate_log10_offspring_likelihood(
                tandem_repeat, divergence)
        else:
            hmmer_probabilities = hmm_io.read_HMMER(emission_file)[0]
            likelihood_offspring = hmmer3_emission_probabilities(
                hmmer_probabilities=hmmer_probabilities,
                letters=list(alphabet.keys()),
                lMatch=repeat_states)  # To adapt!

        self.set_emission_probabilities(
            alphabet=alphabet,
            null_model_emission_p=eqFreq,
            match_state_emission_p=likelihood_offspring,
            repeat_states=repeat_states)
[docs]    def initialise_HMM_structure(self, l_effective):
        ''' Initialise all states.
        Set transition probabilities to None/0.
        Set the initial probability equal for all states.
        '''

        # Build an HMM from these likelihoods.
        repeat_states = ["M{0}".format(str(i)) for i in range(l_effective)]
        insert_states = ["I{0}".format(str(i)) for i in range(l_effective)]
        self.states = ["N"] + repeat_states + insert_states + ["C"]
        logger.debug("HMM states: {0}".format(self.states))

        # Initialisation
        # The transition probability is initially set to None for all states.
        self.p_t = {iS: {iS2: None for iS2 in self.states}
                    for iS in self.states}

        # The initial probability is equal in all states. (np.log10(1) = 0)
        self.p_0 = {iS: 0 for iS in self.states}

        # Transition probabilities
        # First, mark which transitions are not penalised, i.e. bear a
        # probability of np.log10(1) = 0:
        # - Transitions from "N" to any "M_k"
        # - Transitions from any "M_k" to "C"
        # - Transitions from "M_k" to "M_k+1" (for k<n) and "M_k" to "M_0"
        #   (for k==n)
        # - Transitions from "N" to "N"
        # - Transitions from "C" to "C"
        for i in range(l_effective):
            self.p_t["N"]["M{0}".format(index(i, l_effective))] = 0
            self.p_t["M{0}".format(index(i, l_effective))]["C"] = 0
            self.p_t["M{0}".format(index(i, l_effective))][
                "M{0}".format(index(i + 1, l_effective))] = 0
        self.p_t["N"]["N"] = 0
        self.p_t["C"]["C"] = 0

        return repeat_states, insert_states
    def set_emission_probabilities(self, alphabet, null_model_emission_p,
                                   match_state_emission_p, repeat_states,
                                   insertion_emission_p=None,
                                   insertion_states=None):

        # Emissions
        self.emissions = alphabet.keys()
        logger.debug("Emissions: {0}".format(self.emissions))

        # Emission probabilities
        # Set all emission probabilities to null model emission probabilities.
        self.p_e = {iS: {letter: np.log10(null_model_emission_p[number])
                         for letter, number in alphabet.items()}
                    for iS in self.states}

        # Set match state probabilities to ML-emission probabilities.
        for iRS, iEmission_p in zip(repeat_states, match_state_emission_p):
            self.p_e[iRS] = iEmission_p

        if insertion_emission_p and insertion_states:
            logger.debug("Setting insertion state probabilities.")
            for iIS, iEmission_p in zip(insertion_states, insertion_emission_p):
                self.p_e[iIS] = iEmission_p

        #logger.debug("Emission probabilities: {0}".format(self.p_e['M0']))

    def set_indel_probabilities(self, prior_indel_insertion, l_effective,
                                n=2, default=True, insertion_lengths=None,
                                deletion_lengths=None):

        if default:
            p_deletion_formation = calculate_log10_indel_probability(
                0, n, prior=prior_indel_insertion)
            p_insertion_formation = calculate_log10_indel_probability(
                0, n, prior=prior_indel_insertion)
            p_deletion_lengths = calculate_log10_probability_indel_lengths(
                [], l_effective - 1, type='zipf')
            p_insertion_continued, p_insertion_stopped = \
                calculate_log10_probability_indel_lengths(
                    [], None, type='exponential')

        # First, calculate the transition probability of all states that only
        # depends on the gap structure of one site:
        for i in range(l_effective):
            if not default:
                p_deletion_formation = calculate_log10_indel_probability(
                    len(deletion_lengths[i]), n, prior=prior_indel_insertion)
                p_insertion_formation = calculate_log10_indel_probability(
                    len(insertion_lengths[i]), n, prior=prior_indel_insertion)
                p_deletion_lengths = calculate_log10_probability_indel_lengths(
                    deletion_lengths[i], l_effective - 1, type='zipf')
                p_insertion_continued, p_insertion_stopped = \
                    calculate_log10_probability_indel_lengths(
                        insertion_lengths[i], None, type='exponential')

            # In the current setup, moving from "N" to either an insertion or
            # a deletion state is less likely than staying in "N". Thus, these
            # probabilities are left at zero.
            #self.p_t["N"]["I{0}".format(str(i))] = p_insertion

            for iC, p_deletion_length in enumerate(p_deletion_lengths):
                # Mi-1 -> Mi+iC+1 (deletion of length iC+1)
                self.p_t["M{0}".format(index(i - 1, l_effective))][
                    "M{0}".format(index(i + iC + 1, l_effective))] = \
                    p_deletion_formation + p_deletion_length
                # Ii -> Mi+iC+1 (insertion stopped + deletion of length iC+1)
                self.p_t["I{0}".format(index(i, l_effective))][
                    "M{0}".format(index(i + iC + 1, l_effective))] = \
                    p_insertion_stopped + p_deletion_formation + \
                    p_deletion_length

            # Mi-1 -> Ii (insertion)
            self.p_t["M{0}".format(index(i - 1, l_effective))][
                "I{0}".format(index(i, l_effective))] = p_insertion_formation
            # Ii -> Ii (insertion continued)
            self.p_t["I{0}".format(index(i, l_effective))][
                "I{0}".format(index(i, l_effective))] = p_insertion_continued
            # Ii -> Mi (insertion stopped)
            self.p_t["I{0}".format(index(i, l_effective))][
                "M{0}".format(index(i, l_effective))] = p_insertion_stopped

        # Second, calculate the transition probability of all states that
        # depends on the gap structure of several sites.
        # These calculations use the information already gathered in self.p_t.
        # These are deletions that are directly followed by insertions
        # (Case 1: deletion from match state. Case 2: deletion from insertion
        # state.)
        for i in range(l_effective):
            for iC in range(1, l_effective):
                # Mi -> Ii+iC (deletion(iC-1) + insertion(i+iC))
                self.p_t["M{0}".format(index(i, l_effective))][
                    "I{0}".format(index(i + iC, l_effective))] = \
                    self.p_t["M{0}".format(index(i, l_effective))][
                        "M{0}".format(index(i + iC, l_effective))] + \
                    self.p_t["M{0}".format(index(i + iC - 1, l_effective))][
                        "I{0}".format(index(i + iC, l_effective))]

                # Ii -> Ii+iC: insertion_stop(i) + deletion(i; iC) +
                # insertion_formation(i+iC)
                self.p_t["I{0}".format(index(i, l_effective))][
                    "I{0}".format(index(i + iC, l_effective))] = \
                    self.p_t["I{0}".format(index(i, l_effective))][
                        "M{0}".format(index(i, l_effective))] + \
                    self.p_t["M{0}".format(index(i - 1, l_effective))][
                        "M{0}".format(index(i + iC, l_effective))] + \
                    self.p_t["M{0}".format(index(i + iC - 1, l_effective))][
                        "I{0}".format(index(i + iC, l_effective))]

        logger.debug("Transition probabilities: {0}".format(self.p_t))
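
# The following is a minimal usage sketch (not part of the original module
# API): constructing the toy example HMM above and inspecting its transition
# and emission probabilities. The function name is illustrative only.
def _example_HMM_usage():
    hmm = HMM()  # no arguments: falls back to HMM_example()
    # In the toy model, "M1" transitions to "M2" or "C" with equal probability:
    assert hmm.p_t["M1"]["M2"] == 0.5
    # ... and "M1" strongly favours emitting "A":
    assert hmm.p_e["M1"]["A"] == 0.9
    return hmm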
################################### Local TR class #######################
[docs]class TR:
    """ Fake interim TR class """

    def __init__(self):
        self.divergence = 0.6
        self.msaTD = ["AAA", "CCC", "GTG"]
        self.sequence_type = 'DNA'
        self.n = 3
        self.l_effective = 3
################################### MAP parameter estimation ############
[docs]def divergence_from_FP_simulations(l, alpha=0.1):
    ''' Which HMM divergence sets the average FP-rate (i.e. the number of
    amino acids on either flanking side of a perfect TR that are falsely
    assigned as positive) to in total alpha*l?
    '''

    # alpha: On average, the number of falsely assigned AAs on both sides of
    # the TR should not exceed l * alpha.
    # Derivation:
    # FP-rate (average number of AAs predicted falsely as part of a perfect
    # TR) on one side of the TR, as a function of the divergence:
    f = [0.50150, 0.58600, 0.70900, 0.75125, 0.89275, 1.00900, 1.25100, 1.24150,
         1.56275, 1.69600, 1.88475, 2.20025, 2.33375, 2.53500, 2.99300, 3.25900,
         3.77400, 4.03300, 4.26675, 4.71800, 5.30925, 5.83325, 6.31575, 6.84000,
         7.93425, 9.01900, 9.72000, 10.80925, 12.26650, 13.50750, 14.29825, 15.74225,
         18.66050, 20.02575, 21.54400, 23.97300, 26.93625, 30.39400, 32.78550, 35.80375,
         38.98025, 45.79925, 48.48200, 51.29100, 56.74300, 60.45925, 64.44675, 69.41900,
         80.02450, 82.37550, 88.47350, 95.13425, 98.00750, 105.54975, 108.85625, 117.76750,
         125.69250, 125.73725, 137.40675, 143.08225, 148.28725, 154.51425, 161.23075, 160.33875,
         172.00475, 175.41525, 180.44425, 179.47350, 190.14225, 190.93625, 194.03725, 194.87475,
         203.35900, 209.94350, 210.63750, 210.63950, 216.69850, 223.15025, 219.22775, 227.37700,
         227.09600, 238.11900, 235.79275, 238.05250, 242.59875, 244.40425]

    # Sequence divergence associated with f.
    d = [(i / 10) + 0.5 for i in range(86)]

    # List <da> contains the linearly weighted averages of the two divergences
    # whose FP-rates enclose l*alpha:
    da = []
    before = (f[0], d[0])
    for iF, iD in zip(f[1:], d[1:]):
        f_ok = (alpha * (len(da) + 1)) / 2
        while iF > f_ok:
            da.append(((iF - f_ok) * before[1] + (f_ok - before[0]) * iD)
                      / (iF - before[0]))
            f_ok = (alpha * (len(da) + 1)) / 2
        before = (iF, iD)

    if l > len(da):
        return 6
    else:
        return da[l - 1]
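
# Illustrative sketch (not original code): how divergence_from_FP_simulations()
# interpolates a divergence for a given unit length l and FP tolerance alpha.
def _example_divergence_lookup():
    # Short repeat units tolerate only a small divergence ...
    d_short = divergence_from_FP_simulations(l=1, alpha=0.1)
    # ... while for unit lengths beyond the tabulated range, the divergence
    # is capped at 6.
    d_long = divergence_from_FP_simulations(l=1000, alpha=0.1)
    return d_short, d_long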
def index(i, iMax):
    return str(i % iMax)


def calculate_log10_offspring_likelihood(tandem_repeat, divergence=None):

    sequence_type = tandem_repeat.sequence_type

    # Calculate posterior probabilities of ancestral states from the TR
    # alignment.

    # Load model of sequence evolution parameters.
    if sequence_type == 'AA':
        Q, eqFreq, alphabet = load_model('lg')
    else:
        Q, eqFreq, alphabet = load_model('tn93')
    P = scipy.linalg.expm(Q * divergence)

    # Initialise the count matrix (if not already existent).
    if not hasattr(tandem_repeat, 'msaTDN'):
        msaTDN_temp = [[alphabet[symbol] for symbol in column
                        if symbol not in ['-', 'X', '*', 'U']]
                       for column in tandem_repeat.msaTD]
        tandem_repeat.msaTDN = []
        for column in msaTDN_temp:
            my_diag = {i: 0 for i in range(len(alphabet.keys()))}
            for iN in column:
                my_diag[iN] = my_diag[iN] + 1
            tandem_repeat.msaTDN.append(
                np.array([j for j in my_diag.values()]))

    # Initialise results matrix.
    posterior_ancestor = []

    # Calculate the posterior probabilities per column.
    for column in tandem_repeat.msaTDN:
        posterior = {i: iJ * np.prod(np.power(P[i], column))
                     for i, iJ in enumerate(eqFreq)}
        normalising_factor = sum(posterior.values())
        for key, value in posterior.items():
            posterior[key] = value / normalising_factor
        posterior_ancestor.append(posterior)

    # Calculate the likelihood of evolved TR units with the given divergence
    # from these ancestral states.
    likelihood_offspring = [
        {iA: np.log10(sum(posterior[iAncestor] * P[iAncestor][i]
                          for iAncestor in alphabet.values()))
         for iA, i in alphabet.items()}
        for posterior in posterior_ancestor]

    return likelihood_offspring
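
# Worked toy version (not original code) of the posterior/likelihood algebra
# in calculate_log10_offspring_likelihood(), on a hypothetical binary
# alphabet so the numbers are easy to check by hand.
def _example_offspring_likelihood_toy():
    Q = np.array([[-1.0, 1.0], [1.0, -1.0]])  # symmetric 2-state rate matrix
    eqFreq = [0.5, 0.5]
    P = scipy.linalg.expm(Q * 0.1)   # transition probabilities at t = 0.1
    column = np.array([3, 0])        # a column with three '0' characters
    # Posterior of the ancestral state, exactly as in the loop above:
    posterior = {i: iJ * np.prod(np.power(P[i], column))
                 for i, iJ in enumerate(eqFreq)}
    z = sum(posterior.values())
    posterior = {i: p / z for i, p in posterior.items()}
    # log10 likelihood that an offspring unit shows character 0:
    return np.log10(sum(posterior[a] * P[a][0] for a in (0, 1)))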
[docs]def loglikelihood_substitution(t, Q, eqFreq, alphabet, tandem_repeat):
    ''' Calculate the likelihood of a repeat, assuming a star tree and a
    sequence model defined by Q, t, eqFreq and alphabet.
    '''

    P = scipy.linalg.expm(Q * t)

    # msaTDN is a list of tandem_repeat.n numpy arrays.
    # Each numpy array represents the elements of the alphabet. The entries
    # are the count of an alphabet element in the respective tandem_repeat
    # column.
    if not hasattr(tandem_repeat, 'msaTDN'):
        msaTDN_temp = [[alphabet[symbol] for symbol in column
                        if symbol not in ['-', 'X', '*', 'U']]
                       for column in tandem_repeat.msaTD]
        tandem_repeat.msaTDN = []
        for column in msaTDN_temp:
            my_diag = {i: 0 for i in range(len(alphabet.keys()))}
            for iN in column:
                my_diag[iN] = my_diag[iN] + 1
            tandem_repeat.msaTDN.append(
                np.array([j for j in my_diag.values()]))

    # First sum: over all sites.
    # Second sum: over all possible ancestral characters.
    # Third product (inside np.power): over all repeat units.
    likelihood = sum(
        np.log(sum(iJ * np.prod(np.power(P[i], column))
                   for i, iJ in enumerate(eqFreq)))
        for column in tandem_repeat.msaTDN)
    return likelihood
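
# Sketch (not original code): estimating the ML divergence t of the fake TR
# stub above by maximising the star-tree log likelihood. Assumes that
# load_model('tn93') returns (Q, eqFreq, alphabet) as used elsewhere in this
# module.
def _example_ML_divergence():
    tr = TR()
    Q, eqFreq, alphabet = load_model('tn93')
    # Minimise the negative log likelihood over a bounded divergence range:
    res = minimize_scalar(
        lambda t: -loglikelihood_substitution(t, Q, eqFreq, alphabet, tr),
        bounds=(0.001, 10), method='bounded')
    return res.x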
[docs]def calculate_log10_probability_indel_lengths(
        indel_lengths, indel_length_max, type='zipf', prior=None):
    '''Calculate the probability of indels of different lengths up to a
    maximum length <indel_length_max>, assuming that either

    type == 'zipf'
    Indel lengths follow a Zipfian distribution with parameter <indel_zipf>.
    (Compare: Fletcher, W. and Yang, Z. (2009) INDELible: a flexible
    simulator of biological sequence evolution. Mol Biol Evol, 26, 1879-1888.
    In their publication, <indel_zipf> is denoted as <a>, the gap lengths
    as <u>.)

    or

    type == 'exponential'
    Indel lengths follow a (1-alpha)*(alpha^(l-1)) distribution.
    [Is this model already used? Probably yes. How can it be cited?]

    A good choice for <indel_length_max> when building HMMs for tandem
    repeats is <l_effective - 1>, as a deletion that includes a complete
    repeat unit is not visible in the tandem repeat unit alignment, and can
    therefore be ignored.

    Return the probabilities as a list.
    '''

    if 'zipf' == type:
        indel_zipf = calculate_MAP_Indel_length_Zipfian_factor(
            indel_lengths, prior)
        zeta_factor = np.log10(1 / scipy.special.zeta(indel_zipf, 1))
        return [np.log10((iL + 1) ** -indel_zipf) + zeta_factor
                for iL in range(indel_length_max)]
    elif 'exponential' == type:
        alpha = calculate_MAP_Indel_length_exponential_factor(
            indel_lengths, prior)
        return np.log10(alpha), np.log10(1 - alpha)
    else:
        logger.error(
            "I am not aware of indel length model type: {0}.".format(type))
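
# Sketch (not original code): log10 indel length probabilities with no
# observed indels, in which case the MAP estimates fall back to the prior
# means (see the two MAP functions below).
def _example_indel_length_probabilities():
    # Zipfian deletion length probabilities for lengths 1..4:
    p_del = calculate_log10_probability_indel_lengths([], 4, type='zipf')
    assert all(a > b for a, b in zip(p_del, p_del[1:]))  # monotone decreasing
    # Exponential model: (continue, stop) log10 probabilities:
    p_cont, p_stop = calculate_log10_probability_indel_lengths(
        [], None, type='exponential')
    return p_del, p_cont, p_stop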
[docs]def calculate_MAP_Indel_length_exponential_factor(indel_lengths, prior=None):
    '''Calculate the MAP exponential decay constant <alpha> that determines
    the distribution of indel lengths. Assume a Gaussian prior.
    Input is a list of <indel_lengths> for a particular column.

    The MAP is calculated numerically, as no nice analytical solution has
    been found so far. A cubic polynomial would need to be solved:
    0 == prior['sigma_squared']*(sum(indel_lengths) - len(indel_lengths))
         - alpha*(prior['sigma_squared']*sum(indel_lengths) + prior['mu'])
         + (alpha**2)*(1 + prior['mu']) - alpha**3
    '''

    if prior is None:
        prior = {'mu': 0.8, 'sigma_squared': 0.1}

    if len(indel_lengths) == 0:
        return prior['mu']
    else:
        # As we are only interested in the maximum, we can leave out constant
        # factors (e.g. 1/sqrt(2*pi*prior['sigma_squared']) for the
        # Gaussian). There is an analytic solution. Compare "Gap it!" or
        # "gaps.nb".
        posterior = lambda alpha: \
            - np.prod([(1 - alpha) * (alpha ** (lIndel - 1))
                       for lIndel in indel_lengths]) * \
            math.exp(-0.5 * ((alpha - prior['mu']) ** 2) /
                     prior['sigma_squared'])
        res = minimize_scalar(posterior, bounds=(0, 1), method='bounded')
        return res.x
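
# Sketch (not original code): the MAP exponential factor equals the prior
# mean without observations, and lies strictly between 0 and 1 given data.
def _example_MAP_exponential():
    assert calculate_MAP_Indel_length_exponential_factor([]) == 0.8
    alpha = calculate_MAP_Indel_length_exponential_factor([1, 2, 1])
    assert 0 < alpha < 1
    return alpha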
[docs]def calculate_MAP_Indel_length_Zipfian_factor(indel_lengths, prior=None):
    ''' Calculate the MAP Zipfian constant <indel_zipf> that determines the
    distribution of indel lengths. Assume a Gaussian prior.
    Input is a list of <indel_lengths> for a particular column.

    The probability distribution of indel lengths is assumed to follow the
    Zipfian distribution (compare Fletcher, W. and Yang, Z. (2009) INDELible:
    a flexible simulator of biological sequence evolution. Mol Biol Evol, 26,
    1879-1888. In their publication, <indel_zipf> is denoted as <a>, the gap
    lengths as <u>).

    The MAP is calculated numerically, as there did not seem to be a nice
    analytical solution.
    '''

    if prior is None:
        prior = {'mu': 1.821, 'sigma_squared': 1}

    if len(indel_lengths) == 0:
        return prior['mu']
    else:
        # The posterior is the likelihood * prior * scaling.
        # As we are only interested in the maximum, we leave out all constant
        # factors in the following equation for the posterior:
        posterior = lambda indel_zipf: \
            - (scipy.special.zeta(indel_zipf, 1) ** -len(indel_lengths)) * \
            np.prod([lIndel ** -indel_zipf for lIndel in indel_lengths]) * \
            math.exp(-0.5 * ((indel_zipf - prior['mu']) ** 2) /
                     prior['sigma_squared'])
        res = minimize_scalar(posterior, method='brent')
        return res.x
[docs]def calculate_log10_indel_probability(nIndels, n, prior=None):
    ''' Calculate the probability of an indel per site and per repeat unit
    from the MAP estimate of the indel rate, given
    - the <divergence> of the repeat units
    - a <prior> on the indel rates (normally distributed)
    - the number of indels <nIndels> in this column
    - the total length <n> of this column
    '''

    indel_rate_MAP = calculate_MAP_indel_rate(nIndels, n, prior)
    return np.log10(1 - math.exp(-indel_rate_MAP))
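
# Sketch (not original code): converting the MAP indel rate into a per-site
# log10 indel probability via p = 1 - exp(-rate).
def _example_indel_probability():
    # With no observed indels and the default prior, the analytic rate floor
    # of calculate_MAP_indel_rate() applies:
    p = calculate_log10_indel_probability(0, n=4)
    assert p < 0  # log10 of a probability below 1
    return p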
[docs]def calculate_MAP_indel_rate(nIndels, n, prior=None):
    ''' Calculate the MAP indel rate per repeat unit and site, given
    - an assumed prior distribution of the indel rate, described by the dict
      <prior>
    - the likelihood of the observed number of indels <nIndels>.

    posterior ~ likelihood * prior
    Derive the posterior with respect to the indel_rate,
    set the result equal to 0,
    and solve with respect to the MAP of the indel_rate.

    For nIndels in {0, 1} it is possible to derive the MAP indel_rate
    analytically. For all other cases, the maximum of the posterior
    distribution needs to be approximated algorithmically.

    ln(posterior) ~ ln(likelihood) + ln(prior)
    '''

    if prior is None:
        prior = {'mu': 0.01, 'sigma_squared': 0.001}

    if nIndels == 0:
        # This floor is a heuristic choice (before: constant 0.001).
        return max(prior['mu'] / 100,
                   prior['mu'] - prior['sigma_squared'] * n)
    else:
        # Calculate the roots numerically.
        res = fsolve(derivative_log_posterior, prior['mu'],
                     args=(prior, n, nIndels))
        return res[0]
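
# Sketch (not original code): the analytic case (no observed indels) versus
# the numeric root finding (observed indels), using the default prior.
def _example_MAP_indel_rate():
    rate_none = calculate_MAP_indel_rate(0, n=4)   # analytic floor
    rate_some = calculate_MAP_indel_rate(2, n=4)   # root of the derivative
    assert rate_some > rate_none  # observing indels raises the MAP rate
    return rate_none, rate_some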
[docs]def derivative_log_posterior(indel_rate, prior, n, nIndels):
    ''' Assume that on each site of each repeat unit, an in/del event has
    either occurred or not. Thus, the likelihood of a certain number of
    in/del events follows the binomial distribution. The prior is assumed to
    be a Gaussian described by <prior>.

    Return the derivative with respect to <indel_rate> of the natural
    logarithm of the posterior probability of <nIndels>, calculated as the
    product of likelihood and prior. The normalisation factor is ignored for
    the moment.
    '''

    return indel_rate + \
        (prior['mu'] - indel_rate - n * prior['sigma_squared']) * \
        math.exp(-indel_rate) - prior['mu'] + \
        (n - nIndels) * prior['sigma_squared']
############################### External parameters ######################
[docs]def hmmer3_emission_probabilities(hmmer_probabilities, letters, lMatch):
    ''' Get emission probabilities from a hmmer3 hmm file.
    In the hmm file, emission probabilities are stored as -ln(p).
    Return log10(p), i.e. convert between the two.
    Conversion: p_Local = - p_HMM * log10(e)

    Parameters (e.g.):
    letters = ['A', 'C', 'E', 'D', 'G', 'F', 'I', 'H', 'K', 'M', 'L', 'N',
               'Q', 'P', 'S', 'R', 'T', 'W', 'V', 'Y']
    lMatch = ['M'+str(i) for i in range(24)]

    Return format (pseudo code):
    [{iA: np.log10(p(iA,iM)) for iA in alphabet.keys()} for iM in lMatch]
    '''

    # Test: Is the number of match states in both models equal?
    if not len(hmmer_probabilities.keys()) - 2 == len(lMatch):
        raise ValueError(
            'The number of match states in the HMMER model ({0}) and the '
            'local model ({1}) does not match.'.format(
                len(hmmer_probabilities.keys()) - 2, len(lMatch)))

    # Test: Are all <letters> represented in the HMMER HMM?
    if any(iL not in hmmer_probabilities['letters'] for iL in letters):
        missing = [iL for iL in letters
                   if iL not in hmmer_probabilities['letters']]
        raise ValueError(
            'Some letters in the local HMM are not represented in the HMMER '
            'HMM: {0}'.format(missing))

    return [{iL: - float(iP) * np.log10(np.exp(1))
             for iL, iP in zip(hmmer_probabilities['letters'],
                               hmmer_probabilities[iMatch[1:]]['emissions'])
             if iL in letters}
            for iMatch in lMatch]
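
# Sketch (not original code): the -ln(p) -> log10(p) conversion on a
# hand-built stand-in for a parsed hmmer3 model with a single match state.
# The dict structure mirrors what hmm_io.read_HMMER() is assumed to return.
def _example_hmmer3_conversion():
    hmmer_probabilities = {
        'letters': ['A', 'C', 'G', 'T'],
        'COMPO': {},
        '0': {'emissions': [1.38629, 1.38629, 1.38629, 1.38629]},  # -ln(0.25)
    }
    p = hmmer3_emission_probabilities(
        hmmer_probabilities, letters=['A', 'C', 'G', 'T'], lMatch=['M0'])
    # -1.38629 * log10(e) is approximately log10(0.25):
    assert abs(p[0]['A'] - np.log10(0.25)) < 1e-4
    return p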
##################################### Tests ##############################
[docs]def test():
    ''' To be implemented... '''

    #tandem_repeat = ...
    # Interim: use the fake TR stub defined above until a real tandem repeat
    # is wired in.
    tandem_repeat = TR()
    divergence = 0
    calculate_log10_offspring_likelihood(tandem_repeat, divergence)
##################################### Main ###############################


def main():
    # NOTE: repeat_info was not imported in the original module; the import
    # below is an assumption about the location of the Repeat class.
    from tral.repeat import repeat as repeat_info
    my_TR = repeat_info.Repeat(begin=0, msa=['A-G', 'ACG', 'ACG'],
                               sequence_type='DNA')
    #my_TR = TR()
    my_HMM = HMM(my_TR,
                 prior_divergence={'type': 'fixed_value', 'value': 0.1})
    #print(calculate_indel_rate(1, 6, 1.5))
    #print(calculate_probability_indel_lengths(0.366869, 6, 'exponential'))
    return my_HMM


if __name__ == "__main__":
    main()