Source code for ipfx.spike_features

# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2017. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import logging
import numpy as np
from scipy.optimize import curve_fit
from functools import partial
from . import time_series_utils as tsu
from . import spike_detector as spkd
from . import error as er

[docs]def find_widths(v, t, spike_indexes, peak_indexes, trough_indexes, clipped=None): """Find widths at half-height for spikes. Widths are only returned when heights are defined Parameters ---------- v : numpy array of voltage time series in mV t : numpy array of times in seconds spike_indexes : numpy array of spike indexes peak_indexes : numpy array of spike peak indexes trough_indexes : numpy array of trough indexes Returns ------- widths : numpy array of spike widths in sec """ if not spike_indexes.size or not peak_indexes.size: return np.array([]) if len(spike_indexes) < len(trough_indexes): raise er.FeatureError("Cannot have more troughs than spikes") if clipped is None: clipped = np.zeros_like(spike_indexes, dtype=bool) use_indexes = ~np.isnan(trough_indexes) use_indexes[clipped] = False heights = np.zeros_like(trough_indexes) * np.nan heights[use_indexes] = v[peak_indexes[use_indexes]] - v[trough_indexes[use_indexes].astype(int)] width_levels = np.zeros_like(trough_indexes) * np.nan width_levels[use_indexes] = heights[use_indexes] / 2. + v[trough_indexes[use_indexes].astype(int)] thresh_to_peak_levels = np.zeros_like(trough_indexes) * np.nan thresh_to_peak_levels[use_indexes] = (v[peak_indexes[use_indexes]] - v[spike_indexes[use_indexes]]) / 2. + v[spike_indexes[use_indexes]] # Some spikes in burst may have deep trough but short height, so can't use same # definition for width width_levels[width_levels < v[spike_indexes]] = thresh_to_peak_levels[width_levels < v[spike_indexes]] width_starts = np.zeros_like(trough_indexes) * np.nan width_starts[use_indexes] = np.array([pk - np.flatnonzero(v[pk:spk:-1] <= wl)[0] if np.flatnonzero(v[pk:spk:-1] <= wl).size > 0 else np.nan for pk, spk, wl in zip(peak_indexes[use_indexes], spike_indexes[use_indexes], width_levels[use_indexes])]) width_ends = np.zeros_like(trough_indexes) * np.nan width_ends[use_indexes] = np.array([pk + np.flatnonzero(v[pk:tr] <= wl)[0] if np.flatnonzero(v[pk:tr] <= wl).size > 0 else np.nan for pk, tr, wl in zip(peak_indexes[use_indexes], trough_indexes[use_indexes].astype(int), width_levels[use_indexes])]) missing_widths = np.isnan(width_starts) | np.isnan(width_ends) widths = np.zeros_like(width_starts, dtype=np.float64) widths[~missing_widths] = t[width_ends[~missing_widths].astype(int)] - \ t[width_starts[~missing_widths].astype(int)] if any(missing_widths): widths[missing_widths] = np.nan return widths
[docs]def analyze_trough_details(v, t, spike_indexes, peak_indexes, clipped=None, end=None, filter=10., heavy_filter=1., term_frac=0.01, adp_thresh=0.5, tol=0.5, flat_interval=0.002, adp_max_delta_t=0.005, adp_max_delta_v=10., dvdt=None): """Analyze trough to determine if an ADP exists and whether the reset is a 'detour' or 'direct' Parameters ---------- v : numpy array of voltage time series in mV t : numpy array of times in seconds spike_indexes : numpy array of spike indexes peak_indexes : numpy array of spike peak indexes end : end of time window (optional) filter : cutoff frequency for 4-pole low-pass Bessel filter in kHz (default 1) heavy_filter : lower cutoff frequency for 4-pole low-pass Bessel filter in kHz (default 1) thresh_frac : fraction of average upstroke for threshold calculation (optional, default 0.05) adp_thresh: minimum dV/dt in V/s to exceed to be considered to have an ADP (optional, default 1.5) tol : tolerance for evaluating whether Vm drops appreciably further after end of spike (default 1.0 mV) flat_interval: if the trace is flat for this duration, stop looking for an ADP (default 0.002 s) adp_max_delta_t: max possible ADP delta t (default 0.005 s) adp_max_delta_v: max possible ADP delta v (default 10 mV) dvdt : pre-calculated time-derivative of voltage (optional) Returns ------- isi_types : numpy array of isi reset types (direct or detour) fast_trough_indexes : numpy array of indexes at the start of the trough (i.e. end of the spike) adp_indexes : numpy array of adp indexes (np.nan if there was no ADP in that ISI slow_trough_indexes : numpy array of indexes at the minimum of the slow phase of the trough (if there wasn't just a fast phase) """ if end is None: end = t[-1] end_index = tsu.find_time_index(t, end) if clipped is None: clipped = np.zeros_like(peak_indexes) # Can't evaluate for spikes that are clipped by the window orig_len = len(peak_indexes) valid_spike_indexes = spike_indexes[~clipped] valid_peak_indexes = peak_indexes[~clipped] if dvdt is None: dvdt = tsu.calculate_dvdt(v, t, filter) dvdt_hvy = tsu.calculate_dvdt(v, t, heavy_filter) # Writing as for loop - see if I can vectorize any later fast_trough_indexes = [] adp_indexes = [] slow_trough_indexes = [] isi_types = [] update_clipped = [] for peak, next_spk in zip(valid_peak_indexes, np.append(valid_spike_indexes[1:], end_index)): downstroke = dvdt[peak:next_spk].argmin() + peak target = term_frac * dvdt[downstroke] terminated_points = np.flatnonzero(dvdt[downstroke:next_spk] >= target) if terminated_points.size: terminated = terminated_points[0] + downstroke update_clipped.append(False) else: logging.debug("Could not identify fast trough - marking spike as clipped") isi_types.append(np.nan) fast_trough_indexes.append(np.nan) adp_indexes.append(np.nan) slow_trough_indexes.append(np.nan) update_clipped.append(True) continue # Could there be an ADP? adp_index = np.nan dv_over_thresh = np.flatnonzero(dvdt_hvy[terminated:next_spk] >= adp_thresh) if dv_over_thresh.size: cross = dv_over_thresh[0] + terminated # only want to look for ADP before things get pretty flat # otherwise, could just pick up random transients long after the spike if t[cross] - t[terminated] < flat_interval: # Going back up fast, but could just be going into another spike # so need to check for a reversal (zero-crossing) in dV/dt zero_return_vals = np.flatnonzero(dvdt_hvy[cross:next_spk] <= 0) if zero_return_vals.size: putative_adp_index = zero_return_vals[0] + cross min_index = v[putative_adp_index:next_spk].argmin() + putative_adp_index if (v[putative_adp_index] - v[min_index] >= tol and v[putative_adp_index] - v[terminated] <= adp_max_delta_v and t[putative_adp_index] - t[terminated] <= adp_max_delta_t): adp_index = putative_adp_index slow_phase_min_index = min_index isi_type = "detour" if np.isnan(adp_index): v_term = v[terminated] min_index = v[terminated:next_spk].argmin() + terminated if v_term - v[min_index] >= tol: # dropped further after end of spike -> detour reset isi_type = "detour" slow_phase_min_index = min_index else: isi_type = "direct" isi_types.append(isi_type) fast_trough_indexes.append(terminated) adp_indexes.append(adp_index) if isi_type == "detour": slow_trough_indexes.append(slow_phase_min_index) else: slow_trough_indexes.append(np.nan) # If we had to kick some spikes out before, need to add nans at the end output = [] output.append(np.array(isi_types)) for d in (fast_trough_indexes, adp_indexes, slow_trough_indexes): output.append(np.array(d, dtype=float)) if orig_len > len(isi_types): extra = np.zeros(orig_len - len(isi_types)) * np.nan output = tuple((np.append(o, extra) for o in output)) # The ADP and slow trough for the last spike in a train are not reliably # calculated, and usually extreme when wrong, so we will NaN them out. # # Note that this will result in a 0 value when delta V or delta T is # calculated, which may not be strictly accurate to the trace, but the # magnitude of the difference will be less than in many of the erroneous # cases seen otherwise output[2][-1] = np.nan # ADP output[3][-1] = np.nan # slow trough clipped[~clipped] = update_clipped return output, clipped
[docs]def fit_prespike_time_constant(t, v, start, spike_time, dv_limit=-0.001, tau_limit=0.3): """Finds the dominant time constant of the pre-spike rise in voltage Parameters ---------- v : numpy array of voltage time series in mV t : numpy array of times in seconds start : start of voltage rise (seconds) spike_time : time of first spike (seconds) dv_limit : dV/dt cutoff (default -0.001) Shortens fit window if rate of voltage drop exceeds this limit tau_limit : upper bound for slow time constant (seconds, default 0.3) If the slower time constant of a double-exponential fit is twice that of the faster and exceeds this limit, the faster one will be considered the dominant one Returns ------- tau : dominant time constant (seconds) """ start_index = tsu.find_time_index(t, start) end_index = tsu.find_time_index(t, spike_time) if end_index <= start_index: raise er.FeatureError("Start for pre-spike time constant fit cannot be after the spike time.") v_slice = v[start_index:end_index] t_slice = t[start_index:end_index] # Solve linear version with single exponential first to guess at the time constant y0 = v_slice.max() + 5e-6 # set y0 slightly above v_slice maximum y = -v_slice + y0 y = np.log(y) dy = tsu.calculate_dvdt(y, t_slice, filter=1.0) # End the fit interval if the voltage starts dropping new_end_indexes = np.flatnonzero(dy <= dv_limit) cross_limit = 0.0005 # sec if not new_end_indexes.size or t_slice[new_end_indexes[0]] - t_slice[0] < cross_limit: # either never crosses or crosses too early new_end_index = len(v_slice) else: new_end_index = new_end_indexes[0] K, A_log = np.polyfit(t_slice[:new_end_index] - t_slice[0], y[:new_end_index], 1) A = np.exp(A_log) dbl_exp_y0 = partial(_dbl_exp_fit, y0) try: popt, pcov = curve_fit(dbl_exp_y0, t_slice - t_slice[0], v_slice, p0=(-A / 2.0, -1.0 / K, -A / 2.0, -1.0 / K)) except RuntimeError: # Fall back to single fit tau = -1.0 / K return tau # Find dominant time constant if popt[1] < popt[3]: faster_weight, faster_tau, slower_weight, slower_tau = popt else: slower_weight, slower_tau, faster_weight, faster_tau = popt # These are all empirical values if np.abs(faster_weight) > np.abs(slower_weight): tau = faster_tau elif (slower_tau - faster_tau) / slower_tau <= 0.1: # close enough; just use slower tau = slower_tau elif slower_tau > tau_limit and slower_weight / faster_weight < 2.0: tau = faster_tau else: tau = slower_tau return tau
def _dbl_exp_fit(y0, x, A1, tau1, A2, tau2): penalty = 0 if tau1 < 0 or tau2 < 0: penalty = 1e6 return y0 + A1 * np.exp(-x / tau1) + A2 * np.exp(-x / tau2) + penalty
[docs]def estimate_adjusted_detection_parameters(v_set, t_set, interval_start, interval_end, filter=10): """ Estimate adjusted values for spike detection by analyzing a period when the voltage changes quickly but passively (due to strong current stimulation), which can result in spurious spike detection results. Parameters ---------- v_set : list of numpy arrays of voltage time series in mV t_set : list of numpy arrays of times in seconds interval_start : start of analysis interval (sec) interval_end : end of analysis interval (sec) Returns ------- new_dv_cutoff : adjusted dv/dt cutoff (V/s) new_thresh_frac : adjusted fraction of avg upstroke to find threshold """ if type(v_set) is not list: v_set = list(v_set) if type(t_set) is not list: t_set = list(t_set) if len(v_set) != len(t_set): raise er.FeatureError("t_set and v_set must be lists of equal size") if len(v_set) == 0: raise er.FeatureError("t_set and v_set are empty") start_index = tsu.find_time_index(t_set[0], interval_start) end_index = tsu.find_time_index(t_set[0], interval_end) maxes = [] ends = [] dv_set = [] for v, t in zip(v_set, t_set): dv = tsu.calculate_dvdt(v, t, filter) dv_set.append(dv) maxes.append(dv[start_index:end_index].max()) ends.append(dv[end_index]) maxes = np.array(maxes) ends = np.array(ends) cutoff_adj_factor = 1.1 thresh_frac_adj_factor = 1.2 new_dv_cutoff = np.median(maxes) * cutoff_adj_factor min_thresh = np.median(ends) * thresh_frac_adj_factor all_upstrokes = np.array([]) for v, t, dv in zip(v_set, t_set, dv_set): putative_spikes = spkd.detect_putative_spikes(v, t, dv_cutoff=new_dv_cutoff, filter=filter) peaks = spkd.find_peak_indexes(v, t, putative_spikes) putative_spikes, peaks = spkd.filter_putative_spikes(v, t, putative_spikes, peaks, dvdt=dv, filter=filter) upstrokes = spkd.find_upstroke_indexes(v, t, putative_spikes, peaks, dvdt=dv) if upstrokes.size: all_upstrokes = np.append(all_upstrokes, dv[upstrokes]) new_thresh_frac = min_thresh / all_upstrokes.mean() return new_dv_cutoff, new_thresh_frac