Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bidirectional streaming for pubsub #735

Merged
merged 34 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
0b425ce
works
elena-kolevska Sep 21, 2024
b7abfb4
works
elena-kolevska Sep 22, 2024
19f8c9b
Sync bidi streaming and tests
elena-kolevska Sep 23, 2024
aaf4599
example fix
elena-kolevska Sep 23, 2024
2e817b4
examples fix
elena-kolevska Sep 23, 2024
e34bd95
Adds support for api token
elena-kolevska Sep 23, 2024
d6ceec2
clean up
elena-kolevska Sep 23, 2024
3cadf32
Adds docs
elena-kolevska Sep 23, 2024
f232478
more small tweaks
elena-kolevska Sep 24, 2024
cc318b6
cleanups and tests
elena-kolevska Sep 28, 2024
e75436c
Removes receive queue
elena-kolevska Sep 28, 2024
bcf116b
Adds `subscribe_with_handler`
elena-kolevska Sep 30, 2024
f85c0ab
Fixes linter
elena-kolevska Sep 30, 2024
49dde9d
Fixes linter
elena-kolevska Sep 30, 2024
920bd24
Adds async
elena-kolevska Oct 9, 2024
fb862e4
Adds tests for async streaming subscription
elena-kolevska Oct 9, 2024
7ac13de
Linter
elena-kolevska Oct 10, 2024
c5e5f4f
Split sync and async examples
elena-kolevska Oct 10, 2024
2d89fef
linter
elena-kolevska Oct 10, 2024
a7f2169
Adds interceptors to the async client for bidirectional streaming
elena-kolevska Oct 11, 2024
be4cc40
Removes unneeded class
elena-kolevska Oct 11, 2024
46f2923
Removes async client
elena-kolevska Oct 21, 2024
df84b63
Fixes missing docker-compose in examples (#736)
elena-kolevska Oct 21, 2024
75c3729
Removes async examples test
elena-kolevska Oct 21, 2024
8d4ccd2
Small cleanup
elena-kolevska Oct 21, 2024
533c339
Split up topic names between tests
elena-kolevska Oct 21, 2024
c70b927
lint
elena-kolevska Oct 21, 2024
f7f5162
Revert "Removes async client"
elena-kolevska Oct 21, 2024
da09a54
Split up topic names between tests
elena-kolevska Oct 21, 2024
8c9ce85
updates fake server to wait for confirmation message before sending n…
elena-kolevska Oct 21, 2024
ffde935
Updates protos
elena-kolevska Oct 21, 2024
c237e7b
Adds stream cancelled error
elena-kolevska Oct 21, 2024
86cc67c
linter
elena-kolevska Oct 21, 2024
07fe8fe
Merge branch 'main' into bidistreaming
elena-kolevska Oct 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from warnings import warn

from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any
from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any, Awaitable
from typing_extensions import Self

from google.protobuf.message import Message as GrpcMessage
Expand All @@ -39,12 +39,14 @@
AioRpcError,
)

from dapr.aio.clients.grpc.subscription import Subscription
from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy
from dapr.common.pubsub.subscription import StreamInactiveError
from dapr.conf.helpers import GrpcEndpoint
from dapr.conf import settings
from dapr.proto import api_v1, api_service_v1, common_v1
Expand Down Expand Up @@ -94,6 +96,7 @@
UnlockResponse,
GetWorkflowResponse,
StartWorkflowResponse,
TopicEventResponse,
)


Expand Down Expand Up @@ -482,6 +485,72 @@

return DaprResponse(await call.initial_metadata())

async def subscribe(
self,
pubsub_name: str,
topic: str,
metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None,
) -> Subscription:
"""
Subscribe to a topic with a bidirectional stream

Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
metadata (Optional[dict]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.

Returns:
Subscription: The Subscription object managing the stream.
"""
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
await subscription.start()
return subscription

async def subscribe_with_handler(
self,
pubsub_name: str,
topic: str,
handler_fn: Callable[..., TopicEventResponse],
metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None,
) -> Callable[[], Awaitable[None]]:
"""
Subscribe to a topic with a bidirectional stream and a message handler function

Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received.
metadata (Optional[dict]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.

Returns:
Callable[[], Awaitable[None]]: An async function to close the subscription.
"""
subscription = await self.subscribe(pubsub_name, topic, metadata, dead_letter_topic)

Check warning on line 532 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L532

Added line #L532 was not covered by tests

async def stream_messages(sub: Subscription):
while True:
try:
message = await sub.next_message()
if message:
response = await handler_fn(message)
if response:
await subscription.respond(message, response.status)

Check warning on line 541 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L534-L541

Added lines #L534 - L541 were not covered by tests
else:
continue
except StreamInactiveError:
break

Check warning on line 545 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L543-L545

Added lines #L543 - L545 were not covered by tests

async def close_subscription():
await subscription.close()

Check warning on line 548 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L547-L548

Added lines #L547 - L548 were not covered by tests

asyncio.create_task(stream_messages(subscription))

Check warning on line 550 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L550

Added line #L550 was not covered by tests

return close_subscription

Check warning on line 552 in dapr/aio/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/client.py#L552

Added line #L552 was not covered by tests

async def get_state(
self,
store_name: str,
Expand Down
22 changes: 19 additions & 3 deletions dapr/aio/clients/grpc/interceptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from collections import namedtuple
from typing import List, Tuple

from grpc.aio import UnaryUnaryClientInterceptor, ClientCallDetails # type: ignore
from grpc.aio import UnaryUnaryClientInterceptor, StreamStreamClientInterceptor, ClientCallDetails # type: ignore

from dapr.conf import settings

Expand Down Expand Up @@ -51,7 +51,7 @@ def intercept_unary_unary(self, continuation, client_call_details, request):
return continuation(client_call_details, request)


class DaprClientInterceptorAsync(UnaryUnaryClientInterceptor):
class DaprClientInterceptorAsync(UnaryUnaryClientInterceptor, StreamStreamClientInterceptor):
"""The class implements a UnaryUnaryClientInterceptor from grpc to add an interceptor to add
additional headers to all calls as needed.

Expand Down Expand Up @@ -115,8 +115,24 @@ async def intercept_unary_unary(self, continuation, client_call_details, request
Returns:
A response object after invoking the continuation callable
"""
new_call_details = await self._intercept_call(client_call_details)
# Call continuation
response = await continuation(new_call_details, request)
return response

async def intercept_stream_stream(self, continuation, client_call_details, request):
"""This method intercepts a stream-stream gRPC call. This is the implementation of the
abstract method defined in StreamStreamClientInterceptor defined in grpc. This is invoked
automatically by grpc based on the order in which interceptors are added to the channel.

Args:
continuation: a callable to be invoked to continue with the RPC or next interceptor
client_call_details: a ClientCallDetails object describing the outgoing RPC
request: the request value for the RPC

# Pre-process or intercept call
Returns:
A response object after invoking the continuation callable
"""
new_call_details = await self._intercept_call(client_call_details)
# Call continuation
response = await continuation(new_call_details, request)
Expand Down
116 changes: 116 additions & 0 deletions dapr/aio/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import asyncio
from grpc import StatusCode
from grpc.aio import AioRpcError

from dapr.clients.grpc._response import TopicEventResponse
from dapr.clients.health import DaprHealth
from dapr.common.pubsub.subscription import (
StreamInactiveError,
SubscriptionMessage,
StreamCancelledError,
)
from dapr.proto import api_v1, appcallback_v1


class Subscription:
def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None):
self._stub = stub
self._pubsub_name = pubsub_name
self._topic = topic
self._metadata = metadata or {}
self._dead_letter_topic = dead_letter_topic or ''
self._stream = None
self._send_queue = asyncio.Queue()
self._stream_active = asyncio.Event()

async def start(self):
async def outgoing_request_iterator():
try:
initial_request = api_v1.SubscribeTopicEventsRequestAlpha1(
initial_request=api_v1.SubscribeTopicEventsRequestInitialAlpha1(
pubsub_name=self._pubsub_name,
topic=self._topic,
metadata=self._metadata,
dead_letter_topic=self._dead_letter_topic,
)
)
yield initial_request

while self._stream_active.is_set():
try:
response = await asyncio.wait_for(self._send_queue.get(), timeout=1.0)
yield response
except asyncio.TimeoutError:
continue

Check warning on line 44 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L44

Added line #L44 was not covered by tests
except Exception as e:
raise Exception(f'Error while writing to stream: {e}')

Check warning on line 46 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L46

Added line #L46 was not covered by tests

self._stream = self._stub.SubscribeTopicEventsAlpha1(outgoing_request_iterator())
self._stream_active.set()
await self._stream.read() # discard the initial message

async def reconnect_stream(self):
await self.close()
DaprHealth.wait_until_ready()
print('Attempting to reconnect...')
await self.start()

async def next_message(self):
if not self._stream_active.is_set():
raise StreamInactiveError('Stream is not active')

try:
if self._stream is not None:
message = await self._stream.read()
if message is None:
return None

Check warning on line 66 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L66

Added line #L66 was not covered by tests
return SubscriptionMessage(message.event_message)
except AioRpcError as e:
if e.code() == StatusCode.UNAVAILABLE:
print(
f'gRPC error while reading from stream: {e.details()}, '
f'Status Code: {e.code()}. '
f'Attempting to reconnect...'
)
await self.reconnect_stream()
elif e.code() == StatusCode.CANCELLED:
raise StreamCancelledError('Stream has been cancelled')

Check warning on line 77 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L76-L77

Added lines #L76 - L77 were not covered by tests
else:
raise Exception(f'gRPC error while reading from subscription stream: {e} ')
except Exception as e:
raise Exception(f'Error while fetching message: {e}')

Check warning on line 81 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L79-L81

Added lines #L79 - L81 were not covered by tests

return None

async def respond(self, message, status):
try:
status = appcallback_v1.TopicEventResponse(status=status.value)
response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1(
id=message.id(), status=status
)
msg = api_v1.SubscribeTopicEventsRequestAlpha1(event_processed=response)
if not self._stream_active.is_set():
raise StreamInactiveError('Stream is not active')

Check warning on line 93 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L93

Added line #L93 was not covered by tests
await self._send_queue.put(msg)
except Exception as e:
print(f"Can't send message: {e}")

Check warning on line 96 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L95-L96

Added lines #L95 - L96 were not covered by tests

async def respond_success(self, message):
await self.respond(message, TopicEventResponse('success').status)

async def respond_retry(self, message):
await self.respond(message, TopicEventResponse('retry').status)

Check warning on line 102 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L102

Added line #L102 was not covered by tests

async def respond_drop(self, message):
await self.respond(message, TopicEventResponse('drop').status)

Check warning on line 105 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L105

Added line #L105 was not covered by tests

async def close(self):
if self._stream:
try:
self._stream.cancel()
self._stream_active.clear()
except AioRpcError as e:
if e.code() != StatusCode.CANCELLED:
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f'Error while closing stream: {e}')

Check warning on line 116 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L112-L116

Added lines #L112 - L116 were not covered by tests
76 changes: 75 additions & 1 deletion dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""

import threading
import time
import socket
import json
Expand Down Expand Up @@ -41,6 +41,7 @@
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc.subscription import Subscription, StreamInactiveError
from dapr.clients.grpc.interceptors import DaprClientInterceptor, DaprClientTimeoutInterceptor
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy
Expand Down Expand Up @@ -85,6 +86,7 @@
StartWorkflowResponse,
EncryptResponse,
DecryptResponse,
TopicEventResponse,
)


Expand Down Expand Up @@ -481,6 +483,78 @@

return DaprResponse(call.initial_metadata())

def subscribe(
self,
pubsub_name: str,
topic: str,
metadata: Optional[MetadataTuple] = None,
dead_letter_topic: Optional[str] = None,
) -> Subscription:
"""
Subscribe to a topic with a bidirectional stream

Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
metadata (Optional[MetadataTuple]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
timeout (Optional[int]): The time in seconds to wait for a message before returning None
If not set, the `next_message` method will block indefinitely
until a message is received.

Returns:
Subscription: The Subscription object managing the stream.
"""
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
subscription.start()
return subscription

def subscribe_with_handler(
self,
pubsub_name: str,
topic: str,
handler_fn: Callable[..., TopicEventResponse],
metadata: Optional[MetadataTuple] = None,
dead_letter_topic: Optional[str] = None,
) -> Callable:
"""
Subscribe to a topic with a bidirectional stream and a message handler function

Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received.
metadata (Optional[MetadataTuple]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
timeout (Optional[int]): The time in seconds to wait for a message before returning None
If not set, the `next_message` method will block indefinitely
until a message is received.
"""
subscription = self.subscribe(pubsub_name, topic, metadata, dead_letter_topic)

def stream_messages(sub):
while True:
try:
message = sub.next_message()
if message:
# Process the message
response = handler_fn(message)
if response:
subscription.respond(message, response.status)
else:
# No message received
continue
except StreamInactiveError:
break

Check warning on line 548 in dapr/clients/grpc/client.py

View check run for this annotation

Codecov / codecov/patch

dapr/clients/grpc/client.py#L548

Added line #L548 was not covered by tests

def close_subscription():
subscription.close()

streaming_thread = threading.Thread(target=stream_messages, args=(subscription,))
streaming_thread.start()

return close_subscription

def get_state(
self,
store_name: str,
Expand Down
Loading