Skip to content

Commit

Permalink
Merge pull request #105 from neuroforgede/wip/handle-429#main
Browse files Browse the repository at this point in the history
handle http 429 in event delivery
  • Loading branch information
s4ke committed Mar 9, 2024
2 parents deb659e + 87ccfd3 commit b871c24
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
26 changes: 23 additions & 3 deletions skipper/skipper/dataseries/models/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def data_point_event(
'Content-Location',
'Content-Range',
'Content-Type',
'Retry-After'
}
MAX_RESPONSE_LENGTH = 1024
TRUSTED_HOSTS_REGEX = [
Expand Down Expand Up @@ -295,16 +296,26 @@ def try_send_events(
failed = True
event.retries = event.retries + 1
event.retries_in_cycle = event.retries_in_cycle + 1
if event.retries_in_cycle >= consumer.retry_backoff_every:

new_health = ConsumerHealthState.UNHEALTHY.value

if _resp is not None and _resp.status_code == 429:
# too many requests
new_health = ConsumerHealthState.RATE_LIMIT.value
# if we dont get the retry value from the response
# we just use our own backoff
event.handle_at = retry_at_from_response(_resp) or timezone.now() + consumer.retry_backoff_delay
elif event.retries_in_cycle >= consumer.retry_backoff_every:
event.handle_at = timezone.now() + consumer.retry_backoff_delay

# noinspection PyChainedComparisons
if consumer.retry_max > 0 and event.retries > consumer.retry_max:
health = ConsumerHealthState.UNHEALTHY.value
health = new_health
if log_errors:
logger.exception(f'failed to send message to consumer at {consumer.target}, setting to FAILED...')
event.state = ConsumerEventState.FAILED.value
else:
health = ConsumerHealthState.UNHEALTHY.value
health = new_health
if log_errors:
logger.exception(f'failed to send message to consumer at {consumer.target}, setting to RETRY...')
event.state = ConsumerEventState.RETRY.value
Expand All @@ -318,6 +329,15 @@ def try_send_events(

return more_events, cnt

def retry_at_from_response(_response: requests.Response) -> Optional[datetime.datetime]:
try:
if 'Retry-After' in _response.headers:
# we only support values in seconds
return timezone.now() + datetime.timedelta(seconds=int(_response.headers['Retry-After']))
except:
pass
return None


def delete_old_events(consumer: Consumer, log_errors: bool = True) -> None:
# delete all events that were successful or failed that originated more than 30 days ago
Expand Down
1 change: 1 addition & 0 deletions skipper/skipper/dataseries/models/metamodel/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class ConsumerHealthState(Enum):
UNKNOWN = 'UNKNOWN'
UNHEALTHY = 'UNHEALTHY'
HEALTHY = 'HEALTHY'
RATE_LIMIT = 'RATE_LIMIT'

@classmethod
def choices(cls) -> Tuple[Tuple[str, str], ...]:
Expand Down

0 comments on commit b871c24

Please sign in to comment.