Skip to content

Commit

Permalink
Merge pull request #1358 from bhilbert4/badpix-monitor-step-args
Browse files Browse the repository at this point in the history
Add step_args to calwebb_detector1_save_jump
  • Loading branch information
mfixstsci authored Oct 19, 2023
2 parents b09eff7 + 481aa25 commit 848a0aa
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 19 deletions.
7 changes: 4 additions & 3 deletions jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,9 +984,10 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
else:
index += 1

min_dark_time = min(dark_obstimes)
max_dark_time = max(dark_obstimes)
mid_dark_time = instrument_properties.mean_time(dark_obstimes)
if len(dark_slope_files) > 0:
min_dark_time = min(dark_obstimes)
max_dark_time = max(dark_obstimes)
mid_dark_time = instrument_properties.mean_time(dark_obstimes)

# Check whether there are still enough files left to meet the threshold
if illuminated_slope_files is None:
Expand Down
10 changes: 7 additions & 3 deletions jwql/shared_tasks/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import shutil
import sys
import time
import traceback

from jwst import datamodels
from jwst.dq_init import DQInitStep
Expand Down Expand Up @@ -161,7 +162,8 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co
with open(status_file, "a+") as status_f:
status_f.write("EXCEPTION\n")
status_f.write("{}\n".format(e))
status_f.write("FAILED")
status_f.write("FAILED\n")
status_f.write(traceback.format_exc())
sys.exit(1)

with open(status_file, "a+") as status_f:
Expand Down Expand Up @@ -217,6 +219,7 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
params['refpix'] = dict(odd_even_rows=False)

# Default CR rejection threshold is too low
params['jump'] = {}
params['jump']['rejection_threshold'] = 15

# Set up to save jump step output
Expand Down Expand Up @@ -284,7 +287,8 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
with open(status_file, "a+") as status_f:
status_f.write("EXCEPTION\n")
status_f.write("{}\n".format(e))
status_f.write("FAILED")
status_f.write("FAILED\n")
status_f.write(traceback.format_exc())
sys.exit(1)

with open(status_file, "a+") as status_f:
Expand Down Expand Up @@ -344,7 +348,7 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
outputs = args.outputs
step_args = args.step_args

status_file = os.path.join(working_path, short_name+"_status.txt")
status_file = os.path.join(working_path, short_name + "_status.txt")
with open(status_file, 'w') as out_file:
out_file.write("Starting Process\n")
out_file.write("\tpipeline is {} ({})\n".format(pipe_type, type(pipe_type)))
Expand Down
34 changes: 21 additions & 13 deletions jwql/shared_tasks/shared_tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#! /usr/bin/env python
#! /usr/bin/env python

"""This module contains code for the celery application, which is used for any demanding
work which should be restricted in terms of how many iterations are run simultaneously, or
Expand Down Expand Up @@ -204,7 +204,7 @@ def log_subprocess_output(pipe):
If a subprocess STDOUT has been set to subprocess.PIPE, this function will log each
line to the logging output.
"""
for line in iter(pipe.readline, b''): # b'\n'-separated lines
for line in iter(pipe.readline, b''): # b'\n'-separated lines
logging.info("\t{}".format(line.decode('UTF-8').strip()))


Expand All @@ -224,6 +224,7 @@ def after_setup_celery_logger(logger, **kwargs):
def collect_after_task(**kwargs):
gc.collect()


def convert_step_args_to_string(args_dict):
"""Convert the nested dictionary containing pipeline step parameter keyword/value pairs
to a string so that it can be passed via command line
Expand All @@ -239,17 +240,17 @@ def convert_step_args_to_string(args_dict):
args_str : str
String representation of ``args_dict``
"""
args_str="'{"
args_str = "'{"

for i, step in enumerate(args_dict):
args_str += f'"{step}":'
args_str += '{'
for j, (param, val) in enumerate(args_dict[step].items()):
args_str += f'"{param}":"{val}"'
if j < len(args_dict[step])-1:
if j < len(args_dict[step]) - 1:
args_str += ', '
args_str += "}"
if i < len(args_dict)-1:
if i < len(args_dict) - 1:
args_str += ','
args_str += "}'"
return args_str
Expand Down Expand Up @@ -331,7 +332,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,
current_dir = os.path.dirname(__file__)
cmd_name = os.path.join(current_dir, "run_pipeline.py")
outputs = ",".join(ext_or_exts)
result_file = os.path.join(cal_dir, short_name+"_status.txt")
result_file = os.path.join(cal_dir, short_name + "_status.txt")
if "all" in ext_or_exts:
logging.info("All outputs requested")
if instrument.lower() != 'miri':
Expand Down Expand Up @@ -399,7 +400,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,
set_permissions(os.path.join(output_dir, file))

logging.info("Removing local files.")
files_to_remove = glob(os.path.join(cal_dir, short_name+"*"))
files_to_remove = glob(os.path.join(cal_dir, short_name + "*"))
for file_name in files_to_remove:
logging.info("\tRemoving {}".format(file_name))
os.remove(file_name)
Expand All @@ -408,7 +409,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,


@celery_app.task(name='jwql.shared_tasks.shared_tasks.calwebb_detector1_save_jump')
def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save_fitopt=True):
def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save_fitopt=True, step_args={}):
"""Call ``calwebb_detector1`` on the provided file, running all
steps up to the ``ramp_fit`` step, and save the result. Optionally
run the ``ramp_fit`` step and save the resulting slope file as well.
Expand All @@ -430,6 +431,13 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
If ``True``, the file of optional outputs from the ramp fitting
step of the pipeline is saved.
step_args : dict
A dictionary containing custom arguments to supply to individual pipeline steps.
When a step is run, the dictionary will be checked for a key matching the step
name (as defined in jwql.utils.utils.get_pipeline_steps() for the provided
instrument). The value matching the step key should, itself, be a dictionary that
can be spliced in to step.call() via dereferencing (**dict)
Returns
-------
jump_output : str
Expand Down Expand Up @@ -465,11 +473,11 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
output_dir = os.path.join(config["transfer_dir"], "outgoing")

cmd_name = os.path.join(os.path.dirname(__file__), "run_pipeline.py")
result_file = os.path.join(cal_dir, short_name+"_status.txt")
result_file = os.path.join(cal_dir, short_name + "_status.txt")

cores = 'all'
status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument, input_file,
short_name, result_file, cores)
short_name, result_file, cores, step_args)

if status[-1].strip() == "SUCCEEDED":
logging.info("Subprocess reports successful finish.")
Expand All @@ -484,7 +492,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
if core_fail:
cores = "half"
status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument,
input_file, short_name, result_file, cores)
input_file, short_name, result_file, cores, step_args)
if status[-1].strip() == "SUCCEEDED":
logging.info("Subprocess reports successful finish.")
managed = True
Expand All @@ -498,7 +506,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
if core_fail:
cores = "none"
status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument,
input_file, short_name, result_file, cores)
input_file, short_name, result_file, cores, step_args)
if status[-1].strip() == "SUCCEEDED":
logging.info("Subprocess reports successful finish.")
managed = True
Expand All @@ -524,7 +532,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save
files["fitopt_output"] = os.path.join(output_dir, file)

logging.info("Removing local files.")
files_to_remove = glob(os.path.join(cal_dir, short_name+"*"))
files_to_remove = glob(os.path.join(cal_dir, short_name + "*"))
for file_name in files_to_remove:
logging.info("\tRemoving {}".format(file_name))
os.remove(file_name)
Expand Down

0 comments on commit 848a0aa

Please sign in to comment.