Skip to content

Commit

Permalink
Allow option for workflow to end at inspiral jobs (gwastro#4612)
Browse files Browse the repository at this point in the history
* Updates to stopping after data inspiral jobs and statmap

* Updates to stopping after data inspiral jobs and statmap

* Changed typing of stop-after

* Allow pycbc_make_offline_search_workflow to stop after inspiral jobs.

* Fixed comment
  • Loading branch information
kkacanja authored and bhooshan-gadre committed Mar 4, 2024
1 parent 482e793 commit 0206ba4
Showing 1 changed file with 95 additions and 69 deletions.
164 changes: 95 additions & 69 deletions bin/workflows/pycbc_make_offline_search_workflow
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,92 @@ def ifo_combos(ifos):
for ifocomb in combinations:
yield ifocomb

def finalize(container, workflow, finalize_workflow):
# Create the final log file
log_file_html = wf.File(workflow.ifos, 'WORKFLOW-LOG', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])

gen_file_html = wf.File(workflow.ifos, 'WORKFLOW-GEN', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])

# Create a page to contain a dashboard link
dashboard_file = wf.File(workflow.ifos, 'DASHBOARD', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])
dashboard_str = """<center><p style="font-size:20px"><b><a href="PEGASUS_DASHBOARD_URL" target="_blank">Pegasus Dashboard Page</a></b></p></center>"""
kwds = {'title': "Pegasus Dashboard",
'caption': "Link to Pegasus Dashboard",
'cmd': "PYCBC_SUBMIT_DAX_ARGV", }
save_fig_with_metadata(dashboard_str, dashboard_file.storage_path, **kwds)

# Create pages for the submission script to write data
wf.makedir(rdir['workflow/dax'])
wf.makedir(rdir['workflow/input_map'])
wf.makedir(rdir['workflow/output_map'])
wf.makedir(rdir['workflow/planning'])

wf.make_results_web_page(finalize_workflow, os.path.join(os.getcwd(),
rdir.base))

container += workflow
container += finalize_workflow

container.add_subworkflow_dependancy(workflow, finalize_workflow)

container.save()

open_box_result_path = rdir['open_box_result']
if os.path.exists(open_box_result_path):
os.chmod(open_box_result_path, 0o0700)
else:
pass

logging.info("Written dax.")

# Close the log and flush to the html file
logging.shutdown()
with open(wf_log_file.storage_path, "r") as logfile:
logdata = logfile.read()
log_str = """
<p>Workflow generation script created workflow in output directory: %s</p>
<p>Workflow name is: %s</p>
<p>Workflow generation script run on host: %s</p>
<pre>%s</pre>
""" % (os.getcwd(), args.workflow_name, socket.gethostname(), logdata)
kwds = {'title': 'Workflow Generation Log',
'caption': "Log of the workflow script %s" % sys.argv[0],
'cmd': ' '.join(sys.argv), }
save_fig_with_metadata(log_str, log_file_html.storage_path, **kwds)

# Add the command line used to a specific file
args_to_output = [sys.argv[0]]
for arg in sys.argv[1:]:
if arg.startswith('--'):
# This is an option, add tab
args_to_output.append(' ' + arg)
else:
# This is a parameter, add two tabs
args_to_output.append(' ' + arg)

gen_str = '<pre>' + ' \\\n'.join(args_to_output) + '</pre>'
kwds = {'title': 'Workflow Generation Command',
'caption': "Command used to generate the workflow.",
'cmd': ' '.join(sys.argv), }
save_fig_with_metadata(gen_str, gen_file_html.storage_path, **kwds)
layout.single_layout(rdir['workflow'], ([dashboard_file, gen_file_html, log_file_html]))
sys.exit(0)

def check_stop(job_name, container, workflow, finalize_workflow):
#This function will finalize the workflow and stop it at the job specified.
#Under [workflow] supply the option stop-after = and the job name you want the workflow to stop at. Current options are [inspiral, hdf_trigger_merge, statmap]
if workflow.cp.has_option('workflow', 'stop-after'):
stop_after = workflow.cp.get('workflow', 'stop-after')
if stop_after == job_name:
logging.info("Search worflow will be stopped after " + str(job_name))
finalize(container, workflow, finalize_workflow)
else:
pass


parser = argparse.ArgumentParser(description=__doc__[1:])
parser.add_argument('--version', action='version', version=__version__)
parser.add_argument('--verbose', action='count',
Expand Down Expand Up @@ -183,10 +269,15 @@ ind_insps = insps = wf.setup_matchedfltr_workflow(workflow, analyzable_segs,
datafind_files, splitbank_files_fd,
output_dir, tags=['full_data'])

#check to see if workflow should stop at inspiral jobs.
check_stop('inspiral', container, workflow, finalize_workflow)

insps = wf.merge_single_detector_hdf_files(workflow, hdfbank,
insps, output_dir,
tags=['full_data'])

check_stop('hdf_trigger_merge', container, workflow, finalize_workflow)

# setup sngl trigger distribution fitting jobs
# 'statfiles' is list of files used in calculating statistic
# 'dqfiles' is the subset of files containing data quality information
Expand Down Expand Up @@ -264,6 +355,9 @@ for ifo in ifo_ids.keys():
workflow, hdfbank, inspcomb, statfiles, final_veto_file,
final_veto_name, output_dir, tags=ctagsngl)

#check to see if workflow should stop at statmap jobs.
check_stop('statmap', container, workflow, finalize_workflow)

ifo_sets = list(ifo_combos(ifo_ids.keys()))
if analyze_singles:
ifo_sets += [(ifo,) for ifo in ifo_ids.keys()]
Expand Down Expand Up @@ -836,72 +930,4 @@ wf.make_versioning_page(
)

############################ Finalization ####################################

# Create the final log file
log_file_html = wf.File(workflow.ifos, 'WORKFLOW-LOG', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])

gen_file_html = wf.File(workflow.ifos, 'WORKFLOW-GEN', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])

# Create a page to contain a dashboard link
dashboard_file = wf.File(workflow.ifos, 'DASHBOARD', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])
dashboard_str = """<center><p style="font-size:20px"><b><a href="PEGASUS_DASHBOARD_URL" target="_blank">Pegasus Dashboard Page</a></b></p></center>"""
kwds = { 'title' : "Pegasus Dashboard",
'caption' : "Link to Pegasus Dashboard",
'cmd' : "PYCBC_SUBMIT_DAX_ARGV", }
save_fig_with_metadata(dashboard_str, dashboard_file.storage_path, **kwds)

# Create pages for the submission script to write data
wf.makedir(rdir['workflow/dax'])
wf.makedir(rdir['workflow/input_map'])
wf.makedir(rdir['workflow/output_map'])
wf.makedir(rdir['workflow/planning'])

wf.make_results_web_page(finalize_workflow, os.path.join(os.getcwd(),
rdir.base))

container += workflow
container += finalize_workflow

container.add_subworkflow_dependancy(workflow, finalize_workflow)

container.save()

# Protect the open box results folder
os.chmod(rdir['open_box_result'], 0o0700)

logging.info("Written dax.")

# Close the log and flush to the html file
logging.shutdown()
with open (wf_log_file.storage_path, "r") as logfile:
logdata=logfile.read()
log_str = """
<p>Workflow generation script created workflow in output directory: %s</p>
<p>Workflow name is: %s</p>
<p>Workflow generation script run on host: %s</p>
<pre>%s</pre>
""" % (os.getcwd(), args.workflow_name, socket.gethostname(), logdata)
kwds = { 'title' : 'Workflow Generation Log',
'caption' : "Log of the workflow script %s" % sys.argv[0],
'cmd' :' '.join(sys.argv), }
save_fig_with_metadata(log_str, log_file_html.storage_path, **kwds)

# Add the command line used to a specific file
args_to_output = [sys.argv[0]]
for arg in sys.argv[1:]:
if arg.startswith('--'):
# This is an option, add tab
args_to_output.append(' ' + arg)
else:
# This is a parameter, add two tabs
args_to_output.append(' ' + arg)

gen_str = '<pre>' + ' \\\n'.join(args_to_output) + '</pre>'
kwds = { 'title' : 'Workflow Generation Command',
'caption' : "Command used to generate the workflow.",
'cmd' :' '.join(sys.argv), }
save_fig_with_metadata(gen_str, gen_file_html.storage_path, **kwds)
layout.single_layout(rdir['workflow'], ([dashboard_file, gen_file_html, log_file_html]))
finalize(container, workflow, finalize_workflow)

0 comments on commit 0206ba4

Please sign in to comment.