From c49fab8e8a0dd50b62cd5d6bc6fa3742e8b0b16b Mon Sep 17 00:00:00 2001
From: Diederik van der Boor
Date: Mon, 8 Jul 2024 17:17:32 +0200
Subject: [PATCH] Remove unused BenK EventsProcessor code

Confirmed as no longer used by @peteradrichem.

This code had been broken since the benk dataset was removed in
https://github.com/Amsterdam/amsterdam-schema/pull/1032

This also removes the `schema import events` CLI command.
---
 src/schematools/cli.py             |   29 +-
 src/schematools/events/__init__.py |    5 -
 src/schematools/events/full.py     |  741 --------------
 tests/conftest.py                  |    5 -
 tests/files/datasets/benk.json     |   44 -
 tests/test_events_full.py          | 1502 ----------------------------
 6 files changed, 2 insertions(+), 2324 deletions(-)
 delete mode 100644 src/schematools/events/__init__.py
 delete mode 100644 src/schematools/events/full.py
 delete mode 100644 tests/files/datasets/benk.json
 delete mode 100644 tests/test_events_full.py

diff --git a/src/schematools/cli.py b/src/schematools/cli.py
index ebe3f0a8..e866597e 100644
--- a/src/schematools/cli.py
+++ b/src/schematools/cli.py
@@ -1,4 +1,5 @@
 """Cli tools."""
+
 from __future__ import annotations
 
 import io
@@ -31,7 +32,6 @@
     ckan,
     validation,
 )
-from schematools.events.full import EventsProcessor
 from schematools.exceptions import (
     DatasetNotFound,
     IncompatibleMetaschema,
@@ -865,32 +865,6 @@ def import_geojson(
     importer.load_file(geojson_path, batch_size=batch_size, truncate=truncate_table)
 
 
-@import_.command("events")
-@option_db_url
-@option_schema_url
-@argument_dataset_id
-@click.option("--additional-schemas", "-a", multiple=True)
-@click.argument("events_path")
-@click.option("-t", "--truncate-table", default=False, is_flag=True)
-def import_events(
-    db_url: str,
-    schema_url: str,
-    dataset_id: str,
-    additional_schemas: str,
-    events_path: str,
-    truncate_table: bool,
-) -> None:
-    """Import an events file into a table."""
-    engine = _get_engine(db_url)
-    dataset_schemas = [_get_dataset_schema(dataset_id, schema_url)]
-    for schema in additional_schemas:
-        dataset_schemas.append(_get_dataset_schema(schema, schema_url))
-    # Create connection, do not start a transaction.
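    # (Annotation, not part of the removed file: the EventsProcessor managed
    # its own transactions, applying each event or bulk batch inside
    # ``with self.conn.begin():`` in full.py below, which is why this command
    # handed it a bare connection rather than opening a transaction here.)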
- with engine.connect() as connection: - importer = EventsProcessor(dataset_schemas, connection, truncate=truncate_table) - importer.load_events_from_file(events_path) - - def _get_dataset_schema( dataset_id: str, schema_url: str, prefetch_related: bool = False ) -> DatasetSchema: @@ -960,6 +934,7 @@ def create_tables(db_url: str, schema_url: str, dataset_id: str) -> None: is_versioned_dataset=importer.is_versioned_dataset, ) + @create.command("sql") @click.option("--versioned/--no-versioned", default=True) @option_db_url diff --git a/src/schematools/events/__init__.py b/src/schematools/events/__init__.py deleted file mode 100644 index 62b2b1f1..00000000 --- a/src/schematools/events/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Events package.""" - -from sqlalchemy import MetaData - -metadata = MetaData() diff --git a/src/schematools/events/full.py b/src/schematools/events/full.py deleted file mode 100644 index 26ee350c..00000000 --- a/src/schematools/events/full.py +++ /dev/null @@ -1,741 +0,0 @@ -"""Module implementing an event processor, that processes full events.""" - -from __future__ import annotations - -import logging -import re -from collections import defaultdict -from dataclasses import dataclass - -import orjson -from sqlalchemy import Table, inspect -from sqlalchemy.engine import Connection - -from schematools.events import metadata -from schematools.exceptions import DatasetTableNotFound -from schematools.factories import tables_factory -from schematools.importer.base import BaseImporter -from schematools.loaders import get_schema_loader -from schematools.naming import to_snake_case -from schematools.types import DatasetFieldSchema, DatasetSchema, DatasetTableSchema - -# Enable the sqlalchemy logger by uncommenting the following 2 lines to debug SQL related issues -# logging.basicConfig() -# logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO) - -logger = logging.getLogger(__name__) - -# Configuration information to map the event-type -# to the following fields: -# - db_operation_name (INSERT/UPDATE) -# - needs_select: generated SQL needs to select a record - -EVENT_TYPE_MAPPINGS = { - "ADD": ("insert", False), - "MODIFY": ("update", True), - "DELETE": ("delete", True), -} - -FULL_LOAD_TABLE_POSTFIX = "_full_load" - - -@dataclass -class UpdateParentTableConfiguration: - parent_schema_table: DatasetTableSchema - parent_table: Table - relation_name: str - - @property - def parent_id_field(self): - return ( - self.parent_table.c.id - if self.parent_schema_table.has_composite_key - else getattr(self.parent_table.c, self.parent_schema_table.identifier[0]) - ) - - def parent_id_value(self, prepared_row: dict) -> str: - return ".".join( - [ - str(prepared_row[to_snake_case(f"{self.parent_schema_table.shortname}_{fn}")]) - for fn in self.parent_schema_table.identifier - ] - ) - - @property - def parent_fields(self) -> list[str]: - pattern = re.compile(f"{self.relation_name}_[a-zA-Z]+") - return [f.name for f in self.parent_table.columns if re.match(pattern, f.name) is not None] - - def parent_field_values(self, data: dict) -> dict: - return {k: data[k] for k in self.parent_fields} - - -@dataclass -class RunConfiguration: - check_existence_on_add: bool = False - process_events: bool = True - execute_after_process: bool = True - - """Whether or not to update the table. This should usually be the case, but this variable can - be set to False when the table does not exist. This can happen for certain relation events - for which no relation table is defined in the database. 
In this case we still want to update - the relation columns in the parent table. - - """ - update_table: bool = True - - table: Table = None - table_name: str = None - schema_table: DatasetTableSchema = None - - update_parent_table_configuration: UpdateParentTableConfiguration = None - - nested_table_fields: list[DatasetFieldSchema] = None - - -class LastEventIds: - BENK_DATASET = "benk" - LASTEVENTIDS_TABLE = "lasteventids" - - def __init__(self, benk_dataset=None): - if benk_dataset is None: - loader = get_schema_loader() - benk_dataset = loader.get_dataset(self.BENK_DATASET) - self.lasteventids_table = benk_dataset.get_table_by_id(self.LASTEVENTIDS_TABLE) - self.lasteventid_column = self.lasteventids_table.get_field_by_id("lastEventId") - - self.cache = defaultdict(int) - - def is_event_processed(self, conn, table_name: str, event_id: int) -> bool: - last_eventid = self.get_last_eventid(conn, table_name) - - return False if last_eventid is None else last_eventid >= event_id - - def update_eventid(self, conn, table_name: str, event_id: int | None): - value = event_id if event_id is not None else "NULL" - conn.execute( - f"INSERT INTO {self.lasteventids_table.db_name} " # noqa: S608 # nosec: B608 - f"VALUES ('{table_name}', {value}) " - f'ON CONFLICT ("table") DO UPDATE SET {self.lasteventid_column.db_name} = {value}' - ) - self.cache[table_name] = event_id - - def get_last_eventid(self, conn, table_name: str) -> int | None: - if table_name in self.cache: - return self.cache[table_name] - - res = conn.execute( - f"SELECT {self.lasteventid_column.db_name} " # noqa: S608 - f"FROM {self.lasteventids_table.db_name} " # noqa: S608 - f"WHERE \"table\" = '{table_name}'" # noqa: S608 - ).fetchone() - last_eventid = None if res is None else res[0] - self.cache[table_name] = last_eventid - return last_eventid - - def copy_lasteventid(self, conn, from_table_name: str, to_table_name: str): - eventid = self.get_last_eventid(conn, from_table_name) - self.update_eventid(conn, to_table_name, eventid) - - def clear_cache(self): - self.cache = defaultdict(int) - - -class EventsProcessor: - """The core event processing class. - - It needs to be initialised once - with configuration (datasets) and a db connection. - Once initialised, the process_event() method is able to - process incoming events. - The database actions are done using SQLAlchemy Core. So, - a helper function `tables_factory()` is used to created the - SA Tables that are needed during the processing of the events. - """ - - def __init__( - self, - datasets: list[DatasetSchema], - connection: Connection, - local_metadata=None, - truncate=False, - benk_dataset=None, - ) -> None: - """Construct the event processor. - - Args: - datasets: list of DatasetSchema instances. Usually dataset tables - have relations to tables. - If these target tables are in different datasets, - these dataset also need to be provided. - srid: coordinate system - local_metadata: SQLAlchemy metadata object, only needs to be provided - in unit tests. 
- truncate: indicates if the relational tables need to be truncated - """ - self.lasteventids = LastEventIds(benk_dataset=benk_dataset) - benk_dataset = self.lasteventids.lasteventids_table.dataset - self.datasets = {benk_dataset.id: benk_dataset} | {ds.id: ds for ds in datasets} - self.conn = connection - _metadata = local_metadata or metadata # mainly for testing - _metadata.bind = connection.engine - inspector = inspect(connection.engine) - self.tables = {} - self.full_load_tables = defaultdict(dict) - for dataset_id, dataset in self.datasets.items(): - base_tables_ids = {dataset_table.id for dataset_table in dataset.tables} - self.tables[dataset_id] = tfac = { - # As quick workaround to map table identifiers with the existing event streams, - # the code of this file works with snake cased identifiers. - # The remaining code (e.g. BaseImporter) already works directly with table.id. - to_snake_case(table_id): table - for table_id, table in tables_factory(dataset, metadata=_metadata).items() - } - self.geo_fields = defaultdict(lambda: defaultdict(list)) - for table_id, table in tfac.items(): - if not inspector.has_table(table.name): - table.create() - elif truncate: - with self.conn.begin(): - self.conn.execute(table.delete()) - # self.has_composite_key = dataset_table.has_composite_key - # skip the generated nm tables - if table_id not in base_tables_ids: - continue - for field in dataset.get_table_by_id(table_id).fields: - if field.is_geo: - self.geo_fields[dataset_id][table_id].append(field) - - def _flatten_event_data(self, event_data: dict) -> dict: - result = {} - for k, v in event_data.items(): - if isinstance(v, dict): - flattened = self._flatten_event_data(v) - for k2, v2 in flattened.items(): - result[f"{k}_{k2}"] = v2 - else: - result[k] = v - return result - - def _get_full_load_tables( - self, dataset_id: str, table_id: str - ) -> tuple[Table, DatasetTableSchema]: - try: - # Cached - table, schema_table = self.full_load_tables[dataset_id][table_id] - except KeyError: - # Initialise fresh tables for full load. - dataset_schema: DatasetSchema = self.datasets[dataset_id] - tables = dataset_schema.get_tables(include_nested=True, include_through=True) - - importer = BaseImporter(dataset_schema, self.conn.engine, logger) - for schema_table in tables: - this_table_id = schema_table.id - - if to_snake_case(this_table_id) in self.full_load_tables[dataset_id]: - continue - - db_table_name = schema_table.db_name_variant(postfix=FULL_LOAD_TABLE_POSTFIX) - - importer.generate_db_objects( - this_table_id, - db_schema_name=to_snake_case(dataset_id), - db_table_name=db_table_name, - is_versioned_dataset=importer.is_versioned_dataset, - ind_extra_index=False, - ind_create_pk_lookup=False, - ) - - table = importer.tables[this_table_id] - self.full_load_tables[dataset_id][to_snake_case(this_table_id)] = ( - table, - schema_table, - ) - try: - table, schema_table = self.full_load_tables[dataset_id][table_id] - except KeyError: - # Happens when there is no relation table to update for a given relation, for ex. 
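            # (Annotation, not part of the removed file: the caller,
            # ``_get_run_configuration`` further down, catches this exception
            # and falls back to updating only the relation columns in the
            # parent table, by setting ``run_configuration.update_table = False``.)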
- raise DatasetTableNotFound() - - return table, schema_table - - def _before_process(self, run_configuration: RunConfiguration, event_meta: dict) -> None: - if not run_configuration.update_table: - return - if event_meta.get("full_load_sequence", False): - - if event_meta.get("first_of_sequence", False): - self.conn.execute(f"TRUNCATE {run_configuration.table.fullname}") - self.lasteventids.update_eventid(self.conn, run_configuration.table_name, None) - - def _after_process(self, run_configuration: RunConfiguration, event_meta: dict): - if not run_configuration.update_table: - return - - if event_meta.get("full_load_sequence", False) and event_meta.get( - "last_of_sequence", False - ): - dataset_id = event_meta["dataset_id"] - table_id = event_meta["table_id"] - - nested_tables = [ - to_snake_case(f.nested_table.id) for f in run_configuration.nested_table_fields - ] - table_ids_to_replace = [ - table_id, - ] + nested_tables - - logger.info("End of full load sequence. Replacing active table.") - with self.conn.begin(): - - full_load_tables = [] - for t_id in table_ids_to_replace: - table_to_replace = self.tables[dataset_id][to_snake_case(t_id)] - full_load_table, full_load_schema_table = self._get_full_load_tables( - dataset_id, to_snake_case(t_id) - ) - full_load_tables.append((full_load_table, full_load_schema_table)) - - fields = [field.db_name for field in full_load_schema_table.get_db_fields()] - if t_id in nested_tables: - fields.remove("id") # Let PG generate the id field for nested tables. - fieldnames = ", ".join(fields) - - self.conn.execute(f"TRUNCATE {table_to_replace.fullname}") - self.conn.execute( - f"INSERT INTO {table_to_replace.fullname} ({fieldnames}) " # noqa: S608 - f"SELECT {fieldnames} FROM {full_load_table.fullname}" # noqa: S608 - ) - if run_configuration.update_parent_table_configuration: - self._update_parent_table_bulk(run_configuration) - - for full_load_table, full_load_schema_table in full_load_tables: - self.conn.execute(f"DROP TABLE {full_load_table.fullname} CASCADE") - self.full_load_tables[dataset_id].pop(to_snake_case(full_load_schema_table.id)) - - # Copy full_load lasteventid to active table and set full_load lasteventid to None - self.lasteventids.copy_lasteventid( - self.conn, run_configuration.table_name, table_to_replace.name - ) - self.lasteventids.update_eventid(self.conn, run_configuration.table_name, None) - - def _prepare_row( - self, - run_configuration: RunConfiguration, - event_meta: dict, - event_data: dict, - schema_table: DatasetTableSchema, - ) -> dict: - dataset_id = event_meta["dataset_id"] - table_id = event_meta["table_id"] - - row = self._flatten_event_data(event_data) - - # Set null values for missing fields after flattening (e.g. 
when a 1-1 relation is empty) - # Only applies to events for which we have a table - if schema_table: - row |= { - f.db_name: None - for f in schema_table.get_fields(include_subfields=True) - if f.db_name not in row and f.db_name not in ("id", "schema") - } - - for geo_field in self.geo_fields[dataset_id][table_id]: - row_key = to_snake_case(geo_field.name) - geo_value = row.get(row_key) - if geo_value is not None and not geo_value.startswith("SRID"): - row[row_key] = f"SRID={geo_field.srid};{geo_value}" - - if run_configuration.update_table: - identifier = schema_table.identifier - id_value = ".".join(str(row[to_snake_case(fn)]) for fn in identifier) - row["id"] = id_value - return row - - def _update_parent_table( - self, - configuration: UpdateParentTableConfiguration, - event_meta: dict, - prepared_row: dict, - ): - # Have 1:n relation. We need to update the relation columns in the parent table as - # well. Skips this for n:m relations (schematable.parent_table_field.relation only - # returns 1:n relations) - stmt = configuration.parent_table.update().where( - configuration.parent_id_field == configuration.parent_id_value(prepared_row) - ) - - update_row = ( - configuration.parent_field_values(prepared_row) - if event_meta["event_type"] != "DELETE" - else {k: None for k in configuration.parent_fields} - ) - - self.conn.execute(stmt, update_row) - - def _update_parent_table_bulk(self, run_configuration: RunConfiguration): - update_parent_table_config = run_configuration.update_parent_table_configuration - parent_table_ref_id = f"{run_configuration.schema_table.shortname.split('_')[0]}_id" - - if len(update_parent_table_config.parent_fields) > 1: - # Adds parentheses - set_fields = ( - f'({", ".join(update_parent_table_config.parent_fields)}) = ' - f'(s.{", s.".join(update_parent_table_config.parent_fields)})' - ) - else: - # No parentheses, because only one field and Postgres will fail on this - set_fields = ( - f"{update_parent_table_config.parent_fields[0]} = " - f"s.{update_parent_table_config.parent_fields[0]}" - ) - - query = f""" - UPDATE {update_parent_table_config.parent_table.fullname} p - SET {set_fields} - FROM {run_configuration.table.fullname} s - WHERE p.{update_parent_table_config.parent_id_field.name} = s.{parent_table_ref_id} - ; - """ # noqa: S608 - - self.conn.execute(query) - - def _process_row( - self, run_configuration: RunConfiguration, event_meta: dict, event_data: dict - ) -> None: - """Process one row of data. - - Args: - run_configuration: Configuration for the current run - event_meta: Metadata about the event - event_data: Data containing the fields of the event - """ - table = run_configuration.table - schema_table = run_configuration.schema_table - - if self.lasteventids.is_event_processed( - self.conn, run_configuration.table_name, event_meta["event_id"] - ): - logger.warning("Event with id %s already processed. Skipping.", event_meta["event_id"]) - return - - row = self._prepare_row(run_configuration, event_meta, event_data, schema_table) - id_value = row["id"] - - event_type = event_meta["event_type"] - - db_operation = None - if run_configuration.update_table: - if ( - run_configuration.check_existence_on_add - and event_type == "ADD" - and self._row_exists_in_database(run_configuration, id_value) - ): - logger.info("Row with id %s already exists in database. 
Skipping.", row["id"]) - return - - db_operation_name, needs_select = EVENT_TYPE_MAPPINGS[event_type] - db_operation = getattr(table, db_operation_name)() - - if needs_select: - id_field = ( - table.c.id - if schema_table.has_composite_key - else getattr(table.c, schema_table.identifier[0]) - ) - db_operation = db_operation.where(id_field == id_value) - with self.conn.begin(): - if run_configuration.update_table: - self.conn.execute(db_operation, row) - self._update_nested_tables(run_configuration, row, event_type, id_value) - self.lasteventids.update_eventid( - self.conn, run_configuration.table_name, event_meta["event_id"] - ) - - if run_configuration.update_parent_table_configuration: - self._update_parent_table( - run_configuration.update_parent_table_configuration, - event_meta, - row, - ) - - def _update_nested_tables( - self, run_configuration: RunConfiguration, prepared_row: dict, event_type: str, id_value - ): - is_delete = event_type == "DELETE" - - for field in run_configuration.nested_table_fields: - schema_table: DatasetTableSchema = field.nested_table - table = self.tables[run_configuration.schema_table.dataset.id][ - to_snake_case(schema_table.id) - ] - - self.conn.execute(table.delete().where(table.c.parent_id == id_value)) - - if is_delete: - continue - - if value := prepared_row.get(to_snake_case(field.id), []): - if rows := self._prepare_nested_rows(field, value, id_value): - self.conn.execute(table.insert(), rows) - - def _prepare_nested_rows(self, field: DatasetFieldSchema, value: list, parent_id_value: str): - return [ - { - "parent_id": parent_id_value, - } - | {subfield.db_name: v[subfield.db_name] for subfield in field.subfields} - for v in value - ] - - def _update_nested_tables_bulk(self, run_configuration: RunConfiguration, rows: list[dict]): - - for field in run_configuration.nested_table_fields: - schema_table: DatasetTableSchema = field.nested_table - table, _ = self._get_full_load_tables( - run_configuration.schema_table.dataset.id, to_snake_case(schema_table.id) - ) - - nested_rows = [] - for row in rows: - nested_rows += self._prepare_nested_rows( - field, row.get(to_snake_case(field.id)) or [], row["id"] - ) - if nested_rows: - self.conn.execute(table.insert(), nested_rows) - - def _row_exists_in_database(self, run_configuration: RunConfiguration, id_value: str): - table = run_configuration.table - schema_table = run_configuration.schema_table - id_field = ( - table.c.id - if schema_table.has_composite_key - else getattr(table.c, schema_table.identifier[0]) - ) - - with self.conn.begin(): - res = self.conn.execute(table.select().where(id_field == id_value)) - try: - next(res) - except StopIteration: - return False - return True - - def _table_empty(self, table: Table): - with self.conn.begin(): - res = self.conn.execute(table.select()) - try: - next(res) - except StopIteration: - return True - return False - - def _get_run_configuration( - self, first_event_meta: dict, last_event_meta: dict, recovery_mode: bool - ) -> RunConfiguration: - run_configuration = RunConfiguration() - dataset_id = first_event_meta["dataset_id"] - table_id = first_event_meta["table_id"] - - try: - if first_event_meta.get("full_load_sequence", False): - table, schema_table = self._get_full_load_tables( - dataset_id, to_snake_case(table_id) - ) - else: - schema_table = self.datasets[dataset_id].get_table_by_id(table_id) - table = self.tables[dataset_id][to_snake_case(table_id)] - run_configuration.table = table - run_configuration.schema_table = schema_table - 
run_configuration.table_name = table.name - run_configuration.nested_table_fields = [ - f for f in schema_table.fields if f.is_nested_table - ] - - if schema_table.has_parent_table and schema_table.parent_table_field.relation: - run_configuration.update_parent_table_configuration = ( - UpdateParentTableConfiguration( - parent_schema_table=schema_table.parent_table, - parent_table=self.tables[dataset_id][schema_table.parent_table.id], - relation_name=to_snake_case(schema_table.parent_table_field.shortname), - ) - ) - except DatasetTableNotFound as exc: - # Check if relation table, if so continue - parent_table_id, *field_id = table_id.split("_") - field_id = "_".join(field_id) - - parent_table = self.datasets[dataset_id].get_table_by_id(parent_table_id) - field = parent_table.get_field_by_id(field_id) - if field.get("relation"): - logger.info( - "Relation %s.%s has no table. Will only update parent table.", - dataset_id, - table_id, - ) - run_configuration.update_table = False - run_configuration.update_parent_table_configuration = ( - UpdateParentTableConfiguration( - parent_schema_table=parent_table, - parent_table=self.tables[dataset_id][parent_table_id], - relation_name=to_snake_case(field.shortname), - ) - ) - run_configuration.table_name = to_snake_case(table_id) - - else: - raise exc - - if recovery_mode: - self._recover(run_configuration, first_event_meta, last_event_meta) - return run_configuration - - def _recover( - self, run_configuration: RunConfiguration, first_event_meta: dict, last_event_meta: dict - ): - # If a message is redelivered, we need to enter recovery mode. Redelivery means something - # has gone wrong - # somewhere. We need to get back to a consistent state. - # The actions to take in recovery mode depend on the type of message: - # 1. full_load_sequence = False: This was a regular update event. Can be of any type - # (ADD/MODIFY/DELETE). MODIFY - # and DELETE are idempotent, but ADD is not. We need to check if the data is already - # in the database before - # trying to add it again. - # 2. full_load_sequence = True with first_of_sequence = True: This should not be a problem. - # The first_of_sequence - # causes the table to be truncated, so we can just continue as normal. No need to check - # for existence. - # 3. full_load_sequence = True with first_of_sequence = False and last_of_sequence = False: - # We should check - # for existence before adding event data to the table. Because first_of_sequence and - # last_of_sequence are - # both False, there are no other possible side effects to consider. - # 4. full_load_sequence = True with first_of_sequence = False and last_of_sequence = True: - # If the target table - # is empty, we know that after_process was executed and that this message was handled - # correctly the first - # time it got delivered (because first_of_sequence = False, there should already have - # been data in the - # table). In that case we can ignore everything in this message; skip processing the - # events and skip the - # after_process step (4a). If the target table is not empty, we know that after_process - # was not executed. We - # should process the events and check for existence of the first event. After that we - # should execute - # after_process (4b). - if first_event_meta.get("full_load_sequence", False): - if first_event_meta.get("first_of_sequence", False): - # Case 2. - pass - elif not last_event_meta.get("last_of_sequence", False): - # Case 3. - run_configuration.check_existence_on_add = True - else: - # Case 4. 
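                # (Annotation, not part of the removed file: the cases above,
                # summarized as a decision table.
                #
                #   full_load  first  last   _full_load empty?  -> action
                #   ---------  -----  -----  -----------------  --------------------------
                #   False      any    any    any                check existence on ADD (1)
                #   True       True   any    any                proceed as normal      (2)
                #   True       False  False  any                check existence on ADD (3)
                #   True       False  True   yes                skip events + after    (4a)
                #   True       False  True   no                 check existence on ADD (4b) )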
- if self._table_empty(run_configuration.table): - # Case 4a. - run_configuration.execute_after_process = False - run_configuration.process_events = False - else: - # Case 4b. - run_configuration.check_existence_on_add = True - else: - # Case 1. - run_configuration.check_existence_on_add = True - - def process_event(self, event_meta: dict, event_data: dict, recovery_mode: bool = False): - self.process_events([(event_meta, event_data)], recovery_mode) - - def process_events(self, events: list[tuple[dict, dict]], recovery_mode: bool = False): - if len(events) == 0: - return - - first_event_meta, last_event_meta = events[0][0], events[-1][0] - run_configuration = self._get_run_configuration( - first_event_meta, last_event_meta, recovery_mode - ) - - self._before_process(run_configuration, first_event_meta) - - if run_configuration.process_events: - if first_event_meta.get("full_load_sequence", False): - # full_load_sequence only contains add events. Take more efficient shortcut. - self._process_bulk_adds(run_configuration, events) - else: - for event_meta, event_data in events: - self._process_row(run_configuration, event_meta, event_data) - - if run_configuration.execute_after_process: - self._after_process(run_configuration, last_event_meta) - - def _process_bulk_adds( - self, run_configuration: RunConfiguration, events: list[tuple[dict, dict]] - ): - first = True - rows = [] - last_eventid = None - - for event_meta, event_data in events: - if event_meta["event_type"] != "ADD": - raise Exception("This method should only be called when processing ADD events.") - - row = self._prepare_row( - run_configuration, event_meta, event_data, run_configuration.schema_table - ) - if ( - run_configuration.check_existence_on_add - and first - and run_configuration.update_table - ): - if self._row_exists_in_database(run_configuration, row["id"]): - logger.info("Skip bulk adds, as the first row already exists in the database.") - return - first = False - - # Note: This has some overlap with the recovery mode - if self.lasteventids.is_event_processed( - self.conn, run_configuration.table_name, event_meta["event_id"] - ): - logger.warning( - "Skip event with id %s, as the event has already been processed.", - event_meta["event_id"], - ) - continue - - rows.append(row) - last_eventid = event_meta["event_id"] - - if len(rows) == 0: - return - - with self.conn.begin(): - if run_configuration.update_table: - self.conn.execute(run_configuration.table.insert(), rows) - self._update_nested_tables_bulk(run_configuration, rows) - self.lasteventids.update_eventid(self.conn, run_configuration.table_name, last_eventid) - - def load_events_from_file(self, events_path: str): - """Load events from a file, primarily used for testing.""" - with open(events_path, "rb") as ef: - for line in ef: - if line := line.strip(): - event_id, event_meta_str, data_str = line.split(b"|", maxsplit=2) - event_meta = orjson.loads(event_meta_str) - event_data = orjson.loads(data_str) - self.process_event( - event_meta, - event_data, - ) - - def load_events_from_file_using_bulk(self, events_path: str): - """Load events from a file, primarily used for testing.""" - with open(events_path, "rb") as ef: - events = [] - for line in ef: - if line := line.strip(): - event_id, event_meta_str, data_str = line.split(b"|", maxsplit=2) - event_meta = orjson.loads(event_meta_str) - event_data = orjson.loads(data_str) - events.append((event_meta, event_data)) - self.process_events(events) diff --git a/tests/conftest.py b/tests/conftest.py index 
f3cbb6ae..ddeb72ed 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -326,11 +326,6 @@ def nap_schema(schema_loader) -> DatasetSchema: return schema_loader.get_dataset_from_file("nap.json") -@pytest.fixture -def benk_schema(schema_loader) -> DatasetSchema: - return schema_loader.get_dataset_from_file("benk.json") - - @pytest.fixture def bouwblokken_schema(schema_loader, gebieden_schema) -> DatasetSchema: # gebieden_schema is listed as dependency to resolve relations diff --git a/tests/files/datasets/benk.json b/tests/files/datasets/benk.json deleted file mode 100644 index 2a3aede9..00000000 --- a/tests/files/datasets/benk.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "type": "dataset", - "id": "benk", - "title": "benk", - "status": "niet_beschikbaar", - "version": "1.0.0", - "owner": "Datateam Basis- en Kernregistraties", - "creator": "Datateam Basis- en Kernregistraties", - "publisher": "Datateam Basis- en Kernregistraties", - "authorizationGrantor": "Datateam Basis- en Kernregistraties", - "auth": "OPENBAAR", - "tables": [ - { - "id": "lasteventids", - "version": "1.0.0", - "type": "table", - "schema": { - "$id": "https://github.com/Amsterdam/schemas/benk/lasteventids.json", - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "additionalProperties": false, - "identifier": "table", - "required": [ - "table", - "schema" - ], - "display": "table", - "properties": { - "schema": { - "$ref": "https://schemas.data.amsterdam.nl/schema@v1.1.1#/definitions/schema" - }, - "table": { - "type": "string", - "description": "Table to which this last event ID applies." - }, - "lastEventId": { - "type": "integer", - "description": "Last event ID that was processed." - } - } - } - } - ] -} diff --git a/tests/test_events_full.py b/tests/test_events_full.py deleted file mode 100644 index cb1d140a..00000000 --- a/tests/test_events_full.py +++ /dev/null @@ -1,1502 +0,0 @@ -"""Event tests.""" - -import json -from datetime import date, datetime - -import orjson -import pytest -from more_itertools import peekable - -from schematools.events.full import EventsProcessor - -# pytestmark = pytest.mark.skip("all tests disabled") - - -def _load_events_from_file_as_full_load_sequence(importer, events_path): - with open(events_path, "rb") as ef: - is_first = True - p = peekable(ef) - - for line in p: - event_id, event_meta_str, data_str = line.split(b"|", maxsplit=2) - event_meta = orjson.loads(event_meta_str) - event_meta |= { - "full_load_sequence": True, - "first_of_sequence": is_first, - "last_of_sequence": p.peek(None) is None, - } - event_data = orjson.loads(data_str) - importer.process_event( - event_meta, - event_data, - ) - is_first = False - - -def test_event_process_insert( - here, db_schema, tconn, local_metadata, gebieden_schema, benk_schema -): - """Prove that event gets inserted.""" - events_path = here / "files" / "data" / "bouwblokken.gobevents" - importer = EventsProcessor( - [gebieden_schema], - tconn, - local_metadata=local_metadata, - truncate=True, - benk_dataset=benk_schema, - ) - importer.load_events_from_file(events_path) - records = [dict(r) for r in tconn.execute("SELECT * FROM gebieden_bouwblokken")] - assert len(records) == 2 - assert records[0]["code"] == "AA01" - assert records[1]["code"] == "AA02" - assert records[0]["eind_geldigheid"] is None - assert records[0]["begin_geldigheid"] == date(2006, 6, 12) - - -def test_event_process_insert_object( - here, db_schema, tconn, local_metadata, nap_schema, gebieden_schema, benk_schema -): - """Prove that event gets 
inserted correctly with object split in columns.""" - events_path = here / "files" / "data" / "peilmerken.gobevents" - importer = EventsProcessor( - [nap_schema, gebieden_schema], - tconn, - local_metadata=local_metadata, - truncate=True, - benk_dataset=benk_schema, - ) - importer.load_events_from_file(events_path) - records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")] - assert records[0]["status_code"] == 3 - assert records[0]["status_omschrijving"] == "Vervallen" - - -def test_event_process_update(here, tconn, local_metadata, gebieden_schema, benk_schema): - """Prove that event gets updated.""" - events_path = here / "files" / "data" / "bouwblokken_update.gobevents" - importer = EventsProcessor( - [gebieden_schema], - tconn, - local_metadata=local_metadata, - truncate=True, - benk_dataset=benk_schema, - ) - importer.load_events_from_file(events_path) - records = [dict(r) for r in tconn.execute("SELECT * FROM gebieden_bouwblokken")] - assert len(records) == 1 - assert records[0]["code"] == "AA01" - assert records[0]["begin_geldigheid"] == date(2020, 2, 5) - assert records[0]["registratiedatum"] == datetime(2020, 2, 5, 15, 6, 43) - - -def test_event_process_delete(here, tconn, local_metadata, gebieden_schema, benk_schema): - """Prove that event gets deleted.""" - events_path = here / "files" / "data" / "bouwblokken_delete.gobevents" - importer = EventsProcessor( - [gebieden_schema], - tconn, - local_metadata=local_metadata, - truncate=True, - benk_dataset=benk_schema, - ) - importer.load_events_from_file(events_path) - records = [dict(r) for r in tconn.execute("SELECT * FROM gebieden_bouwblokken")] - assert len(records) == 1 - assert records[0]["code"] == "AA01" - - -def test_event_process_nm_relation_delete( - here, tconn, local_metadata, gebieden_schema, salogger, benk_schema -): - """Prove that NM relations of an event get deleted.""" - events_path = ( - here / "files" / "data" / "gebieden_ggwgebieden_bestaat_uit_buurten_delete.gobevents" - ) - importer = EventsProcessor( - [gebieden_schema], - tconn, - local_metadata=local_metadata, - truncate=True, - benk_dataset=benk_schema, - ) - importer.load_events_from_file(events_path) - records = [ - dict(r) for r in tconn.execute("SELECT * FROM gebieden_ggwgebieden_bestaat_uit_buurten") - ] - assert len(records) == 0 - - -def test_event_process_relation_update_parent_table( - here, db_schema, tconn, local_metadata, nap_schema, gebieden_schema, benk_schema -): - events_path = here / "files" / "data" / "peilmerken.gobevents" - importer = EventsProcessor( - [nap_schema, gebieden_schema], - tconn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - importer.load_events_from_file_using_bulk(events_path) - records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")] - - # Imported objects without relations - assert len(records) == 1 - assert records[0]["ligt_in_bouwblok_id"] is None - assert records[0]["ligt_in_bouwblok_identificatie"] is None - assert records[0]["ligt_in_bouwblok_volgnummer"] is None - - events_path = here / "files" / "data" / "peilmerken_ligt_in_bouwblok.gobevents" - importer = EventsProcessor( - [nap_schema, gebieden_schema], - tconn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - importer.load_events_from_file_using_bulk(events_path) - rel_records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken_ligt_in_bouwblok")] - parent_records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")] - - # Should have inserted the rel and updated 
relation columns in parent (object) table - assert len(rel_records) == 1 - assert len(parent_records) == 1 - - assert parent_records[0]["ligt_in_bouwblok_id"] == "03630012095746.1" - assert parent_records[0]["ligt_in_bouwblok_identificatie"] == "03630012095746" - assert parent_records[0]["ligt_in_bouwblok_volgnummer"] == 1 - - events_path = here / "files" / "data" / "peilmerken_ligt_in_bouwblok.delete.gobevents" - importer = EventsProcessor( - [nap_schema, gebieden_schema], - tconn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - importer.load_events_from_file_using_bulk(events_path) - rel_records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken_ligt_in_bouwblok")] - parent_records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")] - - # Rel table row should be deleted, fields in parent table should be set to None again - assert len(rel_records) == 0 - assert len(parent_records) == 1 - assert records[0]["ligt_in_bouwblok_id"] is None - assert records[0]["ligt_in_bouwblok_identificatie"] is None - assert records[0]["ligt_in_bouwblok_volgnummer"] is None - - -def test_event_process_relation_skip_update_parent_table_nm_relations( - here, db_schema, tconn, local_metadata, gebieden_schema, benk_schema -): - events_path = here / "files" / "data" / "gebieden_ggwgebieden_bestaat_uit_buurten.gobevents" - importer = EventsProcessor( - [gebieden_schema], tconn, local_metadata=local_metadata, benk_dataset=benk_schema - ) - - # First import row in parent table - importer.load_events_from_file(events_path) - records = [dict(r) for r in tconn.execute("SELECT * FROM gebieden_ggwgebieden")] - assert len(records) == 1 - - # Then import row in relation table and check that parent table is not updated and no error is - # raised - events_path = ( - here / "files" / "data" / "gebieden_ggwgebieden_bestaat_uit_buurten_reltable_add.gobevents" - ) - importer.load_events_from_file(events_path) - records = [ - dict(r) for r in tconn.execute("SELECT * FROM gebieden_ggwgebieden_bestaat_uit_buurten") - ] - assert len(records) == 1 - assert records[0]["ggwgebieden_id"] == "03630950000000.1" - assert records[0]["bestaat_uit_buurten_id"] == "03630023754008ADD.1" - - # Update row in relation table and check that parent table is not updated and no error is - # raised - events_path = ( - here - / "files" - / "data" - / "gebieden_ggwgebieden_bestaat_uit_buurten_reltable_modify.gobevents" - ) - importer.load_events_from_file(events_path) - records = [ - dict(r) for r in tconn.execute("SELECT * FROM gebieden_ggwgebieden_bestaat_uit_buurten") - ] - assert len(records) == 1 - assert records[0]["ggwgebieden_id"] == "03630950000000.1" - assert records[0]["bestaat_uit_buurten_id"] == "03630023754008MOD.1" - - -def test_event_process_relation_update_parent_table_shortname( - here, db_schema, tconn, local_metadata, gebieden_schema, benk_schema -): - """Tests updating of the parent table with a relation attribute with a shortname.""" - - # Import bouwblokken in object table - events_path = here / "files" / "data" / "bouwblokken.gobevents" - importer = EventsProcessor( - [gebieden_schema], tconn, local_metadata=local_metadata, benk_dataset=benk_schema - ) - importer.load_events_from_file(events_path) - records = [dict(r) for r in tconn.execute("SELECT * FROM gebieden_bouwblokken")] - assert len(records) == 2 - - # Import relation table row event for relation with shortname - events_path = here / "files" / "data" / "bouwblokken_ligt_in_buurt_met_te_lange_naam.gobevents" - 
importer.load_events_from_file(events_path) - records = [dict(r) for r in tconn.execute("SELECT * FROM gebieden_bouwblokken")] - assert len(records) == 2 - - record = [r for r in records if r["identificatie"] == "03630012096976"][0] - assert record["lgt_in_brt_id"] == "03630023754008ADD.1" - assert record["lgt_in_brt_identificatie"] == "03630023754008ADD" - assert record["lgt_in_brt_volgnummer"] == 1 - - -def test_events_process_relation_without_table_update_parent_table( - here, db_schema, tconn, local_metadata, brk_schema_without_bag_relations, benk_schema -): - # First, verify that the relation table indeed does not exist - res = next( - tconn.execute( - "SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' " - "AND tablename = 'brk_tenaamstellingen_van_kadastraalsubject');" - ) - ) - assert res[0] is False - - importer = EventsProcessor( - [brk_schema_without_bag_relations], - tconn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - - # Import relation vanKadastraalsubject. Has no table, but we want to update the parent table - events_path = here / "files" / "data" / "tenaamstellingen.gobevents" - importer.load_events_from_file(events_path) - records = [dict(r) for r in tconn.execute("SELECT * FROM brk_tenaamstellingen")] - assert len(records) == 2 - assert [(r["identificatie"], r["van_kadastraalsubject_id"]) for r in records] == [ - ("NL.IMKAD.Tenaamstelling.ajdkfl4j4", "NL.IMKAD.Persoon.1124ji44kd"), - ("NL.IMKAD.Tenaamstelling.adkfkadfkjld", "NL.IMKAD.Persoon.2f4802kkdd"), - ] - - # Test that parent table is updated - events_path = here / "files" / "data" / "tenaamstellingen_van_kadastraalsubject.gobevents" - importer.load_events_from_file(events_path) - records = [dict(r) for r in tconn.execute("SELECT * FROM brk_tenaamstellingen")] - assert len(records) == 2 - assert [(r["identificatie"], r["van_kadastraalsubject_id"]) for r in records] == [ - ("NL.IMKAD.Tenaamstelling.ajdkfl4j4", "NL.IMKAD.Persoon.20042004eeeeefjd"), - ("NL.IMKAD.Tenaamstelling.adkfkadfkjld", None), - ] - - -def test_events_process_relation_without_table_update_parent_table_full_load( - here, db_schema, tconn, local_metadata, brk_schema_without_bag_relations, benk_schema -): - # First, verify that the relation table indeed does not exist - res = next( - tconn.execute( - "SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' " - "AND tablename = 'brk_tenaamstellingen_van_kadastraalsubject');" - ) - ) - assert res[0] is False - - importer = EventsProcessor( - [brk_schema_without_bag_relations], - tconn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - - # Import relation vanKadastraalsubject. 
Has no table, but we want to update the parent table - events_path = here / "files" / "data" / "tenaamstellingen.gobevents" - _load_events_from_file_as_full_load_sequence(importer, events_path) - records = [dict(r) for r in tconn.execute("SELECT * FROM brk_tenaamstellingen")] - assert len(records) == 2 - assert [(r["identificatie"], r["van_kadastraalsubject_id"]) for r in records] == [ - ("NL.IMKAD.Tenaamstelling.ajdkfl4j4", "NL.IMKAD.Persoon.1124ji44kd"), - ("NL.IMKAD.Tenaamstelling.adkfkadfkjld", "NL.IMKAD.Persoon.2f4802kkdd"), - ] - - # Test that parent table is updated - events = [ - ( - { - "event_type": "ADD", - "event_id": 1, - "dataset_id": "brk", - "table_id": "tenaamstellingen_vanKadastraalsubject", - "full_load_sequence": True, - "first_of_sequence": True, - "last_of_sequence": False, - }, - { - "id": 1, - "tenaamstellingen_id": "NL.IMKAD.Tenaamstelling.ajdkfl4j4.4", - "tenaamstellingen_identificatie": "NL.IMKAD.Tenaamstelling.ajdkfl4j4", - "tenaamstellingen_volgnummer": 4, - "van_kadastraalsubject_id": "NL.IMKAD.Persoon.20042004eeeeefjd", - "van_kadastraalsubject_identificatie": "NL.IMKAD.Persoon.20042004eeeeefjd", - }, - ), - ( - { - "event_type": "ADD", - "event_id": 2, - "dataset_id": "brk", - "table_id": "tenaamstellingen_vanKadastraalsubject", - "full_load_sequence": True, - "first_of_sequence": False, - "last_of_sequence": True, - }, - { - "id": 3, - "tenaamstellingen_id": "NL.IMKAD.Tenaamstelling.ajdkeeeefad.4", - "tenaamstellingen_identificatie": "NL.IMKAD.Tenaamstelling.ajdkeeeefad", - "tenaamstellingen_volgnummer": 4, - "van_kadastraalsubject_id": "NL.IMKAD.Persoon.20042004eeeeefjd", - "van_kadastraalsubject_identificatie": "NL.IMKAD.Persoon.20042004eeeeefjd", - }, - ), - ] - - importer.process_events(events, recovery_mode=False) - records = [dict(r) for r in tconn.execute("SELECT * FROM brk_tenaamstellingen")] - assert len(records) == 2 - - # Full load does not update parent table, so result should be the same as above. This - # testcase is included though to make sure we don't get an error on the missing relation table. - assert [(r["identificatie"], r["van_kadastraalsubject_id"]) for r in records] == [ - ("NL.IMKAD.Tenaamstelling.ajdkfl4j4", "NL.IMKAD.Persoon.1124ji44kd"), - ("NL.IMKAD.Tenaamstelling.adkfkadfkjld", "NL.IMKAD.Persoon.2f4802kkdd"), - ] - - -def test_event_process_full_load_sequence( - here, db_schema, tconn, local_metadata, nap_schema, gebieden_schema, benk_schema -): - """Test consists of three parts: - - - First, load usual peilmerken events from peilmerken.gobevents. - - Then, load events from peilmerken_full_load_sequence_start.gobevents. This should - create the tmp table and not change the active table. - - Lastly, load events from peilmerken_full_load_sequence_end.gobevents. This should - end the full load sequence and - replace the active table with the tmp table. Also, the tmp table should be removed. - - The peilmerken identificaties in the *.gobevents files used are unique, so we can - check that the objects are in the expected tables. - """ - - def load_events(events_file): - events_path = here / "files" / "data" / events_file - importer = EventsProcessor( - [nap_schema, gebieden_schema], - tconn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - importer.load_events_from_file_using_bulk(events_path) - - # 1. - load_events("peilmerken.gobevents") - records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")] - assert len(records) == 1 - assert records[0]["identificatie"] == "70780001" - - # 2. 
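    # (Annotation, not part of the removed file: per load_events_from_file in
    # full.py, each line of a .gobevents file has the shape
    # "<event_id>|<meta json>|<data json>", on a single line. An illustrative
    # full-load start event, values made up for illustration, could look like:
    #
    #   1|{"event_id": 1, "event_type": "ADD", "dataset_id": "nap",
    #      "table_id": "peilmerken", "full_load_sequence": true,
    #      "first_of_sequence": true}|{"identificatie": "70780002", ...}
    # )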
- load_events("peilmerken_full_load_sequence_start.gobevents") - records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")] - assert len(records) == 1 - assert records[0]["identificatie"] == "70780001" - - records = [dict(r) for r in tconn.execute("SELECT * FROM nap.nap_peilmerken_full_load")] - assert len(records) == 3 - assert records[0]["identificatie"] == "70780002" - assert records[1]["identificatie"] == "70780003" - assert records[2]["identificatie"] == "70780004" - - # 3. - load_events("peilmerken_full_load_sequence_end.gobevents") - records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")] - assert len(records) == 4 - assert records[0]["identificatie"] == "70780002" - assert records[1]["identificatie"] == "70780003" - assert records[2]["identificatie"] == "70780004" - assert records[3]["identificatie"] == "70780005" - - res = next( - tconn.execute( - "SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' " - "AND tablename = 'nap_peilmerken_full_load');" - ) - ) - assert res[0] is False - - -def test_event_process_geometry_attrs( - here, db_schema, tconn, local_metadata, brk_schema_without_bag_relations, benk_schema -): - events_path = here / "files" / "data" / "kadastraleobjecten_geometry.gobevents" - - importer = EventsProcessor( - [brk_schema_without_bag_relations], - tconn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - importer.load_events_from_file_using_bulk(events_path) - records = [dict(r) for r in tconn.execute("SELECT * FROM brk_kadastraleobjecten")] - - assert len(records) == 1 - record = records[0] - assert record["bijpijling_geometrie"] is not None - assert record["geometrie"] is not None - - -def _create_peilmerken_event(id: str, jaar: int, type: str = "ADD", **kwargs) -> tuple[dict, dict]: - """Creates a peilmerken event. Use 'jaar' to make sure the event is unique.""" - - return ( - { - "event_type": type, - "dataset_id": "nap", - "table_id": "peilmerken", - **kwargs, - }, - { - "identificatie": id, - "hoogte_tov_nap": -2.6954, - "jaar": jaar, - "merk": { - "code": "7", - "omschrijving": "Bijzondere merktekens bijvoorbeeld zeskantige bout, " - "stalen pen etc.", - }, - "omschrijving": "Gemaal aan de Ringvaart gelegen aan de Schipholweg. 
Vervallen?", - "windrichting": "Z", - "x_coordinaat_muurvlak": 84, - "y_coordinaat_muurvlak": 37, - "rws_nummer": "25D0039", - "geometrie": "01010000204071000000000000C05FFC400000000050601D41", - "status": {"code": 3, "omschrijving": "Vervallen"}, - "vervaldatum": "2018-04-23", - "ligt_in_bouwblok_id": None, - "ligt_in_bouwblok_identificatie": None, - "ligt_in_bouwblok_volgnummer": None, - "publiceerbaar": False, - }, - ) - - -def _assert_have_peilmerken( - id_years: list[tuple[str, int]], tconn, check_full_load_table: bool = False -): - tablename = "nap_peilmerken" if not check_full_load_table else "nap.nap_peilmerken_full_load" - records = [dict(r) for r in tconn.execute(f"SELECT * FROM {tablename}")] # noqa: S608 - assert len(records) == len(id_years) - - for record, (id, year) in zip(records, id_years): - assert record["identificatie"] == id - assert record["jaar"] == year - - -def _import_assert_result_expect_exception( - importer, events, expected_result, recovery_mode=False, check_full_load_table=False -): - with pytest.raises(Exception): - importer.process_events(events, recovery_mode) - _assert_have_peilmerken(expected_result, importer.conn, check_full_load_table) - - -def _import_assert_result( - importer, events, expected_result, recovery_mode=False, check_full_load_table=False -): - importer.process_events(events, recovery_mode) - _assert_have_peilmerken(expected_result, importer.conn, check_full_load_table) - - -def test_event_process_recovery_regular( - db_schema, engine, local_metadata, nap_schema, gebieden_schema, benk_schema -): - """Tests adding of regular events (that are not part of a full load sequence) with - and without recovery mode enabled. - - Initialise database with some rows. - - Try to add rows: - - 1. One that already exists, with recovery_mode = False. Should raise an error. - - 2. One that doesn't exist, with recovery_mode = False. Should be added. - - 3. One that already exists, with recovery_mode = True. Should be ignored. - - 4. One that doesn't exist, with recovery_mode = True. Should be added. - """ - with engine.connect() as conn: - importer = EventsProcessor( - [nap_schema, gebieden_schema], - conn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - - # Init - _import_assert_result( - importer, - [ - _create_peilmerken_event("1", 2018, event_id=1), - _create_peilmerken_event("2", 2019, event_id=2), - ], - [("1", 2018), ("2", 2019)], - ) - - # 1. One that already exists, with recovery_mode = False. Should raise an error. - _import_assert_result_expect_exception( - importer, [_create_peilmerken_event("1", 2017, event_id=3)], [("1", 2018), ("2", 2019)] - ) - - # 2. One that doesn't exist, with recovery_mode = False. Should be added. - _import_assert_result( - importer, - [_create_peilmerken_event("3", 2020, event_id=4)], - [("1", 2018), ("2", 2019), ("3", 2020)], - ) - - # 3. One that already exists, with recovery_mode = True. Should be ignored. - _import_assert_result( - importer, - [_create_peilmerken_event("3", 2019, event_id=5)], - [("1", 2018), ("2", 2019), ("3", 2020)], - recovery_mode=True, - ) - - # 4. One that doesn't exist, with recovery_mode = True. Should be added. 
- _import_assert_result( - importer, - [_create_peilmerken_event("4", 2021, event_id=6)], - [("1", 2018), ("2", 2019), ("3", 2020), ("4", 2021)], - recovery_mode=True, - ) - conn.close() - - -def test_event_process_recovery_full_load_first( - db_schema, engine, local_metadata, nap_schema, gebieden_schema, benk_schema -): - """Tests adding of events that are part of a full load sequence and where the first - event is the first in the sequence. - - Initialise full load with some rows. - - Try to add multiple rows, where the first one: - - Already exists, with recovery_mode = False. New rows should be added. - - Doesn't exist, with recovery_mode = False. New rows should be added. - - Already exists, with recovery_mode = True. New rows should be added. - - Doesn't exist, with recovery_mode = True. New rows should be added. - - Note: When the first event to handle in this case is the first in the sequence, the - _full_load table is always truncated. This means that in every case we replace the - table. - - """ - with engine.connect() as conn: - importer = EventsProcessor( - [nap_schema, gebieden_schema], - conn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - - # Init - _import_assert_result( - importer, - [ - _create_peilmerken_event( - "1", 2018, full_load_sequence=True, first_of_sequence=True, event_id=1 - ), - _create_peilmerken_event("2", 2019, full_load_sequence=True, event_id=2), - ], - [("1", 2018), ("2", 2019)], - check_full_load_table=True, - ) - - # 1. Already exists, with recovery_mode = False. New rows should be added. - _import_assert_result( - importer, - [ - _create_peilmerken_event( - "1", 2020, full_load_sequence=True, first_of_sequence=True, event_id=3 - ), - _create_peilmerken_event("2", 2021, full_load_sequence=True, event_id=4), - ], - [("1", 2020), ("2", 2021)], - check_full_load_table=True, - ) - - # 2. Doesn't exist, with recovery_mode = False. New rows should be added. - _import_assert_result( - importer, - [ - _create_peilmerken_event( - "3", 2020, full_load_sequence=True, first_of_sequence=True, event_id=5 - ), - _create_peilmerken_event("4", 2021, full_load_sequence=True, event_id=6), - ], - [("3", 2020), ("4", 2021)], - check_full_load_table=True, - ) - - # 3. Already exists, with recovery_mode = True. New rows should be added. - _import_assert_result( - importer, - [ - _create_peilmerken_event( - "3", 2022, full_load_sequence=True, first_of_sequence=True, event_id=7 - ), - _create_peilmerken_event("4", 2023, full_load_sequence=True, event_id=8), - ], - [("3", 2022), ("4", 2023)], - check_full_load_table=True, - recovery_mode=True, - ) - - # 4. Doesn't exist, with recovery_mode = True. New rows should be added. - _import_assert_result( - importer, - [ - _create_peilmerken_event( - "5", 2020, full_load_sequence=True, first_of_sequence=True, event_id=9 - ), - _create_peilmerken_event("6", 2021, full_load_sequence=True, event_id=10), - ], - [("5", 2020), ("6", 2021)], - check_full_load_table=True, - recovery_mode=True, - ) - - -def test_event_process_recovery_full_load_no_first_no_last( - db_schema, engine, local_metadata, nap_schema, gebieden_schema, benk_schema -): - """Tests adding of events that are part of a full load sequence and where events to - be added are neither the first nor the last in the sequence. - - Initialise full load with some rows. - - Try to add multiple rows, where the first one: - - Already exists, with recovery_mode = False. Should raise an error. - - Doesn't exist, with recovery_mode = False. New rows should be added. 
- - Already exists, with recovery_mode = True. Rows should be ignored. - - Doesn't exist, with recovery_mode = True. New rows should be added. - - """ - with engine.connect() as conn: - importer = EventsProcessor( - [nap_schema, gebieden_schema], - conn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - - # Init - _import_assert_result( - importer, - [ - _create_peilmerken_event( - "1", 2018, full_load_sequence=True, first_of_sequence=True, event_id=1 - ), - _create_peilmerken_event("2", 2019, full_load_sequence=True, event_id=2), - ], - [("1", 2018), ("2", 2019)], - check_full_load_table=True, - ) - - # 1. Already exists, with recovery_mode = False. Should raise an error. - _import_assert_result_expect_exception( - importer, - [ - _create_peilmerken_event("1", 2022, full_load_sequence=True, event_id=3), - _create_peilmerken_event("2", 2023, full_load_sequence=True, event_id=4), - ], - [("1", 2018), ("2", 2019)], - check_full_load_table=True, - ) - - # 2. Doesn't exist, with recovery_mode = False. New rows should be added. - _import_assert_result( - importer, - [ - _create_peilmerken_event("3", 2020, full_load_sequence=True, event_id=5), - _create_peilmerken_event("4", 2021, full_load_sequence=True, event_id=6), - ], - [("1", 2018), ("2", 2019), ("3", 2020), ("4", 2021)], - check_full_load_table=True, - ) - - # 3. Already exists, with recovery_mode = True. Rows should be ignored. - _import_assert_result( - importer, - [ - _create_peilmerken_event("1", 2022, full_load_sequence=True, event_id=7), - _create_peilmerken_event("2", 2023, full_load_sequence=True, event_id=8), - ], - [("1", 2018), ("2", 2019), ("3", 2020), ("4", 2021)], - check_full_load_table=True, - recovery_mode=True, - ) - - # 4. Doesn't exist, with recovery_mode = True. New rows should be added. - _import_assert_result( - importer, - [ - _create_peilmerken_event("5", 2022, full_load_sequence=True, event_id=9), - _create_peilmerken_event("6", 2023, full_load_sequence=True, event_id=10), - ], - [("1", 2018), ("2", 2019), ("3", 2020), ("4", 2021), ("5", 2022), ("6", 2023)], - check_full_load_table=True, - recovery_mode=True, - ) - - -def test_event_process_recovery_full_load_last_table_empty( - db_schema, engine, local_metadata, nap_schema, gebieden_schema, benk_schema -): - """Tests adding of events that are part of a full load sequence and where the last - event is the last in the sequence and the _full_load table is empty. - - No full load initialisation; table should be empty - - Try to add multiple rows, where the first one: - - Doesn't exist, with recovery_mode = False. New rows should be added and object table - replaced. - - Doesn't exist, with recovery_mode = True. Everything in this message should be ignored - - """ - with engine.connect() as conn: - importer = EventsProcessor( - [nap_schema, gebieden_schema], - conn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - - def _init_empty_full_load_table(importer): - # Init and truncate right after to get inconsistent state - _import_assert_result( - importer, - [ - _create_peilmerken_event( - "1", 2018, full_load_sequence=True, first_of_sequence=True, event_id=1 - ), - _create_peilmerken_event("2", 2019, full_load_sequence=True, event_id=2), - ], - [("1", 2018), ("2", 2019)], - check_full_load_table=True, - ) - importer.conn.execute("TRUNCATE TABLE nap.nap_peilmerken_full_load") - - # 1. Doesn't exist, with recovery_mode = False. New rows should be added and object - # table replaced. 
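        # (Annotation, not part of the removed file: without recovery mode the
        # truncated _full_load table is simply repopulated; last_of_sequence
        # then triggers _after_process, which swaps the rows into the active
        # nap_peilmerken table. In recovery mode, case 4a in _recover treats
        # the empty _full_load table as proof the message was already handled
        # and ignores it entirely, as step 2 below verifies.)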
-
-
-def test_event_process_recovery_full_load_last_table_empty(
-    db_schema, engine, local_metadata, nap_schema, gebieden_schema, benk_schema
-):
-    """Tests adding events that are part of a full-load sequence, where the last
-    event is the last in the sequence and the _full_load table is empty.
-
-    - No full load initialisation; the table should be empty.
-    - Try to add multiple rows, where the first one:
-      - Doesn't exist, with recovery_mode = False. New rows should be added and the
-        object table replaced.
-      - Doesn't exist, with recovery_mode = True. Everything in this message should
-        be ignored.
-
-    """
-    with engine.connect() as conn:
-        importer = EventsProcessor(
-            [nap_schema, gebieden_schema],
-            conn,
-            local_metadata=local_metadata,
-            benk_dataset=benk_schema,
-        )
-
-        def _init_empty_full_load_table(importer):
-            # Init and truncate right after to create an inconsistent state
-            _import_assert_result(
-                importer,
-                [
-                    _create_peilmerken_event(
-                        "1", 2018, full_load_sequence=True, first_of_sequence=True, event_id=1
-                    ),
-                    _create_peilmerken_event("2", 2019, full_load_sequence=True, event_id=2),
-                ],
-                [("1", 2018), ("2", 2019)],
-                check_full_load_table=True,
-            )
-            importer.conn.execute("TRUNCATE TABLE nap.nap_peilmerken_full_load")
-
-        # 1. Doesn't exist, with recovery_mode = False. New rows should be added and
-        # the object table replaced.
-        _init_empty_full_load_table(importer)
-        _import_assert_result(
-            importer,
-            [
-                _create_peilmerken_event("1", 2018, full_load_sequence=True, event_id=3),
-                _create_peilmerken_event(
-                    "2", 2019, full_load_sequence=True, last_of_sequence=True, event_id=4
-                ),
-            ],
-            [
-                ("1", 2018),
-                ("2", 2019),
-            ],
-        )
-
-        # 2. Doesn't exist, with recovery_mode = True. Everything in this message
-        # should be ignored.
-        _init_empty_full_load_table(importer)
-        _import_assert_result(
-            importer,
-            [
-                _create_peilmerken_event("3", 2018, full_load_sequence=True, event_id=5),
-                _create_peilmerken_event(
-                    "4", 2019, full_load_sequence=True, last_of_sequence=True, event_id=6
-                ),
-            ],
-            [
-                ("1", 2018),
-                ("2", 2019),
-            ],
-            recovery_mode=True,
-        )
-
-
-def test_event_process_recovery_full_load_last_table_not_empty(
-    db_schema, engine, local_metadata, nap_schema, gebieden_schema, benk_schema
-):
-    """Tests adding events that are part of a full-load sequence, where the last
-    event is the last in the sequence and the _full_load table is not empty.
-
-    - Initialise the full load with some rows.
-    - Try to add multiple rows, where the first one:
-      - Already exists, with recovery_mode = False. Should raise an error.
-      - Doesn't exist, with recovery_mode = False. New rows should be added and the
-        object table replaced.
-      - Already exists, with recovery_mode = True. Rows should be ignored. Object
-        table replaced.
-      - Doesn't exist, with recovery_mode = True. Rows should be added. Object
-        table replaced.
-
-    """
-    with engine.connect() as conn:
-        importer = EventsProcessor(
-            [nap_schema, gebieden_schema],
-            conn,
-            local_metadata=local_metadata,
-            benk_dataset=benk_schema,
-        )
-
-        def _init_full_load_table(importer):
-            for table in ["nap_peilmerken_full_load", "benk_lasteventids"]:
-                try:
-                    conn.execute(f"TRUNCATE TABLE {table}")  # noqa: S608
-                except Exception:  # noqa: S110
-                    pass
-            importer.lasteventids.clear_cache()
-
-            _import_assert_result(
-                importer,
-                [
-                    _create_peilmerken_event(
-                        "1", 2018, full_load_sequence=True, first_of_sequence=True, event_id=1
-                    ),
-                    _create_peilmerken_event("2", 2019, full_load_sequence=True, event_id=2),
-                ],
-                [("1", 2018), ("2", 2019)],
-                check_full_load_table=True,
-            )
-
-        # 1. Already exists, with recovery_mode = False. Should raise an error.
-        _init_full_load_table(importer)
-        _import_assert_result_expect_exception(
-            importer,
-            [
-                _create_peilmerken_event("1", 2022, full_load_sequence=True, event_id=3),
-                _create_peilmerken_event(
-                    "2", 2023, full_load_sequence=True, last_of_sequence=True, event_id=4
-                ),
-            ],
-            [("1", 2018), ("2", 2019)],
-            check_full_load_table=True,
-        )
-
-        # 2. Doesn't exist, with recovery_mode = False. New rows should be added and
-        # the object table replaced.
-        _init_full_load_table(importer)
-        _import_assert_result(
-            importer,
-            [
-                _create_peilmerken_event("3", 2020, full_load_sequence=True, event_id=5),
-                _create_peilmerken_event(
-                    "4", 2021, full_load_sequence=True, last_of_sequence=True, event_id=6
-                ),
-            ],
-            [("1", 2018), ("2", 2019), ("3", 2020), ("4", 2021)],
-        )
-
-        # 3. Already exists, with recovery_mode = True. Rows should be ignored. Object
-        # table replaced.
-        _init_full_load_table(importer)
-        _import_assert_result(
-            importer,
-            [
-                _create_peilmerken_event("1", 2023, full_load_sequence=True, event_id=7),
-                _create_peilmerken_event(
-                    "2", 2024, full_load_sequence=True, last_of_sequence=True, event_id=8
-                ),
-            ],
-            [("1", 2018), ("2", 2019)],
-            recovery_mode=True,
-        )
-
-        # 4. Doesn't exist, with recovery_mode = True. Rows should be added. Object
-        # table replaced.
-        _init_full_load_table(importer)
-        _import_assert_result(
-            importer,
-            [
-                _create_peilmerken_event("3", 2020, full_load_sequence=True, event_id=9),
-                _create_peilmerken_event(
-                    "4", 2021, full_load_sequence=True, last_of_sequence=True, event_id=10
-                ),
-            ],
-            [("1", 2018), ("2", 2019), ("3", 2020), ("4", 2021)],
-            recovery_mode=True,
-        )
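-
-
-# A full-load sequence, as exercised above, is assumed to work on a staging
-# table: first_of_sequence truncates the "<table>_full_load" staging table,
-# every subsequent ADD appends to it, and last_of_sequence replaces the live
-# object table with the staging table's contents.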
-
-
-def test_event_process_last_event_id(
-    here, db_schema, tconn, local_metadata, nap_schema, gebieden_schema, benk_schema
-):
-    def get_last_event_id(tablename: str = "nap_peilmerken"):
-        res = tconn.execute(
-            f"SELECT last_event_id FROM benk_lasteventids "  # noqa: S608  # nosec: B608
-            f"WHERE \"table\"='{tablename}'"
-        ).fetchone()
-
-        return res[0] if res is not None else None
-
-    importer = EventsProcessor(
-        [nap_schema, gebieden_schema, benk_schema],
-        tconn,
-        local_metadata=local_metadata,
-        benk_dataset=benk_schema,
-    )
-
-    # 1. Assert start state
-    assert get_last_event_id() is None
-
-    events = [
-        _create_peilmerken_event("1", 2018, event_id=203),
-        _create_peilmerken_event("2", 2019, event_id=210),
-    ]
-    importer.process_events(events)
-
-    # 2. Add rows and assert they exist
-    records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")]
-    assert [2018, 2019] == [r["jaar"] for r in records]
-    assert get_last_event_id() == 210
-
-    events = [
-        _create_peilmerken_event("2", 2020, type="MODIFY", event_id=211),
-    ]
-    importer.process_events(events)
-
-    # 3. Assert the event with a newer ID is applied
-    records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")]
-    assert [2018, 2020] == [r["jaar"] for r in records]
-    assert get_last_event_id() == 211
-
-    events = [
-        _create_peilmerken_event("1", 2021, type="MODIFY", event_id=204),
-        _create_peilmerken_event("2", 2021, type="MODIFY", event_id=211),
-    ]
-    importer.process_events(events)
-
-    # 4. Assert events with older IDs are ignored
-    records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")]
-    assert [2018, 2020] == [r["jaar"] for r in records]
-    assert get_last_event_id() == 211
-
-
-def test_event_process_last_event_id_full_load_sequence(
-    here, db_schema, tconn, local_metadata, nap_schema, gebieden_schema, benk_schema
-):
-    def get_last_event_id(tablename: str = "nap_peilmerken"):
-        res = tconn.execute(
-            f"SELECT last_event_id FROM benk_lasteventids "  # noqa: S608
-            f"WHERE \"table\"='{tablename}'"  # noqa: S608
-        ).fetchone()
-
-        return res[0] if res is not None else None
-
-    importer = EventsProcessor(
-        [nap_schema, gebieden_schema, benk_schema],
-        tconn,
-        local_metadata=local_metadata,
-        benk_dataset=benk_schema,
-    )
-
-    # 1. Assert start state
-    assert get_last_event_id("nap_peilmerken") is None
-    assert get_last_event_id("nap_peilmerken_full_load") is None
-
-    events = [
-        _create_peilmerken_event(
-            "1", 2018, event_id=203, full_load_sequence=True, first_of_sequence=True
-        ),
-        _create_peilmerken_event("2", 2019, event_id=210, full_load_sequence=True),
-    ]
-    importer.process_events(events)
-
-    # 2. Add rows and assert they exist
-    records = [dict(r) for r in tconn.execute("SELECT * FROM nap.nap_peilmerken_full_load")]
-    assert [2018, 2019] == [r["jaar"] for r in records]
-    assert get_last_event_id("nap_peilmerken") is None
-    assert get_last_event_id("nap_peilmerken_full_load") == 210
-
-    events = [
-        _create_peilmerken_event("3", 2020, type="ADD", event_id=212, full_load_sequence=True),
-    ]
-    importer.process_events(events)
-
-    # 3. Assert the event with a newer ID is applied
-    records = [dict(r) for r in tconn.execute("SELECT * FROM nap.nap_peilmerken_full_load")]
-    assert [2018, 2019, 2020] == [r["jaar"] for r in records]
-    assert get_last_event_id("nap_peilmerken") is None
-    assert get_last_event_id("nap_peilmerken_full_load") == 212
-
-    events = [
-        _create_peilmerken_event("4", 2021, type="ADD", event_id=204, full_load_sequence=True),
-        _create_peilmerken_event("5", 2021, type="ADD", event_id=211, full_load_sequence=True),
-    ]
-    importer.process_events(events)
-
-    # 4. Assert events with older IDs are ignored
-    records = [dict(r) for r in tconn.execute("SELECT * FROM nap.nap_peilmerken_full_load")]
-    assert [2018, 2019, 2020] == [r["jaar"] for r in records]
-    assert get_last_event_id("nap_peilmerken") is None
-    assert get_last_event_id("nap_peilmerken_full_load") == 212
-
-    # 5. End the full load. The table should be replaced and last_event_id copied to
-    # the main table and reset.
-    events = [
-        _create_peilmerken_event("4", 2021, type="ADD", event_id=213, full_load_sequence=True),
-        _create_peilmerken_event(
-            "5", 2022, type="ADD", event_id=217, full_load_sequence=True, last_of_sequence=True
-        ),
-    ]
-    importer.process_events(events)
-
-    # 6. Assert the object table is replaced, last_event_id is copied to the main
-    # table and the full load entry is reset
-    records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")]
-    assert [2018, 2019, 2020, 2021, 2022] == [r["jaar"] for r in records]
-    assert get_last_event_id("nap_peilmerken") == 217
-    assert get_last_event_id("nap_peilmerken_full_load") is None
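-
-
-# As the two tests above show, event-id bookkeeping lives in the
-# benk_lasteventids table, keyed by "table": regular processing tracks the
-# "nap_peilmerken" row, a running full load tracks "nap_peilmerken_full_load",
-# and completing the sequence moves the counter to the main row and resets the
-# staging row.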
-
-
-def test_events_process_full_load_sequence_snake_cased_schema(
-    here, db_schema, tconn, local_metadata, brk2_simple_schema, benk_schema
-):
-    """Tests whether the correct (snake_cased) schema for brk2 is used for the full load."""
-
-    event_meta = {
-        "event_type": "ADD",
-        "event_id": 1,
-        "dataset_id": "brk2",
-        "table_id": "gemeentes",
-        "full_load_sequence": True,
-        "first_of_sequence": True,
-    }
-    event_data = {"identificatie": "0363", "naam": "Amsterdam"}
-
-    importer = EventsProcessor(
-        [brk2_simple_schema], tconn, local_metadata=local_metadata, benk_dataset=benk_schema
-    )
-    importer.process_event(event_meta, event_data)
-
-    records = [dict(r) for r in tconn.execute("SELECT * FROM brk_2.brk_2_gemeentes_full_load")]
-    assert len(records) == 1
-    assert records[0]["identificatie"] == "0363"
-    assert records[0]["naam"] == "Amsterdam"
-
-    event_meta = {
-        "event_type": "ADD",
-        "event_id": 2,
-        "dataset_id": "brk2",
-        "table_id": "gemeentes",
-        "full_load_sequence": True,
-        "last_of_sequence": True,
-    }
-    event_data = {
-        "identificatie": "0457",
-        "naam": "Weesp",
-    }
-    importer.process_event(event_meta, event_data)
-
-    records = [dict(r) for r in tconn.execute("SELECT * FROM brk_2_gemeentes")]
-    assert len(records) == 2
-    assert records[0]["identificatie"] == "0363"
-    assert records[0]["naam"] == "Amsterdam"
-    assert records[1]["identificatie"] == "0457"
-    assert records[1]["naam"] == "Weesp"
-
-
-def test_events_process_full_load_relation_update_parent_table(
-    here, db_schema, tconn, local_metadata, nap_schema, gebieden_schema, benk_schema
-):
-    events_path = here / "files" / "data" / "peilmerken.gobevents"
-    importer = EventsProcessor(
-        [nap_schema, gebieden_schema],
-        tconn,
-        local_metadata=local_metadata,
-        benk_dataset=benk_schema,
-    )
-    importer.load_events_from_file_using_bulk(events_path)
-    records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")]
-
-    # Imported objects without relations
-    assert len(records) == 1
-    assert records[0]["ligt_in_bouwblok_id"] is None
-    assert records[0]["ligt_in_bouwblok_identificatie"] is None
-    assert records[0]["ligt_in_bouwblok_volgnummer"] is None
-
-    event_meta = {
-        "event_type": "ADD",
-        "event_id": 1,
-        "dataset_id": "nap",
-        "table_id": "peilmerken_ligtInBouwblok",
-        "full_load_sequence": True,
-        "first_of_sequence": True,
-        "last_of_sequence": True,
-    }
-    event_data = {
-        "id": 1,
-        "peilmerken_id": "70780001",
-        "peilmerken_identificatie": "70780001",
-        "ligt_in_bouwblok_id": "03630012095746.1",
-        "ligt_in_bouwblok_identificatie": "03630012095746",
-        "ligt_in_bouwblok_volgnummer": 1,
-    }
-
-    importer.process_event(event_meta, event_data)
-
-    rel_records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken_ligt_in_bouwblok")]
-    parent_records = [dict(r) for r in tconn.execute("SELECT * FROM nap_peilmerken")]
-
-    # Should have updated the relation columns in the parent (object) table
-    assert len(rel_records) == 1
-    assert len(parent_records) == 1
-
-    assert parent_records[0]["ligt_in_bouwblok_id"] == "03630012095746.1"
-    assert parent_records[0]["ligt_in_bouwblok_identificatie"] == "03630012095746"
-    assert parent_records[0]["ligt_in_bouwblok_volgnummer"] == 1
-
-
-def load_json_results_file(location):
-    class _JSONDecoder(json.JSONDecoder):
-        """Custom JSON decoder that converts date(time) strings to date(time) objects."""
-
-        def __init__(self, *args, **kwargs):
-            json.JSONDecoder.__init__(self, *args, object_hook=self.object_hook, **kwargs)
-
-        def object_hook(self, obj):
-            ret = {}
-            for key, value in obj.items():
-                if key in ("begin_geldigheid", "eind_geldigheid") and value is not None:
-                    if len(value) == 10:
-                        ret[key] = date.fromisoformat(value)
-                    else:
-                        ret[key] = datetime.fromisoformat(value)
-                else:
-                    ret[key] = value
-            return ret
-
-    with open(location) as f:
-        return json.load(f, cls=_JSONDecoder)
-
-
-def delete_id(d: dict):
-    return {k: v for k, v in d.items() if k != "id"}
-
-
-def delete_ids(lst: list[dict]):
-    return [delete_id(d) for d in lst]
-
-
-def assert_results(tconn, expected_results: dict, testname: str):
-    for table_name, expected_result in expected_results.items():
-        records = [dict(r) for r in tconn.execute(f"SELECT * FROM {table_name}")]  # noqa: S608
-        records = delete_ids(records)
-
-        assert len(records) == len(
-            expected_result
-        ), f"Number of records in {table_name} does not match for test {testname}"
-        for res in expected_result:
-            assert res in records, f"Record {res} not found in {table_name} for test {testname}"
-        for rec in records:
-            assert (
-                rec in expected_result
-            ), f"Unexpected record {rec} found in {table_name} for test {testname}"
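-
-
-# load_json_results_file parses the expected-results fixtures used below;
-# begin_geldigheid/eind_geldigheid values are revived by string length, e.g.
-# "2018-10-22" (10 characters) becomes a date, while
-# "2018-10-22T00:00:00.000000" becomes a datetime.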
"verblijfsobjecten.empty_nested.gobevents" - importer.load_events_from_file(events_path) - assert_results(tconn, expected_results["empty_nested"], "Remove nested objects") - - # Modify nested objects again - events_path = here / "files" / "data" / "verblijfsobjecten.modify_nested_2.gobevents" - importer.load_events_from_file(events_path) - assert_results(tconn, expected_results["modify_nested"], "Modify nested objects again") - - # Delete full object - events_path = here / "files" / "data" / "verblijfsobjecten.delete.gobevents" - importer.load_events_from_file(events_path) - assert_results(tconn, expected_results["delete"], "Delete everything") - - # Now test full load. Full load only works with ADD events, so we reuse the ADD event from - # above and add the full load metadata. - events_path = here / "files" / "data" / "verblijfsobjecten.gobevents" - _load_events_from_file_as_full_load_sequence(importer, events_path) - - assert_results(tconn, expected_results["initial_add"], "Load initial data using full load") - - -def test_full_load_shortnames( - here, db_schema, tconn, local_metadata, hr_simple_schema, benk_schema -): - importer = EventsProcessor( - [hr_simple_schema], tconn, local_metadata=local_metadata, benk_dataset=benk_schema - ) - - # First import an object with nested objects - events = [ - ( - { - "event_type": "ADD", - "event_id": 1, - "dataset_id": "hr", - "table_id": "maatschappelijkeactiviteiten", - "full_load_sequence": True, - "first_of_sequence": True, - "last_of_sequence": True, - }, - { - "kvknummer": 42, - "email_adressen": [ - { - "email_adres": "address1@example.com", - }, - { - "email_adres": "address2@example.com", - }, - ], - }, - ) - ] - importer.process_events(events) - - # Not testing contents here, but merely the fact that the right tables are used without errors - records = [dict(r) for r in tconn.execute("SELECT * FROM hr_mac")] - assert len(records) == 1 - assert records[0]["heeft_hoofdvestiging_id"] is None - - nested_records = [dict(r) for r in tconn.execute("SELECT * FROM hr_mac_email_adressen")] - assert len(nested_records) == 2 - assert nested_records[0]["parent_id"] == "42" - assert nested_records[0]["email_adres"] == "address1@example.com" - - # Now test adding a relation object that references a parent table with short name - events = [ - ( - { - "dataset_id": "hr", - "table_id": "maatschappelijkeactiviteiten_heeftHoofdvestiging", - "event_type": "ADD", - "event_id": 1658565091, - "tid": "42.AMSBI.24902480", - "generated_timestamp": "2023-10-05T09:59:05.314873", - "full_load_sequence": True, - "first_of_sequence": True, - "last_of_sequence": True, - }, - { - "mac_kvknummer": "42", - "mac_id": "42", - "heeft_hoofdvestiging_vestigingsnummer": "24902480", - "heeft_hoofdvestiging_id": "24902480", - "begin_geldigheid": None, - "eind_geldigheid": None, - "id": 457172, - }, - ) - ] - - importer.process_events(events) - rel_records = [dict(r) for r in tconn.execute("SELECT * FROM hr_mac_heeft_hoofdvestiging")] - assert len(rel_records) == 1 - assert rel_records[0]["id"] == 457172 - assert rel_records[0]["mac_id"] == "42" - assert rel_records[0]["heeft_hoofdvestiging_id"] == "24902480" - - records = [dict(r) for r in tconn.execute("SELECT * FROM hr_mac")] - assert len(records) == 1 - assert records[0]["heeft_hoofdvestiging_id"] == "24902480" - - -def test_full_load_shortnames_update( - here, db_schema, tconn, local_metadata, hr_simple_schema, benk_schema -): - importer = EventsProcessor( - [hr_simple_schema], tconn, local_metadata=local_metadata, 
-
-
-def test_full_load_shortnames_update(
-    here, db_schema, tconn, local_metadata, hr_simple_schema, benk_schema
-):
-    importer = EventsProcessor(
-        [hr_simple_schema], tconn, local_metadata=local_metadata, benk_dataset=benk_schema
-    )
-
-    # First import an object with nested objects
-    events = [
-        (
-            {
-                "event_type": "ADD",
-                "event_id": 1,
-                "dataset_id": "hr",
-                "table_id": "maatschappelijkeactiviteiten",
-            },
-            {
-                "kvknummer": 42,
-                "email_adressen": [
-                    {
-                        "email_adres": "address1@example.com",
-                    },
-                    {
-                        "email_adres": "address2@example.com",
-                    },
-                ],
-            },
-        )
-    ]
-    importer.process_events(events)
-
-    # Not testing the contents here, but merely that the right tables are used
-    # without errors
-    records = [dict(r) for r in tconn.execute("SELECT * FROM hr_mac")]
-    assert len(records) == 1
-    assert records[0]["heeft_hoofdvestiging_id"] is None
-
-    nested_records = [dict(r) for r in tconn.execute("SELECT * FROM hr_mac_email_adressen")]
-    assert len(nested_records) == 2
-    assert nested_records[0]["parent_id"] == "42"
-    assert nested_records[0]["email_adres"] == "address1@example.com"
-
-    # Now test adding a relation object that references a parent table with a short name
-    events = [
-        (
-            {
-                "dataset_id": "hr",
-                "table_id": "maatschappelijkeactiviteiten_heeftHoofdvestiging",
-                "event_type": "ADD",
-                "event_id": 1658565091,
-                "tid": "42.AMSBI.24902480",
-                "generated_timestamp": "2023-10-05T09:59:05.314873",
-            },
-            {
-                "mac_kvknummer": "42",
-                "mac_id": "42",
-                "heeft_hoofdvestiging_vestigingsnummer": "24902480",
-                "heeft_hoofdvestiging_id": "24902480",
-                "begin_geldigheid": None,
-                "eind_geldigheid": None,
-                "id": 457172,
-            },
-        )
-    ]
-
-    importer.process_events(events)
-    rel_records = [dict(r) for r in tconn.execute("SELECT * FROM hr_mac_heeft_hoofdvestiging")]
-    assert len(rel_records) == 1
-    assert rel_records[0]["id"] == 457172
-    assert rel_records[0]["mac_id"] == "42"
-    assert rel_records[0]["heeft_hoofdvestiging_id"] == "24902480"
-
-    records = [dict(r) for r in tconn.execute("SELECT * FROM hr_mac")]
-    assert len(records) == 1
-    assert records[0]["heeft_hoofdvestiging_id"] == "24902480"
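-
-
-# test_full_load_shortnames_update replays the same events as
-# test_full_load_shortnames, but without the full_load_sequence headers, so the
-# short-name table mapping is also exercised on the regular event path.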
- """ - importer = EventsProcessor( - [nap_schema, gebieden_schema], - tconn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - events = [ - _create_peilmerken_event( - "1", 2018, event_id=3, full_load_sequence=True, first_of_sequence=True - ), - _create_peilmerken_event( - "2", 2018, event_id=4, full_load_sequence=True, first_of_sequence=False - ), - ] - - importer.process_events(events) - lasteventrecord = tconn.execute( - "SELECT * FROM benk_lasteventids WHERE \"table\" = 'nap_peilmerken_full_load'" - ).fetchone() - assert lasteventrecord["last_event_id"] == 4 - - events = [ - _create_peilmerken_event( - "1", 2018, event_id=1, full_load_sequence=True, first_of_sequence=True - ), - ] - importer.process_events(events) - lasteventrecord = tconn.execute( - "SELECT * FROM benk_lasteventids WHERE \"table\" = 'nap_peilmerken_full_load'" - ).fetchone() - assert lasteventrecord["last_event_id"] == 1 - - -def test_avoid_duplicate_key_after_full_load( - here, db_schema, tconn, local_metadata, bag_verblijfsobjecten_schema, benk_schema -): - """Make sure we don't get duplicate key errors after a full load sequence with a serial id - field in the table.""" - - def create_event(gebruiksdoel_cnt: int, event_id: int, identificatie: str, **extra_headers): - gebruiksdoelen = [ - {"code": i, "omschrijving": f"doel {i}"} for i in range(1, gebruiksdoel_cnt + 1) - ] - return ( - { - "event_type": "ADD", - "event_id": event_id, - "dataset_id": "bag", - "table_id": "verblijfsobjecten", - **extra_headers, - }, - { - "identificatie": identificatie, - "volgnummer": 1, - "gebruiksdoel": gebruiksdoelen, - "toegang": None, - "ligt_in_buurt": {}, - "begin_geldigheid": "2018-10-22T00:00:00.000000", - "eind_geldigheid": None, - }, - ) - - importer = EventsProcessor( - [bag_verblijfsobjecten_schema], - tconn, - local_metadata=local_metadata, - benk_dataset=benk_schema, - ) - - # Add objects with in total 4 nested objects - full_load_events = [ - create_event(2, 1, "VB1", full_load_sequence=True, first_of_sequence=True), - create_event(2, 2, "VB2", full_load_sequence=True, last_of_sequence=True), - ] - importer.process_events(full_load_events) - - update_event = [create_event(1, 3, "VB3")] - importer.process_events(update_event)