import os
import logging
import pg8000
from allensdk.core.authentication import credential_injector
from allensdk.core.auth_config import LIMS_DB_CREDENTIAL_MAP
from ipfx.string_utils import to_str
# Default connection timeout for LIMS queries, read once at import time.
# IPFX_LIMS_TIMEOUT takes precedence; IPFX_TEST_TIMEOUT is consulted only when
# the former is unset. Stays None when neither variable is defined.
TIMEOUT = os.environ.get("IPFX_LIMS_TIMEOUT")
if TIMEOUT is None:
    TIMEOUT = os.environ.get("IPFX_TEST_TIMEOUT")
if TIMEOUT is not None:
    # Environment values are strings; convert so a number is passed on.
    TIMEOUT = float(TIMEOUT)  # type: ignore
@credential_injector(LIMS_DB_CREDENTIAL_MAP)
def _connect(user, host, dbname, password, port, timeout=TIMEOUT):
    """
    Open a pg8000 connection and a cursor on it.

    Callers in this module invoke ``_connect()`` without arguments, so the
    connection settings are presumably supplied by ``credential_injector``
    from ``LIMS_DB_CREDENTIAL_MAP`` (verify against allensdk).

    Parameters
    ----------
    user, host, dbname, password:
        forwarded to ``pg8000.connect`` (``dbname`` as ``database``)
    port:
        converted with ``int`` before being forwarded
    timeout:
        forwarded unchanged; defaults to the module-level ``TIMEOUT`` as it
        was when this function was defined

    Returns
    -------
    (connection, cursor) tuple; the caller is responsible for closing both
    """
    connection = pg8000.connect(
        user=user,
        host=host,
        database=dbname,
        password=password,
        port=int(port),
        timeout=timeout,
    )
    cursor = connection.cursor()
    return connection, cursor
def able_to_connect_to_lims():
    """
    Check whether a database connection can be opened and closed again.

    Returns
    -------
    True if connecting and closing succeeded, False if a ``pg8000.Error``
    or ``TypeError`` was raised along the way
    """
    try:
        connection, cursor = _connect()
        cursor.close()
        connection.close()
    except (pg8000.Error, TypeError):
        # pg8000.Error: the connection failed
        # TypeError: a credential was missing
        return False
    else:
        return True
def _select(cursor, query, parameters=None):
    """
    Execute a query on an open cursor and return every row as a dict.

    Parameters
    ----------
    cursor:
        open database cursor
    query: str
        SQL text to execute
    parameters: optional
        values passed to ``cursor.execute`` alongside the query; when given,
        the module-wide ``pg8000.paramstyle`` is first set to 'numeric'

    Returns
    -------
    list of dicts, one per fetched row, keyed by column name
    """
    if parameters is not None:
        # NOTE: this changes pg8000's paramstyle for the whole process
        pg8000.paramstyle = 'numeric'
        cursor.execute(query, parameters)
    else:
        cursor.execute(query)

    # the first entry of each description item is the column name
    column_names = [to_str(column[0]) for column in cursor.description]

    records = []
    for row in cursor.fetchall():
        records.append(dict(zip(column_names, row)))
    return records
def query(query, parameters=None):
    """
    Run one query on a freshly opened connection and return its rows.

    Parameters
    ----------
    query: str
        SQL text to execute
    parameters: optional
        values forwarded to ``_select``

    Returns
    -------
    list of dicts, one per row, as produced by ``_select``
    """
    connection, cursor = _connect()
    try:
        return _select(cursor, query, parameters=parameters)
    finally:
        # runs whether or not the query succeeded
        cursor.close()
        connection.close()
def get_sweep_states(specimen_id):
    """
    Collect the manual pass/fail calls recorded for a specimen's sweeps.

    Parameters
    ----------
    specimen_id: int
        formatted into the SQL text with ``%d``

    Returns
    -------
    list of dicts with keys 'sweep_number' and 'passed'; sweeps whose
    workflow_state is neither "manual_passed" nor "manual_failed" are
    left out
    """
    sql = (
        "\n"
        "select sweep_number, workflow_state from ephys_sweeps\n"
        "where specimen_id = %d\n"
    ) % specimen_id

    # only care about manual calls
    manual_calls = {"manual_passed": True, "manual_failed": False}

    return [
        {'sweep_number': row["sweep_number"],
         'passed': manual_calls[row["workflow_state"]]}
        for row in query(sql)
        if row["workflow_state"] in manual_calls
    ]
def get_stimuli_description():
    """
    Fetch every raw stimulus name together with the name of its stimulus type.

    Returns
    -------
    list of dicts with keys 'stimulus_code' and 'stimulus_name'
    """
    sql = (
        "\n"
        "select ersn.name as stimulus_code, est.name as stimulus_name from ephys_raw_stimulus_names ersn\n"
        "join ephys_stimulus_types est on ersn.ephys_stimulus_type_id = est.id\n"
    )
    return query(sql)
def get_specimen_info_from_lims_by_id(specimen_id):
    """
    Look up a specimen's name, ephys ROI result id and id.

    Parameters
    ----------
    specimen_id:
        formatted into the SQL text with ``%s``

    Returns
    -------
    (name, ephys_roi_result_id, id) taken from the first matching row, or
    (None, None, None) when nothing was found
    """
    not_found = (None, None, None)

    sql = (
        "\n"
        "SELECT s.name, s.ephys_roi_result_id, s.id\n"
        "FROM specimens s\n"
        "WHERE s.id = %s\n"
    ) % specimen_id
    rows = query(sql)

    if len(rows) == 0:
        logging.info("No result from query to find specimen information")
        return not_found

    first = rows[0]
    if not first:
        logging.info("Could not find specimen {:d}".format(specimen_id))
        return not_found

    return first["name"], first["ephys_roi_result_id"], first["id"]
def get_nwb_path_from_lims(ephys_roi_result):
    """
    Try to find NWBIgor file preferentially
    If not found, look for a processed NWB file

    well known file type ID for NWB files is 475137571
    well known file type ID for NWBIgor files is 570280085

    Parameters
    ----------
    ephys_roi_result: int
        id of the EphysRoiResult the file is attached to; formatted into
        the SQL text with ``%s``

    Returns
    -------
    full path of the nwb file (storage directory followed by filename),
    or None if neither file type is registered for this ROI result
    """
    nwb_igor_type_id = 570280085
    nwb_type_id = 475137571

    # Both lookups differ only in the well known file type id.
    sql = (
        "\n"
        "SELECT f.filename, f.storage_directory FROM well_known_files f\n"
        "WHERE f.attachable_type = 'EphysRoiResult' AND f.attachable_id = %s AND f.well_known_file_type_id = %d\n"
    )

    result = query(sql % (ephys_roi_result, nwb_igor_type_id))

    if len(result) == 0:
        logging.warning("Fall back to looking for NWB type")
        result = query(sql % (ephys_roi_result, nwb_type_id))

    # The fallback can come back empty too; indexing it unguarded would
    # raise IndexError instead of reporting the missing file.
    if len(result) == 0:
        logging.info("Cannot find NWB file")
        return None

    record = result[0]
    return record["storage_directory"] + record["filename"]
def get_igorh5_path_from_lims(ephys_roi_result):
    """
    Find the well known file of type 306905526 attached to an ephys ROI result.

    Parameters
    ----------
    ephys_roi_result:
        id of the EphysRoiResult the file is attached to; formatted into
        the SQL text with ``%s``

    Returns
    -------
    storage directory followed by filename of the first matching row, or
    None when nothing was found
    """
    rows = query(
        (
            "\n"
            "SELECT f.filename, f.storage_directory\n"
            "FROM well_known_files f\n"
            "WHERE f.attachable_type = 'EphysRoiResult'\n"
            "AND f.attachable_id = %s\n"
            "AND f.well_known_file_type_id = 306905526\n"
        ) % ephys_roi_result
    )

    if len(rows) == 0:
        logging.info("No result from query to find Igor H5 file")
        return None

    first = rows[0]
    if not first:
        logging.info("Cannot find Igor H5 file")
        return None

    return first["storage_directory"] + first["filename"]
def project_specimen_ids(project, passed_only=True):
    """
    List the ids of the specimens that belong to a project.

    Parameters
    ----------
    project: str
        project code; formatted into the SQL text inside single quotes
    passed_only: bool
        when True, keep only specimens whose ephys ROI result has
        workflow_state 'manual_passed'

    Returns
    -------
    list of specimen ids
    """
    statement = (
        "\n"
        "SELECT sp.id FROM specimens sp\n"
        "JOIN ephys_roi_results err ON sp.ephys_roi_result_id = err.id\n"
        "JOIN projects prj ON prj.id = sp.project_id\n"
        "WHERE prj.code = '%s'\n"
    ) % project

    if passed_only:
        statement = statement + "AND err.workflow_state = 'manual_passed'"

    return [row["id"] for row in query(statement)]