Skip to content

Commit fbd12a7

Browse files
example fix
Signed-off-by: Elena Kolevska <elena@kolevska.com> fixes typing Signed-off-by: Elena Kolevska <elena@kolevska.com> more readable example Signed-off-by: Elena Kolevska <elena@kolevska.com> linter Signed-off-by: Elena Kolevska <elena@kolevska.com>
1 parent c325a2a commit fbd12a7

File tree

3 files changed

+44
-51
lines changed

3 files changed

+44
-51
lines changed

dapr/clients/grpc/subscription.py

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import json
22

3-
import grpc
3+
from grpc import StreamStreamMultiCallable, RpcError, StatusCode # type: ignore
44

55
from dapr.clients.exceptions import StreamInactiveError
66
from dapr.clients.grpc._response import TopicEventResponse
77
from dapr.proto import api_v1, appcallback_v1
88
import queue
99
import threading
10+
from typing import Optional
1011

1112

1213
def success():
@@ -28,11 +29,11 @@ def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=No
2829
self.topic = topic
2930
self.metadata = metadata or {}
3031
self.dead_letter_topic = dead_letter_topic or ''
31-
self._stream = None
32-
self._response_thread = None
33-
self._send_queue = queue.Queue()
34-
self._receive_queue = queue.Queue()
35-
self._stream_active = False
32+
self._stream: Optional[StreamStreamMultiCallable] = None # Type annotation for gRPC stream
33+
self._response_thread: Optional[threading.Thread] = None # Type for thread
34+
self._send_queue: queue.Queue = queue.Queue() # Type annotation for send queue
35+
self._receive_queue: queue.Queue = queue.Queue() # Type annotation for receive queue
36+
self._stream_active: bool = False
3637
self._stream_lock = threading.Lock() # Protects _stream_active
3738

3839
def start(self):
@@ -55,9 +56,8 @@ def outgoing_request_iterator():
5556
# Start sending back acknowledgement messages from the send queue
5657
while self._is_stream_active():
5758
try:
58-
response = self._send_queue.get()
59-
# The above blocks until a message is available or the stream is closed
60-
# so that's why we need to check again if the stream is still active
59+
response = self._send_queue.get(timeout=1)
60+
# Check again if the stream is still active
6161
if not self._is_stream_active():
6262
break
6363
yield response
@@ -76,17 +76,19 @@ def outgoing_request_iterator():
7676

7777
def _handle_incoming_messages(self):
7878
try:
79-
# The first message dapr sends on the stream is for signalling only, so discard it
80-
next(self._stream)
81-
82-
# Read messages from the stream and put them in the receive queue
83-
for message in self._stream:
84-
if self._is_stream_active():
85-
self._receive_queue.put(message.event_message)
86-
else:
87-
break
88-
except grpc.RpcError as e:
89-
if e.code() != grpc.StatusCode.CANCELLED:
79+
# Check if the stream is not None
80+
if self._stream is not None:
81+
# The first message dapr sends on the stream is for signalling only, so discard it
82+
next(self._stream)
83+
84+
# Read messages from the stream and put them in the receive queue
85+
for message in self._stream:
86+
if self._is_stream_active():
87+
self._receive_queue.put(message.event_message)
88+
else:
89+
break
90+
except RpcError as e:
91+
if e.code() != StatusCode.CANCELLED:
9092
print(f'gRPC error in stream: {e.details()}, Status Code: {e.code()}')
9193
except Exception as e:
9294
raise Exception(f'Error while handling responses: {e}')
@@ -157,8 +159,8 @@ def close(self):
157159
if self._stream:
158160
try:
159161
self._stream.cancel()
160-
except grpc.RpcError as e:
161-
if e.code() != grpc.StatusCode.CANCELLED:
162+
except RpcError as e:
163+
if e.code() != StatusCode.CANCELLED:
162164
raise Exception(f'Error while closing stream: {e}')
163165
except Exception as e:
164166
raise Exception(f'Error while closing stream: {e}')

examples/pubsub-streaming/README.md

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,11 @@ Run the following command in a terminal/command prompt:
2727
<!-- STEP
2828
name: Run subscriber
2929
expected_stdout_lines:
30-
- '== APP == Subscriber received: id=1, message="hello world", content_type="application/json"'
31-
- 'RETRY status returned from app while processing pub/sub event'
32-
- '== APP == Subscriber received: id=2, message="hello world", content_type="application/json"'
33-
- '== APP == Subscriber received: id=3, message="hello world", content_type="application/json"'
34-
- '== APP == Wildcard-Subscriber received: id=4, message="hello world", content_type="application/json"'
35-
- '== APP == Wildcard-Subscriber received: id=5, message="hello world", content_type="application/json"'
36-
- '== APP == Wildcard-Subscriber received: id=6, message="hello world", content_type="application/json"'
37-
- '== APP == Dead-Letter Subscriber received: id=7, message="hello world", content_type="application/json"'
38-
- '== APP == Dead-Letter Subscriber. Received via deadletter topic: TOPIC_D_DEAD'
39-
- '== APP == Dead-Letter Subscriber. Originally intended topic: TOPIC_D'
30+
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A"
31+
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A"
32+
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A"
33+
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A"
34+
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A"
4035
output_match_mode: substring
4136
background: true
4237
match_order: none
@@ -61,6 +56,7 @@ expected_stdout_lines:
6156
- "== APP == {'id': 4, 'message': 'hello world'}"
6257
- "== APP == {'id': 5, 'message': 'hello world'}"
6358
background: true
59+
output_match_mode: substring
6460
sleep: 15
6561
-->
6662

examples/pubsub-streaming/subscriber.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,20 @@ def main():
1717

1818
try:
1919
for i in range(5):
20-
try:
21-
message = subscription.next_message()
22-
if message is None:
23-
print('No message received within timeout period.')
24-
continue
25-
26-
# Process the message
27-
response_status = process_message(message)
28-
29-
if response_status == 'success':
30-
subscription.respond_success(message)
31-
elif response_status == 'retry':
32-
subscription.respond_retry(message)
33-
elif response_status == 'drop':
34-
subscription.respond_drop(message)
35-
36-
except Exception as e:
37-
print(f'Error getting message: {e}')
38-
break
20+
message = subscription.next_message()
21+
if message is None:
22+
print('No message received within timeout period.')
23+
continue
24+
25+
# Process the message
26+
response_status = process_message(message)
27+
28+
if response_status == 'success':
29+
subscription.respond_success(message)
30+
elif response_status == 'retry':
31+
subscription.respond_retry(message)
32+
elif response_status == 'drop':
33+
subscription.respond_drop(message)
3934

4035
finally:
4136
subscription.close()

0 commit comments

Comments
 (0)