Skip to content

Commit

Permalink
feat: add DuckDB join_asof (#1860)
Browse files Browse the repository at this point in the history
  • Loading branch information
raisadz authored Jan 24, 2025
1 parent aa8a501 commit 05cb650
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 119 deletions.
7 changes: 3 additions & 4 deletions narwhals/_arrow/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,10 @@ def join_asof(
*,
left_on: str | None,
right_on: str | None,
on: str | None,
by_left: str | list[str] | None,
by_right: str | list[str] | None,
by: str | list[str] | None,
by_left: list[str] | None,
by_right: list[str] | None,
strategy: Literal["backward", "forward", "nearest"],
suffix: str,
) -> Self:
msg = "join_asof is not yet supported on PyArrow tables" # pragma: no cover
raise NotImplementedError(msg)
Expand Down
17 changes: 7 additions & 10 deletions narwhals/_dask/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,13 +334,12 @@ def join_asof(
self: Self,
other: Self,
*,
left_on: str | None = None,
right_on: str | None = None,
on: str | None = None,
by_left: str | list[str] | None = None,
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
left_on: str | None,
right_on: str | None,
by_left: list[str] | None,
by_right: list[str] | None,
strategy: Literal["backward", "forward", "nearest"],
suffix: str,
) -> Self:
plx = self.__native_namespace__()
return self._from_native_frame(
Expand All @@ -349,12 +348,10 @@ def join_asof(
other._native_frame,
left_on=left_on,
right_on=right_on,
on=on,
left_by=by_left,
right_by=by_right,
by=by,
direction=strategy,
suffixes=("", "_right"),
suffixes=("", suffix),
),
)

Expand Down
47 changes: 46 additions & 1 deletion narwhals/_duckdb/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Literal
from typing import Sequence

import duckdb
from duckdb import ColumnExpression

from narwhals._duckdb.utils import native_to_narwhals_dtype
Expand All @@ -22,7 +23,6 @@
if TYPE_CHECKING:
from types import ModuleType

import duckdb
import pandas as pd
import pyarrow as pa
from typing_extensions import Self
Expand Down Expand Up @@ -260,6 +260,51 @@ def join(
res = rel.select(", ".join(select)).set_alias(original_alias)
return self._from_native_frame(res)

def join_asof(
    self: Self,
    other: Self,
    *,
    left_on: str | None,
    right_on: str | None,
    by_left: list[str] | None,
    by_right: list[str] | None,
    strategy: Literal["backward", "forward", "nearest"],
    suffix: str,
) -> Self:
    """Perform an ASOF LEFT JOIN of this frame with `other` using DuckDB SQL.

    Arguments:
        other: Frame to join with.
        left_on: Name of the asof key column in the left frame.
        right_on: Name of the asof key column in the right frame.
        by_left: Left-frame columns that must match exactly before the asof
            match. Ignored unless `by_right` is also given.
        by_right: Right-frame columns that must match exactly before the asof
            match. Ignored unless `by_left` is also given.
        strategy: "backward" matches right keys `<=` the left key, "forward"
            matches right keys `>=` the left key.
        suffix: Suffix appended to right-frame columns whose name already
            exists in the left frame.

    Returns:
        A new frame with all left columns plus the right columns that are
        not join keys.

    Raises:
        NotImplementedError: If `strategy` is "nearest".
    """
    # Validate the strategy first so unsupported requests fail before any
    # query text is assembled.
    if strategy == "backward":
        comparison = ">="
    elif strategy == "forward":
        comparison = "<="
    else:
        msg = "Only 'backward' and 'forward' strategies are currently supported for DuckDB"
        raise NotImplementedError(msg)

    def quote(name: object) -> str:
        # Double-quote an identifier, doubling embedded quotes so that column
        # names containing `"` cannot break out of the identifier.
        return '"' + str(name).replace('"', '""') + '"'

    # NOTE: the names `lhs` and `rhs` are referenced from the SQL text below;
    # DuckDB resolves them by looking up Python variables in this scope
    # (replacement scan), so they must not be renamed.
    lhs = self._native_frame
    rhs = other._native_frame

    if by_left is None or by_right is None:
        by_left = by_right = []
    conditions = [
        f"lhs.{quote(left)} = rhs.{quote(right)}"
        for left, right in zip(by_left, by_right)
    ]
    conditions.append(f"lhs.{quote(left_on)} {comparison} rhs.{quote(right_on)}")
    condition = " and ".join(conditions)

    # Right-hand join keys are dropped from the output; when `right_on` is
    # None nothing is dropped (mirrors the original condition).
    dropped = set() if right_on is None else {right_on, *by_right}
    lhs_columns = set(lhs.columns)
    select = ["lhs.*"]
    for col in rhs.columns:
        if col in dropped:
            continue
        if col in lhs_columns:
            # Name clash with the left frame: disambiguate with the suffix.
            select.append(f"rhs.{quote(col)} as {quote(col + suffix)}")
        else:
            # Qualify and quote so names with spaces, special characters or
            # reserved words remain valid SQL.
            select.append(f"rhs.{quote(col)}")
    query = f"""
        SELECT {",".join(select)}
        FROM lhs
        ASOF LEFT JOIN rhs
        ON {condition}
        """  # noqa: S608
    res = duckdb.sql(query)
    return self._from_native_frame(res)

def collect_schema(self: Self) -> dict[str, DType]:
return {
column_name: native_to_narwhals_dtype(str(duckdb_dtype), self._version)
Expand Down
17 changes: 7 additions & 10 deletions narwhals/_pandas_like/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,13 +660,12 @@ def join_asof(
self: Self,
other: Self,
*,
left_on: str | None = None,
right_on: str | None = None,
on: str | None = None,
by_left: str | list[str] | None = None,
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
left_on: str | None,
right_on: str | None,
by_left: list[str] | None,
by_right: list[str] | None,
strategy: Literal["backward", "forward", "nearest"],
suffix: str,
) -> Self:
plx = self.__native_namespace__()
return self._from_native_frame(
Expand All @@ -675,12 +674,10 @@ def join_asof(
other._native_frame,
left_on=left_on,
right_on=right_on,
on=on,
left_by=by_left,
right_by=by_right,
by=by,
direction=strategy,
suffixes=("", "_right"),
suffixes=("", suffix),
),
)

Expand Down
27 changes: 16 additions & 11 deletions narwhals/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ def join_asof(
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
suffix: str = "_right",
) -> Self:
_supported_strategies = ("backward", "forward", "nearest")

Expand All @@ -302,25 +303,22 @@ def join_asof(
msg = "If `by` is specified, `by_left` and `by_right` should be None."
raise ValueError(msg)
if on is not None:
return self._from_compliant_dataframe(
self._compliant_frame.join_asof(
self._extract_compliant(other),
on=on,
by_left=by_left,
by_right=by_right,
by=by,
strategy=strategy,
)
)
left_on = right_on = on
if by is not None:
by_left = by_right = by
if isinstance(by_left, str):
by_left = [by_left]
if isinstance(by_right, str):
by_right = [by_right]
return self._from_compliant_dataframe(
self._compliant_frame.join_asof(
self._extract_compliant(other),
left_on=left_on,
right_on=right_on,
by_left=by_left,
by_right=by_right,
by=by,
strategy=strategy,
suffix=suffix,
)
)

Expand Down Expand Up @@ -2748,6 +2746,7 @@ def join_asof(
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
suffix: str = "_right",
) -> Self:
"""Perform an asof join.
Expand All @@ -2764,6 +2763,7 @@ def join_asof(
by_right: join on these columns before doing asof join.
by: join on these columns before doing asof join.
strategy: Join strategy. The default is "backward".
suffix: Suffix to append to columns with a duplicate name.
* *backward*: selects the last row in the right DataFrame whose "on" key is less than or equal to the left's key.
* *forward*: selects the first row in the right DataFrame whose "on" key is greater than or equal to the left's key.
Expand Down Expand Up @@ -2924,6 +2924,7 @@ def join_asof(
by_right=by_right,
by=by,
strategy=strategy,
suffix=suffix,
)

# --- descriptive ---
Expand Down Expand Up @@ -5030,6 +5031,7 @@ def join_asof(
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
suffix: str = "_right",
) -> Self:
"""Perform an asof join.
Expand Down Expand Up @@ -5058,6 +5060,8 @@ def join_asof(
* *forward*: selects the first row in the right DataFrame whose "on" key is greater than or equal to the left's key.
* *nearest*: selects the last row in the right DataFrame whose value is nearest to the left's key.
suffix: Suffix to append to columns with a duplicate name.
Returns:
A new joined LazyFrame.
Expand Down Expand Up @@ -5224,6 +5228,7 @@ def join_asof(
by_right=by_right,
by=by,
strategy=strategy,
suffix=suffix,
)

def clone(self: Self) -> Self:
Expand Down
Loading

0 comments on commit 05cb650

Please sign in to comment.