Skip to content

Commit

Permalink
tests for http retries
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Kolevska <elena@kolevska.com>
  • Loading branch information
elena-kolevska committed Mar 2, 2024
1 parent ad13b66 commit 31efcc8
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 78 deletions.
49 changes: 29 additions & 20 deletions dapr/clients/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
DAPR_USER_AGENT,
CONTENT_TYPE_HEADER,
)
from dapr.clients.health import DaprHealth
from dapr.clients.retry import RetryPolicy

if TYPE_CHECKING:
Expand Down Expand Up @@ -105,33 +104,43 @@ async def retry_call(self, session, req):
# If max_retries is 0, we don't retry
if self.retry_policy.max_attempts == 0:
return await session.request(
method=req["method"],
url=req["url"],
data=req["data"],
headers=req["headers"],
ssl=req["sslcontext"],
params=req["params"],
method=req['method'],
url=req['url'],
data=req['data'],
headers=req['headers'],
ssl=req['sslcontext'],
params=req['params'],
)

attempt = 0
while self.retry_policy.max_attempts == -1 or attempt < self.retry_policy.max_attempts: # type: ignore
print(f'Trying RPC call, attempt {attempt + 1}')
r = await session.request(method=req["method"], url=req["url"], data=req["data"],
headers=req["headers"], ssl=req["sslcontext"], params=req["params"], )
print(f'Request attempt {attempt + 1}')
r = await session.request(
method=req['method'],
url=req['url'],
data=req['data'],
headers=req['headers'],
ssl=req['sslcontext'],
params=req['params'],
)

if r.status not in self.retry_policy.retryable_http_status_codes:
return r
if r.status not in self.retry_policy.retryable_http_status_codes:
return r

if self.retry_policy.max_attempts != -1 and attempt == self.retry_policy.max_attempts - 1: # type: ignore
return r
if (
self.retry_policy.max_attempts != -1
and attempt == self.retry_policy.max_attempts - 1 # type: ignore
): # type: ignore
return r

sleep_time = min(self.retry_policy.max_backoff,
self.retry_policy.initial_backoff * (self.retry_policy.backoff_multiplier ** attempt), )
sleep_time = min(
self.retry_policy.max_backoff,
self.retry_policy.initial_backoff * (self.retry_policy.backoff_multiplier**attempt),
)

print(f'Sleeping for {sleep_time} seconds before retrying RPC call')
await asyncio.sleep(sleep_time)
attempt += 1
raise Exception(f'Request failed after {attempt} retries')
print(f'Sleeping for {sleep_time} seconds before retrying call')
await asyncio.sleep(sleep_time)
attempt += 1

async def convert_to_error(self, response: aiohttp.ClientResponse) -> DaprInternalError:
error_info = None
Expand Down
6 changes: 3 additions & 3 deletions dapr/clients/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(
StatusCode.DEADLINE_EXCEEDED,
],
):
if max_attempts < -1:
if max_attempts < -1: # type: ignore
raise ValueError('max_attempts must be greater than or equal to -1')
self.max_attempts = max_attempts

Expand All @@ -62,11 +62,11 @@ def __init__(
self.backoff_multiplier = backoff_multiplier

if len(retryable_http_status_codes) == 0:
raise ValueError('retryable_http_status_codes can\'t be empty')
raise ValueError("retryable_http_status_codes can't be empty")
self.retryable_http_status_codes = retryable_http_status_codes

if len(retryable_grpc_status_codes) == 0:
raise ValueError('retryable_http_status_codes can\'t be empty')
raise ValueError("retryable_http_status_codes can't be empty")
self.retryable_grpc_status_codes = retryable_grpc_status_codes


Expand Down
2 changes: 1 addition & 1 deletion daprdocs/content/en/python-sdk-docs/python-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ set it in the environment and the client will use it automatically.
You can read more about Dapr API token authentication [here](https://docs.dapr.io/operations/security/api-token/).

##### Health timeout
On client initialisation, a health check is performed against the Dapr sidecar (`/healthz/outboud`).
On client initialisation, a health check is performed against the Dapr sidecar (`/healthz/outbound`).
The client will wait for the sidecar to be up and running before proceeding.

The default timeout is 60 seconds, but it can be overridden by setting the `DAPR_HEALTH_TIMEOUT`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,6 @@
from dapr.clients.retry import RetryPolicy, run_rpc_with_retry


class RetryPolicyTests(unittest.TestCase):
def test_init_success_default(self):
policy = RetryPolicy()

self.assertEqual(0, policy.max_attempts)
self.assertEqual(1, policy.initial_backoff)
self.assertEqual(20, policy.max_backoff)
self.assertEqual(1.5, policy.backoff_multiplier)
self.assertEqual([408, 429, 500, 502, 503, 504], policy.retryable_http_status_codes)
self.assertEqual([StatusCode.UNAVAILABLE, StatusCode.DEADLINE_EXCEEDED], policy.retryable_grpc_status_codes)

def test_init_success(self):
policy = RetryPolicy(
max_attempts=3,
initial_backoff=2,
max_backoff=10,
backoff_multiplier=2,
retryable_grpc_status_codes=[StatusCode.UNAVAILABLE],
retryable_http_status_codes=[408, 429]
)
self.assertEqual(3, policy.max_attempts)
self.assertEqual(2, policy.initial_backoff)
self.assertEqual(10, policy.max_backoff)
self.assertEqual(2, policy.backoff_multiplier)
self.assertEqual([StatusCode.UNAVAILABLE], policy.retryable_grpc_status_codes)
self.assertEqual([408, 429], policy.retryable_http_status_codes)

def test_init_with_errors(self):
with self.assertRaises(ValueError):
RetryPolicy(max_attempts=-2)

with self.assertRaises(ValueError):
RetryPolicy(initial_backoff=0)

with self.assertRaises(ValueError):
RetryPolicy(max_backoff=0)

with self.assertRaises(ValueError):
RetryPolicy(backoff_multiplier=0)

with self.assertRaises(ValueError):
RetryPolicy(retryable_http_status_codes=[])

with self.assertRaises(ValueError):
RetryPolicy(retryable_grpc_status_codes=[])


class RetriesTest(unittest.TestCase):
def test_run_rpc_with_retry_success(self):
mock_func = Mock(return_value='success')
Expand Down Expand Up @@ -111,8 +64,9 @@ def test_run_rpc_with_retry_fail_with_another_status_code(self):
mock_func = MagicMock(side_effect=mock_error)

with self.assertRaises(RpcError):
policy = RetryPolicy(max_attempts=3,
retryable_grpc_status_codes=[StatusCode.UNAVAILABLE])
policy = RetryPolicy(
max_attempts=3, retryable_grpc_status_codes=[StatusCode.UNAVAILABLE]
)
run_rpc_with_retry(policy, mock_func)

mock_func.assert_called_once()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from dapr.clients.retry import RetryPolicy, async_run_rpc_with_retry


class AsyncRetriesTests(unittest.IsolatedAsyncioTestCase):
class RetryPolicyGrpcAsyncTests(unittest.IsolatedAsyncioTestCase):
async def test_run_rpc_with_retry_success(self):
mock_func = AsyncMock(return_value='success')

Expand All @@ -40,7 +40,7 @@ async def test_run_rpc_with_retry_no_retry(self):
await async_run_rpc_with_retry(RetryPolicy(max_attempts=0), mock_func)
mock_func.assert_awaited_once()

@patch('asyncio.sleep', return_value=None) # To speed up tests
@patch('asyncio.sleep', return_value=None)
async def test_run_rpc_with_retry_fail(self, mock_sleep):
mock_error = RpcError()
mock_error.code = MagicMock(return_value=StatusCode.UNAVAILABLE)
Expand All @@ -64,12 +64,14 @@ async def test_run_rpc_with_retry_fail_with_another_status_code(self):
mock_func = AsyncMock(side_effect=mock_error)

with self.assertRaises(RpcError):
policy = RetryPolicy(max_attempts=3, retryable_grpc_status_codes=[StatusCode.UNAVAILABLE])
policy = RetryPolicy(
max_attempts=3, retryable_grpc_status_codes=[StatusCode.UNAVAILABLE]
)
await async_run_rpc_with_retry(policy, mock_func)

mock_func.assert_awaited_once()

@patch('asyncio.sleep', return_value=None) # To speed up tests
@patch('asyncio.sleep', return_value=None)
async def test_run_rpc_with_retry_fail_with_max_backoff(self, mock_sleep):
mock_error = RpcError()
mock_error.code = MagicMock(return_value=StatusCode.UNAVAILABLE)
Expand All @@ -91,7 +93,7 @@ async def test_run_rpc_with_retry_fail_with_max_backoff(self, mock_sleep):
]
mock_sleep.assert_has_calls(expected_sleep_calls, any_order=False)

@patch('asyncio.sleep', return_value=None) # To speed up tests
@patch('asyncio.sleep', return_value=None)
async def test_run_rpc_with_infinite_retries(self, mock_sleep):
# Testing a function that's supposed to run forever is tricky, so we'll simulate it
# Instead of a fixed side effect, we'll create a function that's supposed to
Expand Down
175 changes: 175 additions & 0 deletions tests/clients/test_retries_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-

"""
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest
from unittest import mock
from unittest.mock import patch, MagicMock, AsyncMock

from dapr.clients.http.client import DaprHttpClient
from dapr.clients.retry import RetryPolicy
from dapr.serializers import DefaultJSONSerializer


class RetryPolicyHttpTests(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
# Setup your test environment and mocks here
self.session = MagicMock()
self.session.request = AsyncMock()

self.serializer = (DefaultJSONSerializer(),)
self.client = DaprHttpClient(message_serializer=self.serializer)

# Example request
self.req = {
'method': 'GET',
'url': 'http://example.com',
'data': None,
'headers': None,
'sslcontext': None,
'params': None,
}

async def test_run_with_success(self):
# Mock the request to succeed on the first try
self.session.request.return_value.status = 200

response = await self.client.retry_call(self.session, self.req)

self.session.request.assert_called_once()
self.assertEqual(200, response.status)

async def test_success_run_with_no_retry(self):
self.session.request.return_value.status = 200

client = DaprHttpClient(
message_serializer=self.serializer, retry_policy=RetryPolicy(max_attempts=0)
)
response = await client.retry_call(self.session, self.req)

self.session.request.assert_called_once()
self.assertEqual(200, response.status)

async def test_fail_run_with_no_retry(self):
self.session.request.return_value.status = 408

client = DaprHttpClient(
message_serializer=self.serializer, retry_policy=RetryPolicy(max_attempts=0)
)
response = await client.retry_call(self.session, self.req)

self.session.request.assert_called_once()
self.assertEqual(408, response.status)

@patch('asyncio.sleep', return_value=None)
async def test_retry_eventually_succeeds(self, _):
# Mock the request to fail twice then succeed
self.session.request.side_effect = [
MagicMock(status=500), # First attempt fails
MagicMock(status=502), # Second attempt fails
MagicMock(status=200), # Third attempt succeeds
]
client = DaprHttpClient(
message_serializer=self.serializer, retry_policy=RetryPolicy(max_attempts=3)
)

response = await client.retry_call(self.session, self.req)

self.assertEqual(3, self.session.request.call_count)
self.assertEqual(200, response.status)

@patch('asyncio.sleep', return_value=None)
async def test_retry_eventually_fails(self, _):
self.session.request.return_value.status = 408

client = DaprHttpClient(
message_serializer=self.serializer, retry_policy=RetryPolicy(max_attempts=3)
)

response = await client.retry_call(self.session, self.req)

self.assertEqual(3, self.session.request.call_count)
self.assertEqual(408, response.status)

@patch('asyncio.sleep', return_value=None)
async def test_retry_fails_with_a_different_code(self, _):
# Mock the request to fail twice then succeed
self.session.request.return_value.status = 501

client = DaprHttpClient(
message_serializer=self.serializer,
retry_policy=RetryPolicy(max_attempts=3, retryable_http_status_codes=[500]),
)

response = await client.retry_call(self.session, self.req)

self.session.request.assert_called_once()
self.assertEqual(response.status, 501)

@patch('asyncio.sleep', return_value=None)
async def test_retries_exhausted(self, _):
# Mock the request to fail three times
self.session.request.return_value = MagicMock(status=500)

client = DaprHttpClient(
message_serializer=self.serializer,
retry_policy=RetryPolicy(max_attempts=3, retryable_http_status_codes=[500]),
)

response = await client.retry_call(self.session, self.req)

self.assertEqual(3, self.session.request.call_count)
self.assertEqual(500, response.status)

@patch('asyncio.sleep', return_value=None)
async def test_max_backoff(self, mock_sleep):
self.session.request.return_value.status = 500

policy = RetryPolicy(max_attempts=4, initial_backoff=2, backoff_multiplier=2, max_backoff=3)
client = DaprHttpClient(message_serializer=self.serializer, retry_policy=policy)

response = await client.retry_call(self.session, self.req)

expected_sleep_calls = [
mock.call(2.0), # First sleep call
mock.call(3.0), # Second sleep call
mock.call(3.0), # Third sleep call
]
self.assertEqual(4, self.session.request.call_count)
mock_sleep.assert_has_calls(expected_sleep_calls, any_order=False)
self.assertEqual(500, response.status)

@patch('asyncio.sleep', return_value=None)
async def test_infinite_retries(self, _):
retry_count = 0
max_test_retries = 6 # Simulates "indefinite" retries for test purposes

# Function to simulate request behavior
async def mock_request(*args, **kwargs):
nonlocal retry_count
retry_count += 1
if retry_count < max_test_retries:
return MagicMock(status=500) # Simulate failure
else:
return MagicMock(status=200) # Simulate success to stop retrying

self.session.request = mock_request

policy = RetryPolicy(max_attempts=-1, retryable_http_status_codes=[500])
client = DaprHttpClient(message_serializer=self.serializer, retry_policy=policy)

response = await client.retry_call(self.session, self.req)

# Assert that the retry logic was executed the expected number of times
self.assertEqual(response.status, 200)
self.assertEqual(retry_count, max_test_retries)
Loading

0 comments on commit 31efcc8

Please sign in to comment.