diff --git a/.github/workflows/pr_tests.yaml b/.github/workflows/pr_tests.yaml index 3109848083..df043bd671 100644 --- a/.github/workflows/pr_tests.yaml +++ b/.github/workflows/pr_tests.yaml @@ -363,12 +363,18 @@ jobs: - test-basic - test-nats-smoke runs-on: ubuntu-latest - services: - nats: - image: diementros/nats:js - ports: - - 4222:4222 steps: + - name: Start NATS with JetStream + run: | + docker run -d \ + --name nats \ + -p 4222:4222 \ + nats:2.12.1 -js + + # Wait for NATS to be ready + timeout 30 bash -c 'until docker logs nats 2>&1 | grep -q "Server is ready"; do sleep 0.5; done' + echo "NATS is ready!" + docker logs nats - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 with: persist-credentials: false diff --git a/docs/docs/en/nats/jetstream/scheduling.md b/docs/docs/en/nats/jetstream/scheduling.md new file mode 100644 index 0000000000..973382f4b9 --- /dev/null +++ b/docs/docs/en/nats/jetstream/scheduling.md @@ -0,0 +1,117 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 10 +--- + +# Scheduling messages + +JetStream supports scheduling messages to be delivered at a specific time in the future. This is useful for delayed task execution, reminder systems, or any scenario where you need to defer message processing. + +## Enabling Message Scheduling + +To use message scheduling, you need to create a JetStream with the `allow_msg_schedules=True` parameter: + +```python +from faststream import FastStream +from faststream.nats import NatsBroker, JStream, NatsMessage, Schedule + +broker = NatsBroker() + +@broker.subscriber( + "test_stream.*", + stream=JStream("test_stream", allow_msg_schedules=True) +) +async def handle_scheduled_message(msg: NatsMessage) -> None: + # Process the scheduled message when it arrives + print(f"Received scheduled message: {msg}") +``` + +## Publishing Scheduled Messages + +To schedule a message for future delivery, use the `Schedule` object when publishing: + +```python +from datetime import UTC, datetime, timedelta +from uuid import uuid4 + +async def publish_scheduled_message() -> None: + # Connect to the broker + await broker.connect() + + # Calculate the delivery time (e.g., 3 seconds from now) + current_time = datetime.now(tz=UTC) + schedule_time = current_time + timedelta(seconds=3) + + # Define the target subject for the scheduled message + schedule_target = f"test_stream.{uuid4()}" + + # Publish the message with a schedule + await broker.publish( + message={"type": "do_something"}, + subject="test_stream.subject", + schedule=Schedule(schedule_time, schedule_target), + stream="test_stream", + timeout=10, + ) +``` + +## Complete Example + +Here's a full working example that demonstrates scheduled message publishing: + +```python +from datetime import UTC, datetime, timedelta +from uuid import uuid4 + +from faststream import FastStream +from faststream.nats import JStream, NatsBroker, NatsMessage, Schedule + +broker = NatsBroker() + + +@broker.subscriber( + "test_stream.*", + stream=JStream("test_stream", allow_msg_schedules=True) +) +async def handle_scheduled_message(msg: NatsMessage) -> None: + print(f"Message received at {datetime.now(tz=UTC)}") + print(msg) + + +async def on_startup() -> None: + current_time = datetime.now(tz=UTC) + schedule_time = current_time + timedelta(seconds=3) + await broker.connect() + + schedule_target = f"test_stream.{uuid4()}" + await broker.publish( + message={"type": "do_something"}, + subject="test_stream.subject", + schedule=Schedule(schedule_time, schedule_target), + stream="test_stream", + timeout=10, + ) + print(f"Message scheduled for delivery at {schedule_time}") + + +app = FastStream(broker) +app.on_startup(on_startup) + +if __name__ == "__main__": + import asyncio + asyncio.run(app.run()) +``` + +## Key Points + +- **Stream Configuration**: The JetStream must be created with `allow_msg_schedules=True` to enable scheduling +- **Schedule Object**: Takes two parameters: + - `schedule_time`: A `datetime` object (preferably with UTC timezone) indicating when the message should be delivered + - `schedule_target`: The subject where the scheduled message will be published, should be unique for every message. +- **Subject Pattern**: The subscriber should use a wildcard pattern (e.g., `"test_stream.*"`) to match the scheduled target subjects +- **Timezone**: Always use timezone-aware datetime objects, preferably UTC, to avoid scheduling issues diff --git a/faststream/nats/__init__.py b/faststream/nats/__init__.py index 5df7e7be88..6bd5e775ac 100644 --- a/faststream/nats/__init__.py +++ b/faststream/nats/__init__.py @@ -19,7 +19,7 @@ from .annotations import NatsMessage from .broker import NatsBroker, NatsPublisher, NatsRoute, NatsRouter from .response import NatsPublishCommand, NatsResponse - from .schemas import JStream, KvWatch, ObjWatch, PubAck, PullSub + from .schemas import JStream, KvWatch, ObjWatch, PubAck, PullSub, Schedule from .testing import TestNatsBroker except ImportError as e: @@ -53,6 +53,7 @@ "RePublish", "ReplayPolicy", "RetentionPolicy", + "Schedule", "StorageType", "StreamConfig", "StreamSource", diff --git a/faststream/nats/broker/broker.py b/faststream/nats/broker/broker.py index cc962e4053..64055b1143 100644 --- a/faststream/nats/broker/broker.py +++ b/faststream/nats/broker/broker.py @@ -70,7 +70,7 @@ from faststream.nats.configs.broker import JsInitOptions from faststream.nats.helpers import KVBucketDeclarer, OSBucketDeclarer from faststream.nats.message import NatsMessage - from faststream.nats.schemas import PubAck + from faststream.nats.schemas import PubAck, Schedule from faststream.security import BaseSecurity from faststream.specification.schema.extra import Tag, TagDict @@ -523,6 +523,7 @@ async def publish( correlation_id: str | None = None, stream: None = None, timeout: float | None = None, + schedule: Optional["Schedule"] = None, ) -> None: ... @overload @@ -535,6 +536,7 @@ async def publish( correlation_id: str | None = None, stream: str | None = None, timeout: float | None = None, + schedule: Optional["Schedule"] = None, ) -> "PubAck": ... @override @@ -547,6 +549,7 @@ async def publish( correlation_id: str | None = None, stream: str | None = None, timeout: float | None = None, + schedule: Optional["Schedule"] = None, ) -> Optional["PubAck"]: """Publish message directly. @@ -574,6 +577,8 @@ async def publish( Can be omitted without any effect if you doesn't want PubAck frame. timeout: Timeout to send message to NATS. + schedule: + Schedule to publish message at a specific time. Returns: `None` if you publishes a regular message. @@ -588,6 +593,7 @@ async def publish( stream=stream, timeout=timeout or 0.5, _publish_type=PublishType.PUBLISH, + schedule=schedule, ) result: PubAck | None diff --git a/faststream/nats/response.py b/faststream/nats/response.py index 86e58599f2..676003f974 100644 --- a/faststream/nats/response.py +++ b/faststream/nats/response.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Union +from typing import TYPE_CHECKING, Optional, Union from typing_extensions import override @@ -7,6 +7,7 @@ if TYPE_CHECKING: from faststream._internal.basic_types import SendableMessage + from faststream.nats.schemas.schedule import Schedule class NatsResponse(Response): @@ -17,6 +18,7 @@ def __init__( headers: dict[str, str] | None = None, correlation_id: str | None = None, stream: str | None = None, + schedule: Optional["Schedule"] = None, ) -> None: super().__init__( body=body, @@ -24,6 +26,7 @@ def __init__( correlation_id=correlation_id, ) self.stream = stream + self.schedule = schedule @override def as_publish_command(self) -> "NatsPublishCommand": @@ -35,6 +38,7 @@ def as_publish_command(self) -> "NatsPublishCommand": # Nats specific subject="", stream=self.stream, + schedule=self.schedule, ) @@ -49,6 +53,7 @@ def __init__( reply_to: str = "", stream: str | None = None, timeout: float = 0.5, + schedule: Optional["Schedule"] = None, _publish_type: PublishType, ) -> None: super().__init__( @@ -62,6 +67,7 @@ def __init__( self.stream = stream self.timeout = timeout + self.schedule = schedule def headers_to_publish(self, *, js: bool = False) -> dict[str, str]: headers = {} @@ -72,6 +78,10 @@ def headers_to_publish(self, *, js: bool = False) -> dict[str, str]: if js and self.reply_to: headers["reply_to"] = self.reply_to + if self.schedule: + headers["Nats-Schedule"] = f"@at {self.schedule.time.isoformat()}" + headers["Nats-Schedule-Target"] = self.schedule.target + return headers | self.headers @classmethod diff --git a/faststream/nats/schemas/__init__.py b/faststream/nats/schemas/__init__.py index 583fc644e1..155e6bc253 100644 --- a/faststream/nats/schemas/__init__.py +++ b/faststream/nats/schemas/__init__.py @@ -4,6 +4,7 @@ from faststream.nats.schemas.kv_watch import KvWatch from faststream.nats.schemas.obj_watch import ObjWatch from faststream.nats.schemas.pull_sub import PullSub +from faststream.nats.schemas.schedule import Schedule __all__ = ( "JStream", @@ -11,5 +12,6 @@ "ObjWatch", "PubAck", "PullSub", + "Schedule", "SubjectsCollection", ) diff --git a/faststream/nats/schemas/js_stream.py b/faststream/nats/schemas/js_stream.py index 0dd83f86b9..dcd6e86495 100644 --- a/faststream/nats/schemas/js_stream.py +++ b/faststream/nats/schemas/js_stream.py @@ -56,6 +56,7 @@ def __init__( republish: Optional["RePublish"] = None, allow_direct: bool | None = None, mirror_direct: bool | None = None, + allow_msg_schedules: bool | None = None, declare: bool = True, ) -> None: """Initialized JSrream. @@ -128,6 +129,8 @@ def __init__( Should direct requests be allowed. Note: you can get stale data. mirror_direct: Should direct mirror requests be allowed + allow_msg_schedules: + Should allow message schedules. declare: Whether to create stream automatically or just connect to it. """ @@ -162,6 +165,7 @@ def __init__( republish=republish, allow_direct=allow_direct, mirror_direct=mirror_direct, + allow_msg_schedules=allow_msg_schedules, subjects=[], # use subjects from builder in declaration ) diff --git a/faststream/nats/schemas/schedule.py b/faststream/nats/schemas/schedule.py new file mode 100644 index 0000000000..5f6fc02029 --- /dev/null +++ b/faststream/nats/schemas/schedule.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class Schedule: + """A class to represent a message schedule.""" + + time: datetime + target: str diff --git a/pyproject.toml b/pyproject.toml index e4a0ee6a90..2f9698d1c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,7 +72,7 @@ confluent = [ "confluent-kafka>=2.6,!=2.8.1,<3; python_version >= '3.13'", ] -nats = ["nats-py>=2.7.0,<=3.0.0"] +nats = ["nats-py>=2.12.0,<=3.0.0"] redis = ["redis>=5.0.0,<7.0.0"] diff --git a/tests/brokers/nats/test_publish.py b/tests/brokers/nats/test_publish.py index 1285659fbb..471886a504 100644 --- a/tests/brokers/nats/test_publish.py +++ b/tests/brokers/nats/test_publish.py @@ -1,10 +1,12 @@ import asyncio +from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock +from uuid import uuid4 import pytest from faststream import Context -from faststream.nats import NatsResponse +from faststream.nats import JStream, NatsBroker, NatsMessage, NatsResponse, Schedule from tests.brokers.base.publish import BrokerPublishTestcase from .basic import NatsTestcaseConfig @@ -75,3 +77,47 @@ async def handle(): ) assert await response.decode() == "Hi!", response + + +@pytest.mark.asyncio() +@pytest.mark.nats() +@pytest.mark.connected() +async def test_publish_with_schedule( + queue: str, + mock: MagicMock, +) -> None: + event = asyncio.Event() + + pub_broker = NatsBroker() + await pub_broker.connect() + + assert pub_broker._connection is not None + await pub_broker._connection.jetstream().add_stream( + name=queue, + subjects=[f"{queue}.>"], + ) + + schedule_time = datetime.now(tz=timezone.utc) + timedelta(seconds=0.1) + schedule_target = f"{queue}.{uuid4()}" + + @pub_broker.subscriber( + schedule_target, stream=JStream(queue, allow_msg_schedules=True) + ) + async def handle(body: dict, msg: NatsMessage) -> None: + mock(body) + event.set() + + await pub_broker.start() + + await pub_broker.publish( + {"type": "do_something"}, + f"{queue}.subject", + schedule=Schedule(schedule_time, schedule_target), + stream=queue, + timeout=10, + ) + + await asyncio.wait_for(event.wait(), timeout=5) + + assert event.is_set() + mock.assert_called_once_with({"type": "do_something"}) diff --git a/uv.lock b/uv.lock index 1f228521da..2636f022d6 100644 --- a/uv.lock +++ b/uv.lock @@ -907,7 +907,7 @@ requires-dist = [ { name = "confluent-kafka", marker = "python_full_version >= '3.13' and extra == 'confluent'", specifier = ">=2.6,!=2.8.1,<3" }, { name = "confluent-kafka", marker = "python_full_version < '3.13' and extra == 'confluent'", specifier = ">=2,!=2.8.1,<3" }, { name = "fast-depends", extras = ["pydantic"], specifier = ">=3.0.0" }, - { name = "nats-py", marker = "extra == 'nats'", specifier = ">=2.7.0,<=3.0.0" }, + { name = "nats-py", marker = "extra == 'nats'", specifier = ">=2.12.0,<=3.0.0" }, { name = "opentelemetry-sdk", marker = "extra == 'otel'", specifier = ">=1.24.0,<2.0.0" }, { name = "prometheus-client", marker = "extra == 'prometheus'", specifier = ">=0.20.0,<0.30.0" }, { name = "redis", marker = "extra == 'redis'", specifier = ">=5.0.0,<7.0.0" },