diff --git a/sotodlib/preprocess/pcore.py b/sotodlib/preprocess/pcore.py
index 3be71629d..7f7840f2c 100644
--- a/sotodlib/preprocess/pcore.py
+++ b/sotodlib/preprocess/pcore.py
@@ -1,5 +1,6 @@
 """Base Class and PIPELINE register for the preprocessing pipeline scripts."""
 import os
+import copy
 import logging
 import numpy as np
 from .. import core
@@ -369,7 +370,7 @@ def __init__(self, modules, plot_dir='./', logger=None, wrap_valid=True):
         self.logger = logger
         self.plot_dir = plot_dir
         self.wrap_valid = wrap_valid
-        super().__init__( [self._check_item(item) for item in modules])
+        super().__init__( [self._check_item(item) for item in copy.deepcopy(modules)])
 
     def _check_item(self, item):
         if isinstance(item, _Preprocess):
@@ -444,8 +445,8 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
         """
         if proc_aman is None:
             if 'preprocess' in aman:
-                proc_aman = aman.preprocess
-                full = aman.preprocess
+                proc_aman = aman.preprocess.copy()
+                full = aman.preprocess.copy()
             else:
                 proc_aman = core.AxisManager(aman.dets, aman.samps)
                 full = core.AxisManager( aman.dets, aman.samps)
diff --git a/sotodlib/preprocess/preprocess_util.py b/sotodlib/preprocess/preprocess_util.py
index 7dfdec4d9..8071dd7c6 100644
--- a/sotodlib/preprocess/preprocess_util.py
+++ b/sotodlib/preprocess/preprocess_util.py
@@ -348,7 +348,6 @@ def load_and_preprocess(obs_id, configs, context=None, dets=None, meta=None,
 
 
 def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
-                                   context_init=None, context_proc=None,
                                    dets=None, meta=None, no_signal=None,
                                    logger=None):
     """Loads the saved information from the preprocessing pipeline from a
@@ -368,10 +367,6 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
     configs_proc: string or dictionary
         Second config file or loaded config dictionary to load
         dependent databases generated using multilayer_preprocess_tod.py.
-    context_init: core.Context
-        Optional. The Context file to use for the initial db.
-    context_proc: core.Context
-        Optional. The Context file to use for the dependent db.
     dets: dict
         Dets to restrict on from info in det_info. See context.get_meta.
     meta: AxisManager
@@ -388,10 +383,10 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
     if logger is None:
         logger = init_logger("preprocess")
 
-    configs_init, context_init = get_preprocess_context(configs_init, context_init)
+    configs_init, context_init = get_preprocess_context(configs_init)
     meta_init = context_init.get_meta(obs_id, dets=dets, meta=meta)
 
-    configs_proc, context_proc = get_preprocess_context(configs_proc, context_proc)
+    configs_proc, context_proc = get_preprocess_context(configs_proc)
     meta_proc = context_proc.get_meta(obs_id, dets=dets, meta=meta)
 
     group_by_init, groups_init, error_init = get_groups(obs_id, configs_init, context_init)
@@ -640,8 +635,8 @@ def cleanup_obs(obs_id, policy_dir, errlog, configs, context=None,
             f.close()
 
 
-def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=None,
-                          context_init=None, context_proc=None, overwrite=False):
+def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None,
+                          logger=None, overwrite=False):
     """
     This function is expected to receive a single obs_id, and dets dictionary.
     The dets dictionary must match the grouping specified in the preprocess
@@ -670,10 +665,6 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
         Filepath or dictionary containing a dependent preprocess configuration file.
     logger: PythonLogger
         Optional. Logger object or None will generate a new one.
-    context_init: core.Context
-        Optional. Context object used for data loading/querying.
-    context_proc: core.Context
-        Optional. Context object used for dependent data loading/querying.
     overwrite: bool
         Optional. Whether or not to overwrite existing entries in the preprocess
         manifest db.
@@ -703,14 +694,12 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
 
     if type(configs_init) == str:
         configs_init = yaml.safe_load(open(configs_init, "r"))
-    if context_init is None:
-        context_init = core.Context(configs_init["context_file"])
+    context_init = core.Context(configs_init["context_file"])
 
     if configs_proc is not None:
         if type(configs_proc) == str:
             configs_proc = yaml.safe_load(open(configs_proc, "r"))
-        if context_proc is None:
-            context_proc = core.Context(configs_proc["context_file"])
+        context_proc = core.Context(configs_proc["context_file"])
 
         group_by, groups, error = get_groups(obs_id, configs_proc, context_proc)
     else:
@@ -750,8 +739,7 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
         try:
             logger.info(f"both db and depdendent db exist for {obs_id} {dets} loading data and applying preprocessing.")
             aman = multilayer_load_and_preprocess(obs_id=obs_id, dets=dets, configs_init=configs_init,
-                                                  configs_proc=configs_proc, context_init=context_init,
-                                                  context_proc=context_proc, logger=logger)
+                                                  configs_proc=configs_proc, logger=logger)
             error = 'load_success'
             return error, [obs_id, dets], [obs_id, dets], aman
         except Exception as e:
@@ -782,6 +770,7 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
             logger.info(f"Generating new dependent preproc db entry for {obs_id} {dets}")
             # pipeline for init config
             pipe_init = Pipeline(configs_init["process_pipe"], plot_dir=configs_init["plot_dir"], logger=logger)
+            aman_cfgs_ref = get_pcfg_check_aman(pipe_init)
             # pipeline for processing config
             pipe_proc = Pipeline(configs_proc["process_pipe"], plot_dir=configs_proc["plot_dir"], logger=logger)
 
@@ -799,7 +788,7 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
                 if fld_init in proc_aman:
                     proc_aman.move(fld_init, None)
 
-            proc_aman.wrap('pcfg_ref', get_pcfg_check_aman(pipe_init))
+            proc_aman.wrap('pcfg_ref', aman_cfgs_ref)
 
         except Exception as e:
             error = f'Failed to run dependent processing pipeline: {obs_id} {dets}'
@@ -814,13 +803,15 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
             logger.info(f"Saving data to {outputs_proc['temp_file']}:{outputs_proc['db_data']['dataset']}")
             proc_aman.save(outputs_proc['temp_file'], outputs_proc['db_data']['dataset'], overwrite)
 
+        aman.preprocess.merge(proc_aman)
+
         return error, [obs_id, dets], outputs_proc, aman
     else:
         # pipeline for init config
         logger.info(f"Generating new preproc db entry for {obs_id} {dets}")
         try:
             pipe_init = Pipeline(configs_init["process_pipe"], plot_dir=configs_init["plot_dir"], logger=logger)
-            pck_aman = get_pcfg_check_aman(pipe_init)
+            aman_cfgs_ref = get_pcfg_check_aman(pipe_init)
             outputs_init = save_group(obs_id, configs_init, dets, context_init, subdir='temp')
             aman = context_init.get_obs(obs_id, dets=dets)
             tags = np.array(context_init.obsdb.get(aman.obs_info.obs_id, tags=True)['tags'])
@@ -863,7 +854,7 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
                 if fld_init in proc_aman:
                     proc_aman.move(fld_init, None)
 
-            proc_aman.wrap('pcfg_ref', pck_aman)
+            proc_aman.wrap('pcfg_ref', aman_cfgs_ref)
 
         except Exception as e:
             error = f'Failed to run dependent processing pipeline: {obs_id} {dets}'
@@ -878,6 +869,8 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
             logger.info(f"Saving data to {outputs_proc['temp_file']}:{outputs_proc['db_data']['dataset']}")
             proc_aman.save(outputs_proc['temp_file'], outputs_proc['db_data']['dataset'], overwrite)
 
+        aman.preprocess.merge(proc_aman)
+
         return error, outputs_init, outputs_proc, aman
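
For downstream callers, the visible API change is that multilayer_load_and_preprocess and preproc_or_load_group no longer accept context_init/context_proc; each Context is now always built internally from its config's "context_file" entry. A minimal usage sketch follows, assuming hypothetical config paths, obs_id, and dets values (everything here other than the function signatures shown in the diff is a placeholder):

    from sotodlib.preprocess.preprocess_util import (
        multilayer_load_and_preprocess,
        preproc_or_load_group,
    )

    # Placeholder identifiers; substitute a real obs_id and a dets
    # selection matching the group_by setting in the configs.
    obs_id = "obs_1700000000_sat1_1111111"
    dets = {"wafer_slot": "ws0"}

    # Load data and apply both saved preprocessing layers. The contexts
    # are derived from each config's "context_file"; passing them in is
    # no longer supported.
    aman = multilayer_load_and_preprocess(
        obs_id, "configs_init.yaml", "configs_proc.yaml", dets=dets)

    # Or compute-if-missing: run the pipelines and write temp db entries.
    # After this change the returned aman also carries proc_aman merged
    # into aman.preprocess.
    error, outputs_init, outputs_proc, aman = preproc_or_load_group(
        obs_id, "configs_init.yaml", dets,
        configs_proc="configs_proc.yaml", overwrite=False)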