# ScalarGaussian.py data_dir # Copyright (c) 2005, 2007, 2008 Andrew Fraser # This file is part of HMM_DS_Code. # # HMM_DS_Code is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # HMM_DS_Code is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA # 02111-1307, USA. import sys, os.path, math, random, scipy, pickle data_dir = sys.argv[1] import Scalar # from chestnut #------------------------------------------------------------ class SGO_HMM(Scalar.HMM): # Scalar Gaussian Observations """ A subclass of Scalar that redefines the following methods: __init__(self, P_S0,P_S0_ergodic,P_ScS,mus,vars): Py_calc(self,y) Calculates Gaussian probabilities reestimate(self,y) Reestimates Gaussian models simulate(self,length,seed=3) Simulates Gaussian observations dump(self): """ def __init__(self, P_S0,P_S0_ergodic,P_ScS,means,vars): """Builds a new hidden Markov model with scalar Gaussian obbservations""" # Assign the following arrays. scipy.mat creats matrix view; no copy self.N = len(P_S0) self.P_S0 = scipy.mat(P_S0) self.P_S0_ergodic = scipy.mat(P_S0_ergodic) self.P_ScS = scipy.mat(P_ScS) self.means = means self.vars = vars self.norms = scipy.zeros(len(vars)) for i in xrange(len(vars)): self.norms[i] = 1/math.sqrt(2*math.pi*self.vars[i]) return # End of __init__() def Py_calc(self,y): """ Allocate self.Py and assign values self.Py[t,i] = P(y(t)|s(t)=i) On entry: self is an SGO_HMM y is a sequence of observations """ # Check size and initialize s.Py for each s self.T = len(y) self.Py = scipy.zeros((self.T,self.N),scipy.float32) for t in xrange(self.T): for i in xrange(self.N): d = y[t]-self.means[i] self.Py[t,i] = self.norms[i]*math.exp(-(d*d)/(2*self.vars[i])) return # End of Py_calc() def reestimate(self,y): """ Reestimate all paramters. In particular, reestimate observation model. """ self.reestimate_s() # Reestimate everything except observation models wT = (self.alpha*self.beta).T # w[t,i] = prob s(t) = i sum_w = wT.sum(axis=1) wT *= y sum_yw = wT.sum(axis=1) wT *= y sum_yyw = wT.sum(axis=1) self.means = (sum_yw/sum_w).T self.vars = (sum_yyw/sum_w).T - self.means*self.means return # End of reestimate() def simulate(self,length,seed=3): """generates a random sequence of observations of given length""" random.seed(seed) # Set up cumulative distributions cum_init = scipy.cumsum(self.P_S0_ergodic.A[0]) cum_tran = scipy.cumsum(self.P_ScS.A,axis=1) # Initialize lists outs = scipy.zeros(length) states = [] def cum_rand(cum): # A little service function return scipy.searchsorted(cum,random.random()) # Select initial state i = cum_rand(cum_init) # Select subsequent states and call model to generate observations for t in xrange(length): states.append(i) outs[t]=random.gauss(self.means[i],self.vars[i]) i = cum_rand(cum_tran[i]) return (states,outs) # End of simulate() def dump(self): def print_V(V): for x in V: print '%5.3f'%x, print '' def print_Name_V(name,V): print name, print_V(V) def print_Name_VV(name,VV): print name for V in VV: print_V(V) print "dumping a %s with %d states"%(type(self), self.N) print_Name_V(' P_S0= ',self.P_S0.A[0]) print_Name_V(' P_S0_ergodic=',self.P_S0_ergodic.A[0]) print_Name_VV('P_ScS=', self.P_ScS.A) print_Name_V('means=', self.means) print_Name_V('vars=', self.vars) return #end of dump() # End of class SGO_HMM ###################################################################### means = [-1.0,1.0] vars = [1.0,1.0] P_S0 = scipy.array([1./3,2./3]) P_S0_ergodic = 1.0*P_S0 P_ScS = scipy.array([ [0.93,0.07], [0.13,0.87] ]) model = SGO_HMM(P_S0,P_S0_ergodic,P_ScS,means,vars) sim = model.simulate(100,3) trajectory = model.decode(sim[1]) x = range(len(sim[0])) file = open(data_dir+'/SGOsimS','w') for t in x: print >>file, t, sim[0][t] file.close() file = open(data_dir+'/SGOdecodeS','w') for t in x: print >>file, t, trajectory[t] file.close() file = open(data_dir+'/SGOsimY','w') for t in x: print >>file, t, sim[1][t] file.close() model.P_ScS = scipy.mat(scipy.ones((2,2),scipy.float64)/2.0) model.means = [-2.0,2.0] model.vars = [2.0,2.0] model.train(sim[1],15) #FixMe: Should this be 20 rather than 15? model.dump() model.P_ScS = scipy.mat(scipy.ones((2,2),scipy.float64)/2.0) model.means = [0.0,3.6] model.vars = [4.0,0.126] model.train(sim[1],5) model.dump() try: for i in xrange(10): model.train(sim[1],1) print 'finished iteration %d' % i except: print 'failed at iteration %d' % i # Local Variables: # mode: python # End: