Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAT demod mapmaker updates #1044

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
ec92b5d
load_book: remove unnecessary obsfiledb queries
mhasself Nov 21, 2024
5f1bc11
load_book: set up automatic copy-file-to-fast-disk behavior
mhasself Nov 21, 2024
7f39b19
Add script for making atomic map db
earosenberg Nov 27, 2024
4991096
Remove RA_ref from atomic db
earosenberg Nov 27, 2024
fbb8b81
Merge remote-tracking branch 'origin/mhasself/speed-reading' into sat…
chervias Nov 27, 2024
49a80b4
adding make_atomic_db to cli
chervias Nov 28, 2024
8200026
Implementing argparse
chervias Nov 28, 2024
1f367f3
fixing a conflict
chervias Dec 17, 2024
977da9f
fixing a conflict
chervias Dec 17, 2024
55b4441
Fixing the differences with master in load_book
chervias Dec 18, 2024
a08fc6f
Switching the atomic database to sqlalchemy, also now it is appended …
Dec 18, 2024
c61b714
quick fix
Dec 18, 2024
fdcc782
Remove extra get_obs from make_atomic_filterbin_map
earosenberg Dec 18, 2024
6b94d7b
demod_mapmaker fixes
earosenberg Dec 18, 2024
c82fee5
Fixing the path of files in the atomic DB
chervias Dec 19, 2024
274c8d0
Fixing the pwv function
chervias Dec 19, 2024
8225858
removing the atomic db script since it is not needed anymore
chervias Dec 19, 2024
33b691e
Minor fix to AtomicInfo
chervias Dec 20, 2024
65d9d57
Merge remote-tracking branch 'origin/master' into sat-mapmaker-update…
chervias Jan 3, 2025
2bae2cb
Changing the cleanup made before starting to catch unfinished preproc…
chervias Jan 3, 2025
dddc038
Implementing f_hwp, roll_angle, scan_speed into the atomic db
chervias Jan 4, 2025
7609aa3
Merge remote-tracking branch 'origin/master' into sat-mapmaker-update…
chervias Jan 8, 2025
6c7e1f3
Implementing check when the cuts passed to pointing matrix remove all…
chervias Jan 9, 2025
dc956ff
Implementing a valid column in the atomic database. Valid=False will …
chervias Jan 10, 2025
7eb2de7
Implementing try catch for commiting repeated entries into the atomic…
chervias Jan 10, 2025
3459766
Implementing the sun distance into the atomic database
chervias Jan 11, 2025
d329eb2
Merge remote-tracking branch 'origin/master' into sat-mapmaker-update…
chervias Jan 11, 2025
2c61ae9
Fixing a bug on the cleanup_obs block
chervias Jan 13, 2025
085d3e2
fixing a merge conflict
chervias Jan 16, 2025
135b892
Fixing a bug on the cleanup_mandb block
chervias Jan 16, 2025
a0dde7f
fixing the mapmaking/utilities file
chervias Jan 17, 2025
73053c4
Merge remote-tracking branch 'origin/master' into sat-mapmaker-update…
chervias Jan 21, 2025
c0c2f07
Adding a raise error when calculating the white noise and there is a …
chervias Jan 21, 2025
042c492
Casting the timestreams to dtype_tod before mapmaking. We need to do …
chervias Jan 22, 2025
d7e501a
Fixing some bugs from the previous commit
chervias Jan 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 96 additions & 49 deletions sotodlib/mapmaking/demod_mapmaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import numpy as np, os
from pixell import enmap, utils as putils, tilemap, bunch, mpi
import so3g.proj
from sqlalchemy import create_engine, exc
from sqlalchemy.orm import sessionmaker

from .. import core
from .. import coords
from .utilities import recentering_to_quat_lonlat, evaluate_recentering, MultiZipper, unarr, safe_invert_div
from .utilities import import_optional
from .utilities import import_optional, Base
from .noise_model import NmatWhite

hp = import_optional('healpy')
Expand Down Expand Up @@ -146,7 +148,7 @@ def write (self, prefix, tag, x): pass

class DemodSignalMap(DemodSignal):
def __init__(self, shape=None, wcs=None, comm=None, comps="TQU", name="sky", ofmt="{name}", output=True,
ext="fits", dtype=np.float32, sys=None, recenter=None, tile_shape=(500,500), tiled=False, Nsplits=1, singlestream=False,
ext="fits", dtype_map=np.float64, dtype_tod=np.float32, sys=None, recenter=None, tile_shape=(500,500), tiled=False, Nsplits=1, singlestream=False,
nside=None, nside_tile=None):
"""
Parent constructor; use for_rectpix or for_healpix instead. Args described there.
Expand All @@ -156,40 +158,40 @@ def __init__(self, shape=None, wcs=None, comm=None, comps="TQU", name="sky", ofm
self.comps = comps
self.sys = sys
self.recenter = recenter
self.dtype = dtype
self.dtype_map = dtype_map
self.dtype_tod = dtype_tod
self.tile_shape = tile_shape
self.tiled = tiled
self.data = {}
self.Nsplits = Nsplits
self.singlestream = singlestream
self.wrapper = lambda x : x
self.wcs = wcs
ncomp = len(comps)

self.pix_scheme = "rectpix" if (shape is not None) else "healpix"
self.pix_scheme = "rectpix" if (wcs is not None) else "healpix"
if self.pix_scheme == "healpix":
self.tiled = (nside_tile is not None)
self.hp_geom = coords.healpix_utils.get_geometry(nside, nside_tile, ordering='NEST')
npix = 12 * nside**2
self.rhs = np.zeros((Nsplits, ncomp, npix), dtype=dtype)
self.div = np.zeros((Nsplits, ncomp, ncomp, npix), dtype=dtype)
self.hits = np.zeros((Nsplits, npix), dtype=dtype)
self.rhs = np.zeros((Nsplits, ncomp, npix), dtype=dtype_map)
self.div = np.zeros((Nsplits, ncomp, ncomp, npix), dtype=dtype_map)
self.hits = np.zeros((Nsplits, npix), dtype=dtype_map)

if self.tiled:
self.wrapper = coords.healpix_utils.tiled_to_full
else:
shape = tuple(shape[-2:])
if tiled:
geo = tilemap.geometry(shape, wcs, tile_shape=tile_shape)
self.rhs = tilemap.zeros(geo.copy(pre=(Nsplits,ncomp,)), dtype=dtype)
self.div = tilemap.zeros(geo.copy(pre=(Nsplits,ncomp,ncomp)), dtype=dtype)
self.hits= tilemap.zeros(geo.copy(pre=(Nsplits,)), dtype=dtype)
if shape is None:
# We will set shape, wcs from wcs_kernel on loading the first obs
self.rhs = None
self.div = None
self.hits = None
else:
self.rhs = enmap.zeros((Nsplits, ncomp) +shape, wcs, dtype=dtype)
self.div = enmap.zeros((Nsplits,ncomp,ncomp)+shape, wcs, dtype=dtype)
self.hits= enmap.zeros((Nsplits,)+shape, wcs, dtype=dtype)
self.init_maps_rectpix(shape, wcs)

@classmethod
def for_rectpix(cls, shape, wcs, comm, comps="TQU", name="sky", ofmt="{name}", output=True,
ext="fits", dtype=np.float32, sys=None, recenter=None, tile_shape=(500,500), tiled=False, Nsplits=1, singlestream=False):
ext="fits", dtype_map=np.float64, dtype_tod=np.float32, sys=None, recenter=None, tile_shape=(500,500), tiled=False, Nsplits=1, singlestream=False):
"""
Signal describing a non-distributed sky map. Signal describing a sky map in the coordinate
system given by "sys", which defaults to equatorial coordinates. If tiled==True, then this
Expand All @@ -198,9 +200,9 @@ def for_rectpix(cls, shape, wcs, comm, comps="TQU", name="sky", ofmt="{name}", o
Arguments
---------
shape : numpy.ndarray
Shape of the output map geometry
Shape of the output map geometry. If None, computed from coords.get_footprint on first add_obs.
wcs : wcs
WCS of the output map geometry
WCS of the output map geometry (or wcs kernel).
comm : MPI.comm
MPI communicator
comps : str, optional
Expand Down Expand Up @@ -234,12 +236,12 @@ def for_rectpix(cls, shape, wcs, comm, comps="TQU", name="sky", ofmt="{name}", o

"""
return cls(shape=shape, wcs=wcs, comm=comm, comps=comps, name=name, ofmt=ofmt, output=output,
ext=ext, dtype=dtype, sys=sys, recenter=recenter, tile_shape=tile_shape, tiled=tiled,
ext=ext, dtype_map=dtype_map, dtype_tod=dtype_tod, sys=sys, recenter=recenter, tile_shape=tile_shape, tiled=tiled,
Nsplits=Nsplits, singlestream=singlestream, nside=None, nside_tile=None)

@classmethod
def for_healpix(cls, nside, nside_tile=None, comps="TQU", name="sky", ofmt="{name}", output=True,
ext="fits.gz", dtype=np.float32, Nsplits=1, singlestream=False):
ext="fits.gz", dtype_map=np.float64, dtype_tod=np.float32, Nsplits=1, singlestream=False):
"""
Signal describing a sky map in healpix pixelization, NEST ordering.

Expand All @@ -262,7 +264,7 @@ def for_healpix(cls, nside, nside_tile=None, comps="TQU", name="sky", ofmt="{nam
The extension used for the files.
May be 'fits', 'fits.gz', 'h5', 'h5py', or 'npy'.
dtype : numpy.dtype
The data type to use for the time-ordered data.
The data type to use for the maps.
Nsplits : int, optional
Number of splits that you will map simultaneously. By default is 1 when no
splits are requested.
Expand All @@ -272,7 +274,7 @@ def for_healpix(cls, nside, nside_tile=None, comps="TQU", name="sky", ofmt="{nam
obs.demodQ, obs.demodU
"""
return cls(shape=None, wcs=None, comm=None, comps=comps, name=name, ofmt=ofmt, output=output,
ext=ext, dtype=dtype, sys=None, recenter=None, tile_shape=None, tiled=False,
ext=ext, dtype_map=dtype_map, dtype_tod=dtype_tod, sys=None, recenter=None, tile_shape=None, tiled=False,
Nsplits=Nsplits, singlestream=singlestream, nside=nside, nside_tile=nside_tile)

def add_obs(self, id, obs, nmat, Nd, pmap=None, split_labels=None):
Expand All @@ -297,16 +299,27 @@ def add_obs(self, id, obs, nmat, Nd, pmap=None, split_labels=None):
cuts = obs.flags.glitch_flags + ~obs.preprocess.split_flags.cuts[split_labels[n_split]]
if self.pix_scheme == "rectpix":
threads='domdir'
geom = self.rhs.geometry
if self.rhs is None: # Still need to initialize the geometry
geom = None
wcs_kernel = self.wcs
else:
geom = self.rhs.geometry
wcs_kernel = None
else:
threads = ["tiles", "simple"][self.hp_geom.nside_tile is None]
geom = self.hp_geom
pmap_local = coords.pmat.P.for_tod(obs, comps=self.comps, geom=geom, rot=rot, threads=threads, weather=unarr(obs.weather), site=unarr(obs.site), cuts=cuts, hwp=True)
wcs_kernel = None
pmap_local = coords.pmat.P.for_tod(obs, comps=self.comps, geom=geom, rot=rot, wcs_kernel=wcs_kernel, threads=threads, weather=unarr(obs.weather), site=unarr(obs.site), cuts=cuts, hwp=True)
else:
pmap_local = pmap

if self.rhs is None: # Set the geometry now from the pmat
shape, wcs = pmap_local.geom
self.init_maps_rectpix(shape, wcs)
self.wcs = wcs

if not(self.singlestream):
obs_rhs, obs_div, obs_hits = project_all_demod(pmap=pmap_local, signalT=obs.dsT, signalQ=obs.demodQ, signalU=obs.demodU,
obs_rhs, obs_div, obs_hits = project_all_demod(pmap=pmap_local, signalT=obs.dsT.astype(self.dtype_tod), signalQ=obs.demodQ.astype(self.dtype_tod), signalU=obs.demodU.astype(self.dtype_tod),
det_weightsT=2*nmat.ivar, det_weightsQU=nmat.ivar, ncomp=self.ncomp, wrapper=self.wrapper)
else:
obs_rhs, obs_div, obs_hits = project_all_single(pmap=pmap_local, Nd=Nd, det_weights=nmat.ivar, comps='TQU', wrapper=self.wrapper)
Expand Down Expand Up @@ -363,40 +376,60 @@ def write(self, prefix, tag, m):
raise ImportError("Cannot save healpix map as fits; healpy could not be imported. Install healpy or save as npy or h5")
if m.ndim > 2:
m = np.reshape(m, (np.prod(m.shape[:-1]), m.shape[-1]), order='C') # Flatten wrapping axes; healpy.write_map can't handle >2d array
hp.write_map(oname, m.view(self.dtype), nest=(self.hp_geom.ordering=='NEST'), overwrite=True)
hp.write_map(oname, m.view(self.dtype_map), nest=(self.hp_geom.ordering=='NEST'), overwrite=True)
elif self.ext == "npy":
np.save(oname, {'nside': self.hp_geom.nside, 'ordering':self.hp_geom.ordering, 'data':m.view(self.dtype)}, allow_pickle=True)
np.save(oname, {'nside': self.hp_geom.nside, 'ordering':self.hp_geom.ordering, 'data':m.view(self.dtype_map)}, allow_pickle=True)
elif self.ext in ['h5', 'hdf5']:
if h5py is None:
raise ValueError("Cannot save healpix map as hdf5; h5py could not be imported. Install h5py or save as npy or fits")
with h5py.File(oname, 'w') as f:
dset = f.create_dataset("data", m.shape, dtype=self.dtype, data=m)
dset = f.create_dataset("data", m.shape, dtype=self.dtype_map, data=m)
dset.attrs['ordering'] = self.hp_geom.ordering
dset.attrs['nside'] = self.hp_geom.nside
else:
raise ValueError(f"Unknown extension {self.ext}")

return oname

def init_maps_rectpix(self, shape, wcs):
""" Initialize tilemaps or enmaps rhs, div, hits for given shape and wcs"""
shape = tuple(shape[-2:])
Nsplits, ncomp, dtype = self.Nsplits, self.ncomp, self.dtype_map

if self.tiled:
geo = tilemap.geometry(shape, wcs, tile_shape=self.tile_shape)
rhs = tilemap.zeros(geo.copy(pre=(Nsplits,ncomp,)), dtype=self.dtype_map)
div = tilemap.zeros(geo.copy(pre=(Nsplits,ncomp,ncomp)), dtype=self.dtype_map)
hits= tilemap.zeros(geo.copy(pre=(Nsplits,)), dtype=self.dtype_map)
else:
rhs = enmap.zeros((Nsplits, ncomp) +shape, wcs, dtype=self.dtype_map)
div = enmap.zeros((Nsplits,ncomp,ncomp)+shape, wcs, dtype=self.dtype_map)
hits= enmap.zeros((Nsplits,)+shape, wcs, dtype=self.dtype_map)
self.rhs = rhs
self.div = div
self.hits = hits
return rhs, div, hits


def setup_demod_map(noise_model, shape=None, wcs=None, nside=None,
comm=mpi.COMM_WORLD, comps='TQU', split_labels=None,
singlestream=False, dtype_tod=np.float32,
dtype_map=np.float32, recenter=None, verbose=0):
dtype_map=np.float64, recenter=None, verbose=0):
"""
Setup the classes for demod mapmaking and return
a DemodMapmmaker object
"""
if shape is not None and wcs is not None:
if wcs is not None:
Nsplits = len(split_labels)
signal_map = DemodSignalMap.for_rectpix(shape, wcs, comm, comps=comps,
dtype=dtype_map, tiled=False,
dtype_map=dtype_map, dtype_tod=dtype_tod, tiled=False,
ofmt="", Nsplits=Nsplits,
singlestream=singlestream,
recenter=recenter)
elif nside is not None:
Nsplits = len(split_labels)
signal_map = DemodSignalMap.for_healpix(nside, nside_tile='auto',
comps=comps, dtype=dtype_map,
comps=comps, dtype_map=dtype_map, dtype_tod=dtype_tod,
ofmt="", Nsplits=Nsplits,
singlestream=singlestream,
ext="fits.gz")
Expand All @@ -407,31 +440,44 @@ def setup_demod_map(noise_model, shape=None, wcs=None, nside=None,
singlestream=singlestream)
return mapmaker

def write_demod_maps(prefix, data, split_labels=None):
def atomic_db_aux(atomic_db, info, valid = True):
info.valid = valid
engine = create_engine("sqlite:///%s" % atomic_db, echo=False)
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)
with Session() as session:
session.add(info)
try:
session.commit()
except exc.IntegrityError:
session.rollback()

def write_demod_maps(prefix, data, info, split_labels=None, atomic_db=None):
"""
Write maps from data into files
"""
Nsplits = len(split_labels)
for n_split in range(Nsplits):
if np.all(data.wmap[n_split] == 0.0):
if atomic_db is not None:
atomic_db_aux(atomic_db, info[n_split], valid=False)
continue
data.signal.write(prefix, "%s_wmap"%split_labels[n_split],
data.wmap[n_split])
data.signal.write(prefix, "%s_weights"%split_labels[n_split],
data.weights[n_split])
data.signal.write(prefix, "%s_hits"%split_labels[n_split],
data.signal.hits[n_split])

def write_demod_info(oname, info, split_labels=None):
putils.mkdir(os.path.dirname(oname))
Nsplits = len(split_labels)
for n_split in range(Nsplits):
bunch.write(oname+'_%s_info.hdf'%split_labels[n_split], info[n_split])
if atomic_db is not None:
atomic_db_aux(atomic_db, info[n_split], valid=True)

def make_demod_map(context, obslist, noise_model, info,
preprocess_config, prefix, shape=None, wcs=None,
nside=None, comm=mpi.COMM_WORLD, comps="TQU", t0=0,
dtype_tod=np.float32, dtype_map=np.float32,
tag="", verbose=0, split_labels=None, L=None,
site='so_sat3', recenter=None, singlestream=False):
site='so_sat3', recenter=None, singlestream=False,
atomic_db=None):
"""
Make a demodulated map from the list of observations in obslist.

Expand Down Expand Up @@ -484,6 +530,8 @@ def make_demod_map(context, obslist, noise_model, info,
If True, do not perform demodulated filter+bin mapmaking but
rather regular filter+bin mapmaking, i.e. map from obs.signal
rather than from obs.dsT, obs.demodQ, obs.demodU.
atomic_db : str, optional
Path to the atomic map data base. Maps created will be added to it.

Returns
-------
Expand Down Expand Up @@ -527,7 +575,7 @@ def make_demod_map(context, obslist, noise_model, info,
overwrite=False)
errors.append(error) ; outputs.append((output_init, output_proc)) ;
if error not in [None,'load_success']:
L.info('tod %s:%s:%s failed in the prepoc database'%(obs_id,detset,band))
L.info('tod %s:%s:%s failed in the preproc database'%(obs_id,detset,band))
continue
obs.wrap("weather", np.full(1, "toco"))
obs.wrap("site", np.full(1, site))
Expand All @@ -551,12 +599,11 @@ def make_demod_map(context, obslist, noise_model, info,
div = np.moveaxis(div, -1, 0) # this moves the last axis to the 0th position
weights.append(div)
mapdata = bunch.Bunch(wmap=wmap, weights=weights, signal=mapmaker.signals[0], t0=t0)

info = add_weights_to_info(info, weights, split_labels)

# output to files
write_demod_maps(prefix, mapdata, split_labels=split_labels, )
write_demod_info(prefix, info, split_labels=split_labels )
write_demod_maps(prefix, mapdata, info, split_labels=split_labels, atomic_db=atomic_db)

return errors, outputs

def add_weights_to_info(info, weights, split_labels):
Expand All @@ -572,9 +619,9 @@ def add_weights_to_info(info, weights, split_labels):
sumweights = np.sum(mean_qu[positive])
meanweights = np.mean(mean_qu[positive])
medianweights = np.median(mean_qu[positive])
sub_info['total_weight_qu'] = sumweights
sub_info['mean_weight_qu'] = meanweights
sub_info['median_weight_qu'] = medianweights
sub_info.total_weight_qu = sumweights
sub_info.mean_weight_qu = meanweights
sub_info.median_weight_qu = medianweights
info[isplit] = sub_info
return info

Expand Down Expand Up @@ -672,4 +719,4 @@ def project_all_single(pmap, Nd, det_weights, comps, wrapper=lambda x:x):
pmap.to_weights(dest=div, comps=comps, det_weights=det_weights)
div = wrapper(div)
hits = wrapper(pmap.to_map(signal=np.ones_like(Nd)))
return rhs, div, hits
return rhs, div, hits
2 changes: 2 additions & 0 deletions sotodlib/mapmaking/noise_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ def __init__(self, window=2, ivar=None, nwin=None):
def build(self, tod, srate, **kwargs):
#ndet, nsamps = tod.shape
nwin = utils.nint(self.window*srate)
if np.any(np.logical_not(np.isfinite(tod))):
raise ValueError(f"There is a nan when calculating the white noise !!!")
ivar = 1.0/np.var(tod, 1)
return NmatWhite(ivar=ivar, window=self.window, nwin=nwin)
def apply(self, tod, inplace=True):
Expand Down
Loading
Loading