Skip to content

Commit

Permalink
Threading race condition bugfix for optimize_snr in pycbc_live (#4342)
Browse files Browse the repository at this point in the history
* setup snr_opt before threading

* Some minor tweaks, setup optimize_snr even if not performing it

* Dont open file unless using it

* TDC comments

* clarify comments

* bug

---------

Co-authored-by: GarethCabournDavies <gareth.cabourndavies@ligo.org>
  • Loading branch information
xangma and GarethCabournDavies authored Aug 8, 2023
1 parent 32e0b90 commit bfff82b
Showing 1 changed file with 95 additions and 29 deletions.
124 changes: 95 additions & 29 deletions bin/pycbc_live
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class LiveEventManager(object):
# Keep track of which events have been uploaded
self.last_few_coincs_uploaded = []

# Give this a nominal value, it is required but not used
self.fu_cores = None
if self.run_snr_optimization:
# preestimate the number of CPU cores that we can afford giving
# to followup processes without slowing down the main search
Expand Down Expand Up @@ -284,9 +286,9 @@ class LiveEventManager(object):
triggers[f'foreground/pvalue_{ifo}_saturated'] = pv_sat

def setup_optimize_snr(
self, results, live_ifos, triggering_ifos, fname, gid
self, results, live_ifos, triggering_ifos, fname
):
"""Setup and start the network SNR optimization process for a
"""Setup the network SNR optimization process for a
candidate event. See arXiv:2008.07494 for details.
Parameters
Expand All @@ -303,8 +305,6 @@ class LiveEventManager(object):
File name where the candidate information has been saved.
Used to name other files produced as part of setting up the
SNR optimization.
gid : str
GraceDB ID of the candidate.
"""
template_id = results[f'foreground/{live_ifos[0]}/template_id']
p = props(self.bank.table[template_id])
Expand Down Expand Up @@ -360,8 +360,6 @@ class LiveEventManager(object):
hdfp['ifar'] = results['foreground/ifar']
if 'p_terr' in results:
hdfp['p_terr'] = results['p_terr']
if gid is not None:
hdfp['gid'] = gid

for ifo in args.channel_name:
hdfp[f'channel_names/{ifo}'] = args.channel_name[ifo]
Expand Down Expand Up @@ -391,7 +389,9 @@ class LiveEventManager(object):
if self.enable_gracedb_upload:
cmd += '--enable-gracedb-upload '

cmd += '--cores {} '.format(self.fu_cores)
if self.fu_cores:
cmd += '--cores {} '.format(self.fu_cores)

if args.processing_scheme:
# we will use the cores for multiple workers of the
# optimization routine, so we force the processing scheme
Expand All @@ -404,6 +404,32 @@ class LiveEventManager(object):
opt_scheme = args.processing_scheme.split(':')[0]
cmd += '--processing-scheme {}:1 '.format(opt_scheme)

# Save the command which would be used:
snroc_fname = os.path.join(out_dir_path, 'snr_optimize_command.txt')
with open(snroc_fname,'w') as snroc_file:
snroc_file.write(cmd)

return cmd, out_dir_path

def run_optimize_snr(self, cmd, out_dir_path, attribute_fname, gid):
"""
Run the optimize_snr instance setup up by setup_optimize_snr
Parameters
----------
cmd: str
Command to tell subprocess what to run for the
optimize_snr process
out_dir_path: os.path or str
Place for storing the SNR optimization results and log
attribute_fname: str
The filename of the attributes of the candidate
gid: str
GraceDB ID of the candidate
"""
if gid is not None:
with h5py.File(attribute_fname, 'a') as hdfp:
hdfp['gid'] = gid
log_fname = os.path.join(out_dir_path, 'optimize_snr.log')

logging.info('running %s with log to %s', cmd, log_fname)
Expand All @@ -430,7 +456,7 @@ class LiveEventManager(object):
gdbargs['service_url'] = self.gracedb_server
self.gracedb = GraceDb(**gdbargs)

def upload_in_thread(self, event, fname, comment, results, live_ifos, ifos,
def upload_in_thread(self, event, fname, comment, cmd, out_dir_path,
upload_checks, optimize_snr_checks):
gid = None
if upload_checks:
Expand All @@ -442,13 +468,14 @@ class LiveEventManager(object):
search=self.gracedb_search,
labels=self.gracedb_labels
)

if optimize_snr_checks:
self.setup_optimize_snr(
results,
live_ifos,
ifos,
fname,
gid
logging.info('Optimizing SNR for event above threshold ..')
self.run_optimize_snr(
cmd,
out_dir_path,
fname.replace('.xml.gz', '_attributes.hdf'),
gid
)

def check_coincs(self, ifos, coinc_results, psds):
Expand Down Expand Up @@ -520,17 +547,33 @@ class LiveEventManager(object):
self.last_few_coincs_uploaded = \
self.last_few_coincs_uploaded[-10:]

# Save the event
if not upload_checks:
event.save(fname)

# Save the event info/data and what _would_ be run for SNR optimizer,
# even if not running it - do this before the thread so no
# data buffers move on in a possible interim period

# Which IFOs were active?
live_ifos = [ifo for ifo in sld if 'snr_series' in sld[ifo]]

# Tell SNR optimized event about p_terr
if hasattr(event, 'p_terr') and event.p_terr is not None:
coinc_results['p_terr'] = event.p_terr

# Do the optimizer setup
cmd, out_dir_path = self.setup_optimize_snr(
coinc_results,
live_ifos,
coinc_ifos,
fname,
)

# Now start the thread to run the SNR optimizer
if upload_checks or optimize_snr_checks:
if optimize_snr_checks:
logging.info('Optimizing SNR for coinc above threshold ..')
# Tell snr optimized event about p_terr
if hasattr(event, 'p_terr') and event.p_terr is not None:
coinc_results['p_terr'] = event.p_terr
live_ifos = [ifo for ifo in sld if 'snr_series' in sld[ifo]]
thread_args = (
event, fname, comment, coinc_results, live_ifos, coinc_ifos,
event, fname, comment, cmd, out_dir_path,
upload_checks, optimize_snr_checks
)
gdb_upload_thread = threading.Thread(target=self.upload_in_thread,
Expand Down Expand Up @@ -590,10 +633,12 @@ class LiveEventManager(object):
comment = comment.format(ifo, ppdets(followup_ifos))

# Has a coinc event at this time been uploaded recently?
# If so, skip upload
# If so, skip upload - Note that this means that we _always_
# prefer an uploaded coincident event over a single
coinc_tdiffs = abs(event.merger_time -
numpy.array(self.last_few_coincs_uploaded))
nearby_coincs = coinc_tdiffs < 0.1

if any(nearby_coincs):
logging.info(
"Single-detector candidate at time %.3f was already "
Expand All @@ -608,17 +653,38 @@ class LiveEventManager(object):
self.ifar_upload_threshold < single['foreground/ifar'] and \
not any(nearby_coincs) and \
self.run_snr_optimization

# Save the event
if not upload_checks:
event.save(fname)

# Save the event info/data and what _would_ be run for SNR optimizer,
# even if not running it - do this before the thread so no
# data buffers move on in a possible interim period

if any(nearby_coincs):
# Don't even save the snr optimization stuff for singles
# where there is already a coinc
continue

# Which IFOs were active?
live_ifos = [ifo for ifo in sld if 'snr_series' in sld[ifo]]

# Tell SNR optimized event about p_terr
if hasattr(event, 'p_terr') and event.p_terr is not None:
single['p_terr'] = event.p_terr

# Do the optimizer setup
cmd, out_dir_path = self.setup_optimize_snr(
single,
live_ifos,
[ifo],
fname,
)

if upload_checks or optimize_snr_checks:
if optimize_snr_checks:
logging.info('Optimizing SNR for single above threshold ..')
# Tell snr optimized event about p_terr
if hasattr(event, 'p_terr') and event.p_terr is not None:
single['p_terr'] = event.p_terr
live_ifos = [ifo for ifo in sld if 'snr_series' in sld[ifo]]
thread_args = (
event, fname, comment, single, live_ifos, [ifo],
event, fname, comment, cmd, out_dir_path,
upload_checks, optimize_snr_checks
)
gdb_upload_thread = threading.Thread(target=self.upload_in_thread,
Expand Down

0 comments on commit bfff82b

Please sign in to comment.