[SNOW-1748174]: Add support for size in groupby.agg #2474

Open · wants to merge 9 commits into base: main
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@
- Added support for `on` parameter with `Resampler`.
- Added support for timedelta inputs in `value_counts()`.
- Added support for applying Snowpark Python function `snowflake_cortex_summarize`.
- Added support for `size` in `GroupBy.aggregate`.

#### Improvements

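For illustration, a minimal sketch of the behavior this changelog entry describes (data and column names are made up; assumes a configured Snowpark session alongside the standard Snowpark pandas imports used in the tests below):

import modin.pandas as pd
import numpy as np
import snowflake.snowpark.modin.plugin  # noqa: F401  registers the Snowpark pandas backend

df = pd.DataFrame({"a": [1, 1, 7], "b": [2.0, 5.0, np.nan]})
# "size" counts every row in a group, NaN or not; "count" skips NaN.
df.groupby("a").agg(n_size=("b", "size"), n_count=("b", "count"))
# Expected (matching native pandas): group 1 -> n_size=2, n_count=2;
# group 7 -> n_size=1, n_count=0.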
5 changes: 3 additions & 2 deletions docs/source/modin/supported/dataframe_supported.rst
@@ -202,8 +202,9 @@ Methods
| | | Categoricals are not implemented | label or Series from the current DataFrame; |
| | | yet | otherwise ``N``; |
| | | | Note that supported functions are agg, count, |
| | | | cumcount, cummax, cummin, cumsum, max, mean, |
| | | | median, min, quantile, shift, std, sum, and var. |
| | | | cumcount, cummax, cummin, cumsum, first, last, |
| | | | max, mean, median, min, quantile, shift, size, |
| | | | std, sum, and var. |
| | | | Otherwise ``N`` |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``gt`` | P | ``level`` | |
2 changes: 1 addition & 1 deletion docs/source/modin/supported/groupby_supported.rst
@@ -31,7 +31,7 @@ Function application
| GroupBy method | Snowpark implemented? (Y/N/P/D) | Missing parameters | Notes for current implementation |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``agg`` | P | ``axis`` other than 0 is not | ``Y``, support functions are count, mean, min, max,|
| | | implemented. | sum, median, std, and var |
| | | implemented. | sum, median, std, size, and var |
| | | | (including both Python and NumPy functions) |
| | | | otherwise ``N``. |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
5 changes: 3 additions & 2 deletions docs/source/modin/supported/series_supported.rst
@@ -214,8 +214,9 @@ Methods
| | | Categoricals are not implemented | label or Series from the current DataFrame; |
| | | yet | otherwise ``N``; |
| | | | Note that supported functions are agg, count, |
| | | | cumcount, cummax, cummin, cumsum, max, mean, |
| | | | median, min, quantile, shift, std, sum, and var. |
| | | | cumcount, cummax, cummin, cumsum, first, last, |
| | | | max, mean, median, min, quantile, shift, size, |
| | | | std, sum, and var. |
| | | | Otherwise ``N`` |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``gt`` | P | ``level`` | |
@@ -396,6 +396,15 @@ def _columns_count(*cols: SnowparkColumn) -> Callable:
return sum(builtin("nvl2")(col, pandas_lit(1), pandas_lit(0)) for col in cols)


def _columns_count_keep_nulls(*cols: SnowparkColumn) -> Callable:
"""
Counts the number of values (including NULL) in each row.
"""
# Use the Python builtin sum rather than Snowpark's sum_, which would aggregate
# all rows within a single column. Each column contributes a literal 1 whether or
# not its value is NULL, so the result is a constant equal to the number of columns.
return sum(pandas_lit(1) for _ in cols)


def _columns_coalescing_sum(*cols: SnowparkColumn) -> Callable:
"""
Sums all non-NaN elements in each row. If all elements are NaN, returns 0.
@@ -447,6 +456,12 @@ def _create_pandas_to_snowpark_pandas_aggregation_map(
axis_1_aggregation_skipna=_columns_count,
preserves_snowpark_pandas_types=False,
),
"size": _SnowparkPandasAggregation(
# We must count the total number of rows, whether or not their values are null.
axis_0_aggregation=lambda _: builtin("count_if")(pandas_lit(True)),
axis_1_aggregation_keepna=_columns_count_keep_nulls,
preserves_snowpark_pandas_types=False,
),
**_create_pandas_to_snowpark_pandas_aggregation_map(
("mean", np.mean),
_SnowparkPandasAggregation(
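As a side note, a rough sketch of why the axis-0 mapping above has SQL COUNT(*) semantics. The snippet uses the public Snowpark `builtin`, `col`, and `lit` functions; the internal `pandas_lit` helper is assumed to behave like `lit`, and the column name is illustrative:

from snowflake.snowpark.functions import builtin, col, lit

# COUNT_IF(TRUE) evaluates its predicate once per row, so under GROUP BY it
# returns each group's total row count, NULLs included -- i.e. COUNT(*).
size_expr = builtin("count_if")(lit(True))
# COUNT(<column>), by contrast, skips NULLs, which is why "count" and "size"
# can disagree on columns containing missing values.
count_expr = builtin("count")(col("b"))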
28 changes: 28 additions & 0 deletions tests/integ/modin/groupby/test_groupby_named_agg.py
@@ -4,6 +4,7 @@
import re

import modin.pandas as pd
import numpy as np
import pandas as native_pd
import pytest

@@ -127,3 +128,30 @@ def test_named_agg_with_invalid_function_raises_not_implemented(
pd.DataFrame(basic_df_data).groupby("col1").agg(
c1=("col2", "min"), c2=("col2", "random_function")
)


@sql_count_checker(query_count=1)
def test_named_agg_count_vs_size():
data = [[1, 2, 3], [1, 5, np.nan], [7, np.nan, 9]]
native_df = native_pd.DataFrame(
data, columns=["a", "b", "c"], index=["owl", "toucan", "eagle"]
)
snow_df = pd.DataFrame(native_df)
eval_snowpark_pandas_result(
snow_df,
native_df,
lambda df: df.groupby("a").agg(
l=("b", "size"), j=("c", "size"), m=("c", "count"), n=("b", "count")
),
)
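# For reference, a hand-computed sketch of the native pandas result that the
# test asserts parity with: grouping by "a" yields groups 1 (two rows) and
# 7 (one row), so
#    l  j  m  n
# a
# 1  2  2  1  2
# 7  1  1  1  0
# i.e. "size" counts the NaN cells in "b" and "c" while "count" skips them.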


@sql_count_checker(query_count=1)
def test_named_agg_size_on_series():
native_series = native_pd.Series([1, 2, 3, 3], index=["a", "a", "b", "c"])
snow_series = pd.Series(native_series)
eval_snowpark_pandas_result(
snow_series,
native_series,
lambda series: series.groupby(level=0).agg(new_col="size"),
)
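# For reference, a hand-computed sketch of the expected (native pandas) result:
# a one-column DataFrame "new_col" with a -> 2, b -> 1, c -> 1, since named
# aggregation on a SeriesGroupBy returns a DataFrame.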
99 changes: 99 additions & 0 deletions tests/integ/modin/groupby/test_groupby_size.py
@@ -85,6 +85,84 @@ def test_groupby_size(by, as_index):
)


@pytest.mark.parametrize(
"by",
[
"col1_grp",
"col2_int64",
"col3_int_identical",
"col4_int32",
"col6_mixed",
"col7_bool",
"col8_bool_missing",
"col9_int_missing",
"col10_mixed_missing",
["col1_grp", "col2_int64"],
["col6_mixed", "col7_bool", "col3_int_identical"],
],
)
@pytest.mark.parametrize("as_index", [True, False])
def test_groupby_agg_size(by, as_index):
pandas_df = native_pd.DataFrame(
{
"col1_grp": ["g1", "g2", "g0", "g0", "g2", "g3", "g0", "g2", "g3"],
"col2_int64": np.arange(9, dtype="int64") // 3,
"col3_int_identical": [2] * 9,
"col4_int32": np.arange(9, dtype="int32") // 4,
"col5_int16": np.arange(9, dtype="int16") // 3,
"col6_mixed": np.concatenate(
[
np.arange(3, dtype="int64") // 3,
np.arange(3, dtype="int32") // 3,
np.arange(3, dtype="int16") // 3,
]
),
"col7_bool": [True] * 5 + [False] * 4,
"col8_bool_missing": [
True,
None,
False,
False,
None,
None,
True,
False,
None,
],
"col9_int_missing": [5, 6, np.nan, 2, 1, np.nan, 5, np.nan, np.nan],
"col10_mixed_missing": np.concatenate(
[
np.arange(2, dtype="int64") // 3,
[np.nan],
np.arange(2, dtype="int32") // 3,
[np.nan],
np.arange(2, dtype="int16") // 3,
[np.nan],
]
),
}
)
snowpark_pandas_df = pd.DataFrame(pandas_df)
with SqlCounter(query_count=1):
eval_snowpark_pandas_result(
snowpark_pandas_df,
pandas_df,
lambda df: df.groupby(by, as_index=as_index).agg(
new_col=pd.NamedAgg("col5_int16", "size")
),
)

# DataFrame with __getitem__
with SqlCounter(query_count=1):
eval_snowpark_pandas_result(
snowpark_pandas_df,
pandas_df,
lambda df: df.groupby(by, as_index=as_index)["col5_int16"].agg(
new_col="size"
),
)


@sql_count_checker(query_count=0)
def test_error_checking():
s = pd.Series(list("abc") * 4)
@@ -111,3 +189,24 @@ def test_timedelta(by):
native_df,
lambda df: df.groupby(by).size(),
)


@pytest.mark.parametrize("by", ["A", "B"])
@sql_count_checker(query_count=1)
def test_timedelta_agg(by):
native_df = native_pd.DataFrame(
{
"A": native_pd.to_timedelta(
["1 days 06:05:01.00003", "16us", "nan", "16us"]
),
"B": [8, 8, 12, 10],
"C": ["the", "name", "is", "bond"],
}
)
snow_df = pd.DataFrame(native_df)

eval_snowpark_pandas_result(
snow_df,
native_df,
lambda df: df.groupby(by).agg(d=pd.NamedAgg("A" if by != "A" else "C", "size")),
)