compat: Support dask query-planning, drop Python 3.9 #171

Status: Open. Wants to merge 19 commits into base: main. Showing changes from 11 commits.

4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
@@ -83,15 +83,15 @@ jobs:
run: |
MATRIX=$(jq -nsc '{
"os": ["ubuntu-latest", "macos-latest", "windows-latest"],
"environment": ["test-39", "test-312"]
"environment": ["test-310", "test-312"]
}')
echo "MATRIX=$MATRIX" >> $GITHUB_ENV
- name: Set test matrix with 'full' option
if: env.MATRIX_OPTION == 'full'
run: |
MATRIX=$(jq -nsc '{
"os": ["ubuntu-latest", "macos-latest", "windows-latest"],
"environment": ["test-39", "test-310", "test-311", "test-312"]
"environment": ["test-310", "test-311", "test-312"]
}')
echo "MATRIX=$MATRIX" >> $GITHUB_ENV
- name: Set test matrix with 'downstream' option
7 changes: 1 addition & 6 deletions pixi.toml
@@ -8,11 +8,9 @@ install = 'python -m pip install --no-deps --disable-pip-version-check -e .'

[activation.env]
PYTHONIOENCODING = "utf-8"
DASK_DATAFRAME__QUERY_PLANNING = "False" # TODO: Support query planning
USE_PYGEOS = '0'

[environments]
test-39 = ["py39", "test-core", "test", "test-task", "example", "test-example"]
[Member Author] dask 2025.1 only supports Python 3.10.

test-310 = ["py310", "test-core", "test", "test-task", "example", "test-example"]
test-311 = ["py311", "test-core", "test", "test-task", "example", "test-example"]
test-312 = ["py312", "test-core", "test", "test-task", "example", "test-example"]
@@ -22,17 +20,14 @@ lint = ["py311", "lint"]

[dependencies]
numba = "*"
dask-core = "<2025.1" # TODO: Does not work with new DataFrame interface
dask-core = ">=2025.1"
fsspec = "*"
packaging = "*"
pandas = "*"
pip = "*"
pyarrow = ">=10"
retrying = "*"

[feature.py39.dependencies]
python = "3.9.*"

[feature.py310.dependencies]
python = "3.10.*"

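For context on the pixi.toml changes above: dask 2025.1 ships only the expression-based ("query-planning") DataFrame, so the `DASK_DATAFRAME__QUERY_PLANNING = "False"` escape hatch no longer has a legacy implementation to fall back to, and dask 2025.1 requires at least Python 3.10 (per the review comment above). A minimal sketch of the version guard these constraints imply; illustrative only, not code added by this PR:

```python
# Illustrative only: check that the installed dask satisfies the new lower
# bound, under which query planning is always active and cannot be disabled.
import dask
from packaging.version import Version

assert Version(dask.__version__) >= Version("2025.1"), (
    "this branch of spatialpandas targets dask >= 2025.1"
)
```
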
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -27,7 +27,7 @@ classifiers = [
"Topic :: Software Development :: Libraries",
]
dependencies = [
'dask <2025.1',
'dask >=2025.1',
'fsspec >=2022.8',
'numba',
'packaging',
49 changes: 29 additions & 20 deletions spatialpandas/dask.py
@@ -13,25 +13,22 @@
from dask import delayed
from dask.dataframe.core import get_parallel_type
from dask.dataframe.extensions import make_array_nonempty
from dask.dataframe.partitionquantiles import partition_quantiles
from dask.dataframe.utils import make_meta_obj, meta_nonempty
from packaging.version import Version
from retrying import retry

try:
from dask.dataframe.backends import meta_nonempty
from dask.dataframe.dispatch import make_meta_dispatch
except ImportError:
from dask.dataframe.utils import make_meta as make_meta_dispatch, meta_nonempty

from .geodataframe import GeoDataFrame
from .geometry.base import GeometryDtype, _BaseCoordinateIndexer
from .geoseries import GeoSeries
from .spatialindex import HilbertRtree


class DaskGeoSeries(dd.Series):
def __init__(self, dsk, name, meta, divisions, *args, **kwargs):
super().__init__(dsk, name, meta, divisions)

_partition_type = GeoSeries

def __init__(self, expr, *args, **kwargs):
super().__init__(expr, *args, **kwargs)

# Init backing properties
self._partition_bounds = None
@@ -105,7 +102,7 @@ def persist(self, **kwargs):
)


@make_meta_dispatch.register(GeoSeries)
@make_meta_obj.register(GeoSeries)
def make_meta_series(s, index=None):
result = s.head(0)
if index is not None:
@@ -119,13 +116,20 @@ def meta_nonempty_series(s, index=None):


@get_parallel_type.register(GeoSeries)
def get_parallel_type_dataframe(df):
def get_parallel_type_series(df):
return DaskGeoSeries


@dd.get_collection_type.register(GeoSeries)
def get_collection_type_series(df):
return DaskGeoSeries


class DaskGeoDataFrame(dd.DataFrame):
def __init__(self, dsk, name, meta, divisions):
super().__init__(dsk, name, meta, divisions)
_partition_type = GeoDataFrame

def __init__(self, expr, *args, **kwargs):
super().__init__(expr, *args, **kwargs)
self._partition_sindex = {}
self._partition_bounds = {}

@@ -312,8 +316,9 @@ def move_retry(p1, p2):
ddf = self._with_hilbert_distance_column(p)

# Compute output hilbert_distance divisions
quantiles = partition_quantiles(
ddf.hilbert_distance, npartitions
from dask.dataframe.dask_expr import RepartitionQuantiles, new_collection
quantiles = new_collection(
RepartitionQuantiles(ddf.hilbert_distance, npartitions)
).compute().values

# Add _partition column containing output partition number of each row
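Under query planning the old `dask.dataframe.partitionquantiles.partition_quantiles` helper (whose import is removed earlier in this file) is no longer available, so the hunk above reaches for the expression-level `RepartitionQuantiles` wrapped in `new_collection`. A minimal standalone sketch mirroring that call; the input frame, column name, and partition counts are illustrative:

```python
# Minimal sketch mirroring the RepartitionQuantiles usage in the hunk above;
# the DataFrame, column name, and partition counts here are made up.
import pandas as pd
import dask.dataframe as dd
from dask.dataframe.dask_expr import RepartitionQuantiles, new_collection

ddf = dd.from_pandas(
    pd.DataFrame({"hilbert_distance": range(1_000)}), npartitions=4
)
npartitions = 8

# Wrap the expression in a collection so it can be computed like any other
# dask object; the computed values serve as the output division boundaries.
quantiles = new_collection(
    RepartitionQuantiles(ddf.hilbert_distance, npartitions)
).compute().values
```
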
@@ -328,7 +333,7 @@ def move_retry(p1, p2):
for out_partition in out_partitions
]
part_output_paths = [
os.path.join(path, f"part.{int(out_partition)}.parquet")
os.path.join(path, f"part.{out_partition}.parquet")
for out_partition in out_partitions
]

@@ -338,7 +343,7 @@
rm_retry(path)

for out_partition in out_partitions:
part_dir = os.path.join(path, f"part.{int(out_partition)}.parquet" )
part_dir = os.path.join(path, f"part.{out_partition}.parquet" )
mkdirs_retry(part_dir)
tmp_part_dir = tempdir_format.format(partition=out_partition, uuid=dataset_uuid)
mkdirs_retry(tmp_part_dir)
@@ -360,7 +365,7 @@ def process_partition(df, i):
for out_partition, df_part in df.groupby('_partition'):
part_path = os.path.join(
tempdir_format.format(partition=out_partition, uuid=dataset_uuid),
f'part{int(i)}.parquet',
f'part{i}.parquet',
)
df_part = (
df_part
@@ -584,7 +589,7 @@ def __getitem__(self, key):
return result


@make_meta_dispatch.register(GeoDataFrame)
@make_meta_obj.register(GeoDataFrame)
def make_meta_dataframe(df, index=None):
result = df.head(0)
if index is not None:
Expand All @@ -598,10 +603,14 @@ def meta_nonempty_dataframe(df, index=None):


@get_parallel_type.register(GeoDataFrame)
def get_parallel_type_series(s):
def get_parallel_type_dataframe(s):
return DaskGeoDataFrame


@dd.get_collection_type.register(GeoDataFrame)
def get_collection_type_dataframe(df):
return DaskGeoDataFrame

class _DaskCoordinateIndexer(_BaseCoordinateIndexer):
def __init__(self, obj, sindex):
super().__init__(sindex)
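Taken together, the spatialpandas/dask.py changes follow one registration pattern for expression-backed collections: declare `_partition_type`, accept an `expr` in `__init__`, and register the pandas-level type with `make_meta_obj`, `get_parallel_type`, and the additional `dd.get_collection_type` dispatch. The sketch below condenses that pattern for the GeoSeries side only (GeoDataFrame mirrors it); it is a summary of the diff, not extra code in the PR, and the `make_meta` body is simplified:

```python
# Condensed summary of the dask >= 2025.1 registration pattern used in this
# diff (GeoSeries side only; GeoDataFrame follows the same shape).
import dask.dataframe as dd
from dask.dataframe.core import get_parallel_type
from dask.dataframe.utils import make_meta_obj

from spatialpandas import GeoSeries


class DaskGeoSeries(dd.Series):
    # Pandas-level class backing each partition of the collection.
    _partition_type = GeoSeries

    # Expression-backed collections are constructed from an expression,
    # not from the legacy (dsk, name, meta, divisions) arguments.
    def __init__(self, expr, *args, **kwargs):
        super().__init__(expr, *args, **kwargs)


@make_meta_obj.register(GeoSeries)
def make_meta_series(s, index=None):
    # Zero-length object used as collection metadata
    # (index handling omitted in this sketch).
    return s.head(0)


@get_parallel_type.register(GeoSeries)
def get_parallel_type_series(obj):
    # Existing dispatch: partition type -> dask collection type.
    return DaskGeoSeries


@dd.get_collection_type.register(GeoSeries)
def get_collection_type_series(obj):
    # New dispatch consulted by the expression (query-planning) backend.
    return DaskGeoSeries
```
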
30 changes: 19 additions & 11 deletions spatialpandas/io/parquet.py
@@ -1,4 +1,5 @@
import json
import warnings
from collections.abc import Iterable
from functools import reduce
from glob import has_magic
@@ -180,17 +181,24 @@ def to_parquet_dask(
spatial_metadata = {'partition_bounds': partition_bounds}
b_spatial_metadata = json.dumps(spatial_metadata).encode('utf')

dd_to_parquet(
ddf,
path,
engine="pyarrow",
compression=compression,
storage_options=storage_options,
custom_metadata={b'spatialpandas': b_spatial_metadata},
write_metadata_file=True,
**engine_kwargs,
**kwargs,
)

with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
category=UserWarning,
message="Dask annotations .* detected. Annotations will be ignored when using query-planning.",
)
dd_to_parquet(
ddf,
path,
engine="pyarrow",
compression=compression,
storage_options=storage_options,
custom_metadata={b'spatialpandas': b_spatial_metadata},
write_metadata_file=True,
**engine_kwargs,
**kwargs,
)


def read_parquet_dask(
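The `catch_warnings` block added to `to_parquet_dask` scopes the filter to the single `dd_to_parquet` call, so the Dask-annotations UserWarning emitted under query planning is silenced without hiding unrelated warnings elsewhere. A standalone sketch of that standard-library pattern; the `warnings.warn` calls are only illustrative triggers:

```python
# Standard-library pattern used above: suppress one specific UserWarning,
# matched by a message regex, only inside the with-block.
import warnings

with warnings.catch_warnings():
    warnings.filterwarnings(
        "ignore",
        category=UserWarning,
        message="Dask annotations .* detected",
    )
    # A matching warning raised inside the block is silenced ...
    warnings.warn("Dask annotations {'retries': 3} detected", UserWarning)

# ... while the same warning outside the block is reported as usual.
warnings.warn("Dask annotations {'retries': 3} detected", UserWarning)
```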