[Integration][GCP] Improved quota handling implementation and performance #1362
base: main
Conversation
@@ -12,7 +11,7 @@
import asyncio

-_DEFAULT_RATE_LIMIT_TIME_PERIOD: float = 60.0
+_DEFAULT_RATE_LIMIT_TIME_PERIOD: float = 61.0
We already utilize 80% of the quota. Is there a need to increase the time period, and why 61?
This is based purely on observation; I do not have any documentation to back it up, but when I run with the 60 s time period, I hit the rate limit. With the extra second, I do not hit a 429 at all, even with larger batches of requests. I tried removing it and hit the limit again.
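For context, a minimal sketch of how such a limiter is typically wired up with aiolimiter; the quota value and the 80% utilization factor below are placeholder assumptions for illustration, not values from this PR:

from aiolimiter import AsyncLimiter

# Placeholder values for illustration only.
_DEFAULT_RATE_LIMIT_QUOTA: int = 400            # hypothetical per-minute quota
_PERCENTAGE_OF_QUOTA: float = 0.8               # consume only 80% of the allowed quota
_DEFAULT_RATE_LIMIT_TIME_PERIOD: float = 61.0   # 60 s window plus a one-second safety margin

limiter = AsyncLimiter(
    max_rate=_DEFAULT_RATE_LIMIT_QUOTA * _PERCENTAGE_OF_QUOTA,
    time_period=_DEFAULT_RATE_LIMIT_TIME_PERIOD,
)

async def call_with_quota(make_request):
    # Each acquisition consumes one slot of the rolling window, so bursts near the
    # window boundary are less likely to trip GCP's per-minute quota enforcement.
    async with limiter:
        return await make_request()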
async def persistent_rate_limiter(
    self, container_id: str
) -> PersistentAsyncLimiter | AsyncLimiter:
Suggested change:
-) -> PersistentAsyncLimiter | AsyncLimiter:
+) -> PersistentAsyncLimiter:
"fetches the rate limiter for the given container" | ||
async def _get_limiter( | ||
self, container_id: str, persistent: bool = False | ||
) -> AsyncLimiter | PersistentAsyncLimiter: |
Suggested change:
-) -> AsyncLimiter | PersistentAsyncLimiter:
+) -> AsyncLimiter:
    return await self._get_limiter(container_id, persistent=True)
cast to PersistentAsyncLimiter
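A possible shape for that suggestion, assuming the method names from the diff above; this is a sketch of the reviewer's idea rather than the final implementation:

from typing import cast

async def persistent_rate_limiter(self, container_id: str) -> "PersistentAsyncLimiter":
    "fetches the persistent rate limiter for the given container"
    # typing.cast has no runtime cost; it only tells the type checker that the
    # persistent path always yields a PersistentAsyncLimiter.
    limiter = await self._get_limiter(container_id, persistent=True)
    return cast("PersistentAsyncLimiter", limiter)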
integrations/gcp/main.py
Outdated
@@ -38,7 +40,8 @@
    resolve_request_controllers,
)

-PROJECT_V3_GET_REQUESTS_RATE_LIMITER: AsyncLimiter
+PROJECT_V3_GET_REQUESTS_RATE_LIMITER: PersistentAsyncLimiter | AsyncLimiter
Let's be explicit: PROJECT_V3_GET_REQUESTS_RATE_LIMITER should never be an AsyncLimiter.
integrations/gcp/main.py
Outdated
@@ -38,7 +40,8 @@
    resolve_request_controllers,
)

-PROJECT_V3_GET_REQUESTS_RATE_LIMITER: AsyncLimiter
+PROJECT_V3_GET_REQUESTS_RATE_LIMITER: PersistentAsyncLimiter | AsyncLimiter
Suggested change:
-PROJECT_V3_GET_REQUESTS_RATE_LIMITER: PersistentAsyncLimiter | AsyncLimiter
+PROJECT_V3_GET_REQUESTS_RATE_LIMITER: PersistentAsyncLimiter
left comments
integrations/gcp/main.py
Outdated
    logger.debug(
        "Background processing threshold reached. Closing incoming real-time event"
    )
    return Response(status_code=http.HTTPStatus.SERVICE_UNAVAILABLE)
Shouldn't we return a 4XX status code and not a 5XX? I fear this might cause the feed flow to stop if we send 5XXs.
@oiadebayo Would a 429 do? Can you test that?
I experimented with request timeout (408), but I will check 429 too.
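For reference, a hedged sketch of the rejection path being discussed, using the 429 status the reviewers propose testing; the threshold name and value are assumptions for illustration, not taken from this PR:

import http
from fastapi import Response

MAX_ALLOWED_BACKGROUND_TASKS = 500  # hypothetical threshold

def reject_if_saturated(queued_tasks: int) -> Response | None:
    # 429 signals a retryable, client-side rejection to the Pub/Sub push subscription,
    # whereas repeated 5XXs risk the feed flow backing off or stopping.
    if queued_tasks >= MAX_ALLOWED_BACKGROUND_TASKS:
        return Response(status_code=http.HTTPStatus.TOO_MANY_REQUESTS)
    return None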
left another comment
Co-authored-by: Matan <51418643+matan84@users.noreply.github.com>
""" | ||
|
||
_global_event_loop: Optional[asyncio.AbstractEventLoop] = None | ||
_limiter_instances: dict[tuple[float, float], "PersistentAsyncLimiter"] = {} |
Why do we need to maintain instances keyed by max rate and time period?
    key: tuple[float, float] = (max_rate, time_period)
    if key not in cls._limiter_instances:
        logger.info(
            f"Creating new persistent limiter for {max_rate} requests per {time_period} sec"
        )
        cls._limiter_instances[key] = cls(max_rate, time_period)
Why are we maintaining multiple instances of the limiter within a single class?
You're right, maintaining multiple instances isn't necessary here. I'll refactor the code to use a single global PersistentAsyncLimiter instance instead.
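A minimal sketch of that direction, assuming PersistentAsyncLimiter subclasses aiolimiter's AsyncLimiter as in the diffs above; the classmethod name and default period here are illustrative:

from typing import Optional
from aiolimiter import AsyncLimiter

class PersistentAsyncLimiter(AsyncLimiter):
    # A single shared limiter instead of a dict keyed by (max_rate, time_period).
    _instance: Optional["PersistentAsyncLimiter"] = None

    @classmethod
    def get_limiter(cls, max_rate: float, time_period: float = 61.0) -> "PersistentAsyncLimiter":
        # Create the limiter once; every later caller reuses the same token bucket,
        # so rate-limit state persists across real-time event requests.
        if cls._instance is None:
            cls._instance = cls(max_rate, time_period)
        return cls._instance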
Description
What - This PR addresses the memory spike observed in version 0.1.98.
Why - Queuing tasks in the background while the rate limiter suspends the processor leads to a spike in memory usage.
How - Improved performance by setting a threshold on the number of background tasks in the queue, so new requests time out when the queue exceeds the threshold; the retry mechanism in Pub/Sub will retry the request.
Type of change
Please leave one option from the following and delete the rest:
All tests should be run against the Port production environment (using a testing org).
Core testing checklist
Integration testing checklist
examples folder in the integration directory.
Preflight checklist
Screenshots
Include screenshots from your environment showing how the resources of the integration will look.
API Documentation
Provide links to the API documentation used for this integration.