Add dynamodb retry config for throttling and other errors. Add exponential backoff and jitter for unprocessed keys. Fix edge case where we successfully process keys on our last attempt but still fail #1023
base: master
Conversation
KaspariK commented Jan 15, 2025 (edited)
- Use Boto3 retries (see Retries - Boto3; a sketch of the config is below)
- Backoff on getting unprocessed keys
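For reference, a minimal sketch of what wiring up the Boto3 retry configuration might look like (the region and max_attempts values here are illustrative, not taken from this PR):

import boto3
from botocore.config import Config

# "standard" retry mode retries throttling and other transient errors with
# exponential backoff; max_attempts is an illustrative value.
boto_config = Config(retries={"max_attempts": 5, "mode": "standard"})

# Hypothetical setup; the real client lives in tron/serialize/runstate/dynamodb_state_store.py.
client = boto3.client("dynamodb", region_name="us-west-1", config=boto_config)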
…ntial backoff and jitter for unprocessed keys. Fix edge case where we successfully process keys on our last attempt but still fail (b361235 to 3e74d75)
log.warning(
    f"Attempt {attempts}/{MAX_UNPROCESSED_KEYS_RETRIES} - Retrying {len(cand_keys_list)} unprocessed keys after {delay:.2f}s delay."
)
time.sleep(delay)
What to do about this lil guy?
!8ball we should use a restore thread
yea, we should probably try to figure out a non-blocking way to do this or have this run in a separate thread - if we get to the worst case of 5 attempts and this is running on the reactor thread, we'll essentially block all of tron from doing anything for 20s
although, actually - this is probably fine since we do all sorts of blocking stuff in restore and aren't expecting tron to be usable/do anything until we've restored everything
...so maybe this is fine?
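For context, one hedged way to do the "separate thread" version with Twisted (restore_off_reactor and the exact restore call are illustrative stand-ins, not code from this PR):

import logging

from twisted.internet import threads

log = logging.getLogger(__name__)

def restore_off_reactor(store, keys):
    # Run the blocking restore (including its time.sleep backoff) in Twisted's
    # thread pool so the reactor thread stays responsive.
    deferred = threads.deferToThread(store.restore, keys)
    deferred.addErrback(lambda failure: log.error("restore failed: %s", failure))
    return deferred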
@@ -294,7 +296,8 @@ def test_delete_item_with_json_partitions(self, store, small_object, large_object):
        vals = store.restore([key])
        assert key not in vals

    def test_retry_saving(self, store, small_object, large_object):
    @mock.patch("time.sleep", return_value=None)
my personal preference is usually to use the context manager way of mocking since that gives a little more control over where a mock is active, but not a blocker :)
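For example, the decorator above could become a with-block so the mock is only active where it's needed (the test body here is just a placeholder):

from unittest import mock

def test_retry_saving(self, store, small_object, large_object):
    # The patch is only active inside the with-block, which makes it clear
    # which part of the test actually relies on time.sleep being mocked.
    with mock.patch("time.sleep", return_value=None) as mock_sleep:
        ...  # exercise the save/retry path here
    # assertions on mock_sleep.call_args_list would go here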
Alright, not what you commented on at all, but upon closer inspection this test isn't really testing much. I'll rewrite this
# We also need to verify that sleep was called with expected delays
expected_delays = []
base_delay_seconds = 0.5
max_delay_seconds = 10
for attempt in range(1, MAX_UNPROCESSED_KEYS_RETRIES + 1):
    expected_delay = min(base_delay_seconds * (2 ** (attempt - 1)), max_delay_seconds)
    expected_delays.append(expected_delay)
actual_delays = [call.args[0] for call in mock_sleep.call_args_list]
assert_equal(actual_delays, expected_delays)
i'd maybe extract the exponential backoff logic in tron/serialize/runstate/dynamodb_state_store.py
to a function so that we can write a more targeted test for that and simplify this to checking if we called that function the right amount of times
(mostly 'cause I generally try to avoid for loops/calculations inside tests :p)
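A sketch of what that extraction and the targeted test could look like (names and the base delay are illustrative; the PR's helper uses a 1s base):

# tron/serialize/runstate/dynamodb_state_store.py (illustrative)
def calculate_backoff_delay(attempt: int, base_delay_seconds: float = 0.5, max_delay_seconds: float = 10.0) -> float:
    # Exponential backoff capped at max_delay_seconds.
    return min(base_delay_seconds * (2 ** (attempt - 1)), max_delay_seconds)

# targeted test (illustrative), leaving the restore test to assert only call counts
import pytest

@pytest.mark.parametrize(
    "attempt,expected",
    [(1, 0.5), (2, 1.0), (3, 2.0), (5, 8.0), (6, 10.0), (10, 10.0)],
)
def test_calculate_backoff_delay(attempt, expected):
    assert calculate_backoff_delay(attempt) == expected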
f"tron_dynamodb_restore_failure: failed to retrieve items with keys \n{failed_keys}\n from dynamodb\n{resp.result()}" | ||
) | ||
raise error | ||
result = resp.result() |
I wonder if we should also print the response when we get into the exception block to also have an idea on why we got unprocessed keys and why we exceeded the attempts
so maybe we add it here
except Exception as e:
    log.exception("Encountered issues retrieving data from DynamoDB")
    raise e
I was hesitant to dump the response because it can get pretty large. After a lot of reading I've landed on logging ResponseMetadata on ClientError. This should capture what we care about
See https://fluffy.yelpcorp.com/i/qWG1tRPrFt40M6pPr3lLkXnCSbJJBFhd.html
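For reference, a sketch of that shape (the surrounding restore logic is simplified; only the ResponseMetadata logging on ClientError is the point):

import logging

from botocore.exceptions import ClientError

log = logging.getLogger(__name__)

def get_result(resp):
    # resp is assumed to be a Future wrapping a batch_get_item call.
    try:
        return resp.result()
    except ClientError as e:
        # ResponseMetadata carries the HTTP status code, request id, and retry
        # count without dumping the (potentially large) full response body.
        log.exception(
            "ClientError while retrieving items from DynamoDB, ResponseMetadata: %s",
            e.response.get("ResponseMetadata", {}),
        )
        raise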
… for UnprocessedKeys retry
…tions and requeue of failed items. Remove backoff from test_retry_reading
…lculate_backoff_delay
generally looks good to me - i left some minor suggestions, but i don't think any of them are blocking atm
assert_equal(mock_failed_read.call_count, 11)
        ) as mock_batch_get_item, mock.patch("time.sleep") as mock_sleep, pytest.raises(Exception) as exec_info:
            store.restore(keys)
        assert "failed to retrieve items with keys" in str(exec_info.value)
we could probably remove this assert if we raised a less generic exception and rely on the pytest.raises(SomeMoreTargetedException)
proving that the right exception was raised
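A sketch of that shape, using a hypothetical DynamoDBRestoreError and the same fixtures as the quoted test (the batch_get_item failure setup is elided):

from unittest import mock

import pytest

# DynamoDBRestoreError is a hypothetical, more specific exception type.
with mock.patch("time.sleep") as mock_sleep, pytest.raises(DynamoDBRestoreError):
    store.restore(keys)
# The exception type itself proves the right failure path; no string matching needed.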
@@ -83,12 +102,19 @@ def chunk_keys(self, keys: Sequence[T]) -> List[Sequence[T]]:
            cand_keys_chunks.append(keys[i : min(len(keys), i + 100)])
        return cand_keys_chunks

    def _calculate_backoff_delay(self, attempt: int) -> int:
this technically doesn't need to be in the class since we're not accessing anything in it (i.e., we never use self)
def _calculate_backoff_delay(self, attempt: int) -> int:
    base_delay_seconds = 1
    max_delay_seconds = 10
    delay: int = min(base_delay_seconds * (2 ** (attempt - 1)), max_delay_seconds)
should we be a little defensive and set attempt to 1 if attempt is <= 0? (to protect against someone passing in a 0th (i.e., first) attempt value if they pass in a pre-increment attempt value)
i guess the attempt=0 case would really just lead to a .5s jitter...which is fine?
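If we did want the guard, it's a one-line clamp at the top of the helper (sketch, module-level per the earlier comment):

def calculate_backoff_delay(attempt: int, base_delay_seconds: float = 1.0, max_delay_seconds: float = 10.0) -> float:
    # Treat attempt <= 0 as the first attempt so a pre-increment value can't
    # shrink the delay below the base.
    attempt = max(attempt, 1)
    return min(base_delay_seconds * (2 ** (attempt - 1)), max_delay_seconds)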
error = Exception(
    f"tron_dynamodb_restore_failure: failed to retrieve items with keys \n{cand_keys_list}\n from dynamodb after {MAX_UNPROCESSED_KEYS_RETRIES} retries."
)
log.error(error)
raise error
i'd maybe do something like
-error = Exception(
-    f"tron_dynamodb_restore_failure: failed to retrieve items with keys \n{cand_keys_list}\n from dynamodb after {MAX_UNPROCESSED_KEYS_RETRIES} retries."
-)
-log.error(error)
-raise error
+msg = f"tron_dynamodb_restore_failure: failed to retrieve items with keys \n{cand_keys_list}\n from dynamodb after {MAX_UNPROCESSED_KEYS_RETRIES} retries."
+log.error(msg)
+raise KeyError(msg)
as it looks a little funky to log an Exception object, and a KeyError would give more info :)
(or we could have a custom exception defined for this)
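If the custom-exception route were taken, it could be as small as this (the name is illustrative; the message mirrors the quoted block):

class DynamoDBRestoreError(Exception):
    # Raised when items cannot be restored from DynamoDB after exhausting retries.
    pass

msg = (
    f"tron_dynamodb_restore_failure: failed to retrieve items with keys \n{cand_keys_list}\n "
    f"from dynamodb after {MAX_UNPROCESSED_KEYS_RETRIES} retries."
)
log.error(msg)
raise DynamoDBRestoreError(msg)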
name="tron.dynamodb.setitem", | ||
delta=time.time() - start, | ||
) | ||
log.error(f"Failed to save partition for key: {key}, error: {repr(e)}") |
i know this is old code being moved around, so you can leave this as-is but
log.error(f"Failed to save partition for key: {key}, error: {repr(e)}") | |
log.exception(f"Failed to save partition for key: {key}") |
would include the full traceback for us automatically :)
although: it looks like there's a behavior change here? might be worth adding a comment here (or in the docstring) that this function will not retry on its own and that it's the caller's responsibility to do so
        raise
if cand_keys_list:
    attempts += 1
    delay = self._calculate_backoff_delay(attempts)
fwiw, I think it's probably fine to rely on the built-in backoff from boto - there shouldn't be anything else touching these dynamo tables other than tron, so we don't really need any jitter to avoid a thundering herd scenario :)
There are two levels of backoff, basically. There is the built-in retry config that catches things like throttling, and then there is our own backoff based on UnprocessedKeys. This seems to be the suggested approach based on the warning in: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_get_item.html
It's not 100% clear to me that the retry config will handle UnprocessedKeys.
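A condensed sketch of how the two layers fit together, following the warning in those docs (the loop structure, table name handling, and delay constants are illustrative, not the PR's exact code):

import time

import boto3
from botocore.config import Config

MAX_UNPROCESSED_KEYS_RETRIES = 5

# Layer 1: boto's retry config transparently retries throttled/failed requests.
client = boto3.client("dynamodb", config=Config(retries={"max_attempts": 5, "mode": "standard"}))

def batch_get_with_retries(table_name, keys):
    remaining = {table_name: {"Keys": keys}}
    items = []
    for attempt in range(1, MAX_UNPROCESSED_KEYS_RETRIES + 1):
        resp = client.batch_get_item(RequestItems=remaining)
        items.extend(resp.get("Responses", {}).get(table_name, []))
        # Layer 2: a successful batch_get_item can still return UnprocessedKeys,
        # which the retry config does not resend for us, so we back off and retry.
        remaining = resp.get("UnprocessedKeys", {})
        if not remaining:
            return items  # covers the "succeeded on the last attempt" edge case
        if attempt < MAX_UNPROCESSED_KEYS_RETRIES:
            time.sleep(min(0.5 * (2 ** (attempt - 1)), 10))
    raise Exception(f"unprocessed keys remain after {MAX_UNPROCESSED_KEYS_RETRIES} retries: {list(remaining)}")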