Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing Issue 1354 (random pipeline failures) #1355

Merged
merged 2 commits into from
Oct 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions jwql/shared_tasks/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,27 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co
status_f.write("\t start_dir is {} ({})\n".format(start_dir, type(start_dir)))
status_f.write("\t uncal_file is {} ({})\n".format(uncal_file, type(uncal_file)))
status_f.write(f"\t outputs is {outputs}\n")
sys.stderr.write("Running run_pipe\n")
sys.stderr.write("\t input_file_basename is {} ({})\n".format(input_file_basename, type(input_file_basename)))
sys.stderr.write("\t start_dir is {} ({})\n".format(start_dir, type(start_dir)))
sys.stderr.write("\t uncal_file is {} ({})\n".format(uncal_file, type(uncal_file)))
sys.stderr.write(f"\t outputs is {outputs}\n")

try:
sys.stderr.write("Copying file {} to working directory.\n".format(input_file))
copy_files([input_file], work_directory)
sys.stderr.write("Setting permissions on {}\n".format(uncal_file))
set_permissions(uncal_file)

steps = get_pipeline_steps(instrument)
sys.stderr.write("Pipeline steps initialized to {}\n".format(steps))

# If the input file is a file other than uncal.fits, then we may only need to run a
# subset of steps. Check the completed steps in the input file. Find the latest step
# that has been completed, and skip that plus all prior steps
if 'uncal' not in input_file:
completed_steps = completed_pipeline_steps(input_file)
sys.stderr.write("Steps {} already completed.\n".format(completed_steps))

# Reverse the boolean value, so that now steps answers the question: "Do we need
# to run this step?""
Expand All @@ -81,18 +90,21 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co

for step in steps:
if not steps[step]:
sys.stderr.write("Setting last_run to {}.\n".format(step))
last_run = deepcopy(step)

for step in steps:
if step == last_run:
break
if step != last_run:
sys.stderr.write("Setting {} to skip while looking for last_run.\n".format(step))
steps[step] = False

# Set any steps the user specifically asks to skip
for step, step_dict in step_args.items():
if 'skip' in step_dict:
if step_dict['skip']:
sys.stderr.write("Setting step {} to skip by user request.\n".format(step))
steps[step] = False

# Run each specified step
Expand Down