diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 3eedff4581..de621ead76 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -523,9 +523,9 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
             snapshot_properties: Custom properties to be added to the snapshot summary
         """
         from pyiceberg.io.pyarrow import (
+            ArrowScan,
             _dataframe_to_data_files,
             _expression_to_complementary_pyarrow,
-            project_table,
         )
 
         if (
@@ -559,13 +559,12 @@
             # - Apply the latest partition-spec
             # - And sort order when added
             for original_file in files:
-                df = project_table(
-                    tasks=[original_file],
+                df = ArrowScan(
                     table_metadata=self.table_metadata,
                     io=self._table.io,
-                    row_filter=AlwaysTrue(),
                     projected_schema=self.table_metadata.schema(),
-                )
+                    row_filter=AlwaysTrue(),
+                ).to_table(tasks=[original_file])
                 filtered_df = df.filter(preserve_row_filter)
 
                 # Only rewrite if there are records being deleted
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 82b35341b9..e4017e1df5 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -58,6 +58,7 @@
 from pyiceberg.io import InputStream, OutputStream, load_file_io
 from pyiceberg.io.pyarrow import (
     ICEBERG_SCHEMA,
+    ArrowScan,
     PyArrowFile,
     PyArrowFileIO,
     StatsAggregator,
@@ -69,7 +70,6 @@
     _to_requested_schema,
     bin_pack_arrow_table,
     expression_to_pyarrow,
-    project_table,
     schema_to_pyarrow,
 )
 from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
@@ -952,7 +952,19 @@ def file_map(schema_map: Schema, tmpdir: str) -> str:
 def project(
     schema: Schema, files: List[str], expr: Optional[BooleanExpression] = None, table_schema: Optional[Schema] = None
 ) -> pa.Table:
-    return project_table(
+    return ArrowScan(
+        table_metadata=TableMetadataV2(
+            location="file://a/b/",
+            last_column_id=1,
+            format_version=2,
+            schemas=[table_schema or schema],
+            partition_specs=[PartitionSpec()],
+        ),
+        io=PyArrowFileIO(),
+        projected_schema=schema,
+        row_filter=expr or AlwaysTrue(),
+        case_sensitive=True,
+    ).to_table(
         tasks=[
             FileScanTask(
                 DataFile(
@@ -965,18 +977,7 @@ def project(
                 )
             )
             for file in files
-        ],
-        table_metadata=TableMetadataV2(
-            location="file://a/b/",
-            last_column_id=1,
-            format_version=2,
-            schemas=[table_schema or schema],
-            partition_specs=[PartitionSpec()],
-        ),
-        io=PyArrowFileIO(),
-        row_filter=expr or AlwaysTrue(),
-        projected_schema=schema,
-        case_sensitive=True,
+        ]
     )
 
 
@@ -1411,9 +1412,7 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp
         data_file=example_task.file,
         delete_files={DataFile(content=DataFileContent.POSITION_DELETES, file_path=deletes_file, file_format=FileFormat.PARQUET)},
     )
-
-    with_deletes = project_table(
-        tasks=[example_task_with_delete],
+    with_deletes = ArrowScan(
         table_metadata=TableMetadataV2(
             location=metadata_location,
             last_column_id=1,
@@ -1423,9 +1422,9 @@
             partition_specs=[PartitionSpec()],
         ),
         io=load_file_io(),
-        row_filter=AlwaysTrue(),
         projected_schema=table_schema_simple,
-    )
+        row_filter=AlwaysTrue(),
+    ).to_table(tasks=[example_task_with_delete])
 
     assert (
         str(with_deletes)
@@ -1450,8 +1449,7 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_
         },
     )
 
-    with_deletes = project_table(
-        tasks=[example_task_with_delete],
+    with_deletes = ArrowScan(
         table_metadata=TableMetadataV2(
             location=metadata_location,
             last_column_id=1,
@@ -1461,9 +1459,9 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_
             partition_specs=[PartitionSpec()],
         ),
         io=load_file_io(),
-        row_filter=AlwaysTrue(),
         projected_schema=table_schema_simple,
-    )
+        row_filter=AlwaysTrue(),
+    ).to_table(tasks=[example_task_with_delete])
 
     assert (
         str(with_deletes)
@@ -1480,8 +1478,8 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_
 
 def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Schema) -> None:
     metadata_location = "file://a/b/c.json"
-    projection = project_table(
-        tasks=[example_task],
+
+    projection = ArrowScan(
         table_metadata=TableMetadataV2(
             location=metadata_location,
             last_column_id=1,
@@ -1494,7 +1492,7 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Sc
         case_sensitive=True,
         projected_schema=table_schema_simple,
         row_filter=AlwaysTrue(),
-    )
+    ).to_table(tasks=[example_task])
 
     assert (
         str(projection)
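
For reviewers, a minimal sketch of the call pattern this diff migrates to: the ArrowScan constructor carries the scan configuration (table_metadata, io, projected_schema, row_filter, case_sensitive), and to_table() receives the FileScanTask list, exactly as in the delete() hunk above. The catalog and table names below are hypothetical, not part of this change:

    from pyiceberg.catalog import load_catalog
    from pyiceberg.expressions import AlwaysTrue
    from pyiceberg.io.pyarrow import ArrowScan

    # Hypothetical catalog/table identifiers, for illustration only.
    table = load_catalog("my_catalog").load_table("db.events")

    # Constructor holds the scan configuration; to_table() takes the file tasks.
    scan = ArrowScan(
        table_metadata=table.metadata,
        io=table.io,
        projected_schema=table.schema(),
        row_filter=AlwaysTrue(),
        case_sensitive=True,
    )
    tasks = list(table.scan().plan_files())  # FileScanTask objects, as in delete() above
    arrow_table = scan.to_table(tasks=tasks)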