Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 104 additions & 1 deletion python/lsst/ip/diffim/detectAndMeasure.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import numpy as np
import requests
import os

import lsst.afw.detection as afwDetection
import lsst.afw.image as afwImage
import lsst.afw.math as afwMath
import lsst.afw.table as afwTable
import lsst.daf.base as dafBase
import lsst.geom
from lsst.ip.diffim.utils import evaluateMaskFraction, computeDifferenceImageMetrics
from lsst.ip.diffim.utils import (evaluateMaskFraction, computeDifferenceImageMetrics,
populate_sattle_visit_cache)
from lsst.meas.algorithms import SkyObjectsTask, SourceDetectionTask, SetPrimaryFlagsTask, MaskStreaksTask
from lsst.meas.base import ForcedMeasurementTask, ApplyApCorrTask, DetectorVisitIdGeneratorConfig
import lsst.meas.deblender
Expand Down Expand Up @@ -303,6 +306,19 @@ class DetectAndMeasureConfig(pipeBase.PipelineTaskConfig,
default=True,
doc="Raise an algorithm error if no diaSources are detected.",
)
run_sattle = pexConfig.Field(
dtype=bool,
default=False,
doc="If true, dia source bounding boxes will be sent for verification"
"to the sattle service."
)
sattle_historical = pexConfig.Field(
dtype=bool,
default=False,
doc="If re-running a pipeline that requires sattle, this should be set "
"to True. This will populate sattle's cache with the historic data "
"closest in time to the exposure."
)
idGenerator = DetectorVisitIdGeneratorConfig.make_field()

def setDefaults(self):
Expand Down Expand Up @@ -375,6 +391,15 @@ def setDefaults(self):
"STREAK", "INJECTED", "INJECTED_TEMPLATE"]
self.skySources.avoidMask = ["DETECTED", "DETECTED_NEGATIVE", "BAD", "NO_DATA", "EDGE"]

def validate(self):
super().validate()

if self.run_sattle:
if not os.getenv("SATTLE_URI_BASE"):
raise pexConfig.FieldValidationError(DetectAndMeasureConfig.run_sattle, self,
"Sattle requested but SATTLE_URI_BASE "
"environment variable not set.")


class DetectAndMeasureTask(lsst.pipe.base.PipelineTask):
"""Detect and measure sources on a difference image.
Expand Down Expand Up @@ -692,6 +717,9 @@ def processResults(self, science, matchedTemplate, difference, sources, idFactor
self.measureDiaSources(initialDiaSources, science, difference, matchedTemplate)
diaSources = self._removeBadSources(initialDiaSources)

if self.config.run_sattle:
diaSources = self.filterSatellites(diaSources, science)

if self.config.doForcedMeasurement:
self.measureForcedSources(diaSources, science, difference.getWcs())

Expand Down Expand Up @@ -949,6 +977,81 @@ def calculateMetrics(self, science, difference, diaSources, kernelSources):
raise BadSubtractionError(ratio=metrics.differenceFootprintRatioStdev,
threshold=self.config.badSubtractionVariationThreshold)

def getSattleDiaSourceAllowlist(self, diaSources, science):
"""Query the sattle service and determine which diaSources are allowed.

Parameters
----------
diaSources : `lsst.afw.table.SourceCatalog`
The catalog of detected sources.
science : `lsst.afw.image.ExposureF`
Science exposure that was subtracted.

Returns
----------
allow_list : `list` of `int`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest speccing this as a sequence or even a collection instead of specifically a list. Or is there a reason it matters (e.g., the outputs are ordered)?

diaSourceIds of diaSources that can be made public.

Raises
------
requests.HTTPError
Raised if sattle call does not return success.
"""
wcs = science.getWcs()
visit_info = science.getInfo().getVisitInfo()
visit_id = visit_info.getId()
sattle_uri_base = os.getenv('SATTLE_URI_BASE')

dia_sources_json = []
for source in diaSources:
source_bbox = source.getFootprint().getBBox()
corners = wcs.pixelToSky([lsst.geom.Point2D(c) for c in source_bbox.getCorners()])
bbox_radec = [[pt.getRa().asDegrees(), pt.getDec().asDegrees()] for pt in corners]
dia_sources_json.append({"diasource_id": source["id"], "bbox": bbox_radec})

payload = {"visit_id": visit_id, "detector_id": science.getDetector(), "diasources": dia_sources_json,
"historical": self.config.sattle_historical}

sattle_output = requests.put(f'{sattle_uri_base}/diasource_allow_list',
json=payload)

# retry once if visit cache is not populated
if sattle_output.status_code == 404:
self.log.warning(f'Visit {visit_id} not found in sattle cache, re-sending')
populate_sattle_visit_cache(visit_info, historical=self.config.sattle_historical)
sattle_output = requests.put(f'{sattle_uri_base}/diasource_allow_list', json=payload)
Comment on lines +1015 to +1022
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For PP it would be useful to time these operations. Though I'm guessing populate_sattle_visit_cache returns a response before it actually populates the cache, if you benefit from calling it in advance?


sattle_output.raise_for_status()

return sattle_output.json()['allow_list']

def filterSatellites(self, diaSources, science):
"""Remove diaSources overlapping predicted satellite positions.

Parameters
----------
diaSources : `lsst.afw.table.SourceCatalog`
The catalog of detected sources.
science : `lsst.afw.image.ExposureF`
Science exposure that was subtracted.

Returns
----------
filterdDiaSources : `lsst.afw.table.SourceCatalog`
Filtered catalog of diaSources
"""

allow_list = self.getSattleDiaSourceAllowlist(diaSources, science)

if allow_list:
allow_set = set(allow_list)
allowed_ids = [source['id'] in allow_set for source in diaSources]
diaSources = diaSources[np.array(allowed_ids)].copy(deep=True)
else:
self.log.warning('Sattle allowlist is empty, all diaSources removed')
diaSources = diaSources[0:0].copy(deep=True)
return diaSources

def _runStreakMasking(self, difference):
"""Do streak masking and optionally save the resulting streak
fit parameters in a catalog.
Expand Down
39 changes: 39 additions & 0 deletions python/lsst/ip/diffim/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import itertools
import numpy as np
import os
import requests
import lsst.geom as geom
import lsst.afw.detection as afwDetection
import lsst.afw.image as afwImage
Expand Down Expand Up @@ -415,3 +417,40 @@ def footprint_mean(sources, sky=0):
differenceFootprintSkyRatioMean=sky_mean,
differenceFootprintSkyRatioStdev=sky_std,
)


def populate_sattle_visit_cache(visit_info, historical=False):
"""Populate a cache of predicted satellite positions in the sattle service.

Parameters
----------
visit_info: `lsst.afw.table.ExposureRecord.visitInfo`
Visit info for the science exposure being processed.
historical: `bool`
Set to True if observations are older than the current day.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "Load satellite positions older than the current day"?

Why can't Sattle just infer this from the timestamp?


Raises
------
requests.HTTPError
Raised if sattle call does not return success.
"""

visit_mjd = visit_info.getDate().toAstropy().mjd
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own understanding, can @ktlim confirm that DateTime::get(DateSystem.MJD) is deprecated and Astropy is the preferred approach?


exposure_time_days = visit_info.getExposureTime() / 86400.0
exposure_end_mjd = visit_mjd + exposure_time_days / 2.0
exposure_start_mjd = visit_mjd - exposure_time_days / 2.0

boresight_ra = visit_info.boresightRaDec.getRa().asDegrees()
boresight_dec = visit_info.boresightRaDec.getDec().asDegrees()

r = requests.put(
f'{os.getenv("SATTLE_URI_BASE")}/visit_cache',
json={"visit_id": visit_info.getId(),
"exposure_start_mjd": exposure_start_mjd,
"exposure_end_mjd": exposure_end_mjd,
"boresight_ra": boresight_ra,
"boresight_dec": boresight_dec,
"historical": historical})

r.raise_for_status()
Loading