"""
Convert ABF files, created by PClamp/Clampex, to NWB v2 files.
"""
from hashlib import sha256
import json
import re
import os
import glob
import warnings
import logging
import numpy as np
import pyabf
from pynwb.device import Device
from pynwb import NWBHDF5IO, NWBFile
from pynwb.icephys import IntracellularElectrode
from ipfx.x_to_nwb.conversion_utils import PLACEHOLDER, V_CLAMP_MODE, I_CLAMP_MODE, I0_CLAMP_MODE, \
parseUnit, getStimulusSeriesClass, getAcquiredSeriesClass, createSeriesName, convertDataset, \
getPackageInfo, createCycleID
log = logging.getLogger(__name__)
[docs]class ABFConverter:
protocolStorageDir = None
adcNamesWithRealData = ["IN 0", "IN 1", "IN 2", "IN 3"]
def __init__(self, inFileOrFolder, outFile, outputFeedbackChannel, compression=True):
"""
Convert the given ABF file to NWB
Keyword arguments:
inFileOrFolder -- input file, or folder with multiple files, in ABF v2 format
outFile -- target filepath (must not exist)
outputFeedbackChannel -- Output ADC data from feedback channels as well (useful for debugging only)
compression -- Toggle compression for HDF5 datasets
"""
inFiles = []
if os.path.isfile(inFileOrFolder):
inFiles.append(inFileOrFolder)
elif os.path.isdir(inFileOrFolder):
inFiles = glob.glob(os.path.join(inFileOrFolder, "*.abf"))
else:
raise ValueError(f"{inFileOrFolder} is neither a folder nor a path.")
self.outputFeedbackChannel = outputFeedbackChannel
self.compression = compression
self._settings = self._getJSONFiles(inFileOrFolder)
self.abfs = []
for inFile in inFiles:
abf = pyabf.ABF(inFile, loadData=False, stimulusFileFolder=ABFConverter.protocolStorageDir)
self.abfs.append(abf)
# ensure that the input file matches our expectations
self._check(abf)
self.refabf = self._getOldestABF()
self._checkAll()
self.totalSeriesCount = self._getMaxTimeSeriesCount()
nwbFile = self._createFile()
device = self._createDevice()
nwbFile.add_device(device)
electrodes = self._createElectrodes(device)
nwbFile.add_ic_electrode(electrodes)
for i in self._createStimulusSeries(electrodes):
nwbFile.add_stimulus(i)
for i in self._createAcquiredSeries(electrodes):
nwbFile.add_acquisition(i)
with NWBHDF5IO(outFile, "w") as io:
io.write(nwbFile, cache_spec=True)
@staticmethod
def _getProtocolName(protocolName):
"""
Return the protocol/stimset name without the channel suffix.
"""
return re.sub(r"_IN\d+$", "", protocolName)
def _getJSONFiles(self, inFileOrFolder):
"""
Search the JSON files with all miscellaneous settings.
If `inFileOrFolder` is a folder we need one JSON file in that folder or
multiple JSON files with the same basename as the ABF files.
If `inFileOrFolder` is a file the JSON file must have the same
basename.
Returns a dict with the abf file/folder name as key and a dictinonary with
the settings as value.
"""
d = {}
def loadJSON(filename):
log.debug(f"Using JSON settings file {filename}.")
with open(filename) as fh:
return json.load(fh)
def addDictEntry(d, filename):
base, _ = os.path.splitext(filename)
settings = base + ".json"
if os.path.isfile(settings):
d[filename] = loadJSON(settings)
return None
warnings.warn(f"Could not find the JSON file {settings} with settings.")
if os.path.isfile(inFileOrFolder):
log.debug("Searching JSON files for file conversion.")
addDictEntry(d, inFileOrFolder)
return d
if os.path.isdir(inFileOrFolder):
files = glob.glob(os.path.join(inFileOrFolder, "*.json"))
numFiles = len(files)
log.debug(f"Found {numFiles} JSON files for folder conversion.")
if numFiles == 0:
warnings.warn(f"Could not find any JSON file with settings.")
return d
elif numFiles == 1:
# compatibility with old datasets with only one JSON file per folder
d[inFileOrFolder] = loadJSON(files[0])
return d
# iterate over all ABF files
files = glob.glob(os.path.join(inFileOrFolder, "*.abf"))
for f in files:
addDictEntry(d, f)
return d
def _check(self, abf):
"""
Check that all prerequisites are met.
"""
if abf.abfVersion["major"] != 2:
raise ValueError(f"Can not handle ABF file format version {abf.abfVersion['major']} sweeps.")
elif not (abf.sweepPointCount > 0):
raise ValueError("The number of data points is not larger than zero.")
elif not (abf.sweepCount > 0):
raise ValueError("Found no sweeps.")
elif not (abf.channelCount > 0):
raise ValueError("Found no channels.")
elif sum(abf._dacSection.nWaveformEnable) == 0:
raise ValueError("All channels are turned off.")
elif len(np.unique(abf._adcSection.nTelegraphInstrument)) > 1:
raise ValueError("Unexpected mismatching telegraph instruments.")
elif len(abf._adcSection.sTelegraphInstrument[0]) == 0:
raise ValueError("Empty telegraph name.")
elif len(abf._protocolSection.sDigitizerType) == 0:
raise ValueError("Empty digitizer type.")
elif abf.channelCount != len(abf.channelList):
raise ValueError("Internal channel count is inconsistent.")
elif abf.sweepCount != len(abf.sweepList):
raise ValueError("Internal sweep count is inconsistent.")
for sweep in range(abf.sweepCount):
for channel in range(abf.channelCount):
abf.setSweep(sweep, channel=channel)
if abf.sweepUnitsX != "sec":
raise ValueError(f"Unexpected x units of {abf.sweepUnitsX}.")
if not abf._dacSection.nWaveformEnable[channel]:
continue
if np.isnan(abf.sweepC).any():
raise ValueError(f"Found at least one 'Not a Number' "
f"entry in stimulus channel {channel} of sweep {sweep} "
f"in file {abf.abfFilePath} using protocol {abf.protocol}.")
def _reduceChannelList(self, abf):
"""
Return a reduced channel list taking into account the feedback channel export setting.
"""
if self.outputFeedbackChannel:
return abf.channelList
cleanChanneList = []
for channel in range(abf.channelCount):
adcName = abf.adcNames[channel]
if adcName in ABFConverter.adcNamesWithRealData:
cleanChanneList.append(abf.channelList[channel])
return cleanChanneList
def _checkAll(self):
"""
Check that all loaded ABF files have a minimum list of properties in common.
These are:
- Digitizer device
- Telegraph device
- Creator Name
- Creator Version
- abfVersion
- channelList
"""
for abf in self.abfs:
source = f"({self.refabf.abfFilePath} vs {abf.abfFilePath})"
if self.refabf._protocolSection.sDigitizerType != abf._protocolSection.sDigitizerType:
raise ValueError(f"Digitizer type does not match in {source}.")
elif self.refabf._adcSection.sTelegraphInstrument[0] != abf._adcSection.sTelegraphInstrument[0]:
raise ValueError(f"Telegraph instrument does not match in {source}.")
elif self.refabf._stringsIndexed.uCreatorName != abf._stringsIndexed.uCreatorName:
raise ValueError(f"Creator Name does not match in {source}.")
elif self.refabf.creatorVersion != abf.creatorVersion:
raise ValueError(f"Creator Version does not match in {source}.")
elif self.refabf.abfVersion != abf.abfVersion:
raise ValueError(f"abfVersion does not match in {source}.")
refChannelList = self._reduceChannelList(self.refabf)
channelList = self._reduceChannelList(abf)
if refChannelList != channelList:
raise ValueError(f"channelList ({refChannelList} vs {channelList} does not match in {source}.")
def _getOldestABF(self):
"""
Return the ABF file with the oldest starting time stamp.
"""
def getTimestamp(abf):
return abf.abfDateTime
return min(self.abfs, key=getTimestamp)
def _getClampMode(self, abf, channel):
"""
Return the clamp mode of the given channel.
"""
return abf._adcSection.nTelegraphMode[channel]
def _getMaxTimeSeriesCount(self):
"""
Return the maximum number of TimeSeries which will be created from all ABF files.
"""
def getCount(abf):
return abf.sweepCount * abf.channelCount
return sum(map(getCount, self.abfs))
def _createFile(self):
"""
Create a pynwb NWBFile object from the ABF file contents.
"""
def formatVersion(version):
return f"{version['major']}.{version['minor']}.{version['bugfix']}.{version['build']}"
def getFileComments(abfs):
"""
Return the file comments of all files. Returns an empty string if none are present.
"""
comments = {}
for abf in abfs:
if len(abf.abfFileComment) > 0:
comments[os.path.basename(abf.abfFilePath)] = abf.abfFileComment
if not len(comments):
return ""
return json.dumps(comments)
session_description = getFileComments(self.abfs)
if len(session_description) == 0:
session_description = PLACEHOLDER
identifier = sha256(" ".join([abf.fileGUID for abf in self.abfs]).encode()).hexdigest()
session_start_time = self.refabf.abfDateTime
creatorName = self.refabf._stringsIndexed.uCreatorName
creatorVersion = formatVersion(self.refabf.creatorVersion)
experiment_description = (f"{creatorName} v{creatorVersion}")
source_script_file_name = "run_x_to_nwb_conversion.py"
source_script = json.dumps(getPackageInfo(), sort_keys=True, indent=4)
session_id = PLACEHOLDER
return NWBFile(session_description=session_description,
identifier=identifier,
session_start_time=session_start_time,
experimenter=None,
experiment_description=experiment_description,
session_id=session_id,
source_script_file_name=source_script_file_name,
source_script=source_script)
def _createDevice(self):
"""
Create a pynwb Device object from the ABF file contents.
"""
digitizer = self.refabf._protocolSection.sDigitizerType
telegraph = self.refabf._adcSection.sTelegraphInstrument[0]
return Device(f"{digitizer} with {telegraph}")
def _createElectrodes(self, device):
"""
Create pynwb ic_electrodes objects from the ABF file contents.
"""
return [IntracellularElectrode(f"Electrode {x:d}",
device,
description=PLACEHOLDER)
for x in self.refabf.channelList]
def _calculateStartingTime(self, abf):
"""
Calculate the starting time of the current sweep of `abf` relative to the reference ABF file.
"""
delta = abf.abfDateTime - self.refabf.abfDateTime
return delta.total_seconds() + abf.sweepX[0]
def _createStimulusSeries(self, electrodes):
"""
Return a list of pynwb stimulus series objects created from the ABF file contents.
"""
series = []
counter = 0
for file_index, abf in enumerate(self.abfs):
stimulus_description = ABFConverter._getProtocolName(abf.protocol)
scale_factor = self._getScaleFactor(abf, stimulus_description)
for sweep in range(abf.sweepCount):
cycle_id = createCycleID([file_index, sweep], total=self.totalSeriesCount)
for channel in range(abf.channelCount):
if not abf._dacSection.nWaveformEnable[channel]:
continue
abf.setSweep(sweep, channel=channel, absoluteTime=True)
name, counter = createSeriesName("index", counter, total=self.totalSeriesCount)
data = convertDataset(abf.sweepC * scale_factor, self.compression)
conversion, unit = parseUnit(abf.sweepUnitsC)
electrode = electrodes[channel]
gain = abf._dacSection.fDACScaleFactor[channel]
resolution = np.nan
starting_time = self._calculateStartingTime(abf)
rate = float(abf.dataRate)
description = json.dumps({"cycle_id": cycle_id,
"protocol": abf.protocol,
"protocolPath": abf.protocolPath,
"file": os.path.basename(abf.abfFilePath),
"name": abf.dacNames[channel],
"number": abf._dacSection.nDACNum[channel]},
sort_keys=True, indent=4)
seriesClass = getStimulusSeriesClass(self._getClampMode(abf, channel))
if seriesClass is not None:
stimulus = seriesClass(name=name,
data=data,
sweep_number=np.uint64(cycle_id),
unit=unit,
electrode=electrode,
gain=gain,
resolution=resolution,
conversion=conversion,
starting_time=starting_time,
rate=rate,
description=description,
stimulus_description=stimulus_description)
series.append(stimulus)
return series
def _findSettingsEntry(self, abf):
"""
Return the settings dictionary for the given abf file, either the file
specific, or the global one for the folder, or None as first tuple element.
The second element is the source of the data.
"""
if self._settings is None:
return None, None
filename = abf.abfFilePath
try:
return self._settings[filename], filename
except KeyError:
dirname = os.path.dirname(filename)
try:
return self._settings[dirname], dirname
except KeyError:
return None, None
def _getScaleFactor(self, abf, stimset):
"""
Return the stimulus scale factor for the stimset of the abf file.
"""
try:
settings, _ = self._findSettingsEntry(abf)
return float(settings["ScaleFactors"][stimset])
except (TypeError, KeyError):
warnings.warn(f"Could not find the scale factor for the stimset {stimset}, using 1.0 as fallback.")
return 1.0
def _getAmplifierSettings(self, abf, clampMode, adcName):
"""
Return a dict with the amplifier settings read out form the JSON file.
Unset values are returned as `NaN`.
"""
d = {}
try:
# JSON stores adcName without spaces
amplifier = "unknown"
abfSettings, _ = self._findSettingsEntry(abf)
adcNameWOSpace = adcName.replace(" ", "")
amplifier = abfSettings["uids"][adcNameWOSpace]
settings = abfSettings[amplifier]
if settings["GetMode"] != clampMode:
warnings.warn(f"Stored clamp mode {settings['GetMode']} does not match requested "
f"clamp mode {clampMode} of channel {adcName}.")
settings = None
except (TypeError, KeyError):
warnings.warn(f"Could not find settings for amplifier {amplifier} of channel {adcName}.")
settings = None
if settings:
if clampMode == V_CLAMP_MODE:
d["capacitance_slow"] = settings["GetSlowCompCap"]
d["capacitance_fast"] = settings["GetFastCompCap"]
if settings["GetRsCompEnable"]:
d["resistance_comp_correction"] = settings["GetRsCompCorrection"]
d["resistance_comp_bandwidth"] = settings["GetRsCompBandwidth"]
d["resistance_comp_prediction"] = settings["GetRsCompPrediction"]
else:
d["resistance_comp_correction"] = np.nan
d["resistance_comp_bandwidth"] = np.nan
d["resistance_comp_prediction"] = np.nan
if settings["GetWholeCellCompEnable"]:
d["whole_cell_capacitance_comp"] = settings["GetWholeCellCompCap"]
d["whole_cell_series_resistance_comp"] = settings["GetWholeCellCompResist"]
else:
d["whole_cell_capacitance_comp"] = np.nan
d["whole_cell_series_resistance_comp"] = np.nan
elif clampMode in (I_CLAMP_MODE, I0_CLAMP_MODE):
if settings["GetHoldingEnable"]:
d["bias_current"] = settings["GetHolding"]
else:
d["bias_current"] = np.nan
if settings["GetBridgeBalEnable"]:
d["bridge_balance"] = settings["GetBridgeBalResist"]
else:
d["bridge_balance"] = np.nan
if settings["GetNeutralizationEnable"]:
d["capacitance_compensation"] = settings["GetNeutralizationCap"]
else:
d["capacitance_compensation"] = np.nan
else:
warnings.warn("Unsupported clamp mode {clampMode}")
else:
if clampMode == V_CLAMP_MODE:
d["capacitance_slow"] = np.nan
d["capacitance_fast"] = np.nan
d["resistance_comp_correction"] = np.nan
d["resistance_comp_bandwidth"] = np.nan
d["resistance_comp_prediction"] = np.nan
d["whole_cell_capacitance_comp"] = np.nan
d["whole_cell_series_resistance_comp"] = np.nan
elif clampMode in (I_CLAMP_MODE, I0_CLAMP_MODE):
d["bias_current"] = np.nan
d["bridge_balance"] = np.nan
d["capacitance_compensation"] = np.nan
else:
warnings.warn("Unsupported clamp mode {clampMode}")
return d
def _createAcquiredSeries(self, electrodes):
"""
Return a list of pynwb acquisition series objects created from the ABF file contents.
"""
series = []
counter = 0
for file_index, abf in enumerate(self.abfs):
stimulus_description = ABFConverter._getProtocolName(abf.protocol)
_, jsonSource = self._findSettingsEntry(abf)
log.debug(f"Using JSON settings for {jsonSource}.")
for sweep in range(abf.sweepCount):
cycle_id = createCycleID([file_index, sweep], total=self.totalSeriesCount)
for channel in range(abf.channelCount):
adcName = abf.adcNames[channel]
if not self.outputFeedbackChannel:
if adcName in ABFConverter.adcNamesWithRealData:
pass
else:
# feedback data, skip
continue
abf.setSweep(sweep, channel=channel, absoluteTime=True)
name, counter = createSeriesName("index", counter, total=self.totalSeriesCount)
data = convertDataset(abf.sweepY, self.compression)
conversion, unit = parseUnit(abf.sweepUnitsY)
electrode = electrodes[channel]
gain = abf._adcSection.fADCProgrammableGain[channel]
resolution = np.nan
starting_time = self._calculateStartingTime(abf)
rate = float(abf.dataRate)
description = json.dumps({"cycle_id": cycle_id,
"protocol": abf.protocol,
"protocolPath": abf.protocolPath,
"file": os.path.basename(abf.abfFilePath),
"name": adcName,
"number": abf._adcSection.nADCNum[channel]},
sort_keys=True, indent=4)
clampMode = self._getClampMode(abf, channel)
settings = self._getAmplifierSettings(abf, clampMode, adcName)
seriesClass = getAcquiredSeriesClass(clampMode)
if clampMode == V_CLAMP_MODE:
acquistion_data = seriesClass(name=name,
data=data,
sweep_number=np.uint64(cycle_id),
unit=unit,
electrode=electrode,
gain=gain,
resolution=resolution,
conversion=conversion,
starting_time=starting_time,
rate=rate,
description=description,
capacitance_slow=settings["capacitance_slow"],
capacitance_fast=settings["capacitance_fast"],
resistance_comp_correction=settings["resistance_comp_correction"],
resistance_comp_bandwidth=settings["resistance_comp_bandwidth"],
resistance_comp_prediction=settings["resistance_comp_prediction"],
stimulus_description=stimulus_description,
whole_cell_capacitance_comp=settings["whole_cell_capacitance_comp"], # noqa: E501
whole_cell_series_resistance_comp=settings["whole_cell_series_resistance_comp"]) # noqa: E501
elif clampMode in (I_CLAMP_MODE, I0_CLAMP_MODE):
acquistion_data = seriesClass(name=name,
data=data,
sweep_number=np.uint64(cycle_id),
unit=unit,
electrode=electrode,
gain=gain,
resolution=resolution,
conversion=conversion,
starting_time=starting_time,
rate=rate,
description=description,
bias_current=settings["bias_current"],
bridge_balance=settings["bridge_balance"],
stimulus_description=stimulus_description,
capacitance_compensation=settings["capacitance_compensation"])
else:
raise ValueError(f"Unsupported clamp mode {clampMode}.")
series.append(acquistion_data)
return series