Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.

Potential dask-expr compatibility errors #434

@wilsonbb

Description

@wilsonbb

We're seeing some consistent unit test failures with dask-expr 1.0.12. See the following example

tests/tape_tests/test_ensemble.py:1944: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_collection.py:417: in __repr__
    data = self._repr_data().to_string(max_rows=5)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_collection.py:3993: in _repr_data
    index = self._repr_divisions
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_collection.py:2572: in _repr_divisions
    name = f"npartitions={self.npartitions}"
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_collection.py:344: in npartitions
    return self.expr.npartitions
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:397: in npartitions
    return len(self.divisions) - 1
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/functools.py:1001: in __get__
    val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:381: in divisions
    return tuple(self._divisions())
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:647: in _divisions
    return _get_divisions_map_partitions(
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask/dataframe/core.py:7330: in _get_divisions_map_partitions
    divisions = max((d.divisions for d in dfs), key=len)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask/dataframe/core.py:7330: in <genexpr>
    divisions = max((d.divisions for d in dfs), key=len)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/functools.py:1001: in __get__
    val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:381: in divisions
    return tuple(self._divisions())
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:528: in _divisions
    if not self._broadcast_dep(arg):
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:519: in _broadcast_dep
    return dep.npartitions == 1 and dep.ndim < self.ndim
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:397: in npartitions
    return len(self.divisions) - 1
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/functools.py:1001: in __get__
    val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:381: in divisions
    return tuple(self._divisions())
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_groupby.py:901: in _divisions
    if self.need_to_shuffle:
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/functools.py:1001: in __get__
    val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_groupby.py:920: in need_to_shuffle
    if any(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

.0 = <set_iterator object at 0x7f43fc3c36c0>

    if any(
>       set(self._by_columns) >= set(cols)
        for cols in self.frame.unique_partition_mapping_columns_from_shuffle
    ):
E   TypeError: 'NoneType' object is not iterable

We're also seeing more intermittent failures on earlier versions, but it is less clear if this is tied to specific versions of dask-expr or if dask-expr is involved. Example


________________________ test_batch_by_band[bounds-on1] ________________________

parquet_ensemble = <tape.ensemble.Ensemble object at 0x7fbac85f8250>
func_label = 'bounds', on = ['ps1_objid', 'filterName']

    @pytest.mark.parametrize("on", [None, ["ps1_objid", "filterName"], ["filterName", "ps1_objid"]])
    @pytest.mark.parametrize("func_label", ["mean", "bounds"])
    def test_batch_by_band(parquet_ensemble, func_label, on):
        """
        Test that ensemble.batch(by_band=True) works as intended.
        """
    
        if func_label == "mean":
    
            def my_mean(flux):
                """returns a single value"""
                return np.mean(flux)
    
            res = parquet_ensemble.batch(my_mean, parquet_ensemble._flux_col, on=on, by_band=True)
    
            parquet_ensemble.source.query(f"{parquet_ensemble._band_col}=='g'").update_ensemble()
            filter_res = parquet_ensemble.batch(my_mean, parquet_ensemble._flux_col, on=on, by_band=False)
    
            # An EnsembleFrame should be returned
            assert isinstance(res, EnsembleFrame)
    
            # Make sure we get all the expected columns
            assert all([col in res.columns for col in ["result_g", "result_r"]])
    
            # These should be equivalent
            # [expr] need this TODO: investigate typing issue
            filter_res.index = filter_res.index.astype("int")
            assert (
            res.loc[88472935274829959]["result_g"]
                .compute()
                .equals(filter_res.loc[88472935274829959]["result"].compute())
            )
    
        elif func_label == "bounds":
    
            def my_bounds(flux):
                """returns a series"""
                return pd.Series({"min": np.min(flux), "max": np.max(flux)})
    
            res = parquet_ensemble.batch(
                my_bounds, "psFlux", on=on, by_band=True, meta={"min": float, "max": float}
            )
    
            parquet_ensemble.source.query(f"{parquet_ensemble._band_col}=='g'").update_ensemble()
            filter_res = parquet_ensemble.batch(
                my_bounds, "psFlux", on=on, by_band=False, meta={"min": float, "max": float}
            )
    
            # An EnsembleFrame should be returned
            assert isinstance(res, EnsembleFrame)
    
            # Make sure we get all the expected columns
            assert all([col in res.columns for col in ["max_g", "max_r", "min_g", "min_r"]])
    
            # These should be equivalent
    
            # [expr] need this TODO: investigate typing issue
            filter_res.index = filter_res.index.astype("int")
    
>           assert (
                res.loc[88472935274829959]["max_g"]
                .compute()
                .equals(filter_res.loc[88472935274829959]["max"].compute())
            )
E           AssertionError: assert False
E            +  where False = <bound method NDFrame.equals of ps1_objid\n88472935274829959    0.0\nName: max_g, dtype: float32>(ps1_objid\n88472935274829959   NaN\nName: max, dtype: float32)
E            +    where <bound method NDFrame.equals of ps1_objid\n88472935274829959    0.0\nName: max_g, dtype: float32> = ps1_objid\n88472935274829959    0.0\nName: max_g, dtype: float32.equals
E            +      where ps1_objid\n88472935274829959    0.0\nName: max_g, dtype: float32 = <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 e...Expr=(LocUnknown(frame=MapPartitions(TapeFrame), iindexer=slice(88472935274829959, 88472935274829959, None)))['max_g']>()
E            +        where <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 e...Expr=(LocUnknown(frame=MapPartitions(TapeFrame), iindexer=slice(88472935274829959, 88472935274829959, None)))['max_g']> = Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 expressions\nExpr=(LocUnknown(frame=MapPartitions(TapeFrame), iindexer=slice(88472935274829959, 88472935274829959, None)))['max_g'].compute
E            +    and   ps1_objid\n88472935274829959   NaN\nName: max, dtype: float32 = <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 e...s(TapeFrame), other='ps1_objid')), dtypes='int')), iindexer=slice(88472935274829959, 88472935274829959, None)))['max']>()
E            +      where <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 e...s(TapeFrame), other='ps1_objid')), dtypes='int')), iindexer=slice(88472935274829959, 88472935274829959, None)))['max']> = Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 expressions\nExpr=(LocUnknown(frame=A...ns(TapeFrame), other='ps1_objid')), dtypes='int')), iindexer=slice(88472935274829959, 88472935274829959, None)))['max'].compute

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions