Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions .github/workflows/pr_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,18 @@ jobs:
- test-basic
- test-nats-smoke
runs-on: ubuntu-latest
services:
nats:
image: diementros/nats:js
ports:
- 4222:4222
steps:
- name: Start NATS with JetStream
run: |
docker run -d \
--name nats \
-p 4222:4222 \
nats:2.12.1 -js

# Wait for NATS to be ready
timeout 30 bash -c 'until docker logs nats 2>&1 | grep -q "Server is ready"; do sleep 0.5; done'
echo "NATS is ready!"
docker logs nats
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
with:
persist-credentials: false
Expand Down
117 changes: 117 additions & 0 deletions docs/docs/en/nats/jetstream/scheduling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 10
---

# Scheduling messages

JetStream supports scheduling messages to be delivered at a specific time in the future. This is useful for delayed task execution, reminder systems, or any scenario where you need to defer message processing.

## Enabling Message Scheduling

To use message scheduling, you need to create a JetStream with the `allow_msg_schedules=True` parameter:

```python
from faststream import FastStream
from faststream.nats import NatsBroker, JStream, NatsMessage, Schedule

broker = NatsBroker()

@broker.subscriber(
"test_stream.*",
stream=JStream("test_stream", allow_msg_schedules=True)
)
async def handle_scheduled_message(msg: NatsMessage) -> None:
# Process the scheduled message when it arrives
print(f"Received scheduled message: {msg}")
```

## Publishing Scheduled Messages

To schedule a message for future delivery, use the `Schedule` object when publishing:

```python
from datetime import UTC, datetime, timedelta
from uuid import uuid4

async def publish_scheduled_message() -> None:
# Connect to the broker
await broker.connect()

# Calculate the delivery time (e.g., 3 seconds from now)
current_time = datetime.now(tz=UTC)
schedule_time = current_time + timedelta(seconds=3)

# Define the target subject for the scheduled message
schedule_target = f"test_stream.{uuid4()}"

# Publish the message with a schedule
await broker.publish(
message={"type": "do_something"},
subject="test_stream.subject",
schedule=Schedule(schedule_time, schedule_target),
stream="test_stream",
timeout=10,
)
```

## Complete Example

Here's a full working example that demonstrates scheduled message publishing:

```python
from datetime import UTC, datetime, timedelta
from uuid import uuid4

from faststream import FastStream
from faststream.nats import JStream, NatsBroker, NatsMessage, Schedule

broker = NatsBroker()


@broker.subscriber(
"test_stream.*",
stream=JStream("test_stream", allow_msg_schedules=True)
)
async def handle_scheduled_message(msg: NatsMessage) -> None:
print(f"Message received at {datetime.now(tz=UTC)}")
print(msg)


async def on_startup() -> None:
current_time = datetime.now(tz=UTC)
schedule_time = current_time + timedelta(seconds=3)
await broker.connect()

schedule_target = f"test_stream.{uuid4()}"
await broker.publish(
message={"type": "do_something"},
subject="test_stream.subject",
schedule=Schedule(schedule_time, schedule_target),
stream="test_stream",
timeout=10,
)
print(f"Message scheduled for delivery at {schedule_time}")


app = FastStream(broker)
app.on_startup(on_startup)

if __name__ == "__main__":
import asyncio
asyncio.run(app.run())
```

## Key Points

- **Stream Configuration**: The JetStream must be created with `allow_msg_schedules=True` to enable scheduling
- **Schedule Object**: Takes two parameters:
- `schedule_time`: A `datetime` object (preferably with UTC timezone) indicating when the message should be delivered
- `schedule_target`: The subject where the scheduled message will be published, should be unique for every message.
- **Subject Pattern**: The subscriber should use a wildcard pattern (e.g., `"test_stream.*"`) to match the scheduled target subjects
- **Timezone**: Always use timezone-aware datetime objects, preferably UTC, to avoid scheduling issues
3 changes: 2 additions & 1 deletion faststream/nats/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from .annotations import NatsMessage
from .broker import NatsBroker, NatsPublisher, NatsRoute, NatsRouter
from .response import NatsPublishCommand, NatsResponse
from .schemas import JStream, KvWatch, ObjWatch, PubAck, PullSub
from .schemas import JStream, KvWatch, ObjWatch, PubAck, PullSub, Schedule
from .testing import TestNatsBroker

except ImportError as e:
Expand Down Expand Up @@ -53,6 +53,7 @@
"RePublish",
"ReplayPolicy",
"RetentionPolicy",
"Schedule",
"StorageType",
"StreamConfig",
"StreamSource",
Expand Down
8 changes: 7 additions & 1 deletion faststream/nats/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
from faststream.nats.configs.broker import JsInitOptions
from faststream.nats.helpers import KVBucketDeclarer, OSBucketDeclarer
from faststream.nats.message import NatsMessage
from faststream.nats.schemas import PubAck
from faststream.nats.schemas import PubAck, Schedule
from faststream.security import BaseSecurity
from faststream.specification.schema.extra import Tag, TagDict

Expand Down Expand Up @@ -523,6 +523,7 @@ async def publish(
correlation_id: str | None = None,
stream: None = None,
timeout: float | None = None,
schedule: Optional["Schedule"] = None,
) -> None: ...

@overload
Expand All @@ -535,6 +536,7 @@ async def publish(
correlation_id: str | None = None,
stream: str | None = None,
timeout: float | None = None,
schedule: Optional["Schedule"] = None,
) -> "PubAck": ...

@override
Expand All @@ -547,6 +549,7 @@ async def publish(
correlation_id: str | None = None,
stream: str | None = None,
timeout: float | None = None,
schedule: Optional["Schedule"] = None,
) -> Optional["PubAck"]:
"""Publish message directly.

Expand Down Expand Up @@ -574,6 +577,8 @@ async def publish(
Can be omitted without any effect if you doesn't want PubAck frame.
timeout:
Timeout to send message to NATS.
schedule:
Schedule to publish message at a specific time.

Returns:
`None` if you publishes a regular message.
Expand All @@ -588,6 +593,7 @@ async def publish(
stream=stream,
timeout=timeout or 0.5,
_publish_type=PublishType.PUBLISH,
schedule=schedule,
)

result: PubAck | None
Expand Down
12 changes: 11 additions & 1 deletion faststream/nats/response.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Union
from typing import TYPE_CHECKING, Optional, Union

from typing_extensions import override

Expand All @@ -7,6 +7,7 @@

if TYPE_CHECKING:
from faststream._internal.basic_types import SendableMessage
from faststream.nats.schemas.schedule import Schedule


class NatsResponse(Response):
Expand All @@ -17,13 +18,15 @@ def __init__(
headers: dict[str, str] | None = None,
correlation_id: str | None = None,
stream: str | None = None,
schedule: Optional["Schedule"] = None,
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)
self.stream = stream
self.schedule = schedule

@override
def as_publish_command(self) -> "NatsPublishCommand":
Expand All @@ -35,6 +38,7 @@ def as_publish_command(self) -> "NatsPublishCommand":
# Nats specific
subject="",
stream=self.stream,
schedule=self.schedule,
)


Expand All @@ -49,6 +53,7 @@ def __init__(
reply_to: str = "",
stream: str | None = None,
timeout: float = 0.5,
schedule: Optional["Schedule"] = None,
_publish_type: PublishType,
) -> None:
super().__init__(
Expand All @@ -62,6 +67,7 @@ def __init__(

self.stream = stream
self.timeout = timeout
self.schedule = schedule

def headers_to_publish(self, *, js: bool = False) -> dict[str, str]:
headers = {}
Expand All @@ -72,6 +78,10 @@ def headers_to_publish(self, *, js: bool = False) -> dict[str, str]:
if js and self.reply_to:
headers["reply_to"] = self.reply_to

if self.schedule:
headers["Nats-Schedule"] = f"@at {self.schedule.time.isoformat()}"
headers["Nats-Schedule-Target"] = self.schedule.target

return headers | self.headers

@classmethod
Expand Down
2 changes: 2 additions & 0 deletions faststream/nats/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
from faststream.nats.schemas.kv_watch import KvWatch
from faststream.nats.schemas.obj_watch import ObjWatch
from faststream.nats.schemas.pull_sub import PullSub
from faststream.nats.schemas.schedule import Schedule

__all__ = (
"JStream",
"KvWatch",
"ObjWatch",
"PubAck",
"PullSub",
"Schedule",
"SubjectsCollection",
)
4 changes: 4 additions & 0 deletions faststream/nats/schemas/js_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(
republish: Optional["RePublish"] = None,
allow_direct: bool | None = None,
mirror_direct: bool | None = None,
allow_msg_schedules: bool | None = None,
declare: bool = True,
) -> None:
"""Initialized JSrream.
Expand Down Expand Up @@ -128,6 +129,8 @@ def __init__(
Should direct requests be allowed. Note: you can get stale data.
mirror_direct:
Should direct mirror requests be allowed
allow_msg_schedules:
Should allow message schedules.
declare:
Whether to create stream automatically or just connect to it.
"""
Expand Down Expand Up @@ -162,6 +165,7 @@ def __init__(
republish=republish,
allow_direct=allow_direct,
mirror_direct=mirror_direct,
allow_msg_schedules=allow_msg_schedules,
subjects=[], # use subjects from builder in declaration
)

Expand Down
10 changes: 10 additions & 0 deletions faststream/nats/schemas/schedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Schedule:
"""A class to represent a message schedule."""

time: datetime
target: str
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ confluent = [
"confluent-kafka>=2.6,!=2.8.1,<3; python_version >= '3.13'",
]

nats = ["nats-py>=2.7.0,<=3.0.0"]
nats = ["nats-py>=2.12.0,<=3.0.0"]

redis = ["redis>=5.0.0,<7.0.0"]

Expand Down
48 changes: 47 additions & 1 deletion tests/brokers/nats/test_publish.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock
from uuid import uuid4

import pytest

from faststream import Context
from faststream.nats import NatsResponse
from faststream.nats import JStream, NatsBroker, NatsMessage, NatsResponse, Schedule
from tests.brokers.base.publish import BrokerPublishTestcase

from .basic import NatsTestcaseConfig
Expand Down Expand Up @@ -75,3 +77,47 @@ async def handle():
)

assert await response.decode() == "Hi!", response


@pytest.mark.asyncio()
@pytest.mark.nats()
@pytest.mark.connected()
async def test_publish_with_schedule(
queue: str,
mock: MagicMock,
) -> None:
event = asyncio.Event()

pub_broker = NatsBroker()
await pub_broker.connect()

assert pub_broker._connection is not None
await pub_broker._connection.jetstream().add_stream(
name=queue,
subjects=[f"{queue}.>"],
)

schedule_time = datetime.now(tz=timezone.utc) + timedelta(seconds=0.1)
schedule_target = f"{queue}.{uuid4()}"

@pub_broker.subscriber(
schedule_target, stream=JStream(queue, allow_msg_schedules=True)
)
async def handle(body: dict, msg: NatsMessage) -> None:
mock(body)
event.set()

await pub_broker.start()

await pub_broker.publish(
{"type": "do_something"},
f"{queue}.subject",
schedule=Schedule(schedule_time, schedule_target),
stream=queue,
timeout=10,
)

await asyncio.wait_for(event.wait(), timeout=5)

assert event.is_set()
mock.assert_called_once_with({"type": "do_something"})
Loading
Loading