-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtoy.py
1615 lines (1352 loc) · 59.3 KB
/
toy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Module to run a simulation of photodetection signals and test various filters.
Classes
-------
Toy : the main class to run the simulations
Noise : abstract class to generate noise, see concrete subclasses
WhiteNoise : generate white noise
DataCycleNoise : generate noise copying it from a source array
Filter : class to apply filters
Functions
---------
downsample : downsample by averaging in groups
"""
import abc
import numpy as np
from matplotlib import pyplot as plt
import numba
import tqdm
import uproot
from scipy import signal
import integrate
from single_filter_analysis import single_filter_analysis
import readwav
import textbox
import npzload
import template as _template
import colormap
def downsample(a, n, axis=-1, dtype=None):
    """
    Downsample an array by averaging groups of adjacent elements.

    Parameters
    ----------
    a : array
        The array to downsample.
    n : int
        The number of elements averaged together per group.
    axis : int
        The axis along which groups are formed. Default last.
    dtype : data-type, optional
        The data type of the output.

    Return
    ------
    da : array
        The downsampled array; same shape as `a` except along `axis`, whose
        size becomes a.shape[axis] // n (trailing elements are dropped).
    """
    if n == 1:
        # Nothing to average, just honor the dtype request.
        return np.asarray(a, dtype=dtype)
    axis = axis % a.ndim
    ngroups = a.shape[axis] // n
    # Truncate along `axis` to a multiple of n.
    selector = [slice(None)] * a.ndim
    selector[axis] = slice(0, ngroups * n)
    truncated = a[tuple(selector)]
    # Split `axis` into (ngroups, n) and average over the group axis.
    grouped_shape = a.shape[:axis] + (ngroups, n) + a.shape[axis + 1:]
    grouped = np.reshape(truncated, grouped_shape)
    return np.mean(grouped, axis=axis + 1, dtype=dtype)
class Noise(metaclass=abc.ABCMeta):
    """
    Abstract interface for the noise generators used by the simulations.

    Concrete subclasses
    -------------------
    WhiteNoise
    DataCycleNoise

    Methods
    -------
    generate : produce an array of noise events
    """

    def __init__(self, timebase=8):
        """
        Parameters
        ----------
        timebase : int
            Sample duration in nanoseconds. The default 8 corresponds to a
            125 MSa/s sampling frequency for the waveforms returned by
            `generate`.
        """
        self.timebase = timebase

    @abc.abstractmethod
    def generate(self, nevents, event_length, generator=None):
        """
        Generate noise with unitary variance.

        Parameters
        ----------
        nevents : int
            Number of events, i.e. independent chunks of simulated data.
        event_length : int
            Number of samples per event.
        generator : np.random.Generator, optional
            Random number generator.

        Return
        ------
        events : array (nevents, event_length)
            The simulated noise.
        """
        pass
class DataCycleNoise(Noise):

    def __init__(self, timebase=8, allow_break=False, chunk_skip=None, maxcycles=None):
        """
        Class to generate noise cycling through actual noise data.

        Parameters
        ----------
        timebase : int
            The duration of samples in nanoseconds. Default is 8, i.e. the
            sampling frequency of the waveform returned by `generate` is
            125 MSa/s.
        allow_break : bool
            Default False. If True, the event length can be longer than the
            noise chunks obtained from data, but there may be breaks in the
            events where one sample is not properly correlated with the next.
        chunk_skip : int, optional
            By default each noise event is copied from a different noise data
            chunk to avoid correlations between events. If chunk_skip is
            specified, multiple events can be taken from the same chunk,
            skipping chunk_skip samples between each event.
        maxcycles : int, optional
            By default `generate` can reuse the same chunk for different events
            if there's not enough noise data for all events. maxcycles, if
            specified, set the maximum number of times any chunk can be reused.
            Once the limit is hit an exception is raised and the counter is
            reset.

        Members
        -------
        allow_break : bool
            The parameter given at initialization. Can be changed directly.

        Methods
        -------
        generate
        load_proto0_root_file
        load_LNGS_wav
        save
        load

        Properties
        ----------
        noise_array
        """
        self.timebase = timebase
        self.allow_break = allow_break
        self.chunk_skip = chunk_skip
        self.maxcycles = maxcycles
        # Counter of noise chunks consumed so far; advanced by generate().
        self.cycle = 0

    @property
    def noise_array(self):
        """
        An array (nchunks, chunk_length) of noise. `generate` uses a chunk for
        each event generated, cycling in order through chunks.
        """
        if not hasattr(self, '_noise_array'):
            # One of the load* methods (or the setter) must be called first.
            raise RuntimeError('Noise information not loaded')
        return self._noise_array

    @noise_array.setter
    def noise_array(self, val):
        # Normalize each chunk to zero mean and unit standard deviation, as
        # required by the Noise.generate contract (unitary variance).
        mean = np.mean(val, axis=1, keepdims=True)
        std = np.std(val, axis=1, keepdims=True)
        self._noise_array = (val - mean) / std

    def generate(self, nevents, event_length, generator=None):
        # `generator` is accepted for interface compatibility but unused: the
        # output is read deterministically and sequentially from noise_array.
        nchunks, chunklen = self.noise_array.shape
        if not self.allow_break and event_length > chunklen:
            raise ValueError(f'Event length {event_length} > maximum {chunklen}')
        if event_length > chunklen // 2 or self.chunk_skip is None:
            # each event uses one or more chunks
            chperev = int(np.ceil(event_length / chunklen))  # chunks per event
            assert chperev <= nchunks
            # Glue chperev consecutive chunks into one longer row, dropping
            # leftover chunks; the glue points are where the correlation
            # breaks mentioned in `allow_break` can occur.
            newnchunks = (nchunks // chperev) * chperev
            noise_array = self.noise_array[:newnchunks].reshape(newnchunks // chperev, chperev * chunklen)
            # self.cycle counts single chunks: convert to glued-row units,
            # then back to chunks for the bookkeeping below.
            cycle = int(np.ceil(self.cycle / chperev))
            nextcycle = (cycle + nevents) * chperev
            # Wrap around when the data is exhausted.
            indices = (cycle + np.arange(nevents)) % len(noise_array)
            events = noise_array[:, :event_length][indices]
        else:
            # each chunk is used for one or more events
            effevlen = event_length + self.chunk_skip  # event + inter-event gap
            evperch = (chunklen + self.chunk_skip) // effevlen  # events per chunk
            cycle = self.cycle * evperch
            nextcycle = int(np.ceil((cycle + nevents) / evperch))
            flatindices = cycle + np.arange(nevents)
            indices0 = (flatindices // evperch) % nchunks  # which chunk
            indices0 = indices0[:, None]
            indices1 = (flatindices % evperch) * effevlen  # offset inside the chunk
            indices1 = indices1[:, None] + np.arange(event_length)
            events = self.noise_array[indices0, indices1]
        if self.maxcycles is not None and nextcycle > self.maxcycles * nchunks:
            # Reset the counter before raising so the object stays usable.
            self.cycle = 0
            ncycles = int(np.ceil(nextcycle / nchunks))
            raise RuntimeError(f'Data cycled {ncycles} times > limit {self.maxcycles}')
        else:
            self.cycle = nextcycle
        assert events.shape == (nevents, event_length)
        return events

    def load_proto0_root_file(self, filename, channel, maxevents=None):
        """
        Load noise from a specific file that Simone Stracka gave me. It's not
        on the repository.

        Parameters
        ----------
        filename : str
            File name.
        channel : str
            ADC channel to read. Must choose the correct one depending on the
            SiPM used.
        maxevents : int, optional
            The maximum number of events loaded from the wav file.
        """
        # The source data is sampled at 8 ns (125 MSa/s), so only timebases
        # that are multiples of 8 can be obtained by downsampling.
        # NOTE(review): the literal `(unknown)` in this message looks like a
        # placeholder for the data source name — confirm against history.
        if self.timebase % 8 != 0:
            raise RuntimeError(f'can not load `(unknown)` data at 125 MSa/s with timebase={self.timebase}')
        root = uproot.open(filename)
        tree = root['midas_data']
        if maxevents is None:
            maxevents = 2 ** 31
        nsamples = tree.array('nsamples')
        # Entries with nsamples == 0 are empty; stop after the maxevents-th
        # nonempty entry.
        nz = np.flatnonzero(nsamples)
        if maxevents < len(nz):
            maxevents = nz[maxevents]
        entrystop = min(tree.numentries, maxevents)
        noise = tree.array(channel, entrystop=entrystop)
        # The jagged array must contain only empty or fixed-length entries.
        counts = np.unique(noise.counts)
        assert len(counts) == 2 and counts[0] == 0, 'inconsistent array lengths'
        array = noise._content.reshape(-1, counts[-1])
        self.noise_array = downsample(array, self.timebase // 8, dtype=np.float32)

    def load_LNGS_wav(self, filename, maxevents=None):
        """
        Load noise from a LNGS wav file. THERE SOME ASSUMPTIONS HERE ON WHAT'S
        IN THE FILE, RECHECK IT WORKS IF I CHANGE THE FILE.

        Parameters
        ----------
        filename : str
            The wav file path.
        maxevents : int, optional
            The maximum number of events loaded from the wav file.
        """
        data = readwav.readwav(filename, maxevents=maxevents, mmap=False, quiet=True)
        # Use the pre-trigger zone (first 8900 samples of channel 0) as noise.
        baseline_zone = data[:, 0, :8900]
        # Discard events whose baseline zone contains ADC values below 700,
        # presumably signals or saturation — TODO confirm threshold meaning.
        ignore = np.any((0 <= baseline_zone) & (baseline_zone < 700), axis=-1)
        # Downsampling by `timebase` assumes the wav is sampled at 1 GSa/s —
        # TODO confirm against the file format.
        downsampled = downsample(baseline_zone, self.timebase, dtype=np.float32)
        self.noise_array = downsampled[~ignore]

    def save(self, filename):
        """Save the noise array and its timebase to a numpy archive."""
        np.savez(filename, noise_array=self.noise_array, timebase=self.timebase)

    def load(self, filename):
        """Load noise saved by `save`, downsampling it if the timebases differ."""
        arch = np.load(filename)
        try:
            timebase = arch['timebase'].item()
            # Can only go to a coarser sampling by an integer factor.
            # NOTE(review): `(unknown)` here also looks like a filename
            # placeholder — confirm.
            if self.timebase % timebase != 0:
                raise ValueError(f'timebase={self.timebase} in object not a multiple of timebase={timebase} in file `(unknown)`')
            self.noise_array = downsample(arch['noise_array'], self.timebase // timebase)
        finally:
            arch.close()
class WhiteNoise(Noise):
    """
    Generator of gaussian white noise.

    Methods
    -------
    generate
    """

    def generate(self, nevents, event_length, generator=None):
        rng = np.random.default_rng() if generator is None else generator
        return rng.standard_normal(size=(nevents, event_length))
class Filter:

    def __init__(self, events, boundary=0, rmargin=0):
        """
        Apply various filters to the same batch of events.

        Parameters
        ----------
        events : array (nevents, event_length)
            The events; each filter acts on each event independently.
        boundary : scalar
            Past boundary condition: each event behaves as if preceded by an
            infinite run of samples equal to `boundary`. Default 0.
        rmargin : int
            The signals are extended this many samples into the future,
            filled with `boundary`. Default 0.

        Methods
        -------
        The filter methods are:
            moving_average
            exponential_moving_average
            matched
        Each one returns
            filtered : float array (nevents, event_length + rmargin)
                filtered[:, i] is the filter output after reading sample
                events[:, i],
        and accepts an optional parameter `out` to write the output into.
        The method `all` computes all the filters in one array.
        """
        self.events = events
        self.boundary = boundary
        self.rmargin = rmargin

    def _add_boundary(self, left, right=0):
        # Pad each event with `left` boundary samples in the past and
        # `right` boundary samples in the future.
        nevents, event_length = self.events.shape
        padded = np.empty((nevents, left + event_length + right))
        padded[:, :left] = self.boundary
        padded[:, left + event_length:] = self.boundary
        padded[:, left:left + event_length] = self.events
        return padded

    def _out(self, out):
        # Allocate the output array unless the caller provided one.
        if out is not None:
            return out
        nevents, event_length = self.events.shape
        return np.empty((nevents, event_length + self.rmargin))

    def moving_average(self, nsamples, out=None):
        """
        Moving average filter.

        Parameters
        ----------
        nsamples : int
            The number of averaged samples.
        """
        padded = self._add_boundary(nsamples, self.rmargin)
        csum = np.cumsum(padded, axis=1)
        out = self._out(out)
        # Difference of cumulative sums = sum over a sliding window.
        out[...] = csum[:, nsamples:]
        out -= csum[:, :-nsamples]
        out /= nsamples
        return out

    def exponential_moving_average(self, tau, out=None):
        """
        Exponential moving average filter y[i] = a y[i-1] + (1 - a) x[i].

        Parameters
        ----------
        tau : float
            The exponential scale parameter, with a = 1 - 1/tau.
        """
        out = self._out(out)
        # 1 - 1/tau is the first-order Taylor expansion of exp(-1/tau).
        decay = 1 - 1 / tau
        _exponential_moving_average(self.events, decay, self.boundary, out)
        return out

    def matched(self, template, out=None):
        """
        Matched filter (cross correlation with a template).

        Parameters
        ----------
        template : 1D array
            The array which is cross-correlated with the signal.
        """
        padded = self._add_boundary(len(template) - 1, self.rmargin)
        out = self._out(out)
        _correlate(padded, template, out)
        return out

    def all(self, template):
        """
        Convenience method running every filter at once.

        Parameters
        ----------
        template : 1D array
            Template for the matched filter; its length also sets the number
            of averaged samples of the moving average and the length scale of
            the exponential moving average.

        Return
        ------
        out : float array (4, nevents, event_length)
            The first axis is over filters, in this order:
                no filter
                moving average
                exponential moving average
                matched filter
        """
        out = np.empty((4,) + self.events.shape)
        out[0] = self.events
        length = len(template)
        self.moving_average(length, out[1])
        self.exponential_moving_average(length, out[2])
        self.matched(template, out[3])
        return out

    @staticmethod
    def name(ifilter, short=False):
        """
        Return the name of a filter based on the indexing used in Filter.all().
        """
        table = (
            ('No filter', 'unfiltered'),
            ('Moving average', 'movavg'),
            ('Exponential moving average', 'expmovavg'),
            ('Cross correlation', 'crosscorr'),
        )
        return table[ifilter][short]

    @staticmethod
    def tauname(ifilter):
        """
        Return the name of the filter length parameter (uses LaTeX).
        """
        return (None, 'N', '$\\tau$', 'N')[ifilter]
@numba.jit(cache=True, nopython=True)
def _exponential_moving_average(events, a, boundary, out):
    # Exponential moving average y[j] = a * y[j - 1] + (1 - a) * x[j], with
    # y[-1] = boundary as the past boundary condition. Samples past the end
    # of `events` (the right margin of `out`) continue the recursion with
    # x = boundary.
    # NOTE: numba.prange behaves like a plain range here because parallel=True
    # is not set in the jit decorator.
    for i in numba.prange(len(events)):
        out[i, 0] = a * boundary + (1 - a) * events[i, 0]
        for j in range(1, events.shape[1]):
            out[i, j] = a * out[i, j - 1] + (1 - a) * events[i, j]
        for j in range(events.shape[1], out.shape[1]):
            out[i, j] = a * out[i, j - 1] + (1 - a) * boundary
@numba.jit(cache=True, nopython=True)
def _correlate1(events, template, out):
    # Direct (time-domain) cross correlation of each event with `template`:
    # out[i, j] = dot(events[i, j:j + len(template)], template).
    # Alternative to the FFT-based implementation; `_correlate` selects which
    # one is actually used by Filter.matched.
    for i in numba.prange(len(events)):
        for j in range(out.shape[1]):
            out[i, j] = np.dot(events[i, j:j + len(template)], template)
def _correlate2(events, template, out):
out[:] = signal.fftconvolve(events, template[None, ::-1], axes=1, mode='valid')
_correlate = _correlate2
def run_sliced(fun, ntot, n=None):
    """
    Call a function repeatedly with consecutive slices covering a range,
    printing a progress bar.

    Parameters
    ----------
    fun : function
        A function taking a single slice object as argument.
    ntot : int
        The end of the range covered by the sequence of slices.
    n : int, optional
        The length of each slice (the last one may be shorter). If None,
        `fun` is invoked once with the slice 0:ntot and no progress bar is
        shown.
    """
    if n is None:
        fun(slice(0, ntot))
        return
    nslices = ntot // n + bool(ntot % n)
    for islice in tqdm.tqdm(range(nslices)):
        lo = islice * n
        hi = min(lo + n, ntot)
        fun(slice(lo, hi))
class Toy(npzload.NPZLoad):
    def __init__(self, template, tau, snr, noisegen=None, timebase=8, upsampling=False):
        """
        A Toy object simulates 1 p.e. signals with noise, each signal in a
        separate "event", and localize the signal with filters, for a range of
        values of the filter parameters and the SNR.

        Parameters
        ----------
        template : template.Template
            A template object.
        tau : array (ntau,)
            Values of the filters length parameter, in number of samples.
        snr : array (nsnr,)
            SNR values. The SNR is the average signal peak height @ 1 GSa/s
            over the noise RMS @ chosen timebase.
        noisegen : Noise
            An instance of a subclass of Noise. Default white noise.
        timebase : int
            The duration of a sample in nanoseconds. Default is 8 i.e.
            125 MSa/s.
        upsampling : bool
            Default False. If True, compute the temporal localization with
            upsampling to 1 GSa/s.

        Methods
        -------
        run : generate the events
        window_center : helper function to make an argument to run_window
        run_window : extract windows from the events generated
        sampling_str : string describing the sampling frequency
        mftempl : get a matched filter template.
        templocres : compute temporal localization resolution
        filteredsnr : compute the SNR with signal-within-noise amplitude
        snrratio : compute the ratio SNR after over before filtering
        plot_event : plot a single event
        plot_loc_all : plot temporal localization resolution vs. parameters
        plot_loc : plot temporal localization histogram
        plot_val : plot filtered signal value histogram
        plot_event_window : plot an event with the windowed matched filter
        plot_loc_window : plot temporal localization resolution after windowing
        save : save to file.

        Class methods
        -------------
        load : load from file. The loaded object can do plots but not run again.

        Members
        -------
        template : template.Template
            The template object.
        tau : array (ntau,)
            The values of the length parameters of the filters.
        snr : array (nsnr,)
            SNR values.
        sigma : array (nsnr,)
            The standard deviation of the noise for each SNR.
        event_length : int
            The number of samples in each simulated event.
        output : array (nevents,)
            A structured numpy array with these fields, filled after calling
            `run`:
            'filter_start', int, (ntau,) :
                The sample number where the filtering starts, the same for each
                event.
            'filter_skip', int, (ntau,) :
                The number of initial samples from the filtered zone not used
                to compute results, the same for each event.
            'loctrue', float :
                The sample number where the signal is generated. See
                template.Template.generate.
            'loc', float, (4, ntau, nsnr) :
                Signal localization. It is not calibrated. Computed with
                parabolic interpolation.
            'locraw' :
                Like 'loc' but without interpolation.
            'locup' :
                Like 'loc' but with upsampling. Present only if upsampling=True.
            'locupraw' :
                Like 'locup' but without interpolation.
            'value', float, (4, ntau, nsnr) :
                The filtered value at the minimum, with inverted sign.
            'valueclean', float, (4, ntau) :
                Like 'value' but filtering only the signal without noise.
            'filtnoisesdev', float, (4, ntau) :
                The standard deviation of filtered unitary variance noise for
                each filter.
        output_event : array (nevents,)
            A structured numpy array with these fields, filled after calling
            `run`:
            'signal', float, (event_length,) :
                The signal and baseline without noise.
            'noise', float, (event_length,) :
                The zero-centered noise with unitary rms. The complete event is
                obtained with signal + sigma * noise.
        output_window : array (nevents,)
            A structured numpy array with these fields, filled after calling
            `run_window`:
            'wstart', int, (ncenter, nwin) :
                The sample number where the window starts.
            'wloc', float, (ncenter, nwin, ntau, nsnr) :
                The localization from the window (the sample number is still
                relative to the whole event).
        wlen : array (nwin,)
            The window lengths used by `run_window`.
        wlmargin : array (nwin,)
            The left margin of the windows used by `run_window`.
        wcenter : array (ncenter,)
            The `wcenter` argument to `run_window` (see `window_center`).
        """
        tau = np.asarray(tau)
        # The filters can not be longer than the signal template.
        assert template.template_length >= np.max(tau) * timebase
        snr = np.asarray(snr)
        if noisegen is None:
            noisegen = WhiteNoise(timebase=timebase)
        assert timebase == noisegen.timebase, 'timebase of Noise object must match timebase of Toy object'
        self.template = template
        self.tau = tau
        self.snr = snr
        self.noisegen = noisegen
        self.timebase = timebase
        self.upsampling = upsampling
        # SNR is defined on the 1 GSa/s signal peak, so the noise standard
        # deviation at the chosen timebase is peak height / snr.
        self.sigma = template.max(timebase=1, aligned='trigger') / snr
        # Safety margin: 512 ns expressed in samples.
        self.margin = 512 // timebase
        self.event_length = np.max(tau) + template.template_length // timebase + self.margin
        # Matched filter templates at the simulation timebase and at 1 GSa/s
        # (the latter is used when upsampling=True).
        self.templs = self._make_templ_array(timebase)
        self.uptempls = self._make_templ_array(1)
def _make_templ_array(self, timebase):
maxlen = np.max(self.tau) * self.timebase // timebase
templs = np.zeros(len(self.tau), dtype=[
('length', int),
('offset', float),
('template', float, maxlen)
])
for i in range(len(self.tau)):
length = self.tau[i] * self.timebase // timebase
templ, offset = self.template.matched_filter_template(length, timebase=timebase, aligned='trigger')
assert len(templ) == length
templs[i]['length'] = length
templs[i]['offset'] = offset
templs[i]['template'][:length] = templ
return templs
def mftempl(self, itau, upsampling=False):
"""
Return a matched filter template used.
Parameters
----------
itau : int
The index into the tau array for the filter length.
upsampling : bool
Default False. If True, return the template at 1 GSa/s.
Return
------
template : array
The template. It is normalized to unit sum.
"""
templs = self.uptempls if upsampling else self.templs
templ = templs[itau]
return templ['template'][:templ['length']]
def run(self, nevents, pbar=None, seed=0):
"""
Simulate signals and localize them. After running this function, the
members `output` and `output_event` are filled.
Parameters
----------
nevents : int
The number of events. Each event contains one and only one signal.
pbar : int, optional
If given, a progress bar is shown that ticks every `pbar` events.
seed : int
Seed for the random number generator (default 0).
"""
generator = np.random.default_rng(seed)
ntau = len(self.tau)
nsnr = len(self.snr)
dtype = [
('filter_start', int, (ntau,)),
('filter_skip', int, (ntau,)),
('loctrue', float),
('loc', float, (4, ntau, nsnr)),
('locraw', int, (4, ntau, nsnr)),
('value', float, (4, ntau, nsnr)),
('valueclean', float, (4, ntau)),
('filtnoisesdev', float, (4, ntau)),
]
# if self.dobaseline:
# dtype.append(('baseline', float, (4, ntau, nsnr)))
if self.upsampling:
dtype.append(('locup', float, (4, ntau, nsnr)))
dtype.append(('locupraw', float, (4, ntau, nsnr)))
output = np.empty(nevents, dtype=dtype)
dtype = [
('signal', float, self.event_length),
('noise', float, self.event_length),
]
output_event = np.empty(nevents, dtype)
def fun(s):
self._run(output[s], output_event[s], generator)
run_sliced(fun, nevents, pbar)
self.output = output
self.output_event = output_event
    def _run(self, output, output_event, generator):
        """
        Simulate and process one slice of events, writing the results into
        `output` and `output_event` (fields documented in the class
        docstring).
        """
        # Get sizes of things.
        nevents = len(output)
        ntau = len(self.tau)
        nsnr = len(self.snr)
        event_length = self.event_length
        upsampling = self.upsampling
        timebase = self.timebase
        template_length = self.template.template_length // timebase
        margin = self.margin
        # Generate signal and noise arrays.
        # The signal start is placed at a fixed offset near the end of the
        # event plus a uniform sub-sample jitter in [0, 1).
        signal_loc_offset = event_length - template_length - margin // 2
        signal_loc_cont = generator.uniform(size=nevents)
        signal_loc = signal_loc_offset + signal_loc_cont
        simulated_signal = self.template.generate(event_length, signal_loc, generator, timebase=timebase, aligned='trigger')
        simulated_noise = self.noisegen.generate(nevents, event_length, generator)
        # # Filter with a moving average to compute the baseline.
        # if self.dobaseline:
        #     filt_noise = Filter(simulated_noise)
        #     filt_signal = Filter(simulated_signal, self.template.baseline)
        #     bs_noise = filt_noise.moving_average(self.bslen)
        #     bs_signal = filt_signal.moving_average(self.bslen)
        # Upsampled signal and noise (without baseline zone).
        # np.repeat brings the waveforms back to 1 GSa/s (sample-and-hold).
        if upsampling:
            upskip = signal_loc_offset - margin // 2
            simulated_noise_up = np.repeat(simulated_noise[:, upskip:], timebase, axis=-1)
            simulated_signal_up = np.repeat(simulated_signal[:, upskip:], timebase, axis=-1)
            filt_noise_up = Filter(simulated_noise_up)
            filt_signal_up = Filter(simulated_signal_up)
        # Arrays filled in the cycle over tau.
        minima = np.empty((4, 4, ntau, nsnr, nevents))
        # first axis = (loc, locraw, locup, locupraw)
        minval = np.empty_like(minima)
        sdev = np.empty((4, ntau, nevents))
        cleanval = np.empty_like(sdev)
        filter_start, filter_skip = np.empty((2, ntau), int)
        # Indices used for the interpolation (open grids for fancy indexing
        # along all axes but the last).
        indices = tuple(np.ogrid[:4, :nsnr, :nevents])
        indices_clean = tuple(np.ogrid[:4, :nevents])
        for itau in range(ntau):
            tau = self.tau[itau]
            # Get the matched filter template.
            mf_templ = self.mftempl(itau)
            # Filter objects.
            # Start filtering `tau` samples early so the filter transient can
            # be dropped below with the [..., tau:] slice.
            skip = signal_loc_offset - margin // 2 - tau
            filt_noise = Filter(simulated_noise[:, skip:])
            filt_signal = Filter(simulated_signal[:, skip:])
            filter_start[itau] = skip
            # Filter the signal and noise separately.
            noise = filt_noise.all(mf_templ)[..., tau:]
            signal = filt_signal.all(mf_templ)[..., tau:]
            skip += tau
            filter_skip[itau] = tau
            # Compute the standard deviation of the filtered noise.
            sdev[:, itau] = np.std(noise, axis=-1)
            # Combine the noise and signal with the given SNR.
            sim = signal[:, None, :, :] + self.sigma[None, :, None, None] * noise[:, None, :, :]
            assert sim.shape == (4, nsnr, nevents, event_length - skip)
            # Interpolate the minimum with a parabola, without noise.
            # xp/xm are the neighbors of the minimum, clipped to the array.
            x0 = np.argmin(signal, axis=-1)
            xp = np.minimum(x0 + 1, event_length - skip - 1)
            xm = np.maximum(x0 - 1, 0)
            y0 = signal[indices_clean + (x0,)]
            yp = signal[indices_clean + (xp,)]
            ym = signal[indices_clean + (xm,)]
            num = yp - ym
            denom = yp + ym - 2 * y0
            cleanval[:, itau] = y0 - 1/8 * num ** 2 / denom
            # Interpolate the minimum with a parabola.
            x0 = np.argmin(sim, axis=-1)
            xp = np.minimum(x0 + 1, event_length - skip - 1)
            xm = np.maximum(x0 - 1, 0)
            y0 = sim[indices + (x0,)]
            yp = sim[indices + (xp,)]
            ym = sim[indices + (xm,)]
            num = yp - ym
            denom = yp + ym - 2 * y0
            # TODO handle the case denom == 0
            minima[0, :, itau] = x0 - 1/2 * num / denom + skip
            minval[0, :, itau] = y0 - 1/8 * num ** 2 / denom
            minima[1, :, itau] = x0 + skip
            minval[1, :, itau] = y0
            if upsampling:
                # Get the matched filter template.
                mf_templ = self.mftempl(itau, True)
                # Filter the signal and noise separately.
                noise = filt_noise_up.all(mf_templ)
                signal = filt_signal_up.all(mf_templ)
                # Combine the noise and signal with the given SNR.
                sim = signal[:, None, :, :] + self.sigma[None, :, None, None] * noise[:, None, :, :]
                assert sim.shape == (4, nsnr, nevents, (event_length - upskip) * timebase)
                # Interpolate the minimum with a parabola.
                x0 = np.argmin(sim, axis=-1)
                xp = np.minimum(x0 + 1, (event_length - upskip) * timebase - 1)
                xm = np.maximum(x0 - 1, 0)
                y0 = sim[indices + (x0,)]
                yp = sim[indices + (xp,)]
                ym = sim[indices + (xm,)]
                num = yp - ym
                denom = yp + ym - 2 * y0
                # TODO handle the case denom == 0
                # Convert the upsampled coordinate back to timebase units.
                minima[2, :, itau] = (x0 - 1/2 * num / denom) / timebase + upskip
                minval[2, :, itau] = y0 - 1/8 * num ** 2 / denom
                minima[3, :, itau] = x0 / timebase + upskip
                minval[3, :, itau] = y0
        # # Compute the baseline.
        # idx0 = np.arange(nevents)
        # idx1 = np.array(minima[1], int) - bsoffset - self.tau[:, None, None]
        # baseline = bs_signal[idx0, idx1] + self.sigma[None, :, None] * bs_noise[idx0, idx1]
        # Align approximately the localization.
        # loc = np.asarray(minima, float)
        # loc[[0, 2], :3] -= self.template.maxoffset(timebase, aligned='trigger')
        # loc[:, 1:] -= self.tau[:, None, None]
        # loc[[0, 2], 3] += self.templs['offset'][:, None, None]
        # Write results in the output arrays (moveaxis puts events first).
        output['loctrue'] = signal_loc
        output['loc'] = np.moveaxis(minima[0], -1, 0)
        output['locraw'] = np.moveaxis(minima[1], -1, 0)
        if upsampling:
            output['locup'] = np.moveaxis(minima[2], -1, 0)
            output['locupraw'] = np.moveaxis(minima[3], -1, 0)
        output['value'] = np.moveaxis(-minval[0], -1, 0)
        output['valueclean'] = np.moveaxis(-cleanval, -1, 0)
        output['filtnoisesdev'] = np.moveaxis(sdev, -1, 0)
        output['filter_start'] = filter_start
        output['filter_skip'] = filter_skip
        output_event['signal'] = simulated_signal
        output_event['noise'] = simulated_noise
    def window_center(self, ifilter, isnr, itau='best'):
        """
        Calibrate the temporal localization of a given filter centering the
        median on the true location and return it in a format suitable as input
        to run_window().

        Parameters
        ----------
        ifilter : int array (ncenter,)
            The indices of the filter, see Filter.all.
        isnr : int array (ncenter,)
            The indices of the SNR.
        itau : int array (ncenter,) or str
            The indices of tau. If `best` (default), take the best temporal
            resolution for each filter and SNR.

        Return
        ------
        wcenter : array (1 + ncenter,)
            A structured numpy array with these fields:
            'ifilter', int
            'itau', int
            'isnr', int
            'center', int, (nevents,)
                The calibrated temporal localization for given filter, tau,
                snr, rounded to nearest integer.
            The first entry in the array has the indices fields set to 9999 and
            the 'center' field set to the true signal location.
        """
        locfield = 'loc'
        ifilter = np.asarray(ifilter)
        isnr = np.asarray(isnr)
        loctrue = self.output['loctrue']
        locall = self.output[locfield]
        if isinstance(itau, str) and itau == 'best':
            # For each (filter, snr) pair pick the tau with the smallest
            # temporal localization resolution.
            res = self.templocres(locfield)
            itau2d = np.argmin(res, axis=1)
            itau = itau2d[ifilter, isnr]
        else:
            itau = np.asarray(itau)
        # loc has shape (ncenter, nevents) after the transpose.
        loc = locall[:, ifilter, itau, isnr].T
        # Calibrate by subtracting the median localization error.
        corr = np.median(loc - loctrue, axis=-1)
        center = loc - corr[:, None]
        wcenter = np.empty(1 + len(itau), dtype=[
            ('ifilter', int),
            ('itau', int),
            ('isnr', int),
            ('center', int, (len(loctrue),))
        ])
        # Sentinel first entry: indices set to 9999, center = true location.
        wcenter[0] = (9999, 9999, 9999, np.rint(loctrue))
        wcenter[1:]['ifilter'] = ifilter
        wcenter[1:]['itau'] = itau
        wcenter[1:]['isnr'] = isnr
        wcenter[1:]['center'] = np.rint(center)
        return wcenter
    def run_window(self, wlen, wlmargin, wcenter=None, pbar=None):
        """
        Extract a subset of samples from simulated signals and relocalize the
        signal filtering only in the window with the matched filter.

        This function sets the member `output_window`.

        Parameters
        ----------
        wlen : int array (nwin,)
            Lengths for windows that are extracted around the localized signal.
        wlmargin : int array (nwin,)
            The window starts `wlmargin` samples before the signal template
            start.
        wcenter : array (ncenter,), optional
            If specified, the window position is relative to the specified
            sample for each event instead of the signal template start.
            Use the method window_center() to compute this array.
        pbar : int, optional
            If given, a progress bar is shown that ticks every `pbar` events.
        """
        # TODO For speed I should run a single tau cross correlation filter
        # on each window instead of all taus.
        wlen = np.asarray(wlen)
        wlmargin = np.asarray(wlmargin)
        if wcenter is None:
            # Default: only the sentinel entry centered on the true location
            # (window_center with empty index arrays).
            i = np.empty(0, int)
            wcenter = self.window_center(i, i, i)
        nwin, = wlen.shape
        assert wlmargin.shape == (nwin,)
        nevents = len(self.output)
        ncenter, = wcenter.shape
        output = np.empty(nevents, dtype=[
            ('wstart', int, (ncenter, nwin)),
            ('wloc', float, (ncenter, nwin, len(self.tau), len(self.snr)))
        ])
        # Process events in slices; the per-slice work is done by the private
        # helper _run_window.
        def fun(s):
            self._run_window(output[s], self.output[s], self.output_event[s], wlen, wlmargin, wcenter['center'][:, s])
        run_sliced(fun, len(self.output), pbar)
        self.output_window = output
        self.wlen = wlen
        self.wlmargin = wlmargin
        self.wcenter = wcenter