[Integration][GCP] Improved quota handling implementation and performance #1362
@@ -1,6 +1,5 @@
 from abc import ABC, abstractmethod
-from typing import Any, Optional, TYPE_CHECKING, final
-
+from typing import Any, Optional, TYPE_CHECKING, final, Type, cast
 from aiolimiter import AsyncLimiter
 from google.cloud.cloudquotas_v1 import CloudQuotasAsyncClient, GetQuotaInfoRequest
 from google.api_core.exceptions import GoogleAPICallError
@@ -12,7 +11,7 @@
 import asyncio


-_DEFAULT_RATE_LIMIT_TIME_PERIOD: float = 60.0
+_DEFAULT_RATE_LIMIT_TIME_PERIOD: float = 61.0
 _DEFAULT_RATE_LIMIT_QUOTA: int = int(
     ocean.integration_config["search_all_resources_per_minute_quota"]
 )
@@ -29,6 +28,74 @@ class ContainerType(Enum):
     ORGANIZATION = "organizations"


+class PersistentAsyncLimiter(AsyncLimiter):
+    """
+    Persistent AsyncLimiter that remains valid across event loops.
+    Ensures rate limiting holds across multiple API requests, even when a new event loop is created.
+
+    The AsyncLimiter documentation states that it is not designed to be reused across different event loops.
+    This class extends AsyncLimiter to ensure that the rate limiter remains attached to a global event loop.
+    Documentation: https://aiolimiter.readthedocs.io/en/latest/#:~:text=Note,this%20will%20work.
+    """
+
+    _global_event_loop: Optional[asyncio.AbstractEventLoop] = None
+    _limiter_instances: dict[tuple[float, float], "PersistentAsyncLimiter"] = {}
Review comment: Why do we need to maintain instances based on max rate and time period?
+
+    def __init__(self, max_rate: float, time_period: float = 60) -> None:
+        """
+        Initializes a persistent AsyncLimiter with a specified rate and time period.
+
+        :param max_rate: Maximum number of requests per time period.
+        :param time_period: Time period in seconds.
+        """
+        super().__init__(max_rate, time_period)
+        self._attach_to_global_loop()
+
+    def _attach_to_global_loop(self) -> None:
+        """Ensures the limiter remains attached to a global event loop."""
+        current_loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
+        if self._global_event_loop is None:
+            self._global_event_loop = current_loop
+        elif self._global_event_loop != current_loop:
+            logger.warning(
+                "PersistentAsyncLimiter is being reused across different event loops. "
+                "It has been re-attached to prevent state loss."
+            )
+            self._global_event_loop = current_loop
+
+    @classmethod
+    def get_limiter(
+        cls, max_rate: float, time_period: float = 60
+    ) -> "PersistentAsyncLimiter":
+        """
+        Returns a persistent limiter instance for the given max_rate and time_period.
+        Ensures that rate limiting remains consistent across all API requests.
+
+        :param max_rate: Maximum number of requests per time period.
+        :param time_period: Time period in seconds.
+        :return: An instance of PersistentAsyncLimiter.
+        """
+        key: tuple[float, float] = (max_rate, time_period)
+        if key not in cls._limiter_instances:
+            logger.info(
+                f"Creating new persistent limiter for {max_rate} requests per {time_period} sec"
+            )
+            cls._limiter_instances[key] = cls(max_rate, time_period)
Review comment: Why are we maintaining multiple instances of the limiter in a single class instantiation?

Reply: You're right, maintaining multiple instances isn't necessary here. I'll refactor the code to use a single global PersistentAsyncLimiter instance instead.
+        return cls._limiter_instances[key]
+
+    async def __aenter__(self) -> None:
+        await self.acquire()
+        return None
+
+    async def __aexit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_value: Optional[BaseException],
+        traceback: Optional[Any],
+    ) -> None:
+        return None
+
+
 class GCPResourceQuota(ABC):
     """
     GCPResourceQuota is an abstract base class designed to fetch and manage quota information for Google Cloud Platform (GCP) resources.
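To illustrate what the class-level cache buys, and why, as noted in the thread above, a single module-level instance would serve equally well if only one rate is ever used, here is a minimal caller-side sketch. It is not part of the PR: the import is elided because the file path is not shown here, and the 320-requests-per-61-seconds rate is an illustrative value.

import asyncio

# from <module under review> import PersistentAsyncLimiter  (path not shown in the diff)

async def one_batch(batch: int) -> None:
    # The same (max_rate, time_period) key returns the same cached limiter object.
    limiter = PersistentAsyncLimiter.get_limiter(max_rate=320, time_period=61)
    async with limiter:  # __aenter__ awaits acquire() before the API call would run
        print(f"batch {batch} throttled by limiter id={id(limiter)}")

# Two separate event loops (e.g. two resync invocations) receive the same limiter
# instance instead of a fresh, empty one, which is the behaviour the class is after.
asyncio.run(one_batch(1))
asyncio.run(one_batch(2))

If, as the thread suggests, a single shared instance is enough in practice, collapsing the _limiter_instances dict into one module-level limiter would preserve the same caller-side behaviour.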
@@ -132,18 +199,54 @@ async def register(self, container_id: str, *arg: Optional[Any]) -> AsyncLimiter
             f"The Integration will utilize {_PERCENTAGE_OF_QUOTA * 100}% of the quota, which equates to {effective_quota_limit} for rate limiting."
         )

-        limiter = AsyncLimiter(max_rate=quota, time_period=self.time_period)
+        limiter = AsyncLimiter(
+            max_rate=effective_quota_limit, time_period=self.time_period
+        )
         return limiter

+    async def register_persistent_limiter(
+        self, container_id: str, *arg: Optional[Any]
+    ) -> "PersistentAsyncLimiter":
+        quota = await self._get_quota(container_id, *arg)
+        effective_quota_limit: int = int(max(round(quota * _PERCENTAGE_OF_QUOTA, 1), 1))
+        logger.info(
+            f"The Integration will utilize {_PERCENTAGE_OF_QUOTA * 100}% of the quota, which equates to {effective_quota_limit} for persistent rate limiting."
+        )
+        limiter = PersistentAsyncLimiter.get_limiter(
+            max_rate=effective_quota_limit, time_period=self.time_period
+        )
+        return limiter
+
     @final
     def quota_name(self, container_id: str) -> str:
         return f"{self.container_type.value}/{container_id}/locations/global/services/{self.service}/quotaInfos/{self.quota_id}"

-    async def limiter(self, container_id: str) -> AsyncLimiter:
-        "fetches the rate limiter for the given container"
+    async def _get_limiter(
+        self, container_id: str, persistent: bool = False
+    ) -> AsyncLimiter:
+        """
+        Fetches the rate limiter for the given container.
+
+        :param container_id: The container ID for which to fetch the limiter.
+        :param persistent: Whether to return a persistent rate limiter.
+        :return: An instance of either AsyncLimiter or PersistentAsyncLimiter.
+        """
         name = self.quota_name(container_id)
+        if persistent:
+            return await self.register_persistent_limiter(container_id, name)
         return await self.register(container_id, name)
+
+    async def limiter(self, container_id: str) -> AsyncLimiter:
+        return await self._get_limiter(container_id)
+
+    async def persistent_rate_limiter(
+        self, container_id: str
+    ) -> PersistentAsyncLimiter:
+        return cast(
+            PersistentAsyncLimiter,
+            await self._get_limiter(container_id, persistent=True),
+        )


 class ResourceBoundedSemaphore(GCPResourceRateLimiter):
     """
Review comment: We already utilize 80% of the quota. Is there a need to increase the time period, and why 61?
Reply: This is based purely on observation; I don't have documentation to back it up, but when I ran with the 60-second time period I hit the rate limit. With the extra second I don't hit a 429 at all, even with larger batches of requests. I tried removing it and hit the limit again.
Review comment: Let's add a comment here explaining this.
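To make the numbers in this thread concrete, a small worked example (the 400-per-minute quota is an assumed figure, not taken from the PR):

# Worked example of the effective rate limit; the quota value is assumed.
_PERCENTAGE_OF_QUOTA = 0.8
quota = 400  # hypothetical per-minute quota reported by Cloud Quotas

effective_quota_limit = int(max(round(quota * _PERCENTAGE_OF_QUOTA, 1), 1))
print(effective_quota_limit)                 # 320 requests per window
print(round(effective_quota_limit / 60, 2))  # 5.33 requests/sec with a 60 s window
print(round(effective_quota_limit / 61, 2))  # 5.25 requests/sec with the 61 s window

The 61-second window thus shaves roughly another 1.6% off the request rate on top of the 20% safety margin. One plausible reading of the observation above is that this slack absorbs drift between the client-side window and Google's server-side 60-second accounting, but the thread leaves the exact cause open, hence the request for an explanatory comment.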