""" EXT.py Extensions to the basic routines in Scalar.py. In particular: Training on multiple segments Training with classification data in addition to observations Decoding a classification sequence instead of a state sequence Boosted training (not debugged yet) Copyright (c) 2005, 2007, 2008 Andrew Fraser This file is part of HMM_DS_Code. HMM_DS_Code is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. HMM_DS_Code is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ import Scalar,scipy #------------------------------------------------------------ class HMM(Scalar.HMM): """ A subclass of Scalar that defines the following methods: """ def __init__(self, P_S0,P_S0_ergodic,P_ScS,P_YcS=None,C2S=None): Scalar.HMM.__init__(self,P_S0,P_S0_ergodic,P_ScS,P_YcS) self.C2S = C2S if not C2S is None: # Create S2C, a list that maps a state to a Class Class is # an integer NC = len(C2S) S2C = scipy.zeros(self.N,scipy.int32) for c in xrange(NC): for s in C2S[c]: S2C[s] = c self.S2C = S2C else: self.S2C = None return # End of __init__ def class_decode(self, y # Single time series of observations ): self.Py_wo_class(y) T = self.T # Initialize pred[T,NC], nu[NC], state.phi NC = len(self.C2S) pred = scipy.ones((T,NC),scipy.int32)*10000 # Raise exception if used nu = scipy.zeros((NC,)) phi = scipy.zeros(self.N) for c in xrange(NC): for s in self.C2S[c]: nu[c] += self.P_S0_ergodic[0,s] * self.Py[0,s] if nu[c] == 0: for s in self.C2S[c]: phi[s] = 0.0 else: for s in self.C2S[c]: phi[s] = self.P_S0_ergodic[0,s] * self.Py[0,s]/nu[c] nu = scipy.log(nu) # Main loop for t in xrange(1,T): # Find best predecessor for each class CostMatrix = scipy.ones((NC,NC))*\ 1e-300 # Safe for log. Possible error for s_a in xrange(self.N): c_a = self.S2C[s_a] # From Class a for s_b in xrange(self.N): c_b = self.S2C[s_b] # To Class b CostMatrix[c_b,c_a] += phi[s_a] * self.P_ScS[s_a,s_b] \ * self.Py[t,s_b] CostMatrix *= nu best = scipy.argmax(CostMatrix,1)# best[Cb] is best pred[t] = best # predecessor of class Cb # Update nu, ie cost given best sequence leading to each class for c_b in xrange(NC): nu[c_b] = CostMatrix[c_b,best[c_b]] nu /= nu.max() ## Calculate unnormalized new phis (P(state|class,best_sequence)) new_phi = scipy.zeros(self.N) for s_b in xrange(self.N): # Loop over successor states c_b = self.S2C[s_b] # Class of s_b c_a = best[c_b] # Predecessor class for s_a in self.C2S[c_a]: new_phi[s_b] += phi[s_a]*self.P_ScS[s_a,s_b]*self.Py[t,s_b] ## Normalize new phis flag = True for c_b in xrange(NC): tot = 0 for s_b in self.C2S[c_b]: tot += new_phi[s_b] if tot > 0: flag = False for s_b in self.C2S[c_b]: phi[s_b] = new_phi[s_b]/tot if flag: s = 'at time t=%d the sequence is impossible\n'%t raise RuntimeError, s # Backtrack seq = scipy.zeros((T,),scipy.int32) seq[-1] = scipy.argmax(nu) for t in xrange(T-1,0,-1): seq[t-1] = pred[t,seq[t]] return seq def join_Ys(self, Ys # List of observation sequences ): """ Ys is a list of observation sequences with TT total length Results: Y_all A single list of all of the observations Tseg A single array of integers that point to the beginning and end of each segment Nseg Number of segments This is a separate method so that subclasses can redifine it """ import operator Tseg = [0] # List of segment boundaries in concatenated Ys for seg in Ys: Tseg.append(len(seg)+Tseg[-1]) Y_all = reduce(operator.add,Ys) return self.Py_calc(Y_all),Y_all,Tseg,len(Ys) def multi_train(self, Ys, # List of observation sequences N_iter=1, BoostW=None, display=True ): """ Train on multiple sequences of observations """ Py,Y_all,Tseg,Nseg = self.join_Ys(Ys) avgs = N_iter*[None] # Average log likelihood per step TT = Tseg[-1] Alpha = scipy.zeros((TT,self.N)) Beta = scipy.zeros((TT,self.N)) Gamma = scipy.zeros((TT,1)) P0s = scipy.matrix(scipy.ones((Nseg,self.N))) #P0s are state probabilities at the beginning of each segment for seg in xrange(Nseg): P0s[seg,:] = self.P_S0 for i in xrange(N_iter): if display: print "iteration %3d:"%i, tot = 0.0 # Both forward() and backward() should operate on each # training segment and put the results in the # corresponding segement of the the alpha, beta and gamma # arrays. Py = self.Py_calc(Y_all) for seg in xrange(Nseg): self.T = Tseg[seg+1] - Tseg[seg] self.alpha = Alpha[Tseg[seg]:Tseg[seg+1],:] self.beta = Beta[Tseg[seg]:Tseg[seg+1],:] self.Py = Py[Tseg[seg]:Tseg[seg+1]] self.gamma = Gamma[Tseg[seg]:Tseg[seg+1]] self.P_S0 = P0s[seg,:] LL = self.forward() #LogLikelihood if display: print "llps[%d,%d]=%7.4f"%(i,seg,LL/self.T), tot += LL self.backward() P0s[seg] = scipy.matrix(self.alpha[0] * self.beta[0]) if seg > 0: # Don't fit state transitions between segments self.beta[0] *= 0 avgs[i] = tot/TT if display: print 'avg=', "%11.8f"% avgs[i] for t in xrange(len(Gamma)): if Gamma[t] < -100: print "Small Gamma at t=",t,"log=",Gamma[t] # Associate all of the alpha and beta segments with the # states and reestimate() self.alpha = Alpha self.beta = Beta self.gamma = Gamma self.Py = Py if BoostW != None: self.alpha *= BoostW self.T = len(Y_all) self.reestimate(Y_all) psum = scipy.zeros(self.N) for seg in xrange(Nseg): psum += Alpha[Tseg[seg]]*Beta[Tseg[seg]] self.P_S0 = scipy.matrix(psum/scipy.sum(psum)) return avgs def boost_train(self, Ys, # Training data N_iter, # Training iterations per boost cycle BoostM # Number of boost cycles ): """ This is a preliminary version. I have not even checked to see if it runs. """ import copy, math models_cm = [] # List with elemants [model,c] where c = F(error rate) YnoC = [] # An array of Classless Ys Cs = [] # An array of classes for Y in Ys: Cs.append(Y[-1]) YnoC.append(Y[:-1]) Cs = scipy.concatenate(Cs) BW = scipy.ones(len(Cs),scipy.float32) # Boosting weights for each t for m in xrange(BoostM): model_m = copy.deepcopy(self) BW /= scipy.add.reduce(BW) # Multiply by len(Cs) to keep regularization same model_m.multi_train(Ys,N_iter=N_iter,BoostW=BW*len(Cs)) # Make model small for saving by stripping it of training arrays model_m.alpha = None model_m.beta = None model_m.gamma = None model_m.Py = None decoded_seq = [] for Y in YnoC: decoded_seq += list(model_m.class_decode(Y)) decoded_seq = scipy.array(decoded_seq) errors = (decoded_seq != Cs) err_m = scipy.dot(errors,BW) c_m = math.log((1-err_m)/err_m) print 'm=',m,'err_m=',err_m,'c_m=',c_m BW = BW * scipy.exp(c_m*errors) models_cm.append([model_m,c_m]) return models_cm # Most subclasses will redifine the following three methods: # Py_wo_class, Py_w_class, and reestimateC def Py_wo_class(self,y): """ Caclculate observation probabilities without Class. Called by class_decode and Py_w_class """ return Scalar.HMM.Py_calc(self,y) def Py_w_class(self,y): """ A method that handles observations and models with Class. y should be a list with elements as follows: y[t][0] is the observation and y[t][1] is the Class. """ self.T = len(y) Y_no_Class = self.T*[None] Class = scipy.zeros(self.T,scipy.int32) for t in xrange(self.T): Y_no_Class[t] = y[t][0] Class[t] = y[t][1] Py = self.Py_wo_class(Y_no_Class) for t in xrange(self.T): Py_t = scipy.zeros(self.N,scipy.float64) for s in self.C2S[Class[t]]: Py_t[s] = Py[t,s] Py[t,:] = Py_t return Py def reestimateC(self,y): """ Reestimate parameters using a sequence of observations that includes Class data. """ Y_no_Class = self.T*[None] for t in xrange(self.T): Y_no_Class[t] = y[t][0] self.reestimate_noC(Y_no_Class) def reestimate_noC(self,Y_no_Class): Scalar.HMM.reestimate(self,Y_no_Class) # End of class HMM if __name__ == '__main__': # Test code """ This is the like the test code in the file Scalar.py except that I define and use C2S. """ import itertools T = 500 C2S = [[0,1],[2]] P_S0 = scipy.array([1./3., 1./3., 1./3.]) P_S0_ergodic = scipy.array([1./7., 4./7., 2./7.]) P_ScS = scipy.array([ [0, 1, 0], [0, .5, .5], [.5, .5, 0] ],scipy.float64) P_YcS = scipy.array([ [1, 0, 0], [0, 1./3., 2./3.], [0, 2./3., 1./3.] ]) # First verify that methods defined in Scalar.py still work mod = HMM(P_S0,P_S0_ergodic,P_ScS,P_YcS=P_YcS,C2S=C2S) S,Y = mod.simulate(T) mod.Py_calc = mod.Py_wo_class E = mod.decode(Y) Y = scipy.array(Y,scipy.int32) L = mod.train(Y,N_iter=10,display=False) print 'Finished testing methods in Scalar.py\n' # Now verify that methods defined here work mod = HMM(P_S0,P_S0_ergodic,P_ScS,P_YcS=P_YcS,C2S=C2S) DC = mod.class_decode(Y) #print 'Test Class decode\n%-5s%-5s%-5s%-5s'%(' t',' y','class',' decoded') #for t in xrange(10): # print '%3d %3d %3d %3d'%(t,Y[t],mod.S2C[S[t]],DC[t]) for t in xrange(2,T-2,1): # Check decoded class pattern if int(Y[t]) is 0: assert (int(DC[t+1]) is 0 and int(DC[t]) is 0 and int(DC[t-1]) is 1 and int(DC[t-2]) is 0) Ys = [] for i in xrange(5): S,Y = mod.simulate(T,seed=i) YC = T*[None] for t in xrange(T): YC[t] = [Y[t],mod.S2C[S[t]]] Ys.append(YC) mod.Py_calc = mod.Py_w_class mod.reestimate = mod.reestimateC mod.P_YcS[0,:] = [.5,.25,.25] mod.P_ScS[0,:] = [.25,.5,.25] print """ Test multi_train on data with Class. Check that LLps increases.""" L = mod.multi_train(Ys,N_iter=10,display=False) for LL,next in itertools.izip(L,L[1:]): print '%6.3f'%(LL/T,), assert next-LL >= 0.0 print """ Test train on data with Class. Check that LLps increases.""" mod = HMM(P_S0,P_S0_ergodic,P_ScS,P_YcS=P_YcS,C2S=C2S) mod.Py_calc = mod.Py_w_class mod.reestimate = mod.reestimateC mod.P_YcS[0,:] = [.5,.25,.25] mod.P_ScS[0,:] = [.25,.5,.25] L = mod.train(YC,N_iter=10,display=False) for LL,next in itertools.izip(L,L[1:]): print '%6.3f'%(LL,), assert next-LL >= 0.0 print '\n\nFinal model is:' mod.dump() #if __name__ == '__main__': # Test code # Local Variables: # mode: python # End: