Skip to content

Commit

Permalink
Add step detection sAHP eCode (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
DrTaDa authored Feb 3, 2023
1 parent 99f850d commit 43ea0ab
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 14 deletions.
7 changes: 6 additions & 1 deletion bluepyefe/ecode/ramp.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ def interpret(self, t, current, config_data, reader_data):
# Smooth the current
smooth_current = scipy_signal2d(current, 85)

self.set_timing_ecode(["ton", "toff"], config_data)
self.set_timing_ecode(["ton"], config_data)

if "toff" in config_data and config_data["toff"] is not None:
self.toff = int(round(config_data["toff"] / self.dt))
else:
self.toff = numpy.argmax(smooth_current[self.ton:]) + self.ton

hypamp_value = base_current(current)
self.set_amplitudes_ecode("hypamp", config_data, reader_data, hypamp_value)
Expand Down
109 changes: 96 additions & 13 deletions bluepyefe/ecode/sAHP.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from ..recording import Recording
from .tools import scipy_signal2d
from .tools import base_current

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -79,16 +80,10 @@ def get_stimulus_parameters(self):
}
return ecode_params

def interpret(self, t, current, config_data, reader_data):
"""Analyse a current array and extract from it the parameters
needed to reconstruct the array"""
self.dt = t[1]
def compute_amp(self, current, config_data, reader_data):

# Smooth the current
smooth_current = scipy_signal2d(current, 85)

self.set_timing_ecode(["ton", "tmid", "tmid2", "toff"], config_data)

hypamp_value = numpy.median(
numpy.concatenate(
(smooth_current[: self.ton], smooth_current[self.toff :])
Expand All @@ -99,8 +94,8 @@ def interpret(self, t, current, config_data, reader_data):
amp_value = numpy.median(
numpy.concatenate(
(
smooth_current[self.ton : self.tmid],
smooth_current[self.tmid2 : self.toff],
smooth_current[self.ton: self.tmid],
smooth_current[self.tmid2: self.toff],
)
)
) - self.hypamp
Expand All @@ -109,10 +104,98 @@ def interpret(self, t, current, config_data, reader_data):
amp2_value = numpy.median(smooth_current[self.tmid : self.tmid2]) - self.hypamp
self.set_amplitudes_ecode("amp2", config_data, reader_data, amp2_value)

# Converting back to ms
for name_timing in ["ton", "tmid", "tmid2", "toff"]:
self.index_to_ms(name_timing, t)
self.tend = len(t) * self.dt
def step_detection(self, current, config_data, reader_data):

# Set the threshold to detect the step
noise_level = numpy.std(numpy.concatenate((self.current[:50], self.current[-50:])))
step_threshold = numpy.clip(4.5 * noise_level, 1e-5, numpy.max(self.current))

# The buffer prevent miss-detection of the step when artifacts are
# present at the very start or very end of the current trace
buffer_detect = 2.0
idx_buffer = int(buffer_detect / self.dt)
idx_buffer = max(1, idx_buffer)

buffer_step = 50
smooth_current = scipy_signal2d(current, 85)

# Infer the beginning of the long step
self.hypamp = base_current(current)

if "ton" in config_data and config_data["ton"] is not None:
self.ton = int(round(config_data["ton"] / self.dt))
else:
tmp_current = numpy.abs(smooth_current[idx_buffer:] - self.hypamp)
self.ton = idx_buffer + numpy.argmax(tmp_current > step_threshold)

# Infer the end of the long step
tmp_current = numpy.flip(
numpy.abs(smooth_current[self.ton:-idx_buffer] - self.hypamp)
)
self.toff = (
(len(current) - numpy.argmax(tmp_current > step_threshold)) - 1 - idx_buffer
)

# Get the amplitude of the step current (relative to hypamp)
self.amp = numpy.median(
numpy.concatenate((smooth_current[self.ton:self.ton + 50],
smooth_current[self.toff - 50:self.toff]))
) - self.hypamp

# Infer the beginning of the short step
tmp_current = numpy.abs(
smooth_current[self.ton + buffer_step:self.toff - buffer_step] -
self.amp - self.hypamp
)
self.tmid = self.ton + buffer_step + numpy.argmax(tmp_current > step_threshold)

# Infer the end of the long step
tmp_current = numpy.flip(
numpy.abs(
smooth_current[self.ton + buffer_step:self.toff - 50] -
self.amp - self.hypamp
)
)
self.tmid2 = (
(self.toff - numpy.argmax(tmp_current > step_threshold)) - 1 - buffer_step
)

self.amp2 = numpy.median(smooth_current[self.tmid:self.tmid2]) - self.hypamp

# Converting back ton and toff to ms
self.ton = self.t[int(round(self.ton))]
self.toff = self.t[int(round(self.toff))]
self.tmid = self.t[int(round(self.tmid))]
self.tmid2 = self.t[int(round(self.tmid2))]
self.tend = len(self.t) * self.dt

# Check for some common step detection failures when the current
# is constant.
if self.ton > self.toff or self.ton > self.tend or \
self.toff > self.tend or self.tmid == self.ton \
or self.tmid2 == self.toff:

self.ton = 0.
self.toff = self.tend

logger.warning(
"The automatic step detection failed for the recording "
f"{self.protocol_name} in files {self.files}. You should "
"specify ton and toff by hand in your files_metadata "
"for this file."
)

def interpret(self, t, current, config_data, reader_data):
"""Analyse a current with a step and extract from it the parameters
needed to reconstruct the array"""

self.dt = t[1]

if "ton" in config_data and "tmid" in config_data:
self.set_timing_ecode(["ton", "tmid", "tmid2", "toff"], config_data)
self.compute_amp(current, config_data, reader_data)
else:
self.step_detection(current, config_data, reader_data)

def generate(self):
"""Generate the current array from the parameters of the ecode"""
Expand Down

0 comments on commit 43ea0ab

Please sign in to comment.