Skip to content

Commit

Permalink
Add metadata tables for data_files and delete_files (#1066)
Browse files Browse the repository at this point in the history
* Add metadata tables for data_files and delete_files

* Update API docs for `data_files` and `delete_files`

* Update mehtod signature of `_files()`

* Migrate implementation of files() table from __init__.py
  • Loading branch information
soumya-ghosh authored Sep 20, 2024
1 parent de47590 commit 41a3c8e
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 131 deletions.
5 changes: 5 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,11 @@ readable_metrics: [
[6.0989]]
```

!!! info
Content refers to type of content stored by the data file: `0` - `Data`, `1` - `Position Deletes`, `2` - `Equality Deletes`

To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively.

## Add Files

Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
Expand Down
27 changes: 19 additions & 8 deletions pyiceberg/table/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple

from pyiceberg.conversions import from_bytes
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
Expand Down Expand Up @@ -473,7 +473,7 @@ def history(self) -> "pa.Table":

return pa.Table.from_pylist(history, schema=history_schema)

def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow
Expand Down Expand Up @@ -530,6 +530,8 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
for manifest_list in snapshot.manifests(io):
for manifest_entry in manifest_list.fetch_manifest_entry(io):
data_file = manifest_entry.data_file
if data_file_filter and data_file.content not in data_file_filter:
continue
column_sizes = data_file.column_sizes or {}
value_counts = data_file.value_counts or {}
null_value_counts = data_file.null_value_counts or {}
Expand Down Expand Up @@ -558,12 +560,12 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
"spec_id": data_file.spec_id,
"record_count": data_file.record_count,
"file_size_in_bytes": data_file.file_size_in_bytes,
"column_sizes": dict(data_file.column_sizes),
"value_counts": dict(data_file.value_counts),
"null_value_counts": dict(data_file.null_value_counts),
"nan_value_counts": dict(data_file.nan_value_counts),
"lower_bounds": dict(data_file.lower_bounds),
"upper_bounds": dict(data_file.upper_bounds),
"column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
"value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
"null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None,
"nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
"lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
"upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
"key_metadata": data_file.key_metadata,
"split_offsets": data_file.split_offsets,
"equality_ids": data_file.equality_ids,
Expand All @@ -575,3 +577,12 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
files,
schema=files_schema,
)

def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
return self._files(snapshot_id)

def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
return self._files(snapshot_id, {DataFileContent.DATA})

def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
268 changes: 145 additions & 123 deletions tests/integration/test_inspect_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,126 +672,141 @@ def test_inspect_files(
# append more data
tbl.append(arrow_table_with_null)

df = tbl.refresh().inspect.files()
# configure table properties
if format_version == 2:
with tbl.transaction() as txn:
txn.set_properties({"write.delete.mode": "merge-on-read"})
spark.sql(f"DELETE FROM {identifier} WHERE int = 1")

assert df.column_names == [
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
"key_metadata",
"split_offsets",
"equality_ids",
"sort_order_id",
"readable_metrics",
]

# make sure the non-nullable fields are filled
for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
for value in df[int_column]:
assert isinstance(value.as_py(), int)

for split_offsets in df["split_offsets"]:
assert isinstance(split_offsets.as_py(), list)

for file_format in df["file_format"]:
assert file_format.as_py() == "PARQUET"
files_df = tbl.refresh().inspect.files()

for file_path in df["file_path"]:
assert file_path.as_py().startswith("s3://")
data_files_df = tbl.inspect.data_files()

lhs = df.to_pandas()
rhs = spark.table(f"{identifier}.files").toPandas()
delete_files_df = tbl.inspect.delete_files()

lhs_subset = lhs[
[
def inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None:
assert df.column_names == [
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
"key_metadata",
"split_offsets",
"equality_ids",
"sort_order_id",
"readable_metrics",
]
]
rhs_subset = rhs[
[
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"split_offsets",
"equality_ids",
"sort_order_id",

# make sure the non-nullable fields are filled
for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
for value in df[int_column]:
assert isinstance(value.as_py(), int)

for split_offsets in df["split_offsets"]:
assert isinstance(split_offsets.as_py(), list)

for file_format in df["file_format"]:
assert file_format.as_py() == "PARQUET"

for file_path in df["file_path"]:
assert file_path.as_py().startswith("s3://")

lhs = df.to_pandas()
rhs = spark_df.toPandas()

lhs_subset = lhs[
[
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"split_offsets",
"equality_ids",
"sort_order_id",
]
]
rhs_subset = rhs[
[
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"split_offsets",
"equality_ids",
"sort_order_id",
]
]
]

assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)
assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)

for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
# NaN != NaN in Python
continue
if column in [
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
]:
if isinstance(right, dict):
left = dict(left)
assert left == right, f"Difference in column {column}: {left} != {right}"
for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
# NaN != NaN in Python
continue
if column in [
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
]:
if isinstance(right, dict):
left = dict(left)
assert left == right, f"Difference in column {column}: {left} != {right}"

elif column == "readable_metrics":
assert list(left.keys()) == [
"bool",
"string",
"string_long",
"int",
"long",
"float",
"double",
"timestamp",
"timestamptz",
"date",
"binary",
"fixed",
]
assert left.keys() == right.keys()

for rm_column in left.keys():
rm_lhs = left[rm_column]
rm_rhs = right[rm_column]

assert rm_lhs["column_size"] == rm_rhs["column_size"]
assert rm_lhs["value_count"] == rm_rhs["value_count"]
assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]

if rm_column == "timestamptz":
# PySpark does not correctly set the timstamptz
rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)

assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
else:
assert left == right, f"Difference in column {column}: {left} != {right}"
elif column == "readable_metrics":
assert list(left.keys()) == [
"bool",
"string",
"string_long",
"int",
"long",
"float",
"double",
"timestamp",
"timestamptz",
"date",
"binary",
"fixed",
]
assert left.keys() == right.keys()

for rm_column in left.keys():
rm_lhs = left[rm_column]
rm_rhs = right[rm_column]

assert rm_lhs["column_size"] == rm_rhs["column_size"]
assert rm_lhs["value_count"] == rm_rhs["value_count"]
assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]

if rm_column == "timestamptz" and rm_rhs["lower_bound"] and rm_rhs["upper_bound"]:
# PySpark does not correctly set the timstamptz
rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)

assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
else:
assert left == right, f"Difference in column {column}: {left} != {right}"

inspect_files_asserts(files_df, spark.table(f"{identifier}.files"))
inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files"))
inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files"))


@pytest.mark.integration
Expand All @@ -801,26 +816,33 @@ def test_inspect_files_no_snapshot(spark: SparkSession, session_catalog: Catalog

tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})

df = tbl.refresh().inspect.files()
files_df = tbl.refresh().inspect.files()
data_files_df = tbl.inspect.data_files()
delete_files_df = tbl.inspect.delete_files()

assert df.column_names == [
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
"key_metadata",
"split_offsets",
"equality_ids",
"sort_order_id",
"readable_metrics",
]
def inspect_files_asserts(df: pa.Table) -> None:
assert df.column_names == [
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
"column_sizes",
"value_counts",
"null_value_counts",
"nan_value_counts",
"lower_bounds",
"upper_bounds",
"key_metadata",
"split_offsets",
"equality_ids",
"sort_order_id",
"readable_metrics",
]

assert df.to_pandas().empty is True

assert df.to_pandas().empty is True
inspect_files_asserts(files_df)
inspect_files_asserts(data_files_df)
inspect_files_asserts(delete_files_df)

0 comments on commit 41a3c8e

Please sign in to comment.