""" EKF.py Module containing functions that help with extended Kalman filtering for the Lorenz system. In October of 2008, I began updating this file to use numpy rather than Numeric or numarry. As of 10/11/08 I have not updated all of the functions. Contents: def ForwardEKF() def BackwardEKF() def TanGen() def ForwardK() def BackwardK() def LogLike() Copyright (c) 2005, 2008 Andrew Fraser This file is part of HMM_DS_Code. HMM_DS_Code is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. HMM_DS_Code is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ ###################### Def forwardEKF() #################### def ForwardEKF( Y, # Observations. Shape = (Nt,dim_y) SigmaEtaT, # Covariances of dynamical noise. Shape = (Nt,dim_x,dim_x) SigEp, # Covariance of measurement noise. Shape = (dim_y,dim_y) mux, # Mean of initial state (forecast). Matrix shape (dim_x,1) Sigmax, # Covariance of initial state. Later cov of forecast F, # Integrator: Maps ic (initial condition) to fc,D # (final condition and derivative) G=None, # G_x(x) returns y,DG observation and derivative # If the following arguments are not None, use them to # return intermediate results DF=None, # Derivatives of state map alphaM=None, # Corrections to means of updtated state distributions alphaSig=None,# Covariances of updtated state distributions aM=None, # Means of forecast state distributions aSigma=None # Covariances of forecast state distributions ): """ This started as a general extended Kalman filter function, but lapsed into a function that is specific for the laser data. """ import copy, math, numpy, numpy.linalg as LA LAI = LA.inv assert type(Sigmax) == numpy.matrix assert type(SigEp) == numpy.matrix Nt,dim_y = Y.shape dim_x,one = mux.shape assert one == 1,'mux.shape=(%d,%d)'%(dim_x,one) assert (dim_x,dim_x) == Sigmax.shape assert (dim_y,dim_y) == SigEp.shape,"dim_y=%d, SigEp.shape=(%d,%d)"%( dim_y, SigEp.shape[0],SigEp.shape[1]) if G == None: # Define G for Tang's laser measurements def G(x): x_0 = x[0,0] DG = numpy.mat(numpy.zeros((1,3))) DG[0,0] = 2*x_0 return numpy.mat(numpy.eye(1))*x_0*x_0,DG Id = numpy.mat(numpy.eye(dim_x)) # Identity matrix for t in xrange(Nt): SigmaEta = numpy.mat(SigmaEtaT[t]) mu_y,Gt = G(mux) # Update: Calculate \Sigma_\alpha(t) and \mu_\alpha(t) Y[t] K = Sigmax*Gt.T*LAI(Gt*Sigmax*Gt.T + SigEp) Sigmaalpha = (Id - K*Gt)*Sigmax mualpha = K*(Y[t] - mu_y) ic = mux + mualpha # This is updated mean of state # Record requested values if alphaM != None: alphaM[t] = mualpha if aSigma != None: aSigma[t] = copy.copy(Sigmax) if alphaSig != None: alphaSig[t] = Sigmaalpha if DF != None: DF[t] = copy.copy(Ft) if aM != None: aM[t] = copy.copy(mux) # Forecast: Calcualte \mu_x(t+1) and \Sigma_x(t+1) mux,Ft = F(ic)# Calculate mux and Ft Sigmax = Ft*Sigmaalpha*Ft.T + SigmaEta ################# End of ForwardEKF() #################### ################ Def BackwardEKF() ############# def BackwardEKF( Y, # Observations SigmaEtaT, # Covariances of dynamical noise (time dependent) SEI, # Inverse covariance of measurement noise alphaM, # Corrections to ICs for integrations in forward aM, # Results of integrations in forward F, # F[t] is the derivative of the forward state map betaM, # Mean of back forcast betaSI # Inverse covariance of back forecast ): import copy, math, Numeric as N, LinearAlgebra as LA # Abbreviate common functions NMM = N.matrixmultiply NT = N.transpose LAI = LA.inverse mubeta = N.zeros(3,N.Float32) # Corrections to alpha Sigmabeta = N.identity(3,N.Float32)*1e10 # Should be 1/zero Gt = N.zeros((1,3),N.Float32) # Storage for tangent to G for t in xrange(len(Y)-1,-1,-1): # Save the current values of the beta parameters SigmabetaI = LAI(Sigmabeta) betaSI[t] = SigmabetaI betaM[t] = mubeta # Unpack remembered values FI = LAI(F[t]) SigmaEta = SigmaEtaT[t] # x0 is the back forecast estimate for time t x0 = mubeta[0]+alphaM[t][0]+aM[t][0] dy = Y[t] - x0*x0 x0 = aM[t][0]+alphaM[t][0] Gt[0,0] = 2*x0 # Do an update: Calculate \Sigma_b(t) and \mu_b(t) using dy SIb = SigmabetaI + NMM(NT(Gt),NMM(SEI,Gt)) Sigmab = LAI(SIb) mub = alphaM[t] + mubeta + NMM(Sigmab,NMM(NT(Gt),NMM(SEI,dy))) # mub is the updated deviation of mean from aM[t] if t < 1: return [mub,Sigmab] # Back forecast: Calcualte \mu_beta(t-1) and \Sigma_beta(t-1) mubeta = NMM(FI,mub) Sigmabeta = NMM(FI,NMM(SigmaEta + Sigmab,NT(FI))) ################ End of BackwardEKF ##################### """ TanGen() Creates 4 time series: X, a sample path of the noisy Lorenz system, Y, a sequence of observations, F a sequence of tangents to the state dynamics, G a sequence of tangents to the observation functions """ def TanGen( results, # list of empty lists [X,Y,F,G] DevEta = 0.001, # Dynamical noise DevEpsilon = 0.004, # Measurement noise s=10.0, r = 28.0, b = 8.0/3,# Lorenz parameters ts = 0.15, # Sample interval Nt = 300, # Number of samples trelax = 7.5 # Time to relax to attractor ): import lorenz, numpy, copy, pickle, random RG = random.gauss # Storage for initial conditions ic = N.ones(3,N.Float32) # Relax to the attractor: Nrelax = 3 temp = N.ones((Nrelax,3),N.Float32) # Storage for results NumpyLor.LstepsPN(ic,s,b,r,trelax,Nrelax,temp) ic = temp[Nrelax-1] # reset ICs X = results[0] Y = results[1] F = results[2] G = results[3] tempTan = N.ones((3,3),N.Float32) # Storage for one step integration for t in xrange(Nt): X.append(copy.copy(ic)) Y.append(numpy.array([ic[0]**2+RG(0.0,DevEpsilon)])) G.append(numpy.array([[2*ic[0],0,0]],)) # Call to integrator. ic is overwritten with result NumpyLor.LtanstepPN(ic,s,b,r,ts,ic,tempTan) # Add random Gaussian noise to ic ic = ic + N.array([RG(0.0,DevEta),RG(0.0,DevEta),RG(0.0,DevEta)]) ic = ic.astype(N.Float32) F.append(copy.copy(tempTan)) f = open('XYGF','w') pickle.dump(results,f,protocol=1) f.close() """ recover by: import Numeric, pickle f = open('XYGF','r') results = pickle.load(f) """ """ ForwardK A forward Kalman filter that requires state and measurement derivatives and a reference state trajectory. """ def ForwardK( X, # The actual state vectors Y, # The ovservations F, # The derivative of the state map G, # The derivative of observation wrt to state SigmaEta_list, # List of state noises SEI_list, # List of inverse measurement noises alphaSI, # Pass empty list. Return inverse covaraince of # up-dated state estimates alphaM # Pass empty list. Return Means of up-dated # state estimates ): import copy, math, Numeric as N, LinearAlgebra as LA # Abbreviate common functions NMM = N.matrixmultiply NT = N.transpose LAI = LA.inverse # The next four values should be parameters? Sigmax = N.identity(3,N.Float32)*5 mux = N.ones(3,N.Float32)*1 SigmaEta = N.identity(3,N.Float32)*1 SEI = N.ones((1,1),N.Float32)*1 # Sigma_epsilon^{-1} for t in xrange(len(X)): # Calculate error of forecast observation. ythat = X[t][0]*X[t][0] + NMM(G[t],mux) dy = Y[t] - ythat # Do an update: Calculate \Sigma_\alpha(t) and \mu_\alpha(t) # using forecast error (eq:KUpdate in book) SIalpha = LAI(Sigmax) + NMM(NT(G[t]),NMM(SEI,G[t])) Sigmaalpha = LAI(SIalpha) mualpha = mux + NMM(Sigmaalpha,NMM(NT(G[t]),NMM(SEI,dy))) alphaSI.append(SIalpha) alphaM.append(mualpha) # Forecast: Calcualte \mu_x(t+1) and \Sigma_x(t+1), eq:Kfore # in book. Note mualpha is mean deviation from X mux = NMM(F[t],mualpha) Sigmax = NMM(F[t],NMM(Sigmaalpha,NT(F[t]))) + SigmaEta """ BackwardK A backward Kalman filter that requires state and measurement derivatives and a reference state trajectory. The filter tracks deviations from the reference trajecory. """ def BackwardK( X, # The actual state vectors Y, # The ovservations F, # The derivative of the state map G, # The derivative of observation wrt to state SigmaEta_list, # List of state noises SEI, # Inverse measurement noise betaSI, # Pass empty list. Return inverse covaraince of # filtered state estimates betaM # Pass empty list. Return Means of filtered # state estimates ): import copy, math, Numeric as N, LinearAlgebra as LA # Abbreviate common functions NMM = N.matrixmultiply NT = N.transpose LAI = LA.inverse # Initialize beta, the backcast for T-1 mubeta = N.zeros(3,N.Float32) Sigmabeta = N.identity(3,N.Float32)*1e10 # Should be 1/zero for t in xrange(len(X)-1,-1,-1): # Start at T-1 and step by 1 to 0 SigmaEta = SigmaEta_list[t] # Read state noise SigmabetaI = LAI(Sigmabeta) # Invert state variance # Save the current values of the beta (backcast) parameters betaSI[t] = SigmabetaI betaM[t] = mubeta # Calculate error of backcast observation ythat = X[t][0]*X[t][0] + NMM(G[t],mubeta) dy = Y[t] - ythat # Do an update: Calculate \Sigma_b(t) and \mu_b(t) using y(t) # eq:BUpdate in book SIb = SigmabetaI + NMM(NT(G[t]),NMM(SEI,G[t])) Sigmab = LAI(SIb) # Invert state variance mub = mubeta + NMM(Sigmab,NMM(NT(G[t]),NMM(SEI,dy))) # Backcast: Calculate \mu_beta(t-1) and \Sigma_beta(t-1). # Note that mub is the mean of the updated distribution for # the deviation from X[t] if t>0: # Invert F[t-1], the derivative of X from t-1 to t FI = LAI(F[t-1]) # eq:BFore(a&b) in book mubeta = NMM(FI,mub) Sigmabeta = NMM(FI,NMM((SigmaEta+Sigmab),NT(FI))) ###################### Def forwardEKF0() #################### # Like forwardEKF(), but observations are X_0 not X_0**2 def ForwardEKF0( Y, # Observations SigmaEtaT, # Covariances of dynamical noise (time dependent) SEI, # Inverse covariance of measurement noise mux, # Mean of initial state. Later, mean of forecast Sigmax, # Covariance of initial state. Later cov of forecast F, # Integrator: F(ic,fc,D) takes initial condition ic # and produces final condition fc and derivative D alphaM, # Corrections to means of updtated state distributions alphaSI, # Inverse covariances of updtated state distributions aM, # Means of forecast state distributions aSigma, # Covariances of forecast state distributions DF, # Derivatives of state map ): import copy, math, Numeric as N, LinearAlgebra as LA # Abbreviate common functions NMM = N.matrixmultiply NT = N.transpose LAI = LA.inverse Ft = N.zeros((3,3),N.Float32) # Storage for tangent to F Gt = N.zeros((1,3),N.Float32) # Storage for tangent to G for t in xrange(len(Y)): SigmaEta = SigmaEtaT[t] #Gt[0][0] = 2*mux[0] # Estimated derivative of Y wrt X #dy = Y[t] - mux[0]**2 # Error of forecast Y Gt[0][0] = 1.0 # Estimated derivative of Y wrt X dy = Y[t] - mux[0] # Error of forecast Y aM[t] = copy.copy(mux) # backward needs the forecast for time t aSigma[t] = copy.copy(Sigmax) # Update: Calculate \Sigma_\alpha(t) and \mu_\alpha(t) using y(t) SIalpha = LAI(Sigmax) + NMM(NT(Gt),NMM(SEI,Gt)) Sigmaalpha = LAI(SIalpha) mualpha = NMM(Sigmaalpha,NMM(NT(Gt),NMM(SEI,dy))) alphaM[t] = mualpha # Save only correction ic = mux + mualpha # Check for big ic R = math.sqrt(NMM(ic,ic)) if R > 100: print "t=",t,"R=",R,"Shrinking ic" ic = ic/R # Forecast: Calcualte \mu_x(t+1) and \Sigma_x(t+1) F(ic,mux,Ft)# Calculate mux and Ft Sigmax = NMM(Ft,NMM(Sigmaalpha,NT(Ft))) + SigmaEta # Remember values you need for backward and smoothing alphaSI[t] = SIalpha DF[t] = copy.copy(Ft) ################# End of ForwardEKF0() #################### def TanGen0( results, # list of empty lists [X,Y,F,G] DevEta = 0.001, # Dynamical noise DevEpsilon = 0.004, # Measurement noise s=10.0, r = 28.0, b = 8.0/3,# Lorenz parameters ts = 0.15, # Sample interval Nt = 300, # Number of samples trelax = 7.5 # Time to relax to attractor ): """ Simulate the evolution of the whole system for Nt time steps. Record and return the sequence of states (X), observations (Y), derivatives of state maps (F) and derivatives of observation maps (G). TanGen0() is like TanGen(), but observation is X_0 instead of X_0**2 """ import lorenz, numpy, random RG = random.gauss # Storage for initial conditions ic = numpy.ones(3) # Relax to the attractor: Nrelax = 3 temp = lorenz.Lsteps(ic,s,b,r,trelax,Nrelax) ic = temp[Nrelax-1] # reset ICs X = results[0] # X, Y, F, and G are empty lists that Y = results[1] # get appended to below F = results[2] G = results[3] for t in xrange(Nt): X.append(ic) Y.append(numpy.array([ic[0]+RG(0.0,DevEpsilon)])) G.append(numpy.array([[1.0,0,0]])) # Call to integrator. ic is overwritten with result ic,tan = lorenz.Ltan_one(ic,s,b,r,ts) ic = ic + numpy.array([RG(0.0,DevEta),RG(0.0,DevEta),RG(0.0,DevEta)]) F.append(tan) #End of TanGen0() ###################### Def LogLike() #################### """ LogLike(L,Y) takes parameters in list L and observations in list Y and returns the negative log likelihood """ def LogLike(L, # Lorenz/Laser parameters Y, # Time series of observations aM=None # Forecast means ): # L is the argument list. Unpack it first import lorenz, math, numpy, numpy.linalg as LA ic = L[0:3] r = L[3] s = L[4] b = L[5] ts = L[6] offset = L[7] scale = L[8] sigma_epsilon = L[9] # measurment noise sigma_eta = L[10] # Dynamic noise def lorstep(x): # Return mu_x(t+1) and derivative F(t) # Check for big ic x = x.A.reshape((3,)) # View as array instead of matrix R = math.sqrt(float(numpy.dot(x,x))) if R > 100: print "R=%6.0f Shrinking ic"%R x /= R fc,D = lorenz.Ltan_one(x,s,b,r,ts) return numpy.mat(fc.reshape((3,1))),numpy.mat(D) # View as matrices Nt = len(Y) yso = numpy.empty((Nt,1)) # Scaled and offset for t in xrange(Nt): yso[t] = float(Y[t] - offset)/scale SigmaEta = numpy.mat(numpy.eye(3))*(sigma_eta**2) SigmaEtaT = Nt*[SigmaEta] # Less memory than numpy.empty((Nt,3,3)) SigmaEpsilon = numpy.mat(numpy.eye(1))*sigma_epsilon**2 Sigma_x = numpy.mat(numpy.eye(3))*1e-2 # Parameter? # Allocate lists of results so that they can be indexed alphaSig = Nt*[None] alphaM = Nt*[None] if aM == None: aM = Nt*[None] aSigma = Nt*[None] ic = numpy.mat(numpy.array(ic).reshape((3,1))) ForwardEKF(yso, SigmaEtaT,SigmaEpsilon, ic, Sigma_x, lorstep, DF=None,alphaM=None, alphaSig=None ,aM=aM, aSigma=aSigma) # Calculate the log likelihood LL = 0.0 for t in xrange(Nt): xt = float(aM[t][0]) Gt = 2*xt sigma_gamma = float(Gt*aSigma[t][0,0]*Gt + SigmaEpsilon[0,0]) ye = float(yso[t][0] - xt**2) # Forecast Y error sigma_gamma *= scale**2 ye *= scale try: ill = -0.5*(math.log(2*math.pi*sigma_gamma) + ye*ye/sigma_gamma ) except (ValueError, OverflowError), error_string: print 'inc log like (ill) is sick: %s\nt=%d '%(error_string,t) print 'ye=%f, sigma_gamma=%f'%(ye,sigma_gamma) ill = -0.5*(math.log(2*math.pi*sigma_gamma)) LL += ill return (LL) ###################### End of LogLike() #################### #Local Variables: #mode:python #End: