Skip to content

Commit

Permalink
Use cachetools's LRUCache to cache manifest list (#1187)
Browse files Browse the repository at this point in the history
* use cachetools

* use LRU cache

* return tuple

* comment

* clear global cache for tests

* move _manifests to manifest.py

* rebase poetry.lock
  • Loading branch information
kevinjqliu committed Sep 24, 2024
1 parent 5dcda55 commit 6b7f07a
Show file tree
Hide file tree
Showing 5 changed files with 359 additions and 12 deletions.
4 changes: 2 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
List,
Literal,
Optional,
Tuple,
Type,
)

from cachetools import LRUCache, cached
from cachetools.keys import hashkey
from pydantic_core import to_json

from pyiceberg.avro.file import AvroFile, AvroOutputFile
Expand Down Expand Up @@ -620,6 +623,13 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List
]


@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list))
def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]:
"""Read and cache manifests from the given manifest list, returning a tuple to prevent modification."""
file = io.new_input(manifest_list)
return tuple(read_manifest_list(file))


def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
"""
Read the manifests from the manifest list.
Expand Down
12 changes: 2 additions & 10 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
import time
from collections import defaultdict
from enum import Enum
from functools import lru_cache
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional

from pydantic import Field, PrivateAttr, model_serializer

from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema

Expand Down Expand Up @@ -231,13 +230,6 @@ def __eq__(self, other: Any) -> bool:
)


@lru_cache
def _manifests(io: FileIO, manifest_list: str) -> List[ManifestFile]:
"""Return the manifests from the manifest list."""
file = io.new_input(manifest_list)
return list(read_manifest_list(file))


class Snapshot(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None)
Expand All @@ -260,7 +252,7 @@ def __str__(self) -> str:
def manifests(self, io: FileIO) -> List[ManifestFile]:
"""Return the manifests for the given snapshot."""
if self.manifest_list:
return _manifests(io, self.manifest_list)
return list(_manifests(io, self.manifest_list))
return []


Expand Down
Loading

0 comments on commit 6b7f07a

Please sign in to comment.