Various fixes to EventManagers (#4639)
* Rename "snr_<detector>" datasets to just "snr"

* Do not store template duration in trigger files

* Cleanup and better document `write_events()` method

* Fix how gating info is written

* Fix imports

* Detriplicate code

* Codeclimate
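
As an illustration of the dataset rename in the first bullet (file name and values below are hypothetical, not part of this commit), per-detector SNR triggers that previously lived at `<detector>/snr_<abbreviated detector>` are now stored at `<detector>/snr`:

import h5py

# Hypothetical trigger file produced after this commit.
with h5py.File('H1L1V1-INSPIRAL.hdf', 'r') as f:
    h1_snr = f['H1/snr'][:]  # new layout: '<detector>/snr'
    # Before this change, the same data sat at e.g. 'H1/snr_h1' or 'L1/snr_l'.
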
titodalcanton authored Feb 20, 2024
1 parent aa05af6 commit 232a4ce
Showing 2 changed files with 79 additions and 96 deletions.
15 changes: 12 additions & 3 deletions bin/pycbc_multi_inspiral
@@ -352,10 +352,16 @@ with ctx:
}
network_names = sorted(network_out_vals.keys())
event_mgr = EventManagerCoherent(
args, args.instruments, ifo_names,
[ifo_out_types[n] for n in ifo_names], network_names,
args,
args.instruments,
ifo_names,
[ifo_out_types[n] for n in ifo_names],
network_names,
[network_out_types[n] for n in network_names],
segments=segments, time_slides=time_slides)
segments=segments,
time_slides=time_slides,
gating_info={det: strain_dict[det].gating_info for det in strain_dict}
)

# Template bank: filtering and thinning
logging.info("Read in template bank")
@@ -697,6 +703,9 @@ with ctx:
event_mgr.cluster_template_network_events(
'time_index', 'reweighted_snr', cluster_window, slide=slide)
event_mgr.finalize_template_events()

logging.info('Writing output')
event_mgr.write_events(args.output)

logging.info("Finished")
logging.info("Time to complete analysis: %d", int(time.time() - time_init))
160 changes: 67 additions & 93 deletions pycbc/events/eventmgr.py
@@ -24,10 +24,13 @@
"""This modules defines functions for clustering and thresholding timeseries to
produces event triggers
"""
import numpy, copy, os.path
import os.path
import copy
import itertools
import logging
import h5py
import pickle
import numpy
import h5py

from pycbc.types import Array
from pycbc.scheme import schemed
@@ -177,6 +180,23 @@ def cluster_reduce(idx, snr, window_size):
return idx.take(ind), snr.take(ind)


class H5FileSyntSugar(object):
"""Convenience class that adds some syntactic sugar to h5py.File.
"""
def __init__(self, name, prefix=''):
self.f = h5py.File(name, 'w')
self.prefix = prefix

def __setitem__(self, name, data):
self.f.create_dataset(
self.prefix + '/' + name,
data=data,
compression='gzip',
compression_opts=9,
shuffle=True
)
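
For reference (not part of the diff; file name and values are illustrative), a minimal sketch of how the new helper is intended to be used; the prefix can be reassigned between writes, which is exactly what the `write_to_hdf()` methods below do:

import numpy
from pycbc.events.eventmgr import H5FileSyntSugar

f = H5FileSyntSugar('example.hdf', prefix='H1')
f['snr'] = numpy.array([5.5, 7.2])  # creates gzip-compressed dataset 'H1/snr'
f.prefix = 'network'                # switch the group prefix between writes
f['event_id'] = numpy.arange(2)     # creates 'network/event_id'
f.f.close()                         # the underlying h5py.File is exposed as .f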


class EventManager(object):
def __init__(self, opt, column, column_types, **kwds):
self.opt = opt
@@ -407,33 +427,21 @@ def save_performance(self, ncores, nfilters, ntemplates, run_time,
self.write_performance = True

def write_events(self, outname):
""" Write the found events to a sngl inspiral table
"""Write the found events to a file. The only currently supported
format is HDF5, indicated by an .hdf or .h5 extension.
"""
self.make_output_dir(outname)

if '.hdf' in outname:
if outname.endswith(('.hdf', '.h5')):
self.write_to_hdf(outname)
else:
raise ValueError('Cannot write to this format')
return
raise ValueError('Unsupported event output file format')
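
A brief illustration (not part of the diff; output names are made up, and event_mgr stands for any EventManager instance) of the behaviour documented above:

event_mgr.write_events('H1L1-TRIGGERS.hdf')  # written as HDF5
event_mgr.write_events('H1L1-TRIGGERS.h5')   # also written as HDF5
event_mgr.write_events('H1L1-TRIGGERS.xml')  # raises ValueError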

def write_to_hdf(self, outname):
class fw(object):
def __init__(self, name, prefix):
self.f = h5py.File(name, 'w')
self.prefix = prefix

def __setitem__(self, name, data):
col = self.prefix + '/' + name
self.f.create_dataset(col, data=data,
compression='gzip',
compression_opts=9,
shuffle=True)

self.events.sort(order='template_id')
th = numpy.array([p['tmplt'].template_hash for p in
self.template_params])
tid = self.events['template_id']
f = fw(outname, self.opt.channel_name[0:2])
f = H5FileSyntSugar(outname, self.opt.channel_name[0:2])

if len(self.events):
f['snr'] = abs(self.events['snr'])
@@ -471,6 +479,10 @@ def __setitem__(self, name, data):
# Not precessing
f['sigmasq'] = self.events['sigmasq']

# Template durations should ideally be stored in the bank file.
# At present, however, a few plotting/visualization codes
# downstream in the offline search workflow rely on durations being
# stored in the trigger files instead.
template_durations = [p['tmplt'].template_duration for p in
self.template_params]
f['template_duration'] = numpy.array(template_durations,
@@ -595,6 +607,31 @@ def add_template_events_to_ifo(self, ifo, columns, vectors):
self.template_event_dict[ifo] = self.template_events
self.template_events = None

def write_gating_info_to_hdf(self, hf):
"""Write per-detector gating information to an h5py file object.
The information is laid out according to the following groups and
datasets: `/<detector>/gating/{file, auto}/{time, width, pad}` where
"file" and "auto" indicate respectively externally-provided gates and
internally-generated gates (autogating), and "time", "width" and "pad"
indicate the gate center times, total durations and padding durations
in seconds respectively.
"""
if 'gating_info' not in self.global_params:
return
gates = self.global_params['gating_info']
for ifo, gate_type in itertools.product(self.ifos, ['file', 'auto']):
if gate_type not in gates[ifo]:
continue
hf[f'{ifo}/gating/{gate_type}/time'] = numpy.array(
[float(g[0]) for g in gates[ifo][gate_type]]
)
hf[f'{ifo}/gating/{gate_type}/width'] = numpy.array(
[g[1] for g in gates[ifo][gate_type]]
)
hf[f'{ifo}/gating/{gate_type}/pad'] = numpy.array(
[g[2] for g in gates[ifo][gate_type]]
)
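
As an illustration (not part of the diff; file and detector names are hypothetical), the gating datasets written by this method could be read back as follows:

import h5py

with h5py.File('H1L1-TRIGGERS.hdf', 'r') as hf:
    if 'H1/gating/auto' in hf:                 # datasets exist only for gate types present
        time = hf['H1/gating/auto/time'][:]    # gate center times (GPS seconds)
        width = hf['H1/gating/auto/width'][:]  # total gate durations (seconds)
        pad = hf['H1/gating/auto/pad'][:]      # padding durations (seconds)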


class EventManagerCoherent(EventManagerMultiDetBase):
def __init__(self, opt, ifos, column, column_types, network_column,
@@ -656,7 +693,6 @@ def add_template_network_events(self, columns, vectors):
""" Add a vector indexed """
# initialize with zeros - since vectors can be None, look for the
# longest one that isn't
new_events = None
new_events = numpy.zeros(
max([len(v) for v in vectors if v is not None]),
dtype=self.network_event_dtype
@@ -681,19 +717,11 @@ def add_template_events_to_network(self, columns, vectors):
self.template_events = None

def write_to_hdf(self, outname):
class fw(object):
def __init__(self, name):
self.f = h5py.File(name, 'w')

def __setitem__(self, name, data):
col = self.prefix + '/' + name
self.f.create_dataset(
col, data=data, compression='gzip', compression_opts=9,
shuffle=True)
self.events.sort(order='template_id')
th = numpy.array(
[p['tmplt'].template_hash for p in self.template_params])
f = fw(outname)
f = H5FileSyntSugar(outname)
self.write_gating_info_to_hdf(f)
# Output network stuff
f.prefix = 'network'
network_events = numpy.array(
@@ -721,8 +749,7 @@ def __setitem__(self, name, data):
ifo_events = numpy.array([e for e in self.events
if e['ifo'] == self.ifo_dict[ifo]], dtype=self.event_dtype)
if len(ifo_events):
ifo_str = ifo.lower()[0] if ifo != 'H1' else ifo.lower()
f['snr_%s' % ifo_str] = abs(ifo_events['snr'])
f['snr'] = abs(ifo_events['snr'])
f['event_id'] = ifo_events['event_id']
try:
# Precessing
@@ -736,8 +763,8 @@ def __setitem__(self, name, data):
f['bank_chisq_dof'] = ifo_events['bank_chisq_dof']
f['cont_chisq'] = ifo_events['cont_chisq']
f['end_time'] = ifo_events['time_index'] / \
float(self.opt.sample_rate[ifo_str]) + \
self.opt.gps_start_time[ifo_str]
float(self.opt.sample_rate[ifo]) + \
self.opt.gps_start_time[ifo]
f['time_index'] = ifo_events['time_index']
f['slide_id'] = ifo_events['slide_id']
try:
@@ -764,11 +791,6 @@ def __setitem__(self, name, data):
dtype=numpy.float32)
f['sigmasq'] = template_sigmasq[tid]

template_durations = [p['tmplt'].template_duration for p in
self.template_params]
f['template_duration'] = numpy.array(template_durations,
dtype=numpy.float32)[tid]

# FIXME: Can we get this value from the autochisq instance?
# cont_dof = self.opt.autochi_number_points
# if self.opt.autochi_onesided is None:
@@ -820,17 +842,6 @@ def __setitem__(self, name, data):
f['search/setup_time_fraction'] = \
numpy.array([float(self.setup_time) / float(self.run_time)])

if 'gating_info' in self.global_params:
gating_info = self.global_params['gating_info']
for gate_type in ['file', 'auto']:
if gate_type in gating_info:
f['gating/' + gate_type + '/time'] = numpy.array(
[float(g[0]) for g in gating_info[gate_type]])
f['gating/' + gate_type + '/width'] = numpy.array(
[g[1] for g in gating_info[gate_type]])
f['gating/' + gate_type + '/pad'] = numpy.array(
[g[2] for g in gating_info[gate_type]])

def finalize_template_events(self):
# Check that none of the template events have the same time index as an
# existing event in events. I.e. don't list the same ifo event multiple
@@ -965,41 +976,20 @@ def finalize_template_events(self, perform_coincidence=True,
self.template_event_dict[ifo] = numpy.array([],
dtype=self.event_dtype)

def write_events(self, outname):
""" Write the found events to a sngl inspiral table
"""
self.make_output_dir(outname)

if '.hdf' in outname:
self.write_to_hdf(outname)
else:
raise ValueError('Cannot write to this format')

def write_to_hdf(self, outname):
class fw(object):
def __init__(self, name):
self.f = h5py.File(name, 'w')

def __setitem__(self, name, data):
col = self.prefix + '/' + name
self.f.create_dataset(col, data=data,
compression='gzip',
compression_opts=9,
shuffle=True)

self.events.sort(order='template_id')
th = numpy.array([p['tmplt'].template_hash for p in
self.template_params])
tid = self.events['template_id']
f = fw(outname)
f = H5FileSyntSugar(outname)
self.write_gating_info_to_hdf(f)
for ifo in self.ifos:
f.prefix = ifo
ifo_events = numpy.array([e for e in self.events if
e['ifo'] == self.ifo_dict[ifo]],
dtype=self.event_dtype)
if len(ifo_events):
ifo_str = ifo.lower()[0] if ifo != 'H1' else ifo.lower()
f['snr_%s' % ifo_str] = abs(ifo_events['snr'])
f['snr'] = abs(ifo_events['snr'])
try:
# Precessing
f['u_vals'] = ifo_events['u_vals']
@@ -1012,8 +1002,8 @@ def __setitem__(self, name, data):
f['bank_chisq_dof'] = ifo_events['bank_chisq_dof']
f['cont_chisq'] = ifo_events['cont_chisq']
f['end_time'] = ifo_events['time_index'] / \
float(self.opt.sample_rate[ifo_str]) + \
self.opt.gps_start_time[ifo_str]
float(self.opt.sample_rate[ifo]) + \
self.opt.gps_start_time[ifo]
try:
# Precessing
template_sigmasq_plus = numpy.array([t['sigmasq_plus'] for
@@ -1034,11 +1024,6 @@ def __setitem__(self, name, data):
dtype=numpy.float32)
f['sigmasq'] = template_sigmasq[tid]

template_durations = [p['tmplt'].template_duration for p in
self.template_params]
f['template_duration'] = \
numpy.array(template_durations, dtype=numpy.float32)[tid]

# FIXME: Can we get this value from the autochisq instance?
cont_dof = self.opt.autochi_number_points
if self.opt.autochi_onesided is None:
@@ -1093,17 +1078,6 @@ def __setitem__(self, name, data):
f['search/setup_time_fraction'] = \
numpy.array([float(self.setup_time) / float(self.run_time)])

if 'gating_info' in self.global_params:
gating_info = self.global_params['gating_info']
for gate_type in ['file', 'auto']:
if gate_type in gating_info:
f['gating/' + gate_type + '/time'] = numpy.array(
[float(g[0]) for g in gating_info[gate_type]])
f['gating/' + gate_type + '/width'] = numpy.array(
[g[1] for g in gating_info[gate_type]])
f['gating/' + gate_type + '/pad'] = numpy.array(
[g[2] for g in gating_info[gate_type]])


__all__ = ['threshold_and_cluster', 'findchirp_cluster_over_window',
'threshold', 'cluster_reduce', 'ThresholdCluster',