-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtemplate.py
359 lines (306 loc) · 13.1 KB
/
template.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
import numpy as np
import numba
from scipy import signal
import npzload
import firstbelowthreshold
from single_filter_analysis import single_filter_analysis
import argminrelmin
@numba.njit(cache=True)
def meanat(x, t, l):
"""
x : array (N, M)
t : int array (N,)
l : int
out : array (N,)
The mean of x from t to t+l.
"""
out = np.empty(len(x))
for i, a in enumerate(x):
out[i] = np.mean(a[t[i]:t[i] + l])
return out
@numba.njit(cache=True)
def vecmeanat(x, x0, mask, t, l):
"""
x : array (N, M)
x0 : array (N,)
mask : bool array (N,)
t : int array (N,)
l : int
out : array (l,)
The mean of x - x0 from t to t+l, only where mask.
"""
out = np.zeros(l)
count = 0
for i, a in enumerate(x):
if mask[i]:
out += a[t[i]:t[i] + l] - x0[i]
count += 1
out /= count
return out
class Template(npzload.NPZLoad):
"""
Class to make a signal template.
Class methods
-------------
load : load a template from a file.
from_lngs : make a template from LNGS data.
Methods
-------
generate : generate signal waveforms from the template.
matched_filter_template : make a template for the matched filter.
maxoffset : return the position of the peak of the template.
max : amplitude of the template.
save : save the template to file.
Properties
----------
template_length : the length of the 1 GSa/s template.
Instance variables
------------------
baseline : scalar
The average baseline.
"""
def __init__(self):
raise NotImplementedError('use a class method to construct Template objects')
@property
def template_length(self):
"""
The length, in ns, of the 1 GSa/s template used to generate waveforms.
"""
return self.templates.shape[1]
@classmethod
def from_lngs(cls, data, length, mask=None, trigger=None, fig=None):
"""
Compute a template from 1 p.e. signals in an LNGS wav.
Parameters
----------
data : array (nevents, nchannels, 15001)
Wav data as read by readwav.readwav(). If it has two channels, the
second channel is the trigger. If there is only one channel,
specify the trigger position with `trigger`.
length : int
Number of samples of the template (@ 1 GSa/s), starting from the
beginning of the trigger impulse.
mask : bool array (nevents,), optional
Mask for the `data` array.
trigger : int, optional
Position of the trigger start in the events. If not specified,
the trigger is read from the second channel of `data`. If specified,
it supersedes `data` even with two channels.
fig : matplotlib figure, optional
If provided, the fingerplot used to select 1 pe pulses is plotted
here.
Return
------
self : Template
A template object.
"""
self = cls.__new__(cls)
if mask is None:
mask = np.ones(len(data), bool)
# Find the trigger.
if trigger is None:
hastrigger = True
trigger = firstbelowthreshold.firstbelowthreshold(data[:, 1], 600)
else:
hastrigger = False
trigger = np.full(len(data), trigger)
# Find spurious signals.
baseline_zone = data[:, 0, :np.min(trigger) - 1000]
spurious = firstbelowthreshold.firstbelowthreshold(baseline_zone, 700) >= 0
mask = mask & ~spurious
# Count photoelectrons using the average.
baseline = np.median(baseline_zone, axis=-1)
value = meanat(data[:, 0], trigger - 21, 1500)
corr_value = baseline - value
snr, center, width = single_filter_analysis(corr_value[mask], return_full=True, fig1=fig)
assert len(center) > 2, len(center)
if snr == 0:
print(f'warning: 0 and 1 pe peaks may have unexpected position')
oursnr = center[1] / width[0]
minsnr = 5
assert oursnr >= minsnr, f'SNR = {oursnr:.3g} < {minsnr}'
# Select the data corresponding to 1 photoelectron.
lower = (center[0] + center[1]) / 2
upper = (center[1] + center[2]) / 2
selection = (lower < corr_value) & (corr_value < upper) & mask
if fig is not None:
ax, = fig.get_axes()
ax.axvspan(lower, upper, color='#f55', zorder=-1, label='selection for template')
ax.legend(loc='upper right')
# Compute the waveform as the mean of the signals.
mtrig = np.full(len(trigger), np.median(trigger))
template = vecmeanat(data[:, 0], baseline, selection, mtrig, length)
# Do a 2-sample moving average to remove the nyquist noise.
tcs = np.pad(np.cumsum(template), (1, 0))
filttempl = (tcs[2:] - tcs[:-2]) / 2
# Redo the template aligning with the cross correlation filter.
delta = 100
t = trigger[0]
filtered = signal.fftconvolve(data[selection, 0, t - delta:t + delta + length - 1], -filttempl[None, ::-1], axes=-1, mode='valid')
indices = np.flatnonzero(selection)
assert filtered.shape == (len(indices), 2 * delta + 1)
idx = argminrelmin.argminrelmin(filtered, axis=-1)
selection[indices] &= idx >= 0
start = np.zeros(len(data), int)
start[indices] = t - delta + idx
template_aligned = vecmeanat(data[:, 0], baseline, selection, start, length)
# Redo the template aligning with the trigger, if available.
templates = [template, template_aligned]
if hastrigger:
template_trigger = vecmeanat(data[:, 0], baseline, selection, trigger, length)
templates.append(template_trigger)
self.templates = np.stack(templates)
self.template_rel_std = np.sqrt(width[1] ** 2 - width[0] ** 2) / center[1]
self.template_N = np.sum(selection)
# For the moving average.
self._cumsum_templates = np.pad(np.cumsum(self.templates, axis=1), [(0,0),(1,0)])
# Compute the noise standard deviation.
STDs = np.std(baseline_zone, axis=1)
self.noise_std = np.mean(STDs[mask])
# Compute the baseline distribution.
bs = baseline[mask]
self.baseline = np.mean(bs)
self.baseline_std = np.std(bs)
# Save filter-aligned template start distribution.
trigarray = start[selection]
self.start_min = np.min(trigarray)
self.start_max = np.max(trigarray)
self.start_median = np.median(trigarray)
return self
def _ma_template(self, n, idx=0):
"""apply a n-moving average to the 1 GSa/s template"""
cs = self._cumsum_templates[idx]
x = (cs[n:] - cs[:-n]) / n
return x
def _gettemplidx(self, aligned):
if aligned == 'trigger' and len(self.templates) < 3:
raise ValueError('trigger-aligned template not available')
if aligned == 'trigger':
return 2
elif aligned:
return 1
else:
return 0
def generate(self, event_length, signal_loc, generator=None, randampl=True, timebase=8, aligned=False):
"""
Simulate signals.
Parameters
----------
event_length : int
Number of samples in each event.
signal_loc : float array (nevents,)
An array of positions of the signal in each event. The position is
in unit of samples but it can be non-integer. The position is the
beginning of the template.
generator : np.random.Generator, optional
Random number generator.
randampl : bool
If True (default), vary the amplitude of signals.
timebase : int
Duration of a sample in nanoseconds. Default is 8 i.e. 125 MSa/s.
aligned : {False, True, 'trigger'}
If False, use as template the average done on subwaveforms at a
fixed offset from the event start. If True, align before averaging
using a filter. If 'trigger', align with the laser trigger.
Return
------
out : array (nevents, event_length)
Simulated signals.
"""
# TODO alignment bug: when signal_loc=[0] the template starts from
# the second sample.
signal_loc = np.asarray(signal_loc)
nevents = len(signal_loc)
out = np.zeros((nevents, event_length))
loc_int = np.array(np.floor(signal_loc), int)
loc_ns = np.array(np.floor(signal_loc * timebase), int) % timebase
loc_subns = (signal_loc * timebase) % 1
idx = self._gettemplidx(aligned)
templ = self._ma_template(timebase, idx)
tlen = ((len(templ) - 1) // timebase) * timebase
indices0 = np.arange(nevents)[:, None]
indices1 = loc_int[:, None] + np.arange(tlen // timebase)
tindices = 1 - loc_ns[:, None] + np.arange(0, tlen, timebase)
weight = loc_subns[:, None]
out[indices0, indices1] = (1 - weight) * templ[tindices] + weight * templ[tindices - 1]
if randampl:
if generator is None:
generator = np.random.default_rng()
out *= 1 + self.template_rel_std * generator.standard_normal((nevents, 1))
return out
def max(self, timebase=8, aligned=False):
"""
Compute the average maximum amplitude of the signals generated by
`generate` if the signal location varies randomly.
Parameters
----------
timebase : int
Duration of a sample in nanoseconds. Default is 8 i.e. 125 MSa/s.
aligned : {False, True, 'trigger'}
If False, use as template the average done on subwaveforms at a
fixed offset from the event start. If True, align before averaging
using a filter. If 'trigger', align with the laser trigger.
Return
------
ampl : scalar
The average maximum signal amplitude (positive).
"""
event_length = 2 + self.templates.shape[1] // timebase
signal_loc = np.linspace(0, 1, 101)[:-1]
signals = self.generate(event_length, signal_loc, randampl=False, timebase=timebase, aligned=aligned)
return np.mean(np.max(np.abs(signals), axis=1))
def maxoffset(self, timebase=8, aligned=False):
"""
Time from the start of the template to the maximum.
Parameters
----------
timebase : int
The unit of time in nanoseconds.
aligned : {False, True, 'trigger'}
If False, use as template the average done on subwaveforms at a
fixed offset from the event start. If True, align before averaging
using a filter. If 'trigger', align with the laser trigger.
"""
idx = self._gettemplidx(aligned)
return np.argmax(np.abs(self.templates[idx])) / timebase
def matched_filter_template(self, length, norm=True, timebase=8, aligned=False):
"""
Return a truncated template for the matched filter. The template slice
is chosen to maximize its vector norm.
Parameters
----------
length : int
Number of samples of the template.
norm : bool, optional
If True (default) the template is normalized to unit sum, so that
the output from the matched filter is comparable to the output from
a moving average filter.
timebase : int
The original template is at 1 GSa/s. The returned template is
downsampled by this factor. Default is 8 (125 MSa/s).
aligned : {False, True, 'trigger'}
If False, use as template the average done on subwaveforms at a
fixed offset from the event start. If True, align before averaging
using a filter. If 'trigger', align with the laser trigger.
Return
------
template : float array (length,)
The template.
offset : float
The offset in unit of sample number of the returned template from
the beginning of the template used to generate the fake signals to
the beginning of the returned template.
"""
len1ghz = timebase * length
assert len1ghz <= self.templates.shape[1]
idx = self._gettemplidx(aligned)
template = self.templates[idx]
cs = np.pad(np.cumsum(template ** 2), (1, 0))
s = cs[len1ghz:] - cs[:-len1ghz] # s[j] = sum(template[j:j+len1ghz]**2)
offset1ghz = np.argmax(s)
offset = offset1ghz / timebase
template = template[offset1ghz:offset1ghz + len1ghz]
template = np.mean(template.reshape(-1, timebase), axis=1)
if norm:
template /= np.sum(template)
return template, offset