From 7c978ba14328abccc96bde2371be4243e45953fe Mon Sep 17 00:00:00 2001 From: Ian Sullivan Date: Wed, 5 Mar 2025 14:03:02 -0800 Subject: [PATCH 1/5] Use deferredDataSetHandles for coadds --- python/lsst/ip/diffim/getTemplate.py | 52 ++++++++++++------- tests/test_getTemplate.py | 33 +++++++++---- tests/utils.py | 74 ++++++++++++++++++++++++++++ 3 files changed, 130 insertions(+), 29 deletions(-) diff --git a/python/lsst/ip/diffim/getTemplate.py b/python/lsst/ip/diffim/getTemplate.py index 1d1d12523..97554a12b 100644 --- a/python/lsst/ip/diffim/getTemplate.py +++ b/python/lsst/ip/diffim/getTemplate.py @@ -134,7 +134,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): results = self.getExposures(coaddExposures, bbox, skymap, wcs) physical_filter = butlerQC.quantum.dataId["physical_filter"] - outputs = self.run(coaddExposures=results.coaddExposures, + outputs = self.run(coaddExposureHandles=results.coaddExposures, bbox=bbox, wcs=wcs, dataIds=results.dataIds, @@ -184,7 +184,8 @@ def getExposures(self, coaddExposureHandles, bbox, skymap, wcs): ``coaddExposures`` Dict of coadd exposures that overlap the projected bbox, indexed on tract id - (`dict` [`int`, `list` [`lsst.afw.image.Exposure`] ]). + (`dict` [`int`, `list` [`lsst.daf.butler.DeferredDatasetHandle` of + `lsst.afw.image.Exposure`] ]). ``dataIds`` Dict of data IDs of the coadd exposures that overlap the projected bbox, indexed on tract id @@ -214,7 +215,7 @@ def getExposures(self, coaddExposureHandles, bbox, skymap, wcs): if patchPolygon.intersection(detectorPolygon): overlappingArea += patchPolygon.intersectionSingle(detectorPolygon).calculateArea() self.log.info("Using template input tract=%s, patch=%s", dataId['tract'], dataId['patch']) - coaddExposures[dataId['tract']].append(coaddRef.get()) + coaddExposures[dataId['tract']].append(coaddRef) dataIds[dataId['tract']].append(dataId) if not overlappingArea: @@ -224,7 +225,7 @@ def getExposures(self, coaddExposureHandles, bbox, skymap, wcs): dataIds=dataIds) @timeMethod - def run(self, *, coaddExposures, bbox, wcs, dataIds, physical_filter): + def run(self, *, coaddExposureHandles, bbox, wcs, dataIds, physical_filter): """Warp coadds from multiple tracts and patches to form a template to subtract from a science image. @@ -237,14 +238,16 @@ def run(self, *, coaddExposures, bbox, wcs, dataIds, physical_filter): Parameters ---------- - coaddExposures : `dict` [`int`, `list` [`lsst.afw.image.Exposure`]] + coaddExposureHandles : `dict` [`int`, `list` of \ + [`lsst.daf.butler.DeferredDatasetHandle` of \ + `lsst.afw.image.Exposure`]] Coadds to be mosaicked, indexed on tract id. bbox : `lsst.geom.Box2I` Template Bounding box of the detector geometry onto which to - resample the ``coaddExposures``. Modified in-place to include the + resample the ``coaddExposureHandles``. Modified in-place to include the template border. wcs : `lsst.afw.geom.SkyWcs` - Template WCS onto which to resample the ``coaddExposures``. + Template WCS onto which to resample the ``coaddExposureHandles``. dataIds : `dict` [`int`, `list` [`lsst.daf.butler.DataCoordinate`]] Record of the tract and patch of each coaddExposure, indexed on tract id. @@ -265,21 +268,23 @@ def run(self, *, coaddExposures, bbox, wcs, dataIds, physical_filter): NoWorkFound If no coadds are found with sufficient un-masked pixels. """ - band, photoCalib = self._checkInputs(dataIds, coaddExposures) + band, photoCalib = self._checkInputs(dataIds, coaddExposureHandles) bbox.grow(self.config.templateBorderSize) warped = {} catalogs = [] - for tract in coaddExposures: - maskedImages, catalog, totalBox = self._makeExposureCatalog(coaddExposures[tract], + for tract in coaddExposureHandles: + maskedImages, catalog, totalBox = self._makeExposureCatalog(coaddExposureHandles[tract], dataIds[tract]) # Combine images from individual patches together. unwarped = self._merge(maskedImages, totalBox, catalog[0].wcs) potentialInput = self.warper.warpExposure(wcs, unwarped, destBBox=bbox) + if not np.any(np.isfinite(potentialInput.image.array)): self.log.info("No overlap from coadds in tract %s; not including in output.", tract) continue + catalogs.append(catalog) warped[tract] = potentialInput warped[tract].setWcs(wcs) @@ -308,7 +313,9 @@ def _checkInputs(dataIds, coaddExposures): ---------- dataIds : `dict` [`int`, `list` [`lsst.daf.butler.DataCoordinate`]] Record of the tract and patch of each coaddExposure. - coaddExposures : `dict` [`int`, `list` [`lsst.afw.image.Exposure`]] + coaddExposures : `dict` [`int`, `list` of \ + [`lsst.daf.butler.DeferredDatasetHandle` of \ + `lsst.afw.image.Exposure`]] Coadds to be mosaicked. Returns @@ -328,19 +335,22 @@ def _checkInputs(dataIds, coaddExposures): if len(bands) > 1: raise RuntimeError(f"GetTemplateTask called with multiple bands: {bands}") band = bands.pop() - photoCalibs = [exposure.photoCalib for exposures in coaddExposures.values() for exposure in exposures] + photoCalibs = [exposure.get(component="photoCalib") + for exposures in coaddExposures.values() + for exposure in exposures] if not all([photoCalibs[0] == x for x in photoCalibs]): msg = f"GetTemplateTask called with exposures with different photoCalibs: {photoCalibs}" raise RuntimeError(msg) photoCalib = photoCalibs[0] return band, photoCalib - def _makeExposureCatalog(self, exposures, dataIds): + def _makeExposureCatalog(self, exposureRefs, dataIds): """Make an exposure catalog for one tract. Parameters ---------- - exposures : `list` [`lsst.afw.image.Exposuref`] + exposureRefs : `list` of [`lsst.daf.butler.DeferredDatasetHandle` of \ + `lsst.afw.image.Exposure`] Exposures to include in the catalog. dataIds : `list` [`lsst.daf.butler.DataCoordinate`] Data ids of each of the included exposures; must have "tract" and @@ -356,17 +366,21 @@ def _makeExposureCatalog(self, exposures, dataIds): The union of the bounding boxes of all the input exposures. """ catalog = afwTable.ExposureCatalog(self.schema) - catalog.reserve(len(exposures)) - images = [exposure.maskedImage for exposure in exposures] + catalog.reserve(len(exposureRefs)) + exposures = (exposureRef.get() for exposureRef in exposureRefs) + images = [] totalBox = geom.Box2I() + for coadd, dataId in zip(exposures, dataIds): - totalBox = totalBox.expandedTo(coadd.getBBox()) + images.append(coadd.maskedImage) + bbox = coadd.getBBox() + totalBox = totalBox.expandedTo(bbox) record = catalog.addNew() record.setPsf(coadd.psf) record.setWcs(coadd.wcs) record.setPhotoCalib(coadd.photoCalib) - record.setBBox(coadd.getBBox()) - record.setValidPolygon(afwGeom.Polygon(geom.Box2D(coadd.getBBox()).getCorners())) + record.setBBox(bbox) + record.setValidPolygon(afwGeom.Polygon(geom.Box2D(bbox).getCorners())) record.set("tract", dataId["tract"]) record.set("patch", dataId["patch"]) # Weight is used by CoaddPsf, but the PSFs from overlapping patches diff --git a/tests/test_getTemplate.py b/tests/test_getTemplate.py index 1668c8b13..20d174ced 100644 --- a/tests/test_getTemplate.py +++ b/tests/test_getTemplate.py @@ -32,9 +32,12 @@ import lsst.ip.diffim import lsst.meas.algorithms import lsst.meas.base.tests +import lsst.pipe.base as pipeBase import lsst.skymap import lsst.utils.tests +from utils import generate_data_id + # Change this to True, `setup display_ds9`, and open ds9 (or use another afw # display backend) to show the tract/patch layouts on the image. debug = False @@ -155,7 +158,16 @@ def _makePatches(self, tract): warpedPsf = lsst.meas.algorithms.WarpedPsf(self.exposure.psf, xyTransform) warped = warper.warpExposure(patch.wcs, self.exposure, destBBox=box) warped.setPsf(warpedPsf) - self.patches[tract.tract_id].append(warped) + dataRef = pipeBase.InMemoryDatasetHandle( + warped, + storageClass="ExposureF", + copy=True, + dataId=generate_data_id( + tract=tract, + patch=patch, + ) + ) + self.patches[tract.tract_id].append(dataRef) self.dataIds[tract.tract_id].append({"tract": tract.tract_id, "patch": patchId, "band": "a"}) @@ -168,7 +180,7 @@ def _checkMetadata(self, template, config, box, wcs, nInputs): self.assertEqual(template.getBBox(), expectedBox) # WCS should match our exposure, not any of the coadd tracts. for tract in self.patches: - self.assertNotEqual(template.wcs, self.patches[tract][0].wcs) + self.assertNotEqual(template.wcs, self.patches[tract][0].get().wcs) self.assertEqual(template.wcs, self.exposure.wcs) self.assertEqual(template.photoCalib, self.exposure.photoCalib) self.assertEqual(template.getXY0(), expectedBox.getMin()) @@ -209,7 +221,7 @@ def testRunOneTractInput(self): task = lsst.ip.diffim.GetTemplateTask() # Restrict to tract 0, since the box fits in just that tract. # Task modifies the input bbox, so pass a copy. - result = task.run(coaddExposures={0: self.patches[0]}, + result = task.run(coaddExposureHandles={0: self.patches[0]}, bbox=lsst.geom.Box2I(box), wcs=self.exposure.wcs, dataIds={0: self.dataIds[0]}, @@ -227,7 +239,7 @@ def testRunOneTractMultipleInputs(self): box = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Point2I(180, 180)) task = lsst.ip.diffim.GetTemplateTask() # Task modifies the input bbox, so pass a copy. - result = task.run(coaddExposures=self.patches, + result = task.run(coaddExposureHandles=self.patches, bbox=lsst.geom.Box2I(box), wcs=self.exposure.wcs, dataIds=self.dataIds, @@ -243,7 +255,7 @@ def testRunTwoTracts(self): box = lsst.geom.Box2I(lsst.geom.Point2I(200, 200), lsst.geom.Point2I(600, 600)) task = lsst.ip.diffim.GetTemplateTask() # Task modifies the input bbox, so pass a copy. - result = task.run(coaddExposures=self.patches, + result = task.run(coaddExposureHandles=self.patches, bbox=lsst.geom.Box2I(box), wcs=self.exposure.wcs, dataIds=self.dataIds, @@ -259,7 +271,7 @@ def testRunNoTemplate(self): box = lsst.geom.Box2I(lsst.geom.Point2I(1200, 1200), lsst.geom.Point2I(1600, 1600)) task = lsst.ip.diffim.GetTemplateTask() with self.assertRaisesRegex(lsst.pipe.base.NoWorkFound, "No patches found"): - task.run(coaddExposures=self.patches, + task.run(coaddExposureHandles=self.patches, bbox=lsst.geom.Box2I(box), wcs=self.exposure.wcs, dataIds=self.dataIds, @@ -276,7 +288,7 @@ def testMissingPatches(self): box = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Point2I(180, 180)) task = lsst.ip.diffim.GetTemplateTask() # Task modifies the input bbox, so pass a copy. - result = task.run(coaddExposures=self.patches, + result = task.run(coaddExposureHandles=self.patches, bbox=lsst.geom.Box2I(box), wcs=self.exposure.wcs, dataIds=self.dataIds, @@ -295,8 +307,9 @@ def testMissingPatches(self): ) def testNanInputs(self, box=None, nInput=None): """Test that the template has finite values when some of the input pixels have NaN as variance.""" - for tract, patchCoadds in self.patches.items(): - for patchCoadd in patchCoadds: + for tract, patchRefs in self.patches.items(): + for patchRef in patchRefs: + patchCoadd = patchRef.get() bbox = lsst.geom.Box2I() bbox.include(lsst.geom.Point2I(patchCoadd.getBBox().getCenter())) bbox.grow(3) @@ -304,7 +317,7 @@ def testNanInputs(self, box=None, nInput=None): box = lsst.geom.Box2I(lsst.geom.Point2I(200, 200), lsst.geom.Point2I(600, 600)) task = lsst.ip.diffim.GetTemplateTask() - result = task.run(coaddExposures=self.patches, + result = task.run(coaddExposureHandles=self.patches, bbox=lsst.geom.Box2I(box), wcs=self.exposure.wcs, dataIds=self.dataIds, diff --git a/tests/utils.py b/tests/utils.py index 5832c799e..2bb397c90 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -33,6 +33,7 @@ import lsst.afw.image as afwImage import lsst.afw.math as afwMath import lsst.afw.table as afwTable +from lsst.daf.butler import DataCoordinate, DimensionUniverse import lsst.meas.algorithms as measAlg import lsst.meas.base as measBase from lsst.meas.algorithms.testUtils import plantSources @@ -1147,3 +1148,76 @@ class CustomCoaddPsf(measAlg.CoaddPsf): """ def getAveragePosition(self): return geom.Point2D(-10000, -10000) + + +def generate_data_id(*, + tract: int = 9813, + patch: int = 42, + cell_x: int = 4, + cell_y: int = 2, + band: str = "notR", + ) -> DataCoordinate: + """Generate a DataCoordinate instance to use as data_id. + + Modified from ``generate_data_id`` in ``lsst.cell_coadds.test_utils`` + + Parameters + ---------- + tract : `int`, optional + Tract ID for the data_id + patch : `int`, optional + Patch ID for the data_id + cell_x : `int`, optional + X index of the cell this patch corresponds to + cell_y : `int`, optional + Y index of the cell this patch corresponds to + band : `str`, optional + Band for the data_id + + Returns + ------- + data_id : `lsst.daf.butler.DataCoordinate` + An expanded data_id instance. + """ + universe = DimensionUniverse() + + instrument = universe["instrument"] + instrument_record = instrument.RecordClass( + name="DummyCam", + class_name="lsst.obs.base.instrument_tests.DummyCam", + ) + + skymap = universe["skymap"] + skymap_record = skymap.RecordClass(name="test_skymap") + + band_element = universe["band"] + band_record = band_element.RecordClass(name=band) + + physical_filter = universe["physical_filter"] + physical_filter_record = physical_filter.RecordClass(name=band, instrument="test", band=band) + + patch_element = universe["patch"] + patch_record = patch_element.RecordClass( + skymap="test_skymap", tract=tract, patch=patch, cell_x=cell_x, cell_y=cell_y + ) + + # A dictionary with all the relevant records. + record = { + "instrument": instrument_record, + "patch": patch_record, + "tract": 9813, + "band": band_record.name, + "skymap": skymap_record.name, + "physical_filter": physical_filter_record, + } + + # A dictionary with all the relevant recordIds. + record_id = record.copy() + for key in ( + "instrument", + "physical_filter", + ): + record_id[key] = record_id[key].name + + data_id = DataCoordinate.standardize(record_id, universe=universe) + return data_id.expanded(record) From af79f7e4c64bdac06380d0d3131a34cad27670a5 Mon Sep 17 00:00:00 2001 From: Ian Sullivan Date: Wed, 5 Mar 2025 14:09:12 -0800 Subject: [PATCH 2/5] Avoid creating additional tract-sized arrays --- python/lsst/ip/diffim/getTemplate.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/lsst/ip/diffim/getTemplate.py b/python/lsst/ip/diffim/getTemplate.py index 97554a12b..6815f7658 100644 --- a/python/lsst/ip/diffim/getTemplate.py +++ b/python/lsst/ip/diffim/getTemplate.py @@ -432,16 +432,16 @@ def _merge(maskedImages, bbox, wcs): merged.maskedImage[maskedImage.getBBox()] += maskedImage weights[maskedImage.getBBox()] += weight - inverseWeights = np.zeros_like(weights.array) good = weights.array > 0 - inverseWeights[good] = 1/weights.array[good] - # Cannot use `merged.maskedImage *= inverseWeights` because that + # Cannot use `merged.maskedImage /= weights` because that # operator divides the variance by the weight twice; in this case - # `inverseWeights` are the exact values we want to scale by. - merged.image.array *= inverseWeights - merged.variance.array *= inverseWeights - merged.mask.array |= merged.mask.getPlaneBitMask("NO_DATA") * (inverseWeights == 0) + # `weights` are the exact values we want to scale by. + weights = weights.array[good] + merged.image.array[good] /= weights + merged.variance.array[good] /= weights + + merged.mask.array[~good] |= merged.mask.getPlaneBitMask("NO_DATA") return merged From ed4de0be27c15af125a2d2b895fbaa166c12164b Mon Sep 17 00:00:00 2001 From: Ian Sullivan Date: Wed, 5 Mar 2025 14:06:50 -0800 Subject: [PATCH 3/5] Use indices instead of full arrays inside getTemplateTask._merge --- python/lsst/ip/diffim/getTemplate.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python/lsst/ip/diffim/getTemplate.py b/python/lsst/ip/diffim/getTemplate.py index 6815f7658..f8851ebbc 100644 --- a/python/lsst/ip/diffim/getTemplate.py +++ b/python/lsst/ip/diffim/getTemplate.py @@ -415,8 +415,7 @@ def _merge(maskedImages, bbox, wcs): for maskedImage in maskedImages: # Catch both zero-value and NaN variance plane pixels good = maskedImage.variance.array > 0 - weight = afwImage.ImageF(maskedImage.getBBox()) - weight.array[good] = maskedImage.variance.array[good]**(-0.5) + weight = maskedImage.variance.array[good]**(-0.5) bad = np.isnan(maskedImage.image.array) | ~good # Note that modifying the patch MaskedImage in place is fine; # we're throwing it away at the end anyway. @@ -427,10 +426,12 @@ def _merge(maskedImages, bbox, wcs): # Cannot use `merged.maskedImage *= weight` because that operator # multiplies the variance by the weight twice; in this case # `weight` are the exact values we want to scale by. - maskedImage.image *= weight - maskedImage.variance *= weight + maskedImage.image.array[good] *= weight + maskedImage.variance.array[good] *= weight + weights[maskedImage.getBBox()].array[good] += weight + # Free memory before creating new large arrays + del weight merged.maskedImage[maskedImage.getBBox()] += maskedImage - weights[maskedImage.getBBox()] += weight good = weights.array > 0 From 9498418e4972d8a28c5f26624c7f260a83ce83b7 Mon Sep 17 00:00:00 2001 From: Ian Sullivan Date: Wed, 5 Mar 2025 17:43:15 -0800 Subject: [PATCH 4/5] Also catch and remove INF variance --- python/lsst/ip/diffim/getTemplate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lsst/ip/diffim/getTemplate.py b/python/lsst/ip/diffim/getTemplate.py index f8851ebbc..ed551add2 100644 --- a/python/lsst/ip/diffim/getTemplate.py +++ b/python/lsst/ip/diffim/getTemplate.py @@ -414,7 +414,7 @@ def _merge(maskedImages, bbox, wcs): weights = afwImage.ImageF(bbox) for maskedImage in maskedImages: # Catch both zero-value and NaN variance plane pixels - good = maskedImage.variance.array > 0 + good = (maskedImage.variance.array > 0) & (np.isfinite(maskedImage.variance.array)) weight = maskedImage.variance.array[good]**(-0.5) bad = np.isnan(maskedImage.image.array) | ~good # Note that modifying the patch MaskedImage in place is fine; From 858b34fa3b3b8f0da52041a030f7d07180ef761b Mon Sep 17 00:00:00 2001 From: Ian Sullivan Date: Wed, 5 Mar 2025 21:27:35 -0800 Subject: [PATCH 5/5] Delete large arrays once no longer needed to reduce peak memory use --- python/lsst/ip/diffim/getTemplate.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/lsst/ip/diffim/getTemplate.py b/python/lsst/ip/diffim/getTemplate.py index ed551add2..1904bc3d7 100644 --- a/python/lsst/ip/diffim/getTemplate.py +++ b/python/lsst/ip/diffim/getTemplate.py @@ -279,8 +279,12 @@ def run(self, *, coaddExposureHandles, bbox, wcs, dataIds, physical_filter): dataIds[tract]) # Combine images from individual patches together. unwarped = self._merge(maskedImages, totalBox, catalog[0].wcs) + # Delete `maskedImages` after combining into one large image to reduce peak memory use + del maskedImages potentialInput = self.warper.warpExposure(wcs, unwarped, destBBox=bbox) + # Delete the single large `unwarped` image after warping to reduce peak memory use + del unwarped if not np.any(np.isfinite(potentialInput.image.array)): self.log.info("No overlap from coadds in tract %s; not including in output.", tract) continue