Skip to content

Commit

Permalink
more small tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Kolevska <elena@kolevska.com>
  • Loading branch information
elena-kolevska committed Sep 24, 2024
1 parent ecc8696 commit 2caf072
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 27 deletions.
4 changes: 0 additions & 4 deletions dapr/clients/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,3 @@ def serialize_status_detail(status_detail):
if not status_detail:
return None
return MessageToDict(status_detail, preserving_proto_field_name=True)


class StreamInactiveError(Exception):
pass
12 changes: 11 additions & 1 deletion dapr/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from grpc import RpcError, StatusCode, Call # type: ignore

from dapr.clients.exceptions import StreamInactiveError
from dapr.clients.grpc._response import TopicEventResponse
from dapr.clients.health import DaprHealth
from dapr.proto import api_v1, appcallback_v1
Expand Down Expand Up @@ -95,6 +94,13 @@ def reconnect_stream(self):
self.start()

Check warning on line 94 in dapr/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/clients/grpc/subscription.py#L91-L94

Added lines #L91 - L94 were not covered by tests

def next_message(self, timeout=None):
"""
Get the next message from the receive queue.
@param timeout: The time in seconds to wait for a message before returning None.
If None, wait indefinitely.
@return: The next message from the queue,
or None if no message is received within the timeout.
"""
msg = self.read_message_from_queue(self._receive_queue, timeout=timeout)

if msg is None:
Expand Down Expand Up @@ -241,3 +247,7 @@ def _parse_data_content(self):
except Exception as e:

Check warning on line 247 in dapr/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/clients/grpc/subscription.py#L243-L247

Added lines #L243 - L247 were not covered by tests
# Log or handle any unexpected exceptions
print(f'Error parsing media type: {e}')

Check warning on line 249 in dapr/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/clients/grpc/subscription.py#L249

Added line #L249 was not covered by tests


class StreamInactiveError(Exception):
pass
48 changes: 29 additions & 19 deletions daprdocs/content/en/python-sdk-docs/python-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,27 +263,37 @@ When done using the subscription, you should call the `close` method to stop the

```python
with DaprClient() as client:
subscription = client.subscribe(
pubsub_name='pubsub', topic='TOPIC_A', dead_letter_topic='TOPIC_A_DEAD'
)

try:
for i in range(5):
subscription = client.subscribe(
pubsub_name='pubsub', topic='TOPIC_A', dead_letter_topic='TOPIC_A_DEAD'
)

try:
i = 0
while i < 5:
try:
message = subscription.next_message(1)
if message is None:
print('No message received within timeout period.')
continue

# Process the message
# ...

# Return the status based on the processing result
except StreamInactiveError as e:
print('Stream is inactive. Retrying...')
time.sleep(5)
continue
if message is None:
print('No message received within timeout period.')
continue

# Process the message
response_status = process_message(message)

if response_status == 'success':
subscription.respond_success(message)
# or subscription.respond_retry(message)
# or subscription.respond_drop(message)

finally:
subscription.close()
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)

i += 1

finally:
subscription.close()
```

### Interact with output bindings
Expand Down
15 changes: 13 additions & 2 deletions examples/pubsub-streaming/subscriber.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import time

from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError


def process_message(message):
Expand All @@ -14,8 +17,14 @@ def main():
)

try:
for i in range(5):
message = subscription.next_message()
i = 0
while i < 5:
try:
message = subscription.next_message(1)
except StreamInactiveError as e:
print('Stream is inactive. Retrying...')
time.sleep(5)
continue
if message is None:
print('No message received within timeout period.')
continue
Expand All @@ -30,6 +39,8 @@ def main():
elif response_status == 'drop':
subscription.respond_drop(message)

i += 1

finally:
subscription.close()

Expand Down
7 changes: 6 additions & 1 deletion tests/clients/test_dapr_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@

from google.rpc import status_pb2, code_pb2

from dapr.clients.exceptions import DaprGrpcError, StreamInactiveError
from dapr.clients.exceptions import DaprGrpcError
from dapr.clients.grpc.client import DaprGrpcClient
from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError
from dapr.proto import common_v1
from .fake_dapr_server import FakeDaprSidecar
from dapr.conf import settings
Expand Down Expand Up @@ -295,6 +296,10 @@ def test_subscribe_topic(self):
self.assertEqual('application/json', message2.data_content_type())
self.assertEqual({'a': 1}, message2.data())

# Third call with timeout
message3 = subscription.next_message(1)
self.assertIsNone(message3)

def test_subscribe_topic_early_close(self):
dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}')
subscription = dapr.subscribe(pubsub_name='pubsub', topic='example')
Expand Down

0 comments on commit 2caf072

Please sign in to comment.