""" Hview.py A GUI driven program for looking at cross entropy of Lorenz models with the following characteristics: 1. Measurements are quantized x_0 values (not squared) 2. Likelihood is integrated over quantization step Sliders: ts Integration time step Nt Number of time steps to simulate Ybin Quantization step size for observation Ydev Standard deviation of the observation noise epsilon Xdev Standard deviation of the state noise eta. Covariance is I*(Xdev^2). T Log_10 of fraction Nt to view Tc Coarse adjustment of the center of sequence to view Tf Fine adjustment of the center of the sequence to view theta Angle for viewing phase portrait phi Angle for viewing phase portrait Buttons: Quit Quit run Call "calculate()" and "replot()" filter Call "filter()" and "replot()" save Write information about displayed interval [Tmin:Tmax] to the file "Save_Hview_T_"+str(Tmin) calculate() runs a new simulation always relaxing from ic=[1,1,1] and calls filter(). filter() applies the extended Kalman filter EKF.ForwardEKF to the simulated measurements. replot() replots. Copyright (c) 2005, 2008 Andrew Fraser This file is part of HMM_DS_Code. HMM_DS_Code is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. HMM_DS_Code is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ import Tkinter, os, math, numpy, numpy.linalg, lorenz, EKF import scipy.special, random, sys LAI = numpy.linalg.inv ERF = scipy.special.erf root = Tkinter.Tk() XEstring = Tkinter.StringVar() # String objects for displaying YEstring = Tkinter.StringVar() # averages of log X error and # incremental Y log likelihood if len(sys.argv) == 2: data_dir = sys.argv[1]+'/' else: data_dir = '' # Parameters s=10.0 r = 28.0 b = 8.0/3 ts = 0.25 Nt = 500 theta = 0 # Viewing angle phi = 0 # Viewing angle Tview = 0.5 Tfine = 0.0 Trange = 1.0 # Number of samples to view Tmax = Nt Tmin = 0 Ystep = 1e-4 # Measurement quantization DevEpsilon = 1e-2 # Square root of the variance of measurement noise SigmaEpsilon = DevEpsilon**2 DevEta = 1e-6 # Square root of the variance of state noise SigmaEta = DevEta**2 # These assignments initialize global names results = None Y = None X = None alphaSig = None alphaM = None aM = None aSigma = None gammaM = None gammaS = None F = None LogStateError = None IncLogLike = None def G_func(x): # Function that returns measurement and derivative return numpy.mat(numpy.eye(1))*x[0,0],numpy.mat([[1.0,0.0,0.0]]) def lorstep(x): # Return mu_x(t+1) and derivative F(t) x = x.A.reshape((3,)) # View as array instead of matrix fc,D = lorenz.Ltan_one(x,s,b,r,ts) return numpy.mat(fc.reshape((3,1))),numpy.mat(D) # View as matrices def filter(): global alphaSig,alphaM,aM,aSigma,F,LogStateError,IncLogLike,gammaM,gammaS alphaSig = Nt*[None] # Updated state covariances alphaM = Nt*[None] # Updated state means aM = Nt*[None] # Forecast state means aSigma = Nt*[None] # Forecast state covariances gammaM = Nt*[None] # Forecast measurement means gammaS = Nt*[None] # Forecast measurement covariances LogStateError = Nt*[None] IncLogLike = Nt*[None] # Cheat and use the known initial state for the state mean mux = numpy.mat(results[0][0]).T # Initial state variance Sigmax = numpy.mat(numpy.eye(3)*1e-3) # Inverse meas. noise SigEp = numpy.mat(numpy.eye(1))*SigmaEpsilon # Time dependent dynamic noise SigmaEtaT = Nt*[None] for t in xrange(Nt): SigmaEtaT[t] = numpy.eye(3)*SigmaEta # Call the filtering function EKF.ForwardEKF(Y, SigmaEtaT, SigEp, mux, Sigmax, lorstep, G_func, alphaM=alphaM, alphaSig=alphaSig, aM=aM, aSigma=aSigma) # Generate error time series ALE = 0.0 # Average Log X Error AILLY = 0.0 # Average Incremental Log Likelihood of Y Gt = numpy.matrix([1.0,0,0]).T for t in xrange(Nt): xest = aM[t] + alphaM[t] xerr = numpy.mat(X[t]).T - xest fe = numpy.dot(xerr.T,xerr) # Filtered state error squared LogStateError[t] = math.log(fe,10)/2 ALE += LogStateError[t] xt = aM[t][0] sigma_gamma = Gt.T*aSigma[t]*Gt + SigmaEpsilon gammaM[t] = xt gammaS[t] = sigma_gamma #Integrate over bin size here mean = aM[t][0] top = Y[t][0] + Ystep/2 bottom = Y[t][0] - Ystep/2 if sigma_gamma < (Ystep**2)*1e-4: # print 'small sigma_gamma' if bottom < mean and mean < top: like0 = 1.0 else: like0 = 0.0 else: dev = math.sqrt(sigma_gamma*2.0) #2.0 for erf definition z0 = (bottom - mean)/dev z1 = (top - mean)/dev like0 = (ERF(z1) - ERF(z0))/2 z0 = (Y[t][0] - Ystep/2 - mean)/20 z1 = (Y[t][0] + Ystep/2 - mean)/20 like1 = (ERF(z1) - ERF(z0))/2 a = 0.999 # Kludge to limit cost of getting lost like = a*like0 + (1-a)*like1 if like <= 0: ye = Y[t][0] - aM[t][0] # Forecast Y error print 'like=',like, ' ye=',ye, ' dev=',dev IncLogLike[t] = -0.5*(math.log(2*math.pi*sigma_gamma) + ye*ye/sigma_gamma )/math.log(10) else: IncLogLike[t] = math.log(like,10) AILLY += IncLogLike[t] ALE/= Nt; XEstring.set('LXerror=%5.3f'%ALE) AILLY/= Nt; YEstring.set('LogLikeY=%5.3f'%AILLY) def calculate(ev=None): global results, X, Y results = [[],[],[],[]] random.seed(3) EKF.TanGen0(results,s=s,r=r,b=b,ts=ts,Nt=Nt, DevEta=DevEta, DevEpsilon=DevEpsilon) X = numpy.array(results[0],numpy.float64) Y = numpy.array(results[1],numpy.float64)/Ystep - 0.5 Y = numpy.ceil(Y)*Ystep filter() calculate() # Pipes for plotting PipeY = os.popen ("gnuplot",'w') # Y time series PipeY.write('set title "Y"\nset nokey\n') PipeGamma = os.popen ("gnuplot",'w') # Y time series PipeGamma.write('set title "Gamma"\n') PipeError = os.popen ("gnuplot",'w') # Error time series PipeError.write('set title "Log_10(|x(t)-\hat x(t)|)"\nset nokey\n') PipePP = os.popen ("gnuplot",'w') # Phase portrait PipePP.write('set nokey\n') PipeLL = os.popen ("gnuplot",'w') # Log likelihood time series PipeLL.write('set title "Y Log Likelihood"\nset nokey\n') def plotPP(): # Phase Portrait Zrot = numpy.array([[math.cos(theta), -math.sin(theta), 0], [math.sin(theta), math.cos(theta), 0], [0, 0, 1]]) Xrot = numpy.array([[1, 0, 0], [0, math.cos(phi), -math.sin(phi)], [0, math.sin(phi), math.cos(phi)]]) rot = numpy.mat(numpy.dot(Zrot,Xrot)) # print "rot.shape=", rot.shape, "XC.shape=", XC.shape #print "Nt=",Nt,"Tmin=",Tmin,"Tmax=",Tmax Ave = numpy.average(X) XC = numpy.mat(X - Ave) # XC.shape == (Nt,3) Max = math.sqrt(2)*XC.max() XR = XC*rot name1 = "/tmp/EKFPP" f = open(name1,'w') for t in xrange(Tmin,Tmax): print >>f, XR[t,0], XR[t,1] f.close() for t in xrange(Tmin,Tmax): XC.T[:,t] = aM[t] + alphaM[t] - Ave XR = numpy.dot(XC,rot) name2 = "/tmp/EKFPP2" f = open(name2,'w') for t in xrange(Tmin,Tmax): print >>f, XR[t,0], XR[t,1] f.close() PipePP.write('set yrange [' + str(-Max) + ':' + str(Max) + ']\n') PipePP.write('set xrange [' + str(-Max) + ':' + str(Max) + ']\n') PipePP.write('plot "'+name1+'" with lines, "'+name2+'" with lines \n') PipePP.flush() def plotY(): name = "/tmp/EKFY" f = open(name,'w') for t in xrange(Tmin,Tmax): print >>f, t, Y[t][0] f.close() PipeY.write('set xrange [' + str(Tmin) + ':' + str(Tmax) + ']\n') PipeY.write('plot "'+name+'" with lines\n') PipeY.flush() def plotGamma(): name = "/tmp/EKFYerror" f = open(name,'w') for t in xrange(Tmin,Tmax): a = gammaM[t][0,0] b = gammaS[t][0,0] print >>f, t, Y[t][0]-a, math.sqrt(b) f.close() PipeGamma.write('set xrange [' + str(Tmin) + ':' + str(Tmax) + ']\n') line = 'plot "'+name+'" using 1:2 with lines title "yerror", "'+\ name+'" using 1:3 with lines title "rootvar"\n' PipeGamma.write(line) PipeGamma.flush() def plotError(): name = "/tmp/EKFError" f = open(name,'w') for t in xrange(Tmin,Tmax): print >>f, t, LogStateError[t] f.close() PipeError.write('set xrange [' + str(Tmin) + ':' + str(Tmax) + ']\n') PipeError.write('plot "'+name+'" with lines\n') PipeError.flush() def plotLL(): name = "/tmp/EKFLL" f = open(name,'w') for t in xrange(Tmin,Tmax): print >>f, t, IncLogLike[t] f.close() PipeLL.write('set xrange [' + str(Tmin) + ':' + str(Tmax) + ']\n') PipeLL.write('plot "'+name+'" with lines\n') PipeLL.flush() def save(): name = data_dir+'Save_Hview_T_'+str(Tmin) f = open(name,'w') print >>f, 'This file generated by "save" from Hview.py' print >>f, 'Tmin=',Tmin,'Tmax=',Tmax,'ts=',ts,'Ystep=',Ystep print >>f, 'DevEpsilon=',DevEpsilon,'DevEta=',DevEta print >>f, 't Y[t][0] gammaM[t] gammaS[t] IncLogLike[t]' for t in xrange(Tmin,Tmax): print >>f, t, Y[t][0], gammaM[t][0,0], gammaS[t][0,0], IncLogLike[t] t = Tmin print >>f, 'alphaM[',t,']=\n',alphaM[t] print >>f, 'alphaSig[',t,']=\n',alphaSig[t] print >>f, 'aM[',t,']=\n',aM[t] print >>f, 'aSigma[',t,']=\n',aSigma[t] f.close() def replot(): global Tmax, Tmin Tmax = min(Nt,int((Tview + Tfine + Trange/2)*Nt+1)) Tmin = max(0,int((Tview + Tfine - Trange/2)*Nt-1)) if Tmin >= Nt - 1: Tmin = Nt - 2 if Tmax <= 1: Tmax = 2 if Tmin >= Tmax: print "How did I get Tmin=",Tmin, "Tmax=",Tmax Tmin = 0 Tmax = Nt plotPP() plotY() plotGamma() plotError() plotLL() def run(): calculate() replot() def filter_plot(): filter() replot() def newTheta(ev=None): global theta theta = THETA.get() plotPP() def newPhi(ev=None): global phi phi = PHI.get() plotPP() def newTview(ev=None): global Tview Tview = TVIEW.get() replot() def newTfine(ev=None): global Tfine Tfine = TFINE.get() replot() def newTrange(ev=None): global Trange Trange = 10**TRANGE.get() replot() def newNt(ev=None): global Nt Nt = NT.get() def newTS(ev=None): global ts ts = TS.get() def newYstep(ev=None): global Ystep ex = YSTEP.get() Ystep = 10**(ex) def newDevEta(ev=None): global DevEta global SigmaEta ex = DEVETA.get() DevEta = 10**(ex) SigmaEta = DevEta**2 def newDevEpsilon(ev=None): global DevEpsilon global SigmaEpsilon ex = DEVEPSILON.get() DevEpsilon = 10**(ex) SigmaEpsilon = DevEpsilon**2 # The stuff below defines the look and initial settings of the GUI. # It also ties service routines to the items in the GUI. root.title('Hview') row1 = Tkinter.Frame(root) row1.pack(side=Tkinter.TOP) TS = Tkinter.Scale(row1, label='ts',length=200,resolution=0.01, from_=0.01,to=2.0,command=newTS) TS.set(0.25) TS.pack(side=Tkinter.LEFT) NT = Tkinter.Scale(row1, label='Nt',length=200,resolution=1, from_=1,to=10000,command=newNt) NT.set(300) NT.pack(side=Tkinter.LEFT) quit = Tkinter.Button(row1,text='QUIT', command=root.quit) quit.pack(side=Tkinter.TOP) RUN = Tkinter.Button(row1,text='run', command=run) RUN.pack(side=Tkinter.TOP) FILTER = Tkinter.Button(row1,text='filter', command=filter_plot) FILTER.pack(side=Tkinter.TOP) SAVE = Tkinter.Button(row1,text='SAVE', command=save) SAVE.pack(side=Tkinter.TOP) X_Error = Tkinter.Label(row1,textvariable=XEstring) X_Error.pack(side=Tkinter.TOP) Y_Error = Tkinter.Label(row1,textvariable=YEstring) Y_Error.pack(side=Tkinter.TOP) row2 = Tkinter.Frame(root) row2.pack(side=Tkinter.TOP) # Log_10 of quantization interval for y YSTEP = Tkinter.Scale(row2, label='Ybin',length=200,resolution=0.1, from_=-5,to=1,command=newYstep) YSTEP.set(-4) YSTEP.pack(side=Tkinter.LEFT) # Log_10 of deviation of observation noise DEVEPSILON = Tkinter.Scale(row2, label='Ydev',length=200,resolution=0.1, from_=-10,to=0,command=newDevEpsilon) DEVEPSILON.set(-2) DEVEPSILON.pack(side=Tkinter.LEFT) # Log_10 of deviation of state noise DEVETA = Tkinter.Scale(row2, label='Xdev',length=200,resolution=0.1, from_=-7,to=0,command=newDevEta) DEVETA.set(-6) DEVETA.pack(side=Tkinter.LEFT) # TRANGE = Tkinter.Scale(row2, label='T',length=200, resolution=0.01,from_=0,to=-4,command=newTrange) TRANGE.set(0) TRANGE.pack(side=Tkinter.LEFT) row3 = Tkinter.Frame(root) row3.pack(side=Tkinter.TOP) TVIEW = Tkinter.Scale(row3, label='Tc',length=200, resolution=0.01,from_=0.01,to=1,command=newTview) TVIEW.set(0.5) TVIEW.pack(side=Tkinter.LEFT) TFINE = Tkinter.Scale(row3, label='Tf',length=200, resolution=0.1e-4,from_=-0.05,to=0.05,command=newTfine) TFINE.set(0.0) TFINE.pack(side=Tkinter.LEFT) THETA = Tkinter.Scale(row3, font='Symbol 10', label='q',length=200, resolution=0.01, from_=0,to=2*math.pi,command=newTheta) THETA.set(0) THETA.pack(side=Tkinter.LEFT) PHI = Tkinter.Scale(row3, font='Symbol 10', label='f',length=200, resolution=0.01,from_=0, to=2*math.pi,command=newPhi) PHI.set(0) PHI.pack(side=Tkinter.LEFT) Tkinter.mainloop() #Local Variables: #mode:python #End: