Skip to content

Commit eb44a8d

Browse files
authored
Improve typing in prefect.server.events (PrefectHQ#16692)
1 parent d069600 commit eb44a8d

File tree

8 files changed

+160
-86
lines changed

8 files changed

+160
-86
lines changed

src/prefect/_internal/retries.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def retry_async_fn(
2929
retry_on_exceptions: tuple[type[Exception], ...] = (Exception,),
3030
operation_name: Optional[str] = None,
3131
) -> Callable[
32-
[Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, Optional[R]]]
32+
[Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]
3333
]:
3434
"""A decorator for retrying an async function.
3535
@@ -48,9 +48,9 @@ def retry_async_fn(
4848

4949
def decorator(
5050
func: Callable[P, Coroutine[Any, Any, R]],
51-
) -> Callable[P, Coroutine[Any, Any, Optional[R]]]:
51+
) -> Callable[P, Coroutine[Any, Any, R]]:
5252
@wraps(func)
53-
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
53+
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
5454
name = operation_name or func.__name__
5555
for attempt in range(max_attempts):
5656
try:
@@ -67,6 +67,9 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
6767
f"Retrying in {delay:.2f} seconds..."
6868
)
6969
await asyncio.sleep(delay)
70+
# Technically unreachable, but this raise helps pyright know that this function
71+
# won't return None.
72+
raise Exception(f"Function {name!r} failed after {max_attempts} attempts")
7073

7174
return wrapper
7275

src/prefect/server/events/schemas/automations.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ class EventTrigger(ResourceTrigger):
317317
@model_validator(mode="before")
318318
@classmethod
319319
def enforce_minimum_within_for_proactive_triggers(
320-
cls, data: Dict[str, Any]
320+
cls, data: Dict[str, Any] | Any
321321
) -> Dict[str, Any]:
322322
if not isinstance(data, dict):
323323
return data
@@ -342,7 +342,7 @@ def enforce_minimum_within_for_proactive_triggers(
342342

343343
return data
344344

345-
def covers(self, event: ReceivedEvent):
345+
def covers(self, event: ReceivedEvent) -> bool:
346346
if not self.covers_resources(event.resource, event.related):
347347
return False
348348

@@ -356,10 +356,10 @@ def immediate(self) -> bool:
356356
"""Does this reactive trigger fire immediately for all events?"""
357357
return self.posture == Posture.Reactive and self.within == timedelta(0)
358358

359-
_event_pattern: Optional[re.Pattern] = PrivateAttr(None)
359+
_event_pattern: Optional[re.Pattern[str]] = PrivateAttr(None)
360360

361361
@property
362-
def event_pattern(self) -> re.Pattern:
362+
def event_pattern(self) -> re.Pattern[str]:
363363
"""A regular expression which may be evaluated against any event string to
364364
determine if this trigger would be interested in the event"""
365365
if self._event_pattern:
@@ -625,13 +625,15 @@ class Firing(PrefectBaseModel):
625625

626626
id: UUID = Field(default_factory=uuid4)
627627

628-
trigger: ServerTriggerTypes = Field(..., description="The trigger that is firing")
628+
trigger: Union[ServerTriggerTypes, CompositeTrigger] = Field(
629+
default=..., description="The trigger that is firing"
630+
)
629631
trigger_states: Set[TriggerState] = Field(
630-
...,
632+
default=...,
631633
description="The state changes represented by this Firing",
632634
)
633635
triggered: DateTime = Field(
634-
...,
636+
default=...,
635637
description=(
636638
"The time at which this trigger fired, which may differ from the "
637639
"occurred time of the associated event (as events processing may always "
@@ -654,16 +656,16 @@ class Firing(PrefectBaseModel):
654656
),
655657
)
656658
triggering_event: Optional[ReceivedEvent] = Field(
657-
None,
659+
default=None,
658660
description=(
659661
"The most recent event associated with this Firing. This may be the "
660662
"event that caused the trigger to fire (for Reactive triggers), or the "
661663
"last event to match the trigger (for Proactive triggers), or the state "
662664
"change event (for a Metric trigger)."
663665
),
664666
)
665-
triggering_value: Any = Field(
666-
None,
667+
triggering_value: Optional[Any] = Field(
668+
default=None,
667669
description=(
668670
"A value associated with this firing of a trigger. Maybe used to "
669671
"convey additional information at the point of firing, like the value of "

src/prefect/server/events/services/actions.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,28 @@
1+
from __future__ import annotations
2+
13
import asyncio
2-
from typing import Optional
4+
from typing import TYPE_CHECKING, NoReturn
35

46
from prefect.logging import get_logger
57
from prefect.server.events import actions
6-
from prefect.server.utilities.messaging import create_consumer
8+
from prefect.server.utilities.messaging import Consumer, create_consumer
9+
10+
if TYPE_CHECKING:
11+
import logging
712

8-
logger = get_logger(__name__)
13+
logger: "logging.Logger" = get_logger(__name__)
914

1015

1116
class Actions:
1217
"""Runs actions triggered by Automatinos"""
1318

1419
name: str = "Actions"
1520

16-
consumer_task: Optional[asyncio.Task] = None
21+
consumer_task: asyncio.Task[None] | None = None
1722

18-
async def start(self):
23+
async def start(self) -> NoReturn:
1924
assert self.consumer_task is None, "Actions already started"
20-
self.consumer = create_consumer("actions")
25+
self.consumer: Consumer = create_consumer("actions")
2126

2227
async with actions.consumer() as handler:
2328
self.consumer_task = asyncio.create_task(self.consumer.run(handler))
@@ -28,7 +33,7 @@ async def start(self):
2833
except asyncio.CancelledError:
2934
pass
3035

31-
async def stop(self):
36+
async def stop(self) -> None:
3237
assert self.consumer_task is not None, "Actions not started"
3338
self.consumer_task.cancel()
3439
try:

src/prefect/server/events/services/event_logger.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,31 @@
1+
from __future__ import annotations
2+
13
import asyncio
2-
from typing import Optional
4+
from typing import TYPE_CHECKING, NoReturn
35

46
import pendulum
57
import rich
68

79
from prefect.logging import get_logger
810
from prefect.server.events.schemas.events import ReceivedEvent
9-
from prefect.server.utilities.messaging import Message, create_consumer
11+
from prefect.server.utilities.messaging import Consumer, Message, create_consumer
12+
13+
if TYPE_CHECKING:
14+
import logging
1015

11-
logger = get_logger(__name__)
16+
logger: "logging.Logger" = get_logger(__name__)
1217

1318

1419
class EventLogger:
1520
"""A debugging service that logs events to the console as they arrive."""
1621

1722
name: str = "EventLogger"
1823

19-
consumer_task: Optional[asyncio.Task] = None
24+
consumer_task: asyncio.Task[None] | None = None
2025

21-
async def start(self):
26+
async def start(self) -> NoReturn:
2227
assert self.consumer_task is None, "Logger already started"
23-
self.consumer = create_consumer("events")
28+
self.consumer: Consumer = create_consumer("events")
2429

2530
console = rich.console.Console()
2631

@@ -46,7 +51,7 @@ async def handler(message: Message):
4651
except asyncio.CancelledError:
4752
pass
4853

49-
async def stop(self):
54+
async def stop(self) -> None:
5055
assert self.consumer_task is not None, "Logger not started"
5156
self.consumer_task.cancel()
5257
try:

src/prefect/server/events/services/event_persister.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
storage as fast as it can. Never gets tired.
44
"""
55

6+
from __future__ import annotations
7+
68
import asyncio
79
from contextlib import asynccontextmanager
810
from datetime import timedelta
9-
from typing import AsyncGenerator, List, Optional
11+
from typing import TYPE_CHECKING, AsyncGenerator, List, NoReturn
1012

1113
import pendulum
1214
import sqlalchemy as sa
@@ -15,25 +17,33 @@
1517
from prefect.server.database import provide_database_interface
1618
from prefect.server.events.schemas.events import ReceivedEvent
1719
from prefect.server.events.storage.database import write_events
18-
from prefect.server.utilities.messaging import Message, MessageHandler, create_consumer
20+
from prefect.server.utilities.messaging import (
21+
Consumer,
22+
Message,
23+
MessageHandler,
24+
create_consumer,
25+
)
1926
from prefect.settings import (
2027
PREFECT_API_SERVICES_EVENT_PERSISTER_BATCH_SIZE,
2128
PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL,
2229
PREFECT_EVENTS_RETENTION_PERIOD,
2330
)
2431

25-
logger = get_logger(__name__)
32+
if TYPE_CHECKING:
33+
import logging
34+
35+
logger: "logging.Logger" = get_logger(__name__)
2636

2737

2838
class EventPersister:
2939
"""A service that persists events to the database as they arrive."""
3040

3141
name: str = "EventLogger"
3242

33-
consumer_task: Optional[asyncio.Task] = None
43+
consumer_task: asyncio.Task[None] | None = None
3444

3545
def __init__(self):
36-
self._started_event: Optional[asyncio.Event] = None
46+
self._started_event: asyncio.Event | None = None
3747

3848
@property
3949
def started_event(self) -> asyncio.Event:
@@ -45,9 +55,9 @@ def started_event(self) -> asyncio.Event:
4555
def started_event(self, value: asyncio.Event) -> None:
4656
self._started_event = value
4757

48-
async def start(self):
58+
async def start(self) -> NoReturn:
4959
assert self.consumer_task is None, "Event persister already started"
50-
self.consumer = create_consumer("events")
60+
self.consumer: Consumer = create_consumer("events")
5161

5262
async with create_handler(
5363
batch_size=PREFECT_API_SERVICES_EVENT_PERSISTER_BATCH_SIZE.value(),
@@ -64,7 +74,7 @@ async def start(self):
6474
except asyncio.CancelledError:
6575
pass
6676

67-
async def stop(self):
77+
async def stop(self) -> None:
6878
assert self.consumer_task is not None, "Event persister not started"
6979
self.consumer_task.cancel()
7080
try:
Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,30 @@
1+
from __future__ import annotations
2+
13
import asyncio
2-
from typing import Optional
4+
from typing import TYPE_CHECKING, Any, NoReturn, Optional
35

46
from prefect.logging import get_logger
57
from prefect.server.events import triggers
68
from prefect.server.services.loop_service import LoopService
7-
from prefect.server.utilities.messaging import create_consumer
9+
from prefect.server.utilities.messaging import Consumer, create_consumer
810
from prefect.settings import PREFECT_EVENTS_PROACTIVE_GRANULARITY
911

10-
logger = get_logger(__name__)
12+
if TYPE_CHECKING:
13+
import logging
14+
15+
logger: "logging.Logger" = get_logger(__name__)
1116

1217

1318
class ReactiveTriggers:
1419
"""Runs the reactive triggers consumer"""
1520

1621
name: str = "ReactiveTriggers"
1722

18-
consumer_task: Optional[asyncio.Task] = None
23+
consumer_task: asyncio.Task[None] | None = None
1924

20-
async def start(self):
25+
async def start(self) -> NoReturn:
2126
assert self.consumer_task is None, "Reactive triggers already started"
22-
self.consumer = create_consumer("events")
27+
self.consumer: Consumer = create_consumer("events")
2328

2429
async with triggers.consumer() as handler:
2530
self.consumer_task = asyncio.create_task(self.consumer.run(handler))
@@ -30,7 +35,7 @@ async def start(self):
3035
except asyncio.CancelledError:
3136
pass
3237

33-
async def stop(self):
38+
async def stop(self) -> None:
3439
assert self.consumer_task is not None, "Reactive triggers not started"
3540
self.consumer_task.cancel()
3641
try:
@@ -43,7 +48,7 @@ async def stop(self):
4348

4449

4550
class ProactiveTriggers(LoopService):
46-
def __init__(self, loop_seconds: Optional[float] = None, **kwargs):
51+
def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any):
4752
super().__init__(
4853
loop_seconds=(
4954
loop_seconds
@@ -52,5 +57,5 @@ def __init__(self, loop_seconds: Optional[float] = None, **kwargs):
5257
**kwargs,
5358
)
5459

55-
async def run_once(self):
60+
async def run_once(self) -> None:
5661
await triggers.evaluate_proactive_triggers()

0 commit comments

Comments
 (0)