
Bug Fix: Position Deletes + row_filter yields less data when the DataFile is large #1141

Merged
5 commits merged on Sep 27, 2024
16 changes: 12 additions & 4 deletions pyiceberg/io/pyarrow.py
@@ -1238,10 +1238,13 @@ def _task_to_record_batches(
         for batch in batches:
             next_index = next_index + len(batch)
             current_index = next_index - len(batch)
+            output_batches = iter([batch])
             if positional_deletes:
                 # Create the mask of indices that we're interested in
                 indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
                 batch = batch.take(indices)
+                output_batches = iter([batch])
Collaborator Author:
@kevinjqliu moved this assignment here for readability

Contributor:
makes sense, ty!


             # Apply the user filter
             if pyarrow_filter is not None:
                 # we need to switch back and forth between RecordBatch and Table
@@ -1251,10 +1254,15 @@ def _task_to_record_batches(
                 arrow_table = arrow_table.filter(pyarrow_filter)
                 if len(arrow_table) == 0:
                     continue
-                batch = arrow_table.to_batches()[0]
-            yield _to_requested_schema(
-                projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True, use_large_types=use_large_types
-            )
+                output_batches = arrow_table.to_batches()
+            for output_batch in output_batches:
Contributor:
I'm a bit concerned about all the nested for-loops in this function. But correctness comes first and we can always refactor later.

Collaborator Author:
You are not alone 😨

I think the long-term solution is to upgrade our minimum PyArrow requirement to 17.0.0, but as you said, we should still have a fix that works with the lower versions while we go through the longer process of discussing the version bump and rolling it out gradually, together with the community.

+                yield _to_requested_schema(
+                    projected_schema,
+                    file_project_schema,
+                    output_batch,
+                    downcast_ns_timestamp_to_us=True,
+                    use_large_types=use_large_types,
+                )


def _task_to_table(
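For context, here is a minimal standalone sketch of the failure mode the diff above fixes. This snippet is not from the PR; the column name and chunk sizes are invented for illustration. After a filter is applied, a pyarrow.Table can be backed by several record batches, so taking only to_batches()[0] silently drops rows, while iterating over every batch keeps them all.

```python
# Illustrative only: shows why `to_batches()[0]` can lose rows after filtering.
import pyarrow as pa
import pyarrow.compute as pc

# A table split across several chunks, similar to a large DataFile that the
# reader processes as multiple record batches.
chunks = [pa.array(range(i, i + 1000)) for i in range(0, 5000, 1000)]
table = pa.table({"number": pa.chunked_array(chunks)})

# Apply a row filter; the result is still chunked across several batches.
mask = pc.less_equal(table["number"], 4500)
filtered = table.filter(mask)

first_batch_only = len(filtered.to_batches()[0])          # old behaviour: 1000 rows
all_batches = sum(len(b) for b in filtered.to_batches())  # fixed behaviour: 4501 rows

assert all_batches == len(filtered)
assert first_batch_only < all_batches  # rows would have been dropped
```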
53 changes: 53 additions & 0 deletions tests/integration/test_deletes.py
@@ -292,6 +292,59 @@ def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSes
     assert len(reader.read_all()) == 0


+@pytest.mark.integration
+@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
+def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
+    identifier = "default.test_read_multiple_batches_in_task_with_position_deletes"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number int
+            )
+            USING iceberg
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read'
+            )
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array(list(range(1, 1001)) * 100),
+        ],
+        schema=pa.schema([pa.field("number", pa.int32())]),
+    )
+
+    tbl.append(arrow_table)
+
+    run_spark_commands(
+        spark,
+        [
+            f"""
+            DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
+        """,
+        ],
+    )
+
+    tbl.refresh()
+
+    reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
+    assert isinstance(reader, pa.RecordBatchReader)
+    pyiceberg_count = len(reader.read_all())
+    expected_count = 46 * 100
+    assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}"
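A quick check of where expected_count = 46 * 100 in the new test comes from (an aside, not part of the PR): the table holds the values 1 through 1000, appended 100 times each; the DELETE position-deletes values 1 through 4, and the scan's row_filter keeps number <= 50, so 46 distinct values survive with 100 copies each. Before this fix, only the first record batch of the filtered table was yielded, so the reported count fell short of that total.

```python
# Not part of the test: back-of-the-envelope check of the expected row count.
surviving_values = [n for n in range(1, 51) if n > 4]  # values 5..50 survive the DELETE and the filter
assert len(surviving_values) * 100 == 4600  # matches expected_count = 46 * 100
```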


@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestCatalog) -> None: