Skip to content

Commit

Permalink
Closes #2749, #1166: In dataframe groupby, add missing aggregations a…
Browse files Browse the repository at this point in the history
…nd ability to aggregate on a list of column names (#2751)

This PR (closes #2749 and closes #1166) adds missing aggregations to dataframe groupby and adds the ability to aggregate on a list of column names.

Co-authored-by: Pierce Hayes <pierce314159@users.noreply.github.com>
  • Loading branch information
stress-tess and Pierce Hayes committed Sep 6, 2023
1 parent 3ac7e76 commit 6044082
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 65 deletions.
39 changes: 32 additions & 7 deletions PROTO_tests/tests/dataframe_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@


class TestDataFrame:

@staticmethod
def build_pd_df():
username = ["Alice", "Bob", "Alice", "Carol", "Bob", "Alice"]
userid = [111, 222, 111, 333, 222, 111]
item = [0, 0, 1, 1, 2, 0]
day = [5, 5, 6, 5, 6, 6]
amount = [0.5, 0.6, 1.1, 1.2, 4.3, 0.6]
bi = [2 ** 200, 2 ** 200 + 1, 2 ** 200 + 2, 2 ** 200 + 3, 2 ** 200 + 4, 2 ** 200 + 5]
bi = [2**200, 2**200 + 1, 2**200 + 2, 2**200 + 3, 2**200 + 4, 2**200 + 5]
ui = (np.arange(6).astype(ak.uint64)) + 2**63
return pd.DataFrame(
{
Expand All @@ -25,7 +24,7 @@ def build_pd_df():
"day": day,
"amount": amount,
"bi": bi,
"ui": ui
"ui": ui,
}
)

Expand Down Expand Up @@ -62,7 +61,7 @@ def build_ak_append():
"day": day,
"amount": amount,
"bi": bi,
"ui": ui
"ui": ui,
}
)

Expand All @@ -83,7 +82,7 @@ def build_pd_df_append():
"day": day,
"amount": amount,
"bi": bi,
"ui": ui
"ui": ui,
}
)

Expand All @@ -110,7 +109,7 @@ def build_ak_typeerror():
"day": day,
"amount": amount,
"bi": bi,
"ui": ui
"ui": ui,
}
)

Expand Down Expand Up @@ -458,6 +457,32 @@ def test_gb_series(self):
assert c.index.to_list() == ["Bob", "Alice", "Carol"]
assert c.values.to_list() == [2, 3, 1]

@pytest.mark.parametrize("agg", ["sum", "first"])
def test_gb_aggregations(self, agg):
    """Verify arkouda groupby aggregations match pandas, both for a
    single-column aggregation and for aggregating every column at once."""
    ak_df = self.build_ak_df()
    pd_df = self.build_pd_df()
    # drop the string column; many aggregations don't support it
    non_str_cols = list(set(ak_df.columns) - {"userName"})
    ak_df = ak_df[non_str_cols]
    pd_df = pd_df[non_str_cols]

    group_on = "userID"
    for col in ak_df.columns:
        if col == group_on:
            # pandas groupby doesn't return the column used to group
            continue
        ak_result = getattr(ak_df.groupby(group_on), agg)(col)
        pd_result = getattr(pd_df.groupby(group_on), agg)()[col]
        assert ak_result.to_list() == pd_result.to_list()

    # aggregate all columns at once; exclude the grouping column,
    # which pandas does not include in its output
    other_cols = list(set(ak_df.columns) - {group_on})
    ak_frame = getattr(ak_df.groupby(group_on), agg)()[other_cols]
    pd_frame = getattr(pd_df.groupby(group_on), agg)()[other_cols]
    # we don't currently support index names in arkouda
    pd_frame.index.name = None
    assert_frame_equal(pd_frame, ak_frame.to_pandas(retain_index=True))

def test_argsort(self):
df = self.build_ak_df()

Expand Down Expand Up @@ -595,7 +620,7 @@ def test_uint_greediness(self):
def test_head_tail_datetime_display(self):
# Reproducer for issue #2596
values = ak.array([1689221916000000] * 100, dtype=ak.int64)
dt = ak.Datetime(values, unit='u')
dt = ak.Datetime(values, unit="u")
df = ak.DataFrame({"Datetime from Microseconds": dt})
# verify _get_head_tail and _get_head_tail_server match
assert df._get_head_tail_server().__repr__() == df._get_head_tail().__repr__()
Expand Down
73 changes: 16 additions & 57 deletions arkouda/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
from arkouda.dtypes import bool as akbool
from arkouda.dtypes import float64 as akfloat64
from arkouda.dtypes import int64 as akint64
from arkouda.groupbyclass import GROUPBY_REDUCTION_TYPES
from arkouda.groupbyclass import GroupBy as akGroupBy
from arkouda.groupbyclass import unique
from arkouda.index import Index
from arkouda.numeric import cast as akcast
from arkouda.numeric import cumsum
from arkouda.numeric import isnan as akisnan
from arkouda.numeric import where
from arkouda.numeric import cumsum, where
from arkouda.pdarrayclass import RegistrationError, pdarray
from arkouda.pdarraycreation import arange, array, create_pdarray, zeros
from arkouda.pdarraysetops import concatenate, in1d, intersect1d
Expand All @@ -48,46 +47,13 @@


def groupby_operators(cls):
    """Class decorator that attaches one aggregation method per supported
    groupby reduction (e.g. ``sum``, ``mean``, ``max``) to *cls*."""
    for op_name in GROUPBY_REDUCTION_TYPES:
        setattr(cls, op_name, cls._make_aggop(op_name))
    return cls


class AggregateOps:
"""Base class for GroupBy and DiffAggregate containing common functions"""

def _gbvar(self, values):
"""Calculate the variance in a groupby"""

values = akcast(values, "float64")
mean = self.gb.mean(values)
mean_broad = self.gb.broadcast(mean[1])
centered = values - mean_broad
var = Series(self.gb.sum(centered * centered))
n = self.gb.sum(~akisnan(centered))
return var / (n[1] - 1)

def _gbstd(self, values):
"""Calculates the standard deviation in a groupby"""
return self._gbvar(values) ** 0.5


@groupby_operators
class GroupBy(AggregateOps):
class GroupBy:
"""A DataFrame that has been grouped by a subset of columns"""

def __init__(self, gb, df):
Expand All @@ -98,8 +64,17 @@ def __init__(self, gb, df):

@classmethod
def _make_aggop(cls, opname):
    """Build an aggregation method bound to the reduction *opname*.

    The generated method accepts a single column name (returns a
    ``Series``), a list of column names, or ``None`` meaning all
    columns (both return a ``DataFrame`` indexed by the unique
    group keys).
    """

    def aggop(self, colnames=None):
        """Aggregate the grouped column(s) with the bound reduction.

        Parameters
        ----------
        colnames : str, list of str, or None
            Column(s) to aggregate; ``None`` aggregates every column.

        Raises
        ------
        TypeError
            If ``colnames`` is not a str, list, or None. (Previously
            this case fell through and silently returned ``None``.)
        """
        if isinstance(colnames, str):
            return Series(self.gb.aggregate(self.df.data[colnames], opname))
        if colnames is None:
            colnames = list(self.df.data.keys())
        # builtin ``list`` check instead of the typing.List alias
        if isinstance(colnames, list):
            return DataFrame(
                {c: self.gb.aggregate(self.df.data[c], opname)[1] for c in colnames},
                index=self.gb.unique_keys,
            )
        raise TypeError(
            f"colnames must be a str, a list of str, or None; got {type(colnames)}"
        )

    return aggop

Expand All @@ -126,14 +101,6 @@ def diff(self, colname):

return DiffAggregate(self.gb, self.df.data[colname])

def var(self, colname):
"""Calculate variance of the difference in each group"""
return self._gbvar(self.df.data[colname])

def std(self, colname):
"""Calculate standard deviation of the difference in each group"""
return self._gbstd(self.df.data[colname])

def broadcast(self, x, permute=True):
"""Fill each group’s segment with a constant value.
Expand All @@ -155,7 +122,7 @@ def broadcast(self, x, permute=True):


@groupby_operators
class DiffAggregate(AggregateOps):
class DiffAggregate:
"""
A column in a GroupBy that has been differenced.
Aggregation operations can be done on the result.
Expand All @@ -170,14 +137,6 @@ def __init__(self, gb, series):
values[gb.segments] = np.nan
self.values = values

def var(self):
"""Calculate variance of the difference in each group"""
return self._gbvar(self.values)

def std(self):
"""Calculate standard deviation of the difference in each group"""
return self._gbstd(self.values)

@classmethod
def _make_aggop(cls, opname):
def aggop(self):
Expand Down
29 changes: 28 additions & 1 deletion tests/dataframe_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pandas as pd # type: ignore
from base_test import ArkoudaTest
from context import arkouda as ak
from pandas.testing import assert_frame_equal

from arkouda import io_util

Expand Down Expand Up @@ -432,6 +433,32 @@ def test_gb_series(self):
self.assertListEqual(c.index.to_list(), ["Bob", "Alice", "Carol"])
self.assertListEqual(c.values.to_list(), [2, 3, 1])

def test_gb_aggregations(self):
    """Verify arkouda groupby aggregations match pandas, both per-column
    and when aggregating every column at once."""
    ak_df = build_ak_df()
    pd_df = build_pd_df()
    # drop the string column; many aggregations don't support it
    non_str_cols = list(set(ak_df.columns) - {"userName"})
    ak_df = ak_df[non_str_cols]
    pd_df = pd_df[non_str_cols]

    group_on = "userID"
    for agg in ["sum", "first"]:
        for col in ak_df.columns:
            if col == group_on:
                # pandas groupby doesn't return the column used to group
                continue
            ak_result = getattr(ak_df.groupby(group_on), agg)(col)
            pd_result = getattr(pd_df.groupby(group_on), agg)()[col]
            self.assertListEqual(ak_result.to_list(), pd_result.to_list())

        # aggregate all columns at once; exclude the grouping column,
        # which pandas does not include in its output
        other_cols = list(set(ak_df.columns) - {group_on})
        ak_frame = getattr(ak_df.groupby(group_on), agg)()[other_cols]
        pd_frame = getattr(pd_df.groupby(group_on), agg)()[other_cols]
        # we don't currently support index names in arkouda
        pd_frame.index.name = None
        assert_frame_equal(pd_frame, ak_frame.to_pandas(retain_index=True))

def test_to_pandas(self):
df = build_ak_df()
pd_df = build_pd_df()
Expand Down Expand Up @@ -644,7 +671,7 @@ def test_uint_greediness(self):
def test_head_tail_datetime_display(self):
# Reproducer for issue #2596
values = ak.array([1689221916000000] * 100, dtype=ak.int64)
dt = ak.Datetime(values, unit='u')
dt = ak.Datetime(values, unit="u")
df = ak.DataFrame({"Datetime from Microseconds": dt})
# verify _get_head_tail and _get_head_tail_server match
self.assertEqual(df._get_head_tail_server().__repr__(), df._get_head_tail().__repr__())
Expand Down

0 comments on commit 6044082

Please sign in to comment.