Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 46 additions & 26 deletions tests/test_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import glob
import itertools
import os.path
import tempfile
import unittest

Expand All @@ -34,12 +32,14 @@
import lsst.utils
import lsst.utils.tests

from lsst.resources import ResourcePath


class PipelineDefintionsTestSuite(lsst.utils.tests.TestCase):
"""Tests of the self-consistency of our pipeline definitions.
"""
def setUp(self):
self.path = os.path.join(lsst.utils.getPackageDir("ap_pipe"), "pipelines")
self.path = ResourcePath("eups://ap_pipe/pipelines/", forceDirectory=True)
# Each pipeline file should have a subset that represents it in
# higher-level pipelines.
self.synonyms = {"ApPipe.yaml": "apPipe",
Expand All @@ -55,30 +55,38 @@ def test_graph_build(self):
"""Test that each pipeline definition file can be
used to build a graph.
"""
files = glob.glob(os.path.join(self.path, "**", "*.yaml"))
files = ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
for file in files:
if "QuickTemplate" in file:
print("GRAPH BUILD FILE: ", file)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this bare print shouldn't be here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. Debug left over.

if "QuickTemplate" in file.path:
# Our QuickTemplate definition cannot be tested here because it
# depends on drp_tasks, which we cannot make a dependency here.
continue
if "PromptTemplate" in file:
if "PromptTemplate" in file.path:
# Our PromptTemplate definition cannot be tested here because it
# depends on drp_tasks, which we cannot make a dependency here.
continue
with self.subTest(file):
with self.subTest(file=str(file)):
pipeline = lsst.pipe.base.Pipeline.from_uri(file)
pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
# If this fails, it will produce a useful error message.
pipeline.to_graph()

def test_datasets(self):
files = glob.glob(os.path.join(self.path, "_ingredients", "*.yaml"))
files = [
f for f in ResourcePath.findFileResources(
[self.path.join("_ingredients", forceDirectory=True)], file_filter=r".*\.yaml$"
)
# Validation currently broken for injection pipelines.
# TODO: DM-54077
if "injection/" not in f.path
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we do want to validate these pipelines, though the "WithFakes" filter on line 94/101 clearly needs to be updated.

Copy link
Member Author

@timj timj Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They weren't being validated before and I don't know how any of this works to know how to fix the validation. The change triggered here is that the test is finding things in subdirectories whereas before it was only looking in a single directory.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'd call it a bug in #223. I can ticket it, but please mark it as TODO instead making it sound as if SI pipelines are exempt from validation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great. Will do. Can add the ticket number once you've filed it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

]
for file in files:
if "QuickTemplate" in file:
if "QuickTemplate" in file.path:
# Our QuickTemplate definition cannot be tested here because it
# depends on drp_tasks, which we cannot make a dependency here.
continue
with self.subTest(file):
with self.subTest(file=str(file)):
expected_inputs = {
# ISR
"raw", "camera", "crosstalk", "crosstalkSources", "bias", "dark", "flat", "ptc",
Expand All @@ -91,7 +99,7 @@ def test_datasets(self):
"skyMap", "gaia_dr3_20230707", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110",
"template_coadd", "pretrainedModelPackage", "dia_source_apdb"
}
if "WithFakes" in file:
if "WithFakes" in file.path:
expected_inputs.add("injection_catalog")
tester = PipelineStepTester(
filename=file,
Expand All @@ -115,38 +123,48 @@ def test_whole_subset(self):
"""Test that each pipeline's synonymous subset includes all tasks,
including those imported from other files.
"""
files = glob.glob(os.path.join(self.path, "**", "*.yaml"))
files = [
f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
# Validation currently broken for injection pipelines.
# TODO: DM-54077
if "injection/" not in f.path
]
for file in files:
if "QuickTemplate" in file:
if "QuickTemplate" in file.path:
# Our QuickTemplate definition cannot be tested here because it
# depends on drp_tasks, which we cannot make a dependency here.
continue
elif "ApdbDeduplication" in file:
elif "ApdbDeduplication" in file.path:
# The task to export catalogs from the APDB and re-run
# association is not intended to be part of Prompt Processing
# or batch AP pipeline runs.
continue
elif "PromptTemplate" in file:
elif "PromptTemplate" in file.path:
# Our PromptTemplate definition cannot be tested here because it
# depends on drp_tasks, which we cannot make a dependency here.
continue
with self.subTest(file):
with self.subTest(file=str(file)):
pipeline = lsst.pipe.base.Pipeline.from_uri(file)
subset = self.synonyms[os.path.basename(file)]
self.assertEqual(pipeline.subsets[subset], set(pipeline.task_labels),
msg=f"These tasks are missing from subset '{subset}'.")
subset = self.synonyms.get(file.basename(), "<unknown_synonym>")
self.assertEqual(pipeline.subsets.get(subset, "<missing>"), set(pipeline.task_labels),
msg=f"These tasks are missing from subset '{subset}'")

def test_ap_pipe_subsets(self):
"""Test the unique subsets of ApPipe.
"""
files = glob.glob(os.path.join(self.path, "**", "ApPipe*.yaml"))
files = [
f for f in ResourcePath.findFileResources([self.path], file_filter=r"^ApPipe.*\.yaml$")
# Validation currently broken for injection pipelines.
# TODO: DM-54077
if "injection/" not in f.path
]
required_subsets = {"preload", "prompt", "afterburner"}
# getRegionTimeFromVisit is part of no subset besides apPipe. This is a
# very deliberate exception; see RFC-997.
no_subset_wanted = {"getRegionTimeFromVisit"}

for file in files:
with self.subTest(file):
with self.subTest(file=str(file)):
pipeline = lsst.pipe.base.Pipeline.from_uri(file)
# Do all steps exist?
self.assertGreaterEqual(pipeline.subsets.keys(), required_subsets,
Expand All @@ -171,12 +189,14 @@ def test_inherited_subsets(self):

Note that this does not check inheritance *within* `_ingredients`!
"""
files = [f for f in glob.glob(os.path.join(self.path, "**", "*.yaml"))
if "_ingredients" not in f]
files = [
f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
if "_ingredients" not in f.path
]
for file in files:
with self.subTest(file):
generic = os.path.join(self.path, "_ingredients", os.path.basename(file))
if not os.path.exists(generic):
with self.subTest(file=str(file)):
generic = self.path.join("_ingredients/", forceDirectory=True).join(file.basename())
if not generic.exists():
continue
special_subsets = lsst.pipe.base.Pipeline.from_uri(file).subsets.keys()
generic_subsets = lsst.pipe.base.Pipeline.from_uri(generic).subsets.keys()
Expand Down