
Use ArrowScan.to_table to replace project_table (#1180)
* Use ArrowScan.to_table to replace project_table

* Use ArrowScan.to_table to replace project_table in these files:
** pyiceberg/table/__init__.py
** pyiceberg/io/pyarrow.py
** tests/io/test_pyarrow.py

* Replace all remaining uses of project_table with ArrowScan.to_table (a before/after sketch of the call change follows below)

* Fix formatting

* Apply ruff fixes
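The migration itself is mechanical: every keyword argument of the old project_table call except tasks now configures an ArrowScan instance, and tasks becomes the sole argument of its to_table() method. A minimal before/after sketch, assuming an already-loaded table (the catalog and table names are placeholders, not part of this commit):

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io.pyarrow import ArrowScan

# Placeholder catalog/table names; any existing Iceberg table works here.
table = load_catalog("default").load_table("default.my_table")
tasks = list(table.scan().plan_files())

# Old call shape (removed by this commit):
# arrow_table = project_table(
#     tasks=tasks,
#     table_metadata=table.metadata,
#     io=table.io,
#     row_filter=AlwaysTrue(),
#     projected_schema=table.schema(),
# )

# New call shape: the scan configuration moves to the ArrowScan constructor,
# and the file tasks go to to_table().
arrow_table = ArrowScan(
    table_metadata=table.metadata,
    io=table.io,
    projected_schema=table.schema(),
    row_filter=AlwaysTrue(),
    case_sensitive=True,
).to_table(tasks=tasks)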
JE-Chen committed Sep 20, 2024
1 parent 41a3c8e commit b8b2f66
Showing 2 changed files with 28 additions and 31 deletions.
9 changes: 4 additions & 5 deletions pyiceberg/table/__init__.py
@@ -523,9 +523,9 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
             snapshot_properties: Custom properties to be added to the snapshot summary
         """
         from pyiceberg.io.pyarrow import (
+            ArrowScan,
             _dataframe_to_data_files,
             _expression_to_complementary_pyarrow,
-            project_table,
         )

         if (
@@ -559,13 +559,12 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
             # - Apply the latest partition-spec
             # - And sort order when added
             for original_file in files:
-                df = project_table(
-                    tasks=[original_file],
+                df = ArrowScan(
                     table_metadata=self.table_metadata,
                     io=self._table.io,
-                    row_filter=AlwaysTrue(),
                     projected_schema=self.table_metadata.schema(),
-                )
+                    row_filter=AlwaysTrue(),
+                ).to_table(tasks=[original_file])
                 filtered_df = df.filter(preserve_row_filter)

                 # Only rewrite if there are records being deleted
50 changes: 24 additions & 26 deletions tests/io/test_pyarrow.py
@@ -58,6 +58,7 @@
 from pyiceberg.io import InputStream, OutputStream, load_file_io
 from pyiceberg.io.pyarrow import (
     ICEBERG_SCHEMA,
+    ArrowScan,
     PyArrowFile,
     PyArrowFileIO,
     StatsAggregator,
@@ -69,7 +70,6 @@
     _to_requested_schema,
     bin_pack_arrow_table,
     expression_to_pyarrow,
-    project_table,
     schema_to_pyarrow,
 )
 from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
@@ -952,7 +952,19 @@ def file_map(schema_map: Schema, tmpdir: str) -> str:
 def project(
     schema: Schema, files: List[str], expr: Optional[BooleanExpression] = None, table_schema: Optional[Schema] = None
 ) -> pa.Table:
-    return project_table(
+    return ArrowScan(
+        table_metadata=TableMetadataV2(
+            location="file://a/b/",
+            last_column_id=1,
+            format_version=2,
+            schemas=[table_schema or schema],
+            partition_specs=[PartitionSpec()],
+        ),
+        io=PyArrowFileIO(),
+        projected_schema=schema,
+        row_filter=expr or AlwaysTrue(),
+        case_sensitive=True,
+    ).to_table(
         tasks=[
             FileScanTask(
                 DataFile(
@@ -965,18 +977,7 @@ def project(
                 )
             )
             for file in files
-        ],
-        table_metadata=TableMetadataV2(
-            location="file://a/b/",
-            last_column_id=1,
-            format_version=2,
-            schemas=[table_schema or schema],
-            partition_specs=[PartitionSpec()],
-        ),
-        io=PyArrowFileIO(),
-        row_filter=expr or AlwaysTrue(),
-        projected_schema=schema,
-        case_sensitive=True,
+        ]
     )


@@ -1411,9 +1412,7 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp
         data_file=example_task.file,
         delete_files={DataFile(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET)},
     )
-
-    with_deletes = project_table(
-        tasks=[example_task_with_delete],
+    with_deletes = ArrowScan(
         table_metadata=TableMetadataV2(
             location=metadata_location,
             last_column_id=1,
@@ -1423,9 +1422,9 @@
             partition_specs=[PartitionSpec()],
         ),
         io=load_file_io(),
-        row_filter=AlwaysTrue(),
         projected_schema=table_schema_simple,
-    )
+        row_filter=AlwaysTrue(),
+    ).to_table(tasks=[example_task_with_delete])

     assert (
         str(with_deletes)
@@ -1450,8 +1449,7 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_
         },
     )

-    with_deletes = project_table(
-        tasks=[example_task_with_delete],
+    with_deletes = ArrowScan(
         table_metadata=TableMetadataV2(
             location=metadata_location,
             last_column_id=1,
@@ -1461,9 +1459,9 @@
             partition_specs=[PartitionSpec()],
         ),
         io=load_file_io(),
-        row_filter=AlwaysTrue(),
         projected_schema=table_schema_simple,
-    )
+        row_filter=AlwaysTrue(),
+    ).to_table(tasks=[example_task_with_delete])

     assert (
         str(with_deletes)
@@ -1480,8 +1478,8 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_

 def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Schema) -> None:
     metadata_location = "file://a/b/c.json"
-    projection = project_table(
-        tasks=[example_task],
+
+    projection = ArrowScan(
         table_metadata=TableMetadataV2(
             location=metadata_location,
             last_column_id=1,
@@ -1494,7 +1492,7 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Sc
         case_sensitive=True,
         projected_schema=table_schema_simple,
         row_filter=AlwaysTrue(),
-    )
+    ).to_table(tasks=[example_task])

     assert (
         str(projection)
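For readers who want the same pattern outside the test module, the project helper above generalizes to any list of FileScanTasks. A self-contained sketch under assumed names (scan_tasks_to_arrow is a hypothetical wrapper, not part of this commit; every keyword argument mirrors one visible in the hunks above, and the minimal TableMetadataV2 copies the single-column test fixture, so last_column_id may need adjusting for wider schemas):

from typing import List

import pyarrow as pa

from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io import load_file_io
from pyiceberg.io.pyarrow import ArrowScan
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import FileScanTask
from pyiceberg.table.metadata import TableMetadataV2


def scan_tasks_to_arrow(schema: Schema, tasks: List[FileScanTask], metadata_location: str) -> pa.Table:
    # Build minimal V2 metadata around the given schema, as the tests above do,
    # then materialize the given file tasks into a single Arrow table.
    return ArrowScan(
        table_metadata=TableMetadataV2(
            location=metadata_location,
            last_column_id=1,  # mirrors the single-column fixture; adjust for wider schemas
            format_version=2,
            schemas=[schema],
            partition_specs=[PartitionSpec()],
        ),
        io=load_file_io(),
        projected_schema=schema,
        row_filter=AlwaysTrue(),
        case_sensitive=True,
    ).to_table(tasks=tasks)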
