Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow option for workflow to end at inspiral jobs #4612

Merged
merged 5 commits into from
Feb 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 95 additions & 69 deletions bin/workflows/pycbc_make_offline_search_workflow
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,92 @@ def ifo_combos(ifos):
for ifocomb in combinations:
yield ifocomb

def finalize(container, workflow, finalize_workflow):
# Create the final log file
log_file_html = wf.File(workflow.ifos, 'WORKFLOW-LOG', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])

gen_file_html = wf.File(workflow.ifos, 'WORKFLOW-GEN', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])

# Create a page to contain a dashboard link
dashboard_file = wf.File(workflow.ifos, 'DASHBOARD', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])
dashboard_str = """<center><p style="font-size:20px"><b><a href="PEGASUS_DASHBOARD_URL" target="_blank">Pegasus Dashboard Page</a></b></p></center>"""
kwds = {'title': "Pegasus Dashboard",
'caption': "Link to Pegasus Dashboard",
'cmd': "PYCBC_SUBMIT_DAX_ARGV", }
save_fig_with_metadata(dashboard_str, dashboard_file.storage_path, **kwds)

# Create pages for the submission script to write data
wf.makedir(rdir['workflow/dax'])
wf.makedir(rdir['workflow/input_map'])
wf.makedir(rdir['workflow/output_map'])
wf.makedir(rdir['workflow/planning'])

wf.make_results_web_page(finalize_workflow, os.path.join(os.getcwd(),
rdir.base))

container += workflow
container += finalize_workflow

container.add_subworkflow_dependancy(workflow, finalize_workflow)

container.save()

open_box_result_path = rdir['open_box_result']
if os.path.exists(open_box_result_path):
os.chmod(open_box_result_path, 0o0700)
else:
pass

logging.info("Written dax.")

# Close the log and flush to the html file
logging.shutdown()
with open(wf_log_file.storage_path, "r") as logfile:
logdata = logfile.read()
log_str = """
<p>Workflow generation script created workflow in output directory: %s</p>
<p>Workflow name is: %s</p>
<p>Workflow generation script run on host: %s</p>
<pre>%s</pre>
""" % (os.getcwd(), args.workflow_name, socket.gethostname(), logdata)
kwds = {'title': 'Workflow Generation Log',
'caption': "Log of the workflow script %s" % sys.argv[0],
'cmd': ' '.join(sys.argv), }
save_fig_with_metadata(log_str, log_file_html.storage_path, **kwds)

# Add the command line used to a specific file
args_to_output = [sys.argv[0]]
for arg in sys.argv[1:]:
if arg.startswith('--'):
# This is an option, add tab
args_to_output.append(' ' + arg)
else:
# This is a parameter, add two tabs
args_to_output.append(' ' + arg)

gen_str = '<pre>' + ' \\\n'.join(args_to_output) + '</pre>'
kwds = {'title': 'Workflow Generation Command',
'caption': "Command used to generate the workflow.",
'cmd': ' '.join(sys.argv), }
save_fig_with_metadata(gen_str, gen_file_html.storage_path, **kwds)
layout.single_layout(rdir['workflow'], ([dashboard_file, gen_file_html, log_file_html]))
sys.exit(0)

def check_stop(job_name, container, workflow, finalize_workflow):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to add a short comment here to explain what the function does.

#This function will finalize the workflow and stop it at the job specified.
#Under [workflow] supply the option stop-after = and the job name you want the workflow to stop at. Current options are [inspiral, hdf_trigger_merge, statmap]
if workflow.cp.has_option('workflow', 'stop-after'):
stop_after = workflow.cp.get('workflow', 'stop-after')
if stop_after == job_name:
logging.info("Search worflow will be stopped after " + str(job_name))
finalize(container, workflow, finalize_workflow)
else:
pass


parser = argparse.ArgumentParser(description=__doc__[1:])
parser.add_argument('--version', action='version', version=__version__)
parser.add_argument('--verbose', action='count',
Expand Down Expand Up @@ -183,10 +269,15 @@ ind_insps = insps = wf.setup_matchedfltr_workflow(workflow, analyzable_segs,
datafind_files, splitbank_files_fd,
output_dir, tags=['full_data'])

#check to see if workflow should stop at inspiral jobs.
check_stop('inspiral', container, workflow, finalize_workflow)

insps = wf.merge_single_detector_hdf_files(workflow, hdfbank,
insps, output_dir,
tags=['full_data'])

check_stop('hdf_trigger_merge', container, workflow, finalize_workflow)

# setup sngl trigger distribution fitting jobs
# 'statfiles' is list of files used in calculating statistic
# 'dqfiles' is the subset of files containing data quality information
Expand Down Expand Up @@ -264,6 +355,9 @@ for ifo in ifo_ids.keys():
workflow, hdfbank, inspcomb, statfiles, final_veto_file,
final_veto_name, output_dir, tags=ctagsngl)

#check to see if workflow should stop at statmap jobs.
check_stop('statmap', container, workflow, finalize_workflow)

ifo_sets = list(ifo_combos(ifo_ids.keys()))
if analyze_singles:
ifo_sets += [(ifo,) for ifo in ifo_ids.keys()]
Expand Down Expand Up @@ -836,72 +930,4 @@ wf.make_versioning_page(
)

############################ Finalization ####################################

# Create the final log file
log_file_html = wf.File(workflow.ifos, 'WORKFLOW-LOG', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])

gen_file_html = wf.File(workflow.ifos, 'WORKFLOW-GEN', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])

# Create a page to contain a dashboard link
dashboard_file = wf.File(workflow.ifos, 'DASHBOARD', workflow.analysis_time,
extension='.html', directory=rdir['workflow'])
dashboard_str = """<center><p style="font-size:20px"><b><a href="PEGASUS_DASHBOARD_URL" target="_blank">Pegasus Dashboard Page</a></b></p></center>"""
kwds = { 'title' : "Pegasus Dashboard",
'caption' : "Link to Pegasus Dashboard",
'cmd' : "PYCBC_SUBMIT_DAX_ARGV", }
save_fig_with_metadata(dashboard_str, dashboard_file.storage_path, **kwds)

# Create pages for the submission script to write data
wf.makedir(rdir['workflow/dax'])
wf.makedir(rdir['workflow/input_map'])
wf.makedir(rdir['workflow/output_map'])
wf.makedir(rdir['workflow/planning'])

wf.make_results_web_page(finalize_workflow, os.path.join(os.getcwd(),
rdir.base))

container += workflow
container += finalize_workflow

container.add_subworkflow_dependancy(workflow, finalize_workflow)

container.save()

# Protect the open box results folder
os.chmod(rdir['open_box_result'], 0o0700)

logging.info("Written dax.")

# Close the log and flush to the html file
logging.shutdown()
with open (wf_log_file.storage_path, "r") as logfile:
logdata=logfile.read()
log_str = """
<p>Workflow generation script created workflow in output directory: %s</p>
<p>Workflow name is: %s</p>
<p>Workflow generation script run on host: %s</p>
<pre>%s</pre>
""" % (os.getcwd(), args.workflow_name, socket.gethostname(), logdata)
kwds = { 'title' : 'Workflow Generation Log',
'caption' : "Log of the workflow script %s" % sys.argv[0],
'cmd' :' '.join(sys.argv), }
save_fig_with_metadata(log_str, log_file_html.storage_path, **kwds)

# Add the command line used to a specific file
args_to_output = [sys.argv[0]]
for arg in sys.argv[1:]:
if arg.startswith('--'):
# This is an option, add tab
args_to_output.append(' ' + arg)
else:
# This is a parameter, add two tabs
args_to_output.append(' ' + arg)

gen_str = '<pre>' + ' \\\n'.join(args_to_output) + '</pre>'
kwds = { 'title' : 'Workflow Generation Command',
'caption' : "Command used to generate the workflow.",
'cmd' :' '.join(sys.argv), }
save_fig_with_metadata(gen_str, gen_file_html.storage_path, **kwds)
layout.single_layout(rdir['workflow'], ([dashboard_file, gen_file_html, log_file_html]))
finalize(container, workflow, finalize_workflow)
Loading