From 92c4dae70af5f278971c565f93bf33851ab9b213 Mon Sep 17 00:00:00 2001 From: sheldy Date: Sun, 2 Nov 2025 20:34:25 +0100 Subject: [PATCH 01/12] Add api for nats scheduled messages --- faststream/nats/__init__.py | 3 ++- faststream/nats/broker/broker.py | 8 +++++++- faststream/nats/publisher/config.py | 3 ++- faststream/nats/response.py | 12 +++++++++++- faststream/nats/schemas/__init__.py | 2 ++ faststream/nats/schemas/js_stream.py | 4 ++++ faststream/nats/schemas/schedule.py | 14 ++++++++++++++ pyproject.toml | 2 +- uv.lock | 6 +++--- 9 files changed, 46 insertions(+), 8 deletions(-) create mode 100644 faststream/nats/schemas/schedule.py diff --git a/faststream/nats/__init__.py b/faststream/nats/__init__.py index 5df7e7be88..6bd5e775ac 100644 --- a/faststream/nats/__init__.py +++ b/faststream/nats/__init__.py @@ -19,7 +19,7 @@ from .annotations import NatsMessage from .broker import NatsBroker, NatsPublisher, NatsRoute, NatsRouter from .response import NatsPublishCommand, NatsResponse - from .schemas import JStream, KvWatch, ObjWatch, PubAck, PullSub + from .schemas import JStream, KvWatch, ObjWatch, PubAck, PullSub, Schedule from .testing import TestNatsBroker except ImportError as e: @@ -53,6 +53,7 @@ "RePublish", "ReplayPolicy", "RetentionPolicy", + "Schedule", "StorageType", "StreamConfig", "StreamSource", diff --git a/faststream/nats/broker/broker.py b/faststream/nats/broker/broker.py index cc962e4053..64055b1143 100644 --- a/faststream/nats/broker/broker.py +++ b/faststream/nats/broker/broker.py @@ -70,7 +70,7 @@ from faststream.nats.configs.broker import JsInitOptions from faststream.nats.helpers import KVBucketDeclarer, OSBucketDeclarer from faststream.nats.message import NatsMessage - from faststream.nats.schemas import PubAck + from faststream.nats.schemas import PubAck, Schedule from faststream.security import BaseSecurity from faststream.specification.schema.extra import Tag, TagDict @@ -523,6 +523,7 @@ async def publish( correlation_id: str | None = None, stream: None = None, timeout: float | None = None, + schedule: Optional["Schedule"] = None, ) -> None: ... @overload @@ -535,6 +536,7 @@ async def publish( correlation_id: str | None = None, stream: str | None = None, timeout: float | None = None, + schedule: Optional["Schedule"] = None, ) -> "PubAck": ... @override @@ -547,6 +549,7 @@ async def publish( correlation_id: str | None = None, stream: str | None = None, timeout: float | None = None, + schedule: Optional["Schedule"] = None, ) -> Optional["PubAck"]: """Publish message directly. @@ -574,6 +577,8 @@ async def publish( Can be omitted without any effect if you doesn't want PubAck frame. timeout: Timeout to send message to NATS. + schedule: + Schedule to publish message at a specific time. Returns: `None` if you publishes a regular message. @@ -588,6 +593,7 @@ async def publish( stream=stream, timeout=timeout or 0.5, _publish_type=PublishType.PUBLISH, + schedule=schedule, ) result: PubAck | None diff --git a/faststream/nats/publisher/config.py b/faststream/nats/publisher/config.py index 6fdc4f8468..0d32f84e51 100644 --- a/faststream/nats/publisher/config.py +++ b/faststream/nats/publisher/config.py @@ -8,7 +8,7 @@ from faststream.nats.configs import NatsBrokerConfig if TYPE_CHECKING: - from faststream.nats.schemas import JStream + from faststream.nats.schemas import JStream, Schedule @dataclass(kw_only=True) @@ -25,3 +25,4 @@ class NatsPublisherConfig(PublisherUsecaseConfig): headers: dict[str, str] | None stream: Optional["JStream"] timeout: float | None + schedule: Optional["Schedule"] diff --git a/faststream/nats/response.py b/faststream/nats/response.py index 86e58599f2..676003f974 100644 --- a/faststream/nats/response.py +++ b/faststream/nats/response.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Union +from typing import TYPE_CHECKING, Optional, Union from typing_extensions import override @@ -7,6 +7,7 @@ if TYPE_CHECKING: from faststream._internal.basic_types import SendableMessage + from faststream.nats.schemas.schedule import Schedule class NatsResponse(Response): @@ -17,6 +18,7 @@ def __init__( headers: dict[str, str] | None = None, correlation_id: str | None = None, stream: str | None = None, + schedule: Optional["Schedule"] = None, ) -> None: super().__init__( body=body, @@ -24,6 +26,7 @@ def __init__( correlation_id=correlation_id, ) self.stream = stream + self.schedule = schedule @override def as_publish_command(self) -> "NatsPublishCommand": @@ -35,6 +38,7 @@ def as_publish_command(self) -> "NatsPublishCommand": # Nats specific subject="", stream=self.stream, + schedule=self.schedule, ) @@ -49,6 +53,7 @@ def __init__( reply_to: str = "", stream: str | None = None, timeout: float = 0.5, + schedule: Optional["Schedule"] = None, _publish_type: PublishType, ) -> None: super().__init__( @@ -62,6 +67,7 @@ def __init__( self.stream = stream self.timeout = timeout + self.schedule = schedule def headers_to_publish(self, *, js: bool = False) -> dict[str, str]: headers = {} @@ -72,6 +78,10 @@ def headers_to_publish(self, *, js: bool = False) -> dict[str, str]: if js and self.reply_to: headers["reply_to"] = self.reply_to + if self.schedule: + headers["Nats-Schedule"] = f"@at {self.schedule.time.isoformat()}" + headers["Nats-Schedule-Target"] = self.schedule.target + return headers | self.headers @classmethod diff --git a/faststream/nats/schemas/__init__.py b/faststream/nats/schemas/__init__.py index 583fc644e1..155e6bc253 100644 --- a/faststream/nats/schemas/__init__.py +++ b/faststream/nats/schemas/__init__.py @@ -4,6 +4,7 @@ from faststream.nats.schemas.kv_watch import KvWatch from faststream.nats.schemas.obj_watch import ObjWatch from faststream.nats.schemas.pull_sub import PullSub +from faststream.nats.schemas.schedule import Schedule __all__ = ( "JStream", @@ -11,5 +12,6 @@ "ObjWatch", "PubAck", "PullSub", + "Schedule", "SubjectsCollection", ) diff --git a/faststream/nats/schemas/js_stream.py b/faststream/nats/schemas/js_stream.py index 0dd83f86b9..dcd6e86495 100644 --- a/faststream/nats/schemas/js_stream.py +++ b/faststream/nats/schemas/js_stream.py @@ -56,6 +56,7 @@ def __init__( republish: Optional["RePublish"] = None, allow_direct: bool | None = None, mirror_direct: bool | None = None, + allow_msg_schedules: bool | None = None, declare: bool = True, ) -> None: """Initialized JSrream. @@ -128,6 +129,8 @@ def __init__( Should direct requests be allowed. Note: you can get stale data. mirror_direct: Should direct mirror requests be allowed + allow_msg_schedules: + Should allow message schedules. declare: Whether to create stream automatically or just connect to it. """ @@ -162,6 +165,7 @@ def __init__( republish=republish, allow_direct=allow_direct, mirror_direct=mirror_direct, + allow_msg_schedules=allow_msg_schedules, subjects=[], # use subjects from builder in declaration ) diff --git a/faststream/nats/schemas/schedule.py b/faststream/nats/schemas/schedule.py new file mode 100644 index 0000000000..79ebdca962 --- /dev/null +++ b/faststream/nats/schemas/schedule.py @@ -0,0 +1,14 @@ +from datetime import datetime + +from faststream._internal.proto import NameRequired + + +class Schedule(NameRequired): + """A class to represent a message schedule.""" + + __slots__ = ("target", "time") + + def __init__(self, time: datetime | str, target: str, /) -> None: + super().__init__(target) + self.time = datetime.fromisoformat(time) if isinstance(time, str) else time + self.target = target diff --git a/pyproject.toml b/pyproject.toml index 9161b8cb8b..8ec9134ff2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,7 +72,7 @@ confluent = [ "confluent-kafka>=2.6,!=2.8.1,<3; python_version >= '3.13'", ] -nats = ["nats-py>=2.7.0,<=3.0.0"] +nats = ["nats-py>=2.12.0,<=3.0.0"] redis = ["redis>=5.0.0,<7.0.0"] diff --git a/uv.lock b/uv.lock index 02a49e79ee..70ffd29973 100644 --- a/uv.lock +++ b/uv.lock @@ -891,7 +891,7 @@ requires-dist = [ { name = "confluent-kafka", marker = "python_full_version >= '3.13' and extra == 'confluent'", specifier = ">=2.6,!=2.8.1,<3" }, { name = "confluent-kafka", marker = "python_full_version < '3.13' and extra == 'confluent'", specifier = ">=2,!=2.8.1,<3" }, { name = "fast-depends", extras = ["pydantic"], specifier = ">=3.0.0" }, - { name = "nats-py", marker = "extra == 'nats'", specifier = ">=2.7.0,<=3.0.0" }, + { name = "nats-py", marker = "extra == 'nats'", specifier = ">=2.12.0,<=3.0.0" }, { name = "opentelemetry-sdk", marker = "extra == 'otel'", specifier = ">=1.24.0,<2.0.0" }, { name = "prometheus-client", marker = "extra == 'prometheus'", specifier = ">=0.20.0,<0.30.0" }, { name = "redis", marker = "extra == 'redis'", specifier = ">=5.0.0,<7.0.0" }, @@ -1808,9 +1808,9 @@ wheels = [ [[package]] name = "nats-py" -version = "2.11.0" +version = "2.12.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/65/be/757c8af63596453daaa42cc21be51aa42fc6b23cc9d4347784f99c8357b5/nats_py-2.11.0.tar.gz", hash = "sha256:fb1097db8b520bb4c8f5ad51340ca54d9fa54dbfc4ecc81c3625ef80994b6100", size = 114186, upload-time = "2025-07-22T08:41:08.589Z" } +sdist = { url = "https://files.pythonhosted.org/packages/71/c5/2564d917503fe8d68fe630c74bf6b678fbc15c01b58f2565894761010f57/nats_py-2.12.0.tar.gz", hash = "sha256:2981ca4b63b8266c855573fa7871b1be741f1889fd429ee657e5ffc0971a38a1", size = 119821, upload-time = "2025-10-31T05:27:31.247Z" } [[package]] name = "nodeenv" From 33a363455fcf18b9559b4cea48a49c62d8f76805 Mon Sep 17 00:00:00 2001 From: sheldy Date: Sun, 2 Nov 2025 21:58:23 +0100 Subject: [PATCH 02/12] fix NatsPublisherConfig --- faststream/nats/publisher/config.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/faststream/nats/publisher/config.py b/faststream/nats/publisher/config.py index 0d32f84e51..6fdc4f8468 100644 --- a/faststream/nats/publisher/config.py +++ b/faststream/nats/publisher/config.py @@ -8,7 +8,7 @@ from faststream.nats.configs import NatsBrokerConfig if TYPE_CHECKING: - from faststream.nats.schemas import JStream, Schedule + from faststream.nats.schemas import JStream @dataclass(kw_only=True) @@ -25,4 +25,3 @@ class NatsPublisherConfig(PublisherUsecaseConfig): headers: dict[str, str] | None stream: Optional["JStream"] timeout: float | None - schedule: Optional["Schedule"] From ee79fbd2c06884ba821435b351a4acff361a589c Mon Sep 17 00:00:00 2001 From: sheldy Date: Sun, 2 Nov 2025 21:58:32 +0100 Subject: [PATCH 03/12] add test --- tests/brokers/nats/test_publish.py | 46 +++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/tests/brokers/nats/test_publish.py b/tests/brokers/nats/test_publish.py index 1285659fbb..f9d55d85e7 100644 --- a/tests/brokers/nats/test_publish.py +++ b/tests/brokers/nats/test_publish.py @@ -1,10 +1,12 @@ import asyncio +from datetime import UTC, datetime, timedelta from unittest.mock import MagicMock +from uuid import uuid4 import pytest from faststream import Context -from faststream.nats import NatsResponse +from faststream.nats import JStream, NatsMessage, NatsResponse, Schedule from tests.brokers.base.publish import BrokerPublishTestcase from .basic import NatsTestcaseConfig @@ -75,3 +77,45 @@ async def handle(): ) assert await response.decode() == "Hi!", response + + @pytest.mark.asyncio() + async def test_publish_with_schedule( + self, + queue: str, + mock: MagicMock, + ) -> None: + event = asyncio.Event() + + pub_broker = self.get_broker(apply_types=True) + await pub_broker.connect() + + assert pub_broker._connection is not None + await pub_broker._connection.jetstream().add_stream( + name=queue, + subjects=[f"{queue}.>"], + ) + + schedule_time = datetime.now(tz=UTC) + timedelta(seconds=2) + schedule_target = f"{queue}.{uuid4()}" + + @pub_broker.subscriber( + schedule_target, stream=JStream(queue, allow_msg_schedules=True) + ) + async def handle(body: dict, msg: NatsMessage) -> None: + mock(body) + event.set() + + await pub_broker.start() + + await pub_broker.publish( + {"type": "do_something"}, + f"{queue}.subject", + schedule=Schedule(schedule_time, schedule_target), + stream=queue, + timeout=10, + ) + + await asyncio.wait_for(event.wait(), timeout=5) + + assert event.is_set() + mock.assert_called_once_with({"type": "do_something"}) From 0be8202d12c32fdc69f9053106d42fb2307cb0b9 Mon Sep 17 00:00:00 2001 From: sheldy Date: Sun, 2 Nov 2025 22:04:11 +0100 Subject: [PATCH 04/12] fix tests for 3.10 --- tests/brokers/nats/test_publish.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/brokers/nats/test_publish.py b/tests/brokers/nats/test_publish.py index f9d55d85e7..988717752f 100644 --- a/tests/brokers/nats/test_publish.py +++ b/tests/brokers/nats/test_publish.py @@ -1,5 +1,5 @@ import asyncio -from datetime import UTC, datetime, timedelta +from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock from uuid import uuid4 @@ -95,7 +95,7 @@ async def test_publish_with_schedule( subjects=[f"{queue}.>"], ) - schedule_time = datetime.now(tz=UTC) + timedelta(seconds=2) + schedule_time = datetime.now(tz=timezone.utc) + timedelta(seconds=2) schedule_target = f"{queue}.{uuid4()}" @pub_broker.subscriber( From 86e6a3320d901ba542f6b04e2a2d867943179bcc Mon Sep 17 00:00:00 2001 From: sheldy Date: Fri, 7 Nov 2025 01:55:05 +0100 Subject: [PATCH 05/12] move Schedule to dataclass --- faststream/nats/schemas/schedule.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/faststream/nats/schemas/schedule.py b/faststream/nats/schemas/schedule.py index 79ebdca962..5f6fc02029 100644 --- a/faststream/nats/schemas/schedule.py +++ b/faststream/nats/schemas/schedule.py @@ -1,14 +1,10 @@ +from dataclasses import dataclass from datetime import datetime -from faststream._internal.proto import NameRequired - -class Schedule(NameRequired): +@dataclass +class Schedule: """A class to represent a message schedule.""" - __slots__ = ("target", "time") - - def __init__(self, time: datetime | str, target: str, /) -> None: - super().__init__(target) - self.time = datetime.fromisoformat(time) if isinstance(time, str) else time - self.target = target + time: datetime + target: str From 68edb7c9cd2307df5118f9a4b7b2e6618e7e4ac8 Mon Sep 17 00:00:00 2001 From: sheldy Date: Fri, 7 Nov 2025 01:55:20 +0100 Subject: [PATCH 06/12] move test to func and add marks --- tests/brokers/nats/test_publish.py | 84 +++++++++++++++--------------- 1 file changed, 43 insertions(+), 41 deletions(-) diff --git a/tests/brokers/nats/test_publish.py b/tests/brokers/nats/test_publish.py index 988717752f..471886a504 100644 --- a/tests/brokers/nats/test_publish.py +++ b/tests/brokers/nats/test_publish.py @@ -6,7 +6,7 @@ import pytest from faststream import Context -from faststream.nats import JStream, NatsMessage, NatsResponse, Schedule +from faststream.nats import JStream, NatsBroker, NatsMessage, NatsResponse, Schedule from tests.brokers.base.publish import BrokerPublishTestcase from .basic import NatsTestcaseConfig @@ -78,44 +78,46 @@ async def handle(): assert await response.decode() == "Hi!", response - @pytest.mark.asyncio() - async def test_publish_with_schedule( - self, - queue: str, - mock: MagicMock, - ) -> None: - event = asyncio.Event() - - pub_broker = self.get_broker(apply_types=True) - await pub_broker.connect() - - assert pub_broker._connection is not None - await pub_broker._connection.jetstream().add_stream( - name=queue, - subjects=[f"{queue}.>"], - ) - - schedule_time = datetime.now(tz=timezone.utc) + timedelta(seconds=2) - schedule_target = f"{queue}.{uuid4()}" - @pub_broker.subscriber( - schedule_target, stream=JStream(queue, allow_msg_schedules=True) - ) - async def handle(body: dict, msg: NatsMessage) -> None: - mock(body) - event.set() - - await pub_broker.start() - - await pub_broker.publish( - {"type": "do_something"}, - f"{queue}.subject", - schedule=Schedule(schedule_time, schedule_target), - stream=queue, - timeout=10, - ) - - await asyncio.wait_for(event.wait(), timeout=5) - - assert event.is_set() - mock.assert_called_once_with({"type": "do_something"}) +@pytest.mark.asyncio() +@pytest.mark.nats() +@pytest.mark.connected() +async def test_publish_with_schedule( + queue: str, + mock: MagicMock, +) -> None: + event = asyncio.Event() + + pub_broker = NatsBroker() + await pub_broker.connect() + + assert pub_broker._connection is not None + await pub_broker._connection.jetstream().add_stream( + name=queue, + subjects=[f"{queue}.>"], + ) + + schedule_time = datetime.now(tz=timezone.utc) + timedelta(seconds=0.1) + schedule_target = f"{queue}.{uuid4()}" + + @pub_broker.subscriber( + schedule_target, stream=JStream(queue, allow_msg_schedules=True) + ) + async def handle(body: dict, msg: NatsMessage) -> None: + mock(body) + event.set() + + await pub_broker.start() + + await pub_broker.publish( + {"type": "do_something"}, + f"{queue}.subject", + schedule=Schedule(schedule_time, schedule_target), + stream=queue, + timeout=10, + ) + + await asyncio.wait_for(event.wait(), timeout=5) + + assert event.is_set() + mock.assert_called_once_with({"type": "do_something"}) From 1296bebe69ac79c026915f176ce4ce14835fef64 Mon Sep 17 00:00:00 2001 From: sheldy Date: Fri, 7 Nov 2025 02:07:59 +0100 Subject: [PATCH 07/12] add docs --- docs/docs/en/nats/jetstream/scheduling.md | 117 ++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 docs/docs/en/nats/jetstream/scheduling.md diff --git a/docs/docs/en/nats/jetstream/scheduling.md b/docs/docs/en/nats/jetstream/scheduling.md new file mode 100644 index 0000000000..1d36063116 --- /dev/null +++ b/docs/docs/en/nats/jetstream/scheduling.md @@ -0,0 +1,117 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 10 +--- + +# Scheduling messages + +JetStream supports scheduling messages to be delivered at a specific time in the future. This is useful for delayed task execution, reminder systems, or any scenario where you need to defer message processing. + +## Enabling Message Scheduling + +To use message scheduling, you need to create a JetStream with the `allow_msg_schedules=True` parameter: + +```python +from faststream import FastStream +from faststream.nats import NatsBroker, JStream, NatsMessage, Schedule + +broker = NatsBroker() + +@broker.subscriber( + "test_stream.*", + stream=JStream("test_stream", allow_msg_schedules=True) +) +async def handle_scheduled_message(msg: NatsMessage) -> None: + # Process the scheduled message when it arrives + print(f"Received scheduled message: {msg}") +``` + +## Publishing Scheduled Messages + +To schedule a message for future delivery, use the `Schedule` object when publishing: + +```python +from datetime import UTC, datetime, timedelta +from uuid import uuid4 + +async def publish_scheduled_message() -> None: + # Connect to the broker + await broker.connect() + + # Calculate the delivery time (e.g., 3 seconds from now) + current_time = datetime.now(tz=UTC) + schedule_time = current_time + timedelta(seconds=3) + + # Define the target subject for the scheduled message + schedule_target = f"test_stream.{uuid4()}" + + # Publish the message with a schedule + await broker.publish( + message={"type": "do_something"}, + subject="test_stream.subject", + schedule=Schedule(schedule_time, schedule_target), + stream="test_stream", + timeout=10, + ) +``` + +## Complete Example + +Here's a full working example that demonstrates scheduled message publishing: + +```python +from datetime import UTC, datetime, timedelta +from uuid import uuid4 + +from faststream import FastStream +from faststream.nats import JStream, NatsBroker, NatsMessage, Schedule + +broker = NatsBroker() + + +@broker.subscriber( + "test_stream.*", + stream=JStream("test_stream", allow_msg_schedules=True) +) +async def handle_scheduled_message(msg: NatsMessage) -> None: + print(f"Message received at {datetime.now(tz=UTC)}") + print(msg) + + +async def on_startup() -> None: + current_time = datetime.now(tz=UTC) + schedule_time = current_time + timedelta(seconds=3) + await broker.connect() + + schedule_target = f"test_stream.{uuid4()}" + await broker.publish( + message={"type": "do_something"}, + subject="test_stream.subject", + schedule=Schedule(schedule_time, schedule_target), + stream="test_stream", + timeout=10, + ) + print(f"Message scheduled for delivery at {schedule_time}") + + +app = FastStream(broker) +app.on_startup(on_startup) + +if __name__ == "__main__": + import asyncio + asyncio.run(app.run()) +``` + +## Key Points + +- **Stream Configuration**: The JetStream must be created with `allow_msg_schedules=True` to enable scheduling +- **Schedule Object**: Takes two parameters: + - `schedule_time`: A `datetime` object (preferably with UTC timezone) indicating when the message should be delivered + - `schedule_target`: The subject where the scheduled message will be published, should be unique for every message. +- **Subject Pattern**: The subscriber should use a wildcard pattern (e.g., `"test_stream.*"`) to match the scheduled target subjects +- **Timezone**: Always use timezone-aware datetime objects, preferably UTC, to avoid scheduling issues \ No newline at end of file From d66a63993722bf9c403c8a75e1631fc7113c3e8e Mon Sep 17 00:00:00 2001 From: sheldy Date: Fri, 7 Nov 2025 02:12:26 +0100 Subject: [PATCH 08/12] fix docs --- docs/docs/en/nats/jetstream/scheduling.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/docs/en/nats/jetstream/scheduling.md b/docs/docs/en/nats/jetstream/scheduling.md index 1d36063116..973382f4b9 100644 --- a/docs/docs/en/nats/jetstream/scheduling.md +++ b/docs/docs/en/nats/jetstream/scheduling.md @@ -23,7 +23,7 @@ from faststream.nats import NatsBroker, JStream, NatsMessage, Schedule broker = NatsBroker() @broker.subscriber( - "test_stream.*", + "test_stream.*", stream=JStream("test_stream", allow_msg_schedules=True) ) async def handle_scheduled_message(msg: NatsMessage) -> None: @@ -42,14 +42,14 @@ from uuid import uuid4 async def publish_scheduled_message() -> None: # Connect to the broker await broker.connect() - + # Calculate the delivery time (e.g., 3 seconds from now) current_time = datetime.now(tz=UTC) schedule_time = current_time + timedelta(seconds=3) - + # Define the target subject for the scheduled message schedule_target = f"test_stream.{uuid4()}" - + # Publish the message with a schedule await broker.publish( message={"type": "do_something"}, @@ -75,7 +75,7 @@ broker = NatsBroker() @broker.subscriber( - "test_stream.*", + "test_stream.*", stream=JStream("test_stream", allow_msg_schedules=True) ) async def handle_scheduled_message(msg: NatsMessage) -> None: @@ -114,4 +114,4 @@ if __name__ == "__main__": - `schedule_time`: A `datetime` object (preferably with UTC timezone) indicating when the message should be delivered - `schedule_target`: The subject where the scheduled message will be published, should be unique for every message. - **Subject Pattern**: The subscriber should use a wildcard pattern (e.g., `"test_stream.*"`) to match the scheduled target subjects -- **Timezone**: Always use timezone-aware datetime objects, preferably UTC, to avoid scheduling issues \ No newline at end of file +- **Timezone**: Always use timezone-aware datetime objects, preferably UTC, to avoid scheduling issues From 39ef0d60594bdb87a3a66490075af642cb5082c8 Mon Sep 17 00:00:00 2001 From: sheldy Date: Sat, 8 Nov 2025 02:07:01 +0100 Subject: [PATCH 09/12] add nats server conf --- .github/configs/nats-server.conf | 3 +++ .github/workflows/pr_tests.yaml | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) create mode 100644 .github/configs/nats-server.conf diff --git a/.github/configs/nats-server.conf b/.github/configs/nats-server.conf new file mode 100644 index 0000000000..2b099dc0be --- /dev/null +++ b/.github/configs/nats-server.conf @@ -0,0 +1,3 @@ +jetstream { + store_dir='nats_data/' +} \ No newline at end of file diff --git a/.github/workflows/pr_tests.yaml b/.github/workflows/pr_tests.yaml index 3109848083..8a775de716 100644 --- a/.github/workflows/pr_tests.yaml +++ b/.github/workflows/pr_tests.yaml @@ -365,9 +365,11 @@ jobs: runs-on: ubuntu-latest services: nats: - image: diementros/nats:js + image: nats:2.12.1 ports: - 4222:4222 + volumes: + - ./.github/configs/nats-server.conf:/etc/nats/nats-server.conf steps: - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 with: From aebe188481a9df7cf567151f066f9f196a97fb04 Mon Sep 17 00:00:00 2001 From: sheldy Date: Sat, 8 Nov 2025 02:20:24 +0100 Subject: [PATCH 10/12] just try --- .github/workflows/pr_tests.yaml | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/.github/workflows/pr_tests.yaml b/.github/workflows/pr_tests.yaml index 8a775de716..d641d03868 100644 --- a/.github/workflows/pr_tests.yaml +++ b/.github/workflows/pr_tests.yaml @@ -363,14 +363,24 @@ jobs: - test-basic - test-nats-smoke runs-on: ubuntu-latest - services: - nats: - image: nats:2.12.1 - ports: - - 4222:4222 - volumes: - - ./.github/configs/nats-server.conf:/etc/nats/nats-server.conf steps: + - name: Start NATS with JetStream + run: | + docker run -d \ + --name nats \ + -p 4222:4222 \ + --health-cmd "nats-server --signal=ping" \ + --health-interval 10s \ + --health-timeout 5s \ + --health-retries 5 \ + nats:2.12.1 -js + + # Wait for NATS to be ready + until docker exec nats nats-server --signal=ping 2>/dev/null; do + echo "Waiting for NATS to be ready..." + sleep 1 + done + echo "NATS is ready!" - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 with: persist-credentials: false From 73d514468ecf16c1ffdb7b98b30cea9653b1e958 Mon Sep 17 00:00:00 2001 From: sheldy Date: Sat, 8 Nov 2025 02:25:47 +0100 Subject: [PATCH 11/12] update healthcheck --- .github/workflows/pr_tests.yaml | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/.github/workflows/pr_tests.yaml b/.github/workflows/pr_tests.yaml index d641d03868..55fc29f83d 100644 --- a/.github/workflows/pr_tests.yaml +++ b/.github/workflows/pr_tests.yaml @@ -369,18 +369,12 @@ jobs: docker run -d \ --name nats \ -p 4222:4222 \ - --health-cmd "nats-server --signal=ping" \ - --health-interval 10s \ - --health-timeout 5s \ - --health-retries 5 \ nats:2.12.1 -js # Wait for NATS to be ready - until docker exec nats nats-server --signal=ping 2>/dev/null; do - echo "Waiting for NATS to be ready..." - sleep 1 - done + timeout 30 bash -c 'until docker logs nats 2>&1 | grep -q "Server is ready"; do sleep 0.5; done' echo "NATS is ready!" + docker logs nats - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 with: persist-credentials: false From 7f826869c219a41eacff28c0ec3181187ec0c559 Mon Sep 17 00:00:00 2001 From: sheldy Date: Sat, 8 Nov 2025 02:35:52 +0100 Subject: [PATCH 12/12] rm useless config and fix space --- .github/configs/nats-server.conf | 3 --- .github/workflows/pr_tests.yaml | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) delete mode 100644 .github/configs/nats-server.conf diff --git a/.github/configs/nats-server.conf b/.github/configs/nats-server.conf deleted file mode 100644 index 2b099dc0be..0000000000 --- a/.github/configs/nats-server.conf +++ /dev/null @@ -1,3 +0,0 @@ -jetstream { - store_dir='nats_data/' -} \ No newline at end of file diff --git a/.github/workflows/pr_tests.yaml b/.github/workflows/pr_tests.yaml index 55fc29f83d..df043bd671 100644 --- a/.github/workflows/pr_tests.yaml +++ b/.github/workflows/pr_tests.yaml @@ -370,7 +370,7 @@ jobs: --name nats \ -p 4222:4222 \ nats:2.12.1 -js - + # Wait for NATS to be ready timeout 30 bash -c 'until docker logs nats 2>&1 | grep -q "Server is ready"; do sleep 0.5; done' echo "NATS is ready!"