diff --git a/src/dvc_data/index/index.py b/src/dvc_data/index/index.py
index 806039fa..01c55f42 100644
--- a/src/dvc_data/index/index.py
+++ b/src/dvc_data/index/index.py
@@ -4,7 +4,8 @@
 from abc import ABC, abstractmethod
 from collections import defaultdict
 from collections.abc import Iterator, MutableMapping
-from typing import TYPE_CHECKING, Any, Callable, Optional, cast
+from itertools import repeat
+from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast
 
 import attrs
 from fsspec import Callback
@@ -17,6 +18,8 @@
 from dvc_data.hashfile.tree import Tree
 
 if TYPE_CHECKING:
+    from collections.abc import Iterable
+
     from dvc_objects.fs.base import FileSystem
 
     from dvc_data.hashfile.db import HashFileDB
@@ -240,7 +243,7 @@ def exists(self, entry: "DataIndexEntry", refresh: bool = False) -> bool:
         finally:
             self.index.commit()
 
-    def bulk_exists(  # noqa: C901
+    def bulk_exists(  # noqa: C901, PLR0912
         self,
         entries: list["DataIndexEntry"],
         refresh: bool = False,
@@ -274,17 +277,24 @@ def bulk_exists(  # noqa: C901
             _, path = self.get(entry)
             path_to_entries[path].append(entry)
 
+        info_results: Union[
+            Iterable[Union[Exception, Optional[dict[str, Any]]]], None
+        ] = None
         try:
             self.fs.ls(self.odb.path)  # check for fs access
-        except FileNotFoundError:
+        except FileNotFoundError as exc:
+            info_results = repeat(exc, len(path_to_entries))
+            callback.relative_update(len(entries_with_hash))
+        except NotImplementedError:
+            # some filesystems don't implement ls
             pass
-
-        info_results = self.fs.info(
-            list(path_to_entries),
-            batch_size=jobs,
-            return_exceptions=True,
-            callback=callback,
-        )
+        if info_results is None:
+            info_results = self.fs.info(
+                list(path_to_entries),
+                batch_size=jobs,
+                return_exceptions=True,
+                callback=callback,
+            )
 
         for (path, _entries), info in zip(path_to_entries.items(), info_results):
             if isinstance(info, Exception) and not isinstance(info, FileNotFoundError):
@@ -302,6 +312,7 @@
             results.update(dict.fromkeys(_entries, exists))
 
         if self.index is not None:
+            logger.debug("Committing index results")
             self.index.commit()
 
         return results
diff --git a/tests/index/test_storage.py b/tests/index/test_storage.py
index 2b832aa4..5988e25e 100644
--- a/tests/index/test_storage.py
+++ b/tests/index/test_storage.py
@@ -1,6 +1,7 @@
 import pytest
 from dvc_objects.fs.local import LocalFileSystem
 
+from dvc_data.hashfile.db import HashFileDB
 from dvc_data.hashfile.hash_info import HashInfo
 from dvc_data.hashfile.meta import Meta
 from dvc_data.index import (
@@ -201,6 +202,29 @@ def test_duplicate_hashes_not_exist(self, odb, use_index, refresh):
         result = storage.bulk_exists(entries, refresh=refresh)
         assert result == {entries[0]: False, entries[1]: False}
 
+    def test_bulk_check_with_ls_not_implemented(self, tmp_path_factory):
+        class NonTraversableFileSystem(LocalFileSystem):
+            def ls(self, *args, **kwargs):
+                raise NotImplementedError
+
+        index = DataIndex()
+        path = tmp_path_factory.mktemp("odb")
+        odb = HashFileDB(fs=NonTraversableFileSystem(), path=path)
+        storage = ObjectStorage(key=(), odb=odb, index=index)
+        entries = [
+            DataIndexEntry(
+                key=("foo",),
+                hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"),
+            ),
+            DataIndexEntry(
+                key=("bar",),
+                hash_info=HashInfo("md5", "c157a79031e1c40f85931829bc5fc552"),
+            ),
+        ]
+
+        result = storage.bulk_exists(entries, refresh=True)
+        assert result == {entries[0]: False, entries[1]: False}
+
 
 class TestStorageMappingBulkExists:
     def test_bulk_cache_exists_empty(self, odb):