From 67e877303f417ac0a2b20db9752231b59fe1775c Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 11 Sep 2024 22:42:09 +0200
Subject: [PATCH] Use the correct spec when rewriting existing manifests
 (#1157)

* Use the correct spec when rewriting existing manifests

Fixes #1108

* Rename test
---
 pyiceberg/table/update/snapshot.py            |  2 +-
 tests/integration/test_writes/test_writes.py  | 75 +++++++++++++++-----
 2 files changed, 58 insertions(+), 19 deletions(-)

diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index 8b8614db2..2b9354d26 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -545,7 +545,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
                     if any(entry.data_file not in found_deleted_data_files for entry in entries):
                         with write_manifest(
                             format_version=self._transaction.table_metadata.format_version,
-                            spec=self._transaction.table_metadata.spec(),
+                            spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
                             schema=self._transaction.table_metadata.schema(),
                             output_file=self.new_manifest_output(),
                             snapshot_id=self._snapshot_id,
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index ce5b1474b..fc2746c61 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -18,7 +18,7 @@
 import math
 import os
 import time
-from datetime import date, datetime
+from datetime import date, datetime, timedelta
 from pathlib import Path
 from typing import Any, Dict
 from urllib.parse import urlparse
@@ -26,6 +26,7 @@
 import numpy as np
 import pandas as pd
 import pyarrow as pa
+import pyarrow.compute as pc
 import pyarrow.parquet as pq
 import pytest
 import pytz
@@ -39,12 +40,12 @@
 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import NoSuchTableError
-from pyiceberg.expressions import GreaterThanOrEqual, In, Not
+from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not
 from pyiceberg.io.pyarrow import _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import TableProperties
-from pyiceberg.transforms import DayTransform, IdentityTransform
+from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform
 from pyiceberg.types import (
     DateType,
     DoubleType,
@@ -1344,18 +1345,7 @@ def test_overwrite_all_data_with_filter(session_catalog: Catalog) -> None:
 
 
 @pytest.mark.integration
-def test_delete_threshold() -> None:
-    catalog = load_catalog(
-        "local",
-        **{
-            "type": "rest",
-            "uri": "http://localhost:8181",
-            "s3.endpoint": "http://localhost:9000",
-            "s3.access-key-id": "admin",
-            "s3.secret-access-key": "password",
-        },
-    )
-
+def test_delete_threshold(session_catalog: Catalog) -> None:
     schema = Schema(
         NestedField(field_id=101, name="id", field_type=LongType(), required=True),
         NestedField(field_id=103, name="created_at", field_type=DateType(), required=False),
         NestedField(field_id=104, name="relevancy_score", field_type=DoubleType(), required=False),
     )
 
     partition_spec = PartitionSpec(PartitionField(source_id=103, field_id=2000, transform=DayTransform(), name="created_at_day"))
 
     try:
-        catalog.drop_table(
+        session_catalog.drop_table(
             identifier="default.scores",
         )
     except NoSuchTableError:
         pass
 
-    catalog.create_table(
+    session_catalog.create_table(
         identifier="default.scores",
         schema=schema,
         partition_spec=partition_spec,
@@ -1395,7 +1385,7 @@ def test_delete_threshold() -> None:
     # Create the dataframe
     df = pd.DataFrame({"id": id_column, "created_at": created_at_column, "relevancy_score": relevancy_score_column})
 
-    iceberg_table = catalog.load_table("default.scores")
+    iceberg_table = session_catalog.load_table("default.scores")
 
     # Convert the pandas DataFrame to a PyArrow Table with the Iceberg schema
     arrow_schema = iceberg_table.schema().as_arrow()
@@ -1409,3 +1399,52 @@ def test_delete_threshold() -> None:
     assert len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow()) == lower_before
     iceberg_table.delete(delete_condition)
     assert len(iceberg_table.scan().to_arrow()) == lower_before
+
+
+@pytest.mark.integration
+def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) -> None:
+    np.random.seed(876)
+    N = 1440
+    d = {
+        "timestamp": pa.array([datetime(2023, 1, 1, 0, 0, 0) + timedelta(minutes=i) for i in range(N)]),
+        "category": pa.array([np.random.choice(["A", "B", "C"]) for _ in range(N)]),
+        "value": pa.array(np.random.normal(size=N)),
+    }
+    data = pa.Table.from_pydict(d)
+
+    try:
+        session_catalog.drop_table(
+            identifier="default.test_error_table",
+        )
+    except NoSuchTableError:
+        pass
+
+    table = session_catalog.create_table(
+        "default.test_error_table",
+        schema=data.schema,
+    )
+
+    with table.update_spec() as update:
+        update.add_field("timestamp", transform=HourTransform())
+
+    table.append(data)
+
+    with table.update_spec() as update:
+        update.add_field("category", transform=IdentityTransform())
+
+    data_ = data.filter(
+        (pc.field("category") == "A")
+        & (pc.field("timestamp") >= datetime(2023, 1, 1, 0))
+        & (pc.field("timestamp") < datetime(2023, 1, 1, 1))
+    )
+
+    table.overwrite(
+        df=data_,
+        overwrite_filter=And(
+            And(
+                GreaterThanOrEqual("timestamp", datetime(2023, 1, 1, 0).isoformat()),
+                LessThan("timestamp", datetime(2023, 1, 1, 1).isoformat()),
+            ),
+            EqualTo("category", "A"),
+        ),
+    )
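
For context on the one-line fix in snapshot.py: after partition evolution a table carries several partition specs keyed by spec id, and each existing manifest records the partition_spec_id it was written with. When an overwrite has to rewrite such a manifest, the spec must be looked up by that recorded id; reusing the table's current spec() describes the manifest's partition tuples with the wrong spec, which is the failure reported in #1108. The following is a minimal, self-contained sketch of that lookup; FakeSpec, FakeManifest, and FakeMetadata are hypothetical stand-ins for illustration only, not pyiceberg APIs.

from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeSpec:
    spec_id: int
    fields: List[str]


@dataclass
class FakeManifest:
    path: str
    partition_spec_id: int  # the spec id the manifest was written with


@dataclass
class FakeMetadata:
    default_spec_id: int
    specs_by_id: Dict[int, FakeSpec]

    def spec(self) -> FakeSpec:
        # The current (default) spec. The buggy code used this for every rewrite.
        return self.specs_by_id[self.default_spec_id]

    def specs(self) -> Dict[int, FakeSpec]:
        # All known specs keyed by id. The fix indexes into this mapping.
        return self.specs_by_id


def spec_for_rewrite(metadata: FakeMetadata, manifest: FakeManifest) -> FakeSpec:
    # Mirrors the patched line: pick the spec the manifest was written with,
    # not whatever spec happens to be current after partition evolution.
    return metadata.specs()[manifest.partition_spec_id]


if __name__ == "__main__":
    metadata = FakeMetadata(
        default_spec_id=1,
        specs_by_id={
            0: FakeSpec(0, ["timestamp_hour"]),
            1: FakeSpec(1, ["timestamp_hour", "category"]),
        },
    )
    old_manifest = FakeManifest("s3://bucket/old-manifest.avro", partition_spec_id=0)

    # Correct: the manifest keeps its own (older) spec.
    assert spec_for_rewrite(metadata, old_manifest).spec_id == 0
    # The buggy path would have used the current spec instead.
    assert metadata.spec().spec_id == 1

The integration test added above exercises the same situation end to end: it evolves the spec from hour(timestamp) to hour(timestamp) plus category, then overwrites a slice of the data so that manifests written under the older spec must be rewritten.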