Last active
June 17, 2018 18:02
-
-
Save neurodroid/213f08570654de3a5c3b3e6e0a27fa27 to your computer and use it in GitHub Desktop.
Reads Intan Technologies CLAMP data file generated by controller GUI.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Reads Intan Technologies CLAMP data file generated by controller GUI. | |
Christoph Schmidt-Hieber | |
2016-11-05 | |
Mostly a Python version of read_Intan_CLP_file.m from | |
http://www.intantech.com/files/Intan_CLAMP_software_compiled_v1_0.zip | |
as of 2016-11-05 | |
Example: | |
>>> import matplotlib.pyplot as plt | |
>>> import intan | |
>>> intan_file = intan.IntanFile('myexperiment_A_160916_142731.clp') | |
>>> plt.plot(intan_file.data["Time"], intan_file.data["Measured"]) | |
>>> intan_file = intan.IntanFile('myexperiment_AUX_160916_142731.clp') | |
>>> plt.plot(intan_file.data["Time"], intan_file.data["ADC"][1]) | |
""" | |
import sys | |
import numpy as np | |
class IntanFile(object): | |
""" | |
Intan Technologies CLAMP data file generated by controller GUI. | |
Attributes | |
---------- | |
data : dict | |
Dictionary containing the file data. | |
header : dict | |
Dictionary containing the file metadata. | |
""" | |
def __init__(self, filename, read=True, verbose=0): | |
""" | |
Open a file for reading. | |
Parameters | |
---------- | |
filename : str | |
File name (full path) | |
read : bool, optional | |
Read in file data upon initialization. Default: True | |
verbose : int, optional | |
Print information while reading. Default: 0 | |
""" | |
self.filename = filename | |
if read: | |
self.read(verbose=verbose) | |
def read(self, verbose=0): | |
""" | |
Read file. | |
Parameters | |
---------- | |
verbose : int, optional | |
Print information while reading. Default: 0 | |
""" | |
with open(self.filename, 'rb') as self.fid: | |
self._read_header(verbose) | |
if self.header["datatype"] == 0: | |
self._read_data() | |
else: | |
self._read_aux_data() | |
self.fid.close() | |
def _read_segment(self): | |
segment = {} | |
segment["WaveformNumber"] = np.fromfile(self.fid, np.uint8, count=1)[0] | |
segment["TOffset"] = np.fromfile(self.fid, np.uint32, count=1)[0] + 1 | |
segment["Start"] = np.fromfile(self.fid, np.uint32, count=1)[0] + 1 | |
segment["End"] = np.fromfile(self.fid, np.uint32, count=1)[0] + 1 | |
segment["AppliedValue"] = np.fromfile(self.fid, np.float32, count=1)[0] | |
return segment | |
def _read_waveform(self): | |
waveform = {} | |
waveform["Interval"] = np.fromfile(self.fid, np.float32, count=1)[0] | |
numSegments = np.fromfile(self.fid, np.uint16, count=1)[0] | |
waveform["Segments"] = [ | |
self._read_segment() | |
for i in range(numSegments)] | |
return waveform | |
def _read_header_voltage_clamp_settings(self): | |
return { | |
"HoldingVoltage": np.fromfile(self.fid, np.float32, count=1)[0], | |
"NominalResistance": np.fromfile(self.fid, np.float32, count=1)[0], | |
"Resistance": np.fromfile(self.fid, np.float32, count=1)[0], | |
"DesiredBandwidth": np.fromfile(self.fid, np.float32, count=1)[0], | |
"ActualBandwidth": np.fromfile(self.fid, np.float32, count=1)[0] | |
} | |
def _read_header_current_clamp_settings(self): | |
return { | |
"HoldingCurrent": np.fromfile(self.fid, np.float32, count=1)[0], | |
"CurrentStepSize": np.fromfile(self.fid, np.float32, count=1)[0] | |
} | |
def _read_header_settings(self): | |
settings = { | |
"EnableCapacitiveCompensation": np.fromfile( | |
self.fid, np.uint8, count=1)[0], | |
"CapCompensationMagnitude": np.fromfile( | |
self.fid, np.float32, count=1)[0] | |
} | |
filterf = np.fromfile(self.fid, np.float32, count=1)[0] | |
if filterf > 0: | |
settings["FilterCutoff"] = filterf | |
settings["PipetteOffset"] = np.fromfile( | |
self.fid, np.float32, count=1)[0] | |
settings["SamplingRate"] = np.fromfile( | |
self.fid, np.float32, count=1)[0] | |
settings["CellParameters.Rs"] = np.fromfile( | |
self.fid, np.float32, count=1)[0] | |
settings["CellParameters.Rm"] = np.fromfile( | |
self.fid, np.float32, count=1)[0] | |
settings["CellParameters.Cm"] = np.fromfile( | |
self.fid, np.float32, count=1)[0] | |
settings["IsVoltageClamp"] = np.fromfile( | |
self.fid, np.uint8, count=1)[0] | |
settings["vClampX2mode"] = np.fromfile( | |
self.fid, np.uint8, count=1)[0] | |
if settings["IsVoltageClamp"]: | |
settings["VoltageClamp"] = \ | |
self._read_header_voltage_clamp_settings() | |
else: | |
settings["CurrentClamp"] = \ | |
self._read_header_current_clamp_settings() | |
settings["Waveform"] = self._read_waveform() | |
return settings | |
def _read_one_header_channel(self): | |
channel = { | |
"Registers": np.fromfile(self.fid, np.uint16, count=14), | |
"DifferenceAmpResidual": np.fromfile( | |
self.fid, np.int32, count=1)[0], | |
"VoltageAmpResidual": np.fromfile(self.fid, np.int32, count=1)[0] | |
} | |
bestCalibration = np.fromfile(self.fid, np.uint8, count=2*4*2) | |
bestCalibration = bestCalibration.reshape((2, 4, 2), order='F') | |
channel["CoarseCalibration"] = bestCalibration[0, :, :].reshape( | |
(4, 2), order='F') | |
channel["FineCalibration"] = bestCalibration[1, :, :].reshape( | |
(4, 2), order='F') | |
channel["FeedbackResistors"] = np.fromfile( | |
self.fid, np.float32, count=5) | |
channel["DesiredBandwidth"] = np.fromfile( | |
self.fid, np.float32, count=1)[0] | |
return channel | |
def _read_one_header_chip(self, numChannels): | |
return { | |
"Channels": [ | |
self._read_one_header_channel() | |
for i in range(numChannels)], | |
"ChipRegisters": np.fromfile(self.fid, np.uint16, count=4) | |
} | |
def _read_header_chips(self): | |
numChips = np.fromfile(self.fid, np.uint16, count=1)[0] | |
numChannels = np.fromfile(self.fid, np.uint16, count=1)[0] | |
return [ | |
self._read_one_header_chip(numChannels) for i in range(numChips)] | |
def _read_header(self, verbose=0): | |
magic_number = np.fromfile(self.fid, np.uint32, count=1)[0] | |
if magic_number != int('0xf3b1a481', 16): | |
raise RuntimeError('Unrecognized file type.') | |
self.header = { | |
"version_major": np.fromfile(self.fid, np.int16, count=1)[0], | |
"version_minor": np.fromfile(self.fid, np.int16, count=1)[0], | |
"datatype": np.fromfile(self.fid, np.int16, count=1)[0] | |
} | |
version_string = "Data File, Version {0}.{0}\n".format( | |
self.header["version_major"], self.header["version_minor"]) | |
if verbose > 0: | |
sys.stdout.write("Reading Intan Technologies CLAMP") | |
if self.header["datatype"] == 0: | |
if verbose > 0: | |
sys.stdout.write(" " + version_string) | |
NumBytes = np.fromfile(self.fid, np.uint16, count=1)[0] | |
self.header["date_Year"] = np.fromfile( | |
self.fid, np.int16, count=1)[0] | |
self.header["date_Month"] = np.fromfile( | |
self.fid, np.int16, count=1)[0] | |
self.header["date_Day"] = np.fromfile( | |
self.fid, np.int16, count=1)[0] | |
self.header["date_Hour"] = np.fromfile( | |
self.fid, np.int16, count=1)[0] | |
self.header["date_Minute"] = np.fromfile( | |
self.fid, np.int16, count=1)[0] | |
self.header["date_Second"] = np.fromfile( | |
self.fid, np.int16, count=1)[0] | |
self.header["Chips"] = self._read_header_chips() | |
self.header["Settings"] = self._read_header_settings() | |
elif self.header["datatype"] == 1: | |
if verbose > 0: | |
sys.stdout.write(" Auxiliary " + version_string) | |
self.header["NumADCs"] = np.fromfile( | |
self.fid, np.uint16, count=1)[0] | |
NumBytes = np.fromfile(self.fid, np.uint16, count=1)[0] | |
self.header["date_Year"] = np.fromfile( | |
self.fid, np.int16, count=1)[0] | |
self.header["date_Month"] = np.fromfile( | |
self.fid, np.int16, count=1)[0] | |
self.header["date_Day"] = np.fromfile( | |
self.fid, np.int16, count=1)[0] | |
self.header["date_Hour"] = np.fromfile( | |
self.fid, np.int16, count=1)[0] | |
self.header["date_Minute"] = np.fromfile( | |
self.fid, np.int16, count=1)[0] | |
self.header["date_Second"] = np.fromfile( | |
self.fid, np.int16, count=1)[0] | |
self.header["Settings"] = { | |
"SamplingRate": np.fromfile(self.fid, np.float32, count=1)[0] | |
} | |
else: | |
raise RuntimeError("Unrecognized data type") | |
pos = self.fid.tell() | |
if pos != NumBytes: | |
raise RuntimeError("Header NumBytes doesn't match number of bytes") | |
def _read_data(self): | |
start_pos = self.fid.tell() | |
self.fid.seek(0, 2) | |
file_size = self.fid.tell() | |
self.fid.seek(start_pos, 0) | |
# Timestep + Applied (Software) + Clamp Value + Measured | |
size_of_one = 4 + 4 + 4 + 4 | |
length = int(np.round((file_size-start_pos) / size_of_one)) | |
# Read in the whole thing at once | |
byte_array = np.fromfile(self.fid, np.uint8, count=length*size_of_one) | |
data_matrix = byte_array.reshape((size_of_one, length), order='F') | |
# Now extract the pieces we need | |
self.data = { | |
"Time": data_matrix[0:4, :].reshape( | |
(4*length), order='F').view(np.uint32).astype( | |
np.float64) / self.header["Settings"]["SamplingRate"], | |
"Clamp": data_matrix[4:8, :].reshape( | |
(4*length), order='F').view(np.float32), | |
"TotalClamp": data_matrix[8:12, :].reshape( | |
(4*length), order='F').view(np.float32), | |
"Measured": data_matrix[12:16, :].reshape( | |
(4*length), order='F').view(np.float32) | |
} | |
def _read_aux_data(self): | |
start_pos = self.fid.tell() | |
self.fid.seek(0, 2) | |
file_size = self.fid.tell() | |
self.fid.seek(start_pos, 0) | |
# Timestamps + DigIn + DigOut + ADCs | |
size_of_one = 4 + 2 + 2 + 2 * self.header["NumADCs"] | |
length = int(np.round((file_size-start_pos) / size_of_one)) | |
# Read in the whole thing at once | |
byte_array = np.fromfile(self.fid, np.uint8, count=length*size_of_one) | |
data_matrix = byte_array.reshape((size_of_one, length), order='F') | |
digin = data_matrix[4:6, :].reshape( | |
(2*length), order='F').view(np.uint16) | |
diginbits = np.unpackbits(digin.view(np.uint8)) | |
diginbits = diginbits.reshape((diginbits.shape[0]/16, 16)).T | |
diginbits[0:8, :] = diginbits[7::-1, :] | |
diginbits[8:16, :] = diginbits[15:7:-1, :] | |
digout = data_matrix[6:8, :].reshape( | |
(2*length), order='F').view(np.uint16) | |
digoutbits = np.unpackbits(digout.view(np.uint8)) | |
digoutbits = digoutbits.reshape((digoutbits.shape[0]/16, 16)).T | |
digoutbits[0:8, :] = digoutbits[7::-1, :] | |
digoutbits[8:16, :] = digoutbits[15:7:-1, :] | |
# Now extract the pieces we need | |
self.data = { | |
"Time": data_matrix[0:4, :].reshape( | |
(4*length), order='F').view(np.uint32).astype( | |
np.float64) / self.header["Settings"]["SamplingRate"], | |
"DigitalIn": diginbits[0], | |
"DigitalInAll": diginbits, | |
"DigitalOut": digoutbits[0], | |
"DigitalOutAll": digoutbits, | |
"ADC": np.array([ | |
0.0003125 * data_matrix[2*i+6:2*i+8, :].reshape( | |
(2*length), order='F').view( | |
dtype=np.uint16).astype(np.float64) - 2**15 | |
for i in range(self.header["NumADCs"])]) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
2017-06-23 Fix file reading on Windows