Skip to content

Commit

Permalink
Validate provision enabled for integrations
Browse files Browse the repository at this point in the history
* Solve race condition when Port creates the integration config
* Check if integration is enabled for port provisioning if flag is set
  • Loading branch information
erikzaadi committed Feb 3, 2025
1 parent 4861b00 commit cf5debc
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 11 deletions.
41 changes: 37 additions & 4 deletions port_ocean/clients/port/mixins/integrations.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from typing import Any, Dict, TYPE_CHECKING, Optional, TypedDict
from typing import Any, Dict, List, TYPE_CHECKING, Optional, TypedDict
from urllib.parse import quote_plus

import httpx
Expand All @@ -14,7 +14,7 @@


INTEGRATION_POLLING_INTERVAL_INITIAL_SECONDS = 3
INTEGRATION_POLLING_INTERVAL_BACKOFF_FACTOR = 1.15
INTEGRATION_POLLING_INTERVAL_BACKOFF_FACTOR = 1.55
INTEGRATION_POLLING_RETRY_LIMIT = 30
CREATE_RESOURCES_PARAM_NAME = "integration_modes"
CREATE_RESOURCES_PARAM_VALUE = ["create_resources"]
Expand All @@ -38,6 +38,27 @@ def __init__(
self.client = client
self._log_attributes: LogAttributes | None = None

async def is_integration_provision_enabled(
self, integration_type: str, should_raise: bool = True, should_log: bool = True
) -> bool:
enabled_integrations = await self.get_provision_enabled_integrations(
should_raise, should_log
)
return integration_type in enabled_integrations

async def get_provision_enabled_integrations(
self, should_raise: bool = True, should_log: bool = True
) -> List[str]:
logger.info("Fetching provision enabled integrations")
response = await self.client.get(
f"{self.auth.api_url}/integration/provision-enabled",
headers=await self.auth.headers(),
)

handle_status_code(response, should_raise, should_log)

return response.json().get("integrations", [])

async def _get_current_integration(self) -> httpx.Response:
logger.info(f"Fetching integration with id: {self.integration_identifier}")
response = await self.client.get(
Expand All @@ -51,7 +72,19 @@ async def get_current_integration(
) -> dict[str, Any]:
response = await self._get_current_integration()
handle_status_code(response, should_raise, should_log)
return response.json().get("integration", {})
integration = response.json().get("integration", {})
if integration.get("config", {}) != {}:
return integration
is_provision_enabled_for_integration = (
await self.is_integration_provision_enabled(
integration.get("installationAppType", ""),
should_raise,
should_log,
)
)
if is_provision_enabled_for_integration:
return self._poll_integration_until_default_provisioning_is_complete()
return integration

async def get_log_attributes(self) -> LogAttributes:
if self._log_attributes is None:
Expand All @@ -72,7 +105,7 @@ async def _poll_integration_until_default_provisioning_is_complete(
)
response = await self._get_current_integration()
integration_json = response.json()
if integration_json.get("integration", {}).get("config", {}):
if integration_json.get("integration", {}).get("config", {}) != {}:
return integration_json

logger.info(
Expand Down
22 changes: 15 additions & 7 deletions port_ocean/core/defaults/initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async def _initialize_required_integration_settings(
create_port_resources_origin_in_port=integration_config.create_port_resources_origin
== CreatePortResourcesOrigin.Port,
)
elif not integration.get("config"):
elif integration.get("config", {}) == {}:
logger.info(
"Encountered that the integration's mapping is empty, Initializing to default mapping"
)
Expand Down Expand Up @@ -213,12 +213,20 @@ async def _initialize_defaults(
config_class, integration_config.resources_path
)

if (
not integration_config.create_port_resources_origin
and integration_config.runtime.is_saas_runtime
):
logger.info("Setting resources origin to be Port")
integration_config.create_port_resources_origin = CreatePortResourcesOrigin.Port
if not integration_config.create_port_resources_origin:
# Need to set default since spec is missing
is_integration_provision_enabled = (
await port_client.is_integration_provision_enabled(
integration_config.integration.type
)
)
if is_integration_provision_enabled:
logger.info(
f"Setting resources origin to be Port (integration {integration_config.integration.type} is supported)"
)
integration_config.create_port_resources_origin = (
CreatePortResourcesOrigin.Port
)

if (
integration_config.create_port_resources_origin
Expand Down

0 comments on commit cf5debc

Please sign in to comment.