Skip to content

Commit

Permalink
Add list-refs cli command (#137)
Browse files Browse the repository at this point in the history
* Python: Add list-refs CLI command

* Update pyiceberg/cli/console.py

---------

Co-authored-by: Fokko Driesprong <fokko@apache.org>
  • Loading branch information
amogh-jahagirdar and Fokko authored Nov 10, 2023
1 parent 0c8b0b9 commit 04ca8ae
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 1 deletion.
52 changes: 52 additions & 0 deletions pyiceberg/cli/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import (
Any,
Callable,
Dict,
Literal,
Optional,
Tuple,
Expand All @@ -31,6 +32,10 @@
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
from pyiceberg.table.refs import SnapshotRef

DEFAULT_MIN_SNAPSHOTS_TO_KEEP = 1
DEFAULT_MAX_SNAPSHOT_AGE_MS = 432000000


def catch_exception() -> Callable: # type: ignore
Expand Down Expand Up @@ -372,3 +377,50 @@ def table(ctx: Context, identifier: str, property_name: str) -> None: # noqa: F
ctx.exit(1)
else:
raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")


@run.command()
@click.argument("identifier")
@click.option("--type", required=False)
@click.option("--verbose", type=click.BOOL)
@click.pass_context
@catch_exception()
def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None:
"""List all the refs in the provided table."""
catalog, output = _catalog_and_output(ctx)
table = catalog.load_table(identifier)
refs = table.refs()
if type:
type = type.lower()
if type not in {"branch", "tag"}:
raise ValueError(f"Type must be either branch or tag, got: {type}")

relevant_refs = [
(ref_name, ref.snapshot_ref_type, _retention_properties(ref, table.properties))
for (ref_name, ref) in refs.items()
if not type or ref.snapshot_ref_type == type
]

output.describe_refs(relevant_refs)


def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]:
retention_properties = {}
if ref.snapshot_ref_type == "branch":
default_min_snapshots_to_keep = table_properties.get(
"history.expire.min-snapshots-to-keep", DEFAULT_MIN_SNAPSHOTS_TO_KEEP
)
retention_properties["min_snapshots_to_keep"] = (
str(ref.min_snapshots_to_keep) if ref.min_snapshots_to_keep else str(default_min_snapshots_to_keep)
)
default_max_snapshot_age_ms = table_properties.get("history.expire.max-snapshot-age-ms", DEFAULT_MAX_SNAPSHOT_AGE_MS)
retention_properties["max_snapshot_age_ms"] = (
str(ref.max_snapshot_age_ms) if ref.max_snapshot_age_ms else str(default_max_snapshot_age_ms)
)
else:
retention_properties["min_snapshots_to_keep"] = "N/A"
retention_properties["max_snapshot_age_ms"] = "N/A"

retention_properties["max_ref_age_ms"] = str(ref.max_ref_age_ms) if ref.max_ref_age_ms else "forever"

return retention_properties
35 changes: 34 additions & 1 deletion pyiceberg/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
# under the License.
import json
from abc import ABC, abstractmethod
from typing import Any, List, Optional
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
)
from uuid import UUID

from rich.console import Console
Expand All @@ -26,6 +32,7 @@
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table, TableMetadata
from pyiceberg.table.refs import SnapshotRefType
from pyiceberg.typedef import IcebergBaseModel, Identifier, Properties


Expand Down Expand Up @@ -72,6 +79,10 @@ def uuid(self, uuid: Optional[UUID]) -> None:
def version(self, version: str) -> None:
...

@abstractmethod
def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
...


class ConsoleOutput(Output):
"""Writes to the console."""
Expand Down Expand Up @@ -174,6 +185,19 @@ def uuid(self, uuid: Optional[UUID]) -> None:
def version(self, version: str) -> None:
Console().print(version)

def describe_refs(self, ref_details: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
refs_table = RichTable(title="Snapshot Refs")
refs_table.add_column("Ref")
refs_table.add_column("Type")
refs_table.add_column("Max ref age ms")
refs_table.add_column("Min snapshots to keep")
refs_table.add_column("Max snapshot age ms")
for name, type, ref_detail in ref_details:
refs_table.add_row(
name, type, ref_detail["max_ref_age_ms"], ref_detail["min_snapshots_to_keep"], ref_detail["max_snapshot_age_ms"]
)
Console().print(refs_table)


class JsonOutput(Output):
"""Writes json to stdout."""
Expand Down Expand Up @@ -226,3 +250,12 @@ def uuid(self, uuid: Optional[UUID]) -> None:

def version(self, version: str) -> None:
self._out({"version": version})

def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
self._out(
[
{"name": name, "type": type, detail_key: detail_val}
for name, type, detail in refs
for detail_key, detail_val in detail.items()
]
)
5 changes: 5 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
visit,
)
from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata
from pyiceberg.table.refs import SnapshotRef
from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry
from pyiceberg.table.sorting import SortOrder
from pyiceberg.typedef import (
Expand Down Expand Up @@ -569,6 +570,10 @@ def history(self) -> List[SnapshotLogEntry]:
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive)

def refs(self) -> Dict[str, SnapshotRef]:
"""Return the snapshot references in the table."""
return self.metadata.refs

def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
response = self.catalog._commit_table( # pylint: disable=W0212
CommitTableRequest(
Expand Down

0 comments on commit 04ca8ae

Please sign in to comment.