Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Entity diff calculation only on resync #1360

Merged
merged 4 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

<!-- towncrier release notes start -->
## 0.18.6 (2025-01-29)

### Improvements

- Entity diff calculation only on resync

## 0.18.5 (2025-01-28)

### Bug Fixes
Expand Down
44 changes: 25 additions & 19 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,26 +220,32 @@ async def _register_resource_raw(
)
modified_objects = []

try:
changed_entities = await self._map_entities_compared_with_port(
objects_diff[0].entity_selector_diff.passed,
resource,
user_agent_type
)
if changed_entities:
logger.info("Upserting changed entities", changed_entities=len(changed_entities),
total_entities=len(objects_diff[0].entity_selector_diff.passed))
await self.entities_state_applier.upsert(
changed_entities, user_agent_type
)
else:
logger.info("Entities in batch didn't changed since last sync, skipping", total_entities=len(objects_diff[0].entity_selector_diff.passed))
modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed]
except Exception as e:
logger.warning(f"Failed to resolve batch entities with Port, falling back to upserting all entities: {str(e)}")
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
if event.event_type == EventType.RESYNC:
try:
changed_entities = await self._map_entities_compared_with_port(
objects_diff[0].entity_selector_diff.passed,
resource,
user_agent_type
)
if changed_entities:
logger.info("Upserting changed entities", changed_entities=len(changed_entities),
total_entities=len(objects_diff[0].entity_selector_diff.passed))
await self.entities_state_applier.upsert(
changed_entities, user_agent_type
)
else:
logger.info("Entities in batch didn't changed since last sync, skipping", total_entities=len(objects_diff[0].entity_selector_diff.passed))
modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed]
except Exception as e:
logger.warning(f"Failed to resolve batch entities with Port, falling back to upserting all entities: {str(e)}")
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
)
else:
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
)


return CalculationResult(
objects_diff[0].entity_selector_diff._replace(passed=modified_objects),
Expand Down
273 changes: 149 additions & 124 deletions port_ocean/tests/core/handlers/mixins/test_sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,66 +435,63 @@ async def test_register_raw(
mock_sync_raw_mixin_with_jq_processor: SyncRawMixin,
mock_resource_config: ResourceConfig,
) -> None:
# Mock the integration settings with skip_check_diff
with patch.object(ocean.config.integration, "skip_check_diff", False):
kind = "service"
user_agent_type = UserAgentType.exporter
raw_entity = [
{"id": "entity_1", "name": "entity_1", "web_url": "https://example.com"},
]
expected_result = [
{
"identifier": "entity_1",
"blueprint": "service",
"name": "entity_1",
"properties": {},
},
]

async with event_context(
EventType.HTTP_REQUEST, trigger_type="machine"
) as event:
# Use patch to mock the method instead of direct assignment
kind = "service"
user_agent_type = UserAgentType.exporter
raw_entity = [
{"id": "entity_1", "name": "entity_1", "web_url": "https://example.com"},
]
expected_result = [
{
"identifier": "entity_1",
"blueprint": "service",
"name": "entity_1",
"properties": {"url": "https://example.com"},
},
]

async with event_context(EventType.HTTP_REQUEST, trigger_type="machine") as event:
# Use patch to mock the method instead of direct assignment
with patch.object(
mock_sync_raw_mixin_with_jq_processor.port_app_config_handler,
"get_port_app_config",
return_value=PortAppConfig(
enable_merge_entity=True,
delete_dependent_entities=True,
create_missing_related_entities=False,
resources=[mock_resource_config],
),
):
# Ensure the event.port_app_config is set correctly
event.port_app_config = await mock_sync_raw_mixin_with_jq_processor.port_app_config_handler.get_port_app_config(
use_cache=False
)

def upsert_side_effect(
entities: list[Entity], user_agent_type: UserAgentType
) -> list[Entity]:
# Simulate returning the passed entities
return entities

# Patch the upsert method with the side effect
with patch.object(
mock_sync_raw_mixin_with_jq_processor.port_app_config_handler,
"get_port_app_config",
return_value=PortAppConfig(
enable_merge_entity=True,
delete_dependent_entities=True,
create_missing_related_entities=False,
resources=[mock_resource_config],
),
mock_sync_raw_mixin_with_jq_processor.entities_state_applier,
"upsert",
side_effect=upsert_side_effect,
):
# Ensure the event.port_app_config is set correctly
event.port_app_config = await mock_sync_raw_mixin_with_jq_processor.port_app_config_handler.get_port_app_config(
use_cache=False
)

def upsert_side_effect(
entities: list[Entity], user_agent_type: UserAgentType
) -> list[Entity]:
# Simulate returning the passed entities
return entities

# Patch the upsert method with the side effect
with patch.object(
mock_sync_raw_mixin_with_jq_processor.entities_state_applier,
"upsert",
side_effect=upsert_side_effect,
):
# Call the register_raw method
registered_entities = (
await mock_sync_raw_mixin_with_jq_processor.register_raw(
kind, raw_entity, user_agent_type
)
# Call the register_raw method
registered_entities = (
await mock_sync_raw_mixin_with_jq_processor.register_raw(
kind, raw_entity, user_agent_type
)
)

# Assert that the registered entities match the expected results
assert len(registered_entities) == len(expected_result)
for entity, result in zip(registered_entities, expected_result):
assert entity.identifier == result["identifier"]
assert entity.blueprint == result["blueprint"]
assert entity.properties == result["properties"]
# Assert that the registered entities match the expected results
assert len(registered_entities) == len(expected_result)
for entity, result in zip(registered_entities, expected_result):
assert entity.identifier == result["identifier"]
assert entity.blueprint == result["blueprint"]
assert entity.properties == result["properties"]


@pytest.mark.asyncio
Expand Down Expand Up @@ -689,88 +686,116 @@ async def test_register_resource_raw_no_changes_upsert_not_called_entitiy_is_ret
mock_sync_raw_mixin: SyncRawMixin,
mock_port_app_config: PortAppConfig,
) -> None:
# Mock the integration settings with skip_check_diff
with patch.object(ocean.config.integration, "skip_check_diff", False):
entity = Entity(identifier="1", blueprint="service")
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore
mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore

async with event_context(EventType.RESYNC, trigger_type="machine") as event:
event.port_app_config = mock_port_app_config

# Test execution
result = await mock_sync_raw_mixin._register_resource_raw(
mock_port_app_config.resources[
0
], # Use the first resource from the config
[{"some": "data"}],
UserAgentType.exporter,
)
entity = Entity(identifier="1", blueprint="service")
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore
mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore

async with event_context(EventType.RESYNC, trigger_type="machine") as event:
event.port_app_config = mock_port_app_config

# Test execution
result = await mock_sync_raw_mixin._register_resource_raw(
mock_port_app_config.resources[0], # Use the first resource from the config
[{"some": "data"}],
UserAgentType.exporter,
)

# Assertions
assert len(result.entity_selector_diff.passed) == 1
mock_sync_raw_mixin._calculate_raw.assert_called_once()
mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called()
mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once()
# Assertions
assert len(result.entity_selector_diff.passed) == 1
mock_sync_raw_mixin._calculate_raw.assert_called_once()
mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called()
mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once()


@pytest.mark.asyncio
async def test_register_resource_raw_with_changes_upsert_called_and_entities_are_mapped(
mock_sync_raw_mixin: SyncRawMixin,
mock_port_app_config: PortAppConfig,
) -> None:
# Mock the integration settings with skip_check_diff
with patch.object(ocean.config.integration, "skip_check_diff", False):
entity = Entity(identifier="1", blueprint="service")
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([entity])) # type: ignore
mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock(return_value=[entity]) # type: ignore

async with event_context(EventType.RESYNC, trigger_type="machine") as event:
event.port_app_config = mock_port_app_config

# Test execution
result = await mock_sync_raw_mixin._register_resource_raw(
mock_port_app_config.resources[0],
[{"some": "data"}],
UserAgentType.exporter,
)
entity = Entity(identifier="1", blueprint="service")
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([entity])) # type: ignore
mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock(return_value=[entity]) # type: ignore

async with event_context(EventType.RESYNC, trigger_type="machine") as event:
event.port_app_config = mock_port_app_config

# Test execution
result = await mock_sync_raw_mixin._register_resource_raw(
mock_port_app_config.resources[0],
[{"some": "data"}],
UserAgentType.exporter,
)

# Assertions
assert len(result.entity_selector_diff.passed) == 1
mock_sync_raw_mixin._calculate_raw.assert_called_once()
mock_sync_raw_mixin.entities_state_applier.upsert.assert_called_once()
mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once()
# Assertions
assert len(result.entity_selector_diff.passed) == 1
mock_sync_raw_mixin._calculate_raw.assert_called_once()
mock_sync_raw_mixin.entities_state_applier.upsert.assert_called_once()
mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once()


@pytest.mark.asyncio
async def test_register_resource_raw_with_errors(
mock_sync_raw_mixin: SyncRawMixin, mock_port_app_config: PortAppConfig
) -> None:
# Mock the integration settings with skip_check_diff
with patch.object(ocean.config.integration, "skip_check_diff", False):
failed_entity = Entity(identifier="1", blueprint="service")
error = Exception("Test error")
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[], failed=[failed_entity]), errors=[error], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore
mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore

async with event_context(EventType.RESYNC, trigger_type="machine") as event:
event.port_app_config = mock_port_app_config

# Test execution
result = await mock_sync_raw_mixin._register_resource_raw(
mock_port_app_config.resources[0],
[{"some": "data"}],
UserAgentType.exporter,
)
failed_entity = Entity(identifier="1", blueprint="service")
error = Exception("Test error")
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[], failed=[failed_entity]), errors=[error], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore
mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore

async with event_context(EventType.RESYNC, trigger_type="machine") as event:
event.port_app_config = mock_port_app_config

# Test execution
result = await mock_sync_raw_mixin._register_resource_raw(
mock_port_app_config.resources[0],
[{"some": "data"}],
UserAgentType.exporter,
)

# Assertions
assert len(result.entity_selector_diff.passed) == 0
assert len(result.entity_selector_diff.failed) == 1
assert len(result.errors) == 1
assert result.errors[0] == error
mock_sync_raw_mixin._calculate_raw.assert_called_once()
mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once()
mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called()


@pytest.mark.asyncio
async def test_register_resource_raw_skip_event_type_http_request_upsert_called_and_no_entitites_diff_calculation(
mock_sync_raw_mixin: SyncRawMixin,
mock_port_app_config: PortAppConfig,
mock_context: PortOceanContext,
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Mock dependencies
entity = Entity(identifier="1", blueprint="service")
calculation_result = CalculationResult(
entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]),
errors=[],
misconfigurations=[],
misonfigured_entity_keys=[],
)
mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[calculation_result]) # type: ignore
mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock() # type: ignore
mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock(return_value=[entity]) # type: ignore

async with event_context(EventType.HTTP_REQUEST, trigger_type="machine") as event:
event.port_app_config = mock_port_app_config

# Test execution
result = await mock_sync_raw_mixin._register_resource_raw(
mock_port_app_config.resources[0],
[{"some": "data"}],
UserAgentType.exporter,
)

# Assertions
assert len(result.entity_selector_diff.passed) == 0
assert len(result.entity_selector_diff.failed) == 1
assert len(result.errors) == 1
assert result.errors[0] == error
mock_sync_raw_mixin._calculate_raw.assert_called_once()
mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once()
mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called()
# Assertions
assert len(result.entity_selector_diff.passed) == 1
mock_sync_raw_mixin._calculate_raw.assert_called_once()
mock_sync_raw_mixin._map_entities_compared_with_port.assert_not_called()
mock_sync_raw_mixin.entities_state_applier.upsert.assert_called_once()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.18.5"
version = "0.18.6"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down