From 9e60ce1ac0c0c39ec65b9030da3eeaf0ae1b1d53 Mon Sep 17 00:00:00 2001 From: Yaeli Gimelshtein Date: Wed, 29 Jan 2025 09:13:38 -0800 Subject: [PATCH] [Core] Entity diff calculation only on resync (#1360) # Description What - made the entity diff calculation only on resync Why - most of the events that are not resync dont change alot of entities, so its not worth searching the entity and checking diff on it How - added a check `if event.event_type == EventType.RESYNC:` ## Type of change Please leave one option from the following and delete the rest: - [x] New feature (non-breaking change which adds functionality)

All tests should be run against the port production environment(using a testing org).

### Core testing checklist - [x] Integration able to create all default resources from scratch - [x] Resync finishes successfully - [x] Resync able to create entities - [x] Resync able to update entities - [x] Resync able to detect and delete entities - [x] Scheduled resync able to abort existing resync and start a new one - [x] Tested with at least 2 integrations from scratch - [x] Tested with Kafka and Polling event listeners - [x] Tested deletion of entities that don't pass the selector --- CHANGELOG.md | 6 + .../core/integrations/mixins/sync_raw.py | 44 +-- .../core/handlers/mixins/test_sync_raw.py | 273 ++++++++++-------- pyproject.toml | 2 +- 4 files changed, 181 insertions(+), 144 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f1e2fa3303..5d50ec9fe2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 0.18.6 (2025-01-29) + +### Improvements + +- Entity diff calculation only on resync + ## 0.18.5 (2025-01-28) ### Bug Fixes diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index c91754f869..87e26b8491 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -220,26 +220,32 @@ async def _register_resource_raw( ) modified_objects = [] - try: - changed_entities = await self._map_entities_compared_with_port( - objects_diff[0].entity_selector_diff.passed, - resource, - user_agent_type - ) - if changed_entities: - logger.info("Upserting changed entities", changed_entities=len(changed_entities), - total_entities=len(objects_diff[0].entity_selector_diff.passed)) - await self.entities_state_applier.upsert( - changed_entities, user_agent_type - ) - else: - logger.info("Entities in batch didn't changed since last sync, skipping", total_entities=len(objects_diff[0].entity_selector_diff.passed)) - modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed] - except Exception as e: - logger.warning(f"Failed to resolve batch entities with Port, falling back to upserting all entities: {str(e)}") - modified_objects = await self.entities_state_applier.upsert( - objects_diff[0].entity_selector_diff.passed, user_agent_type + if event.event_type == EventType.RESYNC: + try: + changed_entities = await self._map_entities_compared_with_port( + objects_diff[0].entity_selector_diff.passed, + resource, + user_agent_type ) + if changed_entities: + logger.info("Upserting changed entities", changed_entities=len(changed_entities), + total_entities=len(objects_diff[0].entity_selector_diff.passed)) + await self.entities_state_applier.upsert( + changed_entities, user_agent_type + ) + else: + logger.info("Entities in batch didn't changed since last sync, skipping", total_entities=len(objects_diff[0].entity_selector_diff.passed)) + modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed] + except Exception as e: + logger.warning(f"Failed to resolve batch entities with Port, falling back to upserting all entities: {str(e)}") + modified_objects = await self.entities_state_applier.upsert( + objects_diff[0].entity_selector_diff.passed, user_agent_type + ) + else: + modified_objects = await self.entities_state_applier.upsert( + objects_diff[0].entity_selector_diff.passed, user_agent_type + ) + return CalculationResult( objects_diff[0].entity_selector_diff._replace(passed=modified_objects), diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py index b0143a7fb4..917c9ff155 100644 --- a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -435,66 +435,63 @@ async def test_register_raw( mock_sync_raw_mixin_with_jq_processor: SyncRawMixin, mock_resource_config: ResourceConfig, ) -> None: - # Mock the integration settings with skip_check_diff - with patch.object(ocean.config.integration, "skip_check_diff", False): - kind = "service" - user_agent_type = UserAgentType.exporter - raw_entity = [ - {"id": "entity_1", "name": "entity_1", "web_url": "https://example.com"}, - ] - expected_result = [ - { - "identifier": "entity_1", - "blueprint": "service", - "name": "entity_1", - "properties": {}, - }, - ] - async with event_context( - EventType.HTTP_REQUEST, trigger_type="machine" - ) as event: - # Use patch to mock the method instead of direct assignment + kind = "service" + user_agent_type = UserAgentType.exporter + raw_entity = [ + {"id": "entity_1", "name": "entity_1", "web_url": "https://example.com"}, + ] + expected_result = [ + { + "identifier": "entity_1", + "blueprint": "service", + "name": "entity_1", + "properties": {"url": "https://example.com"}, + }, + ] + + async with event_context(EventType.HTTP_REQUEST, trigger_type="machine") as event: + # Use patch to mock the method instead of direct assignment + with patch.object( + mock_sync_raw_mixin_with_jq_processor.port_app_config_handler, + "get_port_app_config", + return_value=PortAppConfig( + enable_merge_entity=True, + delete_dependent_entities=True, + create_missing_related_entities=False, + resources=[mock_resource_config], + ), + ): + # Ensure the event.port_app_config is set correctly + event.port_app_config = await mock_sync_raw_mixin_with_jq_processor.port_app_config_handler.get_port_app_config( + use_cache=False + ) + + def upsert_side_effect( + entities: list[Entity], user_agent_type: UserAgentType + ) -> list[Entity]: + # Simulate returning the passed entities + return entities + + # Patch the upsert method with the side effect with patch.object( - mock_sync_raw_mixin_with_jq_processor.port_app_config_handler, - "get_port_app_config", - return_value=PortAppConfig( - enable_merge_entity=True, - delete_dependent_entities=True, - create_missing_related_entities=False, - resources=[mock_resource_config], - ), + mock_sync_raw_mixin_with_jq_processor.entities_state_applier, + "upsert", + side_effect=upsert_side_effect, ): - # Ensure the event.port_app_config is set correctly - event.port_app_config = await mock_sync_raw_mixin_with_jq_processor.port_app_config_handler.get_port_app_config( - use_cache=False - ) - - def upsert_side_effect( - entities: list[Entity], user_agent_type: UserAgentType - ) -> list[Entity]: - # Simulate returning the passed entities - return entities - - # Patch the upsert method with the side effect - with patch.object( - mock_sync_raw_mixin_with_jq_processor.entities_state_applier, - "upsert", - side_effect=upsert_side_effect, - ): - # Call the register_raw method - registered_entities = ( - await mock_sync_raw_mixin_with_jq_processor.register_raw( - kind, raw_entity, user_agent_type - ) + # Call the register_raw method + registered_entities = ( + await mock_sync_raw_mixin_with_jq_processor.register_raw( + kind, raw_entity, user_agent_type ) + ) - # Assert that the registered entities match the expected results - assert len(registered_entities) == len(expected_result) - for entity, result in zip(registered_entities, expected_result): - assert entity.identifier == result["identifier"] - assert entity.blueprint == result["blueprint"] - assert entity.properties == result["properties"] + # Assert that the registered entities match the expected results + assert len(registered_entities) == len(expected_result) + for entity, result in zip(registered_entities, expected_result): + assert entity.identifier == result["identifier"] + assert entity.blueprint == result["blueprint"] + assert entity.properties == result["properties"] @pytest.mark.asyncio @@ -689,30 +686,26 @@ async def test_register_resource_raw_no_changes_upsert_not_called_entitiy_is_ret mock_sync_raw_mixin: SyncRawMixin, mock_port_app_config: PortAppConfig, ) -> None: - # Mock the integration settings with skip_check_diff - with patch.object(ocean.config.integration, "skip_check_diff", False): - entity = Entity(identifier="1", blueprint="service") - mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore - mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore - mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore - - async with event_context(EventType.RESYNC, trigger_type="machine") as event: - event.port_app_config = mock_port_app_config - - # Test execution - result = await mock_sync_raw_mixin._register_resource_raw( - mock_port_app_config.resources[ - 0 - ], # Use the first resource from the config - [{"some": "data"}], - UserAgentType.exporter, - ) + entity = Entity(identifier="1", blueprint="service") + mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore + mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore + mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore + + async with event_context(EventType.RESYNC, trigger_type="machine") as event: + event.port_app_config = mock_port_app_config + + # Test execution + result = await mock_sync_raw_mixin._register_resource_raw( + mock_port_app_config.resources[0], # Use the first resource from the config + [{"some": "data"}], + UserAgentType.exporter, + ) - # Assertions - assert len(result.entity_selector_diff.passed) == 1 - mock_sync_raw_mixin._calculate_raw.assert_called_once() - mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called() - mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once() + # Assertions + assert len(result.entity_selector_diff.passed) == 1 + mock_sync_raw_mixin._calculate_raw.assert_called_once() + mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called() + mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once() @pytest.mark.asyncio @@ -720,57 +713,89 @@ async def test_register_resource_raw_with_changes_upsert_called_and_entities_are mock_sync_raw_mixin: SyncRawMixin, mock_port_app_config: PortAppConfig, ) -> None: - # Mock the integration settings with skip_check_diff - with patch.object(ocean.config.integration, "skip_check_diff", False): - entity = Entity(identifier="1", blueprint="service") - mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore - mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([entity])) # type: ignore - mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock(return_value=[entity]) # type: ignore - - async with event_context(EventType.RESYNC, trigger_type="machine") as event: - event.port_app_config = mock_port_app_config - - # Test execution - result = await mock_sync_raw_mixin._register_resource_raw( - mock_port_app_config.resources[0], - [{"some": "data"}], - UserAgentType.exporter, - ) + entity = Entity(identifier="1", blueprint="service") + mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore + mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([entity])) # type: ignore + mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock(return_value=[entity]) # type: ignore + + async with event_context(EventType.RESYNC, trigger_type="machine") as event: + event.port_app_config = mock_port_app_config + + # Test execution + result = await mock_sync_raw_mixin._register_resource_raw( + mock_port_app_config.resources[0], + [{"some": "data"}], + UserAgentType.exporter, + ) - # Assertions - assert len(result.entity_selector_diff.passed) == 1 - mock_sync_raw_mixin._calculate_raw.assert_called_once() - mock_sync_raw_mixin.entities_state_applier.upsert.assert_called_once() - mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once() + # Assertions + assert len(result.entity_selector_diff.passed) == 1 + mock_sync_raw_mixin._calculate_raw.assert_called_once() + mock_sync_raw_mixin.entities_state_applier.upsert.assert_called_once() + mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once() @pytest.mark.asyncio async def test_register_resource_raw_with_errors( mock_sync_raw_mixin: SyncRawMixin, mock_port_app_config: PortAppConfig ) -> None: - # Mock the integration settings with skip_check_diff - with patch.object(ocean.config.integration, "skip_check_diff", False): - failed_entity = Entity(identifier="1", blueprint="service") - error = Exception("Test error") - mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[], failed=[failed_entity]), errors=[error], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore - mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore - mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore - - async with event_context(EventType.RESYNC, trigger_type="machine") as event: - event.port_app_config = mock_port_app_config - - # Test execution - result = await mock_sync_raw_mixin._register_resource_raw( - mock_port_app_config.resources[0], - [{"some": "data"}], - UserAgentType.exporter, - ) + failed_entity = Entity(identifier="1", blueprint="service") + error = Exception("Test error") + mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[], failed=[failed_entity]), errors=[error], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore + mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore + mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore + + async with event_context(EventType.RESYNC, trigger_type="machine") as event: + event.port_app_config = mock_port_app_config + + # Test execution + result = await mock_sync_raw_mixin._register_resource_raw( + mock_port_app_config.resources[0], + [{"some": "data"}], + UserAgentType.exporter, + ) + + # Assertions + assert len(result.entity_selector_diff.passed) == 0 + assert len(result.entity_selector_diff.failed) == 1 + assert len(result.errors) == 1 + assert result.errors[0] == error + mock_sync_raw_mixin._calculate_raw.assert_called_once() + mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once() + mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called() + + +@pytest.mark.asyncio +async def test_register_resource_raw_skip_event_type_http_request_upsert_called_and_no_entitites_diff_calculation( + mock_sync_raw_mixin: SyncRawMixin, + mock_port_app_config: PortAppConfig, + mock_context: PortOceanContext, + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Mock dependencies + entity = Entity(identifier="1", blueprint="service") + calculation_result = CalculationResult( + entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), + errors=[], + misconfigurations=[], + misonfigured_entity_keys=[], + ) + mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[calculation_result]) # type: ignore + mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock() # type: ignore + mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock(return_value=[entity]) # type: ignore + + async with event_context(EventType.HTTP_REQUEST, trigger_type="machine") as event: + event.port_app_config = mock_port_app_config + + # Test execution + result = await mock_sync_raw_mixin._register_resource_raw( + mock_port_app_config.resources[0], + [{"some": "data"}], + UserAgentType.exporter, + ) - # Assertions - assert len(result.entity_selector_diff.passed) == 0 - assert len(result.entity_selector_diff.failed) == 1 - assert len(result.errors) == 1 - assert result.errors[0] == error - mock_sync_raw_mixin._calculate_raw.assert_called_once() - mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once() - mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called() + # Assertions + assert len(result.entity_selector_diff.passed) == 1 + mock_sync_raw_mixin._calculate_raw.assert_called_once() + mock_sync_raw_mixin._map_entities_compared_with_port.assert_not_called() + mock_sync_raw_mixin.entities_state_applier.upsert.assert_called_once() diff --git a/pyproject.toml b/pyproject.toml index b895f6699a..fc23e6971d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.18.5" +version = "0.18.6" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"