address config and context usage
Michael McCrackan committed Jan 7, 2025
1 parent f43ac70 commit df9521e
Showing 2 changed files with 15 additions and 25 deletions.
3 changes: 2 additions & 1 deletion sotodlib/preprocess/pcore.py
@@ -1,5 +1,6 @@
"""Base Class and PIPELINE register for the preprocessing pipeline scripts."""
import os
import copy
import logging
import numpy as np
from .. import core
@@ -369,7 +370,7 @@ def __init__(self, modules, plot_dir='./', logger=None, wrap_valid=True):
         self.logger = logger
         self.plot_dir = plot_dir
         self.wrap_valid = wrap_valid
-        super().__init__( [self._check_item(item) for item in modules])
+        super().__init__( [self._check_item(item) for item in copy.deepcopy(modules)])
 
     def _check_item(self, item):
         if isinstance(item, _Preprocess):
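Note on the pcore.py change: Pipeline.__init__ now builds its items from a deep copy of the incoming module list, so constructing a pipeline cannot feed modifications back into the caller's configuration (for example when the same loaded config dict is reused to build a pipeline more than once). Below is a minimal sketch of that pattern only, with hypothetical names (normalize, TinyPipeline, cfg); it is not sotodlib code:

import copy

def normalize(item):
    # Hypothetical stand-in for Pipeline._check_item: it edits the config
    # dict in place while turning it into a pipeline entry.
    item["resolved"] = True
    return item

class TinyPipeline(list):
    def __init__(self, modules):
        # Deep-copy first, as the commit does, so building the pipeline
        # cannot leak changes back into the caller's config list.
        super().__init__([normalize(m) for m in copy.deepcopy(modules)])

cfg = [{"name": "detrend"}, {"name": "calibrate"}]
TinyPipeline(cfg)
assert cfg == [{"name": "detrend"}, {"name": "calibrate"}]  # caller's config untouched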
37 changes: 13 additions & 24 deletions sotodlib/preprocess/preprocess_util.py
@@ -348,7 +348,6 @@ def load_and_preprocess(obs_id, configs, context=None, dets=None, meta=None,
 
 
 def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
-                                   context_init=None, context_proc=None,
                                    dets=None, meta=None, no_signal=None,
                                    logger=None):
     """Loads the saved information from the preprocessing pipeline from a
@@ -368,10 +367,6 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
     configs_proc: string or dictionary
         Second config file or loaded config dictionary to load
         dependent databases generated using multilayer_preprocess_tod.py.
-    context_init: core.Context
-        Optional. The Context file to use for the initial db.
-    context_proc: core.Context
-        Optional. The Context file to use for the dependent db.
     dets: dict
         Dets to restrict on from info in det_info. See context.get_meta.
     meta: AxisManager
@@ -388,10 +383,10 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
     if logger is None:
         logger = init_logger("preprocess")
 
-    configs_init, context_init = get_preprocess_context(configs_init, context_init)
+    configs_init, context_init = get_preprocess_context(configs_init)
     meta_init = context_init.get_meta(obs_id, dets=dets, meta=meta)
 
-    configs_proc, context_proc = get_preprocess_context(configs_proc, context_proc)
+    configs_proc, context_proc = get_preprocess_context(configs_proc)
     meta_proc = context_proc.get_meta(obs_id, dets=dets, meta=meta)
 
     group_by_init, groups_init, error_init = get_groups(obs_id, configs_init, context_init)
@@ -640,8 +635,8 @@ def cleanup_obs(obs_id, policy_dir, errlog, configs, context=None,
         f.close()
 
 
-def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=None,
-                          context_init=None, context_proc=None, overwrite=False):
+def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None,
+                          logger=None, overwrite=False):
     """
     This function is expected to receive a single obs_id, and dets dictionary.
     The dets dictionary must match the grouping specified in the preprocess
@@ -670,10 +665,6 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
         Filepath or dictionary containing a dependent preprocess configuration file.
     logger: PythonLogger
         Optional. Logger object or None will generate a new one.
-    context_init: core.Context
-        Optional. Context object used for data loading/querying.
-    context_proc: core.Context
-        Optional. Context object used for dependent data loading/querying.
     overwrite: bool
         Optional. Whether or not to overwrite existing entries in the preprocess manifest db.
@@ -703,14 +694,12 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
     if type(configs_init) == str:
         configs_init = yaml.safe_load(open(configs_init, "r"))
 
-    if context_init is None:
-        context_init = core.Context(configs_init["context_file"])
+    context_init = core.Context(configs_init["context_file"])
 
     if configs_proc is not None:
         if type(configs_proc) == str:
             configs_proc = yaml.safe_load(open(configs_proc, "r"))
-        if context_proc is None:
-            context_proc = core.Context(configs_proc["context_file"])
+        context_proc = core.Context(configs_proc["context_file"])
 
         group_by, groups, error = get_groups(obs_id, configs_proc, context_proc)
     else:
@@ -750,8 +739,7 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
         try:
             logger.info(f"both db and depdendent db exist for {obs_id} {dets} loading data and applying preprocessing.")
             aman = multilayer_load_and_preprocess(obs_id=obs_id, dets=dets, configs_init=configs_init,
-                                                  configs_proc=configs_proc, context_init=context_init,
-                                                  context_proc=context_proc, logger=logger)
+                                                  configs_proc=configs_proc, logger=logger)
             error = 'load_success'
             return error, [obs_id, dets], [obs_id, dets], aman
         except Exception as e:
@@ -793,15 +781,15 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
             aman.move("tags", None)
             aman.wrap('tags', tags_proc)
 
-            aman.preprocess.wrap('pcfg_ref', aman_cfgs_ref)
-
             proc_aman, success = pipe_proc.run(aman)
 
             # remove fields found in aman.preprocess from proc_aman
             for fld_init in init_fields:
                 if fld_init in proc_aman:
                     proc_aman.move(fld_init, None)
 
+            proc_aman.wrap('pcfg_ref', aman_cfgs_ref)
+
         except Exception as e:
             error = f'Failed to run dependent processing pipeline: {obs_id} {dets}'
             errmsg = f'Dependent pipeline failed with {type(e)}: {e}'
@@ -823,7 +811,7 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
         logger.info(f"Generating new preproc db entry for {obs_id} {dets}")
         try:
             pipe_init = Pipeline(configs_init["process_pipe"], plot_dir=configs_init["plot_dir"], logger=logger)
-            pck_aman = get_pcfg_check_aman(pipe_init)
+            aman_cfgs_ref = get_pcfg_check_aman(pipe_init)
             outputs_init = save_group(obs_id, configs_init, dets, context_init, subdir='temp')
             aman = context_init.get_obs(obs_id, dets=dets)
             tags = np.array(context_init.obsdb.get(aman.obs_info.obs_id, tags=True)['tags'])
@@ -866,8 +854,7 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
                 if fld_init in proc_aman:
                     proc_aman.move(fld_init, None)
 
-            aman.preprocess.merge(proc_aman)
-            proc_aman.wrap('pcfg_ref', pck_aman)
+            proc_aman.wrap('pcfg_ref', aman_cfgs_ref)
 
         except Exception as e:
             error = f'Failed to run dependent processing pipeline: {obs_id} {dets}'
@@ -882,6 +869,8 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
         logger.info(f"Saving data to {outputs_proc['temp_file']}:{outputs_proc['db_data']['dataset']}")
         proc_aman.save(outputs_proc['temp_file'], outputs_proc['db_data']['dataset'], overwrite)
 
+    aman.preprocess.merge(proc_aman)
+
     return error, outputs_init, outputs_proc, aman
 
 
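Note on the preprocess_util.py changes: the optional context_init/context_proc arguments are removed from multilayer_load_and_preprocess and preproc_or_load_group, and both functions now always build their Context objects from the "context_file" entry of the corresponding config. The commit also wraps 'pcfg_ref' into proc_aman after the dependent pipeline runs (rather than into aman.preprocess beforehand) and defers aman.preprocess.merge(proc_aman) until after proc_aman is saved. A usage sketch for the updated signatures follows; the obs_id, dets selection, and config paths are placeholders:

from sotodlib.preprocess.preprocess_util import (
    multilayer_load_and_preprocess, preproc_or_load_group)

obs_id = "obs_XXXXXXXXXX"                # placeholder observation ID
dets = {"wafer_slot": "ws0"}             # placeholder group selection
configs_init = "preprocess_init.yaml"    # placeholder config paths
configs_proc = "preprocess_proc.yaml"

# Load an observation with both preprocessing layers applied; contexts are
# built internally from each config's "context_file" entry.
aman = multilayer_load_and_preprocess(obs_id, configs_init, configs_proc, dets=dets)

# Process (or load, if database entries already exist) a single group.
error, outputs_init, outputs_proc, aman = preproc_or_load_group(
    obs_id, configs_init, dets, configs_proc=configs_proc, overwrite=False)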