[Integration][GCP] Improved quota handling implementation and performance #1362

Open · wants to merge 28 commits into base: main

28 commits:
6fb3e55
Improved quota handling performance
oiadebayo Jan 30, 2025
76c3c24
Updated version
oiadebayo Jan 30, 2025
a55cade
Update CHANGELOG.md
oiadebayo Jan 30, 2025
86b79e8
Lint fix
oiadebayo Jan 30, 2025
c799d94
Merge branch 'main' into PORT-12220-improving-performance-for-quota-h…
oiadebayo Feb 3, 2025
14d4e7e
Updated event processing
oiadebayo Feb 3, 2025
4e1fc9a
Update resource_searches.py
oiadebayo Feb 3, 2025
1e927e6
Override AsyncLimiter to fix persistence issue
oiadebayo Feb 3, 2025
6f9dd72
Update base.py
oiadebayo Feb 3, 2025
a719808
Merge branch 'main' into PORT-12220-improving-performance-for-quota-h…
oiadebayo Feb 3, 2025
8b80580
Update main.py
oiadebayo Feb 4, 2025
2c923ef
Added threshold to background task queue
oiadebayo Feb 5, 2025
1317643
cleanup
oiadebayo Feb 5, 2025
5100a24
Merge branch 'main' into PORT-12220-improving-performance-for-quota-h…
oiadebayo Feb 5, 2025
490649a
version bump
oiadebayo Feb 5, 2025
2366490
Update integrations/gcp/gcp_core/helpers/ratelimiter/base.py
oiadebayo Feb 5, 2025
ce27525
Merge branch 'main' into PORT-12220-improving-performance-for-quota-h…
oiadebayo Feb 5, 2025
da28647
Attended to review comments
oiadebayo Feb 5, 2025
4d91e26
Merge branch 'main' into PORT-12220-improving-performance-for-quota-h…
oiadebayo Feb 6, 2025
97246ac
Merge branch 'main' into PORT-12220-improving-performance-for-quota-h…
oiadebayo Feb 6, 2025
c7a0c41
Merge branch 'main' into PORT-12220-improving-performance-for-quota-h…
mk-armah Feb 7, 2025
f53cc12
Attended to comment
oiadebayo Feb 7, 2025
289f319
Merge branch 'main' into PORT-12220-improving-performance-for-quota-h…
oiadebayo Feb 10, 2025
6402f82
version bump
oiadebayo Feb 10, 2025
74053a9
Merge branch 'main' into PORT-12220-improving-performance-for-quota-h…
oiadebayo Feb 11, 2025
5648815
Merge branch 'main' into PORT-12220-improving-performance-for-quota-h…
mk-armah Feb 11, 2025
371c173
Merge branch 'main' into PORT-12220-improving-performance-for-quota-h…
oiadebayo Feb 11, 2025
9748196
Attended to review comment
oiadebayo Feb 12, 2025
11 changes: 11 additions & 0 deletions integrations/gcp/CHANGELOG.md
@@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.103 (2025-02-10)


### Bug Fixes

- Improved performance by setting a threshold on the number of background tasks in the queue
- Added a check that returns a server-busy response when the queue exceeds the limit
- Updated the rate limiter in `GCPResourceRateLimiter` to use the effective limit value rather than the raw quota value.



## 0.1.102 (2025-02-09)


117 changes: 111 additions & 6 deletions integrations/gcp/gcp_core/helpers/ratelimiter/base.py
@@ -1,6 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Optional, TYPE_CHECKING, final

from typing import Any, Optional, TYPE_CHECKING, final, Type, cast
from aiolimiter import AsyncLimiter
from google.cloud.cloudquotas_v1 import CloudQuotasAsyncClient, GetQuotaInfoRequest
from google.api_core.exceptions import GoogleAPICallError
@@ -12,7 +11,10 @@
import asyncio


_DEFAULT_RATE_LIMIT_TIME_PERIOD: float = 60.0
# Increasing _DEFAULT_RATE_LIMIT_TIME_PERIOD to 61.0 instead of 60.0 prevents hitting 429 errors in some cases.
# The extra second compensates for potential timing inconsistencies in request handling
# or slight variations in rate limit enforcement by the API.
_DEFAULT_RATE_LIMIT_TIME_PERIOD: float = 61.0
Member: We already utilize 80% of the quota. Is there a need to increase the time period, and why 61?

oiadebayo (Member, Author) replied on Feb 5, 2025: This is based purely on observation; I do not have documentation to back it up, but when I ran with the 60s time period I hit the rate limit. With the extra second I do not hit a 429 at all, even with larger batches of requests. I tried removing it and hit the limit again.

Member: Let's add a comment here explaining this.
_DEFAULT_RATE_LIMIT_QUOTA: int = int(
ocean.integration_config["search_all_resources_per_minute_quota"]
)
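The 80% discussed in the review thread above feeds into the effective-limit computation used when registering limiters later in this file. A standalone sketch of that arithmetic, assuming `_PERCENTAGE_OF_QUOTA = 0.8` (the exact constant is defined elsewhere in the integration; 0.8 is inferred from the reviewer's "we already utilize 80%"):

```python
_PERCENTAGE_OF_QUOTA = 0.8  # assumption, based on the review discussion above

def effective_limit(quota: int) -> int:
    # Mirrors the expression used in register()/register_persistent_limiter():
    # take 80% of the granted quota, but never drop below 1 request per period,
    # so a tiny quota still lets the integration make progress.
    return int(max(round(quota * _PERCENTAGE_OF_QUOTA, 1), 1))

print(effective_limit(400))  # 320
print(effective_limit(1))    # 1: the floor keeps the limiter usable
```

Pairing this reduced rate with the padded 61-second period means the limiter stays comfortably inside what the API actually enforces.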
@@ -29,6 +31,73 @@ class ContainerType(Enum):
ORGANIZATION = "organizations"


class PersistentAsyncLimiter(AsyncLimiter):
"""
Persistent AsyncLimiter that remains valid across event loops.
Ensures rate limiting holds across multiple API requests, even when a new event loop is created.

The AsyncLimiter documentation states that it is not designed to be reused across different event loops.
This class extends AsyncLimiter to ensure that the rate limiter remains attached to a global event loop.
Documentation: https://aiolimiter.readthedocs.io/en/latest/#:~:text=Note,this%20will%20work.
"""

_global_event_loop: Optional[asyncio.AbstractEventLoop] = None
_limiter_instance: Optional["PersistentAsyncLimiter"] = None

def __init__(self, max_rate: float, time_period: float = 60) -> None:
"""
Initializes a persistent AsyncLimiter with a specified rate and time period.

:param max_rate: Maximum number of requests per time period.
:param time_period: Time period in seconds.
"""
super().__init__(max_rate, time_period)
self._attach_to_global_loop()

def _attach_to_global_loop(self) -> None:
"""Ensures the limiter remains attached to a global event loop."""
current_loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
if self._global_event_loop is None:
self._global_event_loop = current_loop
elif self._global_event_loop != current_loop:
logger.warning(
"PersistentAsyncLimiter is being reused across different event loops. "
"It has been re-attached to prevent state loss."
)
self._global_event_loop = current_loop

@classmethod
def get_limiter(
cls, max_rate: float, time_period: float = 60
) -> "PersistentAsyncLimiter":
"""
Returns a persistent limiter instance for the given max_rate and time_period.
Ensures that rate limiting remains consistent across all API requests.

:param max_rate: Maximum number of requests per time period.
:param time_period: Time period in seconds.
:return: An instance of PersistentAsyncLimiter.
"""
if cls._limiter_instance is None:
logger.info(
f"Creating new global persistent limiter for {max_rate} requests per {time_period} sec"
)
cls._limiter_instance = cls(max_rate, time_period)
return cls._limiter_instance

async def __aenter__(self) -> None:
await self.acquire()
return None

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[Any],
) -> None:
return None


class GCPResourceQuota(ABC):
"""
GCPResourceQuota is an abstract base class designed to fetch and manage quota information for Google Cloud Platform (GCP) resources.
@@ -132,18 +201,54 @@ async def register(self, container_id: str, *arg: Optional[Any]) -> AsyncLimiter
f"The Integration will utilize {_PERCENTAGE_OF_QUOTA * 100}% of the quota, which equates to {effective_quota_limit} for rate limiting."
)

limiter = AsyncLimiter(max_rate=quota, time_period=self.time_period)
limiter = AsyncLimiter(
max_rate=effective_quota_limit, time_period=self.time_period
)
return limiter

async def register_persistent_limiter(
self, container_id: str, *arg: Optional[Any]
) -> "PersistentAsyncLimiter":
quota = await self._get_quota(container_id, *arg)
effective_quota_limit: int = int(max(round(quota * _PERCENTAGE_OF_QUOTA, 1), 1))
logger.info(
f"The Integration will utilize {_PERCENTAGE_OF_QUOTA * 100}% of the quota, which equates to {effective_quota_limit} for persistent rate limiting."
)
limiter = PersistentAsyncLimiter.get_limiter(
max_rate=effective_quota_limit, time_period=self.time_period
)
return limiter

@final
def quota_name(self, container_id: str) -> str:
return f"{self.container_type.value}/{container_id}/locations/global/services/{self.service}/quotaInfos/{self.quota_id}"

async def limiter(self, container_id: str) -> AsyncLimiter:
"fetches the rate limiter for the given container"
async def _get_limiter(
self, container_id: str, persistent: bool = False
) -> AsyncLimiter:
"""
Fetches the rate limiter for the given container.

:param container_id: The container ID for which to fetch the limiter.
:param persistent: Whether to return a persistent rate limiter.
:return: An instance of either AsyncLimiter or PersistentAsyncLimiter.
"""
name = self.quota_name(container_id)
if persistent:
return await self.register_persistent_limiter(container_id, name)
return await self.register(container_id, name)

async def limiter(self, container_id: str) -> AsyncLimiter:
return await self._get_limiter(container_id)

async def persistent_rate_limiter(
self, container_id: str
) -> PersistentAsyncLimiter:
return cast(
PersistentAsyncLimiter,
await self._get_limiter(container_id, persistent=True),
)


class ResourceBoundedSemaphore(GCPResourceRateLimiter):
"""
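The core of `PersistentAsyncLimiter` above is the class-level singleton in `get_limiter`: every caller shares one instance, so consumed capacity survives across requests. A minimal stand-in sketch of just that caching behavior, without the `aiolimiter` dependency (names here are illustrative, not the integration's API):

```python
class _SingletonLimiter:
    # Stand-in for PersistentAsyncLimiter: a single shared instance means
    # capacity consumed by one caller is visible to every other caller,
    # even if they run on different event loops.
    _instance = None

    def __init__(self, max_rate: float, time_period: float = 60.0) -> None:
        self.max_rate = max_rate
        self.time_period = time_period

    @classmethod
    def get_limiter(cls, max_rate: float, time_period: float = 60.0) -> "_SingletonLimiter":
        if cls._instance is None:
            cls._instance = cls(max_rate, time_period)
        return cls._instance

a = _SingletonLimiter.get_limiter(80, 61.0)
b = _SingletonLimiter.get_limiter(100, 60.0)  # parameters of the first call win
print(a is b)      # True: both callers share one limiter
print(a.max_rate)  # 80
```

Note the same caveat applies to the real class: later `get_limiter` calls with different arguments silently reuse the first instance's configuration.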
7 changes: 5 additions & 2 deletions integrations/gcp/gcp_core/search/resource_searches.py
@@ -26,7 +26,10 @@
)
from aiolimiter import AsyncLimiter
from gcp_core.search.paginated_query import paginated_query, DEFAULT_REQUEST_TIMEOUT
from gcp_core.helpers.ratelimiter.base import MAXIMUM_CONCURRENT_REQUESTS
from gcp_core.helpers.ratelimiter.base import (
MAXIMUM_CONCURRENT_REQUESTS,
PersistentAsyncLimiter,
)
from asyncio import BoundedSemaphore
from gcp_core.overrides import ProtoConfig

@@ -313,7 +316,7 @@ async def feed_event_to_resource(
asset_name: str,
project_id: str,
asset_data: dict[str, Any],
project_rate_limiter: AsyncLimiter,
project_rate_limiter: PersistentAsyncLimiter,
config: ProtoConfig,
) -> RAW_ITEM:
resource = None
11 changes: 5 additions & 6 deletions integrations/gcp/gcp_core/utils.py
@@ -7,13 +7,15 @@
from gcp_core.errors import ResourceNotFoundError
from loguru import logger
import proto # type: ignore
from gcp_core.helpers.ratelimiter.base import PersistentAsyncLimiter
from port_ocean.context.event import event
from port_ocean.core.handlers.port_app_config.models import ResourceConfig

from gcp_core.overrides import GCPCloudResourceConfig, ProtoConfig
from port_ocean.context.ocean import ocean
import json
from pathlib import Path
from aiolimiter import AsyncLimiter
from gcp_core.helpers.ratelimiter.overrides import (
SearchAllResourcesQpmPerProject,
PubSubAdministratorPerMinutePerProject,
@@ -30,7 +32,6 @@
)

if typing.TYPE_CHECKING:
from aiolimiter import AsyncLimiter
from asyncio import BoundedSemaphore


@@ -189,10 +190,8 @@ async def get_quotas_for_project(
try:
match kind:
case AssetTypesWithSpecialHandling.PROJECT:
project_rate_limiter = (
await project_get_requests_per_minute_per_project.limiter(
project_id
)
project_rate_limiter = await project_get_requests_per_minute_per_project.persistent_rate_limiter(
project_id
)
project_semaphore = (
await project_get_requests_per_minute_per_project.semaphore(
@@ -238,6 +237,6 @@

async def resolve_request_controllers(
kind: str,
) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
) -> Tuple[(AsyncLimiter | PersistentAsyncLimiter), "BoundedSemaphore"]:
service_account_project_id = get_service_account_project_id()
return await get_quotas_for_project(service_account_project_id, kind)
73 changes: 51 additions & 22 deletions integrations/gcp/main.py
@@ -1,12 +1,12 @@
import asyncio
import http
import os
import tempfile
import typing

from aiolimiter import AsyncLimiter
from fastapi import Request, Response, BackgroundTasks
from fastapi import Request, Response
from loguru import logger

from gcp_core.helpers.ratelimiter.base import PersistentAsyncLimiter
from port_ocean.context.ocean import ocean
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE

@@ -38,7 +38,8 @@
resolve_request_controllers,
)

PROJECT_V3_GET_REQUESTS_RATE_LIMITER: AsyncLimiter
PROJECT_V3_GET_REQUESTS_RATE_LIMITER: PersistentAsyncLimiter
BACKGROUND_TASK_THRESHOLD: float


async def _resolve_resync_method_for_resource(
@@ -77,15 +78,6 @@ async def _resolve_resync_method_for_resource(
)


@ocean.on_start()
async def setup_real_time_request_controllers() -> None:
global PROJECT_V3_GET_REQUESTS_RATE_LIMITER
if not ocean.event_listener_type == "ONCE":
PROJECT_V3_GET_REQUESTS_RATE_LIMITER, _ = await resolve_request_controllers(
AssetTypesWithSpecialHandling.PROJECT
)


@ocean.on_start()
async def setup_application_default_credentials() -> None:
if not ocean.integration_config["encoded_adc_configuration"]:
@@ -103,6 +95,20 @@ async def setup_application_default_credentials() -> None:
logger.info("Created Application Default Credentials configuration")


@ocean.on_start()
async def setup_real_time_request_controllers() -> None:
global PROJECT_V3_GET_REQUESTS_RATE_LIMITER
global BACKGROUND_TASK_THRESHOLD
if not ocean.event_listener_type == "ONCE":
PROJECT_V3_GET_REQUESTS_RATE_LIMITER, _ = typing.cast(
tuple[PersistentAsyncLimiter, asyncio.BoundedSemaphore],
await resolve_request_controllers(AssetTypesWithSpecialHandling.PROJECT),
)
BACKGROUND_TASK_THRESHOLD = float(
PROJECT_V3_GET_REQUESTS_RATE_LIMITER.max_rate * 100
)


@ocean.on_resync(kind=AssetTypesWithSpecialHandling.FOLDER)
async def resync_folders(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
async for batch in search_all_folders():
@@ -222,7 +228,7 @@ async def process_realtime_event(

@ocean.router.post("/events")
async def feed_events_callback(
request: Request, background_tasks: BackgroundTasks
request: Request,
) -> Response:
"""
This is the real-time events handler. The subscription which is connected to the Feeds Topic will send events here once
@@ -234,9 +240,35 @@
The request has a message, which contains base64-encoded data of the asset.
The message schema: https://cloud.google.com/pubsub/docs/push?_gl=1*thv8i4*_ga*NDQwMTA2MzM5LjE3MTEyNzQ2MDY.*_ga_WH2QY8WWF5*MTcxMzA3NzU3Ni40My4xLjE3MTMwNzgxMjUuMC4wLjA.&_ga=2.161162040.-440106339.1711274606&_gac=1.184150868.1711468720.CjwKCAjw5ImwBhBtEiwAFHDZx1mm-z19UdKpEARcG2-F_TXXbXw7j7_gVPKiQ9Z5KcpsvXF1fFb_MBoCUFkQAvD_BwE#receive_push
The Asset schema: https://cloud.google.com/asset-inventory/docs/monitoring-asset-changes#creating_feeds

The handler will reject the request if the background processing threshold is reached, to avoid overloading the system.
The subscription has a retry policy, so the event will be retried later if it's rejected.
Documentation: https://cloud.google.com/pubsub/docs/handling-failures#subscription_retry_policy
"""
request_json = await request.json()
try:
request_json = await request.json()
except Exception as e:
logger.warning(
f"Client raised exception {str(e)} before the request could be processed."
)
return Response(
status_code=http.HTTPStatus.BAD_REQUEST, content="Client disconnected."
)
try:
if (
len(
[
task
for task in asyncio.all_tasks()
if "process_realtime_event" in str(task)
]
)
> BACKGROUND_TASK_THRESHOLD
):
logger.debug(
"Background processing threshold reached. Closing incoming real-time event"
)
return Response(status_code=http.HTTPStatus.TOO_MANY_REQUESTS)
asset_data = await parse_asset_data(request_json["message"]["data"])
asset_type = asset_data["asset"]["assetType"]
asset_name = asset_data["asset"]["name"]
@@ -261,13 +293,10 @@
getattr(selector, "preserve_api_response_case_style", False)
)
)
background_tasks.add_task(
process_realtime_event,
asset_type,
asset_name,
asset_project,
asset_data,
config,
asyncio.create_task(
process_realtime_event(
asset_type, asset_name, asset_project, asset_data, config
)
)
logger.info(
f"Added background task to process real-time event for kind: {asset_type} with name: {asset_name} from project: {asset_project}"
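The back-pressure check added to `feed_events_callback` above counts live asyncio tasks whose repr mentions the event-processing coroutine and rejects new events past a threshold. A self-contained sketch of that idea (the threshold value here is illustrative; the integration derives its threshold from the rate limiter's `max_rate`):

```python
import asyncio

BACKGROUND_TASK_THRESHOLD = 2  # illustrative; the real value comes from the rate limit

async def process_realtime_event() -> None:
    # Stand-in for the integration's real event processing.
    await asyncio.sleep(0.05)

def over_threshold() -> bool:
    # Same idea as the handler: count still-running tasks whose repr contains
    # the coroutine name, and refuse new work once the threshold is exceeded
    # (the handler answers 429 so Pub/Sub retries the event later).
    running = [t for t in asyncio.all_tasks() if "process_realtime_event" in str(t)]
    return len(running) > BACKGROUND_TASK_THRESHOLD

async def main() -> None:
    tasks = [asyncio.create_task(process_realtime_event()) for _ in range(3)]
    print(over_threshold())  # True: three live tasks exceed the threshold of 2
    await asyncio.gather(*tasks)
    print(over_threshold())  # False: finished tasks drop out of all_tasks()

asyncio.run(main())
```

Matching tasks by their string repr is a pragmatic heuristic; it works because `asyncio.create_task` embeds the coroutine name in the task repr, but it would also match any other coroutine whose name contains the same substring.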
2 changes: 1 addition & 1 deletion integrations/gcp/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gcp"
version = "0.1.102"
version = "0.1.103"
description = "A GCP ocean integration"
authors = ["Matan Geva <matang@getport.io>"]
