Use the correct spec when rewriting existing manifests (#1157)
* Use the correct spec when rewriting existing manifests

Fixes #1108

* Rename test
Fokko committed Sep 11, 2024
1 parent 4d23c55 commit 67e8773
Showing 2 changed files with 58 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pyiceberg/table/update/snapshot.py
@@ -545,7 +545,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
                     if any(entry.data_file not in found_deleted_data_files for entry in entries):
                         with write_manifest(
                             format_version=self._transaction.table_metadata.format_version,
-                            spec=self._transaction.table_metadata.spec(),
+                            spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
                             schema=self._transaction.table_metadata.schema(),
                             output_file=self.new_manifest_output(),
                             snapshot_id=self._snapshot_id,
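Why the one-line change matters: after partition evolution a table carries several partition specs, and each manifest file records the `partition_spec_id` it was written under. When `_existing_manifests` rewrites a manifest to drop deleted entries, it has to reuse that original spec; looking it up via `specs()[manifest_file.partition_spec_id]` instead of the current `spec()` keeps the rewritten manifest consistent with its entries. A minimal sketch of the lookup (the `spec_for_manifest` helper is illustrative, not part of the patch):

```python
# Illustrative sketch: pick the spec a manifest was originally written with.
# `table_metadata.specs()` maps partition-spec ID -> PartitionSpec, while
# `table_metadata.spec()` returns only the table's *current* spec.
from pyiceberg.manifest import ManifestFile
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table.metadata import TableMetadata


def spec_for_manifest(table_metadata: TableMetadata, manifest_file: ManifestFile) -> PartitionSpec:
    # Before this fix the rewrite path effectively used table_metadata.spec(),
    # which is wrong for manifests written before a partition evolution.
    return table_metadata.specs()[manifest_file.partition_spec_id]
```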
75 changes: 57 additions & 18 deletions tests/integration/test_writes/test_writes.py
@@ -18,14 +18,15 @@
 import math
 import os
 import time
-from datetime import date, datetime
+from datetime import date, datetime, timedelta
 from pathlib import Path
 from typing import Any, Dict
 from urllib.parse import urlparse
 
 import numpy as np
 import pandas as pd
 import pyarrow as pa
+import pyarrow.compute as pc
 import pyarrow.parquet as pq
 import pytest
 import pytz
@@ -39,12 +40,12 @@
 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import NoSuchTableError
-from pyiceberg.expressions import GreaterThanOrEqual, In, Not
+from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not
 from pyiceberg.io.pyarrow import _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import TableProperties
-from pyiceberg.transforms import DayTransform, IdentityTransform
+from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform
 from pyiceberg.types import (
     DateType,
     DoubleType,
@@ -1344,18 +1345,7 @@ def test_overwrite_all_data_with_filter(session_catalog: Catalog) -> None:
 
 
 @pytest.mark.integration
-def test_delete_threshold() -> None:
-    catalog = load_catalog(
-        "local",
-        **{
-            "type": "rest",
-            "uri": "http://localhost:8181",
-            "s3.endpoint": "http://localhost:9000",
-            "s3.access-key-id": "admin",
-            "s3.secret-access-key": "password",
-        },
-    )
-
+def test_delete_threshold(session_catalog: Catalog) -> None:
     schema = Schema(
         NestedField(field_id=101, name="id", field_type=LongType(), required=True),
         NestedField(field_id=103, name="created_at", field_type=DateType(), required=False),
@@ -1365,13 +1355,13 @@ def test_delete_threshold() -> None:
     partition_spec = PartitionSpec(PartitionField(source_id=103, field_id=2000, transform=DayTransform(), name="created_at_day"))
 
     try:
-        catalog.drop_table(
+        session_catalog.drop_table(
             identifier="default.scores",
         )
     except NoSuchTableError:
         pass
 
-    catalog.create_table(
+    session_catalog.create_table(
         identifier="default.scores",
         schema=schema,
         partition_spec=partition_spec,
@@ -1395,7 +1385,7 @@ def test_delete_threshold(session_catalog: Catalog) -> None:
     # Create the dataframe
     df = pd.DataFrame({"id": id_column, "created_at": created_at_column, "relevancy_score": relevancy_score_column})
 
-    iceberg_table = catalog.load_table("default.scores")
+    iceberg_table = session_catalog.load_table("default.scores")
 
     # Convert the pandas DataFrame to a PyArrow Table with the Iceberg schema
     arrow_schema = iceberg_table.schema().as_arrow()
@@ -1409,3 +1399,52 @@ def test_delete_threshold(session_catalog: Catalog) -> None:
     assert len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow()) == lower_before
     iceberg_table.delete(delete_condition)
     assert len(iceberg_table.scan().to_arrow()) == lower_before
+
+
+@pytest.mark.integration
+def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) -> None:
+    np.random.seed(876)
+    N = 1440
+    d = {
+        "timestamp": pa.array([datetime(2023, 1, 1, 0, 0, 0) + timedelta(minutes=i) for i in range(N)]),
+        "category": pa.array([np.random.choice(["A", "B", "C"]) for _ in range(N)]),
+        "value": pa.array(np.random.normal(size=N)),
+    }
+    data = pa.Table.from_pydict(d)
+
+    try:
+        session_catalog.drop_table(
+            identifier="default.test_error_table",
+        )
+    except NoSuchTableError:
+        pass
+
+    table = session_catalog.create_table(
+        "default.test_error_table",
+        schema=data.schema,
+    )
+
+    with table.update_spec() as update:
+        update.add_field("timestamp", transform=HourTransform())
+
+    table.append(data)
+
+    with table.update_spec() as update:
+        update.add_field("category", transform=IdentityTransform())
+
+    data_ = data.filter(
+        (pc.field("category") == "A")
+        & (pc.field("timestamp") >= datetime(2023, 1, 1, 0))
+        & (pc.field("timestamp") < datetime(2023, 1, 1, 1))
+    )
+
+    table.overwrite(
+        df=data_,
+        overwrite_filter=And(
+            And(
+                GreaterThanOrEqual("timestamp", datetime(2023, 1, 1, 0).isoformat()),
+                LessThan("timestamp", datetime(2023, 1, 1, 1).isoformat()),
+            ),
+            EqualTo("category", "A"),
+        ),
+    )
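The new integration test reproduces #1108: the partition spec evolves twice (an hourly transform on `timestamp`, then an identity transform on `category`), data is appended under the intermediate spec, and the final overwrite forces the existing manifests to be rewritten. Under the old code that rewrite applied the table's current spec to entries written with an earlier one. A rough follow-up check, sketched here under the assumption that `Snapshot.manifests(io)` is available on the snapshot object (this check is not part of the committed test):

```python
# Hypothetical follow-up check (not in the committed test): every manifest
# referenced by the new snapshot should carry a partition spec ID that the
# table metadata actually knows about, even after two rounds of evolution.
snapshot = table.current_snapshot()
assert snapshot is not None
for manifest in snapshot.manifests(table.io):
    assert manifest.partition_spec_id in table.metadata.specs()
```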
