Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add request count with eviction for not retriable requests to fix memory problem #92

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
90df66b
attempt to clean response and session
maxi297 Nov 22, 2024
d630a9b
Log response if failure
maxi297 Nov 22, 2024
2a0219e
Remove HttpClient._request_attempt_count that was used for backoff st…
maxi297 Nov 25, 2024
af77521
source-klaviyo: make expiration dictionary
aldogonzalez8 Nov 26, 2024
ce100c0
source-klaviyo: remove comments
aldogonzalez8 Nov 26, 2024
3aa4999
source-klaviyo: add more testing
aldogonzalez8 Nov 27, 2024
38b4985
source-klaviyo: add more testing
aldogonzalez8 Nov 27, 2024
a284303
source-klaviyo: add more testing
aldogonzalez8 Nov 27, 2024
e56c83c
source-klaviyo: add test dependencies
aldogonzalez8 Nov 27, 2024
5a4a623
source-klaviyo: update time for request if is retried
aldogonzalez8 Nov 27, 2024
0c7bc2d
source-klaviyo: evict request for path when retry on is not needed
aldogonzalez8 Nov 27, 2024
6e19df5
airbyte-cdk: use simple dictionary to store and evist request when no…
aldogonzalez8 Nov 27, 2024
6162198
airbyte-cdk: fix for ruff check results
aldogonzalez8 Nov 27, 2024
0d40f92
airbyte-cdk: fix mypy check results
aldogonzalez8 Nov 27, 2024
466adde
airbyte-cdk: add comments for evict method
aldogonzalez8 Nov 27, 2024
34ffadf
airbyte-cdk: remove close logic
aldogonzalez8 Nov 27, 2024
765c5ea
airbyte-cdk: fix problem with mypy
aldogonzalez8 Nov 28, 2024
b241766
Merge branch 'main' into aldogonzalez8/airbyte-cdk/add_request_count_…
aldogonzalez8 Nov 28, 2024
a4fcd16
airbyte-cdk: merge from main
aldogonzalez8 Nov 29, 2024
ffe1093
airbyte-cdk: fix ruff yelling
aldogonzalez8 Nov 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __str__(self) -> str:
class HttpClient:
_DEFAULT_MAX_RETRY: int = 5
_DEFAULT_MAX_TIME: int = 60 * 10
_ACTIONS_TO_RETRY_ON = {ResponseAction.RETRY, ResponseAction.RATE_LIMITED}

def __init__(
self,
Expand Down Expand Up @@ -359,6 +360,17 @@ def _get_response_body(self, response: requests.Response) -> Optional[JsonType]:
except Exception:
return "The Content of the Response couldn't be decoded."

def _evict_key(self, prepared_request: requests.PreparedRequest) -> None:
aldogonzalez8 marked this conversation as resolved.
Show resolved Hide resolved
"""
Addresses high memory consumption when enabling concurrency in https://github.com/airbytehq/oncall/issues/6821.

The `_request_attempt_count` attribute keeps growing as multiple requests are made using the same `http_client`.
To mitigate this issue, we evict keys for completed requests once we confirm that no further retries are needed.
This helps manage memory usage more efficiently while maintaining the necessary logic for retry attempts.
"""
if prepared_request in self._request_attempt_count:
del self._request_attempt_count[prepared_request]

aldogonzalez8 marked this conversation as resolved.
Show resolved Hide resolved
def _handle_error_resolution(
self,
response: Optional[requests.Response],
Expand All @@ -367,6 +379,9 @@ def _handle_error_resolution(
error_resolution: ErrorResolution,
exit_on_rate_limit: Optional[bool] = False,
) -> None:
if error_resolution.response_action not in self._ACTIONS_TO_RETRY_ON:
self._evict_key(request)

# Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
if error_resolution.response_action == ResponseAction.RATE_LIMITED:
# TODO: Update to handle with message repository when concurrent message repository is ready
Expand Down
Loading
Loading