From afd13ab3071dbc39aaa0b7f4c556886d8f855de3 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Mon, 11 Nov 2024 16:34:09 +0000 Subject: [PATCH] Makes streaming Subscription iterable (#751) * Make Streaming Subscription iterable Signed-off-by: Elena Kolevska * Adds iter for async client Signed-off-by: Elena Kolevska * linter and updates docs Signed-off-by: Elena Kolevska --------- Signed-off-by: Elena Kolevska --- dapr/aio/clients/grpc/client.py | 15 +++--- dapr/aio/clients/grpc/subscription.py | 7 +++ dapr/clients/grpc/client.py | 22 +++++---- dapr/clients/grpc/subscription.py | 6 +++ .../en/python-sdk-docs/python-client.md | 47 ++++++++++--------- .../subscriber-handler.py | 2 +- examples/pubsub-streaming-async/subscriber.py | 42 +++++++++-------- .../pubsub-streaming/subscriber-handler.py | 2 +- examples/pubsub-streaming/subscriber.py | 44 ++++++++--------- 9 files changed, 105 insertions(+), 82 deletions(-) diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index 2b40101c..e4d4e902 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -534,13 +534,14 @@ async def subscribe_with_handler( async def stream_messages(sub: Subscription): while True: try: - message = await sub.next_message() - if message: - response = await handler_fn(message) - if response: - await subscription.respond(message, response.status) - else: - continue + async for message in subscription: + if message: + response = await handler_fn(message) + if response: + await subscription.respond(message, response.status) + else: + continue + except StreamInactiveError: break diff --git a/dapr/aio/clients/grpc/subscription.py b/dapr/aio/clients/grpc/subscription.py index a526ee86..9aabf8b2 100644 --- a/dapr/aio/clients/grpc/subscription.py +++ b/dapr/aio/clients/grpc/subscription.py @@ -114,3 +114,10 @@ async def close(self): raise Exception(f'Error while closing stream: {e}') except Exception as e: raise Exception(f'Error while closing stream: {e}') + + def __aiter__(self): + """Make the subscription async iterable.""" + return self + + async def __anext__(self): + return await self.next_message() diff --git a/dapr/clients/grpc/client.py b/dapr/clients/grpc/client.py index 94793907..19f4a3df 100644 --- a/dapr/clients/grpc/client.py +++ b/dapr/clients/grpc/client.py @@ -45,6 +45,7 @@ from dapr.clients.grpc.interceptors import DaprClientInterceptor, DaprClientTimeoutInterceptor from dapr.clients.health import DaprHealth from dapr.clients.retry import RetryPolicy +from dapr.common.pubsub.subscription import StreamCancelledError from dapr.conf import settings from dapr.proto import api_v1, api_service_v1, common_v1 from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse @@ -535,17 +536,20 @@ def subscribe_with_handler( def stream_messages(sub): while True: try: - message = sub.next_message() - if message: - # Process the message - response = handler_fn(message) - if response: - subscription.respond(message, response.status) - else: - # No message received - continue + for message in sub: + if message: + # Process the message + response = handler_fn(message) + if response: + subscription.respond(message, response.status) + else: + # No message received + continue + except StreamInactiveError: break + except StreamCancelledError: + break def close_subscription(): subscription.close() diff --git a/dapr/clients/grpc/subscription.py b/dapr/clients/grpc/subscription.py index d67bed9d..6022d618 100644 --- a/dapr/clients/grpc/subscription.py +++ b/dapr/clients/grpc/subscription.py @@ -143,3 +143,9 @@ def close(self): raise Exception(f'Error while closing stream: {e}') except Exception as e: raise Exception(f'Error while closing stream: {e}') + + def __iter__(self): + return self + + def __next__(self): + return self.next_message() diff --git a/daprdocs/content/en/python-sdk-docs/python-client.md b/daprdocs/content/en/python-sdk-docs/python-client.md index b4e92a9b..fc6ef496 100644 --- a/daprdocs/content/en/python-sdk-docs/python-client.md +++ b/daprdocs/content/en/python-sdk-docs/python-client.md @@ -260,8 +260,8 @@ def mytopic_important(event: v1.Event) -> None: You can create a streaming subscription to a PubSub topic using either the `subscribe` or `subscribe_handler` methods. -The `subscribe` method returns a `Subscription` object, which allows you to pull messages from the -stream by +The `subscribe` method returns an iterable `Subscription` object, which allows you to pull messages from the +stream by using a `for` loop (ex. `for message in subscription`) or by calling the `next_message` method. This will block on the main thread while waiting for messages. When done, you should call the close method to terminate the subscription and stop receiving messages. @@ -281,7 +281,7 @@ Here's an example of using the `subscribe` method: import time from dapr.clients import DaprClient -from dapr.clients.grpc.subscription import StreamInactiveError +from dapr.clients.grpc.subscription import StreamInactiveError, StreamCancelledError counter = 0 @@ -303,30 +303,35 @@ def main(): ) try: - while counter < 5: - try: - message = subscription.next_message() + for message in subscription: + if message is None: + print('No message received. The stream might have been cancelled.') + continue - except StreamInactiveError as e: + try: + response_status = process_message(message) + + if response_status == 'success': + subscription.respond_success(message) + elif response_status == 'retry': + subscription.respond_retry(message) + elif response_status == 'drop': + subscription.respond_drop(message) + + if counter >= 5: + break + except StreamInactiveError: print('Stream is inactive. Retrying...') time.sleep(1) continue - if message is None: - print('No message received within timeout period.') - continue - - # Process the message - response_status = process_message(message) - - if response_status == 'success': - subscription.respond_success(message) - elif response_status == 'retry': - subscription.respond_retry(message) - elif response_status == 'drop': - subscription.respond_drop(message) + except StreamCancelledError: + print('Stream was cancelled') + break + except Exception as e: + print(f'Error occurred during message processing: {e}') finally: - print("Closing subscription...") + print('Closing subscription...') subscription.close() diff --git a/examples/pubsub-streaming-async/subscriber-handler.py b/examples/pubsub-streaming-async/subscriber-handler.py index 34129ee7..06a492af 100644 --- a/examples/pubsub-streaming-async/subscriber-handler.py +++ b/examples/pubsub-streaming-async/subscriber-handler.py @@ -18,7 +18,7 @@ async def process_message(message) -> TopicEventResponse: Asynchronously processes the message and returns a TopicEventResponse. """ - print(f'Processing message: {message.data()} from {message.topic()}...') + print(f'Processing message: {message.data()} from {message.topic()}...', flush=True) global counter counter += 1 return TopicEventResponse('success') diff --git a/examples/pubsub-streaming-async/subscriber.py b/examples/pubsub-streaming-async/subscriber.py index 7907bb5f..de51a797 100644 --- a/examples/pubsub-streaming-async/subscriber.py +++ b/examples/pubsub-streaming-async/subscriber.py @@ -19,7 +19,7 @@ def process_message(message): global counter counter += 1 # Process the message here - print(f'Processing message: {message.data()} from {message.topic()}...') + print(f'Processing message: {message.data()} from {message.topic()}...', flush=True) return 'success' @@ -31,32 +31,36 @@ async def main(): ) try: - while counter < 5: + async for message in subscription: + if message is None: + print( + 'No message received within timeout period. ' + 'The stream might have been cancelled.' + ) + continue + try: - message = await subscription.next_message() - if message is None: - print( - 'No message received within timeout period. ' - 'The stream might have been cancelled.' - ) - continue + # Process the message + response_status = process_message(message) + + # Respond based on the processing result + if response_status == 'success': + await subscription.respond_success(message) + elif response_status == 'retry': + await subscription.respond_retry(message) + elif response_status == 'drop': + await subscription.respond_drop(message) + + if counter >= 5: + break except StreamInactiveError: print('Stream is inactive. Retrying...') await asyncio.sleep(1) continue - except StreamCancelledError as e: + except StreamCancelledError: print('Stream was cancelled') break - # Process the message - response_status = process_message(message) - - if response_status == 'success': - await subscription.respond_success(message) - elif response_status == 'retry': - await subscription.respond_retry(message) - elif response_status == 'drop': - await subscription.respond_drop(message) finally: print('Closing subscription...') diff --git a/examples/pubsub-streaming/subscriber-handler.py b/examples/pubsub-streaming/subscriber-handler.py index 3a963fd2..c7aac318 100644 --- a/examples/pubsub-streaming/subscriber-handler.py +++ b/examples/pubsub-streaming/subscriber-handler.py @@ -18,7 +18,7 @@ def process_message(message): # Process the message here global counter counter += 1 - print(f'Processing message: {message.data()} from {message.topic()}...') + print(f'Processing message: {message.data()} from {message.topic()}...', flush=True) return TopicEventResponse('success') diff --git a/examples/pubsub-streaming/subscriber.py b/examples/pubsub-streaming/subscriber.py index 88744c88..50e64c13 100644 --- a/examples/pubsub-streaming/subscriber.py +++ b/examples/pubsub-streaming/subscriber.py @@ -19,7 +19,7 @@ def process_message(message): global counter counter += 1 # Process the message here - print(f'Processing message: {message.data()} from {message.topic()}...') + print(f'Processing message: {message.data()} from {message.topic()}...', flush=True) return 'success' @@ -36,36 +36,32 @@ def main(): return try: - while counter < 5: + for message in subscription: + if message is None: + print('No message received. The stream might have been cancelled.') + continue + try: - message = subscription.next_message() - if message is None: - print( - 'No message received within timeout period. ' - 'The stream might have been cancelled.' - ) - continue - - except StreamInactiveError as e: + response_status = process_message(message) + + if response_status == 'success': + subscription.respond_success(message) + elif response_status == 'retry': + subscription.respond_retry(message) + elif response_status == 'drop': + subscription.respond_drop(message) + + if counter >= 5: + break + except StreamInactiveError: print('Stream is inactive. Retrying...') time.sleep(1) continue - except StreamCancelledError as e: + except StreamCancelledError: print('Stream was cancelled') break except Exception as e: - print(f'Error occurred: {e}') - pass - - # Process the message - response_status = process_message(message) - - if response_status == 'success': - subscription.respond_success(message) - elif response_status == 'retry': - subscription.respond_retry(message) - elif response_status == 'drop': - subscription.respond_drop(message) + print(f'Error occurred during message processing: {e}') finally: print('Closing subscription...')