Source code for ipfx.attach_metadata.sink.nwb2_sink

"""Sink for appending to an NWBFile (not in place)
"""
import io
from pathlib import Path
from typing import (
    List, Dict, Any, Set, Optional, Union
)
import datetime

import pynwb
import h5py
import hdmf

from ipfx.attach_metadata.sink.metadata_sink import (
    MetadataSink, OneOrMany
)


PathLike = Union[
    str, 
    Path
]


[docs]class Nwb2Sink(MetadataSink): """ A metadata sink which modifies an NWBFile (in-memory representation of an NWB 2 file) """ @property def targets(self) -> List[Dict[str, Any]]: return self._targets @property def supported_cell_fields(self) -> Set[str]: return { # TODO update list based on dandi reqs "subject_id", "institution", "electrode_id", "electrode_resistance", "session_id", "age", "genotype", "sex", "species", "date_of_birth" } @property def supported_sweep_fields(self) -> Set[str]: return { # TODO update list based on dandi reqs "gain", } def __init__( self, nwb_path: Optional[PathLike] ): self._targets: List[Dict[str, Any]] = [] if nwb_path is not None: self._initial_load_nwbfile(nwb_path) def _initial_load_nwbfile(self, nwb_path: PathLike): """Reads an nwbfile from an argued path into memory Parameters ---------- nwb_path : points to an h5 nwb file. """ with open(nwb_path, "rb") as file_: self._data = io.BytesIO(file_.read()) self._reload_nwbfile() def _reload_nwbfile(self): """ Construct an nwbfile from this object's _data buffer. """ self._h5_file = h5py.File(self._data, "r+") self._nwb_io = pynwb.NWBHDF5IO( path=self._h5_file.filename, mode="r+", file=self._h5_file, load_namespaces=True ) self.nwbfile = self._nwb_io.read() def _commit_nwb_changes(self): """Write this sink's nwbfile to its _data buffer. After calling this method, further modifications of this sink's NWBFile **WILL NOT** be recorded until _reload_nwbfile """ set_container_sources(self.nwbfile, self._h5_file.filename) self.nwbfile.set_modified(True) # Because the NWB schema versions of NWB data produced by MIES are older # we do not want to cache the newer schema versions that IPFX is currently using # WARNING: Doing this may introduce fragility down the road though if IPFX writes NWB fields # that require the newer schema versions... self._nwb_io.write(self.nwbfile, cache_spec=False) self._nwb_io.close() self._h5_file.close() def _get_single_ic_electrode(self) -> pynwb.icephys.IntracellularElectrode: """Find the unique electrode used during this session. Returns ------- electrode object Raises ------ ValueError : If there is not exactly 1 intracellular electrode in this file. """ keys = list(self.nwbfile.ic_electrodes.keys()) if len(keys) != 1: raise ValueError( "expected exactly 1 intracellular electrode, found " f"{len(keys)}" ) electrode = self.nwbfile.icephys_electrodes[keys[0]] electrode.set_modified(True) return electrode def _get_sweep_series( self, sweep_id: int ) -> List[pynwb.icephys.PatchClampSeries]: """ Obtain the PatchClampSeries object corresponding to this sweep id Parameters ---------- sweep_id : Unique identifier for this sweep Returns ------- A collection of PatchClampSeries object for this sweep """ return self.nwbfile.sweep_table.get_series(sweep_id) def _get_subject(self) -> pynwb.file.Subject: """Obtain this NWBFile's subject field, constructing it if needed Returns ------- The NWBFile's (potentially newly created) subject field """ if self.nwbfile.subject is None: self.nwbfile.subject = pynwb.file.Subject() self.nwbfile.subject.set_modified(True) return self.nwbfile.subject
[docs] def register(self, name: str, value: Any, sweep_id: Optional[int] = None): """ Attaches a named piece of metadata to this sink's internal store. Should dispatch to a protected method which carries out appropriate validations and transformations. Parameters ---------- name : the well-known name of the metadata value : the value of the metadata (before any required transformations) sweep_id : If provided, this will be interpreted as sweep-level metadata and sweep_id will be used to identify the sweep to which value ought to be attached. If None, this will be interpreted as cell-level metadata Raises ------ ValueError : An argued piece of metadata is not supported by this sink """ if sweep_id is None: if name == "subject_id": self._get_subject().subject_id = value elif name == "institution": self.nwbfile.institution = value elif name == "electrode_id": self._get_single_ic_electrode().name = str(value) elif name == "electrode_resistance": self._get_single_ic_electrode().resistance = value elif name == "session_id": self.nwbfile.session_id = value elif name == "age": self._get_subject().age = value elif name == "genotype": self._get_subject().genotype = value elif name == "sex": self._get_subject().sex = value elif name == "species": self._get_subject().species = value elif name == "date_of_birth": self._get_subject().date_of_birth = datetime.datetime.strptime(value, f"%Y-%m-%d %H:%M:%S %z") else: self._cant_attach(name, sweep_id) elif isinstance(sweep_id, int): all_series = self._get_sweep_series(sweep_id) for series in all_series: if name == "gain": series.gain = value else: self._cant_attach(name, sweep_id) else: self._cant_attach(name, sweep_id)
def _cant_attach(self, name: str, sweep_id: Optional[int]): """Helper - raises if attachment of a particular field is not supported """ raise ValueError( "unable to attach metadata field: " f"{name} (sweep_id: {sweep_id})" )
[docs] def serialize(self, targets: Optional[OneOrMany[Dict[str, Any]]] = None): """ Writes this sink's data to an external target or targets. Does not modify this sink. Parameters ---------- targets : If provided, these targets will be written to. Otherwise, write to targets previously defined by register_target. """ self._commit_nwb_changes() for target in self._ensure_plural_targets(targets): with open(target["output_path"], "wb") as file_: file_.write(self._data.getvalue()) self._reload_nwbfile()
[docs]def set_container_sources( container: hdmf.container.AbstractContainer, source: str ): """Traverse an NWBFile starting at a given container, setting the container_source attribute inplace on each container. Parameters ---------- container : container_source will be set on this object as well as on each of its applicable children. source : The new value of container source """ children = [container] while children: current = children.pop() # 💀💀💀 # container_source is set on write, but cannot be overrwritten, making # read -> modify -> write elsewhere # pretty tricky! # this is a fragile workaround if hasattr(current, "_AbstractContainer__container_source"): setattr( current, "_AbstractContainer__container_source", source ) if hasattr(current, "children"): children.extend(current.children)