Skip to content

Instantly share code, notes, and snippets.

@abalijepalli
Last active September 11, 2015 00:35
Show Gist options
  • Save abalijepalli/cdc375a4c72f0736f4de to your computer and use it in GitHub Desktop.
Save abalijepalli/cdc375a4c72f0736f4de to your computer and use it in GitHub Desktop.
Generate synthetic multi-state single molecule data for use with MOSAIC (https://github.com/usnistgov/mosaic)
"""
Analyze synthetic multi-state single molecule data generated by syntheticSingleMoleculeData.py
Algorithm settings are imported from .settings file placed in the data directory.
For a description of the settings, please see https://usnistgov.github.io/mosaic/html/doc/settingsFile.html
:Created: 03/05/2015
:Author: Arvind Balijepalli <[email protected]>
:ChangeLog:
.. line-block::
03/16/15 AB Fixed a bug in the database setup that caused individual
database files to be created for each analyzed file.
02/26/15 AB Initial version
"""
import time
import json
import os
import numpy as np
import mosaic.settings as settings
import mosaic.adept2State as a2s
import mosaic.adept as adept
import mosaic.cusumPlus as cplus
import mosaic.sqlite3MDIO as sqlite3MDIO
from syntheticSingleMoleculeData import param_t
class analysisCommon(object):
def __init__(self, datFileList, prmFileList, algoHnd):
self.prmFileList=prmFileList
self.datFileList=datFileList
self.algoHnd=algoHnd
prm=self._readParameters(prmFileList[0])
s1=settings.settings(os.path.dirname(datFileList[0]), defaultwarn=False).settingsDict
sett=s1[str(algoHnd.__name__)]
dt=1./prm['FsKHz']
self.dbHnd=self._setupDB('.', prm['FsKHz'], sett.copy(), algoHnd)
def runAnalysis(self, saveTS):
for i in range(len(self.datFileList)):
datfile=self.datFileList[i]
prmfile=self.prmFileList[i]
dat=self._readDataFile(datfile)
prm=self._readParameters(prmfile)
s1=settings.settings(os.path.dirname(datfile), defaultwarn=False).settingsDict
sett=s1[str(self.algoHnd.__name__)]
dt=1./prm['FsKHz']
noiseRMS=prm['noisefArtHz']*np.sqrt(prm['BkHz']*1000.)/1000.
testobj=algoHnd(
dat,
int(prm['FsKHz']*1000),
eventstart=int(prm['eventDelay'][0]/dt), # event start point
eventend=int(prm['eventDelay'][-1]/dt), # event end point
baselinestats=[ prm['OpenChCurrent'], noiseRMS, 0.0 ],
algosettingsdict=sett,
savets=int(saveTS),
absdatidx=0.0
)
testobj.dataFileHnd=self.dbHnd
testobj.processEvent()
def _readDataFile(self, datfile):
return np.fromfile(datfile, dtype=np.float64)
def _readParameters(self, paramfile):
prm=json.loads("".join((open(paramfile, 'r').readlines())))
for k in prm.keys():
v=eval(prm[k])
prm[k]=param_t[k](v)
return prm
def _setupDB(self, datpath, FsHz, sett, algohnd):
tEventProcObj=algohnd([], FsHz, eventstart=0,eventend=0, baselinestats=[ 0,0,0 ], algosettingsdict=sett, savets=False, absdatidx=0, datafileHnd=None )
mdioDBHnd=sqlite3MDIO.sqlite3MDIO()
mdioDBHnd.initDB(
dbPath=datpath,
tableName='metadata',
colNames=(tEventProcObj.mdHeadings())+['TimeSeries'],
colNames_t=(tEventProcObj.mdHeadingDataType())+['REAL_LIST']
)
mdioDBHnd.writeSettings(str(sett))
mdioDBHnd.writeAnalysisInfo([
datpath,
'bin',
'N/A',
algohnd.__name__,
'None',
0,
0,
])
return mdioDBHnd
if __name__ == '__main__':
n=10
# algoHnd=cplus.cusumPlus
algoHnd=adept.adept
# algoHnd=a2s.adept2State
# d=[]
# for bd in [0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8]:
# for rt in [0.4,0.3,0.2,0.15,0.1,0.075,0.05,0.025]:
# d.append((bd,rt))
t1=time.time()
a=analysisCommon(
datFileList=['temp/eventLong_'+str(i)+'_raw.bin' for i in range(n)],
prmFileList=['temp/eventLong_'+str(i)+'_params.json' for i in range(n)],
algoHnd=algoHnd
)
a.runAnalysis(saveTS=False)
os.rename(a.dbHnd.dbFile, './eventMD_ADEPT.sqlite')
print "Analyzed {0} events in {1:.2f} ms.".format(n, 1000*(time.time()-t1))
import json
import glob
from abc import ABCMeta, abstractmethod
import pylab as pl
import numpy as np
import os.path
import mosaic.sqlite3MDIO as sql
from syntheticSingleMoleculeData import param_t
class plotAnalysisCommon(object):
def __init__(self, dbfile, paramprefix):
self.dbfile=dbfile
self.db=[ sql.sqlite3MDIO() for db in dbfile ]
for i in range(len(self.dbfile)):
self.db[i].openDB(self.dbfile[i])
self.paramprefix=paramprefix
self.q=[]
self._readparams()
self.colorcodes=['r', 'b','g', 'c','m', 'y']
# Define enter and exit funcs so this class can be used with a context manager
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
[ d.closeDB() for d in self.db ]
def plotHist(self, **kwargs):
for i in range(len(self.dbfile)):
h=np.histogram(self.q[i], **kwargs)
pl.scatter(h[1][:-1],h[0], s=60, facecolors=self.colorcodes[i], edgecolors='0.5', alpha=0.5)
h1=np.histogram(self.prm, **kwargs)
pl.plot(h1[1][:-1],h1[0], color='0.5', linestyle='--')
pl.ylim([0, max(h1[0]+10)])
try:
pl.xlim(list(kwargs["range"]))
except KeyError:
pass
pl.show()
def dbQuery(self, **kwargs):
for i in range(len(self.dbfile)):
x=self.db[i].queryDB( self._dbStr() )
q=self._dbResultsStack(x)
self.q.append(q)
def _readparams(self):
p=[]
self.prm=[]
prmFileList=glob.glob(os.path.dirname(self.dbfile[0])+self.paramprefix+'*json')
for paramfile in prmFileList:
prm=json.loads("".join((open(paramfile, 'r').readlines())))
for k in prm.keys():
v=eval(prm[k])
prm[k]=param_t[k](v)
p.extend([self._plotParam(prm)])
[ self.prm.extend(element) for element in [item for item in p] ]
@abstractmethod
def _dbStr(self):
pass
@abstractmethod
def _plotParam(self, paramDict):
pass
@abstractmethod
def _dbResultsStack(self, x):
pass
class plotBlockDepth(plotAnalysisCommon):
def _dbStr(self):
return "select BlockDepth from metadata where ProcessingStatus='normal' and ResTime > 0.02"
def _plotParam(self, paramDict):
bd=[paramDict["OpenChCurrent"]]
bd.extend(paramDict["deltaStep"])
return (np.cumsum(bd)/paramDict["OpenChCurrent"])[1:-1]
def _dbResultsStack(self, x):
q=[]
[ q.extend(element) for element in [item[0] for item in x] ]
return q
class plotBlockDepthSRA(plotBlockDepth):
def _dbResultsStack(self, x):
q=[]
[ q.extend(element) for element in [item for item in x] ]
return q
if __name__ == '__main__':
# with plotBlockDepth(["v1DB/eventMD-20150303-205917_long_msa.sqlite", "eventMD-20150317-202556_long_cla.sqlite"], "data/eventLong") as bd:
# with plotBlockDepth(["v1DB/eventMD-20150303-190929_medium_msa.sqlite","eventMD-20150317-205548_medium_cla.sqlite"], "data/eventMedium") as bd:
# with plotBlockDepth(["v1DB/eventMD-20150303-224135_short_msa.sqlite","eventMD-20150317-204632_short_cla.sqlite"], "data/eventShort") as bd:
# with plotBlockDepth(["eventMD-20150317-100929_long_msa.sqlite", "eventMD-20150317-202556_long_cla.sqlite"], "data/eventLong") as bd:
# with plotBlockDepth(["eventMD-20150317-214319_medium_msa.sqlite","eventMD-20150317-205548_medium_cla.sqlite"], "data/eventMedium") as bd:
# with plotBlockDepth(["eventMD-20150317-215954_short_msa.sqlite","eventMD-20150317-204632_short_cla.sqlite"], "data/eventShort") as bd:
with plotBlockDepthSRA(["eventMD-20150521-145304.sqlite"], "data/eventTwoState") as bd:
bd.dbQuery()
bd.plotHist(range=(0.0,0.51), bins=250)
"""
Generate synthetic multi-state single molecule data
:Created: 02/26/2015
:Author: Arvind Balijepalli <[email protected]>
:ChangeLog:
.. line-block::
02/26/15 AB Added timing
02/26/15 AB Initial version
"""
import time
import os
import json
import numpy as np
import mosaic.utilities.fit_funcs as fit_funcs
param_t={
"n" : int,
"OpenChCurrent" : float,
"RCConstant" : list,
"eventDelay" : list,
"deltaStep" : list,
"FsKHz" : int,
"BkHz" : int,
"padding" : int,
"noisefArtHz" : int
}
class BlockedCurrent(object):
"""
Generate a blocked current value given a mean blocked current and std. dev.
This class assumes the blocked current state follows a Gaussian random process.
:Parameters:
- `blockedCurrentMean` : mean current in pA of the blocked state.
- `blockedCurrentMean` : standard deviation in pA of the blocked state.
"""
def __init__(self, blockedCurrentMean, blockedCurrentStd):
self.blockedCurrentMean=blockedCurrentMean
self.blockedCurrentStd=blockedCurrentStd
@property
def BlockedState(self):
# return np.random.normal(self.blockedCurrentMean, self.blockedCurrentStd)
return self.blockedCurrentMean
class ResidenceTime(object):
"""
Generate a residence time for a blocked state, assuming the residence times follow
a sinlge exponential distribution.
:Parameters:
- `meanResidenceTime` : mean residence time of a blocked state in ms.
"""
def __init__(self, meanResidenceTime):
self.meanResidenceTime=meanResidenceTime
@property
def ResidenceTime(self):
return self.meanResidenceTime
# return np.random.exponential(self.meanResidenceTime)
class ConstructMultiStateEvent:
"""
Construct a multi-state event with the specified parameters. In order to add noise, call *setNoiseParameters*.
meanBlockedCurrent: list of blocked current for each sub-state in the event
stdBlockedCurr: Std. deviation of each sub-state in the event. The mean current is picked from a Gaussian distribution defined by meanBlockedCurrent and stdBlockedCurr.
meanResTime: list of mean residence times for each sub-state in the event. The residence time of the state is selected from an exponential distribution defined by this parameter.
BkHz: Instrumentation bandwidth in kHz.
FsKHz: Sampling frequency in kHz.
i0pA: Open channel current in pA.
padding: number of points to include at the start and end of the event.
"""
def __init__(self, meanBlockedCurrent, stdBlockedCurr, meanResTime, BkHz, FsKHz, i0pA, padding):
self.BkHz=BkHz
self.FsKHz=FsKHz
self.padding=padding
self.openChCurrent=i0pA
self.mb=meanBlockedCurrent
self.sb=stdBlockedCurr
self.mr=meanResTime
self.addNoise=False
def generateEvent(self):
b=[BlockedCurrent(mu, self.sb) for mu in self.mb]
r=[ResidenceTime(mu) for mu in self.mr]
residenceTimes=np.array([rtime.ResidenceTime for rtime in r])
self.deltaBlockedCurrent=self._deltaBlockedCurrent( self.openChCurrent, np.array([bstate.BlockedState for bstate in b]) )
self.eventDelay=self._eventDelay( self.padding, self.FsKHz, residenceTimes )
self.RCConstant=[1./self.BkHz for i in range(len(self.deltaBlockedCurrent))]
self.nPoints=int(np.sum(residenceTimes)*self.FsKHz+2*self.padding)
dt=1./self.FsKHz
self.idealEventData=np.array(
fit_funcs.multiStateFunc(
[t*dt for t in range(self.nPoints)],
self.RCConstant,
self.eventDelay,
self.deltaBlockedCurrent,
self.openChCurrent,
len(self.deltaBlockedCurrent)
),
dtype=np.float64)
if self.addNoise:
self._addNoiseProcess()
def setNoiseParams(self, noisefArtHz):
"""
Set the noise in fA/sqrt(Hz)
noise: Parameter to add noise to the event in fA/sqrt(Hz). This allows the noise to be scaled by bandwidth.
"""
self.noisefArtHz=float(noisefArtHz)
self.addNoise=True
def writeEvent(self, label):
if self.addNoise:
self.rawEventData.tofile(label+"_raw.bin")
self.idealEventData.tofile(label+"_ideal.bin")
self._writeParameters(label)
@property
def IdealizedEvent(self):
return idealEventData
def _addNoiseProcess(self):
noiseStd=self.noisefArtHz*np.sqrt(self.BkHz*1000)/1000.
noise=np.random.normal(0, noiseStd, len(self.idealEventData))
self.rawEventData=np.array(
self.idealEventData+noise,
dtype=np.float64
)
def _writeParameters(self, label):
params={}
params["n"]=str(len(self.deltaBlockedCurrent))
params["OpenChCurrent"]=str(self.openChCurrent)
params["RCConstant"]=str(list(self.RCConstant))
params["eventDelay"]=str(list(self.eventDelay))
params["deltaStep"]=str(list(self.deltaBlockedCurrent))
params["BkHz"]=str(self.BkHz)
params["FsKHz"]=str(self.FsKHz)
params["padding"]=str(self.padding)
if self.addNoise:
params["noisefArtHz"]=str(self.noisefArtHz)
with open(label+"_params.json", 'w') as outfile:
json.dump(params, outfile, indent = 4 )
def _eventDelay(self, padding, FsKHz, residenceTimes):
padDelay=float(padding)/FsKHz
d=[padDelay]
d.extend(residenceTimes)
return np.cumsum(d)
def _deltaBlockedCurrent(self, i0pA, blockedCurrentStates):
c=[i0pA]
c.extend(blockedCurrentStates)
c.extend([i0pA])
return np.diff(c)
if __name__ == '__main__':
n=10
# t1=time.time()
# d=[]
# for bd in [0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8]:
# for rt in [0.4,0.3,0.2,0.15,0.1,0.075,0.05,0.025]:
# d.append((bd,rt))
# for ds in d:
# # meanBlockedCurrent, stdBlockedCurr, meanResTime, BkHz, FsKHz, i0pA, padding
# c=ConstructMultiStateEvent([ds[0]*150], 1., [ds[1]], 100, 500, 150, 25)
# c.setNoiseParams(15)
# for i in range(n):
# c.generateEvent()
# try:
# os.makedirs('data/bd='+str(ds[0])+'rt='+str(ds[1]))
# except OSError:
# pass
# c.writeEvent('data/bd='+str(ds[0])+'rt='+str(ds[1])+'/eventTwoState_bd='+str(ds[0])+'rt='+str(ds[1])+str(i))
# print "Generated {0} two-state events (bd={1:.2f}, rt={2:.2f}) in {3:.2f} ms.".format(n, ds[0],ds[1], 1000*(time.time()-t1))
# t1=time.time()
# c=ConstructMultiStateEvent([10,60,30], 2.5, [0.05,0.1,0.075], 250, 500, 150, 200)
# c.setNoiseParams(15)
# for i in range(n):
# c.generateEvent()
# c.writeEvent('data/eventShort_'+str(i))
# print "Generated {0} short events in {1:.2f} ms.".format(n, 1000*(time.time()-t1))
# t1=time.time()
# c=ConstructMultiStateEvent([10,60,30], 2.5, [0.06,0.2,0.15], 250, 500, 150, 200)
# c.setNoiseParams(15)
# for i in range(n):
# c.generateEvent()
# c.writeEvent('data/eventMedium_'+str(i))
# print "Generated {0} medium events in {1:.2f} ms.".format(n, 1000*(time.time()-t1))
t1=time.time()
c=ConstructMultiStateEvent([10,60,30], 2.5, [0.8, 1.2, 1.], 100, 500, 150, 200)
c.setNoiseParams(15)
for i in range(n):
c.generateEvent()
c.writeEvent('data/eventLong_'+str(i))
print "Generated {0} long events in {1:.2f} ms.".format(n, 1000*(time.time()-t1))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment