diff --git a/bin/workflows/pycbc_make_offline_search_workflow b/bin/workflows/pycbc_make_offline_search_workflow index ac96e5a9815..25fc1e627ba 100755 --- a/bin/workflows/pycbc_make_offline_search_workflow +++ b/bin/workflows/pycbc_make_offline_search_workflow @@ -57,6 +57,92 @@ def ifo_combos(ifos): for ifocomb in combinations: yield ifocomb +def finalize(container, workflow, finalize_workflow): + # Create the final log file + log_file_html = wf.File(workflow.ifos, 'WORKFLOW-LOG', workflow.analysis_time, + extension='.html', directory=rdir['workflow']) + + gen_file_html = wf.File(workflow.ifos, 'WORKFLOW-GEN', workflow.analysis_time, + extension='.html', directory=rdir['workflow']) + + # Create a page to contain a dashboard link + dashboard_file = wf.File(workflow.ifos, 'DASHBOARD', workflow.analysis_time, + extension='.html', directory=rdir['workflow']) + dashboard_str = """

Pegasus Dashboard Page

""" + kwds = {'title': "Pegasus Dashboard", + 'caption': "Link to Pegasus Dashboard", + 'cmd': "PYCBC_SUBMIT_DAX_ARGV", } + save_fig_with_metadata(dashboard_str, dashboard_file.storage_path, **kwds) + + # Create pages for the submission script to write data + wf.makedir(rdir['workflow/dax']) + wf.makedir(rdir['workflow/input_map']) + wf.makedir(rdir['workflow/output_map']) + wf.makedir(rdir['workflow/planning']) + + wf.make_results_web_page(finalize_workflow, os.path.join(os.getcwd(), + rdir.base)) + + container += workflow + container += finalize_workflow + + container.add_subworkflow_dependancy(workflow, finalize_workflow) + + container.save() + + open_box_result_path = rdir['open_box_result'] + if os.path.exists(open_box_result_path): + os.chmod(open_box_result_path, 0o0700) + else: + pass + + logging.info("Written dax.") + + # Close the log and flush to the html file + logging.shutdown() + with open(wf_log_file.storage_path, "r") as logfile: + logdata = logfile.read() + log_str = """ +

Workflow generation script created workflow in output directory: %s

+

Workflow name is: %s

+

Workflow generation script run on host: %s

+
%s
+ """ % (os.getcwd(), args.workflow_name, socket.gethostname(), logdata) + kwds = {'title': 'Workflow Generation Log', + 'caption': "Log of the workflow script %s" % sys.argv[0], + 'cmd': ' '.join(sys.argv), } + save_fig_with_metadata(log_str, log_file_html.storage_path, **kwds) + + # Add the command line used to a specific file + args_to_output = [sys.argv[0]] + for arg in sys.argv[1:]: + if arg.startswith('--'): + # This is an option, add tab + args_to_output.append(' ' + arg) + else: + # This is a parameter, add two tabs + args_to_output.append(' ' + arg) + + gen_str = '
' + ' \\\n'.join(args_to_output) + '
' + kwds = {'title': 'Workflow Generation Command', + 'caption': "Command used to generate the workflow.", + 'cmd': ' '.join(sys.argv), } + save_fig_with_metadata(gen_str, gen_file_html.storage_path, **kwds) + layout.single_layout(rdir['workflow'], ([dashboard_file, gen_file_html, log_file_html])) + sys.exit(0) + +def check_stop(job_name, container, workflow, finalize_workflow): + #This function will finalize the workflow and stop it at the job specified. + #Under [workflow] supply the option stop-after = and the job name you want the workflow to stop at. Current options are [inspiral, hdf_trigger_merge, statmap] + if workflow.cp.has_option('workflow', 'stop-after'): + stop_after = workflow.cp.get('workflow', 'stop-after') + if stop_after == job_name: + logging.info("Search workflow will be stopped after " + str(job_name)) + finalize(container, workflow, finalize_workflow) + else: + pass + + parser = argparse.ArgumentParser(description=__doc__[1:]) parser.add_argument('--version', action='version', version=__version__) parser.add_argument('--verbose', action='count', @@ -183,10 +269,15 @@ ind_insps = insps = wf.setup_matchedfltr_workflow(workflow, analyzable_segs, datafind_files, splitbank_files_fd, output_dir, tags=['full_data']) +#check to see if workflow should stop at inspiral jobs. +check_stop('inspiral', container, workflow, finalize_workflow) + insps = wf.merge_single_detector_hdf_files(workflow, hdfbank, insps, output_dir, tags=['full_data']) + +check_stop('hdf_trigger_merge', container, workflow, finalize_workflow) + # setup sngl trigger distribution fitting jobs # 'statfiles' is list of files used in calculating statistic # 'dqfiles' is the subset of files containing data quality information @@ -264,6 +355,9 @@ for ifo in ifo_ids.keys(): workflow, hdfbank, inspcomb, statfiles, final_veto_file, final_veto_name, output_dir, tags=ctagsngl) +#check to see if workflow should stop at statmap jobs. 
+check_stop('statmap', container, workflow, finalize_workflow) + ifo_sets = list(ifo_combos(ifo_ids.keys())) if analyze_singles: ifo_sets += [(ifo,) for ifo in ifo_ids.keys()] @@ -836,72 +930,4 @@ wf.make_versioning_page( ) ############################ Finalization #################################### - -# Create the final log file -log_file_html = wf.File(workflow.ifos, 'WORKFLOW-LOG', workflow.analysis_time, - extension='.html', directory=rdir['workflow']) - -gen_file_html = wf.File(workflow.ifos, 'WORKFLOW-GEN', workflow.analysis_time, - extension='.html', directory=rdir['workflow']) - -# Create a page to contain a dashboard link -dashboard_file = wf.File(workflow.ifos, 'DASHBOARD', workflow.analysis_time, - extension='.html', directory=rdir['workflow']) -dashboard_str = """

Pegasus Dashboard Page

""" -kwds = { 'title' : "Pegasus Dashboard", - 'caption' : "Link to Pegasus Dashboard", - 'cmd' : "PYCBC_SUBMIT_DAX_ARGV", } -save_fig_with_metadata(dashboard_str, dashboard_file.storage_path, **kwds) - -# Create pages for the submission script to write data -wf.makedir(rdir['workflow/dax']) -wf.makedir(rdir['workflow/input_map']) -wf.makedir(rdir['workflow/output_map']) -wf.makedir(rdir['workflow/planning']) - -wf.make_results_web_page(finalize_workflow, os.path.join(os.getcwd(), - rdir.base)) - -container += workflow -container += finalize_workflow - -container.add_subworkflow_dependancy(workflow, finalize_workflow) - -container.save() - -# Protect the open box results folder -os.chmod(rdir['open_box_result'], 0o0700) - -logging.info("Written dax.") - -# Close the log and flush to the html file -logging.shutdown() -with open (wf_log_file.storage_path, "r") as logfile: - logdata=logfile.read() -log_str = """ -

Workflow generation script created workflow in output directory: %s

-

Workflow name is: %s

-

Workflow generation script run on host: %s

-
%s
-""" % (os.getcwd(), args.workflow_name, socket.gethostname(), logdata) -kwds = { 'title' : 'Workflow Generation Log', - 'caption' : "Log of the workflow script %s" % sys.argv[0], - 'cmd' :' '.join(sys.argv), } -save_fig_with_metadata(log_str, log_file_html.storage_path, **kwds) - -# Add the command line used to a specific file -args_to_output = [sys.argv[0]] -for arg in sys.argv[1:]: - if arg.startswith('--'): - # This is an option, add tab - args_to_output.append(' ' + arg) - else: - # This is a parameter, add two tabs - args_to_output.append(' ' + arg) - -gen_str = '
' + ' \\\n'.join(args_to_output) + '
' -kwds = { 'title' : 'Workflow Generation Command', - 'caption' : "Command used to generate the workflow.", - 'cmd' :' '.join(sys.argv), } -save_fig_with_metadata(gen_str, gen_file_html.storage_path, **kwds) -layout.single_layout(rdir['workflow'], ([dashboard_file, gen_file_html, log_file_html])) +finalize(container, workflow, finalize_workflow)