diff --git a/sotodlib/mapmaking/demod_mapmaker.py b/sotodlib/mapmaking/demod_mapmaker.py index 8b13d9152..ef5b53d12 100644 --- a/sotodlib/mapmaking/demod_mapmaker.py +++ b/sotodlib/mapmaking/demod_mapmaker.py @@ -9,11 +9,13 @@ import numpy as np, os from pixell import enmap, utils as putils, tilemap, bunch, mpi import so3g.proj +from sqlalchemy import create_engine, exc +from sqlalchemy.orm import sessionmaker from .. import core from .. import coords from .utilities import recentering_to_quat_lonlat, evaluate_recentering, MultiZipper, unarr, safe_invert_div -from .utilities import import_optional +from .utilities import import_optional, Base from .noise_model import NmatWhite hp = import_optional('healpy') @@ -146,7 +148,7 @@ def write (self, prefix, tag, x): pass class DemodSignalMap(DemodSignal): def __init__(self, shape=None, wcs=None, comm=None, comps="TQU", name="sky", ofmt="{name}", output=True, - ext="fits", dtype=np.float32, sys=None, recenter=None, tile_shape=(500,500), tiled=False, Nsplits=1, singlestream=False, + ext="fits", dtype_map=np.float64, dtype_tod=np.float32, sys=None, recenter=None, tile_shape=(500,500), tiled=False, Nsplits=1, singlestream=False, nside=None, nside_tile=None): """ Parent constructor; use for_rectpix or for_healpix instead. Args described there. @@ -156,40 +158,40 @@ def __init__(self, shape=None, wcs=None, comm=None, comps="TQU", name="sky", ofm self.comps = comps self.sys = sys self.recenter = recenter - self.dtype = dtype + self.dtype_map = dtype_map + self.dtype_tod = dtype_tod + self.tile_shape = tile_shape self.tiled = tiled self.data = {} self.Nsplits = Nsplits self.singlestream = singlestream self.wrapper = lambda x : x + self.wcs = wcs ncomp = len(comps) - self.pix_scheme = "rectpix" if (shape is not None) else "healpix" + self.pix_scheme = "rectpix" if (wcs is not None) else "healpix" if self.pix_scheme == "healpix": self.tiled = (nside_tile is not None) self.hp_geom = coords.healpix_utils.get_geometry(nside, nside_tile, ordering='NEST') npix = 12 * nside**2 - self.rhs = np.zeros((Nsplits, ncomp, npix), dtype=dtype) - self.div = np.zeros((Nsplits, ncomp, ncomp, npix), dtype=dtype) - self.hits = np.zeros((Nsplits, npix), dtype=dtype) + self.rhs = np.zeros((Nsplits, ncomp, npix), dtype=dtype_map) + self.div = np.zeros((Nsplits, ncomp, ncomp, npix), dtype=dtype_map) + self.hits = np.zeros((Nsplits, npix), dtype=dtype_map) if self.tiled: self.wrapper = coords.healpix_utils.tiled_to_full else: - shape = tuple(shape[-2:]) - if tiled: - geo = tilemap.geometry(shape, wcs, tile_shape=tile_shape) - self.rhs = tilemap.zeros(geo.copy(pre=(Nsplits,ncomp,)), dtype=dtype) - self.div = tilemap.zeros(geo.copy(pre=(Nsplits,ncomp,ncomp)), dtype=dtype) - self.hits= tilemap.zeros(geo.copy(pre=(Nsplits,)), dtype=dtype) + if shape is None: + # We will set shape, wcs from wcs_kernel on loading the first obs + self.rhs = None + self.div = None + self.hits = None else: - self.rhs = enmap.zeros((Nsplits, ncomp) +shape, wcs, dtype=dtype) - self.div = enmap.zeros((Nsplits,ncomp,ncomp)+shape, wcs, dtype=dtype) - self.hits= enmap.zeros((Nsplits,)+shape, wcs, dtype=dtype) + self.init_maps_rectpix(shape, wcs) @classmethod def for_rectpix(cls, shape, wcs, comm, comps="TQU", name="sky", ofmt="{name}", output=True, - ext="fits", dtype=np.float32, sys=None, recenter=None, tile_shape=(500,500), tiled=False, Nsplits=1, singlestream=False): + ext="fits", dtype_map=np.float64, dtype_tod=np.float32, sys=None, recenter=None, tile_shape=(500,500), tiled=False, Nsplits=1, singlestream=False): """ Signal describing a non-distributed sky map. Signal describing a sky map in the coordinate system given by "sys", which defaults to equatorial coordinates. If tiled==True, then this @@ -198,9 +200,9 @@ def for_rectpix(cls, shape, wcs, comm, comps="TQU", name="sky", ofmt="{name}", o Arguments --------- shape : numpy.ndarray - Shape of the output map geometry + Shape of the output map geometry. If None, computed from coords.get_footprint on first add_obs. wcs : wcs - WCS of the output map geometry + WCS of the output map geometry (or wcs kernel). comm : MPI.comm MPI communicator comps : str, optional @@ -234,12 +236,12 @@ def for_rectpix(cls, shape, wcs, comm, comps="TQU", name="sky", ofmt="{name}", o """ return cls(shape=shape, wcs=wcs, comm=comm, comps=comps, name=name, ofmt=ofmt, output=output, - ext=ext, dtype=dtype, sys=sys, recenter=recenter, tile_shape=tile_shape, tiled=tiled, + ext=ext, dtype_map=dtype_map, dtype_tod=dtype_tod, sys=sys, recenter=recenter, tile_shape=tile_shape, tiled=tiled, Nsplits=Nsplits, singlestream=singlestream, nside=None, nside_tile=None) @classmethod def for_healpix(cls, nside, nside_tile=None, comps="TQU", name="sky", ofmt="{name}", output=True, - ext="fits.gz", dtype=np.float32, Nsplits=1, singlestream=False): + ext="fits.gz", dtype_map=np.float64, dtype_tod=np.float32, Nsplits=1, singlestream=False): """ Signal describing a sky map in healpix pixelization, NEST ordering. @@ -262,7 +264,7 @@ def for_healpix(cls, nside, nside_tile=None, comps="TQU", name="sky", ofmt="{nam The extension used for the files. May be 'fits', 'fits.gz', 'h5', 'h5py', or 'npy'. dtype : numpy.dtype - The data type to use for the time-ordered data. + The data type to use for the maps. Nsplits : int, optional Number of splits that you will map simultaneously. By default is 1 when no splits are requested. @@ -272,7 +274,7 @@ def for_healpix(cls, nside, nside_tile=None, comps="TQU", name="sky", ofmt="{nam obs.demodQ, obs.demodU """ return cls(shape=None, wcs=None, comm=None, comps=comps, name=name, ofmt=ofmt, output=output, - ext=ext, dtype=dtype, sys=None, recenter=None, tile_shape=None, tiled=False, + ext=ext, dtype_map=dtype_map, dtype_tod=dtype_tod, sys=None, recenter=None, tile_shape=None, tiled=False, Nsplits=Nsplits, singlestream=singlestream, nside=nside, nside_tile=nside_tile) def add_obs(self, id, obs, nmat, Nd, pmap=None, split_labels=None): @@ -297,16 +299,27 @@ def add_obs(self, id, obs, nmat, Nd, pmap=None, split_labels=None): cuts = obs.flags.glitch_flags + ~obs.preprocess.split_flags.cuts[split_labels[n_split]] if self.pix_scheme == "rectpix": threads='domdir' - geom = self.rhs.geometry + if self.rhs is None: # Still need to initialize the geometry + geom = None + wcs_kernel = self.wcs + else: + geom = self.rhs.geometry + wcs_kernel = None else: threads = ["tiles", "simple"][self.hp_geom.nside_tile is None] geom = self.hp_geom - pmap_local = coords.pmat.P.for_tod(obs, comps=self.comps, geom=geom, rot=rot, threads=threads, weather=unarr(obs.weather), site=unarr(obs.site), cuts=cuts, hwp=True) + wcs_kernel = None + pmap_local = coords.pmat.P.for_tod(obs, comps=self.comps, geom=geom, rot=rot, wcs_kernel=wcs_kernel, threads=threads, weather=unarr(obs.weather), site=unarr(obs.site), cuts=cuts, hwp=True) else: pmap_local = pmap + if self.rhs is None: # Set the geometry now from the pmat + shape, wcs = pmap_local.geom + self.init_maps_rectpix(shape, wcs) + self.wcs = wcs + if not(self.singlestream): - obs_rhs, obs_div, obs_hits = project_all_demod(pmap=pmap_local, signalT=obs.dsT, signalQ=obs.demodQ, signalU=obs.demodU, + obs_rhs, obs_div, obs_hits = project_all_demod(pmap=pmap_local, signalT=obs.dsT.astype(self.dtype_tod), signalQ=obs.demodQ.astype(self.dtype_tod), signalU=obs.demodU.astype(self.dtype_tod), det_weightsT=2*nmat.ivar, det_weightsQU=nmat.ivar, ncomp=self.ncomp, wrapper=self.wrapper) else: obs_rhs, obs_div, obs_hits = project_all_single(pmap=pmap_local, Nd=Nd, det_weights=nmat.ivar, comps='TQU', wrapper=self.wrapper) @@ -363,14 +376,14 @@ def write(self, prefix, tag, m): raise ImportError("Cannot save healpix map as fits; healpy could not be imported. Install healpy or save as npy or h5") if m.ndim > 2: m = np.reshape(m, (np.prod(m.shape[:-1]), m.shape[-1]), order='C') # Flatten wrapping axes; healpy.write_map can't handle >2d array - hp.write_map(oname, m.view(self.dtype), nest=(self.hp_geom.ordering=='NEST'), overwrite=True) + hp.write_map(oname, m.view(self.dtype_map), nest=(self.hp_geom.ordering=='NEST'), overwrite=True) elif self.ext == "npy": - np.save(oname, {'nside': self.hp_geom.nside, 'ordering':self.hp_geom.ordering, 'data':m.view(self.dtype)}, allow_pickle=True) + np.save(oname, {'nside': self.hp_geom.nside, 'ordering':self.hp_geom.ordering, 'data':m.view(self.dtype_map)}, allow_pickle=True) elif self.ext in ['h5', 'hdf5']: if h5py is None: raise ValueError("Cannot save healpix map as hdf5; h5py could not be imported. Install h5py or save as npy or fits") with h5py.File(oname, 'w') as f: - dset = f.create_dataset("data", m.shape, dtype=self.dtype, data=m) + dset = f.create_dataset("data", m.shape, dtype=self.dtype_map, data=m) dset.attrs['ordering'] = self.hp_geom.ordering dset.attrs['nside'] = self.hp_geom.nside else: @@ -378,25 +391,45 @@ def write(self, prefix, tag, m): return oname + def init_maps_rectpix(self, shape, wcs): + """ Initialize tilemaps or enmaps rhs, div, hits for given shape and wcs""" + shape = tuple(shape[-2:]) + Nsplits, ncomp, dtype = self.Nsplits, self.ncomp, self.dtype_map + + if self.tiled: + geo = tilemap.geometry(shape, wcs, tile_shape=self.tile_shape) + rhs = tilemap.zeros(geo.copy(pre=(Nsplits,ncomp,)), dtype=self.dtype_map) + div = tilemap.zeros(geo.copy(pre=(Nsplits,ncomp,ncomp)), dtype=self.dtype_map) + hits= tilemap.zeros(geo.copy(pre=(Nsplits,)), dtype=self.dtype_map) + else: + rhs = enmap.zeros((Nsplits, ncomp) +shape, wcs, dtype=self.dtype_map) + div = enmap.zeros((Nsplits,ncomp,ncomp)+shape, wcs, dtype=self.dtype_map) + hits= enmap.zeros((Nsplits,)+shape, wcs, dtype=self.dtype_map) + self.rhs = rhs + self.div = div + self.hits = hits + return rhs, div, hits + + def setup_demod_map(noise_model, shape=None, wcs=None, nside=None, comm=mpi.COMM_WORLD, comps='TQU', split_labels=None, singlestream=False, dtype_tod=np.float32, - dtype_map=np.float32, recenter=None, verbose=0): + dtype_map=np.float64, recenter=None, verbose=0): """ Setup the classes for demod mapmaking and return a DemodMapmmaker object """ - if shape is not None and wcs is not None: + if wcs is not None: Nsplits = len(split_labels) signal_map = DemodSignalMap.for_rectpix(shape, wcs, comm, comps=comps, - dtype=dtype_map, tiled=False, + dtype_map=dtype_map, dtype_tod=dtype_tod, tiled=False, ofmt="", Nsplits=Nsplits, singlestream=singlestream, recenter=recenter) elif nside is not None: Nsplits = len(split_labels) signal_map = DemodSignalMap.for_healpix(nside, nside_tile='auto', - comps=comps, dtype=dtype_map, + comps=comps, dtype_map=dtype_map, dtype_tod=dtype_tod, ofmt="", Nsplits=Nsplits, singlestream=singlestream, ext="fits.gz") @@ -407,31 +440,44 @@ def setup_demod_map(noise_model, shape=None, wcs=None, nside=None, singlestream=singlestream) return mapmaker -def write_demod_maps(prefix, data, split_labels=None): +def atomic_db_aux(atomic_db, info, valid = True): + info.valid = valid + engine = create_engine("sqlite:///%s" % atomic_db, echo=False) + Base.metadata.create_all(bind=engine) + Session = sessionmaker(bind=engine) + with Session() as session: + session.add(info) + try: + session.commit() + except exc.IntegrityError: + session.rollback() + +def write_demod_maps(prefix, data, info, split_labels=None, atomic_db=None): """ Write maps from data into files """ Nsplits = len(split_labels) for n_split in range(Nsplits): + if np.all(data.wmap[n_split] == 0.0): + if atomic_db is not None: + atomic_db_aux(atomic_db, info[n_split], valid=False) + continue data.signal.write(prefix, "%s_wmap"%split_labels[n_split], data.wmap[n_split]) data.signal.write(prefix, "%s_weights"%split_labels[n_split], data.weights[n_split]) data.signal.write(prefix, "%s_hits"%split_labels[n_split], data.signal.hits[n_split]) - -def write_demod_info(oname, info, split_labels=None): - putils.mkdir(os.path.dirname(oname)) - Nsplits = len(split_labels) - for n_split in range(Nsplits): - bunch.write(oname+'_%s_info.hdf'%split_labels[n_split], info[n_split]) + if atomic_db is not None: + atomic_db_aux(atomic_db, info[n_split], valid=True) def make_demod_map(context, obslist, noise_model, info, preprocess_config, prefix, shape=None, wcs=None, nside=None, comm=mpi.COMM_WORLD, comps="TQU", t0=0, dtype_tod=np.float32, dtype_map=np.float32, tag="", verbose=0, split_labels=None, L=None, - site='so_sat3', recenter=None, singlestream=False): + site='so_sat3', recenter=None, singlestream=False, + atomic_db=None): """ Make a demodulated map from the list of observations in obslist. @@ -484,6 +530,8 @@ def make_demod_map(context, obslist, noise_model, info, If True, do not perform demodulated filter+bin mapmaking but rather regular filter+bin mapmaking, i.e. map from obs.signal rather than from obs.dsT, obs.demodQ, obs.demodU. + atomic_db : str, optional + Path to the atomic map data base. Maps created will be added to it. Returns ------- @@ -527,7 +575,7 @@ def make_demod_map(context, obslist, noise_model, info, overwrite=False) errors.append(error) ; outputs.append((output_init, output_proc)) ; if error not in [None,'load_success']: - L.info('tod %s:%s:%s failed in the prepoc database'%(obs_id,detset,band)) + L.info('tod %s:%s:%s failed in the preproc database'%(obs_id,detset,band)) continue obs.wrap("weather", np.full(1, "toco")) obs.wrap("site", np.full(1, site)) @@ -551,12 +599,11 @@ def make_demod_map(context, obslist, noise_model, info, div = np.moveaxis(div, -1, 0) # this moves the last axis to the 0th position weights.append(div) mapdata = bunch.Bunch(wmap=wmap, weights=weights, signal=mapmaker.signals[0], t0=t0) - info = add_weights_to_info(info, weights, split_labels) # output to files - write_demod_maps(prefix, mapdata, split_labels=split_labels, ) - write_demod_info(prefix, info, split_labels=split_labels ) + write_demod_maps(prefix, mapdata, info, split_labels=split_labels, atomic_db=atomic_db) + return errors, outputs def add_weights_to_info(info, weights, split_labels): @@ -572,9 +619,9 @@ def add_weights_to_info(info, weights, split_labels): sumweights = np.sum(mean_qu[positive]) meanweights = np.mean(mean_qu[positive]) medianweights = np.median(mean_qu[positive]) - sub_info['total_weight_qu'] = sumweights - sub_info['mean_weight_qu'] = meanweights - sub_info['median_weight_qu'] = medianweights + sub_info.total_weight_qu = sumweights + sub_info.mean_weight_qu = meanweights + sub_info.median_weight_qu = medianweights info[isplit] = sub_info return info @@ -672,4 +719,4 @@ def project_all_single(pmap, Nd, det_weights, comps, wrapper=lambda x:x): pmap.to_weights(dest=div, comps=comps, det_weights=det_weights) div = wrapper(div) hits = wrapper(pmap.to_map(signal=np.ones_like(Nd))) - return rhs, div, hits + return rhs, div, hits \ No newline at end of file diff --git a/sotodlib/mapmaking/noise_model.py b/sotodlib/mapmaking/noise_model.py index a78b2c2aa..2356a788c 100644 --- a/sotodlib/mapmaking/noise_model.py +++ b/sotodlib/mapmaking/noise_model.py @@ -263,6 +263,8 @@ def __init__(self, window=2, ivar=None, nwin=None): def build(self, tod, srate, **kwargs): #ndet, nsamps = tod.shape nwin = utils.nint(self.window*srate) + if np.any(np.logical_not(np.isfinite(tod))): + raise ValueError(f"There is a nan when calculating the white noise !!!") ivar = 1.0/np.var(tod, 1) return NmatWhite(ivar=ivar, window=self.window, nwin=nwin) def apply(self, tod, inplace=True): diff --git a/sotodlib/mapmaking/utilities.py b/sotodlib/mapmaking/utilities.py index f6c714a7d..f6b7da29b 100644 --- a/sotodlib/mapmaking/utilities.py +++ b/sotodlib/mapmaking/utilities.py @@ -1,7 +1,6 @@ -from typing import Any, Union - +from typing import Optional, Any, Union +from sqlalchemy.orm import declarative_base, Mapped, mapped_column import importlib - import numpy as np import so3g from pixell import enmap, fft, resample, tilemap, utils @@ -546,3 +545,44 @@ def import_optional(module_name): return module except: return None + +Base = declarative_base() + +class AtomicInfo(Base): + __tablename__ = "atomic" + + obs_id: Mapped[str] = mapped_column(primary_key=True) + telescope: Mapped[str] = mapped_column(primary_key=True) + freq_channel: Mapped[str] = mapped_column(primary_key=True) + wafer: Mapped[str] = mapped_column(primary_key=True) + ctime: Mapped[int] = mapped_column(primary_key=True) + split_label: Mapped[str] = mapped_column(primary_key=True) + valid: Mapped[Optional[bool]] + split_detail: Mapped[Optional[str]] + prefix_path: Mapped[Optional[str]] + elevation: Mapped[Optional[float]] + azimuth: Mapped[Optional[float]] + pwv: Mapped[Optional[float]] + total_weight_qu: Mapped[Optional[float]] + mean_weight_qu: Mapped[Optional[float]] + median_weight_qu: Mapped[Optional[float]] + leakage_avg: Mapped[Optional[float]] + noise_avg: Mapped[Optional[float]] + ampl_2f_avg: Mapped[Optional[float]] + gain_avg: Mapped[Optional[float]] + tau_avg: Mapped[Optional[float]] + f_hwp: Mapped[Optional[float]] + roll_angle: Mapped[Optional[float]] + scan_speed: Mapped[Optional[float]] + sun_distance: Mapped[Optional[float]] + + def __init__(self, obs_id, telescope, freq_channel, wafer, ctime, split_label): + self.obs_id = obs_id + self.telescope = telescope + self.freq_channel = freq_channel + self.wafer = wafer + self.ctime = ctime + self.split_label = split_label + + def __repr__(self): + return f"({self.obs_id},{self.telescope},{self.freq_channel},{self.wafer},{self.ctime},{self.split_label})" diff --git a/sotodlib/site_pipeline/make_atomic_filterbin_map.py b/sotodlib/site_pipeline/make_atomic_filterbin_map.py index 193c6e7ff..27c83b48f 100644 --- a/sotodlib/site_pipeline/make_atomic_filterbin_map.py +++ b/sotodlib/site_pipeline/make_atomic_filterbin_map.py @@ -2,21 +2,26 @@ from typing import Optional from dataclasses import dataclass import time +import datetime as dt import warnings import os import logging import yaml import multiprocessing import traceback -import sqlite3 +import ephem + +from sqlalchemy import create_engine, select +from sqlalchemy.orm import sessionmaker + import numpy as np -import so3g import sotodlib.site_pipeline.util as util from sotodlib import coords, mapmaking from sotodlib.core import Context from sotodlib.io import hk_utils from sotodlib.preprocess import preprocess_util -from pixell import enmap, utils as putils, bunch +from so3g.proj import coords as so3g_coords +from pixell import enmap, utils as putils from pixell import wcsutils, colors, memory, mpi from concurrent.futures import ProcessPoolExecutor, as_completed @@ -34,7 +39,8 @@ class Cfg: If 2 files, representing 2 layers of preprocessing, they should be separated by a comma. area: str - WCS kernel for rectangular pixels + WCS kernel for rectangular pixels. Filename to map/geometry + or valid string for coords.get_wcs_kernel. nside: int Nside for HEALPIX pixels query: str @@ -163,15 +169,13 @@ def from_yaml(cls, path) -> "Cfg": d = yaml.safe_load(f) return cls(**d) - class DataMissing(Exception): pass - -def get_pwv(obs, data_dir): +def get_pwv(start_time, stop_time, data_dir): try: - pwv_info = hk_utils.get_detcosamp_hkaman( - obs, alias=['pwv'], + pwv_info = hk_utils.get_hkaman( + float(start_time), float(stop_time), alias=['pwv'], fields=['site.env-radiometer-class.feeds.pwvs.pwv'], data_dir=data_dir) pwv_all = pwv_info['env-radiometer-class']['env-radiometer-class'][0] @@ -180,39 +184,12 @@ def get_pwv(obs, data_dir): pwv = 0.0 return pwv - -def read_tods(context, obslist, - dtype_tod=np.float32, only_hits=False, site='so_sat3', - l2_data=None): - context = Context(context) - # this function will run on multiprocessing and can be returned in any - # random order we will also return the obslist to keep track of the order - my_tods = [] - pwvs = [] - ind = 0 - obs_id, detset, band, obs_ind = obslist[ind] - meta = context.get_meta( - obs_id, dets={"wafer_slot": detset, "wafer.bandpass": band}) - tod = context.get_obs(meta, no_signal=True) - to_remove = [] - for field in tod._fields: - if field not in ['obs_info', 'flags', 'signal', 'focal_plane', 'timestamps', 'boresight']: - to_remove.append(field) - for field in to_remove: - tod.move(field, None) - tod.flags.wrap( - 'glitch_flags', so3g.proj.RangesMatrix.zeros(tod.shape[:2]), - [(0, 'dets'), (1, 'samps')]) - my_tods.append(tod) - - tod_temp = tod.restrict('dets', meta.dets.vals[:1], in_place=False) - if l2_data is not None: - pwvs.append(get_pwv(tod_temp, data_dir=l2_data)) - else: - pwvs.append(np.nan) - del tod_temp - return bunch.Bunch(obslist=obslist, my_tods=my_tods, pwvs=pwvs) - +def get_sun_distance(site, ctime, az, el): + site_ = so3g_coords.SITES[site].ephem_observer() + dtime = dt.datetime.fromtimestamp(ctime, dt.timezone.utc) + site_.date = ephem.Date(dtime) + sun = ephem.Sun(site_) + return np.degrees(ephem.separation((sun.az, sun.alt), (np.radians(az), np.radians(el)))) class ColoredFormatter(logging.Formatter): def __init__(self, msg, colors={'DEBUG':colors.reset, @@ -275,9 +252,33 @@ def main(config_file: str) -> None: comm = mpi.FAKE_WORLD # Fake communicator since we won't use MPI verbose = args.verbose - args.quiet - if args.area is not None: - shape, wcs = enmap.read_map_geometry(args.area) - wcs = wcsutils.WCS(wcs.to_header()) + + recenter = None + if args.center_at: + recenter = mapmaking.parse_recentering(args.center_at) + + if args.area is not None: # Set shape, wcs for rectpix + try: + # Load shape, wcs from a map or geometry + shape, wcs = enmap.read_map_geometry(args.area) + wcs = wcsutils.WCS(wcs.to_header()) + except FileNotFoundError: + # See if area is a wcs_kernel string + try: + shape = None + wcs = coords.get_wcs_kernel(args.area) + except ValueError: + L.error("'area' not a valid filename or wcs_kernel string") + exit(1) + if recenter is not None: + # wcs_kernel string not allowed for recenter=True + if shape is None: + L.error("'area' must be a map geometry file, not wcs_kernel string, to use recenter") + exit(1) + else: + # If not recenter, we set shape=None so get_footprint will be called + shape = None + elif args.nside is not None: pass # here I will map in healpix else: @@ -287,9 +288,6 @@ def main(config_file: str) -> None: noise_model = mapmaking.NmatWhite() putils.mkdir(args.odir) - recenter = None - if args.center_at: - recenter = mapmaking.parse_recentering(args.center_at) preprocess_config_str = [s.strip() for s in args.preprocess_config.split(",")] preprocess_config = [] ; errlog = [] for preproc_cf in preprocess_config_str: @@ -308,8 +306,7 @@ def main(config_file: str) -> None: warnings.warn("You are using single precision for maps, we advice to use double precision") context_obj = Context(args.context) - # obslists is a dict, obskeys is a list, periods is an array, only rank 0 - # will do this and broadcast to others. + # obslists is a dict, obskeys is a list, periods is an array try: obslists, obskeys, periods, obs_infos = mapmaking.build_obslists( context_obj, args.query, nset=args.nset, wafer=args.wafer, @@ -318,8 +315,7 @@ def main(config_file: str) -> None: except mapmaking.NoTODFound as err: L.exception(err) exit(1) - L.info(f'Done with build_obslists, running {len(obslists)} maps') - cwd = os.getcwd() + L.info(f'Running {len(obslists)} maps after build_obslists') split_labels = [] if args.all_splits: @@ -343,22 +339,23 @@ def main(config_file: str) -> None: # if we do we will not run them again. if isinstance(args.atomic_db, str): if os.path.isfile(args.atomic_db) and not args.only_hits: - # open the connector, in reading mode only - conn = sqlite3.connect(args.atomic_db) - cursor = conn.cursor() + engine = create_engine("sqlite:///%s" % args.atomic_db, echo=False) + Session = sessionmaker(bind=engine) + session = Session() + keys_to_remove = [] # Now we have obslists and splits ready, we look through the database # to remove the maps we already have from it for key, value in obslists.items(): missing_split = False for split_label in split_labels: - query_ = 'SELECT * from atomic where obs_id="%s" and\ - telescope="%s" and freq_channel="%s" and wafer="%s" and\ - split_label="%s"' % ( - value[0][0], obs_infos[value[0][3]].telescope, key[2], - key[1], split_label) - res = cursor.execute(query_) - matches = res.fetchall() + query_ = select(mapmaking.AtomicInfo).filter_by( + obs_id=value[0][0], + telescope=obs_infos[value[0][3]].telescope, + freq_channel=key[2], + wafer=key[1], + split_label=split_label) + matches = session.execute(query_).scalars().all() if len(matches) == 0: # this means one of the requested splits is missing # in the data base @@ -371,97 +368,68 @@ def main(config_file: str) -> None: for key in keys_to_remove: obskeys.remove(key) del obslists[key] - conn.close() # I close since I only wanted to read + engine.dispose() obslists_arr = [item for key, item in obslists.items()] - tod_list = [] # this list will receive the outputs from read_tods - L.info('Starting with read_tods') - with ProcessPoolExecutor(args.nproc) as exe: - futures = [exe.submit( - read_tods, args.context, obslist, dtype_tod=args.dtype_tod, - only_hits=args.only_hits, l2_data=args.hk_data_path, - site=args.site) - for obslist in obslists_arr] - for future in as_completed(futures): - try: - tod_list.append(future.result()) - except Exception as e: - # if read_tods fails for some reason we log into the first preproc DB - future_write_to_log(e, errlog[0]) - continue - futures.remove(future) - # flatten the list of lists - L.info('Done with read_tods') - - my_tods = [bb.my_tods for bb in tod_list] - if args.area is not None: - subgeoms = [] - for obs in my_tods: - if recenter is None: - subshape, subwcs = coords.get_footprint(obs[0], wcs) - subgeoms.append((subshape, subwcs)) - else: - subshape = shape - subwcs = wcs - subgeoms.append((subshape, subwcs)) + L.info(f'Running {len(obslists_arr)} maps after removing duplicate maps') # clean up lingering files from previous incomplete runs + if len(preprocess_config)==1: + policy_dir_init = os.path.join(os.path.dirname(preprocess_config[0]['archive']['policy']['filename']), 'temp') + else: + policy_dir_init = os.path.join(os.path.dirname(preprocess_config[0]['archive']['policy']['filename']), 'temp') + policy_dir_proc = os.path.join(os.path.dirname(preprocess_config[1]['archive']['policy']['filename']), 'temp_proc') for obs in obslists_arr: obs_id = obs[0][0] if len(preprocess_config)==1: - preprocess_util.save_group_and_cleanup(obs_id, preprocess_config[0], - subdir='temp', remove=False) + preprocess_util.cleanup_obs(obs_id, policy_dir_init, errlog[0], preprocess_config[0], subdir='temp', remove=False) else: - preprocess_util.save_group_and_cleanup(obs_id, preprocess_config[0], - subdir='temp', remove=False) - preprocess_util.save_group_and_cleanup(obs_id, preprocess_config[1], - subdir='temp_proc', remove=False) - + preprocess_util.cleanup_obs(obs_id, policy_dir_init, errlog[0], preprocess_config[0], subdir='temp', remove=False) + preprocess_util.cleanup_obs(obs_id, policy_dir_proc, errlog[1], preprocess_config[1], subdir='temp_proc', remove=False) run_list = [] - for oi in range(len(my_tods)): - # tod_list[oi].obslist[0] is the old obslist - pid = tod_list[oi].obslist[0][3] - detset = tod_list[oi].obslist[0][1] - band = tod_list[oi].obslist[0][2] - obslist = tod_list[oi].obslist + for oi, ol in enumerate(obslists_arr): + pid = ol[0][3] + detset = ol[0][1] + band = ol[0][2] + obslist = ol t = putils.floor(periods[pid, 0]) t5 = ("%05d" % t)[:5] prefix = "%s/%s/atomic_%010d_%s_%s" % ( args.odir, t5, t, detset, band) - if args.area is not None: - subshape, subwcs = subgeoms[oi] tag = "%5d/%d" % (oi+1, len(obskeys)) putils.mkdir(os.path.dirname(prefix)) - pwv_atomic = tod_list[oi].pwvs[0] + pwv_atomic = get_pwv(periods[pid, 0], periods[pid, 1], args.hk_data_path) + # Save file for data base of atomic maps. # We will write an individual file, # another script will loop over those files # and write into sqlite data base if not args.only_hits: - info = [] + info_list = [] for split_label in split_labels: - info.append(bunch.Bunch( - pid=pid, - obs_id=obslist[0][0].encode(), - telescope=obs_infos[obslist[0][3]].telescope.encode(), - freq_channel=band.encode(), - wafer=detset.encode(), + info = mapmaking.AtomicInfo( + obs_id=obslist[0][0], + telescope=obs_infos[obslist[0][3]].telescope, + freq_channel=band, + wafer=detset, ctime=int(t), - split_label=split_label.encode(), - split_detail=''.encode(), - prefix_path=str(cwd + '/' + prefix + '_%s' % - split_label).encode(), - elevation=obs_infos[obslist[0][3]].el_center, - azimuth=obs_infos[obslist[0][3]].az_center, - pwv=float(pwv_atomic))) + split_label=split_label + ) + info.split_detail = '' + info.prefix_path = str(prefix + '_%s' % split_label) + info.elevation = obs_infos[obslist[0][3]].el_center + info.azimuth = obs_infos[obslist[0][3]].az_center + info.pwv = float(pwv_atomic) + info.roll_angle = obs_infos[obslist[0][3]].roll_center + info.sun_distance = get_sun_distance(args.site, int(t), obs_infos[obslist[0][3]].az_center, obs_infos[obslist[0][3]].el_center) + info_list.append(info) # inputs that are unique per atomic map go into run_list if args.area is not None: - run_list.append([obslist, subshape, subwcs, info, prefix, t]) + run_list.append([obslist, shape, wcs, info_list, prefix, t]) elif args.nside is not None: - run_list.append([obslist, None, None, info, prefix, t]) - # Done with creating run_list + run_list.append([obslist, None, None, info_list, prefix, t]) with ProcessPoolExecutor(args.nproc) as exe: futures = [exe.submit( @@ -476,7 +444,8 @@ def main(config_file: str) -> None: verbose=verbose, split_labels=split_labels, singlestream=args.singlestream, - site=args.site) for r in run_list] + site=args.site, + atomic_db=args.atomic_db) for r in run_list] for future in as_completed(futures): L.info('New future as_completed result') try: @@ -487,8 +456,8 @@ def main(config_file: str) -> None: futures.remove(future) for ii in range(len(errors)): for idx_prepoc in range(len(preprocess_config)): - preprocess_util.cleanup_mandb(errors[ii], outputs[ii][idx_prepoc], - preprocess_config[idx_prepoc], L) + if isinstance(outputs[ii][idx_prepoc], dict): + preprocess_util.cleanup_mandb(errors[ii], outputs[ii][idx_prepoc], preprocess_config[idx_prepoc], L) L.info("Done") return True diff --git a/tests/test_demod_mapmaker.py b/tests/test_demod_mapmaker.py index 2bc91c9e5..32b02a7de 100644 --- a/tests/test_demod_mapmaker.py +++ b/tests/test_demod_mapmaker.py @@ -32,6 +32,10 @@ def test_make_map_rectpix(self): imap = make_map(obs, shape=shape, wcs=wcs) check_map(imap, Q_stream, U_stream, TOL=1e-9) + # Try calling get_footprint + imap = make_map(obs, shape=None, wcs=wcs) + check_map(imap, Q_stream, U_stream, TOL=1e-9) + def make_map(obs, nside=None, nside_tile=None, shape=None, wcs=None, comps='TQU'): if nside is not None: