Skip to content

Commit

Permalink
added new integration setting - calculate entities diff , to control …
Browse files Browse the repository at this point in the history
…if we want to calcualte diff or just upsert all
  • Loading branch information
yaelibarg committed Jan 29, 2025
1 parent bf18ea9 commit e3168fe
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 27 deletions.
1 change: 1 addition & 0 deletions port_ocean/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class IntegrationSettings(BaseOceanModel, extra=Extra.allow):
identifier: str
type: str
config: Any = Field(default_factory=dict)
calculate_entities_diff: bool = True

@root_validator(pre=True)
def root_validator(cls, values: dict[str, Any]) -> dict[str, Any]:
Expand Down
44 changes: 25 additions & 19 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,26 +220,32 @@ async def _register_resource_raw(
)
modified_objects = []

try:
changed_entities = await self._map_entities_compared_with_port(
objects_diff[0].entity_selector_diff.passed,
resource,
user_agent_type
)
if changed_entities:
logger.info("Upserting changed entities", changed_entities=len(changed_entities),
total_entities=len(objects_diff[0].entity_selector_diff.passed))
await self.entities_state_applier.upsert(
changed_entities, user_agent_type
)
else:
logger.info("Entities in batch didn't changed since last sync, skipping", total_entities=len(objects_diff[0].entity_selector_diff.passed))
modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed]
except Exception as e:
logger.warning(f"Failed to resolve batch entities with Port, falling back to upserting all entities: {str(e)}")
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
if ocean.config.integration.calculate_entities_diff:
try:
changed_entities = await self._map_entities_compared_with_port(
objects_diff[0].entity_selector_diff.passed,
resource,
user_agent_type
)
if changed_entities:
logger.info("Upserting changed entities", changed_entities=len(changed_entities),
total_entities=len(objects_diff[0].entity_selector_diff.passed))
await self.entities_state_applier.upsert(
changed_entities, user_agent_type
)
else:
logger.info("Entities in batch didn't changed since last sync, skipping", total_entities=len(objects_diff[0].entity_selector_diff.passed))
modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed]
except Exception as e:
logger.warning(f"Failed to resolve batch entities with Port, falling back to upserting all entities: {str(e)}")
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
)
else:
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
)


return CalculationResult(
objects_diff[0].entity_selector_diff._replace(passed=modified_objects),
Expand Down
54 changes: 46 additions & 8 deletions port_ocean/tests/core/handlers/mixins/test_sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,8 @@ async def test_register_raw(
mock_sync_raw_mixin_with_jq_processor: SyncRawMixin,
mock_resource_config: ResourceConfig,
) -> None:
# Mock the integration settings with skip_check_diff
with patch.object(ocean.config.integration, "skip_check_diff", False):
# Mock the integration settings with calculate_entities_diff
with patch.object(ocean.config.integration, "calculate_entities_diff", True):
kind = "service"
user_agent_type = UserAgentType.exporter
raw_entity = [
Expand Down Expand Up @@ -689,8 +689,8 @@ async def test_register_resource_raw_no_changes_upsert_not_called_entitiy_is_ret
mock_sync_raw_mixin: SyncRawMixin,
mock_port_app_config: PortAppConfig,
) -> None:
# Mock the integration settings with skip_check_diff
with patch.object(ocean.config.integration, "skip_check_diff", False):
# Mock the integration settings with calculate_entities_diff
with patch.object(ocean.config.integration, "calculate_entities_diff", True):
entity = Entity(identifier="1", blueprint="service")
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore
Expand Down Expand Up @@ -720,8 +720,8 @@ async def test_register_resource_raw_with_changes_upsert_called_and_entities_are
mock_sync_raw_mixin: SyncRawMixin,
mock_port_app_config: PortAppConfig,
) -> None:
# Mock the integration settings with skip_check_diff
with patch.object(ocean.config.integration, "skip_check_diff", False):
# Mock the integration settings with calculate_entities_diff
with patch.object(ocean.config.integration, "calculate_entities_diff", True):
entity = Entity(identifier="1", blueprint="service")
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([entity])) # type: ignore
Expand All @@ -748,8 +748,8 @@ async def test_register_resource_raw_with_changes_upsert_called_and_entities_are
async def test_register_resource_raw_with_errors(
mock_sync_raw_mixin: SyncRawMixin, mock_port_app_config: PortAppConfig
) -> None:
# Mock the integration settings with skip_check_diff
with patch.object(ocean.config.integration, "skip_check_diff", False):
# Mock the integration settings with calculate_entities_diff
with patch.object(ocean.config.integration, "calculate_entities_diff", True):
failed_entity = Entity(identifier="1", blueprint="service")
error = Exception("Test error")
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[], failed=[failed_entity]), errors=[error], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore
Expand All @@ -774,3 +774,41 @@ async def test_register_resource_raw_with_errors(
mock_sync_raw_mixin._calculate_raw.assert_called_once()
mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once()
mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called()


@pytest.mark.asyncio
async def test_register_resource_raw_skip_calculate_entities_diff_upsert_called_and_no_entitites_diff_calculation(
mock_sync_raw_mixin: SyncRawMixin,
mock_port_app_config: PortAppConfig,
mock_context: PortOceanContext,
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Mock ocean.app.is_saas()
with patch.object(ocean.config.integration, "calculate_entities_diff", False):
# Mock dependencies
entity = Entity(identifier="1", blueprint="service")
calculation_result = CalculationResult(
entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]),
errors=[],
misconfigurations=[],
misonfigured_entity_keys=[],
)
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[calculation_result]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock() # type: ignore
mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock(return_value=[entity]) # type: ignore

async with event_context(EventType.RESYNC, trigger_type="machine") as event:
event.port_app_config = mock_port_app_config

# Test execution
result = await mock_sync_raw_mixin._register_resource_raw(
mock_port_app_config.resources[0],
[{"some": "data"}],
UserAgentType.exporter,
)

# Assertions
assert len(result.entity_selector_diff.passed) == 1
mock_sync_raw_mixin._calculate_raw.assert_called_once()
mock_sync_raw_mixin._map_entities_compared_with_port.assert_not_called()
mock_sync_raw_mixin.entities_state_applier.upsert.assert_called_once()

0 comments on commit e3168fe

Please sign in to comment.