From 81a1d88706bbe60444402a09d723e64e3737319b Mon Sep 17 00:00:00 2001 From: Hazem Elmeleegy Date: Mon, 14 Oct 2024 14:47:46 -0700 Subject: [PATCH] SNOW-1727330, SNOW-1727332, SNOW-1727334, SNOW-1727335: Add support for DataFrameGroupBy/SeriesGroupBy.bfill/ffill --- CHANGELOG.md | 1 + .../modin/supported/groupby_supported.rst | 6 +- .../modin/plugin/docstrings/groupby.py | 171 ++++++++++++- .../plugin/extensions/groupby_overrides.py | 40 ++- .../modin/groupby/test_groupby_bfill_ffill.py | 237 ++++++++++++++++++ tests/unit/modin/test_groupby_unsupported.py | 4 - 6 files changed, 449 insertions(+), 10 deletions(-) create mode 100644 tests/integ/modin/groupby/test_groupby_bfill_ffill.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 270d14ddc8e..e23ae1019cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,7 @@ - Added support for applying Snowpark Python functions (e.g., `sin`) in `Series.map`, `Series.apply`, `DataFrame.apply` and `DataFrame.applymap`. - Added support for `np.subtract`, `np.multiply`, `np.divide`, and `np.true_divide`. - Added support for tracking usages of `__array_ufunc__`. +- Added support for `DataFrameGroupBy.bfill`, `SeriesGroupBy.bfill`, `DataFrameGroupBy.ffill`, and `SeriesGroupBy.ffill`. #### Improvements diff --git a/docs/source/modin/supported/groupby_supported.rst b/docs/source/modin/supported/groupby_supported.rst index 66e768ba671..61c026fa17d 100644 --- a/docs/source/modin/supported/groupby_supported.rst +++ b/docs/source/modin/supported/groupby_supported.rst @@ -80,7 +80,8 @@ Computations/descriptive stats +-----------------------------+---------------------------------+----------------------------------------------------+ | ``any`` | P | ``N`` for non-integer/boolean types | +-----------------------------+---------------------------------+----------------------------------------------------+ -| ``bfill`` | N | | +| ``bfill`` | P | When GroupBy axis is 1,``N``; | +| | | GroupBy axis = 0 is fully supported. | +-----------------------------+---------------------------------+----------------------------------------------------+ | ``corr`` | N | | +-----------------------------+---------------------------------+----------------------------------------------------+ @@ -104,7 +105,8 @@ Computations/descriptive stats +-----------------------------+---------------------------------+----------------------------------------------------+ | ``diff`` | N | | +-----------------------------+---------------------------------+----------------------------------------------------+ -| ``ffill`` | N | | +| ``ffill`` | P | When GroupBy axis is 1,``N``; | +| | | GroupBy axis = 0 is fully supported. | +-----------------------------+---------------------------------+----------------------------------------------------+ | ``fillna`` | P | GroupBy axis = 0 is supported. | | | | Does not support ``downcast`` parameter | diff --git a/src/snowflake/snowpark/modin/plugin/docstrings/groupby.py b/src/snowflake/snowpark/modin/plugin/docstrings/groupby.py index 09a89010178..7e4b3bb4d97 100644 --- a/src/snowflake/snowpark/modin/plugin/docstrings/groupby.py +++ b/src/snowflake/snowpark/modin/plugin/docstrings/groupby.py @@ -197,7 +197,97 @@ def skew(): pass def ffill(): - pass + """ + Forward fill the values. + + Parameters + ---------- + limit : int, optional + Limit of how many values to fill. + + Returns + ------- + Series or DataFrame + Object with missing values filled. + + See also + -------- + Series.ffill + Returns Series with minimum number of char in object. + DataFrame.ffill + Object with missing values filled or None if inplace=True. + Series.fillna + Fill NaN values of a Series. + DataFrame.fillna + Fill NaN values of a DataFrame. + + Examples + -------- + For SeriesGroupBy: + + >>> key = [0, 0, 1, 1] + >>> ser = pd.Series([np.nan, 2, 3, np.nan], index=key) + >>> ser + 0 NaN + 0 2.0 + 1 3.0 + 1 NaN + dtype: float64 + >>> ser.groupby(level=0).ffill() + 0 NaN + 0 2.0 + 1 3.0 + 1 3.0 + dtype: float64 + + For DataFrameGroupBy: + + >>> df = pd.DataFrame( + ... { + ... "key": [0, 0, 1, 1, 1], + ... "A": [np.nan, 2, np.nan, 3, np.nan], + ... "B": [2, 3, np.nan, np.nan, np.nan], + ... "C": [np.nan, np.nan, 2, np.nan, np.nan], + ... } + ... ) + >>> df + key A B C + 0 0 NaN 2.0 NaN + 1 0 2.0 3.0 NaN + 2 1 NaN NaN 2.0 + 3 1 3.0 NaN NaN + 4 1 NaN NaN NaN + + Propagate non-null values forward or backward within each group along columns. + + >>> df.groupby("key").ffill() + A B C + 0 NaN 2.0 NaN + 1 2.0 3.0 NaN + 2 NaN NaN 2.0 + 3 3.0 NaN 2.0 + 4 3.0 NaN 2.0 + + Propagate non-null values forward or backward within each group along rows. + + >>> df.T.groupby(np.array([0, 0, 1, 1])).ffill().T + key A B C + 0 0.0 0.0 2.0 2.0 + 1 0.0 2.0 3.0 3.0 + 2 1.0 1.0 NaN 2.0 + 3 1.0 3.0 NaN NaN + 4 1.0 1.0 NaN NaN + + Only replace the first NaN element within a group along rows. + + >>> df.groupby("key").ffill(limit=1) + A B C + 0 NaN 2.0 NaN + 1 2.0 3.0 NaN + 2 NaN NaN 2.0 + 3 3.0 NaN 2.0 + 4 3.0 NaN NaN + """ def sem(): pass @@ -1086,7 +1176,84 @@ def cummin(): """ def bfill(): - pass + """ + Backward fill the values. + + Parameters + ---------- + limit : int, optional + Limit of how many values to fill. + + Returns + ------- + Series or DataFrame + Object with missing values filled. + + See also + ------- + Series.bfill + Backward fill the missing values in the dataset. + DataFrame.bfill + Backward fill the missing values in the dataset. + Series.fillna + Fill NaN values of a Series. + DataFrame.fillna + Fill NaN values of a DataFrame. + + Examples + -------- + With Series: + + >>> index = ['Falcon', 'Falcon', 'Parrot', 'Parrot', 'Parrot'] + >>> s = pd.Series([None, 1, None, None, 3], index=index) + >>> s + Falcon NaN + Falcon 1.0 + Parrot NaN + Parrot NaN + Parrot 3.0 + dtype: float64 + >>> s.groupby(level=0).bfill() + Falcon 1.0 + Falcon 1.0 + Parrot 3.0 + Parrot 3.0 + Parrot 3.0 + dtype: float64 + >>> s.groupby(level=0).bfill(limit=1) + Falcon 1.0 + Falcon 1.0 + Parrot NaN + Parrot 3.0 + Parrot 3.0 + dtype: float64 + + With DataFrame: + + >>> df = pd.DataFrame({'A': [1, None, None, None, 4], + ... 'B': [None, None, 5, None, 7]}, index=index) + >>> df + A B + Falcon 1.0 NaN + Falcon NaN NaN + Parrot NaN 5.0 + Parrot NaN NaN + Parrot 4.0 7.0 + >>> df.groupby(level=0).bfill() + A B + Falcon 1.0 NaN + Falcon NaN NaN + Parrot 4.0 5.0 + Parrot 4.0 7.0 + Parrot 4.0 7.0 + >>> df.groupby(level=0).bfill(limit=1) + A B + Falcon 1.0 NaN + Falcon NaN NaN + Parrot NaN 5.0 + Parrot 4.0 7.0 + Parrot 4.0 7.0 + """ def prod(): pass diff --git a/src/snowflake/snowpark/modin/plugin/extensions/groupby_overrides.py b/src/snowflake/snowpark/modin/plugin/extensions/groupby_overrides.py index afcc86f67fc..6d3728265da 100644 --- a/src/snowflake/snowpark/modin/plugin/extensions/groupby_overrides.py +++ b/src/snowflake/snowpark/modin/plugin/extensions/groupby_overrides.py @@ -434,7 +434,25 @@ def any(self, skipna: bool = True): ) def bfill(self, limit=None): - ErrorMessage.method_not_implemented_error(name="bfill", class_="GroupBy") + is_series_groupby = self.ndim == 1 + + # TODO: SNOW-1063349: Modin upgrade - modin.pandas.groupby.DataFrameGroupBy functions + query_compiler = self._query_compiler.groupby_fillna( + self._by, + self._axis, + self._kwargs, + value=None, + method="bfill", + fill_axis=None, + inplace=False, + limit=limit, + downcast=None, + ) + return ( + pd.Series(query_compiler=query_compiler) + if is_series_groupby + else pd.DataFrame(query_compiler=query_compiler) + ) def corr(self, **kwargs): # TODO: SNOW-1063349: Modin upgrade - modin.pandas.groupby.DataFrameGroupBy functions @@ -507,7 +525,25 @@ def diff(self): ErrorMessage.method_not_implemented_error(name="diff", class_="GroupBy") def ffill(self, limit=None): - ErrorMessage.method_not_implemented_error(name="ffill", class_="GroupBy") + is_series_groupby = self.ndim == 1 + + # TODO: SNOW-1063349: Modin upgrade - modin.pandas.groupby.DataFrameGroupBy functions + query_compiler = self._query_compiler.groupby_fillna( + self._by, + self._axis, + self._kwargs, + value=None, + method="ffill", + fill_axis=None, + inplace=False, + limit=limit, + downcast=None, + ) + return ( + pd.Series(query_compiler=query_compiler) + if is_series_groupby + else pd.DataFrame(query_compiler=query_compiler) + ) def fillna( self, diff --git a/tests/integ/modin/groupby/test_groupby_bfill_ffill.py b/tests/integ/modin/groupby/test_groupby_bfill_ffill.py new file mode 100644 index 00000000000..1593cd0b25a --- /dev/null +++ b/tests/integ/modin/groupby/test_groupby_bfill_ffill.py @@ -0,0 +1,237 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. +# +import modin.pandas as pd +import numpy as np +import pandas as native_pd +import pytest + +import snowflake.snowpark.modin.plugin # noqa: F401 +from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import ( + _GROUPBY_UNSUPPORTED_GROUPING_MESSAGE, +) +from tests.integ.modin.utils import eval_snowpark_pandas_result +from tests.integ.utils.sql_counter import SqlCounter, sql_count_checker + +TEST_DF_DATA = { + "A": [None, 99, None, None, None, 98, 98, 98, None, 97], + "B": [88, None, None, None, 87, 88, 89, None, 86, None], + "C": [None, None, 1.99, 1.98, 1.97, 1.96, None, None, None, None], +} + +TEST_DF_INDEX_1 = native_pd.Index([0, 0, 0, 1, 1, 1, 1, 1, 2, 3], name="I") +TEST_DF_COLUMNS_1 = native_pd.Index(["A", "B", "C"], name="X") + +TEST_DF_DATA_2 = [ + [2, None, None, 99], + [2, 10, None, 98], + [2, None, 1.1, None], + [2, 15, None, 97], + [2, None, 1.1, None], + [1, None, 2.2, None], + [1, None, None, 96], + [1, None, 2.3], + [1, 20, 3.3, 95], + [2, None, None, 94], + [2, 30, None, None], + [2, None, 300, None], +] + +TEST_DF_INDEX_2 = pd.MultiIndex.from_tuples( + [ + (1, "a"), + (1, "a"), + (1, "a"), + (1, "a"), + (1, "a"), + (1, "b"), + (1, "b"), + (0, "a"), + (0, "a"), + (0, "a"), + (0, "a"), + (0, "a"), + ], + names=["I", "J"], +) +TEST_DF_COLUMNS_2 = pd.MultiIndex.from_tuples( + [(5, "A"), (5, "B"), (6, "C"), (6, "D")], names=["X", "Y"] +) + +TEST_DF_DATA_3 = ( + [[None, 100 + 10 * i, 200 + 10 * i] for i in range(6)] + + [[300 + 10 * i, None, 500 + 10 * i] for i in range(6)] + + [[400 + 10 * i, 600 + 10 * i, None] for i in range(6)] +) + +TEST_DF_INDEX_3 = native_pd.Index([50] * len(TEST_DF_DATA_3), name="I") +TEST_DF_COLUMNS_3 = native_pd.Index(["A", "B", "C"], name="X") + + +@pytest.mark.parametrize("method", ["bfill", "ffill"]) +@pytest.mark.parametrize("groupby_list", ["X", "key", ["X", "key"]]) +@pytest.mark.parametrize("limit", [None, 1, 3, 5, 10]) +@sql_count_checker(query_count=1) +def test_groupby_bfill_ffill_basic(method, groupby_list, limit): + native_df = native_pd.DataFrame( + { + "key": [0, 0, 1, 1, 1], + "A": [np.nan, 2, np.nan, 3, np.nan], + "B": [2, 3, np.nan, np.nan, np.nan], + "C": [np.nan, np.nan, 2, np.nan, np.nan], + }, + index=native_pd.Index(["A", "B", "C", "D", "E"], name="X"), + ) + snow_df = pd.DataFrame(native_df) + + eval_snowpark_pandas_result( + snow_df, + native_df, + lambda df: getattr(df.groupby(groupby_list), method)(limit=limit), + ) + + +@pytest.mark.parametrize("method", ["bfill", "ffill"]) +@pytest.mark.parametrize("by_list", ["I", 0, "A"]) +@sql_count_checker(query_count=1) +def test_groupby_bfill_ffill_single_index(method, by_list): + native_df = native_pd.DataFrame( + TEST_DF_DATA, index=TEST_DF_INDEX_1, columns=TEST_DF_COLUMNS_1 + ) + snow_df = pd.DataFrame(native_df) + + if isinstance(by_list, int): + level = by_list + by_list = None + else: + level = None + + eval_snowpark_pandas_result( + snow_df, + native_df, + lambda df: getattr(df.groupby(by_list, level=level), method)(limit=None), + ) + + +@pytest.mark.parametrize("method", ["bfill", "ffill"]) +@pytest.mark.parametrize("by_list", ["I", 0]) +@pytest.mark.parametrize("limit", [None, 1, 3, 5, 10]) +@sql_count_checker(query_count=1) +def test_groupby_bfill_ffill_with_limit(method, by_list, limit): + native_df = native_pd.DataFrame( + TEST_DF_DATA_3, index=TEST_DF_INDEX_3, columns=TEST_DF_COLUMNS_3 + ) + snow_df = pd.DataFrame(native_df) + + if isinstance(by_list, int): + level = by_list + by_list = None + else: + level = None + + eval_snowpark_pandas_result( + snow_df, + native_df, + lambda df: getattr(df.groupby(by_list, level=level), method)(limit=limit), + ) + + +@pytest.mark.parametrize("method", ["bfill", "ffill"]) +@pytest.mark.parametrize("by", ["I", ["I", "J"]]) +@pytest.mark.parametrize("limit", [None, 1, 3, 5, 10]) +@sql_count_checker(query_count=1) +def test_groupby_bfill_ffill_multiindex(method, by, limit): + native_df = native_pd.DataFrame( + TEST_DF_DATA_2, index=TEST_DF_INDEX_2, columns=TEST_DF_COLUMNS_2 + ) + snow_df = pd.DataFrame(native_df) + + eval_snowpark_pandas_result( + snow_df, + native_df, + lambda df: getattr(df.groupby(by=by, level=None, axis=0), method)(limit=limit), + ) + + +@pytest.mark.parametrize("method", ["bfill", "ffill"]) +@pytest.mark.parametrize("level", [0, 1, [0, 1]]) +@pytest.mark.parametrize("limit", [None, 1, 3, 5, 10]) +@sql_count_checker(query_count=1) +def test_groupby_bfill_ffill_multiindex_with_level(method, level, limit): + native_df = native_pd.DataFrame( + TEST_DF_DATA_2, index=TEST_DF_INDEX_2, columns=TEST_DF_COLUMNS_2 + ) + snow_df = pd.DataFrame(native_df) + + eval_snowpark_pandas_result( + snow_df, + native_df, + lambda df: getattr(df.groupby(by=None, level=level, axis=0), method)( + limit=limit + ), + ) + + +@pytest.mark.parametrize("method", ["bfill", "ffill"]) +@pytest.mark.parametrize( + "by_info", [(["I", "A"], 1), (["A"], 0), (["A", "B"], 1), (10, 0)] +) +def test_groupby_bfill_ffill_multiindex_negative(method, by_info): + by_list, expected_query_count = by_info + native_df = native_pd.DataFrame( + TEST_DF_DATA_2, index=TEST_DF_INDEX_2, columns=TEST_DF_COLUMNS_2 + ) + snow_df = pd.DataFrame(native_df) + + if isinstance(by_list, int): + level = by_list + by_list = None + else: + level = None + + with SqlCounter(query_count=expected_query_count): + eval_snowpark_pandas_result( + snow_df, + native_df, + lambda df: getattr(df.groupby(by_list, level=level, axis=0), method)( + limit=None + ), + expect_exception=True, + expect_exception_type=IndexError if level is not None else KeyError, + ) + + +@pytest.mark.parametrize("method", ["bfill", "ffill"]) +@sql_count_checker(query_count=0) +def test_groupby_bfill_ffill_unsupported_grouping_negative(method): + native_df = native_pd.DataFrame( + TEST_DF_DATA, index=TEST_DF_INDEX_1, columns=TEST_DF_COLUMNS_1 + ) + snow_df = pd.DataFrame(native_df) + + with pytest.raises( + NotImplementedError, + match=f"{_GROUPBY_UNSUPPORTED_GROUPING_MESSAGE}", + ): + # call to_pandas to trigger the evaluation of the operation + getattr(snow_df.groupby("I", axis=1), method)(limit=None).to_pandas() + + +@pytest.mark.parametrize("method", ["bfill", "ffill"]) +@pytest.mark.parametrize("level", [0, 1, [0, 1]]) +@pytest.mark.parametrize("limit", [None, 1, 3, 5, 10]) +@sql_count_checker(query_count=1) +def test_groupby_series_bfill_ffill(method, level, limit): + native_df = native_pd.DataFrame( + TEST_DF_DATA_2, index=TEST_DF_INDEX_2, columns=TEST_DF_COLUMNS_2 + ) + native_ser = native_df[(5, "A")] + snow_ser = pd.Series(native_ser) + + eval_snowpark_pandas_result( + snow_ser, + native_ser, + lambda ser: getattr(ser.groupby(by=None, level=level, axis=0), method)( + limit=limit + ), + ) diff --git a/tests/unit/modin/test_groupby_unsupported.py b/tests/unit/modin/test_groupby_unsupported.py index e5c777a8178..2f11429e35e 100644 --- a/tests/unit/modin/test_groupby_unsupported.py +++ b/tests/unit/modin/test_groupby_unsupported.py @@ -14,13 +14,11 @@ (lambda se: se.groupby("A").dtypes, "dtypes"), (lambda se: se.groupby("A").pipe(lambda x: x.max() - x.min()), "pipe"), (lambda se: se.groupby("A").filter(lambda x: x.mean() > 3), "filter"), - (lambda se: se.groupby("A").bfill(limit=1), "bfill"), (lambda se: se.groupby("A").corr(), "corr"), (lambda se: se.groupby("A").cov(), "cov"), (lambda se: se.groupby("A").cumprod(), "cumprod"), (lambda se: se.groupby("A").describe(), "describe"), (lambda se: se.groupby("A").diff(), "diff"), - (lambda se: se.groupby("A").ffill(), "ffill"), (lambda se: se.groupby("A").is_monotonic_increasing, "is_monotonic_increasing"), (lambda se: se.groupby("A").is_monotonic_decreasing, "is_monotonic_decreasing"), (lambda se: se.groupby("A").ngroup(), "ngroup"), @@ -61,13 +59,11 @@ def test_series_groupby_unsupported_methods_raises( (lambda df: df.groupby("A").dtypes, "dtypes"), (lambda df: df.groupby("A").pipe(lambda x: x.max() - x.min()), "pipe"), (lambda df: df.groupby("A").filter(lambda x: x.mean() > 3), "filter"), - (lambda df: df.groupby("A").bfill(limit=1), "bfill"), (lambda df: df.groupby("A").corr(), "corr"), (lambda df: df.groupby("A").cov(), "cov"), (lambda df: df.groupby("A").cumprod(), "cumprod"), (lambda df: df.groupby("A").describe(), "describe"), (lambda df: df.groupby("A").diff(), "diff"), - (lambda df: df.groupby("A").ffill(), "ffill"), (lambda df: df.groupby("A").ngroup(), "ngroup"), (lambda df: df.groupby("A").nth(5), "nth"), (lambda df: df.groupby("A").ohlc(), "ohlc"),