From dfcf8e5d44c807d459f152a88b278ff376828a6b Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Tue, 4 Feb 2025 19:28:23 -0800 Subject: [PATCH] Remove cudf.Scalar from date_range/to_datetime (#17860) Towards https://github.com/rapidsai/cudf/issues/17843 Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/17860 --- python/cudf/cudf/core/tools/datetimes.py | 105 ++++++++++------------- 1 file changed, 45 insertions(+), 60 deletions(-) diff --git a/python/cudf/cudf/core/tools/datetimes.py b/python/cudf/cudf/core/tools/datetimes.py index bac61851497..22d0832b27f 100644 --- a/python/cudf/cudf/core/tools/datetimes.py +++ b/python/cudf/cudf/core/tools/datetimes.py @@ -9,6 +9,7 @@ import numpy as np import pandas as pd import pandas.tseries.offsets as pd_offset +import pyarrow as pa from typing_extensions import Self import pylibcudf as plc @@ -19,6 +20,7 @@ from cudf.core import column from cudf.core.buffer import acquire_spill_lock from cudf.core.index import ensure_index +from cudf.core.scalar import pa_scalar_to_plc_scalar if TYPE_CHECKING: from collections.abc import Sequence @@ -240,6 +242,11 @@ def to_datetime( ) times_column = None + factor_denominator = ( + column.datetime._unit_to_nanoseconds_conversion["s"] + if np.datetime_data(col.dtype)[0] == "s" + else 1 + ) for u in ["h", "m", "s", "ms", "us", "ns"]: value = unit_rev.get(u) if value is not None and value in arg: @@ -252,15 +259,9 @@ def to_datetime( except ValueError: current_col = current_col.astype(dtype="float64") - factor = cudf.Scalar( + factor = ( column.datetime._unit_to_nanoseconds_conversion[u] - / ( - column.datetime._unit_to_nanoseconds_conversion[ - "s" - ] - if np.datetime_data(col.dtype)[0] == "s" - else 1 - ) + / factor_denominator ) if times_column is None: @@ -324,10 +325,7 @@ def _process_col( ): if col.dtype.kind == "f": if unit not in (None, "ns"): - factor = cudf.Scalar( - column.datetime._unit_to_nanoseconds_conversion[unit] - ) - col = col * factor + col = col * column.datetime._unit_to_nanoseconds_conversion[unit] if format is not None: # Converting to int because, @@ -337,7 +335,7 @@ def _process_col( # Instead we directly cast to int and perform # parsing against `format`. col = ( - col.astype("int") + col.astype(np.dtype(np.int64)) .astype("str") .strptime( dtype=np.dtype("datetime64[us]") @@ -351,7 +349,7 @@ def _process_col( elif col.dtype.kind in "iu": if unit in ("D", "h", "m"): - factor = cudf.Scalar( + factor = ( column.datetime._unit_to_nanoseconds_conversion[unit] / column.datetime._unit_to_nanoseconds_conversion["s"] ) @@ -589,14 +587,13 @@ def __init__(self, n=1, normalize=False, **kwds): scalars = {} for k, v in kwds.items(): if k in all_possible_units: - # Months must be int16 + # Months must be int16 or int32 if k == "months": - # TODO: throw for out-of-bounds int16 values - dtype = np.dtype(np.int16) + # TODO: throw for out-of-bounds int32 values + scalars[k] = np.int32(v) else: unit = self._UNITS_TO_CODES[k] - dtype = np.dtype(f"timedelta64[{unit}]") - scalars[k] = cudf.Scalar(v, dtype=dtype) + scalars[k] = np.timedelta64(v, unit) self._scalars = scalars @@ -647,33 +644,23 @@ def _datetime_binop( f" and {type(datetime_col).__name__}" ) if not self._is_no_op: - if "months" in self._scalars: - rhs = self._generate_months_column(len(datetime_col), op) - with acquire_spill_lock(): - datetime_col = type(datetime_col).from_pylibcudf( - plc.datetime.add_calendrical_months( - datetime_col.to_pylibcudf(mode="read"), - rhs.to_pylibcudf(mode="read"), - ) - ) - for unit, value in self._scalars.items(): - if unit != "months": - value = -value if op == "__sub__" else value - datetime_col += cudf.core.column.as_column( + value = -value if op == "__sub__" else value + if unit == "months": + with acquire_spill_lock(): + datetime_col = type(datetime_col).from_pylibcudf( + plc.datetime.add_calendrical_months( + datetime_col.to_pylibcudf(mode="read"), + pa_scalar_to_plc_scalar(pa.scalar(value)), + ) + ) + else: + datetime_col += column.as_column( value, length=len(datetime_col) ) return datetime_col - def _generate_months_column(self, size, op): - months = self._scalars["months"] - months = -months if op == "__sub__" else months - # TODO: pass a scalar instead of constructing a column - # https://github.com/rapidsai/cudf/issues/6990 - col = cudf.core.column.as_column(months, length=size) - return col - @property def _is_no_op(self) -> bool: # some logic could be implemented here for more complex cases @@ -899,14 +886,15 @@ def date_range( ) dtype = np.dtype("datetime64[ns]") + unit, _ = np.datetime_data(dtype) if freq is None: # `start`, `end`, `periods` is specified, we treat the timestamps as # integers and divide the number range evenly with `periods` elements. - start = cudf.Scalar(start, dtype=dtype).value.astype("int64") - end = cudf.Scalar(end, dtype=dtype).value.astype("int64") - arr = np.linspace(start=start, stop=end, num=periods) - result = cudf.core.column.as_column(arr).astype("datetime64[ns]") + start = dtype.type(start, unit).astype(np.dtype(np.int64)) + end = dtype.type(end, unit).astype(np.dtype(np.int64)) + arr = np.linspace(start=start, stop=end, num=periods).astype(dtype) + result = column.as_column(arr) return cudf.DatetimeIndex._from_column(result, name=name).tz_localize( tz ) @@ -941,14 +929,13 @@ def date_range( _periods_not_specified = False if start is None: - end = cudf.Scalar(end, dtype=dtype) - start = cudf.Scalar( - pd.Timestamp(end.value) - - (periods - 1) * offset._maybe_as_fast_pandas_offset(), - dtype=dtype, - ) + end = dtype.type(end, unit) + start = ( + pd.Timestamp(end) + - (periods - 1) * offset._maybe_as_fast_pandas_offset() + ).to_numpy() elif end is None: - start = cudf.Scalar(start, dtype=dtype) + start = dtype.type(start, unit) elif periods is None: # When `periods` is unspecified, its upper bound estimated by # dividing the number of nanoseconds between two timestamps with @@ -956,8 +943,8 @@ def date_range( # may contain extra elements that exceeds `end`, they are trimmed # as a post processing step. [1] _periods_not_specified = True - start = cudf.Scalar(start, dtype=dtype) - end = cudf.Scalar(end, dtype=dtype) + start = dtype.type(start, unit) + end = dtype.type(end, unit) _is_increment_sequence = end >= start periods = math.floor( @@ -985,7 +972,7 @@ def date_range( # are dropped in conversion during the binops warnings.simplefilter("ignore", UserWarning) end_estim = ( - pd.Timestamp(start.value) + pd.Timestamp(start) + periods * offset._maybe_as_fast_pandas_offset() ).to_datetime64() @@ -998,7 +985,7 @@ def date_range( res = libcudf.column.Column.from_pylibcudf( plc.filling.calendrical_month_sequence( periods, - start.device_value, + pa_scalar_to_plc_scalar(pa.scalar(start)), months, ) ) @@ -1012,13 +999,11 @@ def date_range( else: # If `offset` is fixed frequency, we generate a range of # treating `start`, `stop` and `step` as ints: - stop = end_estim.astype("int64") - start = start.value.astype("int64") + stop = end_estim.astype(np.dtype(np.int64)) + start = start.astype(np.dtype(np.int64)) step = _offset_to_nanoseconds_lower_bound(offset) arr = range(int(start), int(stop), step) - res = cudf.core.column.as_column(arr, dtype="int64").astype( - "datetime64[ns]" - ) + res = column.as_column(arr).astype(dtype) return cudf.DatetimeIndex._from_column( res, name=name, freq=freq