Add Docstrings to pyiceberg/table/__init__.py #1189

Merged: 1 commit, Sep 20, 2024
106 changes: 101 additions & 5 deletions pyiceberg/table/__init__.py
@@ -259,7 +259,7 @@ def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequ
return self

def _scan(self, row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE) -> DataScan:
"""Minimal data scan the table with the current state of the transaction."""
"""Minimal data scan of the table with the current state of the transaction."""
return DataScan(
table_metadata=self.table_metadata,
io=self._table.io,
@@ -681,6 +681,8 @@ def commit_transaction(self) -> Table:


class CreateTableTransaction(Transaction):
"""A transaction that involves the creation of a a new table."""

def _initial_changes(self, table_metadata: TableMetadata) -> None:
"""Set the initial changes that can reconstruct the initial table metadata when creating the CreateTableTransaction."""
self._updates += (
@@ -749,17 +751,23 @@ class TableIdentifier(IcebergBaseModel):


class CommitTableRequest(IcebergBaseModel):
"""A pydantic BaseModel for a table commit request."""

identifier: TableIdentifier = Field()
requirements: Tuple[TableRequirement, ...] = Field(default_factory=tuple)
updates: Tuple[TableUpdate, ...] = Field(default_factory=tuple)


class CommitTableResponse(IcebergBaseModel):
"""A pydantic BaseModel for a table commit response."""

metadata: TableMetadata
metadata_location: str = Field(alias="metadata-location")


class Table:
"""An Iceberg table."""

_identifier: Identifier = Field()
metadata: TableMetadata
metadata_location: str = Field()
@@ -785,11 +793,19 @@ def transaction(self) -> Transaction:

@property
def inspect(self) -> InspectTable:
"""Return the InspectTable object to browse the table metadata."""
"""Return the InspectTable object to browse the table metadata.

Returns:
InspectTable object based on this Table.
"""
return InspectTable(self)
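As a hedged usage sketch (the catalog name, namespace, and table name below are illustrative, and a catalog is assumed to already be configured), the inspect entry point can be used to browse metadata tables such as the snapshot history:

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")            # assumes a catalog named "default" is configured
tbl = catalog.load_table("examples.events")  # illustrative namespace and table name
snapshots = tbl.inspect.snapshots()          # snapshot metadata returned as a pyarrow Table
print(snapshots.num_rows)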

def refresh(self) -> Table:
"""Refresh the current table metadata."""
"""Refresh the current table metadata.

Returns:
An updated instance of the same Iceberg table
"""
fresh = self.catalog.load_table(self._identifier)
self.metadata = fresh.metadata
self.io = fresh.io
@@ -798,7 +814,11 @@ def refresh(self) -> Table:

@property
def identifier(self) -> Identifier:
"""Return the identifier of this table."""
"""Return the identifier of this table.

Returns:
An Identifier tuple of the table name
"""
deprecation_message(
deprecated_in="0.8.0",
removed_in="0.9.0",
@@ -807,7 +827,11 @@ def identifier(self) -> Identifier:
return (self.catalog.name,) + self._identifier

def name(self) -> Identifier:
"""Return the identifier of this table."""
"""Return the identifier of this table.

Returns:
An Identifier tuple of the table name
"""
return self.identifier

def scan(
@@ -819,6 +843,35 @@ def scan(
options: Properties = EMPTY_DICT,
limit: Optional[int] = None,
) -> DataScan:
"""Fetch a DataScan based on the table's current metadata.

The data scan can be used to project the table's data
that matches the provided row_filter onto the table's
current schema.

Args:
row_filter:
A string or BooleanExpression that describes the
desired rows
selected_fields:
A tuple of strings representing the column names
to return in the output dataframe.
case_sensitive:
If True, column matching is case sensitive.
snapshot_id:
Optional Snapshot ID to time travel to. If None,
scans the table as of the current snapshot ID.
options:
Additional Table properties as a dictionary of
string key value pairs to use for this scan.
limit:
An integer representing the number of rows to
return in the scan result. If None, fetches all
matching rows.

Returns:
A DataScan based on the table's current metadata.
"""
return DataScan(
table_metadata=self.metadata,
io=self.io,
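A minimal sketch of how the arguments documented above might be combined, assuming a Table already loaded as tbl; the filter expression, column names, and limit are illustrative:

# Project two columns for rows matching a filter, capped at 100 rows
scan = tbl.scan(
    row_filter="event_date >= '2024-01-01'",     # a string filter; a BooleanExpression also works
    selected_fields=("event_id", "event_date"),
    case_sensitive=True,
    limit=100,
)
df = scan.to_pandas()                            # or scan.to_arrow(), scan.to_duckdb(...), etc.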
@@ -1212,6 +1265,8 @@ class ScanTask(ABC):

@dataclass(init=False)
class FileScanTask(ScanTask):
"""Task representing a data file and its corresponding delete files."""

file: DataFile
delete_files: Set[DataFile]
start: int
@@ -1236,6 +1291,11 @@ def _open_manifest(
partition_filter: Callable[[DataFile], bool],
metrics_evaluator: Callable[[DataFile], bool],
) -> List[ManifestEntry]:
"""Open a manifest file and return matching manifest entries.

Returns:
A list of ManifestEntry that matches the provided filters.
"""
return [
manifest_entry
for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=True)
@@ -1395,13 +1455,30 @@ def plan_files(self) -> Iterable[FileScanTask]:
]

def to_arrow(self) -> pa.Table:
"""Read an Arrow table eagerly from this DataScan.

All rows will be loaded into memory at once.

Returns:
pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
"""
from pyiceberg.io.pyarrow import ArrowScan

return ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_table(self.plan_files())
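A brief sketch of the eager path, assuming the same illustrative tbl as above:

# Materialize the scan result in memory as a single Arrow table
arrow_table = tbl.scan(limit=1_000).to_arrow()
print(arrow_table.num_rows, arrow_table.schema)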

def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
"""Return an Arrow RecordBatchReader from this DataScan.

For large results, using a RecordBatchReader requires less memory than
loading an Arrow Table for the same DataScan, because a RecordBatch
is read one at a time.

Returns:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
which can be used to read a stream of record batches one by one.
"""
import pyarrow as pa

from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
@@ -1417,9 +1494,19 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
)
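A streaming counterpart to the eager sketch above, again assuming the illustrative tbl; batches are consumed one at a time so memory stays bounded:

# Stream record batches instead of loading the whole table at once
reader = tbl.scan().to_arrow_batch_reader()
total_rows = 0
for batch in reader:              # each item is a pa.RecordBatch
    total_rows += batch.num_rows
print(total_rows)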

def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
"""Read a Pandas DataFrame eagerly from this Iceberg table.

Returns:
pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
"""
return self.to_arrow().to_pandas(**kwargs)

def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
"""Shorthand for loading the Iceberg Table in DuckDB.

Returns:
DuckDBPyConnection: In memory DuckDB connection with the Iceberg table.
"""
import duckdb

con = connection or duckdb.connect(database=":memory:")
@@ -1428,13 +1515,20 @@ def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] =
return con
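An illustrative round trip through DuckDB; the registered table name and the SQL below are assumptions for the sketch:

# Register the scan result as an in-memory DuckDB table and query it with SQL
con = tbl.scan().to_duckdb(table_name="events")   # illustrative DuckDB table name
count = con.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(count)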

def to_ray(self) -> ray.data.dataset.Dataset:
"""Read a Ray Dataset eagerly from this Iceberg table.

Returns:
ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table
"""
import ray

return ray.data.from_arrow(self.to_arrow())
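And a short sketch for the Ray path, assuming Ray is installed and the same illustrative tbl:

# Load the scan result as a Ray Dataset for distributed processing
ds = tbl.scan().to_ray()
print(ds.count())                 # row count; forces materialization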


@dataclass(frozen=True)
class WriteTask:
"""Task with the parameters for writing a DataFile."""

write_uuid: uuid.UUID
task_id: int
schema: Schema
@@ -1457,6 +1551,8 @@ def generate_data_file_path(self, extension: str) -> str:

@dataclass(frozen=True)
class AddFileTask:
"""Task with the parameters for adding a Parquet file as a DataFile."""

file_path: str
partition_field_value: Record
