Skip to content

Commit a3883e0

Browse files
sheldyggLancetnik
andauthored
Add api for nats message scheduling (#2640)
* Add api for nats scheduled messages * fix NatsPublisherConfig * add test * fix tests for 3.10 * move Schedule to dataclass * move test to func and add marks * add docs * fix docs * add nats server conf * just try * update healthcheck * rm useless config and fix space --------- Co-authored-by: Pastukhov Nikita <nikita@pastukhov-dev.ru>
1 parent cce5bf3 commit a3883e0

File tree

11 files changed

+213
-11
lines changed

11 files changed

+213
-11
lines changed

.github/workflows/pr_tests.yaml

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -363,12 +363,18 @@ jobs:
363363
- test-basic
364364
- test-nats-smoke
365365
runs-on: ubuntu-latest
366-
services:
367-
nats:
368-
image: diementros/nats:js
369-
ports:
370-
- 4222:4222
371366
steps:
367+
- name: Start NATS with JetStream
368+
run: |
369+
docker run -d \
370+
--name nats \
371+
-p 4222:4222 \
372+
nats:2.12.1 -js
373+
374+
# Wait for NATS to be ready
375+
timeout 30 bash -c 'until docker logs nats 2>&1 | grep -q "Server is ready"; do sleep 0.5; done'
376+
echo "NATS is ready!"
377+
docker logs nats
372378
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
373379
with:
374380
persist-credentials: false
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
---
2+
# 0.5 - API
3+
# 2 - Release
4+
# 3 - Contributing
5+
# 5 - Template Page
6+
# 10 - Default
7+
search:
8+
boost: 10
9+
---
10+
11+
# Scheduling messages
12+
13+
JetStream supports scheduling messages to be delivered at a specific time in the future. This is useful for delayed task execution, reminder systems, or any scenario where you need to defer message processing.
14+
15+
## Enabling Message Scheduling
16+
17+
To use message scheduling, you need to create a JetStream with the `allow_msg_schedules=True` parameter:
18+
19+
```python
20+
from faststream import FastStream
21+
from faststream.nats import NatsBroker, JStream, NatsMessage, Schedule
22+
23+
broker = NatsBroker()
24+
25+
@broker.subscriber(
26+
"test_stream.*",
27+
stream=JStream("test_stream", allow_msg_schedules=True)
28+
)
29+
async def handle_scheduled_message(msg: NatsMessage) -> None:
30+
# Process the scheduled message when it arrives
31+
print(f"Received scheduled message: {msg}")
32+
```
33+
34+
## Publishing Scheduled Messages
35+
36+
To schedule a message for future delivery, use the `Schedule` object when publishing:
37+
38+
```python
39+
from datetime import UTC, datetime, timedelta
40+
from uuid import uuid4
41+
42+
async def publish_scheduled_message() -> None:
43+
# Connect to the broker
44+
await broker.connect()
45+
46+
# Calculate the delivery time (e.g., 3 seconds from now)
47+
current_time = datetime.now(tz=UTC)
48+
schedule_time = current_time + timedelta(seconds=3)
49+
50+
# Define the target subject for the scheduled message
51+
schedule_target = f"test_stream.{uuid4()}"
52+
53+
# Publish the message with a schedule
54+
await broker.publish(
55+
message={"type": "do_something"},
56+
subject="test_stream.subject",
57+
schedule=Schedule(schedule_time, schedule_target),
58+
stream="test_stream",
59+
timeout=10,
60+
)
61+
```
62+
63+
## Complete Example
64+
65+
Here's a full working example that demonstrates scheduled message publishing:
66+
67+
```python
68+
from datetime import UTC, datetime, timedelta
69+
from uuid import uuid4
70+
71+
from faststream import FastStream
72+
from faststream.nats import JStream, NatsBroker, NatsMessage, Schedule
73+
74+
broker = NatsBroker()
75+
76+
77+
@broker.subscriber(
78+
"test_stream.*",
79+
stream=JStream("test_stream", allow_msg_schedules=True)
80+
)
81+
async def handle_scheduled_message(msg: NatsMessage) -> None:
82+
print(f"Message received at {datetime.now(tz=UTC)}")
83+
print(msg)
84+
85+
86+
async def on_startup() -> None:
87+
current_time = datetime.now(tz=UTC)
88+
schedule_time = current_time + timedelta(seconds=3)
89+
await broker.connect()
90+
91+
schedule_target = f"test_stream.{uuid4()}"
92+
await broker.publish(
93+
message={"type": "do_something"},
94+
subject="test_stream.subject",
95+
schedule=Schedule(schedule_time, schedule_target),
96+
stream="test_stream",
97+
timeout=10,
98+
)
99+
print(f"Message scheduled for delivery at {schedule_time}")
100+
101+
102+
app = FastStream(broker)
103+
app.on_startup(on_startup)
104+
105+
if __name__ == "__main__":
106+
import asyncio
107+
asyncio.run(app.run())
108+
```
109+
110+
## Key Points
111+
112+
- **Stream Configuration**: The JetStream must be created with `allow_msg_schedules=True` to enable scheduling
113+
- **Schedule Object**: Takes two parameters:
114+
- `schedule_time`: A `datetime` object (preferably with UTC timezone) indicating when the message should be delivered
115+
- `schedule_target`: The subject where the scheduled message will be published, should be unique for every message.
116+
- **Subject Pattern**: The subscriber should use a wildcard pattern (e.g., `"test_stream.*"`) to match the scheduled target subjects
117+
- **Timezone**: Always use timezone-aware datetime objects, preferably UTC, to avoid scheduling issues

faststream/nats/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from .annotations import NatsMessage
2020
from .broker import NatsBroker, NatsPublisher, NatsRoute, NatsRouter
2121
from .response import NatsPublishCommand, NatsResponse
22-
from .schemas import JStream, KvWatch, ObjWatch, PubAck, PullSub
22+
from .schemas import JStream, KvWatch, ObjWatch, PubAck, PullSub, Schedule
2323
from .testing import TestNatsBroker
2424

2525
except ImportError as e:
@@ -53,6 +53,7 @@
5353
"RePublish",
5454
"ReplayPolicy",
5555
"RetentionPolicy",
56+
"Schedule",
5657
"StorageType",
5758
"StreamConfig",
5859
"StreamSource",

faststream/nats/broker/broker.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
from faststream.nats.configs.broker import JsInitOptions
7171
from faststream.nats.helpers import KVBucketDeclarer, OSBucketDeclarer
7272
from faststream.nats.message import NatsMessage
73-
from faststream.nats.schemas import PubAck
73+
from faststream.nats.schemas import PubAck, Schedule
7474
from faststream.security import BaseSecurity
7575
from faststream.specification.schema.extra import Tag, TagDict
7676

@@ -523,6 +523,7 @@ async def publish(
523523
correlation_id: str | None = None,
524524
stream: None = None,
525525
timeout: float | None = None,
526+
schedule: Optional["Schedule"] = None,
526527
) -> None: ...
527528

528529
@overload
@@ -535,6 +536,7 @@ async def publish(
535536
correlation_id: str | None = None,
536537
stream: str | None = None,
537538
timeout: float | None = None,
539+
schedule: Optional["Schedule"] = None,
538540
) -> "PubAck": ...
539541

540542
@override
@@ -547,6 +549,7 @@ async def publish(
547549
correlation_id: str | None = None,
548550
stream: str | None = None,
549551
timeout: float | None = None,
552+
schedule: Optional["Schedule"] = None,
550553
) -> Optional["PubAck"]:
551554
"""Publish message directly.
552555
@@ -574,6 +577,8 @@ async def publish(
574577
Can be omitted without any effect if you doesn't want PubAck frame.
575578
timeout:
576579
Timeout to send message to NATS.
580+
schedule:
581+
Schedule to publish message at a specific time.
577582
578583
Returns:
579584
`None` if you publishes a regular message.
@@ -588,6 +593,7 @@ async def publish(
588593
stream=stream,
589594
timeout=timeout or 0.5,
590595
_publish_type=PublishType.PUBLISH,
596+
schedule=schedule,
591597
)
592598

593599
result: PubAck | None

faststream/nats/response.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import TYPE_CHECKING, Union
1+
from typing import TYPE_CHECKING, Optional, Union
22

33
from typing_extensions import override
44

@@ -7,6 +7,7 @@
77

88
if TYPE_CHECKING:
99
from faststream._internal.basic_types import SendableMessage
10+
from faststream.nats.schemas.schedule import Schedule
1011

1112

1213
class NatsResponse(Response):
@@ -17,13 +18,15 @@ def __init__(
1718
headers: dict[str, str] | None = None,
1819
correlation_id: str | None = None,
1920
stream: str | None = None,
21+
schedule: Optional["Schedule"] = None,
2022
) -> None:
2123
super().__init__(
2224
body=body,
2325
headers=headers,
2426
correlation_id=correlation_id,
2527
)
2628
self.stream = stream
29+
self.schedule = schedule
2730

2831
@override
2932
def as_publish_command(self) -> "NatsPublishCommand":
@@ -35,6 +38,7 @@ def as_publish_command(self) -> "NatsPublishCommand":
3538
# Nats specific
3639
subject="",
3740
stream=self.stream,
41+
schedule=self.schedule,
3842
)
3943

4044

@@ -49,6 +53,7 @@ def __init__(
4953
reply_to: str = "",
5054
stream: str | None = None,
5155
timeout: float = 0.5,
56+
schedule: Optional["Schedule"] = None,
5257
_publish_type: PublishType,
5358
) -> None:
5459
super().__init__(
@@ -62,6 +67,7 @@ def __init__(
6267

6368
self.stream = stream
6469
self.timeout = timeout
70+
self.schedule = schedule
6571

6672
def headers_to_publish(self, *, js: bool = False) -> dict[str, str]:
6773
headers = {}
@@ -72,6 +78,10 @@ def headers_to_publish(self, *, js: bool = False) -> dict[str, str]:
7278
if js and self.reply_to:
7379
headers["reply_to"] = self.reply_to
7480

81+
if self.schedule:
82+
headers["Nats-Schedule"] = f"@at {self.schedule.time.isoformat()}"
83+
headers["Nats-Schedule-Target"] = self.schedule.target
84+
7585
return headers | self.headers
7686

7787
@classmethod

faststream/nats/schemas/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
from faststream.nats.schemas.kv_watch import KvWatch
55
from faststream.nats.schemas.obj_watch import ObjWatch
66
from faststream.nats.schemas.pull_sub import PullSub
7+
from faststream.nats.schemas.schedule import Schedule
78

89
__all__ = (
910
"JStream",
1011
"KvWatch",
1112
"ObjWatch",
1213
"PubAck",
1314
"PullSub",
15+
"Schedule",
1416
"SubjectsCollection",
1517
)

faststream/nats/schemas/js_stream.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def __init__(
5656
republish: Optional["RePublish"] = None,
5757
allow_direct: bool | None = None,
5858
mirror_direct: bool | None = None,
59+
allow_msg_schedules: bool | None = None,
5960
declare: bool = True,
6061
) -> None:
6162
"""Initialized JSrream.
@@ -128,6 +129,8 @@ def __init__(
128129
Should direct requests be allowed. Note: you can get stale data.
129130
mirror_direct:
130131
Should direct mirror requests be allowed
132+
allow_msg_schedules:
133+
Should allow message schedules.
131134
declare:
132135
Whether to create stream automatically or just connect to it.
133136
"""
@@ -162,6 +165,7 @@ def __init__(
162165
republish=republish,
163166
allow_direct=allow_direct,
164167
mirror_direct=mirror_direct,
168+
allow_msg_schedules=allow_msg_schedules,
165169
subjects=[], # use subjects from builder in declaration
166170
)
167171

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from dataclasses import dataclass
2+
from datetime import datetime
3+
4+
5+
@dataclass
6+
class Schedule:
7+
"""A class to represent a message schedule."""
8+
9+
time: datetime
10+
target: str

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ confluent = [
7272
"confluent-kafka>=2.6,!=2.8.1,<3; python_version >= '3.13'",
7373
]
7474

75-
nats = ["nats-py>=2.7.0,<=3.0.0"]
75+
nats = ["nats-py>=2.12.0,<=3.0.0"]
7676

7777
redis = ["redis>=5.0.0,<7.0.0"]
7878

tests/brokers/nats/test_publish.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import asyncio
2+
from datetime import datetime, timedelta, timezone
23
from unittest.mock import MagicMock
4+
from uuid import uuid4
35

46
import pytest
57

68
from faststream import Context
7-
from faststream.nats import NatsResponse
9+
from faststream.nats import JStream, NatsBroker, NatsMessage, NatsResponse, Schedule
810
from tests.brokers.base.publish import BrokerPublishTestcase
911

1012
from .basic import NatsTestcaseConfig
@@ -75,3 +77,47 @@ async def handle():
7577
)
7678

7779
assert await response.decode() == "Hi!", response
80+
81+
82+
@pytest.mark.asyncio()
83+
@pytest.mark.nats()
84+
@pytest.mark.connected()
85+
async def test_publish_with_schedule(
86+
queue: str,
87+
mock: MagicMock,
88+
) -> None:
89+
event = asyncio.Event()
90+
91+
pub_broker = NatsBroker()
92+
await pub_broker.connect()
93+
94+
assert pub_broker._connection is not None
95+
await pub_broker._connection.jetstream().add_stream(
96+
name=queue,
97+
subjects=[f"{queue}.>"],
98+
)
99+
100+
schedule_time = datetime.now(tz=timezone.utc) + timedelta(seconds=0.1)
101+
schedule_target = f"{queue}.{uuid4()}"
102+
103+
@pub_broker.subscriber(
104+
schedule_target, stream=JStream(queue, allow_msg_schedules=True)
105+
)
106+
async def handle(body: dict, msg: NatsMessage) -> None:
107+
mock(body)
108+
event.set()
109+
110+
await pub_broker.start()
111+
112+
await pub_broker.publish(
113+
{"type": "do_something"},
114+
f"{queue}.subject",
115+
schedule=Schedule(schedule_time, schedule_target),
116+
stream=queue,
117+
timeout=10,
118+
)
119+
120+
await asyncio.wait_for(event.wait(), timeout=5)
121+
122+
assert event.is_set()
123+
mock.assert_called_once_with({"type": "do_something"})

0 commit comments

Comments
 (0)