diff --git a/README.md b/README.md index 3b69694..2f195ce 100644 --- a/README.md +++ b/README.md @@ -8,14 +8,14 @@ and automatically ingest them into an appropriate Butler repository. Containers ---------- -Three containers are built from this repo: enqueue, ingest, and idle. +Four containers are built from this repo: enqueue, ingest, idle, and presence. Building -------- -Pull requests will build versions of all three containers tagged with the PR's "head ref". -If a tag (which should always be on main and prefixed with "v") is pushed, versions of all three containers with that tag's version number will be built. -Otherwise, merges to main will result in versions of all three containers tagged with "latest". +Pull requests will build versions of all four containers tagged with the PR's "head ref". +If a tag (which should always be on main and prefixed with "v") is pushed, versions of all four containers with that tag's version number will be built. +Otherwise, merges to main will result in versions of all four containers tagged with "latest". If the code in this repo has not changed but a new version of the Science Pipelines stack is needed for the ingest container, the ["On-demand ingest build" workflow](https://github.com/lsst-dm/embargo-butler/actions/workflows/build-manually.yaml) should be executed. It takes a ref — which should be the latest tag, not usually a branch — a rubin-env version, and the Science Pipelines release tag (e.g. `w_2023_41`). diff --git a/src/info.py b/src/info.py index 1867611..567e4a7 100644 --- a/src/info.py +++ b/src/info.py @@ -146,9 +146,12 @@ def __init__(self, path): self.instrument = f"{csc}/{generator}" elif len(components) == 6: self.bucket, self.instrument, year, month, day, self.filename = components + elif len(components) == 5: # photodiode data + self.bucket, self.instrument, data_type, self.obs_day, self.filename = components else: raise ValueError(f"Unrecognized number of components: {len(components)}") - self.obs_day = f"{year}{month}{day}" + if not self.obs_day: + self.obs_day = f"{year}{month}{day}" except Exception: logger.exception("Unable to parse: %s", path) raise diff --git a/src/ingest.py b/src/ingest.py index 912e351..9a026a4 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -24,13 +24,16 @@ """ import json import os +import re import socket import time import astropy.io.fits import requests from lsst.daf.butler import Butler +from lsst.pipe.base import Instrument from lsst.obs.base import DefineVisitsTask, RawIngestTask +from lsst.obs.lsst import PhotodiodeIngestTask from lsst.resources import ResourcePath from info import Info @@ -200,7 +203,18 @@ def main(): on_metadata_failure=on_metadata_failure, ) - if not is_lfa: + if is_lfa: + # LSSTCam photodiode is copy mode only. + instrument = Instrument.from_string("LSSTCam", butler.registry) + lsstcam_photodiode_ingester = PhotodiodeIngestTask( + config=PhotodiodeIngestTask.ConfigClass(), + butler=butler, + instrument=instrument, + on_success=on_success, + on_ingest_failure=on_ingest_failure, + on_metadata_failure=on_metadata_failure, + ) + else: define_visits_config = DefineVisitsTask.ConfigClass() define_visits_config.groupExposures = "one-to-one" visit_definer = DefineVisitsTask(config=define_visits_config, butler=butler) @@ -220,6 +234,13 @@ def main(): logger.info("Ingesting %s", resources) refs = None + if is_lfa: + resources_photodiode = [ + resource + for resource in resources + if re.search(r"MTCamera/photodiode.*_photodiode.ecsv", resource) + ] + resources = [resource for resource in resources if resource not in resources_photodiode] try: refs = ingester.run(resources) except Exception: @@ -233,6 +254,22 @@ def main(): info = Info.from_path(resource.geturl()) r.lrem(worker_queue, 0, info.path) + if is_lfa and resources_photodiode: + try: + refs = lsstcam_photodiode_ingester.run(resources_photodiode) + except Exception: + logger.exception( + "Error while ingesting %s, retrying one by one", resources_photodiode + ) + refs = [] + for resource in resources_photodiode: + try: + refs.extend(lsstcam_photodiode_ingester.run([resource])) + except Exception: + logger.exception("Error while ingesting %s", resource) + info = Info.from_path(resource.geturl()) + r.lrem(worker_queue, 0, info.path) + # Define visits if we ingested anything if not is_lfa and refs: ids = [ref.dataId for ref in refs]