# algorithms_1.py # # Copyright (c) 2004, 2008 Ralf Juengling, Andrew Fraser # See __init__.py for license information # The HMM algorithms can be cast as a "reduce with side effects". # The reduce is along the time dimension, and the necessary side # effect is the in-place update of the intermediate data structures. import itertools, scipy # Changes: # 2008-6-15 Andy took Ralf's 2004 code as a starting point for hmmpy. # Changed to use precomputed output probabilities and to support # multiple sequences. My goal for this version is code that runs # fast. # Array/Matrix layout: # P_ScS[i,j] = prob(s(t+1)=j|s(t)=i) NxN Matrix # Py[t,y] = prob(Y(t)=y) TxC Array # alpha[t,i] = prob(S(t)=i|y_0^t) TxN Array def forward(P_S0, # Probability vector (1xN matrix) for initial state P_ScS, # NxN matrix of state transition probabilities Py, # List (or iterator) of vectors of # observation probabilities. Py[t][s] is the # probability that state s would produce y[t] gamma, # Length T array alpha # TxN array ): P_ScSA = P_ScS.A def fwdkernel(lastalpha, (alphat, gammat, Pyt)): lastalpha *= Pyt gammat[:] = lastalpha.sum() lastalpha /= gammat alphat[:] = lastalpha return scipy.dot(lastalpha, P_ScSA) reduce(fwdkernel, itertools.izip(alpha, gamma, Py), P_S0.A*1.0) # P_S0*1.0 should force a copy return # End of forward() def backward(P_ScS, Py, # Py[t][i] = P(y(t)|S(t)=i) gamma, # Values precomputed by forward() beta # zeros(T, N) ): def bwdkernel(lastbeta, (betat, gammat, Pyt)): betat[:] = lastbeta return scipy.dot(P_ScS, betat*Pyt)/gammat reduce(bwdkernel, itertools.izip(beta[::-1], gamma[::-1], Py[::-1]), 1.0) return # End of backward() def viterbi(T, # Number of time steps N, # Number of states L_S0, # Vector of logs of the initial state probabilities L_ScS, # Array of logs of state transition probabilities L_Py # List (or iterator) of vectors of # logs of observation probabilities. ): def vitkernel(lastnu, (predt, L_Pyt)): # Kernel of forward pass temp = L_ScS.T + lastnu predt[:] = temp.T.argmax(axis=0) # Best predecessor return predt.choose(temp.T) + L_Pyt def btrkernel(lastss, (sst, predt)): # Kernel of backward pass sst[:] = lastss return predt[lastss] pred = scipy.zeros((T, N), scipy.int32) # Array of best predecessors ss = scipy.zeros((T, 1), scipy.int32) # State sequence nu = reduce(vitkernel, itertools.izip(pred[1:], L_Py[1:]), L_Py[0] + L_S0) reduce(btrkernel, itertools.izip(ss[::-1], pred[::-1]), scipy.argmax(nu)) return ss.flat # End of viterbi def reestimate_s(Py, # Py[t][s] = probability that state s would produce y[t] gamma, # Array of values alpha, # Array of values beta, # Array of values u_sum, # Allocated array for new P_ScS P_S0, # Matrix for new initial state probabilities P_S0_ergodic ): """ Key calculations to reestimate model parameters of initial state probability and state transition probabilities. Add to passed arrays u_sum, P_S0, and P_S0_ergodic without normalizing so that the function can be used for multiple segments. Rely on external calculations for the following: 1. Reestimation of output model parameters 2. Multiply u_sum by P_ScS_old 3. Normalization """ def ukernel(u, (alphat_1, betat, gammat, Pyt)): u += scipy.outer(alphat_1/gammat, Pyt*betat) return u reduce(ukernel, itertools.izip(alpha, beta[1:], gamma[1:], Py[1:]), u_sum) P_S0_ergodic += scipy.sum(alpha*beta,axis=0) P_S0 += alpha[0]*beta[0] return # End of reestimate() # To do: Write a normalize and prune function #-------------------------------- # Local Variables: # mode: python # End: