Source code for ipfx.x_to_nwb.hr_stimsetgenerator
import numpy as np
from ipfx.x_to_nwb.hr_segments import getSegmentClass
from ipfx.x_to_nwb.conversion_utils import getChannelRecordIndex, getStimulusRecordIndex
class StimSetGenerator:
    """
    High level class for creating stimsets

    Generated stimsets are cached per stimulus record / channel record
    combination, so each combination is generated at most once per instance.
    """

    def __init__(self, bundle):
        """
        :param bundle: object whose ``pgf`` attribute is indexed to look up the
                       stimulus and channel records
        """

        self.bundle = bundle
        # Maps "<stimRec index>.<channelRec index>" to the list of generated
        # stimsets, or to False when generation failed for that combination.
        self.cache = {}

    @staticmethod
    def _concatenateSegments(segments):
        """
        Join the segment arrays of one sweep into a single stimset array.

        :param segments: sequence of 1D array-likes, in playback order

        Return: 1D numpy array, empty if there are no segments
        """

        # Seeding with an empty float array keeps the result dtype and the
        # "no segments" result identical to repeatedly appending to
        # np.empty([0]), while copying the data only once.
        return np.concatenate([np.empty([0]), *segments], axis=0)

    def fetch(self, sweep, trace):
        """
        Fetch a stimulus set from the cache, generate it if it is not present.

        :param sweep: SweepRecord node
        :param trace: TraceRecord node

        Return: python list of numpy arrays, one array per sweep. An empty
                list is returned if the stimset could not be generated; the
                error is printed and the failure is cached.
        """

        # PGF hierarchy
        #
        # Root
        #  stimRec
        #   channelRec
        #    segmentRec

        # Must be bound before the try block: the except handler reads it even
        # if the very first lookup below fails.
        key = None

        try:
            stimRec_idx = getStimulusRecordIndex(sweep)
            channelRec_index = getChannelRecordIndex(self.bundle.pgf, sweep, trace)

            if channelRec_index is None:
                raise ValueError("Could not find a ChannelRecord for the given trace.")

            key = f"{stimRec_idx}.{channelRec_index}"
            entry = self.cache.get(key)

            if entry is False:
                # generation failed earlier for this combination
                return []

            if entry is not None:
                return entry

            stimRec = self.bundle.pgf[stimRec_idx]
            channelRec = stimRec[channelRec_index]

            if stimRec.ActualDacChannels != 1:
                raise ValueError(f"Unsupported ActualDacChannels lengths for "
                                 f"sweep index {stimRec_idx} and ChannelRecord index {channelRec_index}.")

            allSweeps = []

            # use a dedicated name so that the `sweep` parameter is not shadowed
            for sweepIndex in range(stimRec.NumberSweeps):
                segments = [getSegmentClass(stimRec, channelRec, segmentRec).createArray(sweepIndex)
                            for segmentRec in channelRec]
                allSweeps.append(self._concatenateSegments(segments))

            self.cache[key] = allSweeps

        except (ValueError, IndexError) as e:
            print(e)

            if key is not None:
                # remember the failure so that it is not retried on every call
                self.cache[key] = False

            return []

        return allSweeps