diff --git a/bin/pycbc_live b/bin/pycbc_live index 95b279cfa5d..763d4ef0f09 100755 --- a/bin/pycbc_live +++ b/bin/pycbc_live @@ -109,6 +109,8 @@ class LiveEventManager(object): # Keep track of which events have been uploaded self.last_few_coincs_uploaded = [] + # Give this a nominal value, it is required but not used + self.fu_cores = None if self.run_snr_optimization: # preestimate the number of CPU cores that we can afford giving # to followup processes without slowing down the main search @@ -284,9 +286,9 @@ class LiveEventManager(object): triggers[f'foreground/pvalue_{ifo}_saturated'] = pv_sat def setup_optimize_snr( - self, results, live_ifos, triggering_ifos, fname, gid + self, results, live_ifos, triggering_ifos, fname ): - """Setup and start the network SNR optimization process for a + """Setup the network SNR optimization process for a candidate event. See arXiv:2008.07494 for details. Parameters @@ -303,8 +305,6 @@ class LiveEventManager(object): File name where the candidate information has been saved. Used to name other files produced as part of setting up the SNR optimization. - gid : str - GraceDB ID of the candidate. """ template_id = results[f'foreground/{live_ifos[0]}/template_id'] p = props(self.bank.table[template_id]) @@ -360,8 +360,6 @@ class LiveEventManager(object): hdfp['ifar'] = results['foreground/ifar'] if 'p_terr' in results: hdfp['p_terr'] = results['p_terr'] - if gid is not None: - hdfp['gid'] = gid for ifo in args.channel_name: hdfp[f'channel_names/{ifo}'] = args.channel_name[ifo] @@ -391,7 +389,9 @@ class LiveEventManager(object): if self.enable_gracedb_upload: cmd += '--enable-gracedb-upload ' - cmd += '--cores {} '.format(self.fu_cores) + if self.fu_cores: + cmd += '--cores {} '.format(self.fu_cores) + if args.processing_scheme: # we will use the cores for multiple workers of the # optimization routine, so we force the processing scheme @@ -404,6 +404,32 @@ class LiveEventManager(object): opt_scheme = args.processing_scheme.split(':')[0] cmd += '--processing-scheme {}:1 '.format(opt_scheme) + # Save the command which would be used: + snroc_fname = os.path.join(out_dir_path, 'snr_optimize_command.txt') + with open(snroc_fname,'w') as snroc_file: + snroc_file.write(cmd) + + return cmd, out_dir_path + + def run_optimize_snr(self, cmd, out_dir_path, attribute_fname, gid): + """ + Run the optimize_snr instance setup up by setup_optimize_snr + + Parameters + ---------- + cmd: str + Command to tell subprocess what to run for the + optimize_snr process + out_dir_path: os.path or str + Place for storing the SNR optimization results and log + attribute_fname: str + The filename of the attributes of the candidate + gid: str + GraceDB ID of the candidate + """ + if gid is not None: + with h5py.File(attribute_fname, 'a') as hdfp: + hdfp['gid'] = gid log_fname = os.path.join(out_dir_path, 'optimize_snr.log') logging.info('running %s with log to %s', cmd, log_fname) @@ -430,7 +456,7 @@ class LiveEventManager(object): gdbargs['service_url'] = self.gracedb_server self.gracedb = GraceDb(**gdbargs) - def upload_in_thread(self, event, fname, comment, results, live_ifos, ifos, + def upload_in_thread(self, event, fname, comment, cmd, out_dir_path, upload_checks, optimize_snr_checks): gid = None if upload_checks: @@ -442,13 +468,14 @@ class LiveEventManager(object): search=self.gracedb_search, labels=self.gracedb_labels ) + if optimize_snr_checks: - self.setup_optimize_snr( - results, - live_ifos, - ifos, - fname, - gid + logging.info('Optimizing SNR for event above threshold ..') + self.run_optimize_snr( + cmd, + out_dir_path, + fname.replace('.xml.gz', '_attributes.hdf'), + gid ) def check_coincs(self, ifos, coinc_results, psds): @@ -520,17 +547,33 @@ class LiveEventManager(object): self.last_few_coincs_uploaded = \ self.last_few_coincs_uploaded[-10:] + # Save the event if not upload_checks: event.save(fname) + + # Save the event info/data and what _would_ be run for SNR optimizer, + # even if not running it - do this before the thread so no + # data buffers move on in a possible interim period + + # Which IFOs were active? + live_ifos = [ifo for ifo in sld if 'snr_series' in sld[ifo]] + + # Tell SNR optimized event about p_terr + if hasattr(event, 'p_terr') and event.p_terr is not None: + coinc_results['p_terr'] = event.p_terr + + # Do the optimizer setup + cmd, out_dir_path = self.setup_optimize_snr( + coinc_results, + live_ifos, + coinc_ifos, + fname, + ) + + # Now start the thread to run the SNR optimizer if upload_checks or optimize_snr_checks: - if optimize_snr_checks: - logging.info('Optimizing SNR for coinc above threshold ..') - # Tell snr optimized event about p_terr - if hasattr(event, 'p_terr') and event.p_terr is not None: - coinc_results['p_terr'] = event.p_terr - live_ifos = [ifo for ifo in sld if 'snr_series' in sld[ifo]] thread_args = ( - event, fname, comment, coinc_results, live_ifos, coinc_ifos, + event, fname, comment, cmd, out_dir_path, upload_checks, optimize_snr_checks ) gdb_upload_thread = threading.Thread(target=self.upload_in_thread, @@ -590,10 +633,12 @@ class LiveEventManager(object): comment = comment.format(ifo, ppdets(followup_ifos)) # Has a coinc event at this time been uploaded recently? - # If so, skip upload + # If so, skip upload - Note that this means that we _always_ + # prefer an uploaded coincident event over a single coinc_tdiffs = abs(event.merger_time - numpy.array(self.last_few_coincs_uploaded)) nearby_coincs = coinc_tdiffs < 0.1 + if any(nearby_coincs): logging.info( "Single-detector candidate at time %.3f was already " @@ -608,17 +653,38 @@ class LiveEventManager(object): self.ifar_upload_threshold < single['foreground/ifar'] and \ not any(nearby_coincs) and \ self.run_snr_optimization + + # Save the event if not upload_checks: event.save(fname) + + # Save the event info/data and what _would_ be run for SNR optimizer, + # even if not running it - do this before the thread so no + # data buffers move on in a possible interim period + + if any(nearby_coincs): + # Don't even save the snr optimization stuff for singles + # where there is already a coinc + continue + + # Which IFOs were active? + live_ifos = [ifo for ifo in sld if 'snr_series' in sld[ifo]] + + # Tell SNR optimized event about p_terr + if hasattr(event, 'p_terr') and event.p_terr is not None: + single['p_terr'] = event.p_terr + + # Do the optimizer setup + cmd, out_dir_path = self.setup_optimize_snr( + single, + live_ifos, + [ifo], + fname, + ) + if upload_checks or optimize_snr_checks: - if optimize_snr_checks: - logging.info('Optimizing SNR for single above threshold ..') - # Tell snr optimized event about p_terr - if hasattr(event, 'p_terr') and event.p_terr is not None: - single['p_terr'] = event.p_terr - live_ifos = [ifo for ifo in sld if 'snr_series' in sld[ifo]] thread_args = ( - event, fname, comment, single, live_ifos, [ifo], + event, fname, comment, cmd, out_dir_path, upload_checks, optimize_snr_checks ) gdb_upload_thread = threading.Thread(target=self.upload_in_thread,