From 0ada5ea1165bd147c251bbec4b724906f99c8557 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Thu, 10 Oct 2024 11:02:09 +0100 Subject: [PATCH] Linter Signed-off-by: Elena Kolevska --- dapr/aio/clients/grpc/client.py | 53 +++++++++++++------ dapr/aio/clients/grpc/subscription.py | 12 +++-- dapr/clients/grpc/subscription.py | 2 - dapr/common/pubsub/subscription.py | 4 +- .../async-subscriber-handler.py | 2 +- examples/pubsub-streaming/async-subscriber.py | 1 - tests/clients/test_dapr_grpc_client.py | 15 ++++-- tests/clients/test_dapr_grpc_client_async.py | 12 ++--- 8 files changed, 65 insertions(+), 36 deletions(-) diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index 626ca697..2b40101c 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -76,14 +76,28 @@ BindingRequest, TransactionalStateOperation, ) -from dapr.clients.grpc._response import (BindingResponse, DaprResponse, GetSecretResponse, - GetBulkSecretResponse, GetMetadataResponse, - InvokeMethodResponse, UnlockResponseStatus, StateResponse, - BulkStatesResponse, BulkStateItem, ConfigurationResponse, - QueryResponse, QueryResponseItem, RegisteredComponents, - ConfigurationWatcher, TryLockResponse, UnlockResponse, - GetWorkflowResponse, StartWorkflowResponse, - TopicEventResponse, ) +from dapr.clients.grpc._response import ( + BindingResponse, + DaprResponse, + GetSecretResponse, + GetBulkSecretResponse, + GetMetadataResponse, + InvokeMethodResponse, + UnlockResponseStatus, + StateResponse, + BulkStatesResponse, + BulkStateItem, + ConfigurationResponse, + QueryResponse, + QueryResponseItem, + RegisteredComponents, + ConfigurationWatcher, + TryLockResponse, + UnlockResponse, + GetWorkflowResponse, + StartWorkflowResponse, + TopicEventResponse, +) class DaprGrpcClientAsync: @@ -471,8 +485,13 @@ async def publish_event( return DaprResponse(await call.initial_metadata()) - async def subscribe(self, pubsub_name: str, topic: str, metadata: Optional[dict] = None, - dead_letter_topic: Optional[str] = None, ) -> Subscription: + async def subscribe( + self, + pubsub_name: str, + topic: str, + metadata: Optional[dict] = None, + dead_letter_topic: Optional[str] = None, + ) -> Subscription: """ Subscribe to a topic with a bidirectional stream @@ -485,14 +504,18 @@ async def subscribe(self, pubsub_name: str, topic: str, metadata: Optional[dict] Returns: Subscription: The Subscription object managing the stream. """ - subscription = Subscription(self._stub, pubsub_name, topic, metadata, - dead_letter_topic) + subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic) await subscription.start() return subscription - async def subscribe_with_handler(self, pubsub_name: str, topic: str, - handler_fn: Callable[..., TopicEventResponse], metadata: Optional[dict] = None, - dead_letter_topic: Optional[str] = None, ) -> Callable[[], Awaitable[None]]: + async def subscribe_with_handler( + self, + pubsub_name: str, + topic: str, + handler_fn: Callable[..., TopicEventResponse], + metadata: Optional[dict] = None, + dead_letter_topic: Optional[str] = None, + ) -> Callable[[], Awaitable[None]]: """ Subscribe to a topic with a bidirectional stream and a message handler function diff --git a/dapr/aio/clients/grpc/subscription.py b/dapr/aio/clients/grpc/subscription.py index 7dc10f9a..84542bb4 100644 --- a/dapr/aio/clients/grpc/subscription.py +++ b/dapr/aio/clients/grpc/subscription.py @@ -7,8 +7,8 @@ from dapr.common.pubsub.subscription import StreamInactiveError, SubscriptionMessage from dapr.proto import api_v1, appcallback_v1 -class Subscription: +class Subscription: def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None): self._stub = stub self._pubsub_name = pubsub_name @@ -63,9 +63,11 @@ async def next_message(self): return SubscriptionMessage(message.event_message) except AioRpcError as e: if e.code() == StatusCode.UNAVAILABLE: - print(f'gRPC error while reading from stream: {e.details()}, ' - f'Status Code: {e.code()}. ' - f'Attempting to reconnect...') + print( + f'gRPC error while reading from stream: {e.details()}, ' + f'Status Code: {e.code()}. ' + f'Attempting to reconnect...' + ) await self.reconnect_stream() elif e.code() != StatusCode.CANCELLED: raise Exception(f'gRPC error while reading from subscription stream: {e} ') @@ -105,4 +107,4 @@ async def close(self): if e.code() != StatusCode.CANCELLED: raise Exception(f'Error while closing stream: {e}') except Exception as e: - raise Exception(f'Error while closing stream: {e}') \ No newline at end of file + raise Exception(f'Error while closing stream: {e}') diff --git a/dapr/clients/grpc/subscription.py b/dapr/clients/grpc/subscription.py index 8b99b34f..3374a121 100644 --- a/dapr/clients/grpc/subscription.py +++ b/dapr/clients/grpc/subscription.py @@ -9,9 +9,7 @@ from typing import Optional - class Subscription: - def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None): self._stub = stub self._pubsub_name = pubsub_name diff --git a/dapr/common/pubsub/subscription.py b/dapr/common/pubsub/subscription.py index ac8db973..0f96ab6b 100644 --- a/dapr/common/pubsub/subscription.py +++ b/dapr/common/pubsub/subscription.py @@ -3,6 +3,7 @@ from dapr.proto.runtime.v1.appcallback_pb2 import TopicEventRequest from typing import Optional, Union + class SubscriptionMessage: def __init__(self, msg: TopicEventRequest): self._id: str = msg.id @@ -86,7 +87,8 @@ def _parse_data_content(self): class StreamInactiveError(Exception): pass + class PubSubEventStatus: SUCCESS = 'success' RETRY = 'retry' - DROP = 'drop' \ No newline at end of file + DROP = 'drop' diff --git a/examples/pubsub-streaming/async-subscriber-handler.py b/examples/pubsub-streaming/async-subscriber-handler.py index 75aca19e..e5f68953 100644 --- a/examples/pubsub-streaming/async-subscriber-handler.py +++ b/examples/pubsub-streaming/async-subscriber-handler.py @@ -32,7 +32,7 @@ async def main(): # Wait until 5 messages are processed global counter while counter < 5: - print("Counter: ", counter) + print('Counter: ', counter) await asyncio.sleep(1) print('Closing subscription...') diff --git a/examples/pubsub-streaming/async-subscriber.py b/examples/pubsub-streaming/async-subscriber.py index 396b3cc2..0f7da59b 100644 --- a/examples/pubsub-streaming/async-subscriber.py +++ b/examples/pubsub-streaming/async-subscriber.py @@ -49,6 +49,5 @@ async def main(): await subscription.close() - if __name__ == '__main__': asyncio.run(main()) diff --git a/tests/clients/test_dapr_grpc_client.py b/tests/clients/test_dapr_grpc_client.py index 6c46d5ec..d3eab236 100644 --- a/tests/clients/test_dapr_grpc_client.py +++ b/tests/clients/test_dapr_grpc_client.py @@ -28,7 +28,7 @@ from dapr.clients.exceptions import DaprGrpcError from dapr.clients.grpc.client import DaprGrpcClient from dapr.clients import DaprClient -from dapr.clients.grpc.subscription import StreamInactiveError, Subscription +from dapr.clients.grpc.subscription import StreamInactiveError from dapr.proto import common_v1 from .fake_dapr_server import FakeDaprSidecar from dapr.conf import settings @@ -36,9 +36,14 @@ from dapr.clients.grpc._request import TransactionalStateOperation from dapr.clients.grpc._state import StateOptions, Consistency, Concurrency, StateItem from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions -from dapr.clients.grpc._response import (ConfigurationItem, ConfigurationResponse, - ConfigurationWatcher, UnlockResponseStatus, - WorkflowRuntimeStatus, TopicEventResponse, ) +from dapr.clients.grpc._response import ( + ConfigurationItem, + ConfigurationResponse, + ConfigurationWatcher, + UnlockResponseStatus, + WorkflowRuntimeStatus, + TopicEventResponse, +) class DaprGrpcClientTests(unittest.TestCase): @@ -372,7 +377,7 @@ def handler(message): counter += 1 - return TopicEventResponse("success") + return TopicEventResponse('success') close_fn = dapr.subscribe_with_handler( pubsub_name='pubsub', topic='example', handler_fn=handler diff --git a/tests/clients/test_dapr_grpc_client_async.py b/tests/clients/test_dapr_grpc_client_async.py index e8f8af3c..42bbd830 100644 --- a/tests/clients/test_dapr_grpc_client_async.py +++ b/tests/clients/test_dapr_grpc_client_async.py @@ -12,13 +12,11 @@ See the License for the specific language governing permissions and limitations under the License. """ -import asyncio import json import socket import tempfile import unittest import uuid -import time from unittest.mock import patch from google.rpc import status_pb2, code_pb2 @@ -34,9 +32,12 @@ from dapr.clients.grpc._request import TransactionalStateOperation from dapr.clients.grpc._state import StateOptions, Consistency, Concurrency, StateItem from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions -from dapr.clients.grpc._response import (ConfigurationItem, ConfigurationWatcher, - ConfigurationResponse, UnlockResponseStatus, - TopicEventResponse, ) +from dapr.clients.grpc._response import ( + ConfigurationItem, + ConfigurationWatcher, + ConfigurationResponse, + UnlockResponseStatus, +) class DaprGrpcClientAsyncTests(unittest.IsolatedAsyncioTestCase): @@ -382,7 +383,6 @@ async def test_subscribe_topic_early_close(self): # await asyncio.sleep(0.1) # sleep to prevent a busy loop # await close_fn() - @patch.object(settings, 'DAPR_API_TOKEN', 'test-token') async def test_dapr_api_token_insertion(self): dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')