diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index f9f53498..d69be23c 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -24,7 +24,7 @@ from warnings import warn -from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any +from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any, Awaitable from typing_extensions import Self from google.protobuf.message import Message as GrpcMessage @@ -39,12 +39,14 @@ AioRpcError, ) +from dapr.aio.clients.grpc.subscription import Subscription from dapr.clients.exceptions import DaprInternalError, DaprGrpcError from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions from dapr.clients.grpc._state import StateOptions, StateItem from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus from dapr.clients.health import DaprHealth from dapr.clients.retry import RetryPolicy +from dapr.common.pubsub.subscription import StreamInactiveError from dapr.conf.helpers import GrpcEndpoint from dapr.conf import settings from dapr.proto import api_v1, api_service_v1, common_v1 @@ -74,27 +76,14 @@ BindingRequest, TransactionalStateOperation, ) -from dapr.clients.grpc._response import ( - BindingResponse, - DaprResponse, - GetSecretResponse, - GetBulkSecretResponse, - GetMetadataResponse, - InvokeMethodResponse, - UnlockResponseStatus, - StateResponse, - BulkStatesResponse, - BulkStateItem, - ConfigurationResponse, - QueryResponse, - QueryResponseItem, - RegisteredComponents, - ConfigurationWatcher, - TryLockResponse, - UnlockResponse, - GetWorkflowResponse, - StartWorkflowResponse, -) +from dapr.clients.grpc._response import (BindingResponse, DaprResponse, GetSecretResponse, + GetBulkSecretResponse, GetMetadataResponse, + InvokeMethodResponse, UnlockResponseStatus, StateResponse, + BulkStatesResponse, BulkStateItem, ConfigurationResponse, + QueryResponse, QueryResponseItem, RegisteredComponents, + ConfigurationWatcher, TryLockResponse, UnlockResponse, + GetWorkflowResponse, StartWorkflowResponse, + TopicEventResponse, ) class DaprGrpcClientAsync: @@ -482,6 +471,63 @@ async def publish_event( return DaprResponse(await call.initial_metadata()) + async def subscribe(self, pubsub_name: str, topic: str, metadata: Optional[dict] = None, + dead_letter_topic: Optional[str] = None, ) -> Subscription: + """ + Subscribe to a topic with a bidirectional stream + + Args: + pubsub_name (str): The name of the pubsub component. + topic (str): The name of the topic. + metadata (Optional[dict]): Additional metadata for the subscription. + dead_letter_topic (Optional[str]): Name of the dead-letter topic. + + Returns: + Subscription: The Subscription object managing the stream. + """ + subscription = Subscription(self._stub, pubsub_name, topic, metadata, + dead_letter_topic) + await subscription.start() + return subscription + + async def subscribe_with_handler(self, pubsub_name: str, topic: str, + handler_fn: Callable[..., TopicEventResponse], metadata: Optional[dict] = None, + dead_letter_topic: Optional[str] = None, ) -> Callable[[], Awaitable[None]]: + """ + Subscribe to a topic with a bidirectional stream and a message handler function + + Args: + pubsub_name (str): The name of the pubsub component. + topic (str): The name of the topic. + handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received. + metadata (Optional[dict]): Additional metadata for the subscription. + dead_letter_topic (Optional[str]): Name of the dead-letter topic. + + Returns: + Callable[[], Awaitable[None]]: An async function to close the subscription. + """ + subscription = await self.subscribe(pubsub_name, topic, metadata, dead_letter_topic) + + async def stream_messages(sub: Subscription): + while True: + try: + message = await sub.next_message() + if message: + response = await handler_fn(message) + if response: + await subscription._respond(message, response.status) + else: + continue + except StreamInactiveError: + break + + async def close_subscription(): + await subscription.close() + + asyncio.create_task(stream_messages(subscription)) + + return close_subscription + async def get_state( self, store_name: str, diff --git a/dapr/aio/clients/grpc/subscription.py b/dapr/aio/clients/grpc/subscription.py new file mode 100644 index 00000000..b4be6250 --- /dev/null +++ b/dapr/aio/clients/grpc/subscription.py @@ -0,0 +1,109 @@ +import asyncio +from grpc import StatusCode +from grpc.aio import AioRpcError + +from dapr.clients.grpc._response import TopicEventResponse +from dapr.clients.health import DaprHealth +from dapr.common.pubsub.subscription import StreamInactiveError, SubscriptionMessage +from dapr.proto import api_v1, appcallback_v1 + +class Subscription: + + def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None): + self._stub = stub + self._pubsub_name = pubsub_name + self._topic = topic + self._metadata = metadata or {} + self._dead_letter_topic = dead_letter_topic or '' + self._stream = None + self._send_queue = asyncio.Queue() + self._stream_active = asyncio.Event() + + async def start(self): + async def outgoing_request_iterator(): + try: + initial_request = api_v1.SubscribeTopicEventsRequestAlpha1( + initial_request=api_v1.SubscribeTopicEventsRequestInitialAlpha1( + pubsub_name=self._pubsub_name, + topic=self._topic, + metadata=self._metadata, + dead_letter_topic=self._dead_letter_topic, + ) + ) + yield initial_request + + while self._stream_active.is_set(): + try: + response = await asyncio.wait_for(self._send_queue.get(), timeout=1.0) + yield response + except asyncio.TimeoutError: + continue + except Exception as e: + raise Exception(f'Error while writing to stream: {e}') + + self._stream = self._stub.SubscribeTopicEventsAlpha1(outgoing_request_iterator()) + self._stream_active.set() + await self._stream.read() # discard the initial message + + async def reconnect_stream(self): + await self.close() + DaprHealth.wait_until_ready() + print('Attempting to reconnect...') + await self.start() + + async def next_message(self): + if not self._stream_active.is_set(): + raise StreamInactiveError('Stream is not active') + + try: + if self._stream is not None: + message = await self._stream.read() + if message is None: + return None + return SubscriptionMessage(message.event_message) + except AioRpcError as e: + if e.code() == StatusCode.UNAVAILABLE: + print(f'gRPC error while reading from stream: {e.details()}, ' + f'Status Code: {e.code()}. ' + f'Attempting to reconnect...') + await self.reconnect_stream() + elif e.code() != StatusCode.CANCELLED: + raise Exception(f'gRPC error while reading from subscription stream: {e.details()} ' + f'Status Code: {e.code()}') + except Exception as e: + raise Exception(f'Error while fetching message: {e}') + + return None + + async def _respond(self, message, status): + try: + status = appcallback_v1.TopicEventResponse(status=status.value) + response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1( + id=message.id(), status=status + ) + msg = api_v1.SubscribeTopicEventsRequestAlpha1(event_processed=response) + if not self._stream_active.is_set(): + raise StreamInactiveError('Stream is not active') + await self._send_queue.put(msg) + except Exception as e: + print(f"Can't send message on inactive stream: {e}") + + async def respond_success(self, message): + await self._respond(message, TopicEventResponse('success').status) + + async def respond_retry(self, message): + await self._respond(message, TopicEventResponse('retry').status) + + async def respond_drop(self, message): + await self._respond(message, TopicEventResponse('drop').status) + + async def close(self): + if self._stream: + try: + self._stream.cancel() + self._stream_active.clear() + except AioRpcError as e: + if e.code() != StatusCode.CANCELLED: + raise Exception(f'Error while closing stream: {e}') + except Exception as e: + raise Exception(f'Error while closing stream: {e}') \ No newline at end of file diff --git a/dapr/clients/grpc/client.py b/dapr/clients/grpc/client.py index e6469c4f..94793907 100644 --- a/dapr/clients/grpc/client.py +++ b/dapr/clients/grpc/client.py @@ -523,7 +523,7 @@ def subscribe_with_handler( Args: pubsub_name (str): The name of the pubsub component. topic (str): The name of the topic. - handler_fn (Callable[..., TopicEventResponseStatus]): The function to call when a message is received. + handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received. metadata (Optional[MetadataTuple]): Additional metadata for the subscription. dead_letter_topic (Optional[str]): Name of the dead-letter topic. timeout (Optional[int]): The time in seconds to wait for a message before returning None @@ -540,7 +540,7 @@ def stream_messages(sub): # Process the message response = handler_fn(message) if response: - subscription._respond(message, response) + subscription.respond(message, response.status) else: # No message received continue diff --git a/dapr/clients/grpc/subscription.py b/dapr/clients/grpc/subscription.py index 053194ad..8b99b34f 100644 --- a/dapr/clients/grpc/subscription.py +++ b/dapr/clients/grpc/subscription.py @@ -1,22 +1,16 @@ -import json - -from google.protobuf.json_format import MessageToDict from grpc import RpcError, StatusCode, Call # type: ignore from dapr.clients.grpc._response import TopicEventResponse from dapr.clients.health import DaprHealth +from dapr.common.pubsub.subscription import StreamInactiveError, SubscriptionMessage from dapr.proto import api_v1, appcallback_v1 import queue import threading -from typing import Optional, Union +from typing import Optional -from dapr.proto.runtime.v1.appcallback_pb2 import TopicEventRequest class Subscription: - SUCCESS = TopicEventResponse('success').status - RETRY = TopicEventResponse('retry').status - DROP = TopicEventResponse('drop').status def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None): self._stub = stub @@ -102,7 +96,7 @@ def next_message(self): return None - def _respond(self, message, status): + def respond(self, message, status): try: status = appcallback_v1.TopicEventResponse(status=status.value) response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1( @@ -116,13 +110,13 @@ def _respond(self, message, status): print(f"Can't send message on inactive stream: {e}") def respond_success(self, message): - self._respond(message, self.SUCCESS) + self.respond(message, TopicEventResponse('success').status) def respond_retry(self, message): - self._respond(message, self.RETRY) + self.respond(message, TopicEventResponse('retry').status) def respond_drop(self, message): - self._respond(message, self.DROP) + self.respond(message, TopicEventResponse('drop').status) def _set_stream_active(self): with self._stream_lock: @@ -146,87 +140,3 @@ def close(self): raise Exception(f'Error while closing stream: {e}') except Exception as e: raise Exception(f'Error while closing stream: {e}') - - -class SubscriptionMessage: - def __init__(self, msg: TopicEventRequest): - self._id: str = msg.id - self._source: str = msg.source - self._type: str = msg.type - self._spec_version: str = msg.spec_version - self._data_content_type: str = msg.data_content_type - self._topic: str = msg.topic - self._pubsub_name: str = msg.pubsub_name - self._raw_data: bytes = msg.data - self._data: Optional[Union[dict, str]] = None - - try: - self._extensions = MessageToDict(msg.extensions) - except Exception as e: - self._extensions = {} - print(f'Error parsing extensions: {e}') - - # Parse the content based on its media type - if self._raw_data and len(self._raw_data) > 0: - self._parse_data_content() - - def id(self): - return self._id - - def source(self): - return self._source - - def type(self): - return self._type - - def spec_version(self): - return self._spec_version - - def data_content_type(self): - return self._data_content_type - - def topic(self): - return self._topic - - def pubsub_name(self): - return self._pubsub_name - - def raw_data(self): - return self._raw_data - - def extensions(self): - return self._extensions - - def data(self): - return self._data - - def _parse_data_content(self): - try: - if self._data_content_type == 'application/json': - try: - self._data = json.loads(self._raw_data) - except json.JSONDecodeError: - print(f'Error parsing json message data from topic {self._topic}') - pass # If JSON parsing fails, keep `data` as None - elif self._data_content_type == 'text/plain': - # Assume UTF-8 encoding - try: - self._data = self._raw_data.decode('utf-8') - except UnicodeDecodeError: - print(f'Error decoding message data from topic {self._topic} as UTF-8') - elif self._data_content_type.startswith( - 'application/' - ) and self._data_content_type.endswith('+json'): - # Handle custom JSON-based media types (e.g., application/vnd.api+json) - try: - self._data = json.loads(self._raw_data) - except json.JSONDecodeError: - print(f'Error parsing json message data from topic {self._topic}') - pass # If JSON parsing fails, keep `data` as None - except Exception as e: - # Log or handle any unexpected exceptions - print(f'Error parsing media type: {e}') - - -class StreamInactiveError(Exception): - pass diff --git a/dapr/common/pubsub/subscription.py b/dapr/common/pubsub/subscription.py new file mode 100644 index 00000000..ac8db973 --- /dev/null +++ b/dapr/common/pubsub/subscription.py @@ -0,0 +1,92 @@ +import json +from google.protobuf.json_format import MessageToDict +from dapr.proto.runtime.v1.appcallback_pb2 import TopicEventRequest +from typing import Optional, Union + +class SubscriptionMessage: + def __init__(self, msg: TopicEventRequest): + self._id: str = msg.id + self._source: str = msg.source + self._type: str = msg.type + self._spec_version: str = msg.spec_version + self._data_content_type: str = msg.data_content_type + self._topic: str = msg.topic + self._pubsub_name: str = msg.pubsub_name + self._raw_data: bytes = msg.data + self._data: Optional[Union[dict, str]] = None + + try: + self._extensions = MessageToDict(msg.extensions) + except Exception as e: + self._extensions = {} + print(f'Error parsing extensions: {e}') + + # Parse the content based on its media type + if self._raw_data and len(self._raw_data) > 0: + self._parse_data_content() + + def id(self): + return self._id + + def source(self): + return self._source + + def type(self): + return self._type + + def spec_version(self): + return self._spec_version + + def data_content_type(self): + return self._data_content_type + + def topic(self): + return self._topic + + def pubsub_name(self): + return self._pubsub_name + + def raw_data(self): + return self._raw_data + + def extensions(self): + return self._extensions + + def data(self): + return self._data + + def _parse_data_content(self): + try: + if self._data_content_type == 'application/json': + try: + self._data = json.loads(self._raw_data) + except json.JSONDecodeError: + print(f'Error parsing json message data from topic {self._topic}') + pass # If JSON parsing fails, keep `data` as None + elif self._data_content_type == 'text/plain': + # Assume UTF-8 encoding + try: + self._data = self._raw_data.decode('utf-8') + except UnicodeDecodeError: + print(f'Error decoding message data from topic {self._topic} as UTF-8') + elif self._data_content_type.startswith( + 'application/' + ) and self._data_content_type.endswith('+json'): + # Handle custom JSON-based media types (e.g., application/vnd.api+json) + try: + self._data = json.loads(self._raw_data) + except json.JSONDecodeError: + print(f'Error parsing json message data from topic {self._topic}') + pass # If JSON parsing fails, keep `data` as None + except Exception as e: + # Log or handle any unexpected exceptions + print(f'Error parsing media type: {e}') + + +class StreamInactiveError(Exception): + pass + +class PubSubEventStatus: + SUCCESS = 'success' + RETRY = 'retry' + DROP = 'drop' \ No newline at end of file diff --git a/daprdocs/content/en/python-sdk-docs/python-client.md b/daprdocs/content/en/python-sdk-docs/python-client.md index 4f51f945..b4e92a9b 100644 --- a/daprdocs/content/en/python-sdk-docs/python-client.md +++ b/daprdocs/content/en/python-sdk-docs/python-client.md @@ -216,7 +216,7 @@ with DaprClient() as d: - For a full list of state store query options visit [How-To: Query state]({{< ref howto-state-query-api.md >}}). - Visit [Python SDK examples](https://github.com/dapr/python-sdk/tree/master/examples/state_store_query) for code samples and instructions to try out state store querying. -### Publish & subscribe to messages +### Publish & subscribe #### Publish messages @@ -269,14 +269,11 @@ subscription and stop receiving messages. The `subscribe_with_handler` method accepts a callback function that is executed for each message received from the stream. It runs in a separate thread, so it doesn't block the main thread. The callback should return a -`TopicEventResponseStatus`, indicating whether the message was processed successfully, should be -retried, or should be discarded. You can return these statuses using the `Subscription.SUCCESS`, -`Subscription.RETRY`, and `Subscription.DROP` class properties. The method will automatically manage -message acknowledgments based on the returned status. When done, the subscription will automatically -close, and you don't need to manually stop it. - -The call to `subscribe_with_handler` method returns a close function, which should be called to -terminate the subscription when you're done. +`TopicEventResponse` (ex. `TopicEventResponse('success')`), indicating whether the message was +processed successfully, should be retried, or should be discarded. The method will automatically +manage message acknowledgements based on the returned status. The call to `subscribe_with_handler` +method returns a close function, which should be called to terminate the subscription when you're +done. Here's an example of using the `subscribe` method: @@ -343,7 +340,7 @@ And here's an example of using the `subscribe_with_handler` method: import time from dapr.clients import DaprClient -from dapr.clients.grpc.subscription import Subscription +from dapr.clients.grpc._response import TopicEventResponse counter = 0 @@ -353,7 +350,7 @@ def process_message(message): global counter counter += 1 print(f'Processing message: {message.data()} from {message.topic()}...') - return Subscription.SUCCESS + return TopicEventResponse('success') def main(): @@ -376,6 +373,9 @@ if __name__ == '__main__': main() ``` +- For more information about pub/sub, visit [How-To: Publish & subscribe]({{< ref howto-publish-subscribe.md >}}). +- Visit [Python SDK examples](https://github.com/dapr/python-sdk/tree/main/examples/pubsub-simple) for code samples and instructions to try out streaming pub/sub. + ### Interact with output bindings ```python @@ -386,7 +386,7 @@ with DaprClient() as d: ``` - For a full guide on output bindings visit [How-To: Use bindings]({{< ref howto-bindings.md >}}). -- Visit [Python SDK examples](https://github.com/dapr/python-sdk/tree/master/examples/invoke-binding) for code samples and instructions to try out output bindings. +- Visit [Python SDK examples](https://github.com/dapr/python-sdk/tree/main/examples/invoke-binding) for code samples and instructions to try out output bindings. ### Retrieve secrets diff --git a/examples/pubsub-streaming/README.md b/examples/pubsub-streaming/README.md index 4849e791..4bad7f3c 100644 --- a/examples/pubsub-streaming/README.md +++ b/examples/pubsub-streaming/README.md @@ -116,6 +116,103 @@ dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --e +## Run async example where users control reading messages off the stream + +Run the following command in a terminal/command prompt: + + + +```bash +# 1. Start Subscriber +dapr run --app-id python-subscriber --app-protocol grpc python3 async-subscriber.py +``` + + + +In another terminal/command prompt run: + + + +```bash +# 2. Start Publisher +dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py +``` + + + +## Run async example with a handler function + +Run the following command in a terminal/command prompt: + + + +```bash +# 1. Start Subscriber +dapr run --app-id python-subscriber --app-protocol grpc python3 async-subscriber-handler.py +``` + + + +In another terminal/command prompt run: + + + +```bash +# 2. Start Publisher +dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py +``` + + + + ## Cleanup diff --git a/examples/pubsub-streaming/async-subscriber-handler.py b/examples/pubsub-streaming/async-subscriber-handler.py new file mode 100644 index 00000000..75aca19e --- /dev/null +++ b/examples/pubsub-streaming/async-subscriber-handler.py @@ -0,0 +1,43 @@ +import asyncio +from dapr.aio.clients import DaprClient +from dapr.clients.grpc._response import TopicEventResponse + +counter = 0 + + +async def process_message(message) -> TopicEventResponse: + """ + Asynchronously processes the message and returns a TopicEventResponse. + """ + + print(f'Processing message: {message.data()} from {message.topic()}...') + global counter + counter += 1 + return TopicEventResponse('success') + + +async def main(): + """ + Main function to subscribe to a pubsub topic and handle messages asynchronously. + """ + async with DaprClient() as client: + # Subscribe to the pubsub topic with the message handler + close_fn = await client.subscribe_with_handler( + pubsub_name='pubsub', + topic='TOPIC_A', + handler_fn=process_message, + dead_letter_topic='TOPIC_A_DEAD', + ) + + # Wait until 5 messages are processed + global counter + while counter < 5: + print("Counter: ", counter) + await asyncio.sleep(1) + + print('Closing subscription...') + await close_fn() + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/examples/pubsub-streaming/async-subscriber.py b/examples/pubsub-streaming/async-subscriber.py new file mode 100644 index 00000000..396b3cc2 --- /dev/null +++ b/examples/pubsub-streaming/async-subscriber.py @@ -0,0 +1,54 @@ +import asyncio + +from dapr.aio.clients import DaprClient +from dapr.clients.grpc.subscription import StreamInactiveError + +counter = 0 + + +def process_message(message): + global counter + counter += 1 + # Process the message here + print(f'Processing message: {message.data()} from {message.topic()}...') + return 'success' + + +async def main(): + async with DaprClient() as client: + global counter + subscription = await client.subscribe( + pubsub_name='pubsub', topic='TOPIC_A', dead_letter_topic='TOPIC_A_DEAD' + ) + + try: + while counter < 5: + try: + message = await subscription.next_message() + + except StreamInactiveError: + print('Stream is inactive. Retrying...') + await asyncio.sleep(1) + continue + if message is None: + print('No message received within timeout period.') + continue + + # Process the message + response_status = process_message(message) + + if response_status == 'success': + await subscription.respond_success(message) + elif response_status == 'retry': + await subscription.respond_retry(message) + elif response_status == 'drop': + await subscription.respond_drop(message) + + finally: + print('Closing subscription...') + await subscription.close() + + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/examples/pubsub-streaming/subscriber-handler.py b/examples/pubsub-streaming/subscriber-handler.py index 896c00ac..aab840a4 100644 --- a/examples/pubsub-streaming/subscriber-handler.py +++ b/examples/pubsub-streaming/subscriber-handler.py @@ -1,7 +1,7 @@ import time from dapr.clients import DaprClient -from dapr.clients.grpc.subscription import Subscription +from dapr.clients.grpc._response import TopicEventResponse counter = 0 @@ -11,7 +11,7 @@ def process_message(message): global counter counter += 1 print(f'Processing message: {message.data()} from {message.topic()}...') - return Subscription.SUCCESS + return TopicEventResponse('success') def main():