diff --git a/src/dvc_data/index/index.py b/src/dvc_data/index/index.py
index ddb9e9d1..bb3d1ea6 100644
--- a/src/dvc_data/index/index.py
+++ b/src/dvc_data/index/index.py
@@ -240,13 +240,15 @@ def exists(self, entry: "DataIndexEntry", refresh: bool = False) -> bool:
         finally:
             self.index.commit()
 
-    def bulk_exists(
+    def bulk_exists(  # noqa: C901
         self,
         entries: list["DataIndexEntry"],
         refresh: bool = False,
         jobs: Optional[int] = None,
         callback: "Callback" = DEFAULT_CALLBACK,
     ) -> dict["DataIndexEntry", bool]:
+        from .build import build_entry
+
         entries_with_hash = [e for e in entries if e.hash_info]
         entries_without_hash = [e for e in entries if not e.hash_info]
         results = dict.fromkeys(entries_without_hash, False)
@@ -267,9 +269,10 @@ def bulk_exists(
                 results[entry] = exists
             return results
 
-        entry_map: dict[str, DataIndexEntry] = {
-            self.get(entry)[1]: entry for entry in entries_with_hash
-        }
+        path_to_entries: dict[str, list[DataIndexEntry]] = defaultdict(list)
+        for entry in entries_with_hash:
+            _, path = self.get(entry)
+            path_to_entries[path].append(entry)
 
         try:
             self.fs.ls(self.odb.path)  # check for fs access
@@ -277,29 +280,26 @@ def bulk_exists(
             pass
 
         info_results = self.fs.info(
-            list(entry_map.keys()),
+            list(path_to_entries),
            batch_size=jobs,
            return_exceptions=True,
            callback=callback,
        )
 
-        for (path, entry), info in zip(entry_map.items(), info_results):
+        for (path, _entries), info in zip(path_to_entries.items(), info_results):
+            if isinstance(info, Exception) and not isinstance(info, FileNotFoundError):
+                raise info
+            assert _entries
+            entry = _entries[0]
             assert entry.hash_info  # built from entries_with_hash
             value = cast("str", entry.hash_info.value)
             key = self.odb._oid_parts(value)
-
-            if isinstance(info, FileNotFoundError) or info is None:
+            exists = info is not None and not isinstance(info, FileNotFoundError)
+            if exists:
+                self.index[key] = build_entry(path, self.fs, info=info)
+            else:
                 self.index.pop(key, None)
-                results[entry] = False
-                continue
-            if isinstance(info, Exception):
-                raise info
-
-            from .build import build_entry
-
-            built_entry = build_entry(path, self.fs, info=info)
-            self.index[key] = built_entry
-            results[entry] = True
+            results.update(dict.fromkeys(_entries, exists))
 
         if self.index is not None:
             self.index.commit()
diff --git a/tests/index/test_storage.py b/tests/index/test_storage.py
index 737b98ed..2b832aa4 100644
--- a/tests/index/test_storage.py
+++ b/tests/index/test_storage.py
@@ -1,3 +1,4 @@
+import pytest
 from dvc_objects.fs.local import LocalFileSystem
 
 from dvc_data.hashfile.hash_info import HashInfo
@@ -155,6 +156,51 @@ def test_multiple_entries(self, odb):
         result = storage.bulk_exists(entries)
         assert all(result[e] is True for e in entries)
 
+    @pytest.mark.parametrize("use_index", [True, False])
+    @pytest.mark.parametrize("refresh", [True, False])
+    def test_duplicate_hashes_exist(self, odb, use_index, refresh):
+        """Entries sharing a hash should all map to True when the object exists."""
+        index = None
+        if use_index:
+            index = DataIndex()
+            key = odb._oid_parts("d3b07384d113edec49eaa6238ad5ff00")
+            index[key] = DataIndexEntry(key=key)
+
+        storage = ObjectStorage(key=(), odb=odb, index=index)
+        entries = [
+            DataIndexEntry(
+                key=("foo",),
+                hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"),
+            ),
+            DataIndexEntry(
+                key=("bar",),
+                hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"),
+            ),
+        ]
+
+        result = storage.bulk_exists(entries, refresh=refresh)
+        assert result == {entries[0]: True, entries[1]: True}
+
@pytest.mark.parametrize("use_index", [True, False]) + @pytest.mark.parametrize("refresh", [True, False]) + def test_duplicate_hashes_not_exist(self, odb, use_index, refresh): + """Multiple entries with same hash should all return False if not exists.""" + index = DataIndex() if use_index else None + storage = ObjectStorage(key=(), odb=odb, index=index) + entries = [ + DataIndexEntry( + key=("foo",), + hash_info=HashInfo("md5", "00000000000000000000000000000000"), + ), + DataIndexEntry( + key=("bar",), + hash_info=HashInfo("md5", "00000000000000000000000000000000"), + ), + ] + + result = storage.bulk_exists(entries, refresh=refresh) + assert result == {entries[0]: False, entries[1]: False} + class TestStorageMappingBulkExists: def test_bulk_cache_exists_empty(self, odb):