From 3c085e751b5887c036ff74bb436fc8a0a3f5061f Mon Sep 17 00:00:00 2001 From: tterasaki Date: Tue, 14 May 2024 15:07:48 +0000 Subject: [PATCH 1/8] initial commit for make_nearby_sources_db.py --- .../site_pipeline/make_neaby_sources_db.py | 231 ++++++++++++++++++ 1 file changed, 231 insertions(+) create mode 100644 sotodlib/site_pipeline/make_neaby_sources_db.py diff --git a/sotodlib/site_pipeline/make_neaby_sources_db.py b/sotodlib/site_pipeline/make_neaby_sources_db.py new file mode 100644 index 000000000..798d5383d --- /dev/null +++ b/sotodlib/site_pipeline/make_neaby_sources_db.py @@ -0,0 +1,231 @@ +import os +import yaml +import time +import argparse +import logging +from typing import Optional +from sotodlib import core +import sotodlib.coords.planets as planets + +from sotodlib.site_pipeline import util +logger = util.init_logger('make_cosamp_hk', 'make_cosamp_hk: ') + +def get_parser(parser=None): + if parser is None: + parser = argparse.ArgumentParser() + parser.add_argument( + 'config', + help="configuration yaml file.") + parser.add_argument( + '-o', '--output_dir', action='store', default=None, type=str, + help='output data directory, overwrite config output_dir') + parser.add_argument( + '--verbose', default=2, type=int, + help="increase output verbosity. \ + 0: Error, 1: Warning, 2: Info(default), 3: Debug") + parser.add_argument( + '--overwrite', + help="If true, overwrites existing entries in the database", + action='store_true', + ) + parser.add_argument( + '--query_text', type=str, + help="Query text", + ) + parser.add_argument( + '--query_tags', nargs="*", type=str, + help="Query tags", + ) + parser.add_argument( + '--min_ctime', type=int, + help="Minimum timestamp for the beginning of an observation list", + ) + parser.add_argument( + '--max_ctime', type=int, + help="Maximum timestamp for the beginning of an observation list", + ) + parser.add_argument( + '--obs_id', type=str, + help="obs_id of particular observation if we want to run on just one", + ) + parser.add_argument( + '--update-delay', + help="Number of days (unit is days) in the past to start observation list.", + type=int + ) + return parser + +def main(config: str, + output_dir: Optional[str] = None, + verbose: Optional[int] = 2, + overwrite: Optional[bool] = False, + query_text: Optional[str] = None, + query_tags: Optional[list] = None, + min_ctime: Optional[float] = None, + max_ctime: Optional[float] = None, + obs_id: Optional[str] = None, + update_delay: Optional[str] = None, + ): + """ + Args: + config: str + Path to the configuration file containing processing parameters. + output_dir: str or None + Directory where the processed data will be saved. \ + If not provided, it is extracted from the configuration file. + verbose: str + Verbosity level for logging. Default is 2 (INFO). + overwrite: bool + Whether to overwrite existing output files. Default is False. + query_text: str or None + Text-based query to filter observations. If not provided, it is extracted from the configuration file. + query_tags: list or None + Tags used for further filtering observations. If not provided, it is extracted from the configuration file. + min_ctime: float or None + Minimum timestamp of observations to be queried. If not provided, it is extracted from the configuration file. + max_ctime: float or None + Maximum timestamp of observations to be queried. If not provided, it is extracted from the configuration file. + obs_id: str or None + Specific observation obs_id to process. If provided, overrides other filtering parameters. + update_delay: str or None + Number of days (unit is days) in the past to start observation list. + """ + # Set verbose + if verbose == 0: + logger.setLevel(logging.ERROR) + elif verbose == 1: + logger.setLevel(logging.WARNING) + elif verbose == 2: + logger.setLevel(logging.INFO) + elif verbose == 3: + logger.setLevel(logging.DEBUG) + + # load configuration file + if type(config) == str: + with open(config) as f: + config = yaml.safe_load(f) + + # load context + context_file = config['context_file'] + ctx = core.Context(context_file) + + if output_dir is None: + if 'output_dir' in config.keys(): + output_dir = config['output_dir'] + else: + raise ValueError('output_dir is not specified.') + if query_text is None: + if 'query_text' in config.keys(): + query_text = config['query_text'] + if query_tags is None: + if 'query_tags' in config.keys(): + query_tags = config['query_tags'] + if min_ctime is None: + if 'min_ctime' in config.keys(): + min_ctime = config['min_ctime'] + if max_ctime is None: + if 'max_ctime' in config.keys(): + max_ctime = config['max_ctime'] + if update_delay is None: + if 'update_delay' in config.keys(): + update_delay = config['update_delay'] + + if (min_ctime is None) and (update_delay is not None): + min_ctime = int(time.time()) - update_delay*86400 + logger.info(f'min_ctime: {min_ctime}') + + source_list = configs.get('source_list', None) + source_names = [] + for _s in source_list: + if isinstance(_s, str): + source_names.append(_s) + elif len(_s) == 3: + source_names.append(_s[0]) + else: + raise ValueError('Invalid style of source') + distance = configs.get('distance', 1.) + + # Load metadata sqlite + if not os.path.exists(output_dir): + os.makedirs(output_dir) + man_db_filename = os.path.join(output_dir, f'nearby_sources.sqlite') + if os.path.exists(man_db_filename): + logger.info(f"Mapping {man_db_filename} for the \ + archive index.") + man_db = core.metadata.ManifestDb(man_db_filename) + else: + logger.info(f"Creating {man_db_filename} for the \ + archive index.") + scheme = core.metadata.ManifestScheme() + scheme.add_exact_match('obs:obs_id') + scheme.add_data_field('dataset') + for source_name in source_names: + scheme.add_data_field(source_name) + man_db = core.metadata.ManifestDb(man_db_filename, scheme=scheme) + + # query observations + if obs_id is not None: + tot_query = f"obs_id=='{obs_id}'" + else: + tot_query = "and " + if query_text is not None: + tot_query += f"{query_text} and " + if min_ctime is not None: + tot_query += f"timestamp>={min_ctime} and " + if max_ctime is not None: + tot_query += f"timestamp<={max_ctime} and " + tot_query = tot_query[4:-4] + if tot_query == "": + tot_query = "1" + + logger.info(f'tot_query: {tot_query}') + obs_list= ctx.obsdb.query(tot_query, query_tags) + + # output cosampled data for each obs_id + for obs in obs_list: + obs_id = obs['obs_id'] + try: + meta = ctx.get_meta(obs_id) + _aman = ctx.get_obs(obs_id, dets=[]) + aman = core.AxisManager(meta.dets, _aman.samps) + aman.wrap('timestamps', _aman.timestamps, [(0, 'samps')]) + aman.wrap('boresight', _aman.boresight) + if not 'focal_plane' in list(meta._fields.keys()): + focal_plane = core.AxisManager(meta.dets) + focal_plane.wrap('xi', np.zeros(meta.dets.count), [(0, 'dets')]) + focal_plane.wrap('eta', np.zeros(meta.dets.count), [(0, 'dets')]) + focal_plane.wrap('gamma', np.zeros(meta.dets.count), [(0, 'dets')]) + else: + aman.wrap('focal_plane', meta.focal_plane) + flag_fp_isnan = (np.isnan(aman.focal_plane.xi)) | \ + (np.isnan(aman.focal_plane.eta)) | \ + (np.isnan(aman.focal_plane.gamma)) + aman.restrict('dets', aman.dets.vals[~flag_fp_isnan]) + + nearby_sources = planets.get_nearby_sources(aman, source_list=source_list, distance=distance) + nearby_source_names = [] + for _source in nearby_sources: + nearby_source_names.append(_source[0]) + + + + h5_filename = f"nearby_sources_{obs_id.split('_')[1][:4]}.h5" + output_filename = os.path.join(output_dir, h5_filename) + + entry = {'obs:obs_id': obs_id, 'dataset': obs_id} + for source_name in source_names: + if source_name in nearby_source_names: + entry[source_name] = 1 + else: + entry[source_name] = 0 + man_db.add_entry(entry, + filename=output_filename, replace=overwrite) + logger.info(f"saved: {obs_id}") + + except Exception as e: + logger.error(f"Exception '{e}' thrown while processing {obs_id}") + continue + +if __name__ == '__main__': + util.main_launcher(main, get_parser) + \ No newline at end of file From f699563a5acfed863bd6529f05cc38b906c81733 Mon Sep 17 00:00:00 2001 From: tterasaki Date: Wed, 15 May 2024 02:23:24 +0000 Subject: [PATCH 2/8] checked it works --- sotodlib/site_pipeline/make_neaby_sources_db.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sotodlib/site_pipeline/make_neaby_sources_db.py b/sotodlib/site_pipeline/make_neaby_sources_db.py index 798d5383d..775f11307 100644 --- a/sotodlib/site_pipeline/make_neaby_sources_db.py +++ b/sotodlib/site_pipeline/make_neaby_sources_db.py @@ -4,6 +4,7 @@ import argparse import logging from typing import Optional +import numpy as np from sotodlib import core import sotodlib.coords.planets as planets @@ -134,7 +135,7 @@ def main(config: str, min_ctime = int(time.time()) - update_delay*86400 logger.info(f'min_ctime: {min_ctime}') - source_list = configs.get('source_list', None) + source_list = config.get('source_list', None) source_names = [] for _s in source_list: if isinstance(_s, str): @@ -143,7 +144,7 @@ def main(config: str, source_names.append(_s[0]) else: raise ValueError('Invalid style of source') - distance = configs.get('distance', 1.) + distance = config.get('distance', 1.) # Load metadata sqlite if not os.path.exists(output_dir): @@ -195,6 +196,7 @@ def main(config: str, focal_plane.wrap('xi', np.zeros(meta.dets.count), [(0, 'dets')]) focal_plane.wrap('eta', np.zeros(meta.dets.count), [(0, 'dets')]) focal_plane.wrap('gamma', np.zeros(meta.dets.count), [(0, 'dets')]) + aman.wrap('focal_plane', focal_plane) else: aman.wrap('focal_plane', meta.focal_plane) flag_fp_isnan = (np.isnan(aman.focal_plane.xi)) | \ From 061ee8f8a68d5d112a9ba627697c66ee82b25f43 Mon Sep 17 00:00:00 2001 From: tterasaki Date: Mon, 27 May 2024 02:10:53 +0000 Subject: [PATCH 3/8] update to include each wafer hit bools --- ...ources_db.py => make_nearby_sources_db.py} | 87 ++++++++++++------- 1 file changed, 54 insertions(+), 33 deletions(-) rename sotodlib/site_pipeline/{make_neaby_sources_db.py => make_nearby_sources_db.py} (71%) diff --git a/sotodlib/site_pipeline/make_neaby_sources_db.py b/sotodlib/site_pipeline/make_nearby_sources_db.py similarity index 71% rename from sotodlib/site_pipeline/make_neaby_sources_db.py rename to sotodlib/site_pipeline/make_nearby_sources_db.py index 775f11307..4be0619c6 100644 --- a/sotodlib/site_pipeline/make_neaby_sources_db.py +++ b/sotodlib/site_pipeline/make_nearby_sources_db.py @@ -6,6 +6,7 @@ from typing import Optional import numpy as np from sotodlib import core +from sotodlib.coords import optics import sotodlib.coords.planets as planets from sotodlib.site_pipeline import util @@ -146,6 +147,16 @@ def main(config: str, raise ValueError('Invalid style of source') distance = config.get('distance', 1.) + # Other configurations + telescope = config.get('telescope', 'SAT') + if telescope == 'SAT': + wafer_slots = [f'ws{i}' for i in range(7)] + elif telescope == 'LAT': + wafer_slots = [f'ws{i}' for i in range(3)] + else: + raise NameError('Only "SAT" or "LAT" is supported.') + optics_config = config.get('optics_config', None) + # Load metadata sqlite if not os.path.exists(output_dir): os.makedirs(output_dir) @@ -162,6 +173,8 @@ def main(config: str, scheme.add_data_field('dataset') for source_name in source_names: scheme.add_data_field(source_name) + for ws in wafer_slots: + scheme.add_data_field(f'{source_name}_{ws}') man_db = core.metadata.ManifestDb(man_db_filename, scheme=scheme) # query observations @@ -185,45 +198,53 @@ def main(config: str, # output cosampled data for each obs_id for obs in obs_list: obs_id = obs['obs_id'] + h5_filename = f"nearby_sources_{obs_id.split('_')[1][:4]}.h5" + output_filename = os.path.join(output_dir, h5_filename) + entry = {'obs:obs_id': obs_id, 'dataset': obs_id} + streamed_wafer_slots = ['ws{}'.format(index) for index, bit in enumerate(obs_id.split('_')[-1]) if bit == '1'] try: - meta = ctx.get_meta(obs_id) - _aman = ctx.get_obs(obs_id, dets=[]) - aman = core.AxisManager(meta.dets, _aman.samps) - aman.wrap('timestamps', _aman.timestamps, [(0, 'samps')]) - aman.wrap('boresight', _aman.boresight) - if not 'focal_plane' in list(meta._fields.keys()): - focal_plane = core.AxisManager(meta.dets) - focal_plane.wrap('xi', np.zeros(meta.dets.count), [(0, 'dets')]) - focal_plane.wrap('eta', np.zeros(meta.dets.count), [(0, 'dets')]) - focal_plane.wrap('gamma', np.zeros(meta.dets.count), [(0, 'dets')]) - aman.wrap('focal_plane', focal_plane) - else: - aman.wrap('focal_plane', meta.focal_plane) - flag_fp_isnan = (np.isnan(aman.focal_plane.xi)) | \ - (np.isnan(aman.focal_plane.eta)) | \ - (np.isnan(aman.focal_plane.gamma)) - aman.restrict('dets', aman.dets.vals[~flag_fp_isnan]) - - nearby_sources = planets.get_nearby_sources(aman, source_list=source_list, distance=distance) - nearby_source_names = [] - for _source in nearby_sources: - nearby_source_names.append(_source[0]) - + for ws in wafer_slots: + if ws not in streamed_wafer_slots: + for source_name in source_names: + entry[f'{source_name}_{ws}'] = 0 + else: + meta = ctx.get_meta(obs_id, dets={'wafer_slot': ws}) + _aman = ctx.get_obs(obs_id, dets=[]) + aman = core.AxisManager(meta.dets, _aman.samps) + aman.wrap('timestamps', _aman.timestamps, [(0, 'samps')]) + aman.wrap('boresight', _aman.boresight) + + if not 'focal_plane' in list(meta._fields.keys()): + optics.get_focal_plane(aman, + x = np.zeros(aman.dets.count), + y = np.zeros(aman.dets.count), + wafer_slot=ws, + config_path=optics_config) + else: + aman.wrap('focal_plane', meta.focal_plane) + flag_fp_isnan = (np.isnan(aman.focal_plane.xi)) | \ + (np.isnan(aman.focal_plane.eta)) | \ + (np.isnan(aman.focal_plane.gamma)) + aman.restrict('dets', aman.dets.vals[~flag_fp_isnan]) - - h5_filename = f"nearby_sources_{obs_id.split('_')[1][:4]}.h5" - output_filename = os.path.join(output_dir, h5_filename) + nearby_sources = planets.get_nearby_sources(aman, source_list=source_list, distance=distance) + nearby_source_names = [] + for _source in nearby_sources: + nearby_source_names.append(_source[0]) + for source_name in source_names: + entry[f'{source_name}_{ws}'] = 1 if source_name in nearby_source_names else 0 - entry = {'obs:obs_id': obs_id, 'dataset': obs_id} for source_name in source_names: - if source_name in nearby_source_names: - entry[source_name] = 1 - else: - entry[source_name] = 0 + hit_any = False + for ws in wafer_slots: + if entry[f'{source_name}_{ws}'] == 1: + hit_any = True + entry[f'{source_name}'] = 1 if hit_any else 0 + man_db.add_entry(entry, filename=output_filename, replace=overwrite) - logger.info(f"saved: {obs_id}") - + logger.info(f"saved: {obs_id}, {ws}") + except Exception as e: logger.error(f"Exception '{e}' thrown while processing {obs_id}") continue From 6441adede0cb633a2d03ab04002f397b19798a77 Mon Sep 17 00:00:00 2001 From: tterasaki Date: Wed, 29 May 2024 02:07:55 +0000 Subject: [PATCH 4/8] minor fix --- sotodlib/site_pipeline/make_nearby_sources_db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sotodlib/site_pipeline/make_nearby_sources_db.py b/sotodlib/site_pipeline/make_nearby_sources_db.py index 4be0619c6..3c713fab9 100644 --- a/sotodlib/site_pipeline/make_nearby_sources_db.py +++ b/sotodlib/site_pipeline/make_nearby_sources_db.py @@ -243,7 +243,7 @@ def main(config: str, man_db.add_entry(entry, filename=output_filename, replace=overwrite) - logger.info(f"saved: {obs_id}, {ws}") + logger.info(f"saved: {obs_id}") except Exception as e: logger.error(f"Exception '{e}' thrown while processing {obs_id}") From 166741c8e068d7f13056b4579b4b6e07aafaa07c Mon Sep 17 00:00:00 2001 From: David Nguyen Date: Mon, 12 Aug 2024 23:19:34 +0000 Subject: [PATCH 5/8] add wafer source tags to preprocess_obs db and SSOFootprint --- sotodlib/preprocess/processes.py | 41 +++++++++++++++++++++ sotodlib/site_pipeline/preprocess_obs.py | 45 ++++++++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/sotodlib/preprocess/processes.py b/sotodlib/preprocess/processes.py index 186e2b758..5c18fe713 100644 --- a/sotodlib/preprocess/processes.py +++ b/sotodlib/preprocess/processes.py @@ -770,6 +770,26 @@ class SSOFootprint(_Preprocess): """Find nearby sources within a given distance and get SSO footprint and plot each source on the focal plane. + Example config block:: + + - name: "sso_footprint" + calc: + # source_list: ['jupiter'] # remove to find nearby sources + distance: 20 + nstep: 100 + telescope: 'SAT' + wafer_hit_threshold: 10 + save: True + plot: + wafer_offsets: {'ws0': [-2.5, -0.5], + 'ws1': [-2.5, -13], + 'ws2': [-13, -7], + 'ws3': [-13, 5], + 'ws4': [-2.5, 11.5], + 'ws5': [8.5, 5], + 'ws6': [8.5, -7]} + focal_plane: '/so/home/msilvafe/shared_files/sat_hw_positions.npz' + .. autofunction:: sotodlib.obs_ops.sources.get_sso """ name = 'sso_footprint' @@ -785,6 +805,14 @@ def calc_and_save(self, aman, proc_aman): sso_aman = core.AxisManager() nstep = self.calc_cfgs.get("nstep", 100) onsamp = (aman.samps.count+nstep-1)//nstep + telescope = self.calc_cfgs.get("telescope", 'SAT') + if telescope == 'SAT': + wafer_slots = [f'ws{i}' for i in range(7)] + elif telescope == 'LAT': + wafer_slots = [f'ws{i}' for i in range(3)] + else: + raise NameError('Only "SAT" or "LAT" is supported.') + streamed_wafer_slots = ['ws{}'.format(index) for index, bit in enumerate(aman.obs_info.obs_id.split('_')[-1]) if bit == '1'] for sso in ssos: planet = sso xi_p, eta_p = obs_ops.sources.get_sso(aman, planet, nstep=nstep) @@ -794,6 +822,19 @@ def calc_and_save(self, aman, proc_aman): # planet_aman = core.AxisManager(core.OffsetAxis("samps", onsamp)) planet_aman.wrap("xi_p", xi_p, [(0, "ds_samps")]) planet_aman.wrap("eta_p", eta_p, [(0, "ds_samps")]) + for ws in wafer_slots: + if ws in streamed_wafer_slots: + ws_dets = np.where(aman.det_info.wafer_slot == ws)[0] + ws_xi = aman.focal_plane.xi[ws_dets] + ws_eta = aman.focal_plane.eta[ws_dets] + wafer_hit = np.sum([np.any(np.sqrt((xi_p-xi)**2 + (eta_p-eta)**2) < np.deg2rad(0.5)) for xi, eta in zip(ws_xi, ws_eta)]) > self.calc_cfgs.get("wafer_hit_threshold", 10) + if wafer_hit: + planet_aman.wrap(ws, True) + else: + planet_aman.wrap(ws, False) + else: + planet_aman.wrap(ws, False) + sso_aman.wrap(planet, planet_aman) self.save(proc_aman, sso_aman) diff --git a/sotodlib/site_pipeline/preprocess_obs.py b/sotodlib/site_pipeline/preprocess_obs.py index 4b113024b..c21597b96 100644 --- a/sotodlib/site_pipeline/preprocess_obs.py +++ b/sotodlib/site_pipeline/preprocess_obs.py @@ -70,6 +70,25 @@ def preprocess_obs( configs = yaml.safe_load(open(configs, "r")) context = core.Context(configs["context_file"]) + + source_list = configs.get('source_list', None) + source_names = [] + for _s in source_list: + if isinstance(_s, str): + source_names.append(_s) + elif len(_s) == 3: + source_names.append(_s[0]) + else: + raise ValueError('Invalid style of source') + + # Other configurations + telescope = configs.get('telescope', 'SAT') + if telescope == 'SAT': + wafer_slots = [f'ws{i}' for i in range(7)] + elif telescope == 'LAT': + wafer_slots = [f'ws{i}' for i in range(3)] + else: + raise NameError('Only "SAT" or "LAT" is supported.') if os.path.exists(configs['archive']['index']): logger.info(f"Mapping {configs['archive']['index']} for the " @@ -81,6 +100,10 @@ def preprocess_obs( scheme = core.metadata.ManifestScheme() scheme.add_exact_match('obs:obs_id') scheme.add_data_field('dataset') + for source_name in source_names: + scheme.add_data_field(source_name) + for ws in wafer_slots: + scheme.add_data_field(f'{source_name}_{ws}') db = core.metadata.ManifestDb( configs['archive']['index'], scheme=scheme @@ -104,6 +127,28 @@ def preprocess_obs( db_data = {'obs:obs_id': obs_id, 'dataset': dest_dataset} + sso_footprint_process = False + for process in configs['process_pipe']: + if 'name' in process and 'sso_footprint' == process['name']: + sso_footprint_process = True + if sso_footprint_process: + nearby_source_names = [] + for _source in proc_aman.sso_footprint._assignments.keys(): + nearby_source_names.append(_source) + for ws in wafer_slots: + for source_name in source_names: + if source_name in nearby_source_names: + db_data[f'{source_name}_{ws}'] = int(proc_aman.sso_footprint[source_name][ws]) + else: + db_data[f'{source_name}_{ws}'] = 0 + for source_name in source_names: + hit_any = False + for ws in wafer_slots: + if db_data[f'{source_name}_{ws}'] == 1: + hit_any = True + db_data[f'{source_name}'] = 1 if hit_any else 0 + + logger.info(f"Saving to database under {db_data}") if len(db.inspect(db_data)) == 0: h5_path = os.path.relpath(dest_file, From a52df69f82c5494985f3acf5a1394153288d7775 Mon Sep 17 00:00:00 2001 From: David Nguyen Date: Wed, 11 Sep 2024 16:46:35 +0000 Subject: [PATCH 6/8] reformat db --- sotodlib/site_pipeline/preprocess_obs.py | 61 ++++++++++++++++-------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/sotodlib/site_pipeline/preprocess_obs.py b/sotodlib/site_pipeline/preprocess_obs.py index c21597b96..fdb200e0f 100644 --- a/sotodlib/site_pipeline/preprocess_obs.py +++ b/sotodlib/site_pipeline/preprocess_obs.py @@ -100,10 +100,12 @@ def preprocess_obs( scheme = core.metadata.ManifestScheme() scheme.add_exact_match('obs:obs_id') scheme.add_data_field('dataset') - for source_name in source_names: - scheme.add_data_field(source_name) - for ws in wafer_slots: - scheme.add_data_field(f'{source_name}_{ws}') + scheme.add_data_field('source') + scheme.add_data_field('coverage') + # for source_name in source_names: + # scheme.add_data_field(source_name) + # for ws in wafer_slots: + # scheme.add_data_field(f'{source_name}_{ws}') db = core.metadata.ManifestDb( configs['archive']['index'], scheme=scheme @@ -131,23 +133,6 @@ def preprocess_obs( for process in configs['process_pipe']: if 'name' in process and 'sso_footprint' == process['name']: sso_footprint_process = True - if sso_footprint_process: - nearby_source_names = [] - for _source in proc_aman.sso_footprint._assignments.keys(): - nearby_source_names.append(_source) - for ws in wafer_slots: - for source_name in source_names: - if source_name in nearby_source_names: - db_data[f'{source_name}_{ws}'] = int(proc_aman.sso_footprint[source_name][ws]) - else: - db_data[f'{source_name}_{ws}'] = 0 - for source_name in source_names: - hit_any = False - for ws in wafer_slots: - if db_data[f'{source_name}_{ws}'] == 1: - hit_any = True - db_data[f'{source_name}'] = 1 if hit_any else 0 - logger.info(f"Saving to database under {db_data}") if len(db.inspect(db_data)) == 0: @@ -155,6 +140,40 @@ def preprocess_obs( start=os.path.dirname(configs['archive']['index'])) db.add_entry(db_data, h5_path) + if sso_footprint_process: + logger.info(f"Saving per source to database {db_data}") + nearby_source_names = [] + for _source in proc_aman.sso_footprint._assignments.keys(): + nearby_source_names.append(_source) + # for ws in wafer_slots: + # for source_name in source_names: + # if source_name in nearby_source_names: + # db_data[f'{source_name}_{ws}'] = int(proc_aman.sso_footprint[source_name][ws]) + # else: + # db_data[f'{source_name}_{ws}'] = 0 + # for source_name in source_names: + # hit_any = False + # for ws in wafer_slots: + # if db_data[f'{source_name}_{ws}'] == 1: + # hit_any = True + # db_data[f'{source_name}'] = 1 if hit_any else + for source_name in source_names: + coverage = [] + if source_name in nearby_source_names: + db_data = {'obs:obs_id': obs_id, + 'dataset': dest_dataset + f'_{source_name}'} + + for ws in wafer_slots: + if proc_aman.sso_footprint[source_name][ws]: + coverage.append(ws) + db_data['source'] = source_name + db_data['coverage'] = coverage + + if len(db.inspect(db_data)) == 0: + h5_path = os.path.relpath(dest_file, + start=os.path.dirname(configs['archive']['index'])) + db.add_entry(db_data, h5_path) + def load_preprocess_obs(obs_id, configs="preprocess_obs_configs.yaml", context=None ): """ Loads the saved information from the preprocessing pipeline and runs the processing section of the pipeline. From 5e9249d41112345afee4506ad0ff251bf5344bed Mon Sep 17 00:00:00 2001 From: David Nguyen Date: Wed, 11 Sep 2024 20:46:11 +0000 Subject: [PATCH 7/8] update format --- sotodlib/site_pipeline/preprocess_obs.py | 45 ++++++------------------ 1 file changed, 11 insertions(+), 34 deletions(-) diff --git a/sotodlib/site_pipeline/preprocess_obs.py b/sotodlib/site_pipeline/preprocess_obs.py index fdb200e0f..e050a8205 100644 --- a/sotodlib/site_pipeline/preprocess_obs.py +++ b/sotodlib/site_pipeline/preprocess_obs.py @@ -100,12 +100,7 @@ def preprocess_obs( scheme = core.metadata.ManifestScheme() scheme.add_exact_match('obs:obs_id') scheme.add_data_field('dataset') - scheme.add_data_field('source') scheme.add_data_field('coverage') - # for source_name in source_names: - # scheme.add_data_field(source_name) - # for ws in wafer_slots: - # scheme.add_data_field(f'{source_name}_{ws}') db = core.metadata.ManifestDb( configs['archive']['index'], scheme=scheme @@ -133,46 +128,28 @@ def preprocess_obs( for process in configs['process_pipe']: if 'name' in process and 'sso_footprint' == process['name']: sso_footprint_process = True - - logger.info(f"Saving to database under {db_data}") - if len(db.inspect(db_data)) == 0: - h5_path = os.path.relpath(dest_file, - start=os.path.dirname(configs['archive']['index'])) - db.add_entry(db_data, h5_path) if sso_footprint_process: logger.info(f"Saving per source to database {db_data}") nearby_source_names = [] for _source in proc_aman.sso_footprint._assignments.keys(): nearby_source_names.append(_source) - # for ws in wafer_slots: - # for source_name in source_names: - # if source_name in nearby_source_names: - # db_data[f'{source_name}_{ws}'] = int(proc_aman.sso_footprint[source_name][ws]) - # else: - # db_data[f'{source_name}_{ws}'] = 0 - # for source_name in source_names: - # hit_any = False - # for ws in wafer_slots: - # if db_data[f'{source_name}_{ws}'] == 1: - # hit_any = True - # db_data[f'{source_name}'] = 1 if hit_any else + coverage = [] for source_name in source_names: - coverage = [] if source_name in nearby_source_names: - db_data = {'obs:obs_id': obs_id, - 'dataset': dest_dataset + f'_{source_name}'} - for ws in wafer_slots: if proc_aman.sso_footprint[source_name][ws]: - coverage.append(ws) - db_data['source'] = source_name - db_data['coverage'] = coverage + coverage.append(f"{source_name}:{ws}") + db_data['coverage'] = ','.join(coverage) + else: + db_data['coverage'] = None + + logger.info(f"Saving to database under {db_data}") + if len(db.inspect(db_data)) == 0: + h5_path = os.path.relpath(dest_file, + start=os.path.dirname(configs['archive']['index'])) + db.add_entry(db_data, h5_path) - if len(db.inspect(db_data)) == 0: - h5_path = os.path.relpath(dest_file, - start=os.path.dirname(configs['archive']['index'])) - db.add_entry(db_data, h5_path) def load_preprocess_obs(obs_id, configs="preprocess_obs_configs.yaml", context=None ): """ Loads the saved information from the preprocessing pipeline and runs the From cf131a96af67d35353ae65149c7565b999adc7be Mon Sep 17 00:00:00 2001 From: David Nguyen Date: Tue, 10 Dec 2024 21:39:29 +0000 Subject: [PATCH 8/8] add source distances --- sotodlib/preprocess/processes.py | 2 ++ sotodlib/site_pipeline/preprocess_obs.py | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/sotodlib/preprocess/processes.py b/sotodlib/preprocess/processes.py index 5c18fe713..e220862f7 100644 --- a/sotodlib/preprocess/processes.py +++ b/sotodlib/preprocess/processes.py @@ -835,6 +835,8 @@ def calc_and_save(self, aman, proc_aman): else: planet_aman.wrap(ws, False) + planet_aman.wrap('distance', np.round(np.mean(np.rad2deg(np.sqrt(xi_p**2 + eta_p**2))), 1)) + sso_aman.wrap(planet, planet_aman) self.save(proc_aman, sso_aman) diff --git a/sotodlib/site_pipeline/preprocess_obs.py b/sotodlib/site_pipeline/preprocess_obs.py index e050a8205..082ef70a4 100644 --- a/sotodlib/site_pipeline/preprocess_obs.py +++ b/sotodlib/site_pipeline/preprocess_obs.py @@ -101,6 +101,7 @@ def preprocess_obs( scheme.add_exact_match('obs:obs_id') scheme.add_data_field('dataset') scheme.add_data_field('coverage') + scheme.add_data_field('source_distance') db = core.metadata.ManifestDb( configs['archive']['index'], scheme=scheme @@ -135,14 +136,19 @@ def preprocess_obs( for _source in proc_aman.sso_footprint._assignments.keys(): nearby_source_names.append(_source) coverage = [] + distances = [] for source_name in source_names: if source_name in nearby_source_names: for ws in wafer_slots: if proc_aman.sso_footprint[source_name][ws]: coverage.append(f"{source_name}:{ws}") + + distances.append(f"{source_name}:{proc_aman.sso_footprint[source_name]['distance']}") db_data['coverage'] = ','.join(coverage) + db_data['source_distance'] = ','.join(distances) else: db_data['coverage'] = None + db_data['source_distance'] = None logger.info(f"Saving to database under {db_data}") if len(db.inspect(db_data)) == 0: