""" VARG.py This file implements "Vector AutoRegressive Gaussian" observation models. y is a list of lists y[t][0] is a numpy.array containing the observation vector y[t][1] is a numpy.array containing the context vector P(y|s) = Normal(mu(s,y[1]),cov[s])_y[0] where mu(s,v) = A[s]*v Copyright (c) 2005, 2007, 2008 Andrew Fraser This file is part of HMM_DS_Code. HMM_DS_Code is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. HMM_DS_Code is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ # I raise an exception if eigenvalues or determinants are small or # big. I also disregard state occupancy probabilities that are small small = 1e-20 big = 1e+20 import math, random, scipy, scipy.linalg import Scalar # from chestnut #------------------------------------------------------------ class VARG_HMM(Scalar.HMM): # Scalar Gaussian Observations """ A subclass of Scalar that redefines the following methods: __init__(self, P_S0,P_S0_ergodic,P_ScS,As,ICovs): Py_calc(self,y) Calculates observation probabilities reestimate(self,y) Reestimates observation models simulate(self,length,seed=3) Simulates observations dump(self): """ def __init__(self, P_S0,P_S0_ergodic,P_ScS, As, # List. For each state, map from context to mean ICovs, # For each state, inverse covariance array a=4,b=0.1 # Parameters of inverse Wishart prior ): """Builds a new hidden Markov model with VARG obbservations""" # Assign the following arrays. scipy.mat creats matrix view; no copy self.N = len(P_S0) # Number of states for name,item in (('P_S0_ergodic',P_S0_ergodic), ('P_ScS',P_ScS), ('As',As), ('ICovs',ICovs)): if len(item) != self.N: raise RuntimeError,'len(P_S0) = %d != len(%s) = %d'%(self.N, name,len(item)) self.P_S0 = scipy.mat(P_S0) self.P_S0_ergodic = scipy.mat(P_S0_ergodic) self.P_ScS = scipy.mat(P_ScS) self.As = scipy.array(As) self.ICovs = ICovs self.a = a self.b = b self.normalize() return # End of __init__() def normalize(self): cr,cc = self.ICovs[0].shape self.evals = self.N*[None] self.evecs = self.N*[None] self.norms = scipy.zeros(self.N) for i in xrange(self.N): self.evals[i],self.evecs[i] = scipy.linalg.eigh(self.ICovs[i]) # Save sqrt eigenvalues of Cov not inverse Cov because they are # only used in simulate() self.evals[i] = 1/scipy.sqrt(self.evals[i]) if self.evals[i].min() < small: raise RuntimeError, 'In normalize: Eigenvalue %f too small'%( self.evals[i].min()) if self.evals[i].max() > big: raise RuntimeError, 'In normalize: Eigenvalue %f too big'%( self.evals[i].max()) d = scipy.linalg.det(self.ICovs[i]) if d < small or d > big: raise RuntimeError, 'extreme determinant %f'%d self.norms[i] = 1/math.sqrt((2*math.pi)**cr/d) def Py_calc(self,y): """ Allocate self.Py and assign values self.Py[t,i] = P(y(t)|s(t)=i) On entry: self is a VARG_HMM y is a sequence of observation/context pairs; y[t][0] is the observation and y[t][1] is the context """ # Check size and initialize s.Py for each s self.T = len(y) self.Py = scipy.zeros((self.T,self.N),scipy.float32) for t in xrange(self.T): for i in xrange(self.N): mu = scipy.dot(self.As[i],y[t][1]) d = y[t][0]-mu temp = scipy.dot(self.ICovs[i],d) q = scipy.dot(d,temp) self.Py[t,i] = self.norms[i]*math.exp(-q/2.0) return # End of Py_calc() def reestimate(self,y): """ Reestimate all paramters. In particular, reestimate observation model. FixMe: When regularization is on, foward() should report A.P. not likelihood """ self.reestimate_s() # Reestimate everything except observation models T = len(y) N = self.N dim_Y = len(y[0][0]) dim_X = len(y[0][1]) wY = scipy.zeros((T,dim_Y)) wX = scipy.zeros((T,dim_X)) w = (self.alpha*self.beta).T # w[i,t] = prob s(t) = i sum_w = w.sum(axis=1) root_w = scipy.sqrt(w) for i in xrange(N): for t in xrange(T): wY[t,:] = root_w[i,t]*y[t][0] wX[t,:] = root_w[i,t]*y[t][1] AT,resids,rank,svals = scipy.linalg.lstsq(wX, wY, cond=1e-10, overwrite_a=1, overwrite_b=1) self.As[i] = AT.T zT = wY - scipy.dot(wX,AT) ZZT = scipy.dot(zT.T,zT) # MAP with an inverse Wishart prior Cov = (self.b * scipy.eye(dim_Y) + ZZT)/(self.a + sum_w[i]) self.ICovs[i] = scipy.linalg.inv(Cov) self.normalize() return # End of reestimate() def simulate(self,length,seed=3): """generates a random sequence of observations of given length""" random.seed(seed) scipy.random.seed(seed) N,ar,ac = self.As.shape # Set up cumulative distributions cum_init = scipy.cumsum(self.P_S0_ergodic.A[0]) cum_tran = scipy.cumsum(self.P_ScS.A,axis=1) # Initialize lists outs = length*[None] outs[-1] = [None, scipy.zeros(ac)] states = length*[None] def cum_rand(cum): # A little service function return scipy.searchsorted(cum,random.random()) # Select initial state i = cum_rand(cum_init) # Select subsequent states and call model to generate observations y = scipy.zeros(ar) for t in xrange(length): states[t] = i context = scipy.concatenate((y,outs[t-1][1][:ac-ar])) context[-1] = 1.0 mu = scipy.dot(self.As[i],context) y = scipy.random.multivariate_normal(mu, scipy.linalg.inv(self.ICovs[i])) outs[t] = [y,context] i = cum_rand(cum_tran[i]) return (states,outs) # End of simulate() def dump(self): def print_V(V): for x in V: print '%6.3f'%x, print '' def print_Name_V(name,V): print name, print_V(V) def print_Name_VV(name,VV): print name for V in VV: print_V(V) print "dumping a %s with %d states"%(type(self), self.N) print_Name_V(' P_S0= ',self.P_S0.A[0]) print_Name_V(' P_S0_ergodic=',self.P_S0_ergodic.A[0]) print_Name_VV('P_ScS=', self.P_ScS.A) for i in xrange(self.N): print 'For state %d:'%i print_Name_VV('A=', self.As[i]) print_Name_V('Square roots of eigenvalues=',self.evals[i]) #print_Name_VV('Eigenvectors=',self.evecs[i]) return #end of dump() # End of class VARG_HMM if __name__ == '__main__': # Test code P_S0 = scipy.ones(3)/3.0 P_S0_ergodic = scipy.ones(3)/3.0 P_ScS = scipy.ones((3,3))*0.003 + scipy.eye(3)*0.991 u = 1.003 # Instability s = .99 # Stability r = .2 # Rotation tcos = s*math.cos(r) tsin = s*math.sin(r) As = [ [[tcos, -tsin, 0, 0], [tsin, tcos, 0, 0], [0, 0, u, 0]], [[tcos, 0, -tsin, 0], [0, u, 0, 0], [tsin, 0, tcos, 0]], [[u, 0, 0, 0], [0, tcos, -tsin, 0], [0, tsin, tcos, 0]] ] ICovs = [scipy.eye(3)*100,scipy.eye(3)*100,scipy.eye(3)*100] model = VARG_HMM(P_S0,P_S0_ergodic,P_ScS,As,ICovs) S,Y = model.simulate(10000) # Perturb the model a little and see if training brings it back model.As[1][1,1] = 1.1 model.As[0][1,1] = 0.9 model.ICovs[0] *= .3 model.ICovs[1] *= .2 model.ICovs[2] *= .1 model.normalize() model.dump() model.train(Y,5) model.dump() # Local Variables: # mode: python # End: