Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add DuckDB join_asof #1860

Merged
merged 8 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions narwhals/_arrow/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,10 @@ def join_asof(
*,
left_on: str | None,
right_on: str | None,
on: str | None,
by_left: str | list[str] | None,
by_right: str | list[str] | None,
by: str | list[str] | None,
by_left: list[str] | None,
by_right: list[str] | None,
strategy: Literal["backward", "forward", "nearest"],
suffix: str,
) -> Self:
msg = "join_asof is not yet supported on PyArrow tables" # pragma: no cover
raise NotImplementedError(msg)
Expand Down
17 changes: 7 additions & 10 deletions narwhals/_dask/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,13 +334,12 @@ def join_asof(
self: Self,
other: Self,
*,
left_on: str | None = None,
right_on: str | None = None,
on: str | None = None,
by_left: str | list[str] | None = None,
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
left_on: str | None,
right_on: str | None,
by_left: list[str] | None,
by_right: list[str] | None,
strategy: Literal["backward", "forward", "nearest"],
suffix: str,
) -> Self:
plx = self.__native_namespace__()
return self._from_native_frame(
Expand All @@ -349,12 +348,10 @@ def join_asof(
other._native_frame,
left_on=left_on,
right_on=right_on,
on=on,
left_by=by_left,
right_by=by_right,
by=by,
direction=strategy,
suffixes=("", "_right"),
suffixes=("", suffix),
),
)

Expand Down
47 changes: 46 additions & 1 deletion narwhals/_duckdb/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Literal
from typing import Sequence

import duckdb
from duckdb import ColumnExpression

from narwhals._duckdb.utils import native_to_narwhals_dtype
Expand All @@ -22,7 +23,6 @@
if TYPE_CHECKING:
from types import ModuleType

import duckdb
import pandas as pd
import pyarrow as pa
from typing_extensions import Self
Expand Down Expand Up @@ -260,6 +260,51 @@ def join(
res = rel.select(", ".join(select)).set_alias(original_alias)
return self._from_native_frame(res)

def join_asof(
    self: Self,
    other: Self,
    *,
    left_on: str | None,
    right_on: str | None,
    by_left: list[str] | None,
    by_right: list[str] | None,
    strategy: Literal["backward", "forward", "nearest"],
    suffix: str,
) -> Self:
    """Perform an as-of left join against `other` using a DuckDB `ASOF LEFT JOIN`.

    Arguments:
        other: Right-hand frame to join with.
        left_on: Name of the as-of key column in the left frame.
        right_on: Name of the as-of key column in the right frame.
        by_left: Left-frame columns that must match exactly before the as-of
            comparison is applied. Only used when `by_right` is also given.
        by_right: Right-frame columns paired positionally with `by_left`.
        strategy: "backward" joins on `left_on >= right_on`, "forward" on
            `left_on <= right_on`. "nearest" is not supported here.
        suffix: Appended to right-frame column names that also exist in the
            left frame.

    Returns:
        A new frame with every left column, plus the right columns that are
        not join keys (`right_on` / `by_right`).

    Raises:
        NotImplementedError: If `strategy` is neither "backward" nor "forward".
    """
    # NOTE: the local names `lhs` and `rhs` are load-bearing. The SQL text
    # below refers to them by name and nothing registers them explicitly, so
    # `duckdb.sql` has to resolve them from these Python variables.
    lhs = self._native_frame
    rhs = other._native_frame

    if by_left is not None and by_right is not None:
        conditions = [
            f'lhs."{left}" = rhs."{right}"' for left, right in zip(by_left, by_right)
        ]
    else:
        conditions = []
        by_right = []

    if strategy == "backward":
        conditions.append(f'lhs."{left_on}" >= rhs."{right_on}"')
    elif strategy == "forward":
        conditions.append(f'lhs."{left_on}" <= rhs."{right_on}"')
    else:
        msg = "Only 'backward' and 'forward' strategies are currently supported for DuckDB"
        raise NotImplementedError(msg)
    condition = " and ".join(conditions)

    # Right-hand join keys are dropped from the output; without a `right_on`
    # nothing is treated as a key and every right-hand column is kept.
    join_keys = set() if right_on is None else {right_on, *by_right}
    lhs_columns = set(lhs.columns)

    select = ["lhs.*"]
    for col in rhs.columns:
        if col in join_keys:
            continue
        if col in lhs_columns:
            # Name collision with the left frame: disambiguate with `suffix`.
            select.append(f'rhs."{col}" as "{col}{suffix}"')
        else:
            # Quote and qualify the identifier so names containing spaces,
            # punctuation, reserved words, or differing only by case from a
            # left-hand column are still selected correctly.
            select.append(f'rhs."{col}"')

    query = f"""
        SELECT {",".join(select)}
        FROM lhs
        ASOF LEFT JOIN rhs
        ON {condition}
        """  # noqa: S608
    res = duckdb.sql(query)
    return self._from_native_frame(res)

def collect_schema(self: Self) -> dict[str, DType]:
return {
column_name: native_to_narwhals_dtype(str(duckdb_dtype), self._version)
Expand Down
17 changes: 7 additions & 10 deletions narwhals/_pandas_like/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,13 +660,12 @@ def join_asof(
self: Self,
other: Self,
*,
left_on: str | None = None,
right_on: str | None = None,
on: str | None = None,
by_left: str | list[str] | None = None,
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
left_on: str | None,
right_on: str | None,
by_left: list[str] | None,
by_right: list[str] | None,
strategy: Literal["backward", "forward", "nearest"],
suffix: str,
) -> Self:
plx = self.__native_namespace__()
return self._from_native_frame(
Expand All @@ -675,12 +674,10 @@ def join_asof(
other._native_frame,
left_on=left_on,
right_on=right_on,
on=on,
left_by=by_left,
right_by=by_right,
by=by,
direction=strategy,
suffixes=("", "_right"),
suffixes=("", suffix),
),
)

Expand Down
27 changes: 16 additions & 11 deletions narwhals/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ def join_asof(
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
suffix: str = "_right",
) -> Self:
_supported_strategies = ("backward", "forward", "nearest")

Expand All @@ -302,25 +303,22 @@ def join_asof(
msg = "If `by` is specified, `by_left` and `by_right` should be None."
raise ValueError(msg)
if on is not None:
return self._from_compliant_dataframe(
self._compliant_frame.join_asof(
self._extract_compliant(other),
on=on,
by_left=by_left,
by_right=by_right,
by=by,
strategy=strategy,
)
)
left_on = right_on = on
if by is not None:
by_left = by_right = by
if isinstance(by_left, str):
by_left = [by_left]
if isinstance(by_right, str):
by_right = [by_right]
return self._from_compliant_dataframe(
self._compliant_frame.join_asof(
self._extract_compliant(other),
left_on=left_on,
right_on=right_on,
by_left=by_left,
by_right=by_right,
by=by,
strategy=strategy,
suffix=suffix,
)
)

Expand Down Expand Up @@ -2748,6 +2746,7 @@ def join_asof(
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
suffix: str = "_right",
) -> Self:
"""Perform an asof join.

Expand All @@ -2764,6 +2763,7 @@ def join_asof(
by_right: join on these columns before doing asof join.
by: join on these columns before doing asof join.
suffix: Suffix to append to columns with a duplicate name.
strategy: Join strategy. The default is "backward".

* *backward*: selects the last row in the right DataFrame whose "on" key is less than or equal to the left's key.
* *forward*: selects the first row in the right DataFrame whose "on" key is greater than or equal to the left's key.
Expand Down Expand Up @@ -2924,6 +2924,7 @@ def join_asof(
by_right=by_right,
by=by,
strategy=strategy,
suffix=suffix,
)

# --- descriptive ---
Expand Down Expand Up @@ -5030,6 +5031,7 @@ def join_asof(
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
suffix: str = "_right",
) -> Self:
"""Perform an asof join.

Expand Down Expand Up @@ -5058,6 +5060,8 @@ def join_asof(
* *forward*: selects the first row in the right DataFrame whose "on" key is greater than or equal to the left's key.
* *nearest*: selects the last row in the right DataFrame whose "on" key is nearest to the left's key.

suffix: Suffix to append to columns with a duplicate name.

Returns:
A new joined LazyFrame.

Expand Down Expand Up @@ -5224,6 +5228,7 @@ def join_asof(
by_right=by_right,
by=by,
strategy=strategy,
suffix=suffix,
)

def clone(self: Self) -> Self:
Expand Down
Loading
Loading