Skip to content

Commit

Permalink
[Core] Introduced a new entity diff resolver to reduce port system lo…
Browse files Browse the repository at this point in the history
…ad by comparing entities and upserting changed entities only (#1318)

# Description

What - reduce the amount of upserts we send to port api

Why - many of the upserts does not contain an actual change, reduce load
from port api

How - check if the entity from the third party has a change from the
entity in port, and only if there is an actual change, upsert the entity

## Type of change

Please leave one option from the following and delete the rest:

- [x] Bug fix (non-breaking change which fixes an issue)

<h4> All tests should be run against the port production
environment(using a testing org). </h4>

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector
  • Loading branch information
yaelibarg authored Jan 19, 2025
1 parent 9b801b0 commit 673c23a
Show file tree
Hide file tree
Showing 8 changed files with 1,111 additions and 37 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

<!-- towncrier release notes start -->
## 0.18.0 (2025-01-15)

### Improvements

- Introduced a new entity diff resolver to reduce port system load by comparing entities and upserting changed entities only

## 0.17.8 (2025-01-15)

Expand Down
27 changes: 21 additions & 6 deletions port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,30 @@ async def upsert_entity(
if result_entity.is_using_search_identifier:
return None

# In order to save memory we'll keep only the identifier, blueprint and relations of the
# upserted entity result for later calculations
return self._reduce_entity(result_entity)

@staticmethod
def _reduce_entity(entity: Entity) -> Entity:
"""
Reduces an entity to only keep identifier, blueprint and processed relations.
This helps save memory by removing unnecessary data.
Args:
entity: The entity to reduce
Returns:
Entity: A new entity with only the essential data
"""
reduced_entity = Entity(
identifier=result_entity.identifier, blueprint=result_entity.blueprint
identifier=entity.identifier, blueprint=entity.blueprint
)

# Turning dict typed relations (raw search relations) is required
# for us to be able to successfully calculate the participation related entities
# and ignore the ones that don't as they weren't upserted
reduced_entity.relations = {
key: None if isinstance(relation, dict) else relation
for key, relation in result_entity.relations.items()
for key, relation in entity.relations.items()
}

return reduced_entity
Expand Down Expand Up @@ -202,7 +214,10 @@ async def batch_delete_entities(
)

async def search_entities(
self, user_agent_type: UserAgentType, query: dict[Any, Any] | None = None
self,
user_agent_type: UserAgentType,
query: dict[Any, Any] | None = None,
parameters_to_include: list[str] | None = None,
) -> list[Entity]:
default_query = {
"combinator": "and",
Expand Down Expand Up @@ -232,7 +247,7 @@ async def search_entities(
headers=await self.auth.headers(user_agent_type),
params={
"exclude_calculated_properties": "true",
"include": ["blueprint", "identifier"],
"include": parameters_to_include or ["blueprint", "identifier"],
},
extensions={"retryable": True},
)
Expand Down
156 changes: 132 additions & 24 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
RAW_ITEM,
CalculationResult,
)
from port_ocean.core.utils.utils import zip_and_sum, gather_and_split_errors_from_results
from port_ocean.core.utils.utils import resolve_entities_diff, zip_and_sum, gather_and_split_errors_from_results
from port_ocean.exceptions.core import OceanAbortException

SEND_RAW_DATA_EXAMPLES_AMOUNT = 5
Expand Down Expand Up @@ -130,24 +130,128 @@ async def _calculate_raw(
)
)

def _construct_search_query_for_entities(self, entities: list[Entity]) -> dict:
"""Create a query to search for entities by their identifiers.
Args:
entities (list[Entity]): List of entities to search for.
Returns:
dict: Query structure for searching entities by identifier.
"""
return {
"combinator": "and",
"rules": [
{
"combinator": "or",
"rules": [
{
"property": "$identifier",
"operator": "=",
"value": entity.identifier,
}
for entity in entities
]
}
]
}

async def _map_entities_compared_with_port(
self,
entities: list[Entity],
resource: ResourceConfig,
user_agent_type: UserAgentType,
) -> list[Entity]:
if not entities:
return []

if entities[0].is_using_search_identifier or entities[0].is_using_search_relation:
return entities

BATCH_SIZE = 50
entities_at_port_with_properties = []

# Process entities in batches
for start_index in range(0, len(entities), BATCH_SIZE):
entities_batch = entities[start_index:start_index + BATCH_SIZE]
batch_results = await self._fetch_entities_batch_from_port(
entities_batch,
resource,
user_agent_type
)
entities_at_port_with_properties.extend(batch_results)

logger.info("Got entities from port with properties and relations", port_entities=len(entities_at_port_with_properties))

if len(entities_at_port_with_properties) > 0:
return resolve_entities_diff(entities, entities_at_port_with_properties)
return entities

async def _fetch_entities_batch_from_port(
self,
entities_batch: list[Entity],
resource: ResourceConfig,
user_agent_type: UserAgentType,
) -> list[Entity]:
query = self._construct_search_query_for_entities(entities_batch)
return await ocean.port_client.search_entities(
user_agent_type,
parameters_to_include=["blueprint", "identifier"] + (
["title"] if resource.port.entity.mappings.title != None else []
) + (
["team"] if resource.port.entity.mappings.team != None else []
) + [
f"properties.{prop}" for prop in resource.port.entity.mappings.properties
] + [
f"relations.{relation}" for relation in resource.port.entity.mappings.relations
],
query=query
)

async def _register_resource_raw(
self,
resource: ResourceConfig,
results: list[dict[Any, Any]],
user_agent_type: UserAgentType,
parse_all: bool = False,
send_raw_data_examples_amount: int = 0,
send_raw_data_examples_amount: int = 0
) -> CalculationResult:
objects_diff = await self._calculate_raw(
[(resource, results)], parse_all, send_raw_data_examples_amount
)
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
modified_objects = []

if ocean.app.is_saas():
try:
changed_entities = await self._map_entities_compared_with_port(
objects_diff[0].entity_selector_diff.passed,
resource,
user_agent_type
)

if changed_entities:
logger.info("Upserting changed entities", changed_entities=len(changed_entities),
total_entities=len(objects_diff[0].entity_selector_diff.passed))
await self.entities_state_applier.upsert(
changed_entities, user_agent_type
)
else:
logger.info("Entities in batch didn't changed since last sync, skipping", total_entities=len(objects_diff[0].entity_selector_diff.passed))

modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed]
except Exception as e:
logger.warning(f"Failed to resolve batch entities with Port, falling back to upserting all entities: {str(e)}")
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
)
else:
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
)
return CalculationResult(
objects_diff[0].entity_selector_diff._replace(passed=modified_objects),
errors=objects_diff[0].errors,
misonfigured_entity_keys=objects_diff[0].misonfigured_entity_keys,
misonfigured_entity_keys=objects_diff[0].misonfigured_entity_keys
)

async def _unregister_resource_raw(
Expand Down Expand Up @@ -186,14 +290,17 @@ async def _register_in_batches(
send_raw_data_examples_amount = (
SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0
)
all_entities, register_errors,_ = await self._register_resource_raw(
resource_config,
raw_results,
user_agent_type,
send_raw_data_examples_amount=send_raw_data_examples_amount,
)
errors.extend(register_errors)
passed_entities = list(all_entities.passed)

passed_entities = []
if raw_results:
calculation_result = await self._register_resource_raw(
resource_config,
raw_results,
user_agent_type,
send_raw_data_examples_amount=send_raw_data_examples_amount
)
errors.extend(calculation_result.errors)
passed_entities = list(calculation_result.entity_selector_diff.passed)

for generator in async_generators:
try:
Expand All @@ -203,14 +310,14 @@ async def _register_in_batches(
0, send_raw_data_examples_amount - len(passed_entities)
)

entities, register_errors,_ = await self._register_resource_raw(
calculation_result = await self._register_resource_raw(
resource_config,
items,
user_agent_type,
send_raw_data_examples_amount=send_raw_data_examples_amount,
send_raw_data_examples_amount=send_raw_data_examples_amount
)
errors.extend(register_errors)
passed_entities.extend(entities.passed)
errors.extend(calculation_result.errors)
passed_entities.extend(calculation_result.entity_selector_diff.passed)
except* OceanAbortException as error:
errors.append(error)

Expand Down Expand Up @@ -446,9 +553,6 @@ async def sync_raw_all(

try:
did_fetched_current_state = True
entities_at_port = await ocean.port_client.search_entities(
user_agent_type
)
except httpx.HTTPError as e:
logger.warning(
"Failed to fetch the current state of entities at Port. "
Expand All @@ -461,6 +565,7 @@ async def sync_raw_all(

creation_results: list[tuple[list[Entity], list[Exception]]] = []


try:
for resource in app_config.resources:
# create resource context per resource kind, so resync method could have access to the resource
Expand All @@ -471,7 +576,6 @@ async def sync_raw_all(
)

event.on_abort(lambda: task.cancel())

creation_results.append(await task)

await self.sort_and_upsert_failed_entities(user_agent_type)
Expand All @@ -487,7 +591,7 @@ async def sync_raw_all(
return

logger.info("Starting resync diff calculation")
flat_created_entities, errors = zip_and_sum(creation_results) or [
generated_entities, errors = zip_and_sum(creation_results) or [
[],
[],
]
Expand All @@ -504,10 +608,14 @@ async def sync_raw_all(
logger.error(message, exc_info=error_group)
else:
logger.info(
f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}"
f"Running resync diff calculation, number of entities created during sync: {len(generated_entities)}"
)
entities_at_port = await ocean.port_client.search_entities(
user_agent_type
)
await self.entities_state_applier.delete_diff(
{"before": entities_at_port, "after": flat_created_entities},
{"before": entities_at_port, "after": generated_entities},
user_agent_type,
)

logger.info("Resync finished successfully")
4 changes: 4 additions & 0 deletions port_ocean/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class Entity(BaseModel):
def is_using_search_identifier(self) -> bool:
return isinstance(self.identifier, dict)

@property
def is_using_search_relation(self) -> bool:
return any(isinstance(relation, dict) for relation in self.relations.values())


class BlueprintRelation(BaseModel):
many: bool
Expand Down
Loading

0 comments on commit 673c23a

Please sign in to comment.