36 changes: 18 additions & 18 deletions src/dvc_data/index/index.py
@@ -240,13 +240,15 @@ def exists(self, entry: "DataIndexEntry", refresh: bool = False) -> bool:
         finally:
             self.index.commit()
 
-    def bulk_exists(
+    def bulk_exists(  # noqa: C901
         self,
         entries: list["DataIndexEntry"],
         refresh: bool = False,
         jobs: Optional[int] = None,
         callback: "Callback" = DEFAULT_CALLBACK,
     ) -> dict["DataIndexEntry", bool]:
+        from .build import build_entry
+
         entries_with_hash = [e for e in entries if e.hash_info]
         entries_without_hash = [e for e in entries if not e.hash_info]
         results = dict.fromkeys(entries_without_hash, False)

@@ -267,39 +269,37 @@ def bulk_exists(
                 results[entry] = exists
             return results

-        entry_map: dict[str, DataIndexEntry] = {
-            self.get(entry)[1]: entry for entry in entries_with_hash
-        }
+        path_to_entries: dict[str, list[DataIndexEntry]] = defaultdict(list)
+        for entry in entries_with_hash:
+            _, path = self.get(entry)
+            path_to_entries[path].append(entry)
 
         try:
             self.fs.ls(self.odb.path)  # check for fs access
         except FileNotFoundError:
             pass
 
         info_results = self.fs.info(
-            list(entry_map.keys()),
+            list(path_to_entries),
             batch_size=jobs,
             return_exceptions=True,
             callback=callback,
         )
 
-        for (path, entry), info in zip(entry_map.items(), info_results):
+        for (path, _entries), info in zip(path_to_entries.items(), info_results):
+            if isinstance(info, Exception) and not isinstance(info, FileNotFoundError):
+                raise info
+            assert _entries
+            entry = _entries[0]
             assert entry.hash_info  # built from entries_with_hash
             value = cast("str", entry.hash_info.value)
             key = self.odb._oid_parts(value)
 
-            if isinstance(info, FileNotFoundError) or info is None:
+            exists = info is not None and not isinstance(info, FileNotFoundError)
+            if exists:
+                self.index[key] = build_entry(path, self.fs, info=info)
+            else:
                 self.index.pop(key, None)
-                results[entry] = False
-                continue
-            if isinstance(info, Exception):
-                raise info
-
-            from .build import build_entry
-
-            built_entry = build_entry(path, self.fs, info=info)
-            self.index[key] = built_entry
-            results[entry] = True
+            results.update(dict.fromkeys(_entries, exists))
 
         if self.index is not None:
             self.index.commit()
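The behavioral change above is easiest to see in isolation. The old code built a one-to-one dict keyed by object path, so when two entries shared a hash (and therefore a path), the dict comprehension silently kept only the last one; the new code groups entries per path instead. A minimal standalone sketch of the difference, with hypothetical names (`object_path`, the string entries) standing in for dvc-data's real types:

from collections import defaultdict

# Hypothetical stand-ins: two index entries whose hash resolves to the
# same on-disk object path in the ODB.
entries = ["foo", "bar"]

def object_path(entry: str) -> str:
    # Both entries share one hash, hence one object path.
    return "/odb/d3/b07384d113edec49eaa6238ad5ff00"

# Old approach: a dict comprehension keyed by path. Duplicate keys are
# silently overwritten, so "foo" disappears from the mapping.
entry_map = {object_path(e): e for e in entries}
assert list(entry_map.values()) == ["bar"]

# New approach: group entries per path so every duplicate keeps a slot.
path_to_entries: dict[str, list[str]] = defaultdict(list)
for e in entries:
    path_to_entries[object_path(e)].append(e)
assert path_to_entries[object_path("foo")] == ["foo", "bar"]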
46 changes: 46 additions & 0 deletions tests/index/test_storage.py
@@ -1,3 +1,4 @@
+import pytest
 from dvc_objects.fs.local import LocalFileSystem
 
 from dvc_data.hashfile.hash_info import HashInfo

@@ -155,6 +156,51 @@ def test_multiple_entries(self, odb):
         result = storage.bulk_exists(entries)
         assert all(result[e] is True for e in entries)
 
+    @pytest.mark.parametrize("use_index", [True, False])
+    @pytest.mark.parametrize("refresh", [True, False])
+    def test_duplicate_hashes_exist(self, odb, use_index, refresh):
+        """Multiple entries with same hash should all return True if exists."""
+        index = None
+        if use_index:
+            index = DataIndex()
+            key = odb._oid_parts("d3b07384d113edec49eaa6238ad5ff00")
+            index[key] = DataIndexEntry(key=key)
+
+        storage = ObjectStorage(key=(), odb=odb, index=index)
+        entries = [
+            DataIndexEntry(
+                key=("foo",),
+                hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"),
+            ),
+            DataIndexEntry(
+                key=("bar",),
+                hash_info=HashInfo("md5", "d3b07384d113edec49eaa6238ad5ff00"),
+            ),
+        ]
+
+        result = storage.bulk_exists(entries, refresh=refresh)
+        assert result == {entries[0]: True, entries[1]: True}
+
+    @pytest.mark.parametrize("use_index", [True, False])
+    @pytest.mark.parametrize("refresh", [True, False])
+    def test_duplicate_hashes_not_exist(self, odb, use_index, refresh):
+        """Multiple entries with same hash should all return False if not exists."""
+        index = DataIndex() if use_index else None
+        storage = ObjectStorage(key=(), odb=odb, index=index)
+        entries = [
+            DataIndexEntry(
+                key=("foo",),
+                hash_info=HashInfo("md5", "00000000000000000000000000000000"),
+            ),
+            DataIndexEntry(
+                key=("bar",),
+                hash_info=HashInfo("md5", "00000000000000000000000000000000"),
+            ),
+        ]
+
+        result = storage.bulk_exists(entries, refresh=refresh)
+        assert result == {entries[0]: False, entries[1]: False}
+
 
 class TestStorageMappingBulkExists:
     def test_bulk_cache_exists_empty(self, odb):
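The new tests exercise the fan-out at the end of the rewritten loop: one `fs.info` result per unique path is applied to every entry sharing that path via `dict.fromkeys`, so duplicate-hash entries always get the same answer. A minimal sketch of just that step, with hypothetical paths and entries in place of real dvc-data objects:

# Hypothetical stand-ins: two duplicate-hash entries share one path,
# a third entry has its own path whose object is missing.
path_to_entries = {
    "path-a": ["foo", "bar"],  # duplicate hash -> same object path
    "path-b": ["baz"],
}
# One info() result per unique path, aligned with the dict's order;
# a FileNotFoundError marks a missing object, as in bulk_exists.
info_results = [{"size": 4}, FileNotFoundError()]

results: dict[str, bool] = {}
for (path, group), info in zip(path_to_entries.items(), info_results):
    exists = info is not None and not isinstance(info, FileNotFoundError)
    # Every entry in the group gets the same answer for its shared path.
    results.update(dict.fromkeys(group, exists))

assert results == {"foo": True, "bar": True, "baz": False}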