Source code for ipfx.qc_feature_extractor
from . import stim_features as stf
from . import epochs as ep
from . import error as er
import logging
import numpy as np
from . import qc_features as qcf
[docs]def extract_blowout(data_set, tags):
"""
Measure blowout voltage
Parameters
----------
data_set: EphysDataSet
tags: list
warning tags
Returns
-------
blowout_mv: float
blowout voltage in mV
"""
ontology = data_set.ontology
try:
blowout_sweep_number = data_set.get_sweep_numbers(ontology.blowout_names)[-1]
blowout_data = data_set.sweep(blowout_sweep_number)
_,test_end_idx = blowout_data.epochs["test"]
blowout_mv = qcf.measure_blowout(blowout_data.v, test_end_idx)
except IndexError as e:
tags.append("Blowout is not available")
blowout_mv = None
return blowout_mv
[docs]def extract_electrode_0(data_set, tags):
"""
Measure electrode zero
Parameters
----------
data_set: EphysDataSet
tags: list
warning tags
Returns
-------
e0: float
"""
ontology = data_set.ontology
try:
bath_sweep_number = data_set.get_sweep_numbers(ontology.bath_names)[-1]
bath_data = data_set.sweep(bath_sweep_number)
e0 = qcf.measure_electrode_0(bath_data.i, bath_data.sampling_rate)
except IndexError as e:
tags.append("Electrode 0 is not available")
e0 = None
return e0
[docs]def extract_recording_date(data_set, tags):
try:
recording_date = data_set.get_recording_date()
except KeyError as e:
tags.append("Recording date is missing")
recording_date = None
return recording_date
[docs]def extract_clamp_seal(data_set, tags, manual_values=None):
"""
Parameters
----------
data_set: EphysDataSet
tags: list
warning tags
manual_values
Returns
-------
"""
ontology = data_set.ontology
try:
seal_sweep_number = data_set.get_sweep_numbers(ontology.seal_names,"VoltageClamp")[-1]
seal_data = data_set.sweep(seal_sweep_number)
seal_gohm = qcf.measure_seal(seal_data.v,
seal_data.i,
seal_data.t)
if seal_gohm is None or not np.isfinite(seal_gohm):
raise er.FeatureError("Could not compute seal")
except IndexError as e:
# seal is not available, for whatever reason. log error
tags.append("Seal is not available")
# look for manual seal value and use it if it's available
seal_gohm = manual_values.get('manual_seal_gohm', None)
if seal_gohm is not None:
tags.append("Using manual seal value: %f" % seal_gohm)
return seal_gohm
[docs]def extract_input_and_access_resistance(data_set, tags, manual_values=None):
"""
Measure input and series (access) resistance in two steps:
1. finding the breakin sweep
2. and then analyzing it
if the value is unavailable then check to see if it was set manually
Parameters
----------
data_set: EphysDataSet
tags: list
warning tags
manual_values: dict
manual/default values
Returns
-------
ir: float
input resistance
sr: float
access resistance
"""
ontology = data_set.ontology
try:
breakin_sweep_number = data_set.get_sweep_numbers(ontology.breakin_names, "VoltageClamp")[-1]
breakin_data = data_set.sweep(breakin_sweep_number)
except IndexError as e:
tags.append("Breakin sweep not found")
breakin_data = None
ir = None # input resistance
sr = None # series resistance
if breakin_data is not None:
ir = extract_input_resistance(breakin_data,tags,manual_values)
sr = extract_initial_access_resistance(breakin_data,tags,manual_values)
return ir, sr
[docs]def extract_input_resistance(breakin_sweep,tags,manual_values):
try:
ir = qcf.measure_input_resistance(breakin_sweep.v,
breakin_sweep.i,
breakin_sweep.t)
except Exception as e:
logging.warning("Error reading input resistance.")
raise
# apply manual value if it's available
if ir is None:
tags.append("Input resistance is not available")
ir = manual_values.get('manual_initial_input_mohm', None)
if ir is not None:
tags.append("Using manual value for input resistance")
return ir
[docs]def extract_initial_access_resistance(breakin_sweep,tags,manual_values):
try:
sr = qcf.measure_initial_access_resistance(breakin_sweep.v,
breakin_sweep.i,
breakin_sweep.t)
except Exception as e:
logging.warning("Error reading initial access resistance.")
raise
# apply manual value if it's available
if sr is None:
tags.append("Initial access resistance is not available")
sr = manual_values.get('manual_initial_access_resistance_mohm', None)
if sr is not None:
tags.append("Using manual initial access resistance")
return sr
[docs]def compute_input_access_resistance_ratio(ir, sr):
sr_ratio = None # input access resistance ratio
if ir is not None and sr is not None:
sr_ratio = sr / ir
else:
logging.warning("Could not compute input/access resistance ratio (sr: %s, ir:: %s)", str(sr), str(ir))
return sr_ratio
[docs]def cell_qc_features(data_set, manual_values=None):
"""
Parameters
----------
data_set : EphysDataSet
dataset
manual_values : dict
default (manual) values that can be passed in through input.json.
Returns
-------
features : dict
cell qc features
tags : list
warning tags
"""
if manual_values is None:
manual_values = {}
features = {}
tags = []
features['blowout_mv'] = extract_blowout(data_set, tags)
features['electrode_0_pa'] = extract_electrode_0(data_set, tags)
features['recording_date'] = extract_recording_date(data_set, tags)
features["seal_gohm"] = extract_clamp_seal(data_set, tags, manual_values)
ir, sr = extract_input_and_access_resistance(data_set, tags)
features['input_resistance_mohm'] = ir
features["initial_access_resistance_mohm"] = sr
features['input_access_resistance_ratio'] = compute_input_access_resistance_ratio(ir, sr)
return features, tags
[docs]def sweep_qc_features(data_set):
ontology = data_set.ontology
sweeps_features = []
iclamp_sweeps = data_set.filtered_sweep_table(clamp_mode=data_set.CURRENT_CLAMP,
stimuli_exclude=["Test", "Search"],
)
if len(iclamp_sweeps.index) == 0:
logging.warning("No current clamp sweeps available to compute QC features")
for sweep_info in iclamp_sweeps.to_dict(orient='records'):
sweep_features = {}
sweep_features.update(sweep_info)
sweep_num = sweep_info['sweep_number']
sweep = data_set.sweep(sweep_num)
is_ramp = sweep_info['stimulus_name'] in ontology.ramp_names
tags = check_sweep_integrity(sweep, is_ramp)
sweep_features["tags"] = tags
stim_features = current_clamp_sweep_stim_features(sweep)
sweep_features.update(stim_features)
if not tags:
qc_features = current_clamp_sweep_qc_features(sweep, is_ramp)
sweep_features.update(qc_features)
else:
logging.warning("sweep {}: {}".format(sweep_num, tags))
sweeps_features.append(sweep_features)
return sweeps_features
[docs]def check_sweep_integrity(sweep, is_ramp):
tags = []
for k,v in sweep.epochs.items():
if not v:
tags.append(F"{k} epoch is missing")
if not is_ramp:
if sweep.epochs["recording"] and sweep.epochs["experiment"]:
if sweep.epochs["recording"][1] < sweep.epochs["experiment"][1]:
tags.append("Recording stopped before completing the experiment epoch")
if np.isnan(sweep.i).any():
tags.append("Stimulus contains NaN values")
return tags
[docs]def current_clamp_sweep_stim_features(sweep):
stim_features = {}
i = sweep.i
t = sweep.t
hz = sweep.sampling_rate
start_time, dur, amp, start_idx, end_idx = stf.get_stim_characteristics(i, t)
stim_features['stimulus_start_time'] = start_time
stim_features['stimulus_amplitude'] = amp
stim_features['stimulus_duration'] = dur
if sweep.epochs["experiment"]:
expt_start_idx, _ = sweep.epochs["experiment"]
interval = stf.find_stim_interval(expt_start_idx, i, hz)
else:
interval = None
stim_features['stimulus_interval'] = interval
return stim_features
[docs]def current_clamp_sweep_qc_features(sweep, is_ramp):
qc_features = {}
voltage = sweep.v
current = sweep.i
hz = sweep.sampling_rate
expt_start_idx, _ = ep.get_experiment_epoch(current, hz)
# measure noise before stimulus
idx0, idx1 = ep.get_first_noise_epoch(expt_start_idx, hz) # count from the beginning of the experiment
_, qc_features["pre_noise_rms_mv"] = qcf.measure_vm(voltage[idx0:idx1])
# measure mean and rms of Vm at end of recording
# do not check for ramps, because they do not have enough time to recover
_, rec_end_idx = ep.get_recording_epoch(voltage)
if not is_ramp:
idx0, idx1 = ep.get_last_stability_epoch(rec_end_idx, hz)
mean_last_stability_epoch, _ = qcf.measure_vm(voltage[idx0:idx1])
idx0, idx1 = ep.get_last_noise_epoch(rec_end_idx, hz)
_, rms_last_noise_epoch = qcf.measure_vm(voltage[idx0:idx1])
else:
rms_last_noise_epoch = None
mean_last_stability_epoch = None
qc_features["post_vm_mv"] = mean_last_stability_epoch
qc_features["post_noise_rms_mv"] = rms_last_noise_epoch
# measure mean and rms of Vm and over extended interval before stimulus, to check stability
stim_start_idx, _ = ep.get_stim_epoch(current)
idx0, idx1 = ep.get_first_stability_epoch(stim_start_idx, hz)
mean_first_stability_epoch, rms_first_stability_epoch = qcf.measure_vm(voltage[idx0:idx1])
qc_features["pre_vm_mv"] = mean_first_stability_epoch
qc_features["slow_vm_mv"] = mean_first_stability_epoch
qc_features["slow_noise_rms_mv"] = rms_first_stability_epoch
qc_features["vm_delta_mv"] = qcf.measure_vm_delta(mean_first_stability_epoch, mean_last_stability_epoch)
return qc_features