From 6b7f07a3815349297f765c919f8fa170c2c4562b Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 24 Sep 2024 08:45:53 -0700 Subject: [PATCH] Use `cachetools's LRUCache` to cache manifest list (#1187) * use cachetools * use LRU cache * return tuple * comment * clear global cache for tests * move _manifests to manifest.py * rebase poetry.lock --- poetry.lock | 4 +- pyiceberg/manifest.py | 10 ++ pyiceberg/table/snapshots.py | 12 +- pyproject.toml | 313 +++++++++++++++++++++++++++++++++++ tests/utils/test_manifest.py | 32 ++++ 5 files changed, 359 insertions(+), 12 deletions(-) diff --git a/poetry.lock b/poetry.lock index 34a884cbf..6a82f2742 100644 --- a/poetry.lock +++ b/poetry.lock @@ -439,7 +439,7 @@ virtualenv = ["virtualenv (>=20.0.35)"] name = "cachetools" version = "5.5.0" description = "Extensible memoizing collections and decorators" -optional = true +optional = false python-versions = ">=3.7" files = [ {file = "cachetools-5.5.0-py3-none-any.whl", hash = "sha256:02134e8439cdc2ffb62023ce1debca2944c3f289d66bb17ead3ab3dede74b292"}, @@ -4650,4 +4650,4 @@ zstandard = ["zstandard"] [metadata] lock-version = "2.0" python-versions = "^3.8, <3.13, !=3.9.7" -content-hash = "086f6774fe006d24ded1141068f63f2aa22c356c75979b2b44781d43dc10d977" +content-hash = "66129acb77e056f086d3cff1d3cfb74d25518ad9ebf03d3ca7e4add0ec9b3221" diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 960952d02..649840fc6 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -28,9 +28,12 @@ List, Literal, Optional, + Tuple, Type, ) +from cachetools import LRUCache, cached +from cachetools.keys import hashkey from pydantic_core import to_json from pyiceberg.avro.file import AvroFile, AvroOutputFile @@ -620,6 +623,13 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List ] +@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list)) +def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]: + """Read and cache manifests from the given manifest list, returning a tuple to prevent modification.""" + file = io.new_input(manifest_list) + return tuple(read_manifest_list(file)) + + def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]: """ Read the manifests from the manifest list. diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 980399a2a..829bd6029 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -19,13 +19,12 @@ import time from collections import defaultdict from enum import Enum -from functools import lru_cache from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional from pydantic import Field, PrivateAttr, model_serializer from pyiceberg.io import FileIO -from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list +from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema @@ -231,13 +230,6 @@ def __eq__(self, other: Any) -> bool: ) -@lru_cache -def _manifests(io: FileIO, manifest_list: str) -> List[ManifestFile]: - """Return the manifests from the manifest list.""" - file = io.new_input(manifest_list) - return list(read_manifest_list(file)) - - class Snapshot(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None) @@ -260,7 +252,7 @@ def __str__(self) -> str: def manifests(self, io: FileIO) -> List[ManifestFile]: """Return the manifests for the given snapshot.""" if self.manifest_list: - return _manifests(io, self.manifest_list) + return list(_manifests(io, self.manifest_list)) return [] diff --git a/pyproject.toml b/pyproject.toml index 7126f9051..c77391373 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,6 +83,7 @@ numpy = [ { version = "1.26.0", python = ">=3.9,<3.13", optional = true }, { version = "1.24.4", python = ">=3.8,<3.9", optional = true } ] +cachetools = "^5.5.0" [tool.poetry.group.dev.dependencies] pytest = "7.4.4" @@ -571,6 +572,318 @@ ignore_missing_imports = true module = "tenacity.*" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "pyarrow.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pandas.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "snappy.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "zstandard.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pydantic.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pydantic_core.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pytest.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "fastavro.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "mmh3.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "hive_metastore.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "thrift.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "requests_mock.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "click.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "rich.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "fsspec.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "s3fs.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "azure.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "adlfs.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "gcsfs.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "packaging.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "tests.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "boto3" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "botocore.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "mypy_boto3_glue.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "moto" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "aiobotocore.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "aiohttp.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "duckdb.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "ray.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "daft.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pyparsing.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pyspark.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "strictyaml.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "sortedcontainers.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "numpy.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "sqlalchemy.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "Cython.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "setuptools.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "tenacity.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pyarrow.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pandas.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "snappy.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "zstandard.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pydantic.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pydantic_core.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pytest.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "fastavro.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "mmh3.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "hive_metastore.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "thrift.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "requests_mock.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "click.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "rich.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "fsspec.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "s3fs.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "azure.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "adlfs.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "gcsfs.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "packaging.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "tests.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "boto3" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "botocore.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "mypy_boto3_glue.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "moto" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "aiobotocore.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "aiohttp.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "duckdb.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "ray.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "daft.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pyparsing.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "pyspark.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "strictyaml.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "sortedcontainers.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "numpy.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "sqlalchemy.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "Cython.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "setuptools.*" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "tenacity.*" +ignore_missing_imports = true + [tool.poetry.scripts] pyiceberg = "pyiceberg.cli.console:run" diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index ef33b16b0..bb60ac0a2 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -17,6 +17,7 @@ # pylint: disable=redefined-outer-name,arguments-renamed,fixme from tempfile import TemporaryDirectory from typing import Dict +from unittest.mock import patch import fastavro import pytest @@ -31,6 +32,7 @@ ManifestEntryStatus, ManifestFile, PartitionFieldSummary, + _manifests, read_manifest_list, write_manifest, write_manifest_list, @@ -43,6 +45,12 @@ from pyiceberg.types import IntegerType, NestedField +@pytest.fixture(autouse=True) +def clear_global_manifests_cache() -> None: + # Clear the global cache before each test + _manifests.cache_clear() # type: ignore + + def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: Dict[str, str]) -> None: with open(avro_file, "rb") as f: reader = fastavro.reader(f) @@ -306,6 +314,30 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None: assert entry.status == ManifestEntryStatus.ADDED +def test_read_manifest_cache(generated_manifest_file_file_v2: str) -> None: + with patch("pyiceberg.manifest.read_manifest_list") as mocked_read_manifest_list: + io = load_file_io() + + snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + timestamp_ms=1602638573590, + manifest_list=generated_manifest_file_file_v2, + summary=Summary(Operation.APPEND), + schema_id=3, + ) + + # Access the manifests property multiple times to test caching + manifests_first_call = snapshot.manifests(io) + manifests_second_call = snapshot.manifests(io) + + # Ensure that read_manifest_list was called only once + mocked_read_manifest_list.assert_called_once() + + # Ensure that the same manifest list is returned + assert manifests_first_call == manifests_second_call + + def test_write_empty_manifest() -> None: io = load_file_io() test_schema = Schema(NestedField(1, "foo", IntegerType(), False))