Skip to content

feat: add DuckDB join_asof #1860

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions narwhals/_arrow/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,10 @@ def join_asof(
*,
left_on: str | None,
right_on: str | None,
on: str | None,
by_left: str | list[str] | None,
by_right: str | list[str] | None,
by: str | list[str] | None,
by_left: list[str] | None,
by_right: list[str] | None,
strategy: Literal["backward", "forward", "nearest"],
suffix: str = "_right",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really minor, but can we avoid setting the default at the compliant level? So, in narwhals/dataframe.py, we can have `suffix: str = "_right"`, but in the internal methods in _arrow / _duckdb / _pandas_like / etc., we can just have `suffix: str`, to make sure that we're always passing it down?

) -> Self:
msg = "join_asof is not yet supported on PyArrow tables" # pragma: no cover
raise NotImplementedError(msg)
Expand Down
11 changes: 4 additions & 7 deletions narwhals/_dask/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,10 @@ def join_asof(
*,
left_on: str | None = None,
right_on: str | None = None,
on: str | None = None,
by_left: str | list[str] | None = None,
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
by_left: list[str] | None = None,
by_right: list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
suffix: str = "_right",
) -> Self:
plx = self.__native_namespace__()
return self._from_native_frame(
Expand All @@ -349,12 +348,10 @@ def join_asof(
other._native_frame,
left_on=left_on,
right_on=right_on,
on=on,
left_by=by_left,
right_by=by_right,
by=by,
direction=strategy,
suffixes=("", "_right"),
suffixes=("", suffix),
),
)

Expand Down
47 changes: 47 additions & 0 deletions narwhals/_duckdb/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,53 @@ def join(
res = rel.select(", ".join(select)).set_alias(original_alias)
return self._from_native_frame(res)

def join_asof(
self: Self,
other: Self,
*,
left_on: str | None = None,
right_on: str | None = None,
by_left: list[str] | None = None,
by_right: list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
suffix: str = "_right",
) -> Self:
import duckdb
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we can do this at the top of the file


lhs = self._native_frame
rhs = other._native_frame
conditions = []
if by_left is not None and by_right is not None:
conditions += [
f'lhs."{left}" = rhs."{right}"' for left, right in zip(by_left, by_right)
]
else:
by_left = by_right = []
if strategy == "backward":
conditions += [f'lhs."{left_on}" >= rhs."{right_on}"']
elif strategy == "forward":
conditions += [f'lhs."{left_on}" <= rhs."{right_on}"']
else:
msg = "Only 'backward' and 'forward' strategies are currently supported for DuckDB"
raise NotImplementedError(msg)
condition = " and ".join(conditions)
select = [f'lhs."{x}"' for x in lhs.columns]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 does `select = ['lhs.*']` work?

for col in rhs.columns:
if col in lhs.columns and (
right_on is None or col not in [right_on, *by_right]
):
select.append(f'rhs."{col}" as "{col}{suffix}"')
elif right_on is None or col not in [right_on, *by_right]:
select.append(col)
query = f"""
SELECT {",".join(select)}
FROM lhs
ASOF LEFT JOIN rhs
ON {condition}
""" # noqa: S608
res = duckdb.sql(query)
return self._from_native_frame(res)

def collect_schema(self: Self) -> dict[str, DType]:
return {
column_name: native_to_narwhals_dtype(str(duckdb_dtype), self._version)
Expand Down
11 changes: 4 additions & 7 deletions narwhals/_pandas_like/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,11 +662,10 @@ def join_asof(
*,
left_on: str | None = None,
right_on: str | None = None,
on: str | None = None,
by_left: str | list[str] | None = None,
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
by_left: list[str] | None = None,
by_right: list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
suffix: str = "_right",
) -> Self:
plx = self.__native_namespace__()
return self._from_native_frame(
Expand All @@ -675,12 +674,10 @@ def join_asof(
other._native_frame,
left_on=left_on,
right_on=right_on,
on=on,
left_by=by_left,
right_by=by_right,
by=by,
direction=strategy,
suffixes=("", "_right"),
suffixes=("", suffix),
),
)

Expand Down
27 changes: 16 additions & 11 deletions narwhals/dataframe.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice one! 👌

Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ def join_asof(
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
suffix: str = "_right",
) -> Self:
_supported_strategies = ("backward", "forward", "nearest")

Expand All @@ -302,25 +303,22 @@ def join_asof(
msg = "If `by` is specified, `by_left` and `by_right` should be None."
raise ValueError(msg)
if on is not None:
return self._from_compliant_dataframe(
self._compliant_frame.join_asof(
self._extract_compliant(other),
on=on,
by_left=by_left,
by_right=by_right,
by=by,
strategy=strategy,
)
)
left_on = right_on = on
if by is not None:
by_left = by_right = by
if isinstance(by_left, str):
by_left = [by_left]
if isinstance(by_right, str):
by_right = [by_right]
return self._from_compliant_dataframe(
self._compliant_frame.join_asof(
self._extract_compliant(other),
left_on=left_on,
right_on=right_on,
by_left=by_left,
by_right=by_right,
by=by,
strategy=strategy,
suffix=suffix,
)
)

Expand Down Expand Up @@ -2748,6 +2746,7 @@ def join_asof(
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
suffix: str = "_right",
) -> Self:
"""Perform an asof join.

Expand All @@ -2764,6 +2763,7 @@ def join_asof(
by_right: join on these columns before doing asof join.
by: join on these columns before doing asof join.
strategy: Join strategy. The default is "backward".
suffix: Suffix to append to columns with a duplicate name.

* *backward*: selects the last row in the right DataFrame whose "on" key is less than or equal to the left's key.
* *forward*: selects the first row in the right DataFrame whose "on" key is greater than or equal to the left's key.
Expand Down Expand Up @@ -2924,6 +2924,7 @@ def join_asof(
by_right=by_right,
by=by,
strategy=strategy,
suffix=suffix,
)

# --- descriptive ---
Expand Down Expand Up @@ -5030,6 +5031,7 @@ def join_asof(
by_right: str | list[str] | None = None,
by: str | list[str] | None = None,
strategy: Literal["backward", "forward", "nearest"] = "backward",
suffix: str = "_right",
) -> Self:
"""Perform an asof join.

Expand Down Expand Up @@ -5058,6 +5060,8 @@ def join_asof(
* *forward*: selects the first row in the right DataFrame whose "on" key is greater than or equal to the left's key.
* *nearest*: search selects the last row in the right DataFrame whose value is nearest to the left's key.

suffix: Suffix to append to columns with a duplicate name.

Returns:
A new joined LazyFrame.

Expand Down Expand Up @@ -5224,6 +5228,7 @@ def join_asof(
by_right=by_right,
by=by,
strategy=strategy,
suffix=suffix,
)

def clone(self: Self) -> Self:
Expand Down
Loading
Loading