Skip to content

Commit

Permalink
Makes streaming Subscription iterable (#751)
Browse files Browse the repository at this point in the history
* Make Streaming Subscription iterable

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds iter for async client

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* linter and updates docs

Signed-off-by: Elena Kolevska <elena@kolevska.com>

---------

Signed-off-by: Elena Kolevska <elena@kolevska.com>
  • Loading branch information
elena-kolevska authored Nov 11, 2024
1 parent 6e90e84 commit afd13ab
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 82 deletions.
15 changes: 8 additions & 7 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,13 +534,14 @@ async def subscribe_with_handler(
async def stream_messages(sub: Subscription):
while True:
try:
message = await sub.next_message()
if message:
response = await handler_fn(message)
if response:
await subscription.respond(message, response.status)
else:
continue
async for message in subscription:
if message:
response = await handler_fn(message)
if response:
await subscription.respond(message, response.status)
else:
continue

except StreamInactiveError:
break

Expand Down
7 changes: 7 additions & 0 deletions dapr/aio/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,10 @@ async def close(self):
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f'Error while closing stream: {e}')

def __aiter__(self):
"""Make the subscription async iterable."""
return self

async def __anext__(self):
return await self.next_message()
22 changes: 13 additions & 9 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from dapr.clients.grpc.interceptors import DaprClientInterceptor, DaprClientTimeoutInterceptor
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy
from dapr.common.pubsub.subscription import StreamCancelledError
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
Expand Down Expand Up @@ -535,17 +536,20 @@ def subscribe_with_handler(
def stream_messages(sub):
while True:
try:
message = sub.next_message()
if message:
# Process the message
response = handler_fn(message)
if response:
subscription.respond(message, response.status)
else:
# No message received
continue
for message in sub:
if message:
# Process the message
response = handler_fn(message)
if response:
subscription.respond(message, response.status)
else:
# No message received
continue

except StreamInactiveError:
break
except StreamCancelledError:
break

def close_subscription():
subscription.close()
Expand Down
6 changes: 6 additions & 0 deletions dapr/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,9 @@ def close(self):
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f'Error while closing stream: {e}')

def __iter__(self):
return self

def __next__(self):
return self.next_message()
47 changes: 26 additions & 21 deletions daprdocs/content/en/python-sdk-docs/python-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ def mytopic_important(event: v1.Event) -> None:
You can create a streaming subscription to a PubSub topic using either the `subscribe`
or `subscribe_handler` methods.

The `subscribe` method returns a `Subscription` object, which allows you to pull messages from the
stream by
The `subscribe` method returns an iterable `Subscription` object, which allows you to pull messages from the
stream by using a `for` loop (ex. `for message in subscription`) or by
calling the `next_message` method. This will block on the main thread while waiting for messages.
When done, you should call the close method to terminate the
subscription and stop receiving messages.
Expand All @@ -281,7 +281,7 @@ Here's an example of using the `subscribe` method:
import time

from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError
from dapr.clients.grpc.subscription import StreamInactiveError, StreamCancelledError

counter = 0

Expand All @@ -303,30 +303,35 @@ def main():
)

try:
while counter < 5:
try:
message = subscription.next_message()
for message in subscription:
if message is None:
print('No message received. The stream might have been cancelled.')
continue

except StreamInactiveError as e:
try:
response_status = process_message(message)

if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)

if counter >= 5:
break
except StreamInactiveError:
print('Stream is inactive. Retrying...')
time.sleep(1)
continue
if message is None:
print('No message received within timeout period.')
continue

# Process the message
response_status = process_message(message)

if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)
except StreamCancelledError:
print('Stream was cancelled')
break
except Exception as e:
print(f'Error occurred during message processing: {e}')

finally:
print("Closing subscription...")
print('Closing subscription...')
subscription.close()


Expand Down
2 changes: 1 addition & 1 deletion examples/pubsub-streaming-async/subscriber-handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def process_message(message) -> TopicEventResponse:
Asynchronously processes the message and returns a TopicEventResponse.
"""

print(f'Processing message: {message.data()} from {message.topic()}...')
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
global counter
counter += 1
return TopicEventResponse('success')
Expand Down
42 changes: 23 additions & 19 deletions examples/pubsub-streaming-async/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def process_message(message):
global counter
counter += 1
# Process the message here
print(f'Processing message: {message.data()} from {message.topic()}...')
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
return 'success'


Expand All @@ -31,32 +31,36 @@ async def main():
)

try:
while counter < 5:
async for message in subscription:
if message is None:
print(
'No message received within timeout period. '
'The stream might have been cancelled.'
)
continue

try:
message = await subscription.next_message()
if message is None:
print(
'No message received within timeout period. '
'The stream might have been cancelled.'
)
continue
# Process the message
response_status = process_message(message)

# Respond based on the processing result
if response_status == 'success':
await subscription.respond_success(message)
elif response_status == 'retry':
await subscription.respond_retry(message)
elif response_status == 'drop':
await subscription.respond_drop(message)

if counter >= 5:
break

except StreamInactiveError:
print('Stream is inactive. Retrying...')
await asyncio.sleep(1)
continue
except StreamCancelledError as e:
except StreamCancelledError:
print('Stream was cancelled')
break
# Process the message
response_status = process_message(message)

if response_status == 'success':
await subscription.respond_success(message)
elif response_status == 'retry':
await subscription.respond_retry(message)
elif response_status == 'drop':
await subscription.respond_drop(message)

finally:
print('Closing subscription...')
Expand Down
2 changes: 1 addition & 1 deletion examples/pubsub-streaming/subscriber-handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def process_message(message):
# Process the message here
global counter
counter += 1
print(f'Processing message: {message.data()} from {message.topic()}...')
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
return TopicEventResponse('success')


Expand Down
44 changes: 20 additions & 24 deletions examples/pubsub-streaming/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def process_message(message):
global counter
counter += 1
# Process the message here
print(f'Processing message: {message.data()} from {message.topic()}...')
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
return 'success'


Expand All @@ -36,36 +36,32 @@ def main():
return

try:
while counter < 5:
for message in subscription:
if message is None:
print('No message received. The stream might have been cancelled.')
continue

try:
message = subscription.next_message()
if message is None:
print(
'No message received within timeout period. '
'The stream might have been cancelled.'
)
continue

except StreamInactiveError as e:
response_status = process_message(message)

if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)

if counter >= 5:
break
except StreamInactiveError:
print('Stream is inactive. Retrying...')
time.sleep(1)
continue
except StreamCancelledError as e:
except StreamCancelledError:
print('Stream was cancelled')
break
except Exception as e:
print(f'Error occurred: {e}')
pass

# Process the message
response_status = process_message(message)

if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)
print(f'Error occurred during message processing: {e}')

finally:
print('Closing subscription...')
Expand Down

0 comments on commit afd13ab

Please sign in to comment.