Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions lance_ray/compaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,7 @@ def compact_database(
if not database:
raise ValueError("'database' must be a non-empty list of path segments.")
if not namespace_impl:
raise ValueError(
"'namespace_impl' is required when using compact_database."
)
raise ValueError("'namespace_impl' is required when using compact_database.")

from lance_namespace import ListTablesRequest

Expand Down Expand Up @@ -318,8 +316,6 @@ def compact_database(
results.append({"table_id": table_id, "metrics": metrics})
except Exception as e:
logger.exception("Compaction failed for table %s: %s", table_id, e)
raise RuntimeError(
f"Compaction failed for table {table_id}: {e}"
) from e
raise RuntimeError(f"Compaction failed for table {table_id}: {e}") from e

return results
11 changes: 10 additions & 1 deletion lance_ray/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,16 @@ def get_read_tasks(self, parallelism: int, **kwargs) -> list[ReadTask]:
)

read_task = ReadTask(
lambda fids=fragment_ids, uri=dataset_uri, version=dataset_version, storage_options=dataset_storage_options, manifest=serialized_manifest, ns_impl=namespace_impl, ns_props=namespace_properties, tbl_id=table_id, scanner_options=self._scanner_options, retry_params=self._retry_params: (
lambda fids=fragment_ids,
uri=dataset_uri,
version=dataset_version,
storage_options=dataset_storage_options,
manifest=serialized_manifest,
ns_impl=namespace_impl,
ns_props=namespace_properties,
tbl_id=table_id,
scanner_options=self._scanner_options,
retry_params=self._retry_params: (
_read_fragments_with_retry(
fids,
uri,
Expand Down
12 changes: 9 additions & 3 deletions lance_ray/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,9 @@ def create_scalar_index(
existing_indices = dataset.list_indices()
existing_names = {idx["name"] for idx in existing_indices}
index_exists = name in existing_names
except Exception: # pragma: no cover - list_indices() not available in older lance versions
except (
Exception
): # pragma: no cover - list_indices() not available in older lance versions
pass
if index_exists:
raise ValueError(
Expand Down Expand Up @@ -825,7 +827,9 @@ def create_index(
dataset_obj = uri
dataset_uri = dataset_obj.uri
if not merged_storage_options:
merged_storage_options = getattr(dataset_obj, "_storage_options", None) or {}
merged_storage_options = (
getattr(dataset_obj, "_storage_options", None) or {}
)
storage_options_provider = None

try:
Expand All @@ -845,7 +849,9 @@ def create_index(
existing_indices = dataset_obj.list_indices()
existing_names = {idx["name"] for idx in existing_indices}
index_exists = name in existing_names
except Exception: # pragma: no cover - list_indices() not available in older lance versions
except (
Exception
): # pragma: no cover - list_indices() not available in older lance versions
pass
if index_exists:
raise ValueError(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ classifiers = [

dependencies = [
"ray[data]>=2.41.0",
"pylance>=2.0.0",
"pylance>=5.0.0b4",
"lance-namespace",
"pyarrow>=17.0.0",
"more_itertools>=2.6.0; python_version<'3.12'",
Expand Down
14 changes: 5 additions & 9 deletions tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,7 @@ def test_stream_copy_basic_local(temp_dir):
assert src.schema == dst.schema

src_df = (
ray.data.from_arrow(table)
.to_pandas()
.sort_values("id")
.reset_index(drop=True)
ray.data.from_arrow(table).to_pandas().sort_values("id").reset_index(drop=True)
)
dst_df = (
lr.read_lance(str(dst_path))
Expand All @@ -367,7 +364,9 @@ def test_stream_copy_resume_local(temp_dir):
# Legacy blob schema
schema = pa.schema(
[
pa.field("blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}),
pa.field(
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
),
pa.field("id", pa.int64()),
pa.field("name", pa.string()),
pa.field("val", pa.float64()),
Expand Down Expand Up @@ -402,10 +401,7 @@ def test_stream_copy_resume_local(temp_dir):
)

src_df = (
ray.data.from_arrow(table)
.to_pandas()
.sort_values("id")
.reset_index(drop=True)
ray.data.from_arrow(table).to_pandas().sort_values("id").reset_index(drop=True)
)
dst_df = (
lr.read_lance(str(dst_path))
Expand Down
19 changes: 13 additions & 6 deletions tests/test_distributed_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,13 @@ def test_scalar_index_on_mixed_schema_list_indices(self, temp_dir):
)

indices = updated_dataset.list_indices()
assert len(indices) >= 1, "list_indices should return at least the new scalar index"
assert len(indices) >= 1, (
"list_indices should return at least the new scalar index"
)
names = [idx["name"] for idx in indices]
assert index_name in names, f"Expected index name {index_name!r} in list_indices: {names}"
assert index_name in names, (
f"Expected index name {index_name!r} in list_indices: {names}"
)

label_index = next(idx for idx in indices if idx["name"] == index_name)
assert label_index["type"] == "BTree", (
Expand Down Expand Up @@ -1018,7 +1022,9 @@ def test_optimize_indices_success_with_uri(self, multi_fragment_lance_dataset):
assert result.count_rows() == lance.LanceDataset(dataset_uri).count_rows()

indices = result.list_indices()
assert len(indices) >= 1, "list_indices should include at least the existing index"
assert len(indices) >= 1, (
"list_indices should include at least the existing index"
)
names = [idx["name"] for idx in indices]
assert "text_idx" in names, f"Expected 'text_idx' in list_indices: {names}"

Expand All @@ -1029,9 +1035,10 @@ def test_optimize_indices_runtime_error_when_api_missing(self, temp_dir):
lr.write_lance(ray.data.from_pandas(df), str(path))
ds = lance.LanceDataset(str(path))

if getattr(ds, "optimize_indices", None) is not None or getattr(
ds, "optimize", None
) is not None:
if (
getattr(ds, "optimize_indices", None) is not None
or getattr(ds, "optimize", None) is not None
):
pytest.skip(
"This lance version exposes optimize_indices/optimize; "
"cannot test RuntimeError path."
Expand Down
30 changes: 15 additions & 15 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading