This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Improve performance apply events #1141

Closed
wants to merge 2 commits
46 changes: 31 additions & 15 deletions src/gobupload/apply/event_applicator.py
@@ -4,11 +4,17 @@
Applies events to the respective entity in the current model

"""
from __future__ import annotations

import datetime
from collections import defaultdict
from typing import Union

from gobcore.exceptions import GOBException
from sqlalchemy import RowMapping

from gobcore.events import GOB, database_to_gobevent, ImportEvent
from gobcore.exceptions import GOBException
from gobcore.model import FIELD
from gobupload.storage.handler import GOBStorageHandler
from gobupload.update.update_statistics import UpdateStatistics

@@ -42,7 +48,7 @@ def _add_update(self, gob_event):
self.updates[gob_event.tid].append(gob_event)
self.updates_total += 1

def _update_entity(self, entity):
def _update_entity(self, entity, tid: str):
"""
Update an entity

@@ -52,23 +58,23 @@

:param entity:
"""
for gob_event in self.updates.pop(entity._tid):
# validate event and entity, raises if not valid
self._validate_update_event(gob_event, entity)
for gob_event in self.updates[tid]:
self._validate_update_event(gob_event, entity._date_deleted)

# apply the event on the entity
gob_event.apply_to(entity)

# register the last event that has updated this entity
entity._last_event = gob_event.id

# update stats
self.stats.add_applied(gob_event.action, 1)

def _validate_update_event(self, gob_event: ImportEvent, entity) -> None:
return entity

def _validate_update_event(
self, gob_event: ImportEvent, date_deleted: datetime.datetime | None
) -> None:
if isinstance(gob_event, GOB.ADD):
# only apply ADD on deleted entity
if entity._date_deleted is not None:
if date_deleted is not None:
return

# an ADD event is trying to be applied to a non-deleted (current) entity
@@ -79,7 +85,7 @@ def _validate_update_event(self, gob_event: ImportEvent, entity) -> None:

else:
# only apply GOB.MODIFY, GOB.DELETE on non-deleted entity
if entity._date_deleted is None:
if date_deleted is None:
return

# a non-ADD event is trying to be applied on a deleted entity
@@ -91,10 +97,20 @@ def _validate_update_event(self, gob_event: ImportEvent, entity) -> None:
def _flush_updates(self):
"""Generate database updates from events in buffer and clear buffer."""
if self.updates:
entities = self.storage.get_entities(self.updates.keys(), with_deleted=True)

for entity in entities:
self._update_entity(entity)
Entity = self.storage.DbEntity

rows = self.storage.get_entity_rows_by_tid(
self.updates.keys(), with_deleted=True, columns=[FIELD.GOBID, FIELD.TID, FIELD.DATE_DELETED]
).mappings()

updates = (
self._update_entity(
Entity(_gobid=row[FIELD.GOBID], _date_deleted=row[FIELD.DATE_DELETED]),
row[FIELD.TID],
)
for row in rows
)
self.storage.apply_updates(updates)

self.updates.clear()
self.updates_total = 0
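The core of the change in event_applicator.py is that _flush_updates no longer loads full ORM entities: it fetches only _gobid, _tid and _date_deleted as row mappings, builds lightweight DbEntity objects from them, and streams the updated objects into storage.apply_updates as a generator. apply_updates itself is not part of this diff; a plausible sketch of it (an assumption, not the PR's code) would use SQLAlchemy 2.0's bulk UPDATE by primary key, which is what makes the lightweight-entity generator pay off:

```python
# Assumed sketch of apply_updates, not taken from this PR: SQLAlchemy 2.0 style
# bulk UPDATE by primary key, consuming the generator built in _flush_updates.
from itertools import islice

from sqlalchemy import update


def apply_updates(session, DbEntity, updated_entities, chunk_size=10_000):
    def as_row(entity):
        # Primary key plus the columns the events touched; real entities
        # carry more attributes than shown here.
        return {
            "_gobid": entity._gobid,
            "_last_event": entity._last_event,
            "_date_deleted": entity._date_deleted,
        }

    it = iter(updated_entities)
    while chunk := list(islice(it, chunk_size)):
        # Each dict must include the primary key (_gobid is assumed to be it),
        # so SQLAlchemy routes this as one UPDATE per row, executed in bulk.
        session.execute(update(DbEntity), [as_row(e) for e in chunk])
```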
19 changes: 9 additions & 10 deletions src/gobupload/apply/main.py
@@ -1,4 +1,5 @@
import sys
import traceback
from pathlib import Path

from sqlalchemy.engine import Row
@@ -31,21 +32,20 @@ def apply_events(storage: GOBStorageHandler, last_events: set[str], start_after:
:param stats: update statistics for this action
:return:
"""
CHUNK_SIZE = 10_000

with (
ProgressTicker("Apply events", 10_000) as progress,
ProgressTicker("Apply events", CHUNK_SIZE) as progress,
EventApplicator(storage, last_events, stats) as event_applicator,
):
for chunk in storage.get_events_starting_after(start_after):
with storage.get_session():
while chunk := storage.get_events_starting_after(start_after, CHUNK_SIZE):
with storage.get_session() as session:
for event in chunk:
progress.tick()
event_applicator.load(event)

try:
event_applicator.flush()
except Exception as err:
logger.error(f"Exception during applying events: {repr(err)}")
break # skips 'else' and executes 'finally' => session rollback + closed
event_applicator.flush()
start_after = event.eventid


def _apply_confirms(storage: GOBStorageHandler, confirms: Path, timestamp: str, stats: UpdateStatistics):
@@ -141,8 +141,7 @@ def apply(msg):
apply_confirm_events(storage, stats, msg)
else:
logger.info(f"Start application of unhandled {model} events")
with storage.get_session():
last_events = set(storage.get_current_ids(exclude_deleted=False))
last_events = set(storage.get_current_ids(exclude_deleted=False))

apply_events(storage, last_events, entity_max_eventid, stats)
apply_confirm_events(storage, stats, msg)
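The rewritten apply_events loop works as a chunked cursor: fetch at most CHUNK_SIZE events after the last processed event id, apply them within a single session, then advance the cursor, keeping both transaction size and memory bounded. A condensed view of the pattern (assuming get_events_starting_after returns an empty, falsy result once no events remain; error handling and the EventApplicator buffering are omitted):

```python
# Condensed sketch of the chunked-cursor loop in apply_events.
CHUNK_SIZE = 10_000


def apply_in_chunks(storage, start_after, handle_event):
    while chunk := storage.get_events_starting_after(start_after, CHUNK_SIZE):
        with storage.get_session():              # one transaction per chunk
            for event in chunk:
                handle_event(event)
        start_after = chunk[-1].eventid          # resume after the last applied event
```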
1 change: 1 addition & 0 deletions src/gobupload/compare/entity_collector.py
@@ -45,4 +45,5 @@ def collect(self, entity):

def close(self):
self._write_entities()
self.storage.session.flush()
self.storage.analyze_temporary_table()
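The added session.flush() matters because the collected entities can still sit in the SQLAlchemy unit-of-work buffer when close() runs: flushing sends the pending INSERTs to the database so that the subsequent ANALYZE of the temporary table sees the freshly written rows and yields useful planner statistics. A sketch of what analyze_temporary_table presumably does (the statement and table name here are assumptions, not code from this repository):

```python
# Assumed shape of analyze_temporary_table; the table name is hypothetical.
from sqlalchemy import text


def analyze_temporary_table(session, table_name="tmp_entities"):
    # ANALYZE only considers rows that have actually reached the database,
    # hence the flush() right before this call in EntityCollector.close().
    session.execute(text(f"ANALYZE {table_name}"))
```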
5 changes: 3 additions & 2 deletions src/gobupload/compare/event_collector.py
@@ -9,7 +9,7 @@

class EventCollector:

MAX_BULK = 100_000 # Max number of events of same type in one bulk event
MAX_BULK = 10_000 # Max number of events of same type in one bulk event
BULK_TYPES = ["CONFIRM"] # Only CONFIRM events are grouped in bulk events

def __init__(self, contents_writer, confirms_writer, version):
@@ -65,7 +65,8 @@ def _end_of_bulk(self):
event = GOB.BULKCONFIRM.create_event([
{
'_tid': event["data"]["_tid"],
'_last_event': event["data"]["_last_event"]
'_last_event': event["data"]["_last_event"],
"_gobid": event["data"]["_gobid"]
} for event in self._bulk_events
], self.version)
self._add_event(event)
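Two things change in event_collector.py: bulk CONFIRM events are capped at 10,000 records instead of 100,000, which keeps individual BULKCONFIRM messages and the transactions applying them smaller, and each confirm record now carries _gobid so the apply side can update by primary key without first resolving _tid. A simplified stand-in for the grouping (the real buffering lives in EventCollector):

```python
# Simplified stand-in for grouping CONFIRM records into BULKCONFIRM batches.
from itertools import islice

MAX_BULK = 10_000  # max number of records per bulk event


def bulked_confirms(confirm_events):
    it = iter(confirm_events)
    while batch := list(islice(it, MAX_BULK)):
        yield [
            {
                "_tid": event["data"]["_tid"],
                "_last_event": event["data"]["_last_event"],
                "_gobid": event["data"]["_gobid"],  # new: primary key travels along
            }
            for event in batch
        ]
```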
48 changes: 24 additions & 24 deletions src/gobupload/compare/main.py
@@ -4,7 +4,7 @@

Todo: Event, action and mutation are used for the same subject. Use one name to improve maintainability.
"""
from typing import Iterator, Callable, Any
from typing import Iterator, Callable, Any, Sequence
from sqlalchemy.engine import Row

from gobcore.enum import ImportMode
@@ -70,37 +70,37 @@ def compare(msg):
stats = CompareStatistics()
filename, confirms = None, None # initialise here, storage.get_session doesn't re-raise exception

with storage.get_session(invalidate=True):
# Check any dependencies
if not meets_dependencies(storage, msg):
return {
"header": msg["header"],
"summary": logger.get_summary(),
"contents": None
}
# Check any dependencies
if not meets_dependencies(storage, msg):
return {
"header": msg["header"],
"summary": logger.get_summary(),
"contents": None
}

enricher = Enricher(storage, msg)
populator = Populator(entity_model, msg)
enricher = Enricher(storage, msg)
populator = Populator(entity_model, msg)

if storage.has_any_entity():
# Collect entities in a temporary table
if storage.has_any_entity():
# Collect entities in a temporary table
with storage.get_session(invalidate=True) as session:
with EntityCollector(storage) as collector:
_collect_entities(msg["contents"], collector.collect, enricher, populator, stats)

diff = storage.compare_temporary_data(mode)
filename, confirms = _process_compare_results(storage, entity_model, diff, stats)

else:
# If there are no records in the database all data are ADD events
logger.info("Initial load of new collection detected")
else:
# If there are no records in the database all data are ADD events
logger.info("Initial load of new collection detected")

with (
ContentsWriter() as writer,
EventCollector(contents_writer=writer, confirms_writer=None, version=version) as collector
):
_collect_entities(msg["contents"], collector.collect_initial_add, enricher, populator, stats)
with (
ContentsWriter() as writer,
EventCollector(contents_writer=writer, confirms_writer=None, version=version) as collector
):
_collect_entities(msg["contents"], collector.collect_initial_add, enricher, populator, stats)

filename = writer.filename
filename = writer.filename

# Build result message
results = stats.results()
@@ -167,7 +167,7 @@ def _process_compare_result_row(
elif event_type == "CONFIRM":
return GOB.CONFIRM.create_event(
_tid=tid,
data={FIELD.LAST_EVENT: last_event},
data={FIELD.LAST_EVENT: last_event, FIELD.GOBID: getattr(row, "_entity_gobid")},
version=event_version
)

@@ -193,7 +193,7 @@ def _process_compare_result_row(


def _process_compare_results(
storage: GOBStorageHandler, model: dict, results: Iterator[list[Row]], stats: CompareStatistics
storage: GOBStorageHandler, model: dict, results: Iterator[Sequence[Row]], stats: CompareStatistics
) -> tuple[str, str]:
"""Process the results of the in database compare.

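The restructuring of compare() narrows the database session's scope: the dependency check and the Enricher/Populator setup now run outside any transaction, and get_session(invalidate=True) only wraps the temporary-table collection and the in-database compare. Roughly (helpers marked illustrative are stand-ins, not this module's real functions):

```python
# Rough sketch of the narrowed session scope in compare().
def compare_sketch(storage, msg, mode, enricher, populator):
    if not meets_dependencies(storage, msg):        # runs outside any session now
        return {"header": msg["header"], "contents": None}

    if storage.has_any_entity():
        # Only the database-heavy work is wrapped in a transaction.
        with storage.get_session(invalidate=True):
            collect_into_temp_table(msg["contents"], enricher, populator)  # illustrative
            diff = storage.compare_temporary_data(mode)
        return process_results(diff)                                       # illustrative

    # Initial load: no existing entities, so everything becomes an ADD event.
    return collect_initial_adds(msg["contents"], enricher, populator)      # illustrative
```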