From 87ccfd396836b6ff1d12b224206b25cc07b759bf Mon Sep 17 00:00:00 2001 From: Martin Braun Date: Sat, 9 Mar 2024 14:32:57 +0100 Subject: [PATCH] handle http 429 in event delivery --- skipper/skipper/dataseries/models/event.py | 26 ++++++++++++++++--- .../dataseries/models/metamodel/consumer.py | 1 + 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/skipper/skipper/dataseries/models/event.py b/skipper/skipper/dataseries/models/event.py index 34417de..7d60dce 100644 --- a/skipper/skipper/dataseries/models/event.py +++ b/skipper/skipper/dataseries/models/event.py @@ -153,6 +153,7 @@ def data_point_event( 'Content-Location', 'Content-Range', 'Content-Type', + 'Retry-After' } MAX_RESPONSE_LENGTH = 1024 TRUSTED_HOSTS_REGEX = [ @@ -295,16 +296,26 @@ def try_send_events( failed = True event.retries = event.retries + 1 event.retries_in_cycle = event.retries_in_cycle + 1 - if event.retries_in_cycle >= consumer.retry_backoff_every: + + new_health = ConsumerHealthState.UNHEALTHY.value + + if _resp is not None and _resp.status_code == 429: + # too many requests + new_health = ConsumerHealthState.RATE_LIMIT.value + # if we dont get the retry value from the response + # we just use our own backoff + event.handle_at = retry_at_from_response(_resp) or timezone.now() + consumer.retry_backoff_delay + elif event.retries_in_cycle >= consumer.retry_backoff_every: event.handle_at = timezone.now() + consumer.retry_backoff_delay + # noinspection PyChainedComparisons if consumer.retry_max > 0 and event.retries > consumer.retry_max: - health = ConsumerHealthState.UNHEALTHY.value + health = new_health if log_errors: logger.exception(f'failed to send message to consumer at {consumer.target}, setting to FAILED...') event.state = ConsumerEventState.FAILED.value else: - health = ConsumerHealthState.UNHEALTHY.value + health = new_health if log_errors: logger.exception(f'failed to send message to consumer at {consumer.target}, setting to RETRY...') event.state = ConsumerEventState.RETRY.value @@ -318,6 +329,15 @@ def try_send_events( return more_events, cnt +def retry_at_from_response(_response: requests.Response) -> Optional[datetime.datetime]: + try: + if 'Retry-After' in _response.headers: + # we only support values in seconds + return timezone.now() + datetime.timedelta(seconds=int(_response.headers['Retry-After'])) + except: + pass + return None + def delete_old_events(consumer: Consumer, log_errors: bool = True) -> None: # delete all events that were successful or failed that originated more than 30 days ago diff --git a/skipper/skipper/dataseries/models/metamodel/consumer.py b/skipper/skipper/dataseries/models/metamodel/consumer.py index bee6063..b1d7bfd 100644 --- a/skipper/skipper/dataseries/models/metamodel/consumer.py +++ b/skipper/skipper/dataseries/models/metamodel/consumer.py @@ -22,6 +22,7 @@ class ConsumerHealthState(Enum): UNKNOWN = 'UNKNOWN' UNHEALTHY = 'UNHEALTHY' HEALTHY = 'HEALTHY' + RATE_LIMIT = 'RATE_LIMIT' @classmethod def choices(cls) -> Tuple[Tuple[str, str], ...]: