Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pmt pulse injectors #102

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/dspeed/processors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
from .optimize import optimize_1pz, optimize_2pz
from .param_lookup import param_lookup
from .peak_snr_threshold import peak_snr_threshold
from .pmt_pulse_injector import inject_general_logistic, inject_gumbel
from .pole_zero import double_pole_zero, pole_zero
from .poly_fit import poly_diff, poly_exp_rms, poly_fit
from .presum import presum
Expand Down Expand Up @@ -193,4 +194,6 @@
"classification_layer_no_bias",
"classification_layer_with_bias",
"normalisation_layer",
"inject_general_logistic",
"inject_gumbel",
]
123 changes: 123 additions & 0 deletions src/dspeed/processors/pmt_pulse_injector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from __future__ import annotations

import numpy as np
from numba import guvectorize

from ..utils import numba_defaults_kwargs as nb_kwargs


@guvectorize(
[
"void(float32[:], float32, float32, float32, float32[:])",
"void(float64[:], float64, float64, float64, float64[:])",
],
"(n),(),(),()->(n)",
**nb_kwargs,
)
def inject_gumbel(
wf_in: np.ndarray, a: float, t0: float, beta: float, wf_out: np.ndarray
) -> None:
"""
Injects a Gumbel distribution into the waveform `wf_in`, modifying it in place in `wf_out`.

Parameters:
- wf_in: Input waveform (1D array).
- a: Amplitude of the Gumbel distribution.
- t0: Temporal centroid of the Gumbel distribution.
- beta: Scale parameter (controls spread of the Gumbel distribution).
- wf_out: Output waveform (1D array), modified by adding the Gumbel distribution.
"""

wf_out[:] = np.nan

# Early exit if any of the inputs contain NaN values (invalid inputs).
if np.isnan(wf_in).any() or np.isnan(a) or np.isnan(t0) or np.isnan(beta):
return

wf_out[:] = wf_in[:]

# Define the range of indices over which the Gumbel distribution will be applied.
# Start injecting the distribution at 2 beta below the centroid (t0).
start = t0
mu = t0 + (2 * beta)
end = mu + (8 * beta)

# Ensure the range is within valid waveform boundaries.
if start < 0:
start = 0
if end > len(wf_in):
end = len(wf_in)

# Loop through the specified range and add the Gumbel distribution to wf_out.
for i in range(start, end):

z = (i - mu) / beta
wf_out[i] += (a / beta) * np.exp(-(z + np.exp(-z)))


@guvectorize(
[
"void(float32[:], float32, float32, float32, float32, float32, float32, float32[:])",
"void(float64[:], float64, float64, float64, float64, float64, float64, float64[:])",
],
"(n),(),(),(),(),(),()->(n)",
**nb_kwargs,
)
def inject_general_logistic(
wf_in: np.ndarray,
a: float,
t0: float,
rt: float,
q: float,
v: float,
decay: float,
wf_out: np.ndarray,
) -> None:
r"""Inject sigmoid pulse into existing waveform to simulate pileup.

.. math::

s(t) = \frac{A}{(1 + q \exp[-4 \log(99) (t - t_0 - t_r/2) / t_r])^{1/v}}
e^{-(t-t_0)/\tau}

Parameters
ggmarshall marked this conversation as resolved.
Show resolved Hide resolved
----------
wf_in
the input waveform.
a
the amplitude :math:`A` of the injected waveform.
t0
the position :math:`t_0` of the injected waveform.
rt
the rise time :math:`t_r` of the injected waveform.
q
shaping parameter of the logistic function.
v
shaping parameter of the logistic function.
decay
the decay parameter :math:`\tau` of the injected waveform.
wf_out
the output waveform.
"""

wf_out[:] = np.nan

# Early exit if any of the inputs contain NaN values (invalid inputs).
if (
np.isnan(wf_in).any()
or np.isnan(a)
or np.isnan(t0)
or np.isnan(rt)
or np.isnan(q)
or np.isnan(v)
or np.isnan(decay)
):
return

wf_out[:] = wf_in[:]
rise = 4 * np.log(99) / rt

for t in range(len(wf_out)):
wf_out[t] = wf_out[t] + a / (
(1 + q * np.exp(-rise * (t - t0 - rt / 2))) ** (1 / v)
) * np.exp(-(1 / decay) * (t - t0))
Loading