Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-45005: In Prompt Processing, use microservice to check for already-arrived Rubin images #192

Merged
merged 7 commits into from
Aug 30, 2024
Merged
43 changes: 7 additions & 36 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

__all__ = ["check_for_snap", "next_visit_handler"]
__all__ = ["next_visit_handler"]

import collections.abc
import json
Expand All @@ -43,7 +43,7 @@
from .middleware_interface import get_central_butler, flush_local_repo, \
make_local_repo, make_local_cache, MiddlewareInterface
from .raw import (
get_prefix_from_snap,
check_for_snap,
is_path_consistent,
get_group_id_from_oid,
)
Expand All @@ -59,6 +59,8 @@
calib_repo = os.environ["CALIB_REPO"]
# S3 Endpoint for Buckets; needed for direct Boto access but not Butler
s3_endpoint = os.environ["S3_ENDPOINT_URL"]
# URI of raw image microservice. Unlike the neighboring settings this one is
# optional: when RAW_MICROSERVICE is unset it defaults to the empty string,
# which check_for_snap treats as "no microservice" and searches the bucket
# by prefix only.
raw_microservice = os.environ.get("RAW_MICROSERVICE", "")
# Bucket name (not URI) containing raw images
image_bucket = os.environ["IMAGE_BUCKET"]
# Time to wait after expected script completion for image arrival, in seconds
Expand Down Expand Up @@ -175,40 +177,6 @@ def wrapper(*args, **kwargs):
return decorator


def check_for_snap(
    instrument: str, group: int, snap: int, detector: int
) -> str | None:
    """Look in the active image bucket for a raw matching a data ID.

    Parameters
    ----------
    instrument, group, snap, detector
        The data ID whose raw file is being sought.

    Returns
    -------
    name : `str` or `None`
        The key of a matching object in the active bucket. `None` is
        returned if no prefix can be computed for the data ID or if no
        object matches the prefix. If several objects match, an error is
        logged and the first listed key is returned anyway.
    """
    prefix = get_prefix_from_snap(instrument, group, detector, snap)
    if not prefix:
        return None

    _log.debug(f"Checking for '{prefix}'")
    listing = storage_client.list_objects_v2(Bucket=image_bucket, Prefix=prefix)
    n_matches = listing["KeyCount"]
    if n_matches == 0:
        return None
    if n_matches > 1:
        _log.error(
            f"Multiple files detected for a single detector/group/snap: '{prefix}'"
        )
    # "Contents" is only present in the listing when at least one key matched.
    first_match = listing["Contents"][0]
    return first_match['Key']


def parse_next_visit(http_request):
"""Parse a next_visit event and extract its data.

Expand Down Expand Up @@ -379,6 +347,9 @@ def next_visit_handler() -> tuple[str, int]:
# Check to see if any snaps have already arrived
for snap in range(expected_snaps):
oid = check_for_snap(
storage_client,
image_bucket,
raw_microservice,
expected_visit.instrument,
expected_visit.groupId,
snap,
Expand Down
131 changes: 123 additions & 8 deletions python/activator/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

__all__ = [
"is_path_consistent",
"check_for_snap",
"get_prefix_from_snap",
"get_exp_id_from_oid",
"get_group_id_from_oid",
Expand All @@ -36,16 +37,23 @@
]

import json
import logging
import os
import re
import time
import urllib.parse

import requests

from lsst.obs.lsst import LsstCam, LsstComCam, LsstComCamSim
from lsst.obs.lsst.translators.lsst import LsstBaseTranslator
from lsst.resources import ResourcePath

from .visit import FannedOutVisit

# Module-level logger; its name is this module's name under the "lsst." prefix.
_log = logging.getLogger("lsst." + __name__)
# The level is set directly on this logger to DEBUG; check_for_snap reports
# the prefix it searches for at that level.
_log.setLevel(logging.DEBUG)

# Format for filenames of LSST camera raws uploaded to image bucket:
# instrument/dayobs/obsid/obsid_Rraft_Ssensor.(fits, fz, fits.gz)
LSST_REGEXP = re.compile(
Expand Down Expand Up @@ -149,36 +157,143 @@ def is_path_consistent(oid: str, visit: FannedOutVisit) -> bool:
return False


def check_for_snap(
    client,
    bucket: str,
    microservice: str,
    instrument: str,
    group: str,
    snap: int,
    detector: int,
) -> str | None:
    """Search for new raw files matching a particular data ID.

    If a raw image microservice is configured it is asked first. If that
    query fails, or no microservice is configured, the search falls back to
    listing objects in the image bucket by key prefix.

    Parameters
    ----------
    client : `S3.Client`
        The client object with which to do the bucket search.
    bucket : `str`
        The name of the bucket in which to search.
    microservice : `str`
        The URI of an optional microservice to assist the search. An empty
        string means no microservice is available.
    instrument, group, snap, detector
        The data ID to search for.

    Returns
    -------
    name : `str` or `None`
        The raw's object key within ``bucket``, or `None` if no file
        was found. If multiple files match the prefix, this function logs
        an error but returns one of the files anyway.
    """
    if microservice:
        try:
            # A successful query is authoritative, including a `None`
            # ("not present") answer; the bucket is not searched afterwards.
            return _query_microservice(microservice=microservice,
                                       instrument=instrument,
                                       group=group,
                                       detector=detector,
                                       snap=snap,
                                       )
        except RuntimeError:
            # Best effort: a broken microservice must not prevent the search.
            _log.exception("Could not query microservice, falling back to prefix algorithm.")

    prefix = get_prefix_from_snap(instrument, group, detector, snap)
    if not prefix:
        # No prefix can be computed for this data ID, so there is nothing
        # to list.
        return None
    _log.debug(f"Checking for '{prefix}'")
    response = client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    if response["KeyCount"] == 0:
        return None
    elif response["KeyCount"] > 1:
        _log.error(
            f"Multiple files detected for a single detector/group/snap: '{prefix}'"
        )
    # Contents only exists if >0 objects found.
    return response["Contents"][0]['Key']


def get_prefix_from_snap(
    instrument: str, group: str, detector: int, snap: int
) -> str | None:
    """Build the object-key prefix under which a raw image is stored.

    Parameters
    ----------
    instrument : `str`
        The name of the instrument taking the image.
    group : `str`
        The group id from the visit, associating the snaps making up the visit.
    detector : `int`
        The integer detector id for the image being sought.
    snap : `int`
        The snap number within the group for the visit.

    Returns
    -------
    prefix : `str` or `None`
        The prefix to a path to the corresponding raw image object. If it
        can be calculated, then the prefix may be the entire path. If no
        prefix can be calculated, `None` is returned.
    """
    if instrument in _LSST_CAMERA_LIST:
        # TODO DM-39022: use a microservice to determine paths for LSST cameras.
        return None
    # Every other instrument uses a key layout derived directly from the
    # data ID.
    return f"{instrument}/{detector}/{group}/{snap}/"


def _query_microservice(
    microservice: str, instrument: str, group: str, detector: int, snap: int
) -> str | None:
    """Look up a raw image's location from the raw image microservice.

    The service is queried with a single GET request to
    ``{microservice}/{instrument}/{group}/{snap}/{detector_name}`` and is
    expected to answer with a JSON object holding the fields ``error``,
    ``message``, ``present``, and ``uri``.

    Parameters
    ----------
    microservice : `str`
        The URI of the microservice to query.
    instrument : `str`
        The name of the instrument taking the image.
    group : `str`
        The group id from the visit, associating the snaps making up the visit.
    detector : `int`
        The integer detector id for the image being sought.
    snap : `int`
        The snap number within the group for the visit.

    Returns
    -------
    key : `str` or `None`
        The raw's object key within its bucket, or `None` if no image was found.

    Raises
    ------
    RuntimeError
        Raised if this function could not connect to the microservice, if the
        microservice encountered an error, or if its response could not be
        decoded or lacked the expected fields.
    """
    detector_name = _DETECTOR_FROM_INT[instrument][detector]
    uri = f"{microservice}/{instrument}/{group}/{snap}/{detector_name}"
    try:
        response = requests.get(uri, timeout=1.0)
        response.raise_for_status()
        unpacked = response.json()
    except requests.Timeout as e:
        raise RuntimeError("Timed out connecting to raw microservice.") from e
    except (requests.RequestException, ValueError) as e:
        # ValueError covers undecodable JSON on older requests releases,
        # where the decode error is not a RequestException.
        raise RuntimeError("Could not query raw microservice.") from e

    # Fields are read lazily, in the same order as the checks that need them,
    # so a response only has to carry the fields relevant to its outcome.
    try:
        if unpacked["error"]:
            message = unpacked["message"]
            raise RuntimeError(f"Raw microservice had an internal error: {message}")
        if not unpacked["present"]:
            return None
        raw_uri = unpacked["uri"]
    except (KeyError, TypeError) as e:
        # Report a malformed payload as RuntimeError so that callers relying
        # on the documented contract can fall back instead of crashing.
        raise RuntimeError("Raw microservice returned a malformed response.") from e

    # Need to return just the key, without the bucket
    path = urllib.parse.urlparse(raw_uri, allow_fragments=False).path
    # Valid key does not start with a /
    return path.lstrip("/")


def get_exp_id_from_oid(oid: str) -> int:
"""Calculate an exposure id from an image object's pathname.

Expand Down
5 changes: 3 additions & 2 deletions tests/test_middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import astropy.table
import astropy.time
import astropy.units as u
import erfa
import psycopg2

import astro_metadata_translator
Expand Down Expand Up @@ -360,7 +361,7 @@ def test_prep_butler_novalid(self):

with warnings.catch_warnings():
# Avoid "dubious year" warnings from using a 2050 date
warnings.simplefilter("ignore", category=astropy.utils.exceptions.ErfaWarning)
warnings.simplefilter("ignore", category=erfa.ErfaWarning)
with self.assertRaises(_MissingDatasetError), \
unittest.mock.patch("activator.middleware_interface.MiddlewareInterface._run_preprocessing") \
as mock_pre:
Expand Down Expand Up @@ -1020,7 +1021,7 @@ def test_filter_calibs_by_date_never(self):
all_calibs = list(self.central_butler.registry.queryDatasets("cpBias"))
with warnings.catch_warnings():
# Avoid "dubious year" warnings from using a 2050 date
warnings.simplefilter("ignore", category=astropy.utils.exceptions.ErfaWarning)
warnings.simplefilter("ignore", category=erfa.ErfaWarning)
future_calibs = list(_filter_calibs_by_date(
self.central_butler, "DECam/calib", all_calibs,
astropy.time.Time("2050-01-01 00:00:00", scale="utc")
Expand Down
Loading
Loading