From f92994e85e526502a620506b964665b9afd385fe Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Tue, 13 Aug 2024 15:03:50 +0200
Subject: [PATCH] Fix tracing existing entries when there are deletes (#1046)

---
 pyiceberg/table/__init__.py                  | 13 ++--
 tests/integration/test_writes/test_writes.py | 82 +++++++++++++++++++-
 2 files changed, 85 insertions(+), 10 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 98bb88557c..f48e6f2f7b 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -62,7 +62,7 @@
     Reference,
 )
 from pyiceberg.expressions.visitors import (
-    ROWS_CANNOT_MATCH,
+    ROWS_MIGHT_NOT_MATCH,
     ROWS_MUST_MATCH,
     _InclusiveMetricsEvaluator,
     _StrictMetricsEvaluator,
@@ -3360,13 +3360,14 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
                         existing_entries = []
                         for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
                             if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
+                                # Based on the metadata, it can be dropped right away
                                 deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
                                 self._deleted_data_files.add(entry.data_file)
-                            elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH:
-                                existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
                             else:
-                                # Based on the metadata, it is unsure to say if the file can be deleted
-                                partial_rewrites_needed = True
+                                # Based on the metadata, we cannot determine if it can be deleted
+                                existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
+                                if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH:
+                                    partial_rewrites_needed = True
 
                         if len(deleted_entries) > 0:
                             total_deleted_entries += deleted_entries
@@ -3383,8 +3384,6 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
                                     for existing_entry in existing_entries:
                                         writer.add_entry(existing_entry)
                                 existing_manifests.append(writer.to_manifest_file())
-                            # else:
-                            #     deleted_manifests.append()
                         else:
                             existing_manifests.append(manifest_file)
                 else:
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 0716862806..53bb25e8cc 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -23,6 +23,7 @@
 from typing import Any, Dict
 from urllib.parse import urlparse
 
+import numpy as np
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
@@ -38,13 +39,20 @@
 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import NoSuchTableError
-from pyiceberg.expressions import In
+from pyiceberg.expressions import GreaterThanOrEqual, In, Not
 from pyiceberg.io.pyarrow import _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import TableProperties
-from pyiceberg.transforms import IdentityTransform
-from pyiceberg.types import IntegerType, LongType, NestedField, StringType
+from pyiceberg.transforms import DayTransform, IdentityTransform
+from pyiceberg.types import (
+    DateType,
+    DoubleType,
+    IntegerType,
+    LongType,
+    NestedField,
+    StringType,
+)
 from utils import _create_table
 
 
@@ -1331,3 +1339,71 @@ def test_overwrite_all_data_with_filter(session_catalog: Catalog) -> None:
     tbl.overwrite(data, In("id", ["1", "2", "3"]))
 
     assert len(tbl.scan().to_arrow()) == 3
+
+
+@pytest.mark.integration
+def test_delete_threshold() -> None:
+    catalog = load_catalog(
+        "local",
+        **{
+            "type": "rest",
+            "uri": "http://localhost:8181",
+            "s3.endpoint": "http://localhost:9000",
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    )
+
+    schema = Schema(
+        NestedField(field_id=101, name="id", field_type=LongType(), required=True),
+        NestedField(field_id=103, name="created_at", field_type=DateType(), required=False),
+        NestedField(field_id=104, name="relevancy_score", field_type=DoubleType(), required=False),
+    )
+
+    partition_spec = PartitionSpec(PartitionField(source_id=103, field_id=2000, transform=DayTransform(), name="created_at_day"))
+
+    try:
+        catalog.drop_table(
+            identifier="default.scores",
+        )
+    except NoSuchTableError:
+        pass
+
+    catalog.create_table(
+        identifier="default.scores",
+        schema=schema,
+        partition_spec=partition_spec,
+    )
+
+    # Parameters
+    num_rows = 100  # Number of rows in the dataframe
+    id_min, id_max = 1, 10000
+    date_start, date_end = date(2024, 1, 1), date(2024, 2, 1)
+
+    # Generate the 'id' column
+    id_column = np.random.randint(id_min, id_max, num_rows)
+
+    # Generate the 'created_at' column as dates only
+    date_range = pd.date_range(start=date_start, end=date_end, freq="D")  # Daily frequency for dates
+    created_at_column = np.random.choice(date_range, num_rows)  # Convert to string (YYYY-MM-DD format)
+
+    # Generate the 'relevancy_score' column with a peak around 0.1
+    relevancy_score_column = np.random.beta(a=2, b=20, size=num_rows)  # Adjusting parameters to peak around 0.1
+
+    # Create the dataframe
+    df = pd.DataFrame({"id": id_column, "created_at": created_at_column, "relevancy_score": relevancy_score_column})
+
+    iceberg_table = catalog.load_table("default.scores")
+
+    # Convert the pandas DataFrame to a PyArrow Table with the Iceberg schema
+    arrow_schema = iceberg_table.schema().as_arrow()
+    docs_table = pa.Table.from_pandas(df, schema=arrow_schema)
+
+    # Append the data to the Iceberg table
+    iceberg_table.append(docs_table)
+
+    delete_condition = GreaterThanOrEqual("relevancy_score", 0.1)
+    lower_before = len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow())
+    assert len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow()) == lower_before
+    iceberg_table.delete(delete_condition)
+    assert len(iceberg_table.scan().to_arrow()) == lower_before
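
A minimal usage sketch (not part of the patch), assuming the same local REST catalog settings and the default.scores table created by test_delete_threshold above. Because relevancy_score values fall on both sides of the 0.1 threshold within individual data files, the delete cannot be resolved from column metrics alone; with this fix such files are kept as EXISTING entries and a partial rewrite is flagged instead of the entries being lost, so the row count after the delete matches the rows that do not satisfy the predicate.

    # Illustrative sketch; table and catalog endpoints are assumed from the test above.
    from pyiceberg.catalog import load_catalog
    from pyiceberg.expressions import GreaterThanOrEqual, Not

    catalog = load_catalog(
        "local",
        **{
            "type": "rest",
            "uri": "http://localhost:8181",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )
    tbl = catalog.load_table("default.scores")

    delete_condition = GreaterThanOrEqual("relevancy_score", 0.1)
    # Rows that do not satisfy the delete predicate must survive the delete.
    expected_remaining = len(tbl.scan(row_filter=Not(delete_condition)).to_arrow())
    tbl.delete(delete_condition)
    # Before the fix, entries that could not be dropped purely from metadata could be
    # omitted from the rewritten manifests, making this count come out too low.
    assert len(tbl.scan().to_arrow()) == expected_remaining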