diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 4e30210..191d7c8 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -83,7 +83,7 @@ jobs:
         run: |
           MATRIX=$(jq -nsc '{
               "os": ["ubuntu-latest", "macos-latest", "windows-latest"],
-              "environment": ["test-39", "test-312"]
+              "environment": ["test-310", "test-312"]
           }')
           echo "MATRIX=$MATRIX" >> $GITHUB_ENV
       - name: Set test matrix with 'full' option
@@ -91,7 +91,7 @@
         run: |
           MATRIX=$(jq -nsc '{
               "os": ["ubuntu-latest", "macos-latest", "windows-latest"],
-              "environment": ["test-39", "test-310", "test-311", "test-312"]
+              "environment": ["test-310", "test-311", "test-312"]
           }')
           echo "MATRIX=$MATRIX" >> $GITHUB_ENV
       - name: Set test matrix with 'downstream' option
diff --git a/pixi.toml b/pixi.toml
index 408d0c7..405a23a 100644
--- a/pixi.toml
+++ b/pixi.toml
@@ -8,11 +8,9 @@ install = 'python -m pip install --no-deps --disable-pip-version-check -e .'
 
 [activation.env]
 PYTHONIOENCODING = "utf-8"
-DASK_DATAFRAME__QUERY_PLANNING = "False" # TODO: Support query planning
 USE_PYGEOS = '0'
 
 [environments]
-test-39 = ["py39", "test-core", "test", "test-task", "example", "test-example"]
 test-310 = ["py310", "test-core", "test", "test-task", "example", "test-example"]
 test-311 = ["py311", "test-core", "test", "test-task", "example", "test-example"]
 test-312 = ["py312", "test-core", "test", "test-task", "example", "test-example"]
@@ -22,17 +20,14 @@ lint = ["py311", "lint"]
 
 [dependencies]
 numba = "*"
-dask-core = "<2025.1" # TODO: Does not work with new DataFrame interface
+dask-core = ">=2025.1"
 fsspec = "*"
 packaging = "*"
-pandas = "*"
+pandas = ">=2.0"
 pip = "*"
-pyarrow = ">=10"
+pyarrow = ">=14.0.1"
 retrying = "*"
 
-[feature.py39.dependencies]
-python = "3.9.*"
-
 [feature.py310.dependencies]
 python = "3.10.*"
 
@@ -86,6 +81,9 @@ test-unit = 'pytest spatialpandas/tests -n logical --dist loadgroup'
 
 [feature.test-example.dependencies]
 nbval = "*"
 
+[feature.test-example.activation.env]
+DASK_SCHEDULER = "single-threaded"
+
 [feature.test-example.tasks]
 test-example = 'pytest -n logical --dist loadscope --nbval-lax examples'
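The new `[feature.test-example.activation.env]` entry relies on Dask folding `DASK_`-prefixed environment variables into its configuration, so the notebook tests default to the synchronous scheduler. A minimal sketch of that mechanism (the toy dataframe below is illustrative only):

```python
# Minimal sketch of how DASK_SCHEDULER is consumed; assumes only dask and pandas installed.
import os

os.environ["DASK_SCHEDULER"] = "single-threaded"  # what the pixi activation env sets

import dask
import dask.dataframe as dd
import pandas as pd

dask.config.refresh()                # re-collect DASK_* variables set after import
print(dask.config.get("scheduler"))  # -> "single-threaded"

# With no scheduler passed explicitly, compute() now runs on the synchronous scheduler.
ddf = dd.from_pandas(pd.DataFrame({"x": range(8)}), npartitions=2)
print(ddf["x"].sum().compute())      # -> 28
```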
diff --git a/pyproject.toml b/pyproject.toml
index 3eed850..6944577 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,14 +8,13 @@ dynamic = ["version"]
 description = 'Pandas extension arrays for spatial/geometric operations'
 readme = "README.md"
 license = { text = "BSD-2-Clause" }
-requires-python = ">=3.9"
+requires-python = ">=3.10"
 authors = [{ name = "HoloViz developers", email = "developers@holoviz.org" }]
 maintainers = [{ name = "HoloViz developers", email = "developers@holoviz.org" }]
 classifiers = [
     "License :: OSI Approved :: BSD License",
     "Development Status :: 5 - Production/Stable",
     "Programming Language :: Python :: 3",
-    "Programming Language :: Python :: 3.9",
     "Programming Language :: Python :: 3.10",
     "Programming Language :: Python :: 3.11",
     "Programming Language :: Python :: 3.12",
@@ -27,12 +26,12 @@ classifiers = [
     "Topic :: Software Development :: Libraries",
 ]
 dependencies = [
-    'dask <2025.1',
+    'dask >=2025.1',
     'fsspec >=2022.8',
     'numba',
     'packaging',
-    'pandas',
-    'pyarrow >=10',
+    'pandas >=2.0',
+    'pyarrow >=14.0.1',
     'retrying',
 ]
 
@@ -74,11 +73,14 @@ filterwarnings = [
     "ignore:datetime.datetime.utcnow():DeprecationWarning:botocore", # https://github.com/boto/boto3/issues/3889
     # 2024-11
     "ignore:The legacy Dask DataFrame implementation is deprecated:FutureWarning", # https://github.com/holoviz/spatialpandas/issues/146
+    # 2025-02
+    "ignore:Dask annotations ..retries.. 5. detected:UserWarning", # https://github.com/dask/dask/issues/11721
 ]
 
 [tool.ruff]
 fix = true
 line-length = 100
+target-version = "py39" # TODO: Remove in follow up PR
 
 [tool.ruff.lint]
 select = [
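On the new `filterwarnings` entry: pytest hands the message part of an `ignore:` filter to the `warnings` machinery as a regex matched against the start of the warning text, so the dots stand in for the literal braces, quotes and colon in `{'retries': 5}`. A quick check (the warning text is paraphrased from dask#11721, not quoted verbatim):

```python
# The dots in the filter are regex wildcards covering "{'", "':" and "}".
import re

pattern = r"Dask annotations ..retries.. 5. detected"
message = "Dask annotations {'retries': 5} detected. Annotations are ignored when query planning is enabled."

assert re.match(pattern, message) is not None
```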
diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py
index 21cf467..54de057 100644
--- a/spatialpandas/dask.py
+++ b/spatialpandas/dask.py
@@ -13,16 +13,10 @@
 from dask import delayed
 from dask.dataframe.core import get_parallel_type
 from dask.dataframe.extensions import make_array_nonempty
-from dask.dataframe.partitionquantiles import partition_quantiles
+from dask.dataframe.utils import make_meta_obj, meta_nonempty
 from packaging.version import Version
 from retrying import retry
 
-try:
-    from dask.dataframe.backends import meta_nonempty
-    from dask.dataframe.dispatch import make_meta_dispatch
-except ImportError:
-    from dask.dataframe.utils import make_meta as make_meta_dispatch, meta_nonempty
-
 from .geodataframe import GeoDataFrame
 from .geometry.base import GeometryDtype, _BaseCoordinateIndexer
 from .geoseries import GeoSeries
@@ -30,8 +24,11 @@
 
 
 class DaskGeoSeries(dd.Series):
-    def __init__(self, dsk, name, meta, divisions, *args, **kwargs):
-        super().__init__(dsk, name, meta, divisions)
+
+    _partition_type = GeoSeries
+
+    def __init__(self, expr, *args, **kwargs):
+        super().__init__(expr, *args, **kwargs)
 
         # Init backing properties
         self._partition_bounds = None
@@ -105,8 +102,10 @@ def persist(self, **kwargs):
         )
 
 
-@make_meta_dispatch.register(GeoSeries)
+@make_meta_obj.register(GeoSeries)
 def make_meta_series(s, index=None):
+    if hasattr(s, "__array__") or isinstance(s, np.ndarray):
+        return s[:0]
     result = s.head(0)
     if index is not None:
         result = result.reindex(index[:0])
@@ -119,13 +118,20 @@ def meta_nonempty_series(s, index=None):
 
 
 @get_parallel_type.register(GeoSeries)
-def get_parallel_type_dataframe(df):
+def get_parallel_type_series(df):
+    return DaskGeoSeries
+
+
+@dd.get_collection_type.register(GeoSeries)
+def get_collection_type_series(df):
     return DaskGeoSeries
 
 
 class DaskGeoDataFrame(dd.DataFrame):
-    def __init__(self, dsk, name, meta, divisions):
-        super().__init__(dsk, name, meta, divisions)
+    _partition_type = GeoDataFrame
+
+    def __init__(self, expr, *args, **kwargs):
+        super().__init__(expr, *args, **kwargs)
 
         self._partition_sindex = {}
         self._partition_bounds = {}
@@ -312,8 +318,9 @@ def move_retry(p1, p2):
         ddf = self._with_hilbert_distance_column(p)
 
         # Compute output hilbert_distance divisions
-        quantiles = partition_quantiles(
-            ddf.hilbert_distance, npartitions
+        from dask.dataframe.dask_expr import RepartitionQuantiles, new_collection
+        quantiles = new_collection(
+            RepartitionQuantiles(ddf.hilbert_distance, npartitions)
         ).compute().values
 
         # Add _partition column containing output partition number of each row
@@ -328,7 +335,7 @@ def move_retry(p1, p2):
             for out_partition in out_partitions
         ]
         part_output_paths = [
-            os.path.join(path, f"part.{int(out_partition)}.parquet")
+            os.path.join(path, f"part.{out_partition}.parquet")
             for out_partition in out_partitions
         ]
 
@@ -338,7 +345,7 @@ def move_retry(p1, p2):
         rm_retry(path)
 
         for out_partition in out_partitions:
-            part_dir = os.path.join(path, f"part.{int(out_partition)}.parquet" )
+            part_dir = os.path.join(path, f"part.{out_partition}.parquet" )
             mkdirs_retry(part_dir)
             tmp_part_dir = tempdir_format.format(partition=out_partition, uuid=dataset_uuid)
             mkdirs_retry(tmp_part_dir)
@@ -360,7 +367,7 @@ def process_partition(df, i):
             for out_partition, df_part in df.groupby('_partition'):
                 part_path = os.path.join(
                     tempdir_format.format(partition=out_partition, uuid=dataset_uuid),
-                    f'part{int(i)}.parquet',
+                    f'part{i}.parquet',
                 )
                 df_part = (
                     df_part
@@ -584,8 +591,10 @@ def __getitem__(self, key):
         return result
 
 
-@make_meta_dispatch.register(GeoDataFrame)
+@make_meta_obj.register(GeoDataFrame)
 def make_meta_dataframe(df, index=None):
+    if hasattr(df, "__array__") or isinstance(df, np.ndarray):
+        return df[:0]
     result = df.head(0)
     if index is not None:
         result = result.reindex(index[:0])
@@ -598,10 +607,14 @@ def meta_nonempty_dataframe(df, index=None):
 
 
 @get_parallel_type.register(GeoDataFrame)
-def get_parallel_type_series(s):
+def get_parallel_type_dataframe(s):
     return DaskGeoDataFrame
 
 
+@dd.get_collection_type.register(GeoDataFrame)
+def get_collection_type_dataframe(df):
+    return DaskGeoDataFrame
+
 class _DaskCoordinateIndexer(_BaseCoordinateIndexer):
     def __init__(self, obj, sindex):
         super().__init__(sindex)
diff --git a/spatialpandas/io/parquet.py b/spatialpandas/io/parquet.py
index 4d6dc8a..05979af 100644
--- a/spatialpandas/io/parquet.py
+++ b/spatialpandas/io/parquet.py
@@ -17,7 +17,6 @@
     to_parquet as dd_to_parquet,
 )
 from dask.utils import natural_sort_key
-from packaging.version import Version
 from pandas.io.parquet import to_parquet as pd_to_parquet
 from pyarrow.parquet import ParquetDataset, ParquetFile, read_metadata
 
@@ -30,9 +29,6 @@
     validate_coerce_filesystem,
 )
 
-# improve pandas compatibility, based on geopandas _compat.py
-PANDAS_GE_12 = Version(pd.__version__) >= Version("1.2.0")
-
 
 def _load_parquet_pandas_metadata(
     path,
@@ -90,15 +86,10 @@ def to_parquet(
         "compression": compression,
         "filesystem": filesystem,
        "index": index,
+        "storage_options": storage_options,
         **kwargs,
     }
 
-    if PANDAS_GE_12:
-        to_parquet_args.update({"storage_options": storage_options})
-    elif filesystem is None:
-        filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
-        to_parquet_args.update({"filesystem": filesystem})
-
     pd_to_parquet(**to_parquet_args)
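For context on how the `spatialpandas/dask.py` changes surface to users: with dask >= 2025.1 every collection is expression-backed, so `DaskGeoSeries`/`DaskGeoDataFrame` now take an `expr` plus a `_partition_type`, and the extra `dd.get_collection_type` registrations tell dask which wrapper class to return when the meta is a spatialpandas object. A rough usage sketch, assuming this branch is installed (coordinates and column names are made up):

```python
# Rough sketch, assuming spatialpandas from this branch and dask >= 2025.1.
import dask.dataframe as dd
from spatialpandas import GeoDataFrame
from spatialpandas.geometry import PointArray
import spatialpandas.dask  # noqa: F401 -- runs the dispatch registrations above

gdf = GeoDataFrame({
    "geometry": PointArray([(0.0, 0.0), (1.0, 1.0), (2.0, 0.5), (3.0, 2.0)]),
    "value": [1, 2, 3, 4],
})

# from_pandas consults get_parallel_type/get_collection_type, so the wrapped
# collection is a DaskGeoDataFrame and its geometry column a DaskGeoSeries.
ddf = dd.from_pandas(gdf, npartitions=2)
print(type(ddf).__name__)              # DaskGeoDataFrame
print(type(ddf["geometry"]).__name__)  # DaskGeoSeries

# Meta/empty-prototype handling goes through the make_meta_obj / meta_nonempty
# registrations in the diff above.
print(ddf.head(2))
```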