Skip to content

Commit

Permalink
Idq live (#4850)
Browse files Browse the repository at this point in the history
* Have pycbc live mark flagged triggers instead of removing

* Make stat usable in low-latency

* Add command line argument for whether to use idq for reweighting

* Add logging for iDQ flagged triggers

* Fix bug when using ifo with no dq

* Improve logic for getting ifo frm trigs

* Update for compatibility with Gareth's stat reloading code

* Modify how trig ifo is gotten and add debug statements

* Use logging not print for debugging

* logger not logging

* Fix where tnum is set

* Get rid of excess logging

* Address Gareth's comments

* Codeclimate

* Apply suggestions from code review

Co-authored-by: Gareth S Cabourn Davies <gareth.cabourndavies@ligo.org>

---------

Co-authored-by: Gareth S Cabourn Davies <gareth.cabourndavies@ligo.org>
  • Loading branch information
maxtrevor and GarethCabournDavies authored Aug 22, 2024
1 parent d9f728d commit cf6447e
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 56 deletions.
36 changes: 28 additions & 8 deletions bin/pycbc_live
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,8 @@ parser.add_argument('--idq-state-channel', action=MultiDetMultiColonOptionAction
parser.add_argument('--idq-threshold', type=float,
help='Threshold used to veto triggers at times of '
'low iDQ False Alarm Probability')
parser.add_argument('--idq-reweighting', action='store_true',default=False,
help='Reweight triggers based on iDQ False Alarm Probability')
parser.add_argument('--data-quality-channel',
action=MultiDetMultiColonOptionAction,
help="Channel containing data quality information. Used "
Expand Down Expand Up @@ -1311,17 +1313,35 @@ with ctx:
if len(results[ifo][key]):
results[ifo][key] = results[ifo][key][idx]
if data_reader[ifo].idq is not None:
logging.info("Checking %s's iDQ information", ifo)
logging.info("Reading %s's iDQ information", ifo)
start = data_reader[ifo].start_time
times = results[ifo]['end_time']
idx = data_reader[ifo].idq.indices_of_flag(
flag_active = data_reader[ifo].idq.flag_at_times(
start, valid_pad, times,
padding=data_reader[ifo].dq_padding)
logging.info('Keeping %d/%d %s triggers after iDQ',
len(idx), len(times), ifo)
for key in results[ifo]:
if len(results[ifo][key]):
results[ifo][key] = results[ifo][key][idx]
padding=data_reader[ifo].dq_padding
)

if args.idq_reweighting:
logging.info(
'iDQ flagged %d/%d %s triggers',
numpy.sum(flag_active),
len(times),
ifo
)
results[ifo]['dq_state'] = flag_active.astype(int)
else:
# use idq as a veto
keep = numpy.logical_not(flag_active)
logging.info(
'Keeping %d/%d %s triggers after iDQ',
numpy.sum(keep),
len(times),
ifo
)
for key in results[ifo]:
if len(results[ifo][key]):
results[ifo][key] = \
results[ifo][key][keep]

# Calculate and add the psd variation for the results
if args.psd_variation:
Expand Down
112 changes: 75 additions & 37 deletions pycbc/events/stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -1670,6 +1670,12 @@ def single(self, trigs):
numpy.ndarray
The array of single detector values
"""
try:
# exists if accessed via coinc_findtrigs
self.curr_tnum = trigs.template_num
except AttributeError:
# exists for SingleDetTriggers & pycbc_live get_coinc
self.curr_tnum = trigs['template_id']

# single-ifo stat = log of noise rate
sngl_stat = self.lognoiserate(trigs)
Expand All @@ -1681,12 +1687,6 @@ def single(self, trigs):
singles['end_time'] = trigs['end_time'][:]
singles['sigmasq'] = trigs['sigmasq'][:]
singles['snr'] = trigs['snr'][:]
try:
# exists if accessed via coinc_findtrigs
self.curr_tnum = trigs.template_num
except AttributeError:
# exists for SingleDetTriggers & pycbc_live get_coinc
self.curr_tnum = trigs['template_id']

# Store benchmark log volume as single-ifo information since the coinc
# method does not have access to template id
Expand Down Expand Up @@ -2271,14 +2271,46 @@ def __init__(self, sngl_ranking, files=None, ifos=None,
ifos=ifos, **kwargs)
self.dq_rates_by_state = {}
self.dq_bin_by_tid = {}
self.dq_state_segments = {}
self.dq_state_segments = None
self.low_latency = False
self.single_dtype.append(('dq_state', int))

for ifo in self.ifos:
key = f'{ifo}-dq_stat_info'
if key in self.files.keys():
self.dq_rates_by_state[ifo] = self.assign_dq_rates(key)
self.dq_bin_by_tid[ifo] = self.assign_template_bins(key)
self.dq_state_segments[ifo] = self.setup_segments(key)
self.check_low_latency(key)
if not self.low_latency:
if self.dq_state_segments is None:
self.dq_state_segments = {}
self.dq_state_segments[ifo] = self.setup_segments(key)

def check_low_latency(self, key):
"""
Check if the statistic file indicates low latency mode.
Parameters
----------
key: str
Statistic file key string.
Returns
-------
None
"""
ifo = key.split('-')[0]
with h5py.File(self.files[key], 'r') as dq_file:
ifo_grp = dq_file[ifo]
if 'dq_segments' not in ifo_grp.keys():
# if segs are not in file, we must be in LL
if self.dq_state_segments is not None:
raise ValueError(
'Either all dq stat files must have segments or none'
)
self.low_latency = True
elif self.low_latency:
raise ValueError(
'Either all dq stat files must have segments or none'
)

def assign_template_bins(self, key):
"""
Expand Down Expand Up @@ -2337,9 +2369,7 @@ def assign_dq_rates(self, key):

def setup_segments(self, key):
"""
Check if segments definitions are in stat file
If they are, we are running offline and need to store them
If they aren't, we are running online
Store segments from stat file
"""
ifo = key.split('-')[0]
with h5py.File(self.files[key], 'r') as dq_file:
Expand Down Expand Up @@ -2368,35 +2398,32 @@ def update_file(self, key):
return True
# We also need to check if the DQ files have updated
if key.endswith('dq_stat_info'):
ifo = key.split('-')[0]
logger.info(
"Updating %s statistic %s file",
''.join(self.ifos),
ifo,
key
)
self.assign_dq_rates(key)
self.assign_template_bins(key)
self.setup_segments(key)
self.dq_rates_by_state[ifo] = self.assign_dq_rates(key)
self.dq_bin_by_tid[ifo] = self.assign_template_bins(key)
return True
return False

def find_dq_noise_rate(self, trigs, dq_state):
def find_dq_noise_rate(self, trigs):
"""Get dq values for a specific ifo and dq states"""

try:
tnum = trigs.template_num
except AttributeError:
tnum = trigs['template_id']

try:
ifo = trigs.ifo
except AttributeError:
ifo = trigs['ifo']
assert len(numpy.unique(ifo)) == 1
# Should be exactly one ifo provided
ifo = ifo[0]
ifo = trigs.get('ifo', None)
if ifo is None:
ifo = self.ifos[0]
assert ifo in self.ifos

dq_val = numpy.zeros(len(dq_state))
dq_state = trigs['dq_state']
dq_val = numpy.ones(len(dq_state))

tnum = self.curr_tnum
if ifo in self.dq_rates_by_state:
for (i, st) in enumerate(dq_state):
if isinstance(tnum, numpy.ndarray):
Expand Down Expand Up @@ -2437,24 +2464,35 @@ def lognoiserate(self, trigs):
Array of log noise rate density for each input trigger.
"""

# make sure every trig has a dq state
try:
ifo = trigs.ifo
except AttributeError:
ifo = trigs['ifo']
assert len(numpy.unique(ifo)) == 1
# Should be exactly one ifo provided
ifo = ifo[0]

dq_state = self.find_dq_state_by_time(ifo, trigs['end_time'][:])
dq_rate = self.find_dq_noise_rate(trigs, dq_state)
dq_rate = self.find_dq_noise_rate(trigs)
dq_rate = numpy.maximum(dq_rate, 1)

logr_n = ExpFitFgBgNormStatistic.lognoiserate(
self, trigs)
logr_n += numpy.log(dq_rate)
return logr_n

def single(self, trigs):
# make sure every trig has a dq state
try:
ifo = trigs.ifo
except AttributeError:
ifo = trigs.get('ifo', None)
if ifo is None:
ifo = self.ifos[0]
assert ifo in self.ifos

singles = ExpFitFgBgNormStatistic.single(self, trigs)

if self.low_latency:
# trigs should already have a dq state assigned
singles['dq_state'] = trigs['dq_state'][:]
else:
singles['dq_state'] = self.find_dq_state_by_time(
ifo, trigs['end_time'][:]
)
return singles


class DQExpFitFgBgKDEStatistic(DQExpFitFgBgNormStatistic):
"""
Expand Down
35 changes: 24 additions & 11 deletions pycbc/frame/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,41 +896,54 @@ def __init__(self, frame_src,
force_update_cache=force_update_cache,
increment_update_cache=increment_update_cache)

def indices_of_flag(self, start_time, duration, times, padding=0):
""" Return the indices of the times lying in the flagged region
def flag_at_times(self, start_time, duration, times, padding=0):
""" Check whether the idq flag was on at given times
Parameters
----------
start_time: int
Beginning time to request for
duration: int
Number of seconds to check.
times: array of floats
Times to check for an active flag
padding: float
Number of seconds to add around flag inactive times to be considered
inactive as well.
Amount of time in seconds to flag around samples
below the iDQ FAP threshold
Returns
-------
indices: numpy.ndarray
Array of indices marking the location of triggers within valid
time.
flag_state: numpy.ndarray
Boolean array of whether flag was on at given times
"""
from pycbc.events.veto import indices_outside_times
from pycbc.events.veto import indices_within_times

# convert start and end times to buffer indices
sr = self.idq.raw_buffer.sample_rate
s = int((start_time - self.idq.raw_buffer.start_time - padding) * sr) - 1
e = s + int((duration + padding) * sr) + 1

# find samples when iDQ FAP is below threshold and state is valid
idq_fap = self.idq.raw_buffer[s:e]
stamps = idq_fap.sample_times.numpy()
low_fap = idq_fap.numpy() <= self.threshold
idq_valid = self.idq_state.raw_buffer[s:e]
idq_valid = idq_valid.numpy().astype(bool)
valid_low_fap = numpy.logical_and(idq_valid, low_fap)

# find times corresponding to the valid low FAP samples
glitch_idx = numpy.flatnonzero(valid_low_fap)
stamps = idq_fap.sample_times.numpy()
glitch_times = stamps[glitch_idx]

# construct start and end times of flag segments
starts = glitch_times - padding
ends = starts + 1.0 / sr + padding * 2.0
idx = indices_outside_times(times, starts, ends)
return idx

# check if times were flagged
idx = indices_within_times(times, starts, ends)
flagged_bool = numpy.zeros(len(times), dtype=bool)
flagged_bool[idx] = True
return flagged_bool

def advance(self, blocksize):
""" Add blocksize seconds more to the buffer, push blocksize seconds
Expand Down

0 comments on commit cf6447e

Please sign in to comment.