Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moving monitor outputs used by the server to central storage #1397

Merged
merged 6 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions jwql/example_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"mast_base_url" : "",
"mast_request_url": "",
"outputs" : "",
"working": "",
"preview_image_filesystem" : "",
"filesystem" : "",
"setup_file" : "",
Expand Down
40 changes: 22 additions & 18 deletions jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,8 @@ def map_uncal_and_rate_file_lists(self, uncal_files, rate_files, rate_files_to_c
the rate file failed)
"""
# Copy files from filesystem
uncal_copied_files, uncal_not_copied = copy_files(uncal_files, self.data_dir)
rate_copied_files, rate_not_copied = copy_files(rate_files_to_copy, self.data_dir)
uncal_copied_files, uncal_not_copied = copy_files(uncal_files, self.working_data_dir)
rate_copied_files, rate_not_copied = copy_files(rate_files_to_copy, self.working_data_dir)

# Set any rate files that failed to copy to None so
# that we can regenerate them
Expand All @@ -677,7 +677,7 @@ def map_uncal_and_rate_file_lists(self, uncal_files, rate_files, rate_files_to_c
del rate_files[bad_index]

logging.info('\tNew {} observations: '.format(obs_type))
logging.info('\tData dir: {}'.format(self.data_dir))
logging.info('\tData dir: {}'.format(self.working_data_dir))
logging.info('\tCopied to data dir: {}'.format(uncal_copied_files))
logging.info('\tNot copied (failed, or missing from filesystem): {}'.format(uncal_not_copied))

Expand Down Expand Up @@ -800,10 +800,10 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
self.get_metadata(uncal_file)
if rate_file == 'None':
short_name = os.path.basename(uncal_file).replace('_uncal.fits', '')
local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file))
local_uncal_file = os.path.join(self.working_data_dir, os.path.basename(uncal_file))
logging.info('Calling pipeline for {}'.format(uncal_file))
logging.info("Copying raw file to {}".format(self.data_dir))
copy_files([uncal_file], self.data_dir)
logging.info("Copying raw file to {}".format(self.working_data_dir))
copy_files([uncal_file], self.working_data_dir)
if hasattr(self, 'nints') and self.nints > 1:
out_exts[short_name] = ['jump', '1_ramp_fit']
needs_calibration = False
Expand All @@ -817,14 +817,14 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
else:
logging.info("\tRate file found for {}".format(uncal_file))
if os.path.isfile(rate_file):
copy_files([rate_file], self.data_dir)
copy_files([rate_file], self.working_data_dir)
else:
logging.warning("\tRate file {} doesn't actually exist".format(rate_file))
short_name = os.path.basename(uncal_file).replace('_uncal.fits', '')
local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file))
local_uncal_file = os.path.join(self.working_data_dir, os.path.basename(uncal_file))
logging.info('Calling pipeline for {}'.format(uncal_file))
logging.info("Copying raw file to {}".format(self.data_dir))
copy_files([uncal_file], self.data_dir)
logging.info("Copying raw file to {}".format(self.working_data_dir))
copy_files([uncal_file], self.working_data_dir)
if hasattr(self, 'nints') and self.nints > 1:
out_exts[short_name] = ['jump', '1_ramp_fit']
needs_calibration = False
Expand All @@ -845,7 +845,7 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
logging.info("Checking files post-calibration")
for uncal_file, rate_file in zip(illuminated_raw_files, illuminated_slope_files):
logging.info("\tChecking files {}, {}".format(os.path.basename(uncal_file), os.path.basename(rate_file)))
local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file))
local_uncal_file = os.path.join(self.working_data_dir, os.path.basename(uncal_file))
if local_uncal_file in outputs:
logging.info("\t\tAdding calibrated file.")
illuminated_slope_files[index] = deepcopy(outputs[local_uncal_file][1])
Expand Down Expand Up @@ -907,10 +907,10 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
logging.info("Checking dark file {} with rate file {}".format(uncal_file, rate_file))
self.get_metadata(uncal_file)
short_name = os.path.basename(uncal_file).replace('_uncal.fits', '')
local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file))
local_uncal_file = os.path.join(self.working_data_dir, os.path.basename(uncal_file))
if not os.path.isfile(local_uncal_file):
logging.info("\tCopying raw file to {}".format(self.data_dir))
copy_files([uncal_file], self.data_dir)
logging.info("\tCopying raw file to {}".format(self.working_data_dir))
copy_files([uncal_file], self.working_data_dir)
if hasattr(self, 'nints') and self.nints > 1:
out_exts[short_name] = ['jump', 'fitopt', '1_ramp_fit']
local_processed_files = [local_uncal_file.replace("uncal", x) for x in out_exts[short_name]]
Expand Down Expand Up @@ -938,7 +938,7 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
logging.info("Checking files post-calibration")
for uncal_file, rate_file in zip(dark_raw_files, dark_slope_files):
logging.info("\tChecking files {}, {}".format(uncal_file, rate_file))
local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file))
local_uncal_file = os.path.join(self.working_data_dir, os.path.basename(uncal_file))
short_name = os.path.basename(uncal_file).replace('_uncal.fits', '')
if local_uncal_file in outputs:
logging.info("\t\tAdding calibrated files")
Expand Down Expand Up @@ -1090,7 +1090,7 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
raise ValueError("Unrecognized type of bad pixel: {}. Cannot update database table.".format(bad_type))

# Remove raw files, rate files, and pipeline products in order to save disk space
files_to_remove = glob(f'{self.data_dir}/*.fits')
files_to_remove = glob(f'{self.working_data_dir}/*.fits')
for filename in files_to_remove:
os.remove(filename)

Expand All @@ -1110,6 +1110,7 @@ def run(self):
logging.info('Begin logging for bad_pixel_monitor')

# Get the working and output directories
self.working_dir = os.path.join(get_config()['working'], 'bad_pixel_monitor')
york-stsci marked this conversation as resolved.
Show resolved Hide resolved
self.output_dir = os.path.join(get_config()['outputs'], 'bad_pixel_monitor')

# Read in config file that defines the thresholds for the number
Expand Down Expand Up @@ -1251,9 +1252,12 @@ def run(self):
dark_uncal_files, dark_rate_files, dark_rate_files_to_copy = None, None, None

# Set up directories for the copied data
ensure_dir_exists(os.path.join(self.working_dir, 'data'))
ensure_dir_exists(os.path.join(self.output_dir, 'data'))
self.data_dir = os.path.join(self.output_dir, 'data/{}_{}'.format(self.instrument.lower(), self.aperture.lower()))
ensure_dir_exists(self.data_dir)
self.working_data_dir = os.path.join(self.working_dir, 'data/{}_{}'.format(self.instrument.lower(), self.aperture.lower()))
self.output_data_dir = os.path.join(self.output_dir, 'data/{}_{}'.format(self.instrument.lower(), self.aperture.lower()))
ensure_dir_exists(self.working_data_dir)
ensure_dir_exists(self.output_data_dir)

# Copy files from filesystem
if run_flats:
Expand Down
15 changes: 10 additions & 5 deletions jwql/instrument_monitors/common_monitors/bias_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def extract_zeroth_group(self, filename):
The full path to the output file.
"""

output_filename = os.path.join(self.data_dir, os.path.basename(filename).replace('.fits', '_0thgroup.fits'))
output_filename = os.path.join(self.working_data_dir, os.path.basename(filename).replace('.fits', '_0thgroup.fits'))

# Write a new fits file containing the primary and science
# headers from the input file, as well as the 0th group
Expand Down Expand Up @@ -279,7 +279,7 @@ def image_to_png(self, image, outname):
The full path to the output png file.
"""

output_filename = os.path.join(self.data_dir, '{}.png'.format(outname))
output_filename = os.path.join(self.output_data_dir, '{}.png'.format(outname))

# Get image scale limits
z = ZScaleInterval()
Expand Down Expand Up @@ -455,6 +455,8 @@ def run(self):
# Get the output and working directories and set up directories to store the data
self.output_dir = os.path.join(get_config()['outputs'], 'bias_monitor')
ensure_dir_exists(os.path.join(self.output_dir, 'data'))
self.working_dir = os.path.join(get_config()['working'], 'bias_monitor')
ensure_dir_exists(os.path.join(self.working_dir, 'data'))

# Use the current time as the end time for MAST query
self.query_end = Time.now().mjd
Expand Down Expand Up @@ -495,14 +497,17 @@ def run(self):
logging.info('\tAperture: {}, new entries: {}'.format(self.aperture, len(new_entries)))

# Set up a directory to store the data for this aperture
self.data_dir = os.path.join(self.output_dir, 'data/{}_{}'.format(self.instrument.lower(), self.aperture.lower()))
self.working_data_dir = os.path.join(self.working_dir, 'data/{}_{}'.format(self.instrument.lower(), self.aperture.lower()))
if len(new_entries) > 0:
ensure_dir_exists(self.data_dir)
ensure_dir_exists(self.working_data_dir)
self.output_data_dir = os.path.join(self.output_dir, 'data/{}_{}'.format(self.instrument.lower(), self.aperture.lower()))
if len(new_entries) > 0:
ensure_dir_exists(self.output_data_dir)

# Get any new files to process
new_files = []
for file_entry in new_entries:
output_filename = os.path.join(self.data_dir, file_entry['filename'])
output_filename = os.path.join(self.working_data_dir, file_entry['filename'])
output_filename = output_filename.replace('_uncal.fits', '_uncal_0thgroup.fits').replace('_dark.fits', '_uncal_0thgroup.fits')

# Don't process files that already exist in the bias stats database
Expand Down
22 changes: 11 additions & 11 deletions jwql/instrument_monitors/common_monitors/dark_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class Dark():
output_dir : str
Path into which outputs will be placed

data_dir : str
working_dir : str
Path into which new dark files will be copied to be worked on

query_start : float
Expand Down Expand Up @@ -515,7 +515,7 @@ def get_baseline_filename(self):
else:
filename = query.all()[0].baseline_file
# Specify the full path
filename = os.path.join(self.output_dir, 'mean_slope_images', filename)
filename = os.path.join(get_config()['outputs'], 'dark_monitor', 'mean_slope_images', filename)
logging.info('Baseline filename: {}'.format(filename))

session.close()
Expand Down Expand Up @@ -684,7 +684,7 @@ def process(self, file_list):

rate_file = filename.replace("dark", "rate")
rate_file_name = os.path.basename(rate_file)
local_rate_file = os.path.join(self.data_dir, rate_file_name)
local_rate_file = os.path.join(self.working_data_dir, rate_file_name)

if os.path.isfile(local_rate_file):
logging.info("\t\tFile {} exists, skipping pipeline".format(local_rate_file))
Expand Down Expand Up @@ -814,7 +814,7 @@ def process(self, file_list):
histogram, bins) = self.stats_by_amp(slope_image, amp_bounds)

# Remove the input files in order to save disk space
files_to_remove = glob(f'{self.data_dir}/*fits')
files_to_remove = glob(f'{self.working_data_dir}/*fits')
for filename in files_to_remove:
os.remove(filename)

Expand Down Expand Up @@ -888,8 +888,8 @@ def run(self):

apertures_to_skip = ['NRCALL_FULL', 'NRCAS_FULL', 'NRCBS_FULL']

# Get the output directory
self.output_dir = os.path.join(get_config()['outputs'], 'dark_monitor')
# Get the working directory
self.working_dir = os.path.join(get_config()['working'], 'dark_monitor')

# Read in config file that defines the thresholds for the number
# of dark files that must be present in order for the monitor to run
Expand Down Expand Up @@ -1004,11 +1004,11 @@ def run(self):

if monitor_run:
# Set up directories for the copied data
ensure_dir_exists(os.path.join(self.output_dir, 'data'))
self.data_dir = os.path.join(self.output_dir,
'data/{}_{}'.format(self.instrument.lower(),
self.aperture.lower()))
ensure_dir_exists(self.data_dir)
ensure_dir_exists(os.path.join(self.working_dir, 'data'))
self.working_data_dir = os.path.join(self.working_dir,
'data/{}_{}'.format(self.instrument.lower(),
self.aperture.lower()))
ensure_dir_exists(self.working_data_dir)

# Copy files from filesystem
dark_files, not_copied = copy_files(new_filenames, self.data_dir)
Expand Down
Loading