diff --git a/integrations/gcp/CHANGELOG.md b/integrations/gcp/CHANGELOG.md index 521a9cb99f..716716fdcb 100644 --- a/integrations/gcp/CHANGELOG.md +++ b/integrations/gcp/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.1.104 (2025-02-14) + + +### Bug Fixes + +- Improved performance by setting a threshold on the number of queued background tasks +- Added a check that returns a server-busy response when the queue exceeds the limit +- Updated the rate limiter in `GCPResourceRateLimiter` to actually use the effective limit value rather than the quota value. + + ## 0.1.103 (2025-02-13) diff --git a/integrations/gcp/gcp_core/helpers/ratelimiter/base.py b/integrations/gcp/gcp_core/helpers/ratelimiter/base.py index e1d5c049e7..a88f2d1c0b 100644 --- a/integrations/gcp/gcp_core/helpers/ratelimiter/base.py +++ b/integrations/gcp/gcp_core/helpers/ratelimiter/base.py @@ -1,6 +1,5 @@ from abc import ABC, abstractmethod -from typing import Any, Optional, TYPE_CHECKING, final - +from typing import Any, Optional, TYPE_CHECKING, final, Type, cast from aiolimiter import AsyncLimiter from google.cloud.cloudquotas_v1 import CloudQuotasAsyncClient, GetQuotaInfoRequest from google.api_core.exceptions import GoogleAPICallError @@ -12,7 +11,10 @@ import asyncio -_DEFAULT_RATE_LIMIT_TIME_PERIOD: float = 60.0 +# Increasing _DEFAULT_RATE_LIMIT_TIME_PERIOD to 61.0 instead of 60.0 prevents hitting 429 errors in some cases. +# The extra second compensates for potential timing inconsistencies in request handling +# or slight variations in rate limit enforcement by the API. +_DEFAULT_RATE_LIMIT_TIME_PERIOD: float = 61.0 _DEFAULT_RATE_LIMIT_QUOTA: int = int( ocean.integration_config["search_all_resources_per_minute_quota"] ) @@ -29,6 +31,73 @@ class ContainerType(Enum): ORGANIZATION = "organizations" +class PersistentAsyncLimiter(AsyncLimiter): + """ + Persistent AsyncLimiter that remains valid across event loops. 
+ Ensures rate limiting holds across multiple API requests, even when a new event loop is created. + + The AsyncLimiter documentation states that it is not designed to be reused across different event loops. + This class extends AsyncLimiter to ensure that the rate limiter remains attached to a global event loop. + Documentation: https://aiolimiter.readthedocs.io/en/latest/#:~:text=Note,this%20will%20work. + """ + + _global_event_loop: Optional[asyncio.AbstractEventLoop] = None + _limiter_instance: Optional["PersistentAsyncLimiter"] = None + + def __init__(self, max_rate: float, time_period: float = 60) -> None: + """ + Initializes a persistent AsyncLimiter with a specified rate and time period. + + :param max_rate: Maximum number of requests per time period. + :param time_period: Time period in seconds. + """ + super().__init__(max_rate, time_period) + self._attach_to_global_loop() + + def _attach_to_global_loop(self) -> None: + """Ensures the limiter remains attached to a global event loop.""" + current_loop: asyncio.AbstractEventLoop = asyncio.get_running_loop() + if self._global_event_loop is None: + self._global_event_loop = current_loop + elif self._global_event_loop != current_loop: + logger.warning( + "PersistentAsyncLimiter is being reused across different event loops. " + "It has been re-attached to prevent state loss." + ) + self._global_event_loop = current_loop + + @classmethod + def get_limiter( + cls, max_rate: float, time_period: float = 60 + ) -> "PersistentAsyncLimiter": + """ + Returns a persistent limiter instance for the given max_rate and time_period. + Ensures that rate limiting remains consistent across all API requests. + + :param max_rate: Maximum number of requests per time period. + :param time_period: Time period in seconds. + :return: An instance of PersistentAsyncLimiter. 
+ """ + if cls._limiter_instance is None: + logger.info( + f"Creating new global persistent limiter for {max_rate} requests per {time_period} sec" + ) + cls._limiter_instance = cls(max_rate, time_period) + return cls._limiter_instance + + async def __aenter__(self) -> None: + await self.acquire() + return None + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[Any], + ) -> None: + return None + + class GCPResourceQuota(ABC): """ GCPResourceQuota is an abstract base class designed to fetch and manage quota information for Google Cloud Platform (GCP) resources. @@ -132,18 +201,54 @@ async def register(self, container_id: str, *arg: Optional[Any]) -> AsyncLimiter f"The Integration will utilize {_PERCENTAGE_OF_QUOTA * 100}% of the quota, which equates to {effective_quota_limit} for rate limiting." ) - limiter = AsyncLimiter(max_rate=quota, time_period=self.time_period) + limiter = AsyncLimiter( + max_rate=effective_quota_limit, time_period=self.time_period + ) + return limiter + + async def register_persistent_limiter( + self, container_id: str, *arg: Optional[Any] + ) -> "PersistentAsyncLimiter": + quota = await self._get_quota(container_id, *arg) + effective_quota_limit: int = int(max(round(quota * _PERCENTAGE_OF_QUOTA, 1), 1)) + logger.info( + f"The Integration will utilize {_PERCENTAGE_OF_QUOTA * 100}% of the quota, which equates to {effective_quota_limit} for persistent rate limiting." 
+ ) + limiter = PersistentAsyncLimiter.get_limiter( + max_rate=effective_quota_limit, time_period=self.time_period + ) return limiter @final def quota_name(self, container_id: str) -> str: return f"{self.container_type.value}/{container_id}/locations/global/services/{self.service}/quotaInfos/{self.quota_id}" - async def limiter(self, container_id: str) -> AsyncLimiter: - "fetches the rate limiter for the given container" + async def _get_limiter( + self, container_id: str, persistent: bool = False + ) -> AsyncLimiter: + """ + Fetches the rate limiter for the given container. + + :param container_id: The container ID for which to fetch the limiter. + :param persistent: Whether to return a persistent rate limiter. + :return: An instance of either AsyncLimiter or PersistentAsyncLimiter. + """ name = self.quota_name(container_id) + if persistent: + return await self.register_persistent_limiter(container_id, name) return await self.register(container_id, name) + async def limiter(self, container_id: str) -> AsyncLimiter: + return await self._get_limiter(container_id) + + async def persistent_rate_limiter( + self, container_id: str + ) -> PersistentAsyncLimiter: + return cast( + PersistentAsyncLimiter, + await self._get_limiter(container_id, persistent=True), + ) + class ResourceBoundedSemaphore(GCPResourceRateLimiter): """ @@ -184,3 +289,14 @@ async def _maximum_concurrent_requests(self, container_id: str) -> int: f"Consider increasing the {self.service}/{self.quota_id} quota for {container_id} to process more {self.container_type.value} concurrently." 
) return limit + + async def semaphore_for_real_time_event( + self, container_id: str + ) -> asyncio.BoundedSemaphore: + name = self.quota_name(container_id) + quota = await self.register(container_id, name) + semaphore = asyncio.BoundedSemaphore(int(quota.max_rate)) + logger.info( + f"The integration will process {quota.max_rate} {self.container_type.value} at a time based on {container_id}'s quota capacity" + ) + return semaphore diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py index 2eabb60549..49e77a7f90 100644 --- a/integrations/gcp/gcp_core/search/resource_searches.py +++ b/integrations/gcp/gcp_core/search/resource_searches.py @@ -26,7 +26,10 @@ ) from aiolimiter import AsyncLimiter from gcp_core.search.paginated_query import paginated_query, DEFAULT_REQUEST_TIMEOUT -from gcp_core.helpers.ratelimiter.base import MAXIMUM_CONCURRENT_REQUESTS +from gcp_core.helpers.ratelimiter.base import ( + MAXIMUM_CONCURRENT_REQUESTS, + PersistentAsyncLimiter, +) from asyncio import BoundedSemaphore from gcp_core.overrides import ProtoConfig @@ -217,19 +220,21 @@ async def search_all_organizations() -> ASYNC_GENERATOR_RESYNC_TYPE: async def get_single_project( project_name: str, rate_limiter: AsyncLimiter, + semaphore: BoundedSemaphore, config: ProtoConfig, ) -> RAW_ITEM: async with ProjectsAsyncClient() as projects_client: - async with rate_limiter: - logger.debug( - f"Executing get_single_project. Current rate limit: {rate_limiter.max_rate} requests per {rate_limiter.time_period} seconds." - ) - return parse_protobuf_message( - await projects_client.get_project( - name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT - ), - config, - ) + async with semaphore: + async with rate_limiter: + logger.debug( + f"Executing get_single_project. Current rate limit: {rate_limiter.max_rate} requests per {rate_limiter.time_period} seconds." 
+ ) + return parse_protobuf_message( + await projects_client.get_project( + name=project_name, timeout=DEFAULT_REQUEST_TIMEOUT + ), + config, + ) async def get_single_folder(folder_name: str, config: ProtoConfig) -> RAW_ITEM: @@ -313,14 +318,15 @@ async def feed_event_to_resource( asset_name: str, project_id: str, asset_data: dict[str, Any], - project_rate_limiter: AsyncLimiter, + project_rate_limiter: PersistentAsyncLimiter, + project_semaphore: BoundedSemaphore, config: ProtoConfig, ) -> RAW_ITEM: resource = None if asset_data.get("deleted") is True: resource = asset_data["priorAsset"]["resource"]["data"] resource[EXTRA_PROJECT_FIELD] = await get_single_project( - project_id, project_rate_limiter, config + project_id, project_rate_limiter, project_semaphore, config ) else: match asset_type: @@ -328,13 +334,13 @@ async def feed_event_to_resource( topic_name = asset_name.replace("//pubsub.googleapis.com/", "") resource = await get_single_topic(topic_name, config) resource[EXTRA_PROJECT_FIELD] = await get_single_project( - project_id, project_rate_limiter, config + project_id, project_rate_limiter, project_semaphore, config ) case AssetTypesWithSpecialHandling.SUBSCRIPTION: topic_name = asset_name.replace("//pubsub.googleapis.com/", "") resource = await get_single_subscription(topic_name, config) resource[EXTRA_PROJECT_FIELD] = await get_single_project( - project_id, project_rate_limiter, config + project_id, project_rate_limiter, project_semaphore, config ) case AssetTypesWithSpecialHandling.FOLDER: folder_id = asset_name.replace( @@ -348,11 +354,11 @@ async def feed_event_to_resource( resource = await get_single_organization(organization_id, config) case AssetTypesWithSpecialHandling.PROJECT: resource = await get_single_project( - project_id, project_rate_limiter, config + project_id, project_rate_limiter, project_semaphore, config ) case _: resource = asset_data["asset"]["resource"]["data"] resource[EXTRA_PROJECT_FIELD] = await get_single_project( - project_id, 
project_rate_limiter, config + project_id, project_rate_limiter, project_semaphore, config ) return resource diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py index f42fd9b17d..b652c8cea4 100644 --- a/integrations/gcp/gcp_core/utils.py +++ b/integrations/gcp/gcp_core/utils.py @@ -7,6 +7,7 @@ from gcp_core.errors import ResourceNotFoundError from loguru import logger import proto # type: ignore +from gcp_core.helpers.ratelimiter.base import PersistentAsyncLimiter from port_ocean.context.event import event from port_ocean.core.handlers.port_app_config.models import ResourceConfig @@ -14,6 +15,7 @@ from port_ocean.context.ocean import ocean import json from pathlib import Path +from aiolimiter import AsyncLimiter from gcp_core.helpers.ratelimiter.overrides import ( SearchAllResourcesQpmPerProject, PubSubAdministratorPerMinutePerProject, @@ -30,7 +32,6 @@ ) if typing.TYPE_CHECKING: - from aiolimiter import AsyncLimiter from asyncio import BoundedSemaphore @@ -189,15 +190,11 @@ async def get_quotas_for_project( try: match kind: case AssetTypesWithSpecialHandling.PROJECT: - project_rate_limiter = ( - await project_get_requests_per_minute_per_project.limiter( - project_id - ) + project_rate_limiter = await project_get_requests_per_minute_per_project.persistent_rate_limiter( + project_id ) - project_semaphore = ( - await project_get_requests_per_minute_per_project.semaphore( - project_id - ) + project_semaphore = await project_get_requests_per_minute_per_project.semaphore_for_real_time_event( + project_id ) return project_rate_limiter, project_semaphore case ( @@ -238,6 +235,6 @@ async def get_quotas_for_project( async def resolve_request_controllers( kind: str, -) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: +) -> Tuple[(AsyncLimiter | PersistentAsyncLimiter), "BoundedSemaphore"]: service_account_project_id = get_service_account_project_id() return await get_quotas_for_project(service_account_project_id, kind) diff --git 
a/integrations/gcp/main.py b/integrations/gcp/main.py index 0eeb8ed849..351e629b1d 100644 --- a/integrations/gcp/main.py +++ b/integrations/gcp/main.py @@ -1,12 +1,14 @@ +import asyncio import http import os import tempfile import typing +from asyncio import BoundedSemaphore -from aiolimiter import AsyncLimiter -from fastapi import Request, Response, BackgroundTasks +from fastapi import Request, Response from loguru import logger +from gcp_core.helpers.ratelimiter.base import PersistentAsyncLimiter from port_ocean.context.ocean import ocean from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE @@ -38,7 +40,9 @@ resolve_request_controllers, ) -PROJECT_V3_GET_REQUESTS_RATE_LIMITER: AsyncLimiter +PROJECT_V3_GET_REQUESTS_RATE_LIMITER: PersistentAsyncLimiter +PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE: BoundedSemaphore +BACKGROUND_TASK_THRESHOLD: float async def _resolve_resync_method_for_resource( @@ -77,15 +81,6 @@ async def _resolve_resync_method_for_resource( ) -@ocean.on_start() -async def setup_real_time_request_controllers() -> None: - global PROJECT_V3_GET_REQUESTS_RATE_LIMITER - if not ocean.event_listener_type == "ONCE": - PROJECT_V3_GET_REQUESTS_RATE_LIMITER, _ = await resolve_request_controllers( - AssetTypesWithSpecialHandling.PROJECT - ) - - @ocean.on_start() async def setup_application_default_credentials() -> None: if not ocean.integration_config["encoded_adc_configuration"]: @@ -103,6 +98,24 @@ async def setup_application_default_credentials() -> None: logger.info("Created Application Default Credentials configuration") +@ocean.on_start() +async def setup_real_time_request_controllers() -> None: + global PROJECT_V3_GET_REQUESTS_RATE_LIMITER + global PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE + global BACKGROUND_TASK_THRESHOLD + if not ocean.event_listener_type == "ONCE": + ( + PROJECT_V3_GET_REQUESTS_RATE_LIMITER, + PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE, + ) = typing.cast( + tuple[PersistentAsyncLimiter, asyncio.BoundedSemaphore], + 
await resolve_request_controllers(AssetTypesWithSpecialHandling.PROJECT), + ) + BACKGROUND_TASK_THRESHOLD = float( + PROJECT_V3_GET_REQUESTS_RATE_LIMITER.max_rate * 10 + ) + + @ocean.on_resync(kind=AssetTypesWithSpecialHandling.FOLDER) async def resync_folders(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async for batch in search_all_folders(): @@ -200,6 +213,7 @@ async def process_realtime_event( asset_project, asset_data, PROJECT_V3_GET_REQUESTS_RATE_LIMITER, + PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE, config, ) if asset_data.get("deleted") is True: @@ -222,7 +236,7 @@ async def process_realtime_event( @ocean.router.post("/events") async def feed_events_callback( - request: Request, background_tasks: BackgroundTasks + request: Request, ) -> Response: """ This is the real-time events handler. The subscription which is connected to the Feeds Topic will send events here once @@ -234,9 +248,35 @@ async def feed_events_callback( The request has a message, which contains a 64based data of the asset. The message schema: https://cloud.google.com/pubsub/docs/push?_gl=1*thv8i4*_ga*NDQwMTA2MzM5LjE3MTEyNzQ2MDY.*_ga_WH2QY8WWF5*MTcxMzA3NzU3Ni40My4xLjE3MTMwNzgxMjUuMC4wLjA.&_ga=2.161162040.-440106339.1711274606&_gac=1.184150868.1711468720.CjwKCAjw5ImwBhBtEiwAFHDZx1mm-z19UdKpEARcG2-F_TXXbXw7j7_gVPKiQ9Z5KcpsvXF1fFb_MBoCUFkQAvD_BwE#receive_push The Asset schema: https://cloud.google.com/asset-inventory/docs/monitoring-asset-changes#creating_feeds + + The handler will reject the request if the background processing threshold is reached, to avoid overloading the system. + The subscription has a retry policy, so the event will be retried later if it's rejected. + Documentation: https://cloud.google.com/pubsub/docs/handling-failures#subscription_retry_policy """ - request_json = await request.json() try: + request_json = await request.json() + except Exception as e: + logger.warning( + f"Client raised exception {str(e)} before the request could be processed." 
+ ) + return Response( + status_code=http.HTTPStatus.BAD_REQUEST, content="Client disconnected." + ) + try: + if ( + len( + [ + task + for task in asyncio.all_tasks() + if "process_realtime_event" in str(task) + ] + ) + > BACKGROUND_TASK_THRESHOLD + ): + logger.debug( + "Background processing threshold reached. Closing incoming real-time event" + ) + return Response(status_code=http.HTTPStatus.TOO_MANY_REQUESTS) asset_data = await parse_asset_data(request_json["message"]["data"]) asset_type = asset_data["asset"]["assetType"] asset_name = asset_data["asset"]["name"] @@ -261,13 +301,10 @@ async def feed_events_callback( getattr(selector, "preserve_api_response_case_style", False) ) ) - background_tasks.add_task( - process_realtime_event, - asset_type, - asset_name, - asset_project, - asset_data, - config, + asyncio.create_task( + process_realtime_event( + asset_type, asset_name, asset_project, asset_data, config + ) ) logger.info( f"Added background task to process real-time event for kind: {asset_type} with name: {asset_name} from project: {asset_project}" diff --git a/integrations/gcp/pyproject.toml b/integrations/gcp/pyproject.toml index 1b51e6e8a5..3996b01a3a 100644 --- a/integrations/gcp/pyproject.toml +++ b/integrations/gcp/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gcp" -version = "0.1.103" +version = "0.1.104" description = "A GCP ocean integration" authors = ["Matan Geva "] diff --git a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py index e815be5870..9c30e28591 100644 --- a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py +++ b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py @@ -187,6 +187,7 @@ async def test_feed_to_resource( project_id=mock_asset_project_name, asset_data=mock_asset_data, config=config, + project_semaphore=AsyncMock(), ) # Assert