# algorithms_0.py # # Copyright (c) 2004, 2008 Ralf Juengling, Andrew Fraser # See __init__.py for license information import scipy # Changes: # 2008-8-31 I (Andy Fraser) took Ralf's 2004 code as a starting point # for hmmpy. Changed to use precomputed output probabilities and to # support multiple sequences. My goals for this version is that the # code be easy to read. I want to use sparse matrices for speed, but # scipy.sparse is not mature enough yet. # Array/Matrix layout: # P_ScS[i,j] = prob(s(t+1)=j|s(t)=i) NxN Matrix # Py[t,y] = prob(Y(t)=y) TxC Array # alpha[t,i] = prob(S(t)=i|y_0^t) TxN Array def forward(P_S0, # Probability vector (1xN matrix) for initial state P_ScS, # NxN matrix of state transition probabilities Py, # List (or iterator) of vectors of # observation probabilities. Py[t][s] is the # probability that state s would produce y[t] gamma, # Length T array alpha # TxN array ): # The following ugly temporary storage cuts forward's runtime by # more than a factor of 2 last = P_S0*1.0 # Allocate and initialize temporary storage lastA = last.A # View as array to make * element-wise multiplicaiton for t in xrange(len(Py)): lastA *= Py[t] # Element-wise multiply gamma[t] = lastA.sum() lastA /= gamma[t] alpha[t,:] = lastA[0,:] last[:,:] = last*P_ScS # Vector matrix product return # End of forward() def backward(P_ScS, Py, # Py[t][i] = P(y(t)|S(t)=i) gamma, # Values precomputed by forward() beta # zeros(T, N) ): # initialize lastbeta = scipy.mat(scipy.ones(beta.shape[1])) lastbetaA = lastbeta.A[0,:] # Array view so * is element-wise multiply pscst = P_ScS.T # Transpose view # iterate for t in xrange(len(Py)-1,-1,-1): beta[t,:] = lastbetaA lastbeta[:,:] = (lastbetaA*Py[t,:]/gamma[t])*pscst return # End of backward() def viterbi(T, # Number of time steps N, # Number of states L_S0, # Vector of logs of the initial state probabilities L_ScS, # Array of logs of state transition probabilities L_Py # List (or iterator) of vectors of # logs of observation probabilities. ): pred = scipy.zeros((T, N), scipy.int32) # Array of best predecessors ss = scipy.zeros((T, 1), scipy.int32) # State sequence nu = L_Py[0] + L_S0 for t in xrange(1,T): omega = L_ScS.T + nu pred[t,:] = omega.T.argmax(axis=0) # Best predecessor nu = pred[t,:].choose(omega.T) + L_Py[t] lasts = scipy.argmax(nu) for t in xrange(T-1,-1,-1): ss[t] = lasts lasts = pred[t,lasts] return ss.flat # End of viterbi def reestimate_s(Py, # Py[t][s] = probability that state s would produce y[t] gamma, # Array of values alpha, # Array of values beta, # Array of values u_sum, # Allocated array for new P_ScS P_S0, # Matrix for new initial state probabilities P_S0_ergodic ): """ Key calculations to reestimate model parameters of initial state probability and state transition probabilities. Add to passed arrays u_sum, P_S0, and P_S0_ergodic without normalizing so that the function can be used for multiple segments. Rely on external calculations for the following: 1. Reestimation of output model parameters 2. Multiply u_sum by P_ScS_old 3. Normalization """ for t in xrange(len(Py)-1): u_sum += scipy.outer(alpha[t]/gamma[t+1], Py[t+1]*beta[t+1,:]) P_S0_ergodic += scipy.sum(alpha*beta,axis=0) P_S0 += alpha[0]*beta[0] return # End of reestimate() #------------------------------ # Local Variables: # mode: python # End: