-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DM-46586: Modify embargo-butler auto-ingest to handle photodiode files #60
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -146,9 +146,12 @@ def __init__(self, path): | |
self.instrument = f"{csc}/{generator}" | ||
elif len(components) == 6: | ||
self.bucket, self.instrument, year, month, day, self.filename = components | ||
elif len(components) == 5: # photodiode data | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm. This really says that the Camera is not following salobj precedent in generating its LFA data. I guess it may be too late to get that changed. |
||
self.bucket, self.instrument, data_type, self.obs_day, self.filename = components | ||
else: | ||
raise ValueError(f"Unrecognized number of components: {len(components)}") | ||
self.obs_day = f"{year}{month}{day}" | ||
if not self.obs_day: | ||
self.obs_day = f"{year}{month}{day}" | ||
except Exception: | ||
logger.exception("Unable to parse: %s", path) | ||
raise |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,13 +24,16 @@ | |
""" | ||
import json | ||
import os | ||
import re | ||
import socket | ||
import time | ||
|
||
import astropy.io.fits | ||
import requests | ||
from lsst.daf.butler import Butler | ||
from lsst.pipe.base import Instrument | ||
from lsst.obs.base import DefineVisitsTask, RawIngestTask | ||
from lsst.obs.lsst import PhotodiodeIngestTask | ||
from lsst.resources import ResourcePath | ||
|
||
from info import Info | ||
|
@@ -200,7 +203,18 @@ def main(): | |
on_metadata_failure=on_metadata_failure, | ||
) | ||
|
||
if not is_lfa: | ||
if is_lfa: | ||
# LSSTCam photodiode is copy mode only. | ||
instrument = Instrument.from_string("LSSTCam", butler.registry) | ||
lsstcam_photodiode_ingester = PhotodiodeIngestTask( | ||
config=PhotodiodeIngestTask.ConfigClass(), | ||
butler=butler, | ||
instrument=instrument, | ||
on_success=on_success, | ||
on_ingest_failure=on_ingest_failure, | ||
on_metadata_failure=on_metadata_failure, | ||
) | ||
else: | ||
define_visits_config = DefineVisitsTask.ConfigClass() | ||
define_visits_config.groupExposures = "one-to-one" | ||
visit_definer = DefineVisitsTask(config=define_visits_config, butler=butler) | ||
|
@@ -220,6 +234,13 @@ def main(): | |
|
||
logger.info("Ingesting %s", resources) | ||
refs = None | ||
if is_lfa: | ||
resources_photodiode = [ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Internet says you could write this as:
Even the fully explicit:
might be clearer. |
||
resource | ||
for resource in resources | ||
if re.search(r"MTCamera/photodiode.*_photodiode.ecsv", resource) | ||
] | ||
resources = [resource for resource in resources if resource not in resources_photodiode] | ||
try: | ||
refs = ingester.run(resources) | ||
except Exception: | ||
|
@@ -233,6 +254,22 @@ def main(): | |
info = Info.from_path(resource.geturl()) | ||
r.lrem(worker_queue, 0, info.path) | ||
|
||
if is_lfa and resources_photodiode: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is the same structure as the previous section, it might be nice to abstract that out into a function, in case we have more ingesters later. |
||
try: | ||
refs = lsstcam_photodiode_ingester.run(resources_photodiode) | ||
except Exception: | ||
logger.exception( | ||
"Error while ingesting %s, retrying one by one", resources_photodiode | ||
) | ||
refs = [] | ||
for resource in resources_photodiode: | ||
try: | ||
refs.extend(lsstcam_photodiode_ingester.run([resource])) | ||
except Exception: | ||
logger.exception("Error while ingesting %s", resource) | ||
info = Info.from_path(resource.geturl()) | ||
r.lrem(worker_queue, 0, info.path) | ||
|
||
# Define visits if we ingested anything | ||
if not is_lfa and refs: | ||
ids = [ref.dataId for ref in refs] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks :)