Skip to content

Makes streaming Subscription iterable #751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,13 +534,14 @@
async def stream_messages(sub: Subscription):
while True:
try:
message = await sub.next_message()
if message:
response = await handler_fn(message)
if response:
await subscription.respond(message, response.status)
else:
continue
async for message in subscription:
if message:
response = await handler_fn(message)
if response:
await subscription.respond(message, response.status)

Check warning on line 541 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L537-L541

Added lines #L537 - L541 were not covered by tests
else:
continue

Check warning on line 543 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L543

Added line #L543 was not covered by tests

except StreamInactiveError:
break

Expand Down
7 changes: 7 additions & 0 deletions dapr/aio/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,10 @@
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f'Error while closing stream: {e}')

def __aiter__(self):
"""Make the subscription async iterable."""
return self

Check warning on line 120 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L120

Added line #L120 was not covered by tests

async def __anext__(self):
return await self.next_message()

Check warning on line 123 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L123

Added line #L123 was not covered by tests
22 changes: 13 additions & 9 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from dapr.clients.grpc.interceptors import DaprClientInterceptor, DaprClientTimeoutInterceptor
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy
from dapr.common.pubsub.subscription import StreamCancelledError
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
Expand Down Expand Up @@ -535,17 +536,20 @@ def subscribe_with_handler(
def stream_messages(sub):
while True:
try:
message = sub.next_message()
if message:
# Process the message
response = handler_fn(message)
if response:
subscription.respond(message, response.status)
else:
# No message received
continue
for message in sub:
if message:
# Process the message
response = handler_fn(message)
if response:
subscription.respond(message, response.status)
else:
# No message received
continue

except StreamInactiveError:
break
except StreamCancelledError:
break

def close_subscription():
subscription.close()
Expand Down
6 changes: 6 additions & 0 deletions dapr/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,9 @@ def close(self):
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f'Error while closing stream: {e}')

def __iter__(self):
return self

def __next__(self):
return self.next_message()
47 changes: 26 additions & 21 deletions daprdocs/content/en/python-sdk-docs/python-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ def mytopic_important(event: v1.Event) -> None:
You can create a streaming subscription to a PubSub topic using either the `subscribe`
or `subscribe_handler` methods.

The `subscribe` method returns a `Subscription` object, which allows you to pull messages from the
stream by
The `subscribe` method returns an iterable `Subscription` object, which allows you to pull messages from the
stream by using a `for` loop (ex. `for message in subscription`) or by
calling the `next_message` method. This will block on the main thread while waiting for messages.
When done, you should call the close method to terminate the
subscription and stop receiving messages.
Expand All @@ -281,7 +281,7 @@ Here's an example of using the `subscribe` method:
import time

from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError
from dapr.clients.grpc.subscription import StreamInactiveError, StreamCancelledError

counter = 0

Expand All @@ -303,30 +303,35 @@ def main():
)

try:
while counter < 5:
try:
message = subscription.next_message()
for message in subscription:
if message is None:
print('No message received. The stream might have been cancelled.')
continue

except StreamInactiveError as e:
try:
response_status = process_message(message)

if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)

if counter >= 5:
break
except StreamInactiveError:
print('Stream is inactive. Retrying...')
time.sleep(1)
continue
if message is None:
print('No message received within timeout period.')
continue

# Process the message
response_status = process_message(message)

if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)
except StreamCancelledError:
print('Stream was cancelled')
break
except Exception as e:
print(f'Error occurred during message processing: {e}')

finally:
print("Closing subscription...")
print('Closing subscription...')
subscription.close()


Expand Down
2 changes: 1 addition & 1 deletion examples/pubsub-streaming-async/subscriber-handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def process_message(message) -> TopicEventResponse:
Asynchronously processes the message and returns a TopicEventResponse.
"""

print(f'Processing message: {message.data()} from {message.topic()}...')
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
global counter
counter += 1
return TopicEventResponse('success')
Expand Down
42 changes: 23 additions & 19 deletions examples/pubsub-streaming-async/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def process_message(message):
global counter
counter += 1
# Process the message here
print(f'Processing message: {message.data()} from {message.topic()}...')
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
return 'success'


Expand All @@ -31,32 +31,36 @@ async def main():
)

try:
while counter < 5:
async for message in subscription:
if message is None:
print(
'No message received within timeout period. '
'The stream might have been cancelled.'
)
continue

try:
message = await subscription.next_message()
if message is None:
print(
'No message received within timeout period. '
'The stream might have been cancelled.'
)
continue
# Process the message
response_status = process_message(message)

# Respond based on the processing result
if response_status == 'success':
await subscription.respond_success(message)
elif response_status == 'retry':
await subscription.respond_retry(message)
elif response_status == 'drop':
await subscription.respond_drop(message)

if counter >= 5:
break

except StreamInactiveError:
print('Stream is inactive. Retrying...')
await asyncio.sleep(1)
continue
except StreamCancelledError as e:
except StreamCancelledError:
print('Stream was cancelled')
break
# Process the message
response_status = process_message(message)

if response_status == 'success':
await subscription.respond_success(message)
elif response_status == 'retry':
await subscription.respond_retry(message)
elif response_status == 'drop':
await subscription.respond_drop(message)

finally:
print('Closing subscription...')
Expand Down
2 changes: 1 addition & 1 deletion examples/pubsub-streaming/subscriber-handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def process_message(message):
# Process the message here
global counter
counter += 1
print(f'Processing message: {message.data()} from {message.topic()}...')
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
return TopicEventResponse('success')


Expand Down
44 changes: 20 additions & 24 deletions examples/pubsub-streaming/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def process_message(message):
global counter
counter += 1
# Process the message here
print(f'Processing message: {message.data()} from {message.topic()}...')
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
return 'success'


Expand All @@ -36,36 +36,32 @@ def main():
return

try:
while counter < 5:
for message in subscription:
if message is None:
print('No message received. The stream might have been cancelled.')
continue

try:
message = subscription.next_message()
if message is None:
print(
'No message received within timeout period. '
'The stream might have been cancelled.'
)
continue

except StreamInactiveError as e:
response_status = process_message(message)

if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)

if counter >= 5:
break
except StreamInactiveError:
print('Stream is inactive. Retrying...')
time.sleep(1)
continue
except StreamCancelledError as e:
except StreamCancelledError:
print('Stream was cancelled')
break
except Exception as e:
print(f'Error occurred: {e}')
pass

# Process the message
response_status = process_message(message)

if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)
print(f'Error occurred during message processing: {e}')

finally:
print('Closing subscription...')
Expand Down