Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/iscc_usearch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

from iscc_usearch.utils import timer
from iscc_usearch.nphd import NphdIndex
from iscc_usearch.sharded import ShardedIndex, ShardedIndex128
from iscc_usearch.sharded import CorruptedShardError, ShardedIndex, ShardedIndex128
from iscc_usearch.sharded_nphd import ShardedNphdIndex, ShardedNphdIndex128
from iscc_usearch.bloom import ScalableBloomFilter

__all__ = [
"CorruptedShardError",
"NphdIndex",
"ShardedIndex",
"ShardedIndex128",
Expand Down
169 changes: 129 additions & 40 deletions src/iscc_usearch/sharded.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,29 @@
from iscc_usearch.bloom import ScalableBloomFilter
from iscc_usearch.utils import atomic_write, timer

# Public API of this module; CorruptedShardError is exported so callers can
# catch it explicitly when shard restoration fails.
__all__ = [
    "CorruptedShardError",
    "ShardedIndex",
    "ShardedIndex128",
    "ShardedIndexedKeys",
    "ShardedIndexedVectors",
]


class CorruptedShardError(Exception):
"""Raised or logged when a shard file is corrupted or unreadable.

Attributes:
path: Path to the corrupted shard file.
"""

def __init__(self, path: str | os.PathLike, reason: str = "") -> None:
self.path = Path(path)
msg = f"Corrupted shard: {self.path}"
if reason:
msg += f" ({reason})"
super().__init__(msg)


# Default bloom filter file name
BLOOM_FILENAME = "bloom.isbf"
Expand Down Expand Up @@ -1317,6 +1339,7 @@ def _load_existing(self) -> None:
self._bloom = ScalableBloomFilter()
return # pragma: no cover - defensive path for corruption recovery

corrupted_paths: list[Path] = []
mode = "view" if self._read_only else "load"
with timer(f"ShardedIndex {mode} {len(shard_files)} shards from {self._path}", log_start=True):
# Restore view shards individually (workaround for usearch bug #643:
Expand All @@ -1328,30 +1351,55 @@ def _load_existing(self) -> None:
# Read-only: view ALL shards (no active shard)
for p in shard_files:
viewed = self._restore_shard(p, view=True)
if viewed is None: # pragma: no cover
raise RuntimeError(f"Failed to restore shard: {p}")
if viewed is None:
logger.warning(f"Skipping corrupted shard: {p}")
corrupted_paths.append(p)
continue
self._register_view_shard(viewed)
self._active_shard = None
self._active_shard_path = None
last_shard = self._viewed_indexes[-1]
else:
# All but the last shard go into view mode
view_paths = shard_files[:-1]
active_path = shard_files[-1]

for p in view_paths:
viewed = self._restore_shard(p, view=True)
if viewed is None: # pragma: no cover
raise RuntimeError(f"Failed to restore shard: {p}")
if viewed is None:
logger.warning(f"Skipping corrupted view shard: {p}")
corrupted_paths.append(p)
continue
self._register_view_shard(viewed)

# Load active shard (writable) and track its path for save()
active_shard = self._restore_shard(active_path, view=False)
if active_shard is None: # pragma: no cover
raise RuntimeError(f"Failed to restore shard: {active_path}")
if active_shard is None:
logger.warning(f"Corrupted active shard, creating fresh shard: {active_path}")
corrupted_paths.append(active_path)
active_shard = self._create_shard()
self._active_shard = active_shard
self._active_shard_path = active_path
last_shard = active_shard
self._active_shard_path = active_path if active_path not in corrupted_paths else None

if corrupted_paths:
logger.warning(
f"ShardedIndex: {len(corrupted_paths)} corrupted shard(s) skipped: {[str(p) for p in corrupted_paths]}"
)

# Determine last valid shard for config update
last_shard: Index | None = None
if not self._read_only and self._active_shard is not None:
last_shard = self._active_shard
elif self._viewed_indexes:
last_shard = self._viewed_indexes[-1]

if last_shard is None:
# All shards corrupted — fall back to empty index
logger.warning("All shards corrupted, starting with empty index")
self._active_shard = self._create_shard()
self._active_shard_path = None
if self._use_bloom and self._bloom is None:
self._bloom = ScalableBloomFilter()
return

# Update config from last shard to ensure new shards match existing ones
self._config["ndim"] = last_shard.ndim
Expand Down Expand Up @@ -1708,16 +1756,28 @@ def _resolve_config(
:return: Resolved (ndim, metric, dtype) tuple
:raises ValueError: If ndim is None and no existing shards found
"""
from loguru import logger

if not existing_shards:
if ndim is None:
raise ValueError("ndim is required when creating a new index (no existing shards found)")
return ndim, metric, dtype

# Read metadata from first shard
meta = Index.metadata(str(existing_shards[0]))
if meta is None: # pragma: no cover - shard files are always valid in practice
# Try reading metadata from each shard until one succeeds
meta = None
for shard_path in existing_shards:
try:
meta = Index.metadata(str(shard_path))
except Exception as exc:
logger.warning(f"Corrupted shard (metadata unreadable): {shard_path} — {exc}")
continue
if meta is not None:
break
logger.warning(f"Corrupted shard (metadata is None): {shard_path}")

if meta is None:
if ndim is None:
raise ValueError("ndim is required (failed to read shard metadata)")
raise ValueError("ndim is required (all shard metadata unreadable)")
return ndim, metric, dtype

# Auto-detect from existing shards if not provided
Expand Down Expand Up @@ -1748,19 +1808,34 @@ def _load_bloom_if_exists(self) -> ScalableBloomFilter | None:
return None

def _restore_shard(self, path: Path, view: bool) -> Index | None:
    """Restore a shard from disk. Override in subclasses for custom shard types.

    Returns None and logs a warning if the shard file is corrupted or unreadable,
    instead of letting C++ exceptions propagate as segfaults or unhandled errors.

    :param path: Path of the shard file on disk.
    :param view: If True, memory-map the shard (read-only view); otherwise load it fully.
    :return: The restored Index, or None when the shard is corrupted.
    """
    from loguru import logger

    # Metadata reads can raise inside the usearch native layer on a truncated
    # or garbage file — treat any failure as corruption, not a fatal error.
    try:
        meta = Index.metadata(str(path))
    except Exception as exc:
        logger.warning(f"Corrupted shard (metadata unreadable): {path} — {exc}")
        return None
    if meta is None:
        logger.warning(f"Corrupted shard (metadata is None): {path}")
        return None
    # Construction and view/load can also fail on partially written files.
    try:
        idx = Index(
            ndim=meta["dimensions"],
            metric=meta["kind_metric"],
            dtype=meta["kind_scalar"],
        )
        if view:
            idx.view(str(path))
        else:
            idx.load(str(path))
    except Exception as exc:
        logger.warning(f"Corrupted shard (load/view failed): {path} — {exc}")
        return None
    return idx

def _discover_shards(self) -> list[Path]:
Expand Down Expand Up @@ -1837,7 +1912,7 @@ def _rotate_shard(self) -> None:
# Load the saved shard in view mode and register it
viewed_shard = self._restore_shard(shard_path, view=True)
if viewed_shard is None: # pragma: no cover
raise RuntimeError(f"Failed to restore shard: {shard_path}")
raise CorruptedShardError(shard_path, "failed to view freshly saved shard")
self._register_view_shard(viewed_shard)

# Create new active shard and reset size check countdown
Expand Down Expand Up @@ -2284,18 +2359,32 @@ def _create_shard(self) -> Index:
return Index(**self._config, key_kind="uuid")

def _restore_shard(self, path: Path, view: bool) -> Index | None:
    """Restore a uuid-keyed shard from disk.

    Returns None and logs a warning if the shard file is corrupted or unreadable.

    :param path: Path of the shard file on disk.
    :param view: If True, memory-map the shard (read-only view); otherwise load it fully.
    :return: The restored uuid-keyed Index, or None when the shard is corrupted.
    """
    from loguru import logger

    # Metadata reads can raise inside the usearch native layer on corrupt files.
    try:
        meta = Index.metadata(str(path))
    except Exception as exc:
        logger.warning(f"Corrupted shard (metadata unreadable): {path} — {exc}")
        return None
    if meta is None:
        logger.warning(f"Corrupted shard (metadata is None): {path}")
        return None
    try:
        # key_kind="uuid" — this subclass stores 128-bit uuid keys.
        idx = Index(
            ndim=meta["dimensions"],
            metric=meta["kind_metric"],
            dtype=meta["kind_scalar"],
            key_kind="uuid",
        )
        if view:
            idx.view(str(path))
        else:
            idx.load(str(path))
    except Exception as exc:
        logger.warning(f"Corrupted shard (load/view failed): {path} — {exc}")
        return None
    return idx
100 changes: 69 additions & 31 deletions src/iscc_usearch/sharded_nphd.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,14 @@ def __init__(
def _resolve_max_dim(self, max_dim: int | None) -> int:
"""Resolve max_dim from existing shards or use provided value.

Tries all available shards when reading metadata, skipping any corrupted ones.

:param max_dim: Provided max_dim or None for auto-detection
:return: Resolved max_dim value
:raises ValueError: If max_dim is None and no existing shards found
"""
from loguru import logger

# Check for existing shards
existing_shards = sorted(
self._path.glob("shard_*.usearch"),
Expand All @@ -232,13 +236,19 @@ def _resolve_max_dim(self, max_dim: int | None) -> int:
if max_dim is not None:
return max_dim

# Read metadata from first shard and compute max_dim
meta = Index.metadata(str(existing_shards[0]))
if meta is None: # pragma: no cover - shard files are always valid in practice
raise ValueError("max_dim is required (failed to read shard metadata)")
# Try reading metadata from each shard until one succeeds
for shard_path in existing_shards:
try:
meta = Index.metadata(str(shard_path))
except Exception as exc:
logger.warning(f"Corrupted shard (metadata unreadable): {shard_path} — {exc}")
continue
if meta is not None:
# ndim = max_dim + 8 (length signal byte), so max_dim = ndim - 8
return meta["dimensions"] - 8
logger.warning(f"Corrupted shard (metadata is None): {shard_path}")

# ndim = max_dim + 8 (length signal byte), so max_dim = ndim - 8
return meta["dimensions"] - 8
raise ValueError("max_dim is required (all shard metadata unreadable)")

def _create_shard(self) -> Index:
"""Create a new Index shard with NPHD metric.
Expand All @@ -255,19 +265,33 @@ def _create_shard(self) -> Index:
)

def _restore_shard(self, path: Path, view: bool) -> Index | None:
    """Restore an Index shard from disk.

    Returns None and logs a warning if the shard file is corrupted or unreadable.

    :param path: Path of the shard file on disk.
    :param view: If True, memory-map the shard (read-only view); otherwise load it fully.
    :return: The restored NPHD-metric Index, or None when the shard is corrupted.
    """
    from loguru import logger

    # Metadata reads can raise inside the usearch native layer on corrupt files.
    try:
        meta = Index.metadata(str(path))
    except Exception as exc:
        logger.warning(f"Corrupted shard (metadata unreadable): {path} — {exc}")
        return None
    if meta is None:
        logger.warning(f"Corrupted shard (metadata is None): {path}")
        return None
    try:
        # Metric is forced to NPHD rather than taken from metadata, since the
        # custom metric kind is not round-tripped through the shard file.
        shard = Index(
            ndim=meta["dimensions"],
            metric=MetricKind.NPHD,
            dtype=meta["kind_scalar"],
        )
        if view:
            shard.view(str(path))
        else:
            shard.load(str(path))
    except Exception as exc:
        logger.warning(f"Corrupted shard (load/view failed): {path} — {exc}")
        return None
    return shard

@property
Expand Down Expand Up @@ -554,20 +578,34 @@ def _create_shard(self) -> Index:
)

def _restore_shard(self, path: Path, view: bool) -> Index | None:
    """Restore a uuid-keyed NPHD shard from disk.

    Returns None and logs a warning if the shard file is corrupted or unreadable.

    :param path: Path of the shard file on disk.
    :param view: If True, memory-map the shard (read-only view); otherwise load it fully.
    :return: The restored uuid-keyed NPHD Index, or None when the shard is corrupted.
    """
    from loguru import logger

    # Metadata reads can raise inside the usearch native layer on corrupt files.
    try:
        meta = Index.metadata(str(path))
    except Exception as exc:
        logger.warning(f"Corrupted shard (metadata unreadable): {path} — {exc}")
        return None
    if meta is None:
        logger.warning(f"Corrupted shard (metadata is None): {path}")
        return None
    try:
        # NPHD metric + uuid keys — both are fixed by this subclass, not read
        # back from the shard metadata.
        shard = Index(
            ndim=meta["dimensions"],
            metric=MetricKind.NPHD,
            dtype=meta["kind_scalar"],
            key_kind="uuid",
        )
        if view:
            shard.view(str(path))
        else:
            shard.load(str(path))
    except Exception as exc:
        logger.warning(f"Corrupted shard (load/view failed): {path} — {exc}")
        return None
    return shard

def __repr__(self) -> str:
Expand Down
Loading
Loading