Skip to content

Commit

Permalink
Added option to cut workflow to inspiral jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
kkacanja committed Jan 31, 2024
1 parent 6915369 commit b18a2c3
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 16 deletions.
83 changes: 75 additions & 8 deletions bin/workflows/pycbc_make_offline_search_workflow
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ wf.add_workflow_settings_cli(parser)
args = parser.parse_args()

# By default, we do logging.info, each --verbose adds a level of verbosity
logging_level = args.verbose + 1 if args.verbose else 1
logging_level = args.verbose + 1 if args.verbose else logging.INFO
pycbc.init_logging(logging_level)

container = wf.Workflow(args, args.workflow_name)
Expand Down Expand Up @@ -178,6 +178,7 @@ bank_plot = wf.make_template_plot(workflow, hdfbank,
######################## Setup the FULL DATA run ##############################
output_dir = "full_data"


# setup the matchedfilter jobs
ind_insps = insps = wf.setup_matchedfltr_workflow(workflow, analyzable_segs,
datafind_files, splitbank_files_fd,
Expand All @@ -187,8 +188,76 @@ insps = wf.merge_single_detector_hdf_files(workflow, hdfbank,
insps, output_dir,
tags=['full_data'])

# Check the [only-do] section: if inspiral = true, run the workflow only up to
# the merged single-detector inspiral HDF files and then finalize early.
if 'only-do' in workflow.cp.sections():
only_do_inspiral = workflow.cp.get('only-do', 'inspiral', fallback='false').lower() == 'true'
else:
only_do_inspiral = False

if only_do_inspiral:
# Finalization code
log_file_html = wf.File(workflow.ifos, 'WORKFLOW-LOG', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])

gen_file_html = wf.File(workflow.ifos, 'WORKFLOW-GEN', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])

dashboard_file = wf.File(workflow.ifos, 'DASHBOARD', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])
dashboard_str = """<center><p style="font-size:20px"><b><a href="PEGASUS_DASHBOARD_URL" target="_blank">Pegasus Dashboard Page</a></b></p></center>"""
kwds = {'title': "Pegasus Dashboard",
'caption': "Link to Pegasus Dashboard",
'cmd': "PYCBC_SUBMIT_DAX_ARGV", }
save_fig_with_metadata(dashboard_str, dashboard_file.storage_path, **kwds)

# Create pages for the submission script to write data
wf.makedir(rdir['workflow/dax'])
wf.makedir(rdir['workflow/input_map'])
wf.makedir(rdir['workflow/output_map'])
wf.makedir(rdir['workflow/planning'])

container += workflow
container += finalize_workflow

container.add_subworkflow_dependancy(workflow, finalize_workflow)
container.save()

logging.info("Written dax.")

logging.shutdown()
with open(wf_log_file.storage_path, "r") as logfile:
logdata = logfile.read()
log_str = """
<p>Workflow generation script created workflow in output directory: %s</p>
<p>Workflow name is: %s</p>
<p>Workflow generation script run on host: %s</p>
<pre>%s</pre>
""" % (os.getcwd(), args.workflow_name, socket.gethostname(), logdata)
kwds = {'title': 'Workflow Generation Log',
'caption': "Log of the workflow script %s" % sys.argv[0],
'cmd': ' '.join(sys.argv), }
save_fig_with_metadata(log_str, log_file_html.storage_path, **kwds)

args_to_output = [sys.argv[0]]
for arg in sys.argv[1:]:
if arg.startswith('--'):
# This is an option, add tab
args_to_output.append(' ' + arg)
else:
# This is a parameter, add two tabs
args_to_output.append(' ' + arg)

gen_str = '<pre>' + ' \\\n'.join(args_to_output) + '</pre>'
kwds = {'title': 'Workflow Generation Command',
'caption': "Command used to generate the workflow.",
'cmd': ' '.join(sys.argv), }
save_fig_with_metadata(gen_str, gen_file_html.storage_path, **kwds)
layout.single_layout(rdir['workflow'], ([dashboard_file, gen_file_html, log_file_html]))
sys.exit(0)


# setup sngl trigger distribution fitting jobs
# 'statfiles' is list of files used in calculating statistic
# 'statfiles' is list of files used in calculating coinc statistic
# 'dqfiles' is the subset of files containing data quality information
statfiles = []

Expand Down Expand Up @@ -454,8 +523,7 @@ for insp_file in full_insps:
wf.setup_single_det_minifollowups\
(workflow, insp_file, hdfbank, insp_files_seg_file,
data_analysed_name, trig_generated_name, 'daxes', currdir,
statfiles=wf.FileList(statfiles),
fg_file=censored_veto, fg_name='closed_box',
veto_file=censored_veto, veto_segment_name='closed_box',
tags=insp_file.tags + [subsec])

##################### COINC FULL_DATA plots ###################################
Expand Down Expand Up @@ -572,14 +640,13 @@ splitbank_files_inj = wf.setup_splittable_workflow(workflow, [hdfbank],
# setup the injection files
inj_files_base, inj_tags = wf.setup_injection_workflow(workflow,
output_dir="inj_files")

inj_files = []
for inj_file, tag in zip(inj_files_base, inj_tags):
inj_files.append(wf.inj_to_hdf(workflow, inj_file, 'inj_files', [tag]))
# Load the injection files produced by the injection workflow
inj_files = inj_files_base

inj_coincs = wf.FileList()

found_inj_dict ={}

insps_dict = {}

files_for_combined_injfind = []
Expand Down
29 changes: 21 additions & 8 deletions pycbc/workflow/injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,11 @@ def cut_distant_injections(workflow, inj_file, out_dir, tags=None):
return node.output_files[0]

def inj_to_hdf(workflow, inj_file, out_dir, tags=None):
""" Convert injection file to hdf format.
If the file is already PyCBC HDF format, this will just make a copy.
"""
Convert injection file to hdf format.
    If the file is already in PyCBC HDF format,
    supply requires-format-conversion = FALSE in the [workflow-injections]
    section of the configuration file to skip the conversion.
"""
if tags is None:
tags = []
Expand Down Expand Up @@ -233,7 +235,9 @@ def setup_injection_workflow(workflow, output_dir=None,
else:
workflow.add_node(node)
inj_file = node.output_files[0]
inj_files.append(inj_file)
injfile_hdf = inj_to_hdf(workflow, inj_file, output_dir, curr_tags)
inj_files.append(injfile_hdf)

elif injection_method == "PREGENERATED":
file_attrs = {
'ifos': ['HL'],
Expand All @@ -245,13 +249,22 @@ def setup_injection_workflow(workflow, output_dir=None,
"injections-pregenerated-file",
curr_tags
)
curr_file = resolve_url_to_file(injection_path, attrs=file_attrs)
inj_files.append(curr_file)
requires_conversion = workflow.cp.get_opt_tags(
"workflow-injections",
"requires-format-conversion",
curr_tags
)
file = resolve_url_to_file(injection_path, attrs=file_attrs)
if requires_conversion == "TRUE":
logging.info(' Injection file conversion initiated')
injfile_hdf = inj_to_hdf(workflow, file, output_dir, curr_tags)
else:
inj_files.append(file)

else:
err = "Injection method must be one of IN_WORKFLOW, "
err = "Injection method must be one of IN_WORKFLOW."
err += "AT_RUNTIME or PREGENERATED. Got %s." % (injection_method)
raise ValueError(err)

inj_tags.append(inj_tag)

logging.info("Leaving injection module.")
Expand Down

0 comments on commit b18a2c3

Please sign in to comment.