Skip to content

Commit

Permalink
Linter
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Kolevska <elena@kolevska.com>
  • Loading branch information
elena-kolevska committed Oct 10, 2024
1 parent cac1726 commit 0ada5ea
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 36 deletions.
53 changes: 38 additions & 15 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,28 @@
BindingRequest,
TransactionalStateOperation,
)
from dapr.clients.grpc._response import (BindingResponse, DaprResponse, GetSecretResponse,
GetBulkSecretResponse, GetMetadataResponse,
InvokeMethodResponse, UnlockResponseStatus, StateResponse,
BulkStatesResponse, BulkStateItem, ConfigurationResponse,
QueryResponse, QueryResponseItem, RegisteredComponents,
ConfigurationWatcher, TryLockResponse, UnlockResponse,
GetWorkflowResponse, StartWorkflowResponse,
TopicEventResponse, )
from dapr.clients.grpc._response import (
BindingResponse,
DaprResponse,
GetSecretResponse,
GetBulkSecretResponse,
GetMetadataResponse,
InvokeMethodResponse,
UnlockResponseStatus,
StateResponse,
BulkStatesResponse,
BulkStateItem,
ConfigurationResponse,
QueryResponse,
QueryResponseItem,
RegisteredComponents,
ConfigurationWatcher,
TryLockResponse,
UnlockResponse,
GetWorkflowResponse,
StartWorkflowResponse,
TopicEventResponse,
)


class DaprGrpcClientAsync:
Expand Down Expand Up @@ -471,8 +485,13 @@ async def publish_event(

return DaprResponse(await call.initial_metadata())

async def subscribe(self, pubsub_name: str, topic: str, metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None, ) -> Subscription:
async def subscribe(
self,
pubsub_name: str,
topic: str,
metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None,
) -> Subscription:
"""
Subscribe to a topic with a bidirectional stream
Expand All @@ -485,14 +504,18 @@ async def subscribe(self, pubsub_name: str, topic: str, metadata: Optional[dict]
Returns:
Subscription: The Subscription object managing the stream.
"""
subscription = Subscription(self._stub, pubsub_name, topic, metadata,
dead_letter_topic)
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
await subscription.start()
return subscription

async def subscribe_with_handler(self, pubsub_name: str, topic: str,
handler_fn: Callable[..., TopicEventResponse], metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None, ) -> Callable[[], Awaitable[None]]:
async def subscribe_with_handler(
self,
pubsub_name: str,
topic: str,
handler_fn: Callable[..., TopicEventResponse],
metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None,
) -> Callable[[], Awaitable[None]]:
"""
Subscribe to a topic with a bidirectional stream and a message handler function
Expand Down
12 changes: 7 additions & 5 deletions dapr/aio/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from dapr.common.pubsub.subscription import StreamInactiveError, SubscriptionMessage
from dapr.proto import api_v1, appcallback_v1

class Subscription:

class Subscription:
def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None):
self._stub = stub
self._pubsub_name = pubsub_name
Expand Down Expand Up @@ -63,9 +63,11 @@ async def next_message(self):
return SubscriptionMessage(message.event_message)
except AioRpcError as e:
if e.code() == StatusCode.UNAVAILABLE:
print(f'gRPC error while reading from stream: {e.details()}, '
f'Status Code: {e.code()}. '
f'Attempting to reconnect...')
print(
f'gRPC error while reading from stream: {e.details()}, '
f'Status Code: {e.code()}. '
f'Attempting to reconnect...'
)
await self.reconnect_stream()
elif e.code() != StatusCode.CANCELLED:
raise Exception(f'gRPC error while reading from subscription stream: {e} ')
Expand Down Expand Up @@ -105,4 +107,4 @@ async def close(self):
if e.code() != StatusCode.CANCELLED:
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f'Error while closing stream: {e}')
raise Exception(f'Error while closing stream: {e}')

Check warning on line 110 in dapr/aio/clients/grpc/subscription.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/subscription.py#L106-L110

Added lines #L106 - L110 were not covered by tests
2 changes: 0 additions & 2 deletions dapr/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
from typing import Optional



class Subscription:

def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None):
self._stub = stub
self._pubsub_name = pubsub_name
Expand Down
4 changes: 3 additions & 1 deletion dapr/common/pubsub/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dapr.proto.runtime.v1.appcallback_pb2 import TopicEventRequest
from typing import Optional, Union


class SubscriptionMessage:
def __init__(self, msg: TopicEventRequest):
self._id: str = msg.id
Expand Down Expand Up @@ -86,7 +87,8 @@ def _parse_data_content(self):
class StreamInactiveError(Exception):
pass


class PubSubEventStatus:
SUCCESS = 'success'
RETRY = 'retry'
DROP = 'drop'
DROP = 'drop'
2 changes: 1 addition & 1 deletion examples/pubsub-streaming/async-subscriber-handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def main():
# Wait until 5 messages are processed
global counter
while counter < 5:
print("Counter: ", counter)
print('Counter: ', counter)
await asyncio.sleep(1)

print('Closing subscription...')
Expand Down
1 change: 0 additions & 1 deletion examples/pubsub-streaming/async-subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,5 @@ async def main():
await subscription.close()



if __name__ == '__main__':
asyncio.run(main())
15 changes: 10 additions & 5 deletions tests/clients/test_dapr_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,22 @@
from dapr.clients.exceptions import DaprGrpcError
from dapr.clients.grpc.client import DaprGrpcClient
from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError, Subscription
from dapr.clients.grpc.subscription import StreamInactiveError
from dapr.proto import common_v1
from .fake_dapr_server import FakeDaprSidecar
from dapr.conf import settings
from dapr.clients.grpc._helpers import to_bytes
from dapr.clients.grpc._request import TransactionalStateOperation
from dapr.clients.grpc._state import StateOptions, Consistency, Concurrency, StateItem
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc._response import (ConfigurationItem, ConfigurationResponse,
ConfigurationWatcher, UnlockResponseStatus,
WorkflowRuntimeStatus, TopicEventResponse, )
from dapr.clients.grpc._response import (
ConfigurationItem,
ConfigurationResponse,
ConfigurationWatcher,
UnlockResponseStatus,
WorkflowRuntimeStatus,
TopicEventResponse,
)


class DaprGrpcClientTests(unittest.TestCase):
Expand Down Expand Up @@ -372,7 +377,7 @@ def handler(message):

counter += 1

return TopicEventResponse("success")
return TopicEventResponse('success')

close_fn = dapr.subscribe_with_handler(
pubsub_name='pubsub', topic='example', handler_fn=handler
Expand Down
12 changes: 6 additions & 6 deletions tests/clients/test_dapr_grpc_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import asyncio
import json
import socket
import tempfile
import unittest
import uuid
import time
from unittest.mock import patch

from google.rpc import status_pb2, code_pb2
Expand All @@ -34,9 +32,12 @@
from dapr.clients.grpc._request import TransactionalStateOperation
from dapr.clients.grpc._state import StateOptions, Consistency, Concurrency, StateItem
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc._response import (ConfigurationItem, ConfigurationWatcher,
ConfigurationResponse, UnlockResponseStatus,
TopicEventResponse, )
from dapr.clients.grpc._response import (
ConfigurationItem,
ConfigurationWatcher,
ConfigurationResponse,
UnlockResponseStatus,
)


class DaprGrpcClientAsyncTests(unittest.IsolatedAsyncioTestCase):
Expand Down Expand Up @@ -382,7 +383,6 @@ async def test_subscribe_topic_early_close(self):
# await asyncio.sleep(0.1) # sleep to prevent a busy loop
# await close_fn()


@patch.object(settings, 'DAPR_API_TOKEN', 'test-token')
async def test_dapr_api_token_insertion(self):
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
Expand Down

0 comments on commit 0ada5ea

Please sign in to comment.