Add mechanism for re-loading the statistic files in live (#4816)
* Add mechanism for re-loading the statistic files

* CC fixes

* CC 2

* Fix for/else statement - wrongly implemented
GarethCabournDavies authored Jul 23, 2024
1 parent f756e18 commit 3d9f7e2
Showing 5 changed files with 443 additions and 26 deletions.
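In outline, the new mechanism is the same in both the coincident and single-detector estimators: a daemon thread periodically takes a lock and asks the statistic object to re-read its input files, and every consumer of the statistic takes the same lock around each use. A minimal, self-contained sketch of the pattern (illustrative only, not the actual PyCBC classes):

import threading
import time
from datetime import datetime as dt

class RefreshableStat:
    """Toy stand-in for the live statistic wrappers changed below."""

    def __init__(self, refresh_rate=600.0):
        self.refresh_rate = refresh_rate
        self.last_refresh = dt.now()
        self.lock = threading.Lock()

    def start_refresh_thread(self):
        # Daemon thread: it dies with the main process, so it can
        # never block shutdown of the live analysis
        threading.Thread(target=self._refresh_loop, daemon=True).start()

    def _refresh_loop(self):
        while True:
            elapsed = (dt.now() - self.last_refresh).total_seconds()
            if elapsed > self.refresh_rate:
                self.last_refresh = dt.now()
                with self.lock:
                    self.check_update_files()
            time.sleep(1)

    def check_update_files(self):
        print("re-reading statistic input files")

    def rank(self, triggers):
        # Consumers hold the same lock, so files are never swapped
        # out in the middle of ranking a chunk of triggers
        with self.lock:
            return len(triggers)  # placeholder for the real ranking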
11 changes: 11 additions & 0 deletions bin/pycbc_live
@@ -1048,6 +1048,8 @@ parser.add_argument('--psd-variation', action='store_true',
"values for each single detector triggers found by "
"the search. Required when using a single detector "
"ranking statistic that includes psd variation.")
parser.add_argument("--statistic-refresh-rate", type=float,
help="How often to refresh the statistic object")

scheme.insert_processing_option_group(parser)
LiveSingle.insert_args(parser)
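For example (hypothetical value), passing --statistic-refresh-rate 600 would make each estimator look for updated statistic files roughly every ten minutes; leaving the option unset keeps the previous behaviour of never reloading them.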
@@ -1166,6 +1168,8 @@ with ctx:
if analyze_singles and evnt.rank == 0:
sngl_estimator = {ifo: LiveSingle.from_cli(args, ifo)
for ifo in evnt.trigg_ifos}
for estim in sngl_estimator.values():
estim.start_refresh_thread()

# Create double coincident background estimator
# for every pair of triggering interferometers
@@ -1186,6 +1190,10 @@
setproctitle('PyCBC Live {} bg estimator'.format(
ppdets(c.ifos, '-')))

def estimator_refresh_threads(_):
c = estimators[my_coinc_id]
c.start_refresh_thread()

def get_coinc(results):
c = estimators[my_coinc_id]
r = c.add_singles(results)
@@ -1200,6 +1208,7 @@

coinc_pool = BroadcastPool(len(estimators))
coinc_pool.allmap(set_coinc_id, range(len(estimators)))
coinc_pool.broadcast(estimator_refresh_threads, None)

logging.info('Starting')

@@ -1428,3 +1437,5 @@ if evnt.rank == 1:

if args.enable_profiling is not None and evnt.rank == args.enable_profiling:
pr.dump_stats(f'profiling_rank_{evnt.rank:03d}')

logging.info("Exiting as the end time has been reached")
82 changes: 71 additions & 11 deletions pycbc/events/coinc.py
@@ -25,9 +25,18 @@
coincident triggers.
"""

import numpy, logging, pycbc.pnutils, copy
import numpy
import logging
import copy
from datetime import datetime as dt
import time as timemod
import threading

import pycbc.pnutils
from pycbc.detector import Detector, ppdets
from pycbc import conversions as conv

from . import stat as pycbcstat
from .eventmgr_cython import coincbuffer_expireelements
from .eventmgr_cython import coincbuffer_numgreater
from .eventmgr_cython import timecoincidence_constructidxs
@@ -829,6 +838,7 @@ def __init__(self, num_templates, analysis_block, background_statistic,
ifar_limit=100,
timeslide_interval=.035,
coinc_window_pad=.002,
statistic_refresh_rate=None,
return_background=False,
**kwargs):
"""
@@ -856,25 +866,31 @@ def __init__(self, num_templates, analysis_block, background_statistic,
coinc_window_pad: float
Amount of time allowed to form a coincidence in addition to the
time of flight in seconds.
statistic_refresh_rate: float
How regularly to run the update_files method on the statistic
class (in seconds); the default is to never do this
return_background: boolean
If true, background triggers will also be included in the file
output.
kwargs: dict
Additional options for the statistic to use. See stat.py
for more details on statistic options.
"""
from . import stat
self.num_templates = num_templates
self.analysis_block = analysis_block

stat_class = stat.get_statistic(background_statistic)
stat_class = pycbcstat.get_statistic(background_statistic)
self.stat_calculator = stat_class(
sngl_ranking,
stat_files,
ifos=ifos,
**kwargs
)

self.time_stat_refreshed = dt.now()
self.stat_calculator_lock = threading.Lock()
self.statistic_refresh_rate = statistic_refresh_rate

self.timeslide_interval = timeslide_interval
self.return_background = return_background
self.coinc_window_pad = coinc_window_pad
@@ -955,7 +971,6 @@ def pick_best_coinc(cls, coinc_results):

@classmethod
def from_cli(cls, args, num_templates, analysis_chunk, ifos):
from . import stat

# Allow None inputs
stat_files = args.statistic_files or []
@@ -964,7 +979,7 @@ def from_cli(cls, args, num_templates, analysis_chunk, ifos):
# flatten the list of lists of filenames to a single list (may be empty)
stat_files = sum(stat_files, [])

kwargs = stat.parse_statistic_keywords_opt(stat_keywords)
kwargs = pycbcstat.parse_statistic_keywords_opt(stat_keywords)

return cls(num_templates, analysis_chunk,
args.ranking_statistic,
@@ -975,13 +990,13 @@ def from_cli(cls, args, num_templates, analysis_chunk, ifos):
timeslide_interval=args.timeslide_interval,
ifos=ifos,
coinc_window_pad=args.coinc_window_pad,
statistic_refresh_rate=args.statistic_refresh_rate,
**kwargs)

@staticmethod
def insert_args(parser):
from . import stat

stat.insert_statistic_option_group(parser)
pycbcstat.insert_statistic_option_group(parser)

group = parser.add_argument_group('Coincident Background Estimation')
group.add_argument('--store-background', action='store_true',
@@ -1374,18 +1389,63 @@ def add_singles(self, results):
valid_ifos = [k for k in results.keys() if results[k] and k in self.ifos]
if len(valid_ifos) == 0: return {}

# Add single triggers to the internal buffer
self._add_singles_to_buffer(results, ifos=valid_ifos)
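# Hold the statistic lock while processing this chunk: the refresh
# thread may swap in updated files only between chunks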
with self.stat_calculator_lock:
# Add single triggers to the internal buffer
self._add_singles_to_buffer(results, ifos=valid_ifos)

# Calculate zerolag and background coincidences
_, coinc_results = self._find_coincs(results, valid_ifos=valid_ifos)
# Calculate zerolag and background coincidences
_, coinc_results = self._find_coincs(results, valid_ifos=valid_ifos)

# record if a coinc is possible in this chunk
if len(valid_ifos) == 2:
coinc_results['coinc_possible'] = True

return coinc_results

def start_refresh_thread(self):
    """
    Start a thread managing whether the stat_calculator will be updated
    """
    if self.statistic_refresh_rate is None:
        # No refresh rate was set, so do not start the thread
        return
    thread = threading.Thread(
        target=self.refresh_statistic,
        daemon=True
    )
logger.info(
"Starting %s statistic refresh thread",
''.join(self.ifos),
)
thread.start()

def refresh_statistic(self):
"""
Function to refresh the stat_calculator at regular intervals
"""
while True:
# How long since the statistic was last updated?
since_stat_refresh = \
(dt.now() - self.time_stat_refreshed).total_seconds()
if since_stat_refresh > self.statistic_refresh_rate:
self.time_stat_refreshed = dt.now()
logger.info(
"Checking %s statistic for updated files",
''.join(self.ifos),
)
with self.stat_calculator_lock:
self.stat_calculator.check_update_files()
# Sleep one second for safety
timemod.sleep(1)
# Now account for the time it took to check / update the statistic
since_stat_refresh = \
(dt.now() - self.time_stat_refreshed).total_seconds()
logger.debug(
"%s statistic: Waiting %.3fs for next refresh",
''.join(self.ifos),
self.statistic_refresh_rate - since_stat_refresh,
)
timemod.sleep(
self.statistic_refresh_rate - since_stat_refresh + 1
)
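As a worked example of the wait computation above (illustrative numbers): with statistic_refresh_rate = 600 s and a check that finishes 2.5 s after the refresh timestamp was reset, the thread sleeps 600 - 2.5 + 1 = 598.5 s before looking again; the extra second matches the one-second safety sleep.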


__all__ = [
"background_bin_from_string",
103 changes: 98 additions & 5 deletions pycbc/events/single.py
@@ -2,12 +2,15 @@
"""
import logging
import copy
import h5py
import threading
from datetime import datetime as dt
import time
import numpy as np

from pycbc.events import trigger_fits as fits, stat
from pycbc.types import MultiDetOptionAction
from pycbc import conversions as conv
from pycbc.io.hdf import HFile
from pycbc import bin_utils

logger = logging.getLogger('pycbc.events.single')
@@ -25,13 +28,58 @@ def __init__(self, ifo,
statistic=None,
sngl_ranking=None,
stat_files=None,
statistic_refresh_rate=None,
**kwargs):
"""
Parameters
----------
ifo: str
Name of the ifo that is being analyzed
newsnr_threshold: float
Minimum value for the reweighted SNR of the event under
consideration. The reweighted SNR used is defined by sngl_ranking
reduced_chisq_threshold: float
Maximum value for the reduced chisquared of the event under
consideration
duration_threshold: float
Minimum value for the duration of the template which found the
event under consideration
fit_file: str or path
(optional) the file containing information about the
single-detector event significance distribution fits
sngl_ifar_est_dist: str
Which trigger distribution to use when calculating IFAR of
single-detector events
fixed_ifar: float
(optional) give a fixed IFAR value to any event which passes the
threshold criteria
statistic: str
The name of the statistic to rank events.
sngl_ranking: str
The single detector ranking to use with the background statistic
stat_files: list of strs
List of filenames that contain information used to construct
various coincident statistics.
maximum_ifar: float
The largest inverse false alarm rate in years that we would like to
calculate.
statistic_refresh_rate: float
How regularly to run the update_files method on the statistic
class (in seconds); the default is to never do this
kwargs: dict
Additional options for the statistic to use. See stat.py
for more details on statistic options.
"""
self.ifo = ifo
self.fit_file = fit_file
self.sngl_ifar_est_dist = sngl_ifar_est_dist
self.fixed_ifar = fixed_ifar
self.maximum_ifar = maximum_ifar

self.time_stat_refreshed = dt.now()
self.stat_calculator_lock = threading.Lock()
self.statistic_refresh_rate = statistic_refresh_rate

stat_class = stat.get_statistic(statistic)
self.stat_calculator = stat_class(
sngl_ranking,
@@ -188,6 +236,7 @@ def from_cli(cls, args, ifo):
statistic=args.ranking_statistic,
sngl_ranking=args.sngl_ranking,
stat_files=stat_files,
statistic_refresh_rate=args.statistic_refresh_rate,
**kwargs
)

@@ -227,7 +276,9 @@ def check(self, trigs, data_reader):
trigsc['chisq_dof'] = (cut_trigs['chisq_dof'] + 2) / 2

# Calculate the ranking reweighted SNR for cutting
single_rank = self.stat_calculator.get_sngl_ranking(trigsc)
with self.stat_calculator_lock:
single_rank = self.stat_calculator.get_sngl_ranking(trigsc)

sngl_idx = single_rank > self.thresholds['ranking']
if not np.any(sngl_idx):
return None
@@ -236,8 +287,9 @@
for k in trigs}

# Calculate the ranking statistic
sngl_stat = self.stat_calculator.single(cutall_trigs)
rank = self.stat_calculator.rank_stat_single((self.ifo, sngl_stat))
with self.stat_calculator_lock:
sngl_stat = self.stat_calculator.single(cutall_trigs)
rank = self.stat_calculator.rank_stat_single((self.ifo, sngl_stat))

# 'cluster' by taking the maximal statistic value over the trigger set
i = rank.argmax()
@@ -265,7 +317,7 @@ def calculate_ifar(self, sngl_ranking, duration):
return self.fixed_ifar[self.ifo]

try:
with h5py.File(self.fit_file, 'r') as fit_file:
with HFile(self.fit_file, 'r') as fit_file:
bin_edges = fit_file['bins_edges'][:]
live_time = fit_file[self.ifo].attrs['live_time']
thresh = fit_file.attrs['fit_threshold']
@@ -303,3 +355,44 @@ def calculate_ifar(self, sngl_ranking, duration):
rate_louder *= len(rates)

return min(conv.sec_to_year(1. / rate_louder), self.maximum_ifar)
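For scale, a short worked example of the final conversion (numbers are illustrative, and the seconds-per-year constant is an assumption about the convention used by conv.sec_to_year):

# If triggers louder than the candidate occur at 1e-7 Hz, the
# inverse false alarm rate is 1e7 s, i.e. roughly 0.32 yr, which
# is then capped at maximum_ifar
SECONDS_PER_YEAR = 365.25 * 86400  # assumed Julian-year convention
rate_louder = 1e-7
print((1.0 / rate_louder) / SECONDS_PER_YEAR)  # ~0.317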

def start_refresh_thread(self):
    """
    Start a thread managing whether the stat_calculator will be updated
    """
    if self.statistic_refresh_rate is None:
        # No refresh rate was set, so do not start the thread
        return
    thread = threading.Thread(
        target=self.refresh_statistic,
        daemon=True
    )
logger.info("Starting %s statistic refresh thread", self.ifo)
thread.start()

def refresh_statistic(self):
"""
Function to refresh the stat_calculator at regular intervals
"""
while True:
# How long since the statistic was last updated?
since_stat_refresh = \
(dt.now() - self.time_stat_refreshed).total_seconds()
if since_stat_refresh > self.statistic_refresh_rate:
self.time_stat_refreshed = dt.now()
logger.info(
"Checking %s statistic for updated files",
self.ifo,
)
with self.stat_calculator_lock:
self.stat_calculator.check_update_files()
# Sleep one second for safety
time.sleep(1)
# Now account for the time it took to check / update the statistic
since_stat_refresh = \
(dt.now() - self.time_stat_refreshed).total_seconds()
logger.debug(
"%s statistic: Waiting %.3fs for next refresh",
self.ifo,
self.statistic_refresh_rate - since_stat_refresh
)
time.sleep(
self.statistic_refresh_rate - since_stat_refresh
)
(diffs for the remaining two changed files did not load)
