250104 fix preproc or load group (#1086)
* Wrap in preproc info on run.

* Updates to add copy and pcfg_check.

* Address config and context usage.

---------

Co-authored-by: Michael McCrackan <mmccrack@login11.chn.perlmutter.nersc.gov>
msilvafe and Michael McCrackan authored Jan 8, 2025
1 parent 94882fb commit 642b7df
Showing 2 changed files with 19 additions and 25 deletions.
7 changes: 4 additions & 3 deletions sotodlib/preprocess/pcore.py
@@ -1,5 +1,6 @@
 """Base Class and PIPELINE register for the preprocessing pipeline scripts."""
 import os
+import copy
 import logging
 import numpy as np
 from .. import core
@@ -369,7 +370,7 @@ def __init__(self, modules, plot_dir='./', logger=None, wrap_valid=True):
         self.logger = logger
         self.plot_dir = plot_dir
         self.wrap_valid = wrap_valid
-        super().__init__( [self._check_item(item) for item in modules])
+        super().__init__( [self._check_item(item) for item in copy.deepcopy(modules)])

     def _check_item(self, item):
         if isinstance(item, _Preprocess):
@@ -444,8 +445,8 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
         """
         if proc_aman is None:
             if 'preprocess' in aman:
-                proc_aman = aman.preprocess
-                full = aman.preprocess
+                proc_aman = aman.preprocess.copy()
+                full = aman.preprocess.copy()
             else:
                 proc_aman = core.AxisManager(aman.dets, aman.samps)
                 full = core.AxisManager( aman.dets, aman.samps)
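Both pcore.py changes defend against shared-state mutation: the Pipeline now deep-copies the modules list so that item normalization cannot alter the caller's loaded config, and run() now works on a copy of aman.preprocess rather than aliasing it. Below is a minimal sketch of the aliasing hazard that copy.deepcopy(modules) closes; this is generic Python, not sotodlib code, and the in-place "checked" flag stands in for whatever normalization _check_item performs.

# Minimal sketch (generic Python, not sotodlib) of the aliasing hazard:
# normalizing module dicts in place leaks edits back into the caller's
# loaded YAML config, which may be reused to build another pipeline.
import copy

def build_bad(modules):
    for m in modules:
        m.setdefault("process", {})["checked"] = True   # mutates caller's dicts
    return list(modules)

def build_good(modules):
    modules = copy.deepcopy(modules)                    # private copy, as in the fix
    for m in modules:
        m.setdefault("process", {})["checked"] = True
    return list(modules)

cfg = [{"name": "detrend", "process": {"method": "linear"}}]
build_bad(cfg)
assert cfg[0]["process"]["checked"]                     # caller's config was edited
cfg = [{"name": "detrend", "process": {"method": "linear"}}]
build_good(cfg)
assert "checked" not in cfg[0]["process"]               # caller's config intact

The run() change follows the same reasoning: copying aman.preprocess presumably keeps a partial or failed pipeline pass from leaving half-updated fields inside the metadata already wrapped on the input AxisManager.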
37 changes: 15 additions & 22 deletions sotodlib/preprocess/preprocess_util.py
@@ -348,7 +348,6 @@ def load_and_preprocess(obs_id, configs, context=None, dets=None, meta=None,


 def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
-                                   context_init=None, context_proc=None,
                                    dets=None, meta=None, no_signal=None,
                                    logger=None):
     """Loads the saved information from the preprocessing pipeline from a
@@ -368,10 +367,6 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
     configs_proc: string or dictionary
         Second config file or loaded config dictionary to load
         dependent databases generated using multilayer_preprocess_tod.py.
-    context_init: core.Context
-        Optional. The Context file to use for the initial db.
-    context_proc: core.Context
-        Optional. The Context file to use for the dependent db.
     dets: dict
         Dets to restrict on from info in det_info. See context.get_meta.
     meta: AxisManager
@@ -388,10 +383,10 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
     if logger is None:
         logger = init_logger("preprocess")

-    configs_init, context_init = get_preprocess_context(configs_init, context_init)
+    configs_init, context_init = get_preprocess_context(configs_init)
     meta_init = context_init.get_meta(obs_id, dets=dets, meta=meta)

-    configs_proc, context_proc = get_preprocess_context(configs_proc, context_proc)
+    configs_proc, context_proc = get_preprocess_context(configs_proc)
     meta_proc = context_proc.get_meta(obs_id, dets=dets, meta=meta)

     group_by_init, groups_init, error_init = get_groups(obs_id, configs_init, context_init)
@@ -640,8 +635,8 @@ def cleanup_obs(obs_id, policy_dir, errlog, configs, context=None,
         f.close()


-def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=None,
-                          context_init=None, context_proc=None, overwrite=False):
+def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None,
+                          logger=None, overwrite=False):
     """
     This function is expected to receive a single obs_id, and dets dictionary.
     The dets dictionary must match the grouping specified in the preprocess
@@ -670,10 +665,6 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
         Filepath or dictionary containing a dependent preprocess configuration file.
     logger: PythonLogger
         Optional. Logger object or None will generate a new one.
-    context_init: core.Context
-        Optional. Context object used for data loading/querying.
-    context_proc: core.Context
-        Optional. Context object used for dependent data loading/querying.
     overwrite: bool
         Optional. Whether or not to overwrite existing entries in the preprocess manifest db.
@@ -703,14 +694,12 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
     if type(configs_init) == str:
         configs_init = yaml.safe_load(open(configs_init, "r"))

-    if context_init is None:
-        context_init = core.Context(configs_init["context_file"])
+    context_init = core.Context(configs_init["context_file"])

     if configs_proc is not None:
         if type(configs_proc) == str:
             configs_proc = yaml.safe_load(open(configs_proc, "r"))
-        if context_proc is None:
-            context_proc = core.Context(configs_proc["context_file"])
+        context_proc = core.Context(configs_proc["context_file"])

         group_by, groups, error = get_groups(obs_id, configs_proc, context_proc)
     else:
@@ -750,8 +739,7 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
         try:
             logger.info(f"both db and dependent db exist for {obs_id} {dets} loading data and applying preprocessing.")
-            aman = multilayer_load_and_preprocess(obs_id=obs_id, dets=dets, configs_init=configs_init,
-                                                  configs_proc=configs_proc, context_init=context_init,
-                                                  context_proc=context_proc, logger=logger)
+            aman = multilayer_load_and_preprocess(obs_id=obs_id, dets=dets, configs_init=configs_init,
+                                                  configs_proc=configs_proc, logger=logger)
             error = 'load_success'
             return error, [obs_id, dets], [obs_id, dets], aman
         except Exception as e:
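With contexts now constructed inside the function from each config's context_file, callers supply configs only, so a mismatched config/context pair can no longer be passed in. A hedged usage sketch of the load path follows; the obs_id, dets restriction, and config paths are hypothetical.

# Hedged usage sketch: only configs are passed now; each Context is rebuilt
# internally from the config's "context_file" entry. All values below are
# hypothetical placeholders.
from sotodlib.preprocess import preprocess_util

aman = preprocess_util.multilayer_load_and_preprocess(
    obs_id="obs_1700000000_sat1_1111111",   # hypothetical obs_id
    dets={"wafer_slot": "ws0"},             # restriction matching the db grouping
    configs_init="preprocess_init.yaml",    # hypothetical config paths
    configs_proc="preprocess_proc.yaml",
)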
@@ -782,6 +770,7 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
         logger.info(f"Generating new dependent preproc db entry for {obs_id} {dets}")
         # pipeline for init config
         pipe_init = Pipeline(configs_init["process_pipe"], plot_dir=configs_init["plot_dir"], logger=logger)
+        aman_cfgs_ref = get_pcfg_check_aman(pipe_init)
         # pipeline for processing config
         pipe_proc = Pipeline(configs_proc["process_pipe"], plot_dir=configs_proc["plot_dir"], logger=logger)

@@ -799,7 +788,7 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
                 if fld_init in proc_aman:
                     proc_aman.move(fld_init, None)

-            proc_aman.wrap('pcfg_ref', get_pcfg_check_aman(pipe_init))
+            proc_aman.wrap('pcfg_ref', aman_cfgs_ref)

         except Exception as e:
             error = f'Failed to run dependent processing pipeline: {obs_id} {dets}'
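The pcfg_check part of the commit computes the pipeline-config reference once, from the pristine pipe_init before the pipeline runs, and stores it under 'pcfg_ref' in the output, presumably so a later load can verify the archived products were built with the same configuration. A hypothetical verification helper is sketched below; check_pcfg_ref is not a sotodlib function, it assumes get_pcfg_check_aman is available in the same scope as in preprocess_util, and the flat field-by-field comparison is an assumption about the reference's contents.

# Hypothetical sketch: compare a stored pipeline-config reference against one
# rebuilt from the current config. check_pcfg_ref is NOT a sotodlib function,
# and the field-by-field comparison assumes the reference AxisManager holds
# simple per-step fields.
def check_pcfg_ref(proc_aman, pipe_init):
    ref = get_pcfg_check_aman(pipe_init)
    stored = proc_aman['pcfg_ref']
    return all(stored[k] == ref[k] for k in ref.keys())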
@@ -814,13 +803,15 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
             logger.info(f"Saving data to {outputs_proc['temp_file']}:{outputs_proc['db_data']['dataset']}")
             proc_aman.save(outputs_proc['temp_file'], outputs_proc['db_data']['dataset'], overwrite)

+        aman.preprocess.merge(proc_aman)
+
         return error, [obs_id, dets], outputs_proc, aman
     else:
         # pipeline for init config
         logger.info(f"Generating new preproc db entry for {obs_id} {dets}")
         try:
             pipe_init = Pipeline(configs_init["process_pipe"], plot_dir=configs_init["plot_dir"], logger=logger)
-            pck_aman = get_pcfg_check_aman(pipe_init)
+            aman_cfgs_ref = get_pcfg_check_aman(pipe_init)
             outputs_init = save_group(obs_id, configs_init, dets, context_init, subdir='temp')
             aman = context_init.get_obs(obs_id, dets=dets)
             tags = np.array(context_init.obsdb.get(aman.obs_info.obs_id, tags=True)['tags'])
@@ -863,7 +854,7 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
                 if fld_init in proc_aman:
                     proc_aman.move(fld_init, None)

-            proc_aman.wrap('pcfg_ref', pck_aman)
+            proc_aman.wrap('pcfg_ref', aman_cfgs_ref)

         except Exception as e:
             error = f'Failed to run dependent processing pipeline: {obs_id} {dets}'
@@ -878,6 +869,8 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
             logger.info(f"Saving data to {outputs_proc['temp_file']}:{outputs_proc['db_data']['dataset']}")
             proc_aman.save(outputs_proc['temp_file'], outputs_proc['db_data']['dataset'], overwrite)

+        aman.preprocess.merge(proc_aman)
+
         return error, outputs_init, outputs_proc, aman
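Taken together, the new signature drops the context arguments, and the added merge calls mean the returned aman carries the freshly computed products in aman.preprocess on the compute path, matching what callers already get on the load path. An end-to-end sketch follows; config paths, obs_id, and the dets group value are hypothetical.

# End-to-end sketch of the new call signature; contexts are no longer
# arguments. Config paths, obs_id, and the dets group value are hypothetical.
from sotodlib.preprocess.preprocess_util import preproc_or_load_group

error, outputs_init, outputs_proc, aman = preproc_or_load_group(
    "obs_1700000000_sat1_1111111",
    configs_init="preprocess_init.yaml",
    dets={"wafer_slot": "ws0"},             # must match the configured group_by
    configs_proc="preprocess_proc.yaml",
    overwrite=False,
)
if error == 'load_success':
    print("existing archives found; data loaded and preprocessing applied")
else:
    # on the compute path, proc_aman was merged into aman.preprocess before return
    print(error, outputs_proc)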
