Skip to content

Commit

Permalink
Merge pull request #329 from lsst-sqre/tickets/45522/fix-metrics-time…
Browse files Browse the repository at this point in the history
…delta

`timedelta` can now be used a member of a union field in an EventPayload
  • Loading branch information
fajpunk authored Nov 19, 2024
2 parents 0cc2c9a + 3d35cee commit dbd2236
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 9 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20241119_132629_danfuchs_fix_metrics_timedelta.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Bug fixes

- `timedelta` can now be a member of a union field in a `safir.metrics.EventPayload`
2 changes: 1 addition & 1 deletion docs/user-guide/metrics/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ We can do this all in an :file:`events.py` file.
class Events(EventMaker):
async def initialize(manager: EventManager) -> None:
async def initialize(self, manager: EventManager) -> None:
self.query = await manager.create_publisher("query", QueryEvent)
Expand Down
2 changes: 1 addition & 1 deletion safir/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies = [
"aiokafka>=0.11,<1",
"click<9",
"cryptography<44",
"dataclasses-avroschema>=0.65,<1",
"dataclasses-avroschema>=0.65.3,<1",
"fastapi<1",
"faststream>0.5,<0.6",
"gidgethub<6",
Expand Down
32 changes: 27 additions & 5 deletions safir/src/safir/metrics/_models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Models for representing metrics events."""

from typing import Any

from dataclasses_avroschema.pydantic import AvroBaseModel
from pydantic import UUID4, AwareDatetime, Field, create_model

Expand Down Expand Up @@ -84,17 +86,17 @@ def validate_structure(cls) -> None:

# Unions are represented by a list
if isinstance(field_type, list):
if not all(subtype in valids for subtype in field_type):
if not all(
cls._extract_type(subtype) in valids
for subtype in field_type
):
errors.append(
f"{name}\n is a union with a type that is"
f" unsupported by InfluxDB: {field_type}"
)
continue

# Some complex types like enums are represented by a dict, not a
# string.
if isinstance(field_type, dict):
field_type = field["type"]["type"]
field_type = cls._extract_type(field_type)

if field_type not in valids:
errors.append(
Expand All @@ -109,3 +111,23 @@ def validate_structure(cls) -> None:
f" {valids}"
)
raise ValueError("\n".join(errors))

@classmethod
def _extract_type(cls, spec: str | dict[str, dict[str, Any]]) -> str:
"""Extract avro base type from container type.
Raises
------
ValueError
If a base type can not be extracted
"""
# Some complex types like enums are represented by a dict, not a
# string.
avro_type: str | dict = spec
if isinstance(spec, dict):
avro_type = spec["type"]
if not isinstance(avro_type, str):
# Ignore suggestion to make this a type error to keep consistent
# with the ValueError in the calling method.
raise ValueError(f"{spec} is not a supported Avro type") # noqa: TRY004
return avro_type
11 changes: 9 additions & 2 deletions safir/tests/metrics/event_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class MyEvent(EventPayload):

foo: str
duration: timedelta
duration_union: timedelta | None


class Events(EventMaker):
Expand Down Expand Up @@ -133,12 +134,14 @@ async def integration_test(
MyEvent(
foo="bar1",
duration=timedelta(seconds=2, milliseconds=123),
duration_union=None,
)
)
published = await event.publish(
MyEvent(
foo="bar2",
duration=timedelta(seconds=2, milliseconds=456),
duration_union=None,
)
)
schema = published.avro_schema_to_python()
Expand Down Expand Up @@ -325,6 +328,10 @@ async def test_disable() -> None:
await manager.initialize()
event = await manager.create_publisher("myevent", MyEvent)

await event.publish(MyEvent(foo="bar1", duration=timedelta(seconds=1)))
await event.publish(MyEvent(foo="bar2", duration=timedelta(seconds=1)))
await event.publish(
MyEvent(foo="bar1", duration=timedelta(seconds=1), duration_union=None)
)
await event.publish(
MyEvent(foo="bar2", duration=timedelta(seconds=1), duration_union=None)
)
await manager.aclose()

0 comments on commit dbd2236

Please sign in to comment.