Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions src/dvc_data/index/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Iterator, MutableMapping
from typing import TYPE_CHECKING, Any, Callable, Optional, cast
from itertools import repeat
from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast

import attrs
from fsspec import Callback
Expand All @@ -17,6 +18,8 @@
from dvc_data.hashfile.tree import Tree

if TYPE_CHECKING:
from collections.abc import Iterable

from dvc_objects.fs.base import FileSystem

from dvc_data.hashfile.db import HashFileDB
Expand Down Expand Up @@ -240,7 +243,7 @@ def exists(self, entry: "DataIndexEntry", refresh: bool = False) -> bool:
finally:
self.index.commit()

def bulk_exists( # noqa: C901
def bulk_exists( # noqa: C901, PLR0912
self,
entries: list["DataIndexEntry"],
refresh: bool = False,
Expand Down Expand Up @@ -274,17 +277,24 @@ def bulk_exists( # noqa: C901
_, path = self.get(entry)
path_to_entries[path].append(entry)

info_results: Union[
Iterable[Union[Exception, Optional[dict[str, Any]]]], None
] = None
try:
self.fs.ls(self.odb.path) # check for fs access
except FileNotFoundError:
except FileNotFoundError as exc:
info_results = repeat(exc, len(path_to_entries))
callback.relative_update(len(entries_with_hash))
except NotImplementedError:
# some filesystems don't implement ls
pass

info_results = self.fs.info(
list(path_to_entries),
batch_size=jobs,
return_exceptions=True,
callback=callback,
)
if info_results is None:
info_results = self.fs.info(
list(path_to_entries),
batch_size=jobs,
return_exceptions=True,
callback=callback,
)

for (path, _entries), info in zip(path_to_entries.items(), info_results):
if isinstance(info, Exception) and not isinstance(info, FileNotFoundError):
Expand All @@ -302,6 +312,7 @@ def bulk_exists( # noqa: C901
results.update(dict.fromkeys(_entries, exists))

if self.index is not None:
logger.debug("Committing index results")
self.index.commit()

return results
Expand Down
24 changes: 24 additions & 0 deletions tests/index/test_storage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from dvc_objects.fs.local import LocalFileSystem

from dvc_data.hashfile.db import HashFileDB
from dvc_data.hashfile.hash_info import HashInfo
from dvc_data.hashfile.meta import Meta
from dvc_data.index import (
Expand Down Expand Up @@ -201,6 +202,29 @@ def test_duplicate_hashes_not_exist(self, odb, use_index, refresh):
result = storage.bulk_exists(entries, refresh=refresh)
assert result == {entries[0]: False, entries[1]: False}

def test_bulk_check_with_ls_not_implemented(self, tmp_path_factory):
class NonTraversableFileSystem(LocalFileSystem):
def ls(self, *args, **kwargs):
raise NotImplementedError

index = DataIndex()
path = tmp_path_factory.mktemp("odb")
odb = HashFileDB(fs=NonTraversableFileSystem(), path=path)
storage = ObjectStorage(key=(), odb=odb, index=index)
entries = [
DataIndexEntry(
key=("foo",),
hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"),
),
DataIndexEntry(
key=("bar",),
hash_info=HashInfo("md5", "c157a79031e1c40f85931829bc5fc552"),
),
]

result = storage.bulk_exists(entries, refresh=True)
assert result == {entries[0]: False, entries[1]: False}


class TestStorageMappingBulkExists:
def test_bulk_cache_exists_empty(self, odb):
Expand Down