diff --git a/jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py b/jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py index 546fc6d80..cc4b7fb68 100755 --- a/jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py +++ b/jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py @@ -984,9 +984,10 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun else: index += 1 - min_dark_time = min(dark_obstimes) - max_dark_time = max(dark_obstimes) - mid_dark_time = instrument_properties.mean_time(dark_obstimes) + if len(dark_slope_files) > 0: + min_dark_time = min(dark_obstimes) + max_dark_time = max(dark_obstimes) + mid_dark_time = instrument_properties.mean_time(dark_obstimes) # Check whether there are still enough files left to meet the threshold if illuminated_slope_files is None: diff --git a/jwql/shared_tasks/run_pipeline.py b/jwql/shared_tasks/run_pipeline.py index 0531f0337..991e08d47 100755 --- a/jwql/shared_tasks/run_pipeline.py +++ b/jwql/shared_tasks/run_pipeline.py @@ -10,6 +10,7 @@ import shutil import sys import time +import traceback from jwst import datamodels from jwst.dq_init import DQInitStep @@ -161,7 +162,8 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co with open(status_file, "a+") as status_f: status_f.write("EXCEPTION\n") status_f.write("{}\n".format(e)) - status_f.write("FAILED") + status_f.write("FAILED\n") + status_f.write(traceback.format_exc()) sys.exit(1) with open(status_file, "a+") as status_f: @@ -217,6 +219,7 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T params['refpix'] = dict(odd_even_rows=False) # Default CR rejection threshold is too low + params['jump'] = {} params['jump']['rejection_threshold'] = 15 # Set up to save jump step output @@ -284,7 +287,8 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T with open(status_file, "a+") as status_f: status_f.write("EXCEPTION\n") status_f.write("{}\n".format(e)) - status_f.write("FAILED") + status_f.write("FAILED\n") + status_f.write(traceback.format_exc()) sys.exit(1) with open(status_file, "a+") as status_f: @@ -344,7 +348,7 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T outputs = args.outputs step_args = args.step_args - status_file = os.path.join(working_path, short_name+"_status.txt") + status_file = os.path.join(working_path, short_name + "_status.txt") with open(status_file, 'w') as out_file: out_file.write("Starting Process\n") out_file.write("\tpipeline is {} ({})\n".format(pipe_type, type(pipe_type))) diff --git a/jwql/shared_tasks/shared_tasks.py b/jwql/shared_tasks/shared_tasks.py index be232935d..f076e2035 100644 --- a/jwql/shared_tasks/shared_tasks.py +++ b/jwql/shared_tasks/shared_tasks.py @@ -1,4 +1,4 @@ - #! /usr/bin/env python +#! /usr/bin/env python """This module contains code for the celery application, which is used for any demanding work which should be restricted in terms of how many iterations are run simultaneously, or @@ -204,7 +204,7 @@ def log_subprocess_output(pipe): If a subprocess STDOUT has been set to subprocess.PIPE, this function will log each line to the logging output. """ - for line in iter(pipe.readline, b''): # b'\n'-separated lines + for line in iter(pipe.readline, b''): # b'\n'-separated lines logging.info("\t{}".format(line.decode('UTF-8').strip())) @@ -224,6 +224,7 @@ def after_setup_celery_logger(logger, **kwargs): def collect_after_task(**kwargs): gc.collect() + def convert_step_args_to_string(args_dict): """Convert the nested dictionary containing pipeline step parameter keyword/value pairs to a string so that it can be passed via command line @@ -239,17 +240,17 @@ def convert_step_args_to_string(args_dict): args_str : str String representation of ``args_dict`` """ - args_str="'{" + args_str = "'{" for i, step in enumerate(args_dict): args_str += f'"{step}":' args_str += '{' for j, (param, val) in enumerate(args_dict[step].items()): args_str += f'"{param}":"{val}"' - if j < len(args_dict[step])-1: + if j < len(args_dict[step]) - 1: args_str += ', ' args_str += "}" - if i < len(args_dict)-1: + if i < len(args_dict) - 1: args_str += ',' args_str += "}'" return args_str @@ -331,7 +332,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument, current_dir = os.path.dirname(__file__) cmd_name = os.path.join(current_dir, "run_pipeline.py") outputs = ",".join(ext_or_exts) - result_file = os.path.join(cal_dir, short_name+"_status.txt") + result_file = os.path.join(cal_dir, short_name + "_status.txt") if "all" in ext_or_exts: logging.info("All outputs requested") if instrument.lower() != 'miri': @@ -399,7 +400,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument, set_permissions(os.path.join(output_dir, file)) logging.info("Removing local files.") - files_to_remove = glob(os.path.join(cal_dir, short_name+"*")) + files_to_remove = glob(os.path.join(cal_dir, short_name + "*")) for file_name in files_to_remove: logging.info("\tRemoving {}".format(file_name)) os.remove(file_name) @@ -408,7 +409,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument, @celery_app.task(name='jwql.shared_tasks.shared_tasks.calwebb_detector1_save_jump') -def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save_fitopt=True): +def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save_fitopt=True, step_args={}): """Call ``calwebb_detector1`` on the provided file, running all steps up to the ``ramp_fit`` step, and save the result. Optionally run the ``ramp_fit`` step and save the resulting slope file as well. @@ -430,6 +431,13 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save If ``True``, the file of optional outputs from the ramp fitting step of the pipeline is saved. + step_args : dict + A dictionary containing custom arguments to supply to individual pipeline steps. + When a step is run, the dictionary will be checked for a key matching the step + name (as defined in jwql.utils.utils.get_pipeline_steps() for the provided + instrument). The value matching the step key should, itself, be a dictionary that + can be spliced in to step.call() via dereferencing (**dict) + Returns ------- jump_output : str @@ -465,11 +473,11 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save output_dir = os.path.join(config["transfer_dir"], "outgoing") cmd_name = os.path.join(os.path.dirname(__file__), "run_pipeline.py") - result_file = os.path.join(cal_dir, short_name+"_status.txt") + result_file = os.path.join(cal_dir, short_name + "_status.txt") cores = 'all' status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument, input_file, - short_name, result_file, cores) + short_name, result_file, cores, step_args) if status[-1].strip() == "SUCCEEDED": logging.info("Subprocess reports successful finish.") @@ -484,7 +492,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save if core_fail: cores = "half" status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument, - input_file, short_name, result_file, cores) + input_file, short_name, result_file, cores, step_args) if status[-1].strip() == "SUCCEEDED": logging.info("Subprocess reports successful finish.") managed = True @@ -498,7 +506,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save if core_fail: cores = "none" status = run_subprocess(cmd_name, "jump", "all", cal_dir, instrument, - input_file, short_name, result_file, cores) + input_file, short_name, result_file, cores, step_args) if status[-1].strip() == "SUCCEEDED": logging.info("Subprocess reports successful finish.") managed = True @@ -524,7 +532,7 @@ def calwebb_detector1_save_jump(input_file_name, instrument, ramp_fit=True, save files["fitopt_output"] = os.path.join(output_dir, file) logging.info("Removing local files.") - files_to_remove = glob(os.path.join(cal_dir, short_name+"*")) + files_to_remove = glob(os.path.join(cal_dir, short_name + "*")) for file_name in files_to_remove: logging.info("\tRemoving {}".format(file_name)) os.remove(file_name)