Skip to content

Commit

Permalink
Fix tracing existing entries when there are deletes (#1046)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko authored and sungwy committed Aug 13, 2024
1 parent f73da80 commit f92994e
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 10 deletions.
13 changes: 6 additions & 7 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
Reference,
)
from pyiceberg.expressions.visitors import (
ROWS_CANNOT_MATCH,
ROWS_MIGHT_NOT_MATCH,
ROWS_MUST_MATCH,
_InclusiveMetricsEvaluator,
_StrictMetricsEvaluator,
Expand Down Expand Up @@ -3360,13 +3360,14 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
existing_entries = []
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
# Based on the metadata, it can be dropped right away
deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
self._deleted_data_files.add(entry.data_file)
elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH:
existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
else:
# Based on the metadata, it is unsure to say if the file can be deleted
partial_rewrites_needed = True
# Based on the metadata, we cannot determine if it can be deleted
existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH:
partial_rewrites_needed = True

if len(deleted_entries) > 0:
total_deleted_entries += deleted_entries
Expand All @@ -3383,8 +3384,6 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
for existing_entry in existing_entries:
writer.add_entry(existing_entry)
existing_manifests.append(writer.to_manifest_file())
# else:
# deleted_manifests.append()
else:
existing_manifests.append(manifest_file)
else:
Expand Down
82 changes: 79 additions & 3 deletions tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from typing import Any, Dict
from urllib.parse import urlparse

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
Expand All @@ -38,13 +39,20 @@
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import In
from pyiceberg.expressions import GreaterThanOrEqual, In, Not
from pyiceberg.io.pyarrow import _dataframe_to_data_files
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import IntegerType, LongType, NestedField, StringType
from pyiceberg.transforms import DayTransform, IdentityTransform
from pyiceberg.types import (
DateType,
DoubleType,
IntegerType,
LongType,
NestedField,
StringType,
)
from utils import _create_table


Expand Down Expand Up @@ -1331,3 +1339,71 @@ def test_overwrite_all_data_with_filter(session_catalog: Catalog) -> None:
tbl.overwrite(data, In("id", ["1", "2", "3"]))

assert len(tbl.scan().to_arrow()) == 3


@pytest.mark.integration
def test_delete_threshold() -> None:
catalog = load_catalog(
"local",
**{
"type": "rest",
"uri": "http://localhost:8181",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
},
)

schema = Schema(
NestedField(field_id=101, name="id", field_type=LongType(), required=True),
NestedField(field_id=103, name="created_at", field_type=DateType(), required=False),
NestedField(field_id=104, name="relevancy_score", field_type=DoubleType(), required=False),
)

partition_spec = PartitionSpec(PartitionField(source_id=103, field_id=2000, transform=DayTransform(), name="created_at_day"))

try:
catalog.drop_table(
identifier="default.scores",
)
except NoSuchTableError:
pass

catalog.create_table(
identifier="default.scores",
schema=schema,
partition_spec=partition_spec,
)

# Parameters
num_rows = 100 # Number of rows in the dataframe
id_min, id_max = 1, 10000
date_start, date_end = date(2024, 1, 1), date(2024, 2, 1)

# Generate the 'id' column
id_column = np.random.randint(id_min, id_max, num_rows)

# Generate the 'created_at' column as dates only
date_range = pd.date_range(start=date_start, end=date_end, freq="D") # Daily frequency for dates
created_at_column = np.random.choice(date_range, num_rows) # Convert to string (YYYY-MM-DD format)

# Generate the 'relevancy_score' column with a peak around 0.1
relevancy_score_column = np.random.beta(a=2, b=20, size=num_rows) # Adjusting parameters to peak around 0.1

# Create the dataframe
df = pd.DataFrame({"id": id_column, "created_at": created_at_column, "relevancy_score": relevancy_score_column})

iceberg_table = catalog.load_table("default.scores")

# Convert the pandas DataFrame to a PyArrow Table with the Iceberg schema
arrow_schema = iceberg_table.schema().as_arrow()
docs_table = pa.Table.from_pandas(df, schema=arrow_schema)

# Append the data to the Iceberg table
iceberg_table.append(docs_table)

delete_condition = GreaterThanOrEqual("relevancy_score", 0.1)
lower_before = len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow())
assert len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow()) == lower_before
iceberg_table.delete(delete_condition)
assert len(iceberg_table.scan().to_arrow()) == lower_before

0 comments on commit f92994e

Please sign in to comment.