Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into ibis-lazy
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoGorelli committed Jan 20, 2025
2 parents c286d79 + d9a04c0 commit 2723fa2
Show file tree
Hide file tree
Showing 70 changed files with 441 additions and 207 deletions.
2 changes: 1 addition & 1 deletion narwhals/_arrow/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ def __call__(self: Self, df: ArrowDataFrame) -> Sequence[ArrowSeries]:
except TypeError:
# `self._then_value` is a scalar and can't be converted to an expression
value_series = plx._create_series_from_scalar(
self._then_value, reference_series=condition
self._then_value, reference_series=condition.alias("literal")
)

condition_native, value_series_native = broadcast_series(
Expand Down
4 changes: 2 additions & 2 deletions narwhals/_dask/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ def __call__(self, df: DaskLazyFrame) -> Sequence[dx.Series]:

if is_scalar:
_df = condition.to_frame("a")
_df["tmp"] = value_sequence[0]
value_series = _df["tmp"]
_df["literal"] = value_sequence[0]
value_series = _df["literal"]
else:
value_series = value_sequence

Expand Down
52 changes: 52 additions & 0 deletions narwhals/_duckdb/expr_dt.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,55 @@ def date(self) -> DuckDBExpr:
"date",
returns_scalar=self._compliant_expr._returns_scalar,
)

def total_minutes(self) -> DuckDBExpr:
    """Return the minute part of a duration via DuckDB's `datepart`."""
    from duckdb import ConstantExpression
    from duckdb import FunctionExpression

    def _total_minutes(_input):
        # NOTE(review): only the interval's minute field is read; hour/day
        # components appear to be ignored — confirm durations are normalised
        # upstream before relying on this.
        return FunctionExpression("datepart", ConstantExpression("minute"), _input)

    return self._compliant_expr._from_call(
        _total_minutes,
        "total_minutes",
        returns_scalar=self._compliant_expr._returns_scalar,
    )

def total_seconds(self) -> DuckDBExpr:
    """Return a duration as seconds, built from its minute and second parts."""
    from duckdb import ConstantExpression
    from duckdb import FunctionExpression

    def _total_seconds(_input):
        minutes = FunctionExpression("datepart", ConstantExpression("minute"), _input)
        seconds = FunctionExpression("datepart", ConstantExpression("second"), _input)
        # 60 seconds per minute; NOTE(review): hour/day interval fields are
        # not included here — verify that is intended.
        return 60 * minutes + seconds

    return self._compliant_expr._from_call(
        _total_seconds,
        "total_seconds",
        returns_scalar=self._compliant_expr._returns_scalar,
    )

def total_milliseconds(self) -> DuckDBExpr:
    """Return a duration as milliseconds, from its minute and millisecond parts."""
    from duckdb import ConstantExpression
    from duckdb import FunctionExpression

    def _total_milliseconds(_input):
        minutes = FunctionExpression("datepart", ConstantExpression("minute"), _input)
        millis = FunctionExpression(
            "datepart", ConstantExpression("millisecond"), _input
        )
        # 60_000 ms per minute; NOTE(review): hour/day interval fields are
        # not included here — verify that is intended.
        return 60_000 * minutes + millis

    return self._compliant_expr._from_call(
        _total_milliseconds,
        "total_milliseconds",
        returns_scalar=self._compliant_expr._returns_scalar,
    )

def total_microseconds(self) -> DuckDBExpr:
    """Return a duration as microseconds, from its minute and microsecond parts."""
    from duckdb import ConstantExpression
    from duckdb import FunctionExpression

    def _total_microseconds(_input):
        minutes = FunctionExpression("datepart", ConstantExpression("minute"), _input)
        micros = FunctionExpression(
            "datepart", ConstantExpression("microsecond"), _input
        )
        # 60_000_000 us per minute; NOTE(review): hour/day interval fields
        # are not included here — verify that is intended.
        return 60_000_000 * minutes + micros

    return self._compliant_expr._from_call(
        _total_microseconds,
        "total_microseconds",
        returns_scalar=self._compliant_expr._returns_scalar,
    )

def total_nanoseconds(self) -> DuckDBExpr:
    """Unsupported for this backend; raises `NotImplementedError` eagerly."""
    raise NotImplementedError(
        "`total_nanoseconds` is not implemented for DuckDB"
    )
17 changes: 11 additions & 6 deletions narwhals/_duckdb/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,22 +248,27 @@ def __call__(self, df: DuckDBLazyFrame) -> Sequence[duckdb.Expression]:
value = parse_into_expr(self._then_value, namespace=plx)(df)[0]
except TypeError:
# `self._otherwise_value` is a scalar and can't be converted to an expression
value = ConstantExpression(self._then_value)
value = ConstantExpression(self._then_value).alias("literal")
value = cast("duckdb.Expression", value)
value_name = get_column_name(df, value)

if self._otherwise_value is None:
return [CaseExpression(condition=condition, value=value)]
return [CaseExpression(condition=condition, value=value).alias(value_name)]
try:
otherwise_expr = parse_into_expr(self._otherwise_value, namespace=plx)
except TypeError:
# `self._otherwise_value` is a scalar and can't be converted to an expression
return [
CaseExpression(condition=condition, value=value).otherwise(
ConstantExpression(self._otherwise_value)
)
CaseExpression(condition=condition, value=value)
.otherwise(ConstantExpression(self._otherwise_value))
.alias(value_name)
]
otherwise = otherwise_expr(df)[0]
return [CaseExpression(condition=condition, value=value).otherwise(otherwise)]
return [
CaseExpression(condition=condition, value=value)
.otherwise(otherwise)
.alias(value_name)
]

def then(self, value: DuckDBExpr | Any) -> DuckDBThen:
self._then_value = value
Expand Down
3 changes: 1 addition & 2 deletions narwhals/_pandas_like/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,13 +467,12 @@ def __call__(self, df: PandasLikeDataFrame) -> Sequence[PandasLikeSeries]:
except TypeError:
# `self._then_value` is a scalar and can't be converted to an expression
value_series = plx._create_series_from_scalar(
self._then_value, reference_series=condition
self._then_value, reference_series=condition.alias("literal")
)

condition_native, value_series_native = broadcast_align_and_extract_native(
condition, value_series
)

if self._otherwise_value is None:
return [
value_series._from_native_series(
Expand Down
2 changes: 1 addition & 1 deletion narwhals/_pandas_like/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def broadcast_align_and_extract_native(
s = rhs._native_series
return (
lhs._native_series,
s.__class__(s.iloc[0], index=lhs_index, dtype=s.dtype),
s.__class__(s.iloc[0], index=lhs_index, dtype=s.dtype, name=rhs.name),
)
if lhs.len() == 1:
# broadcast
Expand Down
5 changes: 5 additions & 0 deletions narwhals/_spark_like/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Sequence

from narwhals._expression_parsing import infer_new_root_output_names
from narwhals._spark_like.expr_dt import SparkLikeExprDateTimeNamespace
from narwhals._spark_like.expr_name import SparkLikeExprNameNamespace
from narwhals._spark_like.expr_str import SparkLikeExprStringNamespace
from narwhals._spark_like.utils import get_column_name
Expand Down Expand Up @@ -541,3 +542,7 @@ def str(self: Self) -> SparkLikeExprStringNamespace:
@property
def name(self: Self) -> SparkLikeExprNameNamespace:
return SparkLikeExprNameNamespace(self)

@property
def dt(self: Self) -> SparkLikeExprDateTimeNamespace:
return SparkLikeExprDateTimeNamespace(self)
135 changes: 135 additions & 0 deletions narwhals/_spark_like/expr_dt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from pyspark.sql import Column
from typing_extensions import Self

from narwhals._spark_like.expr import SparkLikeExpr


class SparkLikeExprDateTimeNamespace:
    """Implements the `Expr.dt` namespace for Spark-like backends.

    Every method delegates to `SparkLikeExpr._from_call` with a PySpark column
    function. `pyspark.sql.functions` is imported lazily inside each method so
    this module can be imported without PySpark installed.
    """

    def __init__(self: Self, expr: SparkLikeExpr) -> None:
        self._compliant_expr = expr

    def date(self: Self) -> SparkLikeExpr:
        """Truncate a datetime to its date part."""
        from pyspark.sql import functions as F  # noqa: N812

        return self._compliant_expr._from_call(
            F.to_date,
            "date",
            returns_scalar=self._compliant_expr._returns_scalar,
        )

    def year(self: Self) -> SparkLikeExpr:
        """Extract the year."""
        from pyspark.sql import functions as F  # noqa: N812

        return self._compliant_expr._from_call(
            F.year,
            "year",
            returns_scalar=self._compliant_expr._returns_scalar,
        )

    def month(self: Self) -> SparkLikeExpr:
        """Extract the month (1-12)."""
        from pyspark.sql import functions as F  # noqa: N812

        return self._compliant_expr._from_call(
            F.month,
            "month",
            returns_scalar=self._compliant_expr._returns_scalar,
        )

    def day(self: Self) -> SparkLikeExpr:
        """Extract the day of the month."""
        from pyspark.sql import functions as F  # noqa: N812

        return self._compliant_expr._from_call(
            F.dayofmonth,
            "day",
            returns_scalar=self._compliant_expr._returns_scalar,
        )

    def hour(self: Self) -> SparkLikeExpr:
        """Extract the hour (24-hour clock)."""
        from pyspark.sql import functions as F  # noqa: N812

        return self._compliant_expr._from_call(
            F.hour,
            "hour",
            returns_scalar=self._compliant_expr._returns_scalar,
        )

    def minute(self: Self) -> SparkLikeExpr:
        """Extract the minute."""
        from pyspark.sql import functions as F  # noqa: N812

        return self._compliant_expr._from_call(
            F.minute,
            "minute",
            returns_scalar=self._compliant_expr._returns_scalar,
        )

    def second(self: Self) -> SparkLikeExpr:
        """Extract the (whole) second."""
        from pyspark.sql import functions as F  # noqa: N812

        return self._compliant_expr._from_call(
            F.second,
            "second",
            returns_scalar=self._compliant_expr._returns_scalar,
        )

    def millisecond(self: Self) -> SparkLikeExpr:
        """Extract the millisecond component within the current second."""
        from pyspark.sql import functions as F  # noqa: N812

        def _millisecond(_input: Column) -> Column:
            # Sub-second microseconds, truncated down to milliseconds.
            return F.floor((F.unix_micros(_input) % 1_000_000) / 1000)

        return self._compliant_expr._from_call(
            _millisecond,
            "millisecond",
            returns_scalar=self._compliant_expr._returns_scalar,
        )

    def microsecond(self: Self) -> SparkLikeExpr:
        """Extract the microsecond component within the current second."""
        from pyspark.sql import functions as F  # noqa: N812

        def _microsecond(_input: Column) -> Column:
            return F.unix_micros(_input) % 1_000_000

        return self._compliant_expr._from_call(
            _microsecond,
            "microsecond",
            returns_scalar=self._compliant_expr._returns_scalar,
        )

    def nanosecond(self: Self) -> SparkLikeExpr:
        """Extract the nanosecond component within the current second.

        Spark timestamps carry microsecond precision, so this is the
        microsecond component scaled by 1000.
        """
        from pyspark.sql import functions as F  # noqa: N812

        def _nanosecond(_input: Column) -> Column:
            return (F.unix_micros(_input) % 1_000_000) * 1000

        return self._compliant_expr._from_call(
            _nanosecond,
            "nanosecond",
            returns_scalar=self._compliant_expr._returns_scalar,
        )

    def ordinal_day(self: Self) -> SparkLikeExpr:
        """Extract the day of the year (1-366)."""
        from pyspark.sql import functions as F  # noqa: N812

        return self._compliant_expr._from_call(
            F.dayofyear,
            "ordinal_day",
            returns_scalar=self._compliant_expr._returns_scalar,
        )

    def weekday(self: Self) -> SparkLikeExpr:
        """Extract the ISO weekday (1 = Monday ... 7 = Sunday)."""
        from pyspark.sql import functions as F  # noqa: N812

        def _weekday(_input: Column) -> Column:
            # PySpark's dayofweek returns 1-7 for Sunday-Saturday. Shift it
            # onto ISO numbering (Monday=1 ... Sunday=7). The previous
            # `(dayofweek + 6) % 7` mapped Sunday to 0, which matches no
            # weekday convention.
            return (F.dayofweek(_input) + 5) % 7 + 1

        return self._compliant_expr._from_call(
            _weekday,
            "weekday",
            returns_scalar=self._compliant_expr._returns_scalar,
        )
54 changes: 54 additions & 0 deletions narwhals/_spark_like/expr_str.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import overload

if TYPE_CHECKING:
from pyspark.sql import Column
Expand Down Expand Up @@ -128,3 +129,56 @@ def to_lowercase(self: Self) -> SparkLikeExpr:
"to_lowercase",
returns_scalar=self._compliant_expr._returns_scalar,
)

def to_datetime(self: Self, format: str | None) -> SparkLikeExpr:  # noqa: A002
    """Parse strings into timestamps using a Python strptime-style format."""
    from pyspark.sql import functions as F  # noqa: N812

    # The format is fixed per expression, so translate it once up front.
    spark_format = strptime_to_pyspark_format(format)

    def _to_datetime(_input: Column) -> Column:
        # Replace the ISO 'T' date/time separator with a space before parsing.
        normalised = F.replace(_input, F.lit("T"), F.lit(" "))
        return F.to_timestamp(normalised, format=spark_format)

    return self._compliant_expr._from_call(
        _to_datetime,
        "to_datetime",
        returns_scalar=self._compliant_expr._returns_scalar,
    )


@overload
def strptime_to_pyspark_format(format: None) -> None: ...


@overload
def strptime_to_pyspark_format(format: str) -> str: ...


def strptime_to_pyspark_format(format: str | None) -> str | None:  # noqa: A002
    """Translate a Python strptime format string into a PySpark pattern.

    A ``None`` format passes through unchanged so callers can forward an
    optional format directly.

    See https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
    and https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior
    """
    if format is None:
        return None

    # Ordered (strptime directive, Spark pattern letter) pairs. No output
    # contains '%', so the sequential replacements below cannot cascade.
    directive_pairs = (
        ("%Y", "y"),  # year with century
        ("%y", "y"),  # year without century
        ("%m", "M"),  # month
        ("%d", "d"),  # day of the month
        ("%H", "H"),  # hour, 24-hour clock (0-23)
        ("%I", "h"),  # hour, 12-hour clock (1-12)
        ("%M", "m"),  # minute
        ("%S", "s"),  # second
        ("%f", "S"),  # microseconds -> fraction-of-second
        ("%p", "a"),  # AM/PM marker
        ("%a", "E"),  # abbreviated weekday name
        ("%A", "E"),  # full weekday name
        ("%j", "D"),  # day of the year
        ("%z", "Z"),  # timezone offset
        ("%s", "X"),  # unix timestamp
    )

    translated = format
    for py_directive, spark_letter in directive_pairs:
        translated = translated.replace(py_directive, spark_letter)
    # Literal 'T' date/time separators become spaces, mirroring the
    # normalisation applied to the input strings themselves.
    return translated.replace("T", " ")
Loading

0 comments on commit 2723fa2

Please sign in to comment.