diff --git a/dapr/clients/http/client.py b/dapr/clients/http/client.py index 627fddae..e6240683 100644 --- a/dapr/clients/http/client.py +++ b/dapr/clients/http/client.py @@ -24,7 +24,6 @@ DAPR_USER_AGENT, CONTENT_TYPE_HEADER, ) -from dapr.clients.health import DaprHealth from dapr.clients.retry import RetryPolicy if TYPE_CHECKING: @@ -105,33 +104,43 @@ async def retry_call(self, session, req): # If max_retries is 0, we don't retry if self.retry_policy.max_attempts == 0: return await session.request( - method=req["method"], - url=req["url"], - data=req["data"], - headers=req["headers"], - ssl=req["sslcontext"], - params=req["params"], + method=req['method'], + url=req['url'], + data=req['data'], + headers=req['headers'], + ssl=req['sslcontext'], + params=req['params'], ) attempt = 0 while self.retry_policy.max_attempts == -1 or attempt < self.retry_policy.max_attempts: # type: ignore - print(f'Trying RPC call, attempt {attempt + 1}') - r = await session.request(method=req["method"], url=req["url"], data=req["data"], - headers=req["headers"], ssl=req["sslcontext"], params=req["params"], ) + print(f'Request attempt {attempt + 1}') + r = await session.request( + method=req['method'], + url=req['url'], + data=req['data'], + headers=req['headers'], + ssl=req['sslcontext'], + params=req['params'], + ) - if r.status not in self.retry_policy.retryable_http_status_codes: - return r + if r.status not in self.retry_policy.retryable_http_status_codes: + return r - if self.retry_policy.max_attempts != -1 and attempt == self.retry_policy.max_attempts - 1: # type: ignore - return r + if ( + self.retry_policy.max_attempts != -1 + and attempt == self.retry_policy.max_attempts - 1 # type: ignore + ): # type: ignore + return r - sleep_time = min(self.retry_policy.max_backoff, - self.retry_policy.initial_backoff * (self.retry_policy.backoff_multiplier ** attempt), ) + sleep_time = min( + self.retry_policy.max_backoff, + self.retry_policy.initial_backoff * (self.retry_policy.backoff_multiplier**attempt), + ) - print(f'Sleeping for {sleep_time} seconds before retrying RPC call') - await asyncio.sleep(sleep_time) - attempt += 1 - raise Exception(f'Request failed after {attempt} retries') + print(f'Sleeping for {sleep_time} seconds before retrying call') + await asyncio.sleep(sleep_time) + attempt += 1 async def convert_to_error(self, response: aiohttp.ClientResponse) -> DaprInternalError: error_info = None diff --git a/dapr/clients/retry.py b/dapr/clients/retry.py index 022eaa76..93ddd8f0 100644 --- a/dapr/clients/retry.py +++ b/dapr/clients/retry.py @@ -45,7 +45,7 @@ def __init__( StatusCode.DEADLINE_EXCEEDED, ], ): - if max_attempts < -1: + if max_attempts < -1: # type: ignore raise ValueError('max_attempts must be greater than or equal to -1') self.max_attempts = max_attempts @@ -62,11 +62,11 @@ def __init__( self.backoff_multiplier = backoff_multiplier if len(retryable_http_status_codes) == 0: - raise ValueError('retryable_http_status_codes can\'t be empty') + raise ValueError("retryable_http_status_codes can't be empty") self.retryable_http_status_codes = retryable_http_status_codes if len(retryable_grpc_status_codes) == 0: - raise ValueError('retryable_http_status_codes can\'t be empty') + raise ValueError("retryable_http_status_codes can't be empty") self.retryable_grpc_status_codes = retryable_grpc_status_codes diff --git a/daprdocs/content/en/python-sdk-docs/python-client.md b/daprdocs/content/en/python-sdk-docs/python-client.md index 3030f64a..0455e642 100644 --- a/daprdocs/content/en/python-sdk-docs/python-client.md +++ b/daprdocs/content/en/python-sdk-docs/python-client.md @@ -75,7 +75,7 @@ set it in the environment and the client will use it automatically. You can read more about Dapr API token authentication [here](https://docs.dapr.io/operations/security/api-token/). ##### Health timeout -On client initialisation, a health check is performed against the Dapr sidecar (`/healthz/outboud`). +On client initialisation, a health check is performed against the Dapr sidecar (`/healthz/outbound`). The client will wait for the sidecar to be up and running before proceeding. The default timeout is 60 seconds, but it can be overridden by setting the `DAPR_HEALTH_TIMEOUT` diff --git a/tests/clients/test_retries.py b/tests/clients/test_retries_grpc.py similarity index 72% rename from tests/clients/test_retries.py rename to tests/clients/test_retries_grpc.py index dd017529..23e44201 100644 --- a/tests/clients/test_retries.py +++ b/tests/clients/test_retries_grpc.py @@ -21,53 +21,6 @@ from dapr.clients.retry import RetryPolicy, run_rpc_with_retry -class RetryPolicyTests(unittest.TestCase): - def test_init_success_default(self): - policy = RetryPolicy() - - self.assertEqual(0, policy.max_attempts) - self.assertEqual(1, policy.initial_backoff) - self.assertEqual(20, policy.max_backoff) - self.assertEqual(1.5, policy.backoff_multiplier) - self.assertEqual([408, 429, 500, 502, 503, 504], policy.retryable_http_status_codes) - self.assertEqual([StatusCode.UNAVAILABLE, StatusCode.DEADLINE_EXCEEDED], policy.retryable_grpc_status_codes) - - def test_init_success(self): - policy = RetryPolicy( - max_attempts=3, - initial_backoff=2, - max_backoff=10, - backoff_multiplier=2, - retryable_grpc_status_codes=[StatusCode.UNAVAILABLE], - retryable_http_status_codes=[408, 429] - ) - self.assertEqual(3, policy.max_attempts) - self.assertEqual(2, policy.initial_backoff) - self.assertEqual(10, policy.max_backoff) - self.assertEqual(2, policy.backoff_multiplier) - self.assertEqual([StatusCode.UNAVAILABLE], policy.retryable_grpc_status_codes) - self.assertEqual([408, 429], policy.retryable_http_status_codes) - - def test_init_with_errors(self): - with self.assertRaises(ValueError): - RetryPolicy(max_attempts=-2) - - with self.assertRaises(ValueError): - RetryPolicy(initial_backoff=0) - - with self.assertRaises(ValueError): - RetryPolicy(max_backoff=0) - - with self.assertRaises(ValueError): - RetryPolicy(backoff_multiplier=0) - - with self.assertRaises(ValueError): - RetryPolicy(retryable_http_status_codes=[]) - - with self.assertRaises(ValueError): - RetryPolicy(retryable_grpc_status_codes=[]) - - class RetriesTest(unittest.TestCase): def test_run_rpc_with_retry_success(self): mock_func = Mock(return_value='success') @@ -111,8 +64,9 @@ def test_run_rpc_with_retry_fail_with_another_status_code(self): mock_func = MagicMock(side_effect=mock_error) with self.assertRaises(RpcError): - policy = RetryPolicy(max_attempts=3, - retryable_grpc_status_codes=[StatusCode.UNAVAILABLE]) + policy = RetryPolicy( + max_attempts=3, retryable_grpc_status_codes=[StatusCode.UNAVAILABLE] + ) run_rpc_with_retry(policy, mock_func) mock_func.assert_called_once() diff --git a/tests/clients/test_retries_async.py b/tests/clients/test_retries_grpc_async.py similarity index 93% rename from tests/clients/test_retries_async.py rename to tests/clients/test_retries_grpc_async.py index 44dd8cc2..26d638a5 100644 --- a/tests/clients/test_retries_async.py +++ b/tests/clients/test_retries_grpc_async.py @@ -21,7 +21,7 @@ from dapr.clients.retry import RetryPolicy, async_run_rpc_with_retry -class AsyncRetriesTests(unittest.IsolatedAsyncioTestCase): +class RetryPolicyGrpcAsyncTests(unittest.IsolatedAsyncioTestCase): async def test_run_rpc_with_retry_success(self): mock_func = AsyncMock(return_value='success') @@ -40,7 +40,7 @@ async def test_run_rpc_with_retry_no_retry(self): await async_run_rpc_with_retry(RetryPolicy(max_attempts=0), mock_func) mock_func.assert_awaited_once() - @patch('asyncio.sleep', return_value=None) # To speed up tests + @patch('asyncio.sleep', return_value=None) async def test_run_rpc_with_retry_fail(self, mock_sleep): mock_error = RpcError() mock_error.code = MagicMock(return_value=StatusCode.UNAVAILABLE) @@ -64,12 +64,14 @@ async def test_run_rpc_with_retry_fail_with_another_status_code(self): mock_func = AsyncMock(side_effect=mock_error) with self.assertRaises(RpcError): - policy = RetryPolicy(max_attempts=3, retryable_grpc_status_codes=[StatusCode.UNAVAILABLE]) + policy = RetryPolicy( + max_attempts=3, retryable_grpc_status_codes=[StatusCode.UNAVAILABLE] + ) await async_run_rpc_with_retry(policy, mock_func) mock_func.assert_awaited_once() - @patch('asyncio.sleep', return_value=None) # To speed up tests + @patch('asyncio.sleep', return_value=None) async def test_run_rpc_with_retry_fail_with_max_backoff(self, mock_sleep): mock_error = RpcError() mock_error.code = MagicMock(return_value=StatusCode.UNAVAILABLE) @@ -91,7 +93,7 @@ async def test_run_rpc_with_retry_fail_with_max_backoff(self, mock_sleep): ] mock_sleep.assert_has_calls(expected_sleep_calls, any_order=False) - @patch('asyncio.sleep', return_value=None) # To speed up tests + @patch('asyncio.sleep', return_value=None) async def test_run_rpc_with_infinite_retries(self, mock_sleep): # Testing a function that's supposed to run forever is tricky, so we'll simulate it # Instead of a fixed side effect, we'll create a function that's supposed to diff --git a/tests/clients/test_retries_http.py b/tests/clients/test_retries_http.py new file mode 100644 index 00000000..a8710448 --- /dev/null +++ b/tests/clients/test_retries_http.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import unittest +from unittest import mock +from unittest.mock import patch, MagicMock, AsyncMock + +from dapr.clients.http.client import DaprHttpClient +from dapr.clients.retry import RetryPolicy +from dapr.serializers import DefaultJSONSerializer + + +class RetryPolicyHttpTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + # Setup your test environment and mocks here + self.session = MagicMock() + self.session.request = AsyncMock() + + self.serializer = (DefaultJSONSerializer(),) + self.client = DaprHttpClient(message_serializer=self.serializer) + + # Example request + self.req = { + 'method': 'GET', + 'url': 'http://example.com', + 'data': None, + 'headers': None, + 'sslcontext': None, + 'params': None, + } + + async def test_run_with_success(self): + # Mock the request to succeed on the first try + self.session.request.return_value.status = 200 + + response = await self.client.retry_call(self.session, self.req) + + self.session.request.assert_called_once() + self.assertEqual(200, response.status) + + async def test_success_run_with_no_retry(self): + self.session.request.return_value.status = 200 + + client = DaprHttpClient( + message_serializer=self.serializer, retry_policy=RetryPolicy(max_attempts=0) + ) + response = await client.retry_call(self.session, self.req) + + self.session.request.assert_called_once() + self.assertEqual(200, response.status) + + async def test_fail_run_with_no_retry(self): + self.session.request.return_value.status = 408 + + client = DaprHttpClient( + message_serializer=self.serializer, retry_policy=RetryPolicy(max_attempts=0) + ) + response = await client.retry_call(self.session, self.req) + + self.session.request.assert_called_once() + self.assertEqual(408, response.status) + + @patch('asyncio.sleep', return_value=None) + async def test_retry_eventually_succeeds(self, _): + # Mock the request to fail twice then succeed + self.session.request.side_effect = [ + MagicMock(status=500), # First attempt fails + MagicMock(status=502), # Second attempt fails + MagicMock(status=200), # Third attempt succeeds + ] + client = DaprHttpClient( + message_serializer=self.serializer, retry_policy=RetryPolicy(max_attempts=3) + ) + + response = await client.retry_call(self.session, self.req) + + self.assertEqual(3, self.session.request.call_count) + self.assertEqual(200, response.status) + + @patch('asyncio.sleep', return_value=None) + async def test_retry_eventually_fails(self, _): + self.session.request.return_value.status = 408 + + client = DaprHttpClient( + message_serializer=self.serializer, retry_policy=RetryPolicy(max_attempts=3) + ) + + response = await client.retry_call(self.session, self.req) + + self.assertEqual(3, self.session.request.call_count) + self.assertEqual(408, response.status) + + @patch('asyncio.sleep', return_value=None) + async def test_retry_fails_with_a_different_code(self, _): + # Mock the request to fail twice then succeed + self.session.request.return_value.status = 501 + + client = DaprHttpClient( + message_serializer=self.serializer, + retry_policy=RetryPolicy(max_attempts=3, retryable_http_status_codes=[500]), + ) + + response = await client.retry_call(self.session, self.req) + + self.session.request.assert_called_once() + self.assertEqual(response.status, 501) + + @patch('asyncio.sleep', return_value=None) + async def test_retries_exhausted(self, _): + # Mock the request to fail three times + self.session.request.return_value = MagicMock(status=500) + + client = DaprHttpClient( + message_serializer=self.serializer, + retry_policy=RetryPolicy(max_attempts=3, retryable_http_status_codes=[500]), + ) + + response = await client.retry_call(self.session, self.req) + + self.assertEqual(3, self.session.request.call_count) + self.assertEqual(500, response.status) + + @patch('asyncio.sleep', return_value=None) + async def test_max_backoff(self, mock_sleep): + self.session.request.return_value.status = 500 + + policy = RetryPolicy(max_attempts=4, initial_backoff=2, backoff_multiplier=2, max_backoff=3) + client = DaprHttpClient(message_serializer=self.serializer, retry_policy=policy) + + response = await client.retry_call(self.session, self.req) + + expected_sleep_calls = [ + mock.call(2.0), # First sleep call + mock.call(3.0), # Second sleep call + mock.call(3.0), # Third sleep call + ] + self.assertEqual(4, self.session.request.call_count) + mock_sleep.assert_has_calls(expected_sleep_calls, any_order=False) + self.assertEqual(500, response.status) + + @patch('asyncio.sleep', return_value=None) + async def test_infinite_retries(self, _): + retry_count = 0 + max_test_retries = 6 # Simulates "indefinite" retries for test purposes + + # Function to simulate request behavior + async def mock_request(*args, **kwargs): + nonlocal retry_count + retry_count += 1 + if retry_count < max_test_retries: + return MagicMock(status=500) # Simulate failure + else: + return MagicMock(status=200) # Simulate success to stop retrying + + self.session.request = mock_request + + policy = RetryPolicy(max_attempts=-1, retryable_http_status_codes=[500]) + client = DaprHttpClient(message_serializer=self.serializer, retry_policy=policy) + + response = await client.retry_call(self.session, self.req) + + # Assert that the retry logic was executed the expected number of times + self.assertEqual(response.status, 200) + self.assertEqual(retry_count, max_test_retries) diff --git a/tests/clients/test_retries_policy.py b/tests/clients/test_retries_policy.py new file mode 100644 index 00000000..166fb17d --- /dev/null +++ b/tests/clients/test_retries_policy.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import unittest + +from grpc import StatusCode + +from dapr.clients.retry import RetryPolicy + + +class RetryPolicyGrpcTests(unittest.TestCase): + def test_init_success_default(self): + policy = RetryPolicy() + + self.assertEqual(0, policy.max_attempts) + self.assertEqual(1, policy.initial_backoff) + self.assertEqual(20, policy.max_backoff) + self.assertEqual(1.5, policy.backoff_multiplier) + self.assertEqual([408, 429, 500, 502, 503, 504], policy.retryable_http_status_codes) + self.assertEqual( + [StatusCode.UNAVAILABLE, StatusCode.DEADLINE_EXCEEDED], + policy.retryable_grpc_status_codes, + ) + + def test_init_success(self): + policy = RetryPolicy( + max_attempts=3, + initial_backoff=2, + max_backoff=10, + backoff_multiplier=2, + retryable_grpc_status_codes=[StatusCode.UNAVAILABLE], + retryable_http_status_codes=[408, 429], + ) + self.assertEqual(3, policy.max_attempts) + self.assertEqual(2, policy.initial_backoff) + self.assertEqual(10, policy.max_backoff) + self.assertEqual(2, policy.backoff_multiplier) + self.assertEqual([StatusCode.UNAVAILABLE], policy.retryable_grpc_status_codes) + self.assertEqual([408, 429], policy.retryable_http_status_codes) + + def test_init_with_errors(self): + with self.assertRaises(ValueError): + RetryPolicy(max_attempts=-2) + + with self.assertRaises(ValueError): + RetryPolicy(initial_backoff=0) + + with self.assertRaises(ValueError): + RetryPolicy(max_backoff=0) + + with self.assertRaises(ValueError): + RetryPolicy(backoff_multiplier=0) + + with self.assertRaises(ValueError): + RetryPolicy(retryable_http_status_codes=[]) + + with self.assertRaises(ValueError): + RetryPolicy(retryable_grpc_status_codes=[])