Skip to content

Commit

Permalink
SNOW-1638049: Add support for pd.to_timedelta (#2179)
Browse files Browse the repository at this point in the history
Fixes SNOW-1638049

Added support for `pd.to_timedelta`:
- Supports the lazy Snowpark pandas objects `Series` and `Index`.
- Does not support the `errors` parameter on lazy objects (only the default `errors="raise"` is accepted).
- Does not support converting from strings (on lazy objects).
  • Loading branch information
sfc-gh-nkumar authored Aug 29, 2024
1 parent d66f57b commit 98723ca
Show file tree
Hide file tree
Showing 10 changed files with 452 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
- support for adding or subtracting timestamps and `Timedelta`.
- support for binary arithmetic between two `Timedelta` values.
- support for lazy `TimedeltaIndex`.
- support for `pd.to_timedelta`.
- Added support for index's arithmetic and comparison operators.
- Added support for `Series.dt.round`.
- Added documentation pages for `DatetimeIndex`.
Expand Down
3 changes: 2 additions & 1 deletion docs/source/modin/supported/general_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ Top-level dealing with datetime-like data
| | | | - or ``arg`` is DataFrame and data type is not int |
| | | | - or ``arg`` is Series and data type is string |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``to_timedelta`` | N | | |
| ``to_timedelta`` | P | ``errors`` | ``N`` if ``errors`` is given or converting from |
| | | | string type |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+

Top-level dealing with Interval data
Expand Down
116 changes: 106 additions & 10 deletions src/snowflake/snowpark/modin/pandas/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from __future__ import annotations

from collections.abc import Hashable, Iterable, Mapping, Sequence
from datetime import date, datetime, tzinfo
from datetime import date, datetime, timedelta, tzinfo
from logging import getLogger
from typing import TYPE_CHECKING, Any, Literal, Union

Expand All @@ -35,6 +35,7 @@
from pandas._libs.tslibs import to_offset
from pandas._typing import (
AnyArrayLike,
ArrayLike,
Axis,
DateTimeErrorChoices,
IndexLabel,
Expand Down Expand Up @@ -2096,21 +2097,116 @@ def get_names(obj):
return None


@snowpark_pandas_telemetry_standalone_function_decorator
def to_timedelta(
    arg: str
    | int
    | float
    | timedelta
    | list
    | tuple
    | range
    | ArrayLike
    | pd.Index
    | pd.Series
    | pandas.Index
    | pandas.Series,
    unit: str | None = None,
    errors: DateTimeErrorChoices = "raise",
):
    """
    Convert argument to timedelta.

    Timedeltas are absolute differences in times, expressed in difference
    units (e.g. days, hours, minutes, seconds). This method converts
    an argument from a recognized timedelta format / value into
    a Timedelta type.

    Parameters
    ----------
    arg : str, timedelta, list-like or Series
        The data to be converted to timedelta.
    unit : str, optional
        Denotes the unit of the arg for numeric `arg`. Defaults to ``"ns"``.

        Possible values:

        * 'W'
        * 'D' / 'days' / 'day'
        * 'hours' / 'hour' / 'hr' / 'h' / 'H'
        * 'm' / 'minute' / 'min' / 'minutes' / 'T'
        * 's' / 'seconds' / 'sec' / 'second' / 'S'
        * 'ms' / 'milliseconds' / 'millisecond' / 'milli' / 'millis' / 'L'
        * 'us' / 'microseconds' / 'microsecond' / 'micro' / 'micros' / 'U'
        * 'ns' / 'nanoseconds' / 'nano' / 'nanos' / 'nanosecond' / 'N'

        Must not be specified when `arg` contains strings and ``errors="raise"``.
    errors : {'ignore', 'raise', 'coerce'}, default 'raise'
        - If 'raise', then invalid parsing will raise an exception.
        - If 'coerce', then invalid parsing will be set as NaT.
        - If 'ignore', then invalid parsing will return the input.

    Returns
    -------
    timedelta
        If parsing succeeded.
        Return type depends on input:

        - list-like: TimedeltaIndex of timedelta64 dtype
        - Series: Series of timedelta64 dtype
        - scalar: Timedelta

    See Also
    --------
    DataFrame.astype : Cast argument to a specified dtype.
    to_datetime : Convert argument to datetime.
    convert_dtypes : Convert dtypes.

    Notes
    -----
    If the precision is higher than nanoseconds, the precision of the duration is
    truncated to nanoseconds for string inputs.

    When `arg` is a Snowpark pandas ``Series`` or ``Index`` the conversion is
    delegated to the query compiler, which is more restrictive than native
    pandas: only ``errors="raise"`` is supported, conversion from string data
    is not yet supported, and the single-letter unit aliases 'H', 'T', 'S',
    'L', 'U' and 'N' are rejected as invalid unit abbreviations.

    Examples
    --------
    Parsing a single string to a Timedelta:

    >>> pd.to_timedelta('1 days 06:05:01.00003')
    Timedelta('1 days 06:05:01.000030')
    >>> pd.to_timedelta('15.5us')
    Timedelta('0 days 00:00:00.000015500')

    Parsing a list or array of strings:

    >>> pd.to_timedelta(['1 days 06:05:01.00003', '15.5us', 'nan'])
    TimedeltaIndex(['1 days 06:05:01.000030', '0 days 00:00:00.000015500', NaT], dtype='timedelta64[ns]', freq=None)

    Converting numbers by specifying the `unit` keyword argument:

    >>> pd.to_timedelta(np.arange(5), unit='s')
    TimedeltaIndex(['0 days 00:00:00', '0 days 00:00:01', '0 days 00:00:02',
                    '0 days 00:00:03', '0 days 00:00:04'],
                   dtype='timedelta64[ns]', freq=None)
    >>> pd.to_timedelta(np.arange(5), unit='d')
    TimedeltaIndex(['0 days', '1 days', '2 days', '3 days', '4 days'], dtype='timedelta64[ns]', freq=None)
    """
    # TODO: SNOW-1063345: Modin upgrade - modin.pandas functions in general.py
    # If arg is a Snowpark pandas lazy object, call to_timedelta on the query compiler.
    if isinstance(arg, (Series, pd.Index)):
        query_compiler = arg._query_compiler.to_timedelta(
            # The query compiler expects a concrete unit, so fall back to
            # nanoseconds when none is given.
            unit=unit or "ns",
            errors=errors,
            # For an Index the values live in the index columns, so those must
            # be converted as well.
            include_index=isinstance(arg, pd.Index),
        )
        return arg.__constructor__(query_compiler=query_compiler)

    # Use native pandas to_timedelta for scalar values and list-like objects.
    result = pandas.to_timedelta(arg, unit=unit, errors=errors)
    # Convert to lazy if result is a native pandas Series or Index.
    if isinstance(result, pandas.Index):
        return pd.Index(result)
    if isinstance(result, pandas.Series):
        return pd.Series(result)
    # Return the result as is for a scalar.
    return result


@snowpark_pandas_telemetry_standalone_function_decorator
Expand Down
63 changes: 63 additions & 0 deletions src/snowflake/snowpark/modin/plugin/_internal/timestamp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
cast,
convert_timezone,
date_part,
floor,
iff,
to_decimal,
)
Expand All @@ -39,6 +40,57 @@
_FractionalType,
)

# Maps every accepted spelling of a timedelta unit to its canonical pandas
# abbreviation.
# Reference: https://github.com/pandas-dev/pandas/blob/ef3368a8046f3c2e98c773be179f0a49a51d4bdc/pandas/_libs/tslibs/timedeltas.pyx#L109
# Note: this does not include deprecated units 'M' and 'Y'.
VALID_PANDAS_TIMEDELTA_ABBREVS = {
    alias: canonical
    for canonical, aliases in (
        ("W", ("W", "w")),
        ("D", ("D", "d", "days", "day")),
        ("h", ("hours", "hour", "hr", "h")),
        ("m", ("m", "minute", "min", "minutes")),
        ("s", ("s", "seconds", "sec", "second")),
        ("ms", ("ms", "milliseconds", "millisecond", "milli", "millis")),
        ("us", ("us", "microseconds", "microsecond", "µs", "micro", "micros")),
        ("ns", ("ns", "nanoseconds", "nano", "nanos", "nanosecond")),
    )
    for alias in aliases
}

# Number of nanoseconds in one of each canonical timedelta unit; multiplying a
# value expressed in that unit by this factor converts it to nanoseconds.
TIMEDELTA_UNIT_MULTIPLIER = {
    "W": 604_800_000_000_000,  # 7 * 24 * 3600 seconds
    "D": 86_400_000_000_000,  # 24 * 3600 seconds
    "h": 3_600_000_000_000,  # 3600 seconds
    "m": 60_000_000_000,  # 60 seconds
    "s": 1_000_000_000,
    "ms": 1_000_000,
    "us": 1_000,
    "ns": 1,
}

VALID_TO_DATETIME_DF_KEYS = {
"year": "year",
"years": "year",
Expand Down Expand Up @@ -111,6 +163,17 @@ def col_to_s(col: Column, unit: Literal["D", "s", "ms", "us", "ns"]) -> Column:
return col / 10**9


def col_to_timedelta(col: Column, unit: str) -> Column:
    """
    Express ``col`` as a whole number of nanoseconds.

    ``unit`` names the unit the values of ``col`` are currently stored in; any
    spelling listed in ``VALID_PANDAS_TIMEDELTA_ABBREVS`` is accepted. The
    values are scaled by the matching ``TIMEDELTA_UNIT_MULTIPLIER`` factor,
    floored and cast to ``LongType``.

    Raises:
        ValueError: if ``unit`` is not a recognized unit abbreviation.
    """
    canonical_unit = VALID_PANDAS_TIMEDELTA_ABBREVS.get(unit)
    if canonical_unit:
        nanos_per_unit = TIMEDELTA_UNIT_MULTIPLIER[canonical_unit]
        whole_nanos = floor(col * nanos_per_unit)
        return cast(whole_nanos, LongType())
    # Same error as native pandas.
    raise ValueError(f"invalid unit abbreviation: {unit}")


PANDAS_DATETIME_FORMAT_TO_SNOWFLAKE_MAPPING = {
"%Y": "YYYY",
"%y": "YY",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@
from snowflake.snowpark.modin.plugin._internal.timestamp_utils import (
VALID_TO_DATETIME_DF_KEYS,
DateTimeOrigin,
col_to_timedelta,
generate_timestamp_col,
raise_if_to_datetime_not_supported,
to_snowflake_timestamp_format,
Expand Down Expand Up @@ -6223,6 +6224,86 @@ def dataframe_to_datetime(
)
)

def to_timedelta(
    self,
    unit: str = "ns",
    errors: DateTimeErrorChoices = "raise",
    include_index: bool = False,
) -> "SnowflakeQueryCompiler":
    """
    Convert data to timedelta.

    Args:
        unit: Denotes unit of the input data. Defaults to 'ns'.
            Accepted values are the keys of ``VALID_PANDAS_TIMEDELTA_ABBREVS``:
            * 'W' / 'w'
            * 'D' / 'd' / 'days' / 'day'
            * 'hours' / 'hour' / 'hr' / 'h'
            * 'm' / 'minute' / 'min' / 'minutes'
            * 's' / 'seconds' / 'sec' / 'second'
            * 'ms' / 'milliseconds' / 'millisecond' / 'milli' / 'millis'
            * 'us' / 'microseconds' / 'microsecond' / 'µs' / 'micro' / 'micros'
            * 'ns' / 'nanoseconds' / 'nano' / 'nanos' / 'nanosecond'
        errors: {'ignore', 'raise', 'coerce'}, default 'raise'
            Only 'raise' is currently supported; any other value is reported
            through ``ErrorMessage.parameter_not_implemented_error``.
        include_index: If true, also convert index columns to timedelta.

    Returns:
        A new query compiler with the data converted to timedelta, or ``self``
        when every selected column is already of timedelta type.

    Raises:
        TypeError: if a selected column is neither numeric, string nor timedelta.
        ValueError: if ``unit`` is not a recognized unit abbreviation (raised
            by ``col_to_timedelta`` when a conversion is actually needed).
    """
    if errors != "raise":
        ErrorMessage.parameter_not_implemented_error("errors", "pd.to_timedelta")

    internal_frame = self._modin_frame
    # Work on a copy so the identifier list handed out by the frame is never
    # modified in place.
    candidate_ids = list(internal_frame.data_column_snowflake_quoted_identifiers)
    data_column_types = [TimedeltaType()] * len(candidate_ids)

    index_column_types = internal_frame.cached_index_column_snowpark_pandas_types
    if include_index:
        index_ids = internal_frame.index_column_snowflake_quoted_identifiers
        candidate_ids.extend(index_ids)
        index_column_types = [TimedeltaType()] * len(index_ids)

    # Validate the source types: only numeric columns can be converted, and
    # columns that are already timedelta need no work.
    already_timedelta = set()
    id_to_type = internal_frame.quoted_identifier_to_snowflake_type(candidate_ids)
    for quoted_id, sf_type in id_to_type.items():
        if isinstance(sf_type, TimedeltaType):
            already_timedelta.add(quoted_id)
        elif isinstance(sf_type, StringType):
            ErrorMessage.not_implemented(
                "Snowpark pandas method pd.to_timedelta does not yet support conversion from string type"
            )
        elif not isinstance(sf_type, _NumericType):
            raise TypeError(
                f"dtype {TypeMapper.to_pandas(sf_type)} cannot be converted to timedelta64[ns]"
            )

    ids_to_convert = [
        quoted_id
        for quoted_id in candidate_ids
        if quoted_id not in already_timedelta
    ]

    # If all columns are already timedelta, no conversion is needed.
    if not ids_to_convert:
        return self

    internal_frame = (
        internal_frame.update_snowflake_quoted_identifiers_with_expressions(
            {
                quoted_id: col_to_timedelta(col(quoted_id), unit)
                for quoted_id in ids_to_convert
            }
        ).frame
    )

    # Rebuild the frame so the converted columns carry the timedelta type.
    return SnowflakeQueryCompiler(
        internal_frame.create(
            ordered_dataframe=internal_frame.ordered_dataframe,
            data_column_pandas_index_names=internal_frame.data_column_pandas_index_names,
            data_column_pandas_labels=internal_frame.data_column_pandas_labels,
            index_column_pandas_labels=internal_frame.index_column_pandas_labels,
            data_column_snowflake_quoted_identifiers=internal_frame.data_column_snowflake_quoted_identifiers,
            index_column_snowflake_quoted_identifiers=internal_frame.index_column_snowflake_quoted_identifiers,
            data_column_types=data_column_types,
            index_column_types=index_column_types,
        )
    )

def series_to_datetime(
self,
errors: DateTimeErrorChoices = "raise",
Expand Down
6 changes: 4 additions & 2 deletions src/snowflake/snowpark/modin/plugin/extensions/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -2624,9 +2624,11 @@ def __repr__(self) -> str:
name_repr = f", name='{self.name}'" if self.name else ""
# Length is displayed only when the number of elements is greater than the number of elements to display.
length_repr = f", length={length_of_index}" if too_many_elem else ""
# The frequency is displayed only for DatetimeIndex.
# The frequency is displayed for DatetimeIndex and TimedeltaIndex
# TODO: SNOW-1625233 update freq_repr; replace None with the correct value.
freq_repr = ", freq=None" if "DatetimeIndex" in class_name else ""
freq_repr = (
", freq=None" if class_name in ("DatetimeIndex", "TimedeltaIndex") else ""
)

repr = (
class_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ def __init__(
Examples
--------
>>> pd.TimedeltaIndex(['0 days', '1 days', '2 days', '3 days', '4 days'])
TimedeltaIndex(['0 days', '1 days', '2 days', '3 days', '4 days'], dtype='timedelta64[ns]')
TimedeltaIndex(['0 days', '1 days', '2 days', '3 days', '4 days'], dtype='timedelta64[ns]', freq=None)
We can also let pandas infer the frequency when possible.
>>> pd.TimedeltaIndex(np.arange(5) * 24 * 3600 * 1e9, freq='infer')
TimedeltaIndex(['0 days', '1 days', '2 days', '3 days', '4 days'], dtype='timedelta64[ns]')
TimedeltaIndex(['0 days', '1 days', '2 days', '3 days', '4 days'], dtype='timedelta64[ns]', freq=None)
"""
if query_compiler:
# Raise error if underlying type is not a Timedelta type.
Expand Down
Loading

0 comments on commit 98723ca

Please sign in to comment.