From b7a0c227398aa9dc8e10f636f3e13f9f1a944e24 Mon Sep 17 00:00:00 2001 From: pierce <48131946+pierce314159@users.noreply.github.com> Date: Wed, 15 Nov 2023 14:24:46 -0500 Subject: [PATCH] Closes #2838: Expand dataframe merge functions to accept multiple columns (#2848) This PR (closes #2838) expands the dataframe merge functions to act on multiple columns. When no value is provided for `on`, it defaults to the intersection of the columns of the left and right dataframe. `inner_join_merge` and `right_join_merge` were turned into helper functions that aren't exposed to the user to more closely match the pandas merge functionality where these are only avialble through `merge` Co-authored-by: Pierce Hayes --- PROTO_tests/tests/dataframe_test.py | 31 ++++ arkouda/dataframe.py | 224 ++++++++++------------------ arkouda/pdarrayclass.py | 2 +- tests/dataframe_test.py | 38 ++++- 4 files changed, 145 insertions(+), 150 deletions(-) diff --git a/PROTO_tests/tests/dataframe_test.py b/PROTO_tests/tests/dataframe_test.py index c43089df9e..67f307d8d4 100644 --- a/PROTO_tests/tests/dataframe_test.py +++ b/PROTO_tests/tests/dataframe_test.py @@ -666,3 +666,34 @@ def test_subset(self): assert df.index.to_list() == df2.index.to_list() assert df["a"].to_list() == df2["a"].to_list() assert df["b"].to_list() == df2["b"].to_list() + + def test_multi_col_merge(self): + size = 1000 + seed = 1 + a = ak.randint(-size // 10, size // 10, size, seed=seed) + b = ak.randint(-size // 10, size // 10, size, seed=seed + 1) + c = ak.randint(-size // 10, size // 10, size, seed=seed + 2) + d = ak.randint(-size // 10, size // 10, size, seed=seed + 3) + left_df = ak.DataFrame({"first": a, "second": b, "third": ak.ones(size, int)}) + right_df = ak.DataFrame( + {"first": c, "second": d, "third": ak.cast(ak.arange(size) % 2 == 0, int)} + ) + l_pd, r_pd = left_df.to_pandas(), right_df.to_pandas() + + for how in "inner", "left", "right": + for on in "first", "second", "third", ["first", "third"], ["second", "third"], None: + ak_merge = ak.merge(left_df, right_df, on=on, how=how) + pd_merge = pd.merge(l_pd, r_pd, on=on, how=how) + + sorted_columns = sorted(ak_merge.columns) + assert sorted_columns == sorted(pd_merge.columns.to_list()) + sorted_ak = ak_merge.sort_values(sorted_columns).reset_index() + sorted_pd = pd_merge.sort_values(sorted_columns).reset_index(drop=True) + for col in sorted_columns: + assert np.allclose( + sorted_ak[col].to_ndarray(), sorted_pd[col].to_numpy(), equal_nan=True + ) + # TODO arkouda seems to be sometimes convert columns to floats on a right merge + # when pandas doesnt. Eventually we want to test frame_equal, not just value equality + # from pandas.testing import assert_frame_equal + # assert_frame_equal(sorted_ak.to_pandas()[sorted_columns], sorted_pd[sorted_columns]) diff --git a/arkouda/dataframe.py b/arkouda/dataframe.py index fd21e40c78..87b1c7c941 100644 --- a/arkouda/dataframe.py +++ b/arkouda/dataframe.py @@ -11,7 +11,6 @@ import pandas as pd # type: ignore from typeguard import typechecked -from arkouda.alignment import find from arkouda.categorical import Categorical from arkouda.client import generic_msg, maxTransferBytes from arkouda.client_dtypes import BitVector, Fields, IPv4 @@ -26,8 +25,8 @@ from arkouda.numeric import cast as akcast from arkouda.numeric import cumsum, where from arkouda.pdarrayclass import RegistrationError, pdarray -from arkouda.pdarraycreation import arange, array, create_pdarray, zeros -from arkouda.pdarraysetops import concatenate, in1d, intersect1d, setdiff1d +from arkouda.pdarraycreation import arange, array, create_pdarray, full, zeros +from arkouda.pdarraysetops import concatenate, in1d, intersect1d from arkouda.row import Row from arkouda.segarray import SegArray from arkouda.series import Series @@ -45,8 +44,6 @@ "intersect", "invert_permutation", "intx", - "inner_join_merge", - "right_join_merge", "merge", ] @@ -2243,72 +2240,12 @@ def numeric_help(d): ret_dict = json.loads(generic_msg(cmd="corrMatrix", args=args)) return DataFrame({c: create_pdarray(ret_dict[c]) for c in self.columns}) - @typechecked - def inner_join_merge( - self, - right: DataFrame, - on: str, - left_suffix: str = "_x", - right_suffix: str = "_y", - ) -> DataFrame: - """ - Utilizes the ak.join.inner_join function to return an ak - DataFrame object containing only rows that are in both - self and right Dataframes, (based on the "on" param), - as well as their associated values. For this function self - is considered the left dataframe. - - Parameters - ---------- - right : DataFrame - The Right DataFrame to be joined - on : str - The name of the DataFrame column the join is being - performed on - left_suffix: str = "_x" - A string indicating the suffix to add to columns from self for overlapping - column names in both left and right. Defaults to "_x" - right_suffix: str = "_y" - A string indicating the suffix to add to columns from the other dataframe for overlapping - column names in both left and right. Defaults to "_y" - - Returns - ------- - DataFrame - Inner-Joined Arkouda DataFrame - """ - return inner_join_merge(self, right, on, left_suffix, right_suffix) - - def right_join_merge(self, right: DataFrame, on: str) -> DataFrame: - """ - Utilizes the ak.join.inner_join_merge function to return an - ak DataFrame object containing all the rows in the right Dataframe, - as well as corresponding rows in self (based on the "on" param), - and all of their associated values. For this function self - is considered the left dataframe. - Based on pandas merge functionality. - - Parameters - ---------- - right : DataFrame - The Right DataFrame to be joined - on : str - The name of the DataFrame column the join is being - performed on - - Returns - ------- - DataFrame - Right-Joined Arkouda DataFrame - """ - return right_join_merge(self, right, on) - @typechecked def merge( self, right: DataFrame, - on: str, - how: str, + on: Optional[Union[str, List[str]]] = None, + how: str = "inner", left_suffix: str = "_x", right_suffix: str = "_y", ) -> DataFrame: @@ -2325,10 +2262,10 @@ def merge( ---------- right: DataFrame The Right DataFrame to be joined - on: str - The name of the DataFrame column the join is being - performed on - how: str + on: Optional[Union[str, List[str]]] = None + The name or list of names of the DataFrame column(s) to join on. + If on is None, this defaults to the intersection of the columns in both DataFrames. + how: str = "inner", The merge condition. Must be "inner", "left", or "right" left_suffix: str = "_x" @@ -2342,18 +2279,10 @@ def merge( ------- DataFrame Joined Arkouda DataFrame - """ - if how == "inner": - return inner_join_merge(self, right, on, left_suffix, right_suffix) - elif how == "right": - return right_join_merge(self, right, on) - elif how == "left": - return right_join_merge(right, self, on) - else: - raise ValueError( - f"Unexpected value of {how} for how. Must choose: 'inner', 'left', or 'right'" - ) + Note: Multiple column joins are only supported for integer columns + """ + return merge(self, right, on, how, left_suffix, right_suffix) @typechecked def register(self, user_defined_name: str) -> DataFrame: @@ -2826,10 +2755,11 @@ def invert_permutation(perm): @typechecked -def inner_join_merge( +def _inner_join_merge( left: DataFrame, right: DataFrame, - on: str, + on: Union[str, List[str]], + col_intersect: Union[str, List[str]], left_suffix: str = "_x", right_suffix: str = "_y", ) -> DataFrame: @@ -2844,9 +2774,9 @@ def inner_join_merge( The Left DataFrame to be joined right: DataFrame The Right DataFrame to be joined - on: str - The name of the DataFrame column the join is being - performed on + on: Optional[Union[str, List[str]]] = None + The name or list of names of the DataFrame column(s) to join on. + If on is None, this defaults to the intersection of the columns in both DataFrames. left_suffix: str = "_x" A string indicating the suffix to add to columns from the left dataframe for overlapping column names in both left and right. Defaults to "_x" @@ -2858,36 +2788,33 @@ def inner_join_merge( DataFrame Inner-Joined Arkouda DataFrame """ - - left_inds, right_inds = inner_join(left[on], right[on]) - - left_cols = left.columns.copy() - left_cols.remove(on) - right_cols = right.columns.copy() - right_cols.remove(on) - - new_dict = {on: left[on][left_inds]} + left_cols, right_cols = left.columns.copy(), right.columns.copy() + if isinstance(on, str): + left_inds, right_inds = inner_join(left[on], right[on]) + new_dict = {on: left[on][left_inds]} + left_cols.remove(on) + right_cols.remove(on) + else: + left_inds, right_inds = inner_join([left[col] for col in on], [right[col] for col in on]) + new_dict = {col: left[col][left_inds] for col in on} + for col in on: + left_cols.remove(col) + right_cols.remove(col) for col in left_cols: - if col in right_cols: - new_col = col + left_suffix - else: - new_col = col + new_col = col + left_suffix if col in col_intersect else col new_dict[new_col] = left[col][left_inds] for col in right_cols: - if col in left_cols: - new_col = col + right_suffix - else: - new_col = col + new_col = col + right_suffix if col in col_intersect else col new_dict[new_col] = right[col][right_inds] - return DataFrame(new_dict) -def right_join_merge( +def _right_join_merge( left: DataFrame, right: DataFrame, - on: str, + on: Union[str, List[str]], + col_intersect: Union[str, List[str]], left_suffix: str = "_x", right_suffix: str = "_y", ) -> DataFrame: @@ -2903,9 +2830,9 @@ def right_join_merge( The Left DataFrame to be joined right: DataFrame The Right DataFrame to be joined - on: str - The name of the DataFrame column the join is being - performed on + on: Optional[Union[str, List[str]]] = None + The name or list of names of the DataFrame column(s) to join on. + If on is None, this defaults to the intersection of the columns in both DataFrames. left_suffix: str = "_x" A string indicating the suffix to add to columns from the left dataframe for overlapping column names in both left and right. Defaults to "_x" @@ -2917,49 +2844,44 @@ def right_join_merge( DataFrame Right-Joined Arkouda DataFrame """ + in_left = _inner_join_merge(left, right, on, col_intersect, left_suffix, right_suffix) + in_left_cols, left_cols = in_left.columns.copy(), left.columns.copy() + if isinstance(on, str): + left_at_on = left[on] + right_at_on = right[on] + left_cols.remove(on) + in_left_cols.remove(on) + else: + left_at_on = [left[col] for col in on] + right_at_on = [right[col] for col in on] + for col in on: + left_cols.remove(col) + in_left_cols.remove(col) - left_cols = left.columns.copy() - left_cols.remove(on) - - in_left = inner_join_merge(left, right, on, left_suffix, right_suffix) - in_left_cols = in_left.columns.copy() - in_left_cols.remove(on) - - not_in_left = right[find(setdiff1d(right[on], left[on]), right[on])] + not_in_left = right[~in1d(right_at_on, left_at_on)] for col in not_in_left.columns: if col in left_cols: - new_col = col + right_suffix - not_in_left[new_col] = not_in_left[col] + not_in_left[col + right_suffix] = not_in_left[col] not_in_left = not_in_left.drop(col, axis=1) nan_cols = list(set(in_left) - set(in_left).intersection(set(not_in_left))) - for col in nan_cols: # Create a nan array for all values not in the left df - nan_arr = zeros(len(not_in_left)) - nan_arr.fill(np.nan) - left_col_type = type(in_left[col]) + nan_arr = full(len(not_in_left), np.nan) if in_left[col].dtype == int: in_left[col] = akcast(in_left[col], akfloat64) else: nan_arr = akcast(nan_arr, in_left[col].dtype) - - try: - not_in_left[col] = left_col_type(nan_arr) - except TypeError: - not_in_left[col] = nan_arr - - right_ak_df = DataFrame.append(in_left, not_in_left) - - return right_ak_df + not_in_left[col] = nan_arr + return DataFrame.append(in_left, not_in_left) @typechecked def merge( left: DataFrame, right: DataFrame, - on: str, - how: str, + on: Optional[Union[str, List[str]]] = None, + how: str = "inner", left_suffix: str = "_x", right_suffix: str = "_y", ) -> DataFrame: @@ -2976,12 +2898,12 @@ def merge( The Left DataFrame to be joined right: DataFrame The Right DataFrame to be joined - on: str - The name of the DataFrame column the join is being - performed on - how: str + on: Optional[Union[str, List[str]]] = None + The name or list of names of the DataFrame column(s) to join on. + If on is None, this defaults to the intersection of the columns in both DataFrames. + how: str = "inner" The merge condition. - Must be "inner", "left", or "right" + Must be one of "inner", "left", or "right" left_suffix: str = "_x" A string indicating the suffix to add to columns from the left dataframe for overlapping column names in both left and right. Defaults to "_x". Only used when how is "inner" @@ -2992,13 +2914,21 @@ def merge( ------- DataFrame Joined Arkouda DataFrame - """ - if how == 'inner': - return inner_join_merge(left, right, on, left_suffix, right_suffix) - elif how == 'right': - return right_join_merge(left, right, on, left_suffix, right_suffix) - elif how == 'left': - return right_join_merge(right, left, on, right_suffix, left_suffix) + Note: Multiple column joins are only supported for integer columns + """ + col_intersect = list(set(left.columns) & set(right.columns)) + on = on if on is not None else col_intersect + + if not isinstance(on, str): + if not all(isinstance(left[col], pdarray) and isinstance(right[col], pdarray) for col in on): + raise ValueError("All columns of a multi-column merge must be pdarrays") + + if how == "inner": + return _inner_join_merge(left, right, on, col_intersect, left_suffix, right_suffix) + elif how == "right": + return _right_join_merge(left, right, on, col_intersect, left_suffix, right_suffix) + elif how == "left": + return _right_join_merge(right, left, on, col_intersect, right_suffix, left_suffix) else: raise ValueError(f"Unexpected value of {how} for how. Must choose: 'inner', 'left', or 'right'") diff --git a/arkouda/pdarrayclass.py b/arkouda/pdarrayclass.py index 4670352646..a1851327bf 100755 --- a/arkouda/pdarrayclass.py +++ b/arkouda/pdarrayclass.py @@ -203,7 +203,7 @@ def __del__(self): try: logger.debug(f"deleting pdarray with name {self.name}") generic_msg(cmd="delete", args={"name": self.name}) - except RuntimeError: + except (RuntimeError, AttributeError): pass def __bool__(self) -> builtins.bool: diff --git a/tests/dataframe_test.py b/tests/dataframe_test.py index 85579584c9..6d88bbb27e 100644 --- a/tests/dataframe_test.py +++ b/tests/dataframe_test.py @@ -768,7 +768,7 @@ def test_merge(self): "key": ak.array([2, 3]), "value1_x": ak.array(["C", "D"]), "value1_y": ak.array(["A", "B"]), - "value2": ak.array(["apple", "banana"]) + "value2": ak.array(["apple", "banana"]), } ) @@ -785,7 +785,7 @@ def test_merge(self): "key": ak.array([2, 3, 4, 5]), "value1_x": ak.array(["C", "D", "nan", "nan"]), "value1_y": ak.array(["A", "B", "D", "F"]), - "value2": ak.array(["apple", "banana", "cherry", "date"]) + "value2": ak.array(["apple", "banana", "cherry", "date"]), } ) @@ -813,3 +813,37 @@ def test_merge(self): self.assertListEqual(lj_expected_df["value1_x"].to_list(), lj_merged_df["value1_x"].to_list()) self.assertListEqual(lj_expected_df["value1_y"].to_list(), lj_merged_df["value1_y"].to_list()) self.assertListEqual(lj_expected_df["value2"].to_list(), lj_merged_df["value2"].to_list()) + + def test_multi_col_merge(self): + size = 1000 + seed = 1 + a = ak.randint(-size // 10, size // 10, size, seed=seed) + b = ak.randint(-size // 10, size // 10, size, seed=seed + 1) + c = ak.randint(-size // 10, size // 10, size, seed=seed + 2) + d = ak.randint(-size // 10, size // 10, size, seed=seed + 3) + left_df = ak.DataFrame({"first": a, "second": b, "third": ak.ones(size, int)}) + right_df = ak.DataFrame( + {"first": c, "second": d, "third": ak.cast(ak.arange(size) % 2 == 0, int)} + ) + l_pd, r_pd = left_df.to_pandas(), right_df.to_pandas() + + for how in "inner", "left", "right": + for on in "first", "second", "third", ["first", "third"], ["second", "third"], None: + ak_merge = ak.merge(left_df, right_df, on=on, how=how) + pd_merge = pd.merge(l_pd, r_pd, on=on, how=how) + + sorted_columns = sorted(ak_merge.columns) + self.assertListEqual(sorted_columns, sorted(pd_merge.columns.to_list())) + sorted_ak = ak_merge.sort_values(sorted_columns).reset_index() + sorted_pd = pd_merge.sort_values(sorted_columns).reset_index(drop=True) + for col in sorted_columns: + self.assertTrue( + np.allclose( + sorted_ak[col].to_ndarray(), sorted_pd[col].to_numpy(), equal_nan=True + ) + ) + + # TODO arkouda seems to be sometimes convert columns to floats on a right merge + # when pandas doesnt. Eventually we want to test frame_equal, not just value equality + # from pandas.testing import assert_frame_equal + # assert_frame_equal(sorted_ak.to_pandas()[sorted_columns], sorted_pd[sorted_columns])