From 2c54874425957173904517eb949295a2afaf38f3 Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Thu, 12 Sep 2024 08:48:56 -0700 Subject: [PATCH 1/6] REFACTOR: Make left_on and right_on optional for join utility Signed-off-by: Naren Krishna --- .../modin/plugin/_internal/cut_utils.py | 2 - .../modin/plugin/_internal/indexing_utils.py | 2 - .../modin/plugin/_internal/join_utils.py | 132 ++++++++++++------ .../plugin/_internal/ordered_dataframe.py | 1 + .../modin/plugin/_internal/resample_utils.py | 2 - .../compiler/snowflake_query_compiler.py | 2 - 6 files changed, 87 insertions(+), 54 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/cut_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/cut_utils.py index 882dc79d2a..4eaf98d9b2 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/cut_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/cut_utils.py @@ -189,8 +189,6 @@ def compute_bin_indices( values_frame, cuts_frame, how="asof", - left_on=[], - right_on=[], left_match_col=values_frame.data_column_snowflake_quoted_identifiers[0], right_match_col=cuts_frame.data_column_snowflake_quoted_identifiers[0], match_comparator=MatchComparator.LESS_THAN_OR_EQUAL_TO diff --git a/src/snowflake/snowpark/modin/plugin/_internal/indexing_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/indexing_utils.py index c2c224e404..6207bd2399 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/indexing_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/indexing_utils.py @@ -584,8 +584,6 @@ def _get_adjusted_key_frame_by_row_pos_int_frame( key, count_frame, "cross", - left_on=[], - right_on=[], inherit_join_index=InheritJoinIndex.FROM_LEFT, ) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index 79f063b9ec..f2875865dd 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ 
b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -103,12 +103,55 @@ class JoinOrAlignInternalFrameResult(NamedTuple): result_column_mapper: JoinOrAlignResultColumnMapper +def assert_snowpark_pandas_types_match( + left: InternalFrame, + right: InternalFrame, + left_identifiers: list[str], + right_identifiers: list[str], +) -> None: + """ + If Snowpark pandas types do not match for the given identifiers, then a ValueError will be raised. + + Args: + left: An internal frame to use on left side of join. + right: An internal frame to use on right side of join. + left_identifiers: List of snowflake identifiers to check types from 'left' frame. + right_identifiers: List of snowflake identifiers to check types from 'right' frame. + left_identifiers and right_identifiers must be lists of equal length. + + Returns: None + + Raises: ValueError + """ + left_types = [ + left.snowflake_quoted_identifier_to_snowpark_pandas_type.get(id, None) + for id in left_identifiers + ] + right_types = [ + right.snowflake_quoted_identifier_to_snowpark_pandas_type.get(id, None) + for id in right_identifiers + ] + for i, (lt, rt) in enumerate(zip(left_types, right_types)): + if lt != rt: + left_on_id = left_identifiers[i] + idx = left.data_column_snowflake_quoted_identifiers.index(left_on_id) + key = left.data_column_pandas_labels[idx] + lt = lt if lt is not None else left.get_snowflake_type(left_on_id) + rt = ( + rt if rt is not None else right.get_snowflake_type(right_identifiers[i]) + ) + raise ValueError( + f"You are trying to merge on {type(lt).__name__} and {type(rt).__name__} columns for key '{key}'. 
" + f"If you wish to proceed you should use pd.concat" + ) + + def join( left: InternalFrame, right: InternalFrame, how: JoinTypeLit, - left_on: list[str], - right_on: list[str], + left_on: Optional[list[str]] = None, + right_on: Optional[list[str]] = None, left_match_col: Optional[str] = None, right_match_col: Optional[str] = None, match_comparator: Optional[MatchComparator] = None, @@ -161,40 +204,31 @@ def join( include mapping for index + data columns, ordering columns and row position column if exists. """ - assert len(left_on) == len( - right_on - ), "left_on and right_on must be of same length or both be None" - if join_key_coalesce_config is not None: - assert len(join_key_coalesce_config) == len( - left_on - ), "join_key_coalesce_config must be of same length as left_on and right_on" assert how in get_args( JoinTypeLit ), f"Invalid join type: {how}. Allowed values are {get_args(JoinTypeLit)}" - - def assert_snowpark_pandas_types_match() -> None: - """If Snowpark pandas types do not match, then a ValueError will be raised.""" - left_types = [ - left.snowflake_quoted_identifier_to_snowpark_pandas_type.get(id, None) - for id in left_on - ] - right_types = [ - right.snowflake_quoted_identifier_to_snowpark_pandas_type.get(id, None) - for id in right_on - ] - for i, (lt, rt) in enumerate(zip(left_types, right_types)): - if lt != rt: - left_on_id = left_on[i] - idx = left.data_column_snowflake_quoted_identifiers.index(left_on_id) - key = left.data_column_pandas_labels[idx] - lt = lt if lt is not None else left.get_snowflake_type(left_on_id) - rt = rt if rt is not None else right.get_snowflake_type(right_on[i]) - raise ValueError( - f"You are trying to merge on {type(lt).__name__} and {type(rt).__name__} columns for key '{key}'. 
" - f"If you wish to proceed you should use pd.concat" - ) - - assert_snowpark_pandas_types_match() + if how == "asof": + assert ( + left_match_col + ), "ASOF join was not provided a column identifier to match on for the left table" + assert ( + right_match_col + ), "ASOF join was not provided a column identifier to match on for the right table" + assert ( + match_comparator + ), "ASOF join was not provided a comparator for the match condition" + assert_snowpark_pandas_types_match( + left, right, [left_match_col], [right_match_col] + ) + if left_on and right_on: + assert len(left_on) == len( + right_on + ), "left_on and right_on must be of same length or both be None" + if join_key_coalesce_config is not None: + assert len(join_key_coalesce_config) == len( + left_on + ), "join_key_coalesce_config must be of same length as left_on and right_on" + assert_snowpark_pandas_types_match(left, right, left_on, right_on) # Re-project the active columns to make sure all active columns of the internal frame participate # in the join operation, and unnecessary columns are dropped from the projected columns. 
@@ -212,15 +246,19 @@ def assert_snowpark_pandas_types_match() -> None: ) return _create_internal_frame_with_join_or_align_result( - joined_ordered_dataframe, - left, - right, - how, - left_on, - right_on, - sort, - join_key_coalesce_config, - inherit_join_index, + result_ordered_frame=joined_ordered_dataframe, + left=left, + right=right, + how=how, + left_on=[left_match_col] # type: ignore + if how == "asof" and join_key_coalesce_config and not left_on + else left_on, + right_on=[right_match_col] # type: ignore + if how == "asof" and join_key_coalesce_config and not right_on + else right_on, + sort=sort, + key_coalesce_config=join_key_coalesce_config, + inherit_index=inherit_join_index, ) @@ -229,8 +267,8 @@ def _create_internal_frame_with_join_or_align_result( left: InternalFrame, right: InternalFrame, how: Union[JoinTypeLit, AlignTypeLit], - left_on: list[str], - right_on: list[str], + left_on: Optional[list[str]] = None, + right_on: Optional[list[str]] = None, sort: Optional[bool] = False, key_coalesce_config: Optional[list[JoinKeyCoalesceConfig]] = None, inherit_index: InheritJoinIndex = InheritJoinIndex.FROM_LEFT, @@ -244,8 +282,8 @@ def _create_internal_frame_with_join_or_align_result( result_ordered_frame: OrderedDataFrame. The ordered dataframe result for the join/align operation. left: InternalFrame. The original left internal frame used for the join/align. right: InternalFrame. The original right internal frame used for the join/align. - left_on: List[str]. The columns in original left internal frame used for join/align. - right_on: List[str]. The columns in original right internal frame used for join/align. + left_on: Optional[List[str]]. The columns in original left internal frame used for join/align. + right_on: Optional[List[str]]. The columns in original right internal frame used for join/align. how: Union[JoinTypeLit, AlignTypeLit] join or align type. sort: Optional[bool] = False. Whether to sort the result lexicographically on the join/align keys. 
key_coalesce_config: Optional[List[JoinKeyCoalesceConfig]]. Optional list of coalesce config to @@ -259,6 +297,8 @@ def _create_internal_frame_with_join_or_align_result( Returns: InternalFrame for the join/aligned result with all fields set accordingly. """ + left_on = left_on or [] + right_on = right_on or [] result_helper = JoinOrAlignOrderedDataframeResultHelper( left.ordered_dataframe, diff --git a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py index f7ae87c2a5..62a25156f2 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py @@ -1213,6 +1213,7 @@ def join( assert match_comparator, "match_comparator was not provided to ASOF Join" snowpark_dataframe = left_snowpark_dataframe_ref.snowpark_dataframe.join( right=right_snowpark_dataframe_ref.snowpark_dataframe, + on=on, how=how, match_condition=getattr(left_match_col, match_comparator.value)( right_match_col diff --git a/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py index de83e0429b..ba8ceedec5 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py @@ -649,8 +649,6 @@ def perform_asof_join_on_frame( left=preserving_frame, right=referenced_frame, how="asof", - left_on=[], - right_on=[], left_match_col=left_timecol_snowflake_quoted_identifier, right_match_col=right_timecol_snowflake_quoted_identifier, match_comparator=( diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index b5022bff46..b5d7187a0d 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ 
b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -7457,8 +7457,6 @@ def merge_asof( left=left_frame, right=right_frame, how="asof", - left_on=[left_match_col], - right_on=[right_match_col], left_match_col=left_match_col, right_match_col=right_match_col, match_comparator=match_comparator, From 9ccab00a29fc6c278ba858cb4af039fa229d76f4 Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Thu, 12 Sep 2024 11:01:55 -0700 Subject: [PATCH 2/6] address comments Signed-off-by: Naren Krishna --- .../modin/plugin/_internal/join_utils.py | 80 ++++++++++--------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index f2875865dd..e387c0f8b2 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -106,8 +106,8 @@ class JoinOrAlignInternalFrameResult(NamedTuple): def assert_snowpark_pandas_types_match( left: InternalFrame, right: InternalFrame, - left_identifiers: list[str], - right_identifiers: list[str], + left_join_identifiers: list[str], + right_join_identifiers: list[str], ) -> None: """ If Snowpark pandas types do not match for the given identifiers, then a ValueError will be raised. @@ -115,8 +115,8 @@ def assert_snowpark_pandas_types_match( Args: left: An internal frame to use on left side of join. right: An internal frame to use on right side of join. - left_identifiers: List of snowflake identifiers to check types from 'left' frame. - right_identifiers: List of snowflake identifiers to check types from 'right' frame. + left_join_identifiers: List of snowflake identifiers to check types from 'left' frame. + right_join_identifiers: List of snowflake identifiers to check types from 'right' frame. left_identifiers and right_identifiers must be lists of equal length. 
Returns: None @@ -125,20 +125,22 @@ def assert_snowpark_pandas_types_match( """ left_types = [ left.snowflake_quoted_identifier_to_snowpark_pandas_type.get(id, None) - for id in left_identifiers + for id in left_join_identifiers ] right_types = [ right.snowflake_quoted_identifier_to_snowpark_pandas_type.get(id, None) - for id in right_identifiers + for id in right_join_identifiers ] for i, (lt, rt) in enumerate(zip(left_types, right_types)): if lt != rt: - left_on_id = left_identifiers[i] + left_on_id = left_join_identifiers[i] idx = left.data_column_snowflake_quoted_identifiers.index(left_on_id) key = left.data_column_pandas_labels[idx] lt = lt if lt is not None else left.get_snowflake_type(left_on_id) rt = ( - rt if rt is not None else right.get_snowflake_type(right_identifiers[i]) + rt + if rt is not None + else right.get_snowflake_type(right_join_identifiers[i]) ) raise ValueError( f"You are trying to merge on {type(lt).__name__} and {type(rt).__name__} columns for key '{key}'. " @@ -217,18 +219,31 @@ def join( assert ( match_comparator ), "ASOF join was not provided a comparator for the match condition" - assert_snowpark_pandas_types_match( - left, right, [left_match_col], [right_match_col] - ) - if left_on and right_on: - assert len(left_on) == len( - right_on + if join_key_coalesce_config is not None: + assert ( + len(join_key_coalesce_config) == 1 + ), "ASOF join join_key_coalesce_config must be 1 since there is only one match condition" + left_join_key = [left_match_col] + right_join_key = [right_match_col] + assert_snowpark_pandas_types_match(left, right, left_join_key, right_join_key) + else: + left_join_key = left_on or [] + right_join_key = right_on or [] + assert len(left_join_key) == len( + right_join_key ), "left_on and right_on must be of same length or both be None" + assert ( + left_match_col is None + and right_match_col is None + and match_comparator is None + ), f"match condition should not be provided for {how} join" if 
join_key_coalesce_config is not None: assert len(join_key_coalesce_config) == len( - left_on + left_join_key ), "join_key_coalesce_config must be of same length as left_on and right_on" - assert_snowpark_pandas_types_match(left, right, left_on, right_on) + assert_snowpark_pandas_types_match( + left, right, left_join_key, right_join_key + ) # Re-project the active columns to make sure all active columns of the internal frame participate # in the join operation, and unnecessary columns are dropped from the projected columns. @@ -246,19 +261,15 @@ def join( ) return _create_internal_frame_with_join_or_align_result( - result_ordered_frame=joined_ordered_dataframe, - left=left, - right=right, - how=how, - left_on=[left_match_col] # type: ignore - if how == "asof" and join_key_coalesce_config and not left_on - else left_on, - right_on=[right_match_col] # type: ignore - if how == "asof" and join_key_coalesce_config and not right_on - else right_on, - sort=sort, - key_coalesce_config=join_key_coalesce_config, - inherit_index=inherit_join_index, + joined_ordered_dataframe, + left, + right, + how, + left_join_key, + right_join_key, + sort, + join_key_coalesce_config, + inherit_join_index, ) @@ -267,8 +278,8 @@ def _create_internal_frame_with_join_or_align_result( left: InternalFrame, right: InternalFrame, how: Union[JoinTypeLit, AlignTypeLit], - left_on: Optional[list[str]] = None, - right_on: Optional[list[str]] = None, + left_on: list[str], + right_on: list[str], sort: Optional[bool] = False, key_coalesce_config: Optional[list[JoinKeyCoalesceConfig]] = None, inherit_index: InheritJoinIndex = InheritJoinIndex.FROM_LEFT, @@ -282,8 +293,8 @@ def _create_internal_frame_with_join_or_align_result( result_ordered_frame: OrderedDataFrame. The ordered dataframe result for the join/align operation. left: InternalFrame. The original left internal frame used for the join/align. right: InternalFrame. The original right internal frame used for the join/align. 
- left_on: Optional[List[str]]. The columns in original left internal frame used for join/align. - right_on: Optional[List[str]]. The columns in original right internal frame used for join/align. + left_on: List[str]. The columns in original left internal frame used for join/align. + right_on: List[str]. The columns in original right internal frame used for join/align. how: Union[JoinTypeLit, AlignTypeLit] join or align type. sort: Optional[bool] = False. Whether to sort the result lexicographically on the join/align keys. key_coalesce_config: Optional[List[JoinKeyCoalesceConfig]]. Optional list of coalesce config to @@ -297,9 +308,6 @@ def _create_internal_frame_with_join_or_align_result( Returns: InternalFrame for the join/aligned result with all fields set accordingly. """ - left_on = left_on or [] - right_on = right_on or [] - result_helper = JoinOrAlignOrderedDataframeResultHelper( left.ordered_dataframe, right.ordered_dataframe, From 0e427db995945b546d596ae88a1fe29764ae83b7 Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Thu, 12 Sep 2024 16:31:42 -0700 Subject: [PATCH 3/6] Add support for by, left_by, and right_by Signed-off-by: Naren Krishna --- CHANGELOG.md | 1 + .../modin/supported/general_supported.rst | 3 +- .../modin/plugin/_internal/join_utils.py | 32 ++++--- .../plugin/_internal/ordered_dataframe.py | 30 ++++-- .../compiler/snowflake_query_compiler.py | 94 ++++++++++++------- tests/integ/modin/test_merge_asof.py | 63 ++++++++----- 6 files changed, 140 insertions(+), 83 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b015147d47..d3b9b1deda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ #### New Features - Added support for `TimedeltaIndex.mean` method. +- Added support for `by`, `left_by`, and `right_by` for `pd.merge_asof`. 
## 1.22.0 (2024-09-10) diff --git a/docs/source/modin/supported/general_supported.rst b/docs/source/modin/supported/general_supported.rst index 797ef3bbd5..95d9610202 100644 --- a/docs/source/modin/supported/general_supported.rst +++ b/docs/source/modin/supported/general_supported.rst @@ -38,8 +38,7 @@ Data manipulations +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``merge`` | P | ``validate`` | ``N`` if param ``validate`` is given | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ -| ``merge_asof`` | P | ``by``, ``left_by``, ``right_by``| ``N`` if param ``direction`` is ``nearest``. | -| | | , ``left_index``, ``right_index``| | +| ``merge_asof`` | P | ``left_index``, ``right_index`` | ``N`` if param ``direction`` is ``nearest``. | | | | , ``suffixes``, ``tolerance`` | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``merge_ordered`` | N | | | diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index e387c0f8b2..c9415a5492 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -209,6 +209,13 @@ def join( assert how in get_args( JoinTypeLit ), f"Invalid join type: {how}. 
Allowed values are {get_args(JoinTypeLit)}" + + left_on = left_on or [] + right_on = right_on or [] + assert len(left_on) == len( + right_on + ), "left_on and right_on must be of same length or both be None" + if how == "asof": assert ( left_match_col @@ -219,19 +226,18 @@ def join( assert ( match_comparator ), "ASOF join was not provided a comparator for the match condition" - if join_key_coalesce_config is not None: - assert ( - len(join_key_coalesce_config) == 1 - ), "ASOF join join_key_coalesce_config must be 1 since there is only one match condition" left_join_key = [left_match_col] right_join_key = [right_match_col] + left_join_key.extend(left_on) + right_join_key.extend(right_on) + if join_key_coalesce_config is not None: + assert len(join_key_coalesce_config) == len( + left_join_key + ), "ASOF join join_key_coalesce_config must be of same length as left_join_key and right_join_key" assert_snowpark_pandas_types_match(left, right, left_join_key, right_join_key) else: - left_join_key = left_on or [] - right_join_key = right_on or [] - assert len(left_join_key) == len( - right_join_key - ), "left_on and right_on must be of same length or both be None" + left_join_key = left_on + right_join_key = right_on assert ( left_match_col is None and right_match_col is None @@ -241,9 +247,7 @@ def join( assert len(join_key_coalesce_config) == len( left_join_key ), "join_key_coalesce_config must be of same length as left_on and right_on" - assert_snowpark_pandas_types_match( - left, right, left_join_key, right_join_key - ) + assert_snowpark_pandas_types_match(left, right, left_join_key, right_join_key) # Re-project the active columns to make sure all active columns of the internal frame participate # in the join operation, and unnecessary columns are dropped from the projected columns. 
@@ -259,7 +263,6 @@ def join( match_comparator=match_comparator, how=how, ) - return _create_internal_frame_with_join_or_align_result( joined_ordered_dataframe, left, @@ -1450,6 +1453,8 @@ def _sort_on_join_keys(self) -> None: ) elif self._how == "right": ordering_column_identifiers = mapped_right_on + elif self._how == "asof": + ordering_column_identifiers = [mapped_left_on[0]] else: # left join, inner join, left align, coalesce align ordering_column_identifiers = mapped_left_on @@ -1462,7 +1467,6 @@ def _sort_on_join_keys(self) -> None: ordering_columns = [ OrderingColumn(key) for key in ordering_column_identifiers ] + join_or_align_result.ordering_columns - # reset the order of the ordered_dataframe to the final order self.join_or_align_result = join_or_align_result.sort(ordering_columns) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py index 62a25156f2..91537d98e3 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py @@ -1197,20 +1197,26 @@ def join( # get the new mapped right on identifier right_on_cols = [right_identifiers_rename_map[key] for key in right_on_cols] - # Generate sql ON clause 'EQUAL_NULL(col1, col2) and EQUAL_NULL(col3, col4) ...' 
- on = None - for left_col, right_col in zip(left_on_cols, right_on_cols): - eq = Column(left_col).equal_null(Column(right_col)) - on = eq if on is None else on & eq - if how == "asof": - assert left_match_col, "left_match_col was not provided to ASOF Join" + assert ( + left_match_col + ), "ASOF join was not provided a column identifier to match on for the left table" left_match_col = Column(left_match_col) # Get the new mapped right match condition identifier - assert right_match_col, "right_match_col was not provided to ASOF Join" + assert ( + right_match_col + ), "ASOF join was not provided a column identifier to match on for the right table" right_match_col = Column(right_identifiers_rename_map[right_match_col]) # ASOF Join requires the use of match_condition - assert match_comparator, "match_comparator was not provided to ASOF Join" + assert ( + match_comparator + ), "ASOF join was not provided a comparator for the match condition" + + on = None + for left_col, right_col in zip(left_on_cols, right_on_cols): + eq = Column(left_col).__eq__(Column(right_col)) + on = eq if on is None else on & eq + snowpark_dataframe = left_snowpark_dataframe_ref.snowpark_dataframe.join( right=right_snowpark_dataframe_ref.snowpark_dataframe, on=on, @@ -1225,6 +1231,12 @@ def join( right_snowpark_dataframe_ref.snowpark_dataframe, how=how ) else: + # Generate sql ON clause 'EQUAL_NULL(col1, col2) and EQUAL_NULL(col3, col4) ...' 
+ on = None + for left_col, right_col in zip(left_on_cols, right_on_cols): + eq = Column(left_col).equal_null(Column(right_col)) + on = eq if on is None else on & eq + snowpark_dataframe = left_snowpark_dataframe_ref.snowpark_dataframe.join( right_snowpark_dataframe_ref.snowpark_dataframe, on, how ) diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index b5d7187a0d..9af8e6dc2b 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -7395,28 +7395,34 @@ def merge_asof( SnowflakeQueryCompiler """ # TODO: SNOW-1634547: Implement remaining parameters by leveraging `merge` implementation - if ( - by - or left_by - or right_by - or left_index - or right_index - or tolerance - or suffixes != ("_x", "_y") - ): + if left_index or right_index or tolerance or suffixes != ("_x", "_y"): ErrorMessage.not_implemented( "Snowpark pandas merge_asof method does not currently support parameters " - + "'by', 'left_by', 'right_by', 'left_index', 'right_index', " - + "'suffixes', or 'tolerance'" + + "'left_index', 'right_index', 'suffixes', or 'tolerance'" ) if direction not in ("backward", "forward"): ErrorMessage.not_implemented( "Snowpark pandas merge_asof method only supports directions 'forward' and 'backward'" ) + if direction == "backward": + match_comparator = ( + MatchComparator.GREATER_THAN_OR_EQUAL_TO + if allow_exact_matches + else MatchComparator.GREATER_THAN + ) + else: + match_comparator = ( + MatchComparator.LESS_THAN_OR_EQUAL_TO + if allow_exact_matches + else MatchComparator.LESS_THAN + ) + left_frame = self._modin_frame right_frame = right._modin_frame - left_keys, right_keys = join_utils.get_join_keys( + # Get the left and right matching key and quoted identifier corresponding to the match_condition + # There will only be matching key/identifier for 
each table as there is only a single match condition + left_match_keys, right_match_keys = join_utils.get_join_keys( left=left_frame, right=right_frame, on=on, @@ -7425,40 +7431,62 @@ def merge_asof( left_index=left_index, right_index=right_index, ) - left_match_col = ( + left_match_identifier = ( left_frame.get_snowflake_quoted_identifiers_group_by_pandas_labels( - left_keys + left_match_keys )[0][0] ) - right_match_col = ( + right_match_identifier = ( right_frame.get_snowflake_quoted_identifiers_group_by_pandas_labels( - right_keys + right_match_keys )[0][0] ) - - if direction == "backward": - match_comparator = ( - MatchComparator.GREATER_THAN_OR_EQUAL_TO - if allow_exact_matches - else MatchComparator.GREATER_THAN + coalesce_config = join_utils.get_coalesce_config( + left_keys=left_match_keys, + right_keys=right_match_keys, + external_join_keys=[], + ) + + # Get the left and right matching keys and quoted identifiers corresponding to the 'on' condition + if by or (left_by and right_by): + left_on_keys, right_on_keys = join_utils.get_join_keys( + left=left_frame, + right=right_frame, + on=by, + left_on=left_by, + right_on=right_by, + ) + left_on_identifiers = [ + ids[0] + for ids in left_frame.get_snowflake_quoted_identifiers_group_by_pandas_labels( + left_on_keys + ) + ] + right_on_identifiers = [ + ids[0] + for ids in right_frame.get_snowflake_quoted_identifiers_group_by_pandas_labels( + right_on_keys + ) + ] + coalesce_config.extend( + join_utils.get_coalesce_config( + left_keys=left_on_keys, + right_keys=right_on_keys, + external_join_keys=[], + ) ) else: - match_comparator = ( - MatchComparator.LESS_THAN_OR_EQUAL_TO - if allow_exact_matches - else MatchComparator.LESS_THAN - ) - - coalesce_config = join_utils.get_coalesce_config( - left_keys=left_keys, right_keys=right_keys, external_join_keys=[] - ) + left_on_identifiers = [] + right_on_identifiers = [] joined_frame, _ = join_utils.join( left=left_frame, right=right_frame, + left_on=left_on_identifiers, + 
right_on=right_on_identifiers, how="asof", - left_match_col=left_match_col, - right_match_col=right_match_col, + left_match_col=left_match_identifier, + right_match_col=right_match_identifier, match_comparator=match_comparator, join_key_coalesce_config=coalesce_config, sort=True, diff --git a/tests/integ/modin/test_merge_asof.py b/tests/integ/modin/test_merge_asof.py index 681d339da9..51dda7889e 100644 --- a/tests/integ/modin/test_merge_asof.py +++ b/tests/integ/modin/test_merge_asof.py @@ -105,6 +105,7 @@ def left_right_timestamp_data(): pd.Timestamp("2016-05-25 13:30:00.072"), pd.Timestamp("2016-05-25 13:30:00.075"), ], + "ticker": ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"], "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01], "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03], } @@ -118,6 +119,7 @@ def left_right_timestamp_data(): pd.Timestamp("2016-05-25 13:30:00.048"), pd.Timestamp("2016-05-25 13:30:00.048"), ], + "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"], "price": [51.95, 51.95, 720.77, 720.92, 98.0], "quantity": [75, 155, 100, 100, 100], } @@ -229,14 +231,39 @@ def test_merge_asof_left_right_on( assert_snowpark_pandas_equal_to_pandas(snow_output, native_output) +@pytest.mark.parametrize("by", ["ticker", ["ticker"]]) @sql_count_checker(query_count=1, join_count=1) -def test_merge_asof_timestamps(left_right_timestamp_data): +def test_merge_asof_by(left_right_timestamp_data, by): left_native_df, right_native_df = left_right_timestamp_data left_snow_df, right_snow_df = pd.DataFrame(left_native_df), pd.DataFrame( right_native_df ) - native_output = native_pd.merge_asof(left_native_df, right_native_df, on="time") - snow_output = pd.merge_asof(left_snow_df, right_snow_df, on="time") + native_output = native_pd.merge_asof( + left_native_df, right_native_df, on="time", by=by + ) + snow_output = pd.merge_asof(left_snow_df, right_snow_df, on="time", by=by) + 
assert_snowpark_pandas_equal_to_pandas(snow_output, native_output) + + +@pytest.mark.parametrize( + "left_by, right_by", + [ + ("ticker", "ticker"), + (["ticker", "bid"], ["ticker", "price"]), + ], +) +@sql_count_checker(query_count=1, join_count=1) +def test_merge_asof_left_right_by(left_right_timestamp_data, left_by, right_by): + left_native_df, right_native_df = left_right_timestamp_data + left_snow_df, right_snow_df = pd.DataFrame(left_native_df), pd.DataFrame( + right_native_df + ) + native_output = native_pd.merge_asof( + left_native_df, right_native_df, on="time", left_by=left_by, right_by=right_by + ) + snow_output = pd.merge_asof( + left_snow_df, right_snow_df, on="time", left_by=left_by, right_by=right_by + ) assert_snowpark_pandas_equal_to_pandas(snow_output, native_output) @@ -248,8 +275,10 @@ def test_merge_asof_date(left_right_timestamp_data): left_snow_df, right_snow_df = pd.DataFrame(left_native_df), pd.DataFrame( right_native_df ) - native_output = native_pd.merge_asof(left_native_df, right_native_df, on="time") - snow_output = pd.merge_asof(left_snow_df, right_snow_df, on="time") + native_output = native_pd.merge_asof( + left_native_df, right_native_df, on="time", by="ticker" + ) + snow_output = pd.merge_asof(left_snow_df, right_snow_df, on="time", by="ticker") assert_snowpark_pandas_equal_to_pandas(snow_output, native_output) @@ -360,9 +389,7 @@ def test_merge_asof_params_unsupported(left_right_timestamp_data): with pytest.raises( NotImplementedError, match=( - "Snowpark pandas merge_asof method does not currently support parameters " - "'by', 'left_by', 'right_by', 'left_index', 'right_index', " - "'suffixes', or 'tolerance'" + "Snowpark pandas merge_asof method only supports directions 'forward' and 'backward'" ), ): pd.merge_asof( @@ -372,19 +399,7 @@ def test_merge_asof_params_unsupported(left_right_timestamp_data): NotImplementedError, match=( "Snowpark pandas merge_asof method does not currently support parameters " - "'by', 'left_by', 
'right_by', 'left_index', 'right_index', " - "'suffixes', or 'tolerance'" - ), - ): - pd.merge_asof( - left_snow_df, right_snow_df, on="time", left_by="price", right_by="quantity" - ) - with pytest.raises( - NotImplementedError, - match=( - "Snowpark pandas merge_asof method does not currently support parameters " - "'by', 'left_by', 'right_by', 'left_index', 'right_index', " - "'suffixes', or 'tolerance'" + + "'left_index', 'right_index', 'suffixes', or 'tolerance'" ), ): pd.merge_asof(left_snow_df, right_snow_df, left_index=True, right_index=True) @@ -392,8 +407,7 @@ def test_merge_asof_params_unsupported(left_right_timestamp_data): NotImplementedError, match=( "Snowpark pandas merge_asof method does not currently support parameters " - "'by', 'left_by', 'right_by', 'left_index', 'right_index', " - "'suffixes', or 'tolerance'" + + "'left_index', 'right_index', 'suffixes', or 'tolerance'" ), ): pd.merge_asof( @@ -406,8 +420,7 @@ def test_merge_asof_params_unsupported(left_right_timestamp_data): NotImplementedError, match=( "Snowpark pandas merge_asof method does not currently support parameters " - "'by', 'left_by', 'right_by', 'left_index', 'right_index', " - "'suffixes', or 'tolerance'" + + "'left_index', 'right_index', 'suffixes', or 'tolerance'" ), ): pd.merge_asof( From ad2a76d3cd8865e2eee5f83ca225933af494bdf9 Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Thu, 12 Sep 2024 16:40:23 -0700 Subject: [PATCH 4/6] add comment Signed-off-by: Naren Krishna --- src/snowflake/snowpark/modin/plugin/_internal/join_utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index c9415a5492..038256361c 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -311,6 +311,7 @@ def _create_internal_frame_with_join_or_align_result( Returns: InternalFrame for the 
join/aligned result with all fields set accordingly. """ + result_helper = JoinOrAlignOrderedDataframeResultHelper( left.ordered_dataframe, right.ordered_dataframe, @@ -1454,6 +1455,7 @@ def _sort_on_join_keys(self) -> None: elif self._how == "right": ordering_column_identifiers = mapped_right_on elif self._how == "asof": + # Order only by the left match_condition column ordering_column_identifiers = [mapped_left_on[0]] else: # left join, inner join, left align, coalesce align ordering_column_identifiers = mapped_left_on @@ -1467,6 +1469,7 @@ def _sort_on_join_keys(self) -> None: ordering_columns = [ OrderingColumn(key) for key in ordering_column_identifiers ] + join_or_align_result.ordering_columns + # reset the order of the ordered_dataframe to the final order self.join_or_align_result = join_or_align_result.sort(ordering_columns) From 85d4aef0e1076cc3702ec1ca10883942fb5f4371 Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Fri, 13 Sep 2024 08:54:37 -0700 Subject: [PATCH 5/6] address comments Signed-off-by: Naren Krishna --- src/snowflake/snowpark/modin/plugin/_internal/join_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index 038256361c..d07211dbcf 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -234,7 +234,6 @@ def join( assert len(join_key_coalesce_config) == len( left_join_key ), "ASOF join join_key_coalesce_config must be of same length as left_join_key and right_join_key" - assert_snowpark_pandas_types_match(left, right, left_join_key, right_join_key) else: left_join_key = left_on right_join_key = right_on @@ -247,7 +246,8 @@ def join( assert len(join_key_coalesce_config) == len( left_join_key ), "join_key_coalesce_config must be of same length as left_on and right_on" - assert_snowpark_pandas_types_match(left, 
right, left_join_key, right_join_key) + + assert_snowpark_pandas_types_match(left, right, left_join_key, right_join_key) # Re-project the active columns to make sure all active columns of the internal frame participate # in the join operation, and unnecessary columns are dropped from the projected columns. From 3f8fa23c90a72c1594e6c1d51c6a4603a60b1dc5 Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Fri, 13 Sep 2024 14:02:54 -0700 Subject: [PATCH 6/6] fix merge conflict Signed-off-by: Naren Krishna --- CHANGELOG.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 19220e11ad..f46c6ea820 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,16 +11,13 @@ #### New Features - Added support for `TimedeltaIndex.mean` method. -<<<<<<< asof_join_refactor -- Added support for `by`, `left_by`, and `right_by` for `pd.merge_asof`. -======= - Added support for some cases of aggregating `Timedelta` columns on `axis=0` with `agg` or `aggregate`. +- Added support for `by`, `left_by`, and `right_by` for `pd.merge_asof`. ## 1.22.1 (2024-09-11) This is a re-release of 1.22.0. Please refer to the 1.22.0 release notes for detailed release content. ->>>>>>> main ## 1.22.0 (2024-09-10)