compat: Support dask query-planning, drop Python 3.9 #171

Open

wants to merge 17 commits into base: main

Changes from 15 commits
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
@@ -83,15 +83,15 @@ jobs:
run: |
MATRIX=$(jq -nsc '{
"os": ["ubuntu-latest", "macos-latest", "windows-latest"],
"environment": ["test-39", "test-312"]
"environment": ["test-310", "test-312"]
}')
echo "MATRIX=$MATRIX" >> $GITHUB_ENV
- name: Set test matrix with 'full' option
if: env.MATRIX_OPTION == 'full'
run: |
MATRIX=$(jq -nsc '{
"os": ["ubuntu-latest", "macos-latest", "windows-latest"],
"environment": ["test-39", "test-310", "test-311", "test-312"]
"environment": ["test-310", "test-311", "test-312"]
}')
echo "MATRIX=$MATRIX" >> $GITHUB_ENV
- name: Set test matrix with 'downstream' option
10 changes: 4 additions & 6 deletions pixi.toml
@@ -8,11 +8,9 @@ install = 'python -m pip install --no-deps --disable-pip-version-check -e .'

[activation.env]
PYTHONIOENCODING = "utf-8"
DASK_DATAFRAME__QUERY_PLANNING = "False" # TODO: Support query planning
USE_PYGEOS = '0'

[environments]
test-39 = ["py39", "test-core", "test", "test-task", "example", "test-example"]

Member Author:
dask 2025.1 only supports Python 3.10 and newer.

test-310 = ["py310", "test-core", "test", "test-task", "example", "test-example"]
test-311 = ["py311", "test-core", "test", "test-task", "example", "test-example"]
test-312 = ["py312", "test-core", "test", "test-task", "example", "test-example"]
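
For context on the DASK_DATAFRAME__QUERY_PLANNING line removed above: dask maps that environment variable onto its dataframe.query-planning config key, and from dask 2025.1 the expression-based ("query planning") implementation is the only one, so the opt-out is gone. A minimal sketch (not part of this PR) for checking what a given environment is running:

```python
# Sketch only; assumes dask>=2025.1, where the config key may be absent,
# hence the explicit default.
import dask
import dask.dataframe as dd

print(dask.__version__)
# DASK_DATAFRAME__QUERY_PLANNING maps to this key via dask's DASK_* config translation.
print(dask.config.get("dataframe.query-planning", default=None))
# Collections are expression-backed under query planning:
print(dd.from_dict({"x": [1, 2, 3]}, npartitions=1).expr)
```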
@@ -22,17 +20,14 @@ lint = ["py311", "lint"]

[dependencies]
numba = "*"
dask-core = "<2025.1" # TODO: Does not work with new DataFrame interface
dask-core = ">=2025.1"
fsspec = "*"
packaging = "*"
pandas = "*"
pip = "*"
pyarrow = ">=10"
retrying = "*"

[feature.py39.dependencies]
python = "3.9.*"

[feature.py310.dependencies]
python = "3.10.*"

@@ -86,6 +81,9 @@ test-unit = 'pytest spatialpandas/tests -n logical --dist loadgroup'
[feature.test-example.dependencies]
nbval = "*"

[feature.test-example.activation.env]
DASK_SCHEDULER = "single-threaded"

Member Author:
This is to try to avoid this kind of error (which also happens on main):

[image: screenshot of the error traceback]
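
The DASK_SCHEDULER = "single-threaded" setting goes through the same environment-variable config mapping dask uses elsewhere. A rough in-code equivalent, useful when reproducing the notebook failures outside pixi (a sketch, not part of this PR):

```python
# Force dask's synchronous scheduler, like DASK_SCHEDULER=single-threaded.
import dask
import dask.dataframe as dd
import pandas as pd

dask.config.set(scheduler="single-threaded")

ddf = dd.from_pandas(pd.DataFrame({"x": range(4)}), npartitions=2)
print(ddf.x.sum().compute())  # runs in-process, no thread or process pool
```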


[feature.test-example.tasks]
test-example = 'pytest -n logical --dist loadscope --nbval-lax examples'

6 changes: 3 additions & 3 deletions pyproject.toml
@@ -8,14 +8,13 @@ dynamic = ["version"]
description = 'Pandas extension arrays for spatial/geometric operations'
readme = "README.md"
license = { text = "BSD-2-Clause" }
requires-python = ">=3.9"
requires-python = ">=3.10"
authors = [{ name = "HoloViz developers", email = "developers@holoviz.org" }]
maintainers = [{ name = "HoloViz developers", email = "developers@holoviz.org" }]
classifiers = [
"License :: OSI Approved :: BSD License",
"Development Status :: 5 - Production/Stable",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
@@ -27,7 +26,7 @@ classifiers = [
"Topic :: Software Development :: Libraries",
]
dependencies = [
'dask <2025.1',
'dask >=2025.1',
'fsspec >=2022.8',
'numba',
'packaging',
@@ -79,6 +78,7 @@ filterwarnings = [
[tool.ruff]
fix = true
line-length = 100
target-version = "py39" # TODO: Remove in follow up PR

[tool.ruff.lint]
select = [
53 changes: 33 additions & 20 deletions spatialpandas/dask.py
@@ -13,25 +13,22 @@
from dask import delayed
from dask.dataframe.core import get_parallel_type
from dask.dataframe.extensions import make_array_nonempty
from dask.dataframe.partitionquantiles import partition_quantiles
from dask.dataframe.utils import make_meta_obj, meta_nonempty
from packaging.version import Version
from retrying import retry

try:
from dask.dataframe.backends import meta_nonempty
from dask.dataframe.dispatch import make_meta_dispatch
except ImportError:
from dask.dataframe.utils import make_meta as make_meta_dispatch, meta_nonempty

from .geodataframe import GeoDataFrame
from .geometry.base import GeometryDtype, _BaseCoordinateIndexer
from .geoseries import GeoSeries
from .spatialindex import HilbertRtree


class DaskGeoSeries(dd.Series):
def __init__(self, dsk, name, meta, divisions, *args, **kwargs):
super().__init__(dsk, name, meta, divisions)

_partition_type = GeoSeries

def __init__(self, expr, *args, **kwargs):
super().__init__(expr, *args, **kwargs)

# Init backing properties
self._partition_bounds = None
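
The constructor change above follows the query-planning collection API: a collection now wraps a single expression object rather than the legacy (dsk, name, meta, divisions) arguments. A minimal illustration, not specific to spatialpandas, assuming dask>=2025.1:

```python
import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"x": range(4)}), npartitions=2)
print(type(ddf.expr))  # the expression node the collection wraps
print(ddf.expr._meta)  # empty meta frame carried by the expression
print(ddf.divisions)   # divisions are derived from the expression
```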
@@ -105,8 +102,10 @@ def persist(self, **kwargs):
)


@make_meta_dispatch.register(GeoSeries)
@make_meta_obj.register(GeoSeries)
def make_meta_series(s, index=None):
if hasattr(s, "__array__") or isinstance(s, np.ndarray):
return s[:0]
result = s.head(0)
if index is not None:
result = result.reindex(index[:0])
@@ -119,13 +118,20 @@ def meta_nonempty_series(s, index=None):


@get_parallel_type.register(GeoSeries)
def get_parallel_type_dataframe(df):
def get_parallel_type_series(df):
return DaskGeoSeries


@dd.get_collection_type.register(GeoSeries)
def get_collection_type_series(df):
return DaskGeoSeries


class DaskGeoDataFrame(dd.DataFrame):
def __init__(self, dsk, name, meta, divisions):
super().__init__(dsk, name, meta, divisions)
_partition_type = GeoDataFrame

def __init__(self, expr, *args, **kwargs):
super().__init__(expr, *args, **kwargs)
self._partition_sindex = {}
self._partition_bounds = {}
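
Together with the make_meta_obj and get_parallel_type registrations, the get_collection_type hooks above are what let dask return the spatialpandas subclasses under query planning. A minimal sketch of the intended behaviour (assumes this branch installed alongside dask>=2025.1; the column names are illustrative only):

```python
import dask.dataframe as dd
from spatialpandas import GeoDataFrame
from spatialpandas.geometry import PointArray

gdf = GeoDataFrame({
    "point": PointArray([[0, 1], [2, 3], [4, 5], [6, 7]]),
    "value": [1, 2, 3, 4],
})
ddf = dd.from_pandas(gdf, npartitions=2)
print(type(ddf))           # spatialpandas.dask.DaskGeoDataFrame
print(type(ddf["point"]))  # spatialpandas.dask.DaskGeoSeries
```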

@@ -312,8 +318,9 @@ def move_retry(p1, p2):
ddf = self._with_hilbert_distance_column(p)

# Compute output hilbert_distance divisions
quantiles = partition_quantiles(
ddf.hilbert_distance, npartitions
from dask.dataframe.dask_expr import RepartitionQuantiles, new_collection
quantiles = new_collection(
RepartitionQuantiles(ddf.hilbert_distance, npartitions)
).compute().values

# Add _partition column containing output partition number of each row
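
The call above replaces the removed dask.dataframe.partitionquantiles helper with the query-planning expression that serves the same purpose. A standalone sketch mirroring that call (assumes dask>=2025.1, where RepartitionQuantiles and new_collection are exposed under dask.dataframe.dask_expr; passing the collection directly is taken from the code above):

```python
import pandas as pd
import dask.dataframe as dd
from dask.dataframe.dask_expr import RepartitionQuantiles, new_collection

s = dd.from_pandas(pd.Series(range(100), name="hilbert_distance"), npartitions=4)

# Old (dask < 2025.1):
#   from dask.dataframe.partitionquantiles import partition_quantiles
#   divisions = partition_quantiles(s, 8).compute()
# New: wrap the quantiles expression in a collection, then compute it.
divisions = new_collection(RepartitionQuantiles(s, 8)).compute()
print(divisions.values)  # candidate division boundaries
```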
@@ -328,7 +335,7 @@
for out_partition in out_partitions
]
part_output_paths = [
os.path.join(path, f"part.{int(out_partition)}.parquet")
os.path.join(path, f"part.{out_partition}.parquet")
for out_partition in out_partitions
]

@@ -338,7 +345,7 @@
rm_retry(path)

for out_partition in out_partitions:
part_dir = os.path.join(path, f"part.{int(out_partition)}.parquet" )
part_dir = os.path.join(path, f"part.{out_partition}.parquet" )
mkdirs_retry(part_dir)
tmp_part_dir = tempdir_format.format(partition=out_partition, uuid=dataset_uuid)
mkdirs_retry(tmp_part_dir)
@@ -360,7 +367,7 @@ def process_partition(df, i):
for out_partition, df_part in df.groupby('_partition'):
part_path = os.path.join(
tempdir_format.format(partition=out_partition, uuid=dataset_uuid),
f'part{int(i)}.parquet',
f'part{i}.parquet',
)
df_part = (
df_part
@@ -584,8 +591,10 @@ def __getitem__(self, key):
return result


@make_meta_dispatch.register(GeoDataFrame)
@make_meta_obj.register(GeoDataFrame)
def make_meta_dataframe(df, index=None):
if hasattr(df, "__array__") or isinstance(df, np.ndarray):
return df[:0]
result = df.head(0)
if index is not None:
result = result.reindex(index[:0])
@@ -598,10 +607,14 @@ def meta_nonempty_dataframe(df, index=None):


@get_parallel_type.register(GeoDataFrame)
def get_parallel_type_series(s):
def get_parallel_type_dataframe(s):
return DaskGeoDataFrame


@dd.get_collection_type.register(GeoDataFrame)
def get_collection_type_dataframe(df):
return DaskGeoDataFrame

class _DaskCoordinateIndexer(_BaseCoordinateIndexer):
def __init__(self, obj, sindex):
super().__init__(sindex)
30 changes: 19 additions & 11 deletions spatialpandas/io/parquet.py
@@ -1,4 +1,5 @@
import json
import warnings
from collections.abc import Iterable
from functools import reduce
from glob import has_magic
@@ -180,17 +181,24 @@ def to_parquet_dask(
spatial_metadata = {'partition_bounds': partition_bounds}
b_spatial_metadata = json.dumps(spatial_metadata).encode('utf')

dd_to_parquet(
ddf,
path,
engine="pyarrow",
compression=compression,
storage_options=storage_options,
custom_metadata={b'spatialpandas': b_spatial_metadata},
write_metadata_file=True,
**engine_kwargs,
**kwargs,
)

with warnings.catch_warnings():
warnings.filterwarnings(

Member Author:
This only happens for the s3 parquet tests, which raise the following warning (a sketch showing how such an annotation triggers it is included after this file's diff):

UserWarning: Dask annotations {'retries': 5} detected. Annotations will be ignored when using query-planning.

Full output:
❯ pytest "spatialpandas/tests/test_parquet_s3.py::test_read_parquet_dask_remote[glob_parquet]"
/home/shh/.local/conda/envs/holoviz/lib/python3.12/site-packages/pytest_asyncio/plugin.py:207: PytestDeprecationWarning: The configuration option "asyncio_default_fixture_loop_scope" is unset.
The event loop scope for asynchronous fixtures will default to the fixture caching scope. Future versions of pytest-asyncio will default the loop scope for asynchronous fixtures to function scope. Set the default fixture loop scope explicitly in order to avoid unexpected behavior in the future. Valid fixture loop scopes are: "function", "class", "module", "package", "session"

  warnings.warn(PytestDeprecationWarning(_DEFAULT_FIXTURE_LOOP_SCOPE_UNSET))
===================================================================================================================== test session starts =====================================================================================================================
platform linux -- Python 3.12.8, pytest-8.3.4, pluggy-1.5.0
Using --randomly-seed=2711051231
rootdir: /home/shh/projects/holoviz/repos/spatialpandas
configfile: pyproject.toml
plugins: hypothesis-6.124.7, anyio-4.8.0, xdist-3.6.1, rerunfailures-15.0, randomly-3.15.0, asyncio-0.25.3, base-url-2.1.0, nbval-0.11.0, playwright-0.6.2
asyncio: mode=Mode.STRICT, asyncio_default_fixture_loop_scope=None
collected 1 item

spatialpandas/tests/test_parquet_s3.py E                                                                                                                                                                                                                [100%]

=========================================================================================================================== ERRORS ============================================================================================================================
________________________________________________________________________________________________ ERROR at setup of test_read_parquet_dask_remote[glob_parquet] ________________________________________________________________________________________________

s3_fixture = ('s3://test_bucket', {'anon': False, 'endpoint_url': 'http://127.0.0.1:5555/'}), sdf =                point
0  Point([0.0, 1.0])
1  Point([2.0, 3.0])
2  Point([4.0, 5.0])
3  Point([6.0, 7.0])

    @pytest.fixture(scope="module")
    def s3_parquet_dask(s3_fixture, sdf):
        path, s3so = s3_fixture
        path = f"{path}/test_dask"
        ddf = dd.from_pandas(sdf, npartitions=2)
>       to_parquet_dask(ddf, path, storage_options=s3so)

spatialpandas/tests/test_parquet_s3.py:69:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
spatialpandas/io/parquet.py:189: in to_parquet_dask
    dd_to_parquet(
/home/shh/.local/conda/envs/holoviz/lib/python3.12/site-packages/dask/dataframe/dask_expr/io/parquet.py:634: in to_parquet
    out = new_collection(
/home/shh/.local/conda/envs/holoviz/lib/python3.12/site-packages/dask/dataframe/dask_expr/_collection.py:4816: in new_collection
    return get_collection_type(meta)(expr)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <[RecursionError('maximum recursion depth exceeded') raised in repr()] Scalar object at 0x7db63aa19130>
expr = ToParquet(frame=RenameFrame(frame=ResetIndex(frame=df), columns={'index': '__null_dask_index__'}), path='test_bucket/t...'schema': point: fixed_size_binary[8]
__null_dask_index__: int64, 'index_cols': ['__null_dask_index__']}, append=False)

    def __init__(self, expr):
        global _WARN_ANNOTATIONS
        if _WARN_ANNOTATIONS and (annot := get_annotations()):
            _WARN_ANNOTATIONS = False
>           warnings.warn(
                f"Dask annotations {annot} detected. Annotations will be ignored when using query-planning."
            )
E           UserWarning: Dask annotations {'retries': 5} detected. Annotations will be ignored when using query-planning.

/home/shh/.local/conda/envs/holoviz/lib/python3.12/site-packages/dask/dataframe/dask_expr/_collection.py:308: UserWarning
-------------------------------------------------------------------------------------------------------------------- Captured stdout setup --------------------------------------------------------------------------------------------------------------------
Starting a new Thread with MotoServer running on 127.0.0.1:5555...
--------------------------------------------------------------------------------------------------------------------- Captured log setup ----------------------------------------------------------------------------------------------------------------------
INFO     aiobotocore.credentials:credentials.py:567 Found credentials in environment variables.
INFO     werkzeug:_internal.py:97 127.0.0.1 - - [31/Jan/2025 10:43:08] "GET /test_bucket?list-type=2&max-keys=1&encoding-type=url HTTP/1.1" 404 -
INFO     werkzeug:_internal.py:97 127.0.0.1 - - [31/Jan/2025 10:43:08] "GET /test_bucket?location HTTP/1.1" 404 -
INFO     werkzeug:_internal.py:97 127.0.0.1 - - [31/Jan/2025 10:43:08] "PUT /test_bucket HTTP/1.1" 200 -
INFO     werkzeug:_internal.py:97 127.0.0.1 - - [31/Jan/2025 10:43:08] "GET /test_bucket?list-type=2&max-keys=1&encoding-type=url HTTP/1.1" 200 -
INFO     werkzeug:_internal.py:97 127.0.0.1 - - [31/Jan/2025 10:43:08] "GET /test_bucket?list-type=2&max-keys=1&encoding-type=url HTTP/1.1" 200 -
=================================================================================================================== short test summary info ===================================================================================================================
ERROR spatialpandas/tests/test_parquet_s3.py::test_read_parquet_dask_remote[glob_parquet] - UserWarning: Dask annotations {'retries': 5} detected. Annotations will be ignored when using query-planning.
====================================================================================================================== 1 error in 1.91s =======================================================================================================================

"ignore",
category=UserWarning,
message="Dask annotations .* detected. Annotations will be ignored when using query-planning.",
)
dd_to_parquet(
ddf,
path,
engine="pyarrow",
compression=compression,
storage_options=storage_options,
custom_metadata={b'spatialpandas': b_spatial_metadata},
write_metadata_file=True,
**engine_kwargs,
**kwargs,
)


def read_parquet_dask(
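
For reference, the warning filtered in to_parquet_dask comes from the query-planning collection constructor, which warns the first time a collection is created while any dask annotation is active (see the traceback in the review comment above). A self-contained way to reproduce and silence it; the explicit dask.annotate(retries=5) is an assumption standing in for however the annotation ends up active in the s3 tests:

```python
import warnings

import dask
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"x": range(4)})

with dask.annotate(retries=5):
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore",
            category=UserWarning,
            message="Dask annotations .* detected. Annotations will be ignored when using query-planning.",
        )
        # Without the filter, creating this collection would emit
        # "UserWarning: Dask annotations {'retries': 5} detected. ..."
        ddf = dd.from_pandas(pdf, npartitions=2)

print(ddf.x.sum().compute())
```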