From 2caf0720fe84986df3e2163d1fc2928a8b67e1b8 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Tue, 24 Sep 2024 01:05:32 +0100 Subject: [PATCH] more small tweaks Signed-off-by: Elena Kolevska --- dapr/clients/exceptions.py | 4 -- dapr/clients/grpc/subscription.py | 12 ++++- .../en/python-sdk-docs/python-client.md | 48 +++++++++++-------- examples/pubsub-streaming/subscriber.py | 15 +++++- tests/clients/test_dapr_grpc_client.py | 7 ++- 5 files changed, 59 insertions(+), 27 deletions(-) diff --git a/dapr/clients/exceptions.py b/dapr/clients/exceptions.py index c872b65a..91bc04a8 100644 --- a/dapr/clients/exceptions.py +++ b/dapr/clients/exceptions.py @@ -132,7 +132,3 @@ def serialize_status_detail(status_detail): if not status_detail: return None return MessageToDict(status_detail, preserving_proto_field_name=True) - - -class StreamInactiveError(Exception): - pass diff --git a/dapr/clients/grpc/subscription.py b/dapr/clients/grpc/subscription.py index 2c8c18e3..b2a88b9d 100644 --- a/dapr/clients/grpc/subscription.py +++ b/dapr/clients/grpc/subscription.py @@ -2,7 +2,6 @@ from grpc import RpcError, StatusCode, Call # type: ignore -from dapr.clients.exceptions import StreamInactiveError from dapr.clients.grpc._response import TopicEventResponse from dapr.clients.health import DaprHealth from dapr.proto import api_v1, appcallback_v1 @@ -95,6 +94,13 @@ def reconnect_stream(self): self.start() def next_message(self, timeout=None): + """ + Get the next message from the receive queue. + @param timeout: The time in seconds to wait for a message before returning None. + If None, wait indefinitely. + @return: The next message from the queue, + or None if no message is received within the timeout. + """ msg = self.read_message_from_queue(self._receive_queue, timeout=timeout) if msg is None: @@ -241,3 +247,7 @@ def _parse_data_content(self): except Exception as e: # Log or handle any unexpected exceptions print(f'Error parsing media type: {e}') + + +class StreamInactiveError(Exception): + pass diff --git a/daprdocs/content/en/python-sdk-docs/python-client.md b/daprdocs/content/en/python-sdk-docs/python-client.md index 6d646ff0..900546ed 100644 --- a/daprdocs/content/en/python-sdk-docs/python-client.md +++ b/daprdocs/content/en/python-sdk-docs/python-client.md @@ -263,27 +263,37 @@ When done using the subscription, you should call the `close` method to stop the ```python with DaprClient() as client: - subscription = client.subscribe( - pubsub_name='pubsub', topic='TOPIC_A', dead_letter_topic='TOPIC_A_DEAD' - ) - - try: - for i in range(5): + subscription = client.subscribe( + pubsub_name='pubsub', topic='TOPIC_A', dead_letter_topic='TOPIC_A_DEAD' + ) + + try: + i = 0 + while i < 5: + try: message = subscription.next_message(1) - if message is None: - print('No message received within timeout period.') - continue - - # Process the message - # ... - - # Return the status based on the processing result + except StreamInactiveError as e: + print('Stream is inactive. Retrying...') + time.sleep(5) + continue + if message is None: + print('No message received within timeout period.') + continue + + # Process the message + response_status = process_message(message) + + if response_status == 'success': subscription.respond_success(message) - # or subscription.respond_retry(message) - # or subscription.respond_drop(message) - - finally: - subscription.close() + elif response_status == 'retry': + subscription.respond_retry(message) + elif response_status == 'drop': + subscription.respond_drop(message) + + i += 1 + + finally: + subscription.close() ``` ### Interact with output bindings diff --git a/examples/pubsub-streaming/subscriber.py b/examples/pubsub-streaming/subscriber.py index 8b396281..c8cc8205 100644 --- a/examples/pubsub-streaming/subscriber.py +++ b/examples/pubsub-streaming/subscriber.py @@ -1,4 +1,7 @@ +import time + from dapr.clients import DaprClient +from dapr.clients.grpc.subscription import StreamInactiveError def process_message(message): @@ -14,8 +17,14 @@ def main(): ) try: - for i in range(5): - message = subscription.next_message() + i = 0 + while i < 5: + try: + message = subscription.next_message(1) + except StreamInactiveError as e: + print('Stream is inactive. Retrying...') + time.sleep(5) + continue if message is None: print('No message received within timeout period.') continue @@ -30,6 +39,8 @@ def main(): elif response_status == 'drop': subscription.respond_drop(message) + i += 1 + finally: subscription.close() diff --git a/tests/clients/test_dapr_grpc_client.py b/tests/clients/test_dapr_grpc_client.py index ac411745..019ea84f 100644 --- a/tests/clients/test_dapr_grpc_client.py +++ b/tests/clients/test_dapr_grpc_client.py @@ -24,9 +24,10 @@ from google.rpc import status_pb2, code_pb2 -from dapr.clients.exceptions import DaprGrpcError, StreamInactiveError +from dapr.clients.exceptions import DaprGrpcError from dapr.clients.grpc.client import DaprGrpcClient from dapr.clients import DaprClient +from dapr.clients.grpc.subscription import StreamInactiveError from dapr.proto import common_v1 from .fake_dapr_server import FakeDaprSidecar from dapr.conf import settings @@ -295,6 +296,10 @@ def test_subscribe_topic(self): self.assertEqual('application/json', message2.data_content_type()) self.assertEqual({'a': 1}, message2.data()) + # Third call with timeout + message3 = subscription.next_message(1) + self.assertIsNone(message3) + def test_subscribe_topic_early_close(self): dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}') subscription = dapr.subscribe(pubsub_name='pubsub', topic='example')