
Commit 5b17a02

Author: Tristan Nixon
Commit message: code to create time-range DFs and (empty) lattices

1 parent baf9f80

File tree

2 files changed: +168 −9 lines

python/tempo/tsdf.py

Lines changed: 98 additions & 7 deletions
@@ -4,18 +4,24 @@
 import logging
 import operator
 from abc import ABCMeta, abstractmethod
+from collections.abc import Iterable
 from functools import cached_property
-from typing import Any, Callable, List, Optional, Sequence, TypeVar, Union
+from typing import Any, Callable, List, Optional, Sequence, TypeVar, Union, Mapping
 from typing import Collection, Dict, cast, overload

-import pyspark.sql.functions as sfn
+from datetime import datetime as dt, timedelta as td
+
 from IPython.core.display import HTML
 from IPython.display import display as ipydisplay
-from pyspark.sql import GroupedData
-from pyspark.sql import SparkSession
+import pandas as pd
+from pandas.core.frame import DataFrame as PandasDataFrame
+
+from pyspark import RDD
+import pyspark.sql.functions as sfn
+from pyspark.sql import SparkSession, GroupedData
 from pyspark.sql.column import Column
 from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.types import DataType, StructType
+from pyspark.sql.types import AtomicType, DataType, StructType
 from pyspark.sql.window import Window, WindowSpec

 import tempo.interpol as t_interpolation
@@ -97,7 +103,9 @@ def time_str_to_double(df: DataFrame,

 class TSDF(WindowBuilder):
     """
-    This object is the main wrapper over a Spark data frame which allows a user to parallelize time series computations on a Spark data frame by various dimensions. The two dimensions required are partition_cols (list of columns by which to summarize) and ts_col (timestamp column, which can be epoch or TimestampType).
+    This class represents a time series DataFrame (TSDF) - a DataFrame with a
+    time series index. It can represent multiple logical time series,
+    each identified by a unique set of series IDs.
     """

     def __init__(
@@ -142,7 +150,8 @@ def __withStandardizedColOrder(self) -> TSDF:
         * ts_index,
         * observation columns

-        :return: a :class:`TSDF` with the columns reordered into "standard order" (as described above)
+        :return: a :class:`TSDF` with the columns reordered into
+            "standard order" (as described above)
         """
         std_ordered_cols = (
             list(self.series_ids)
@@ -155,6 +164,88 @@ def __withStandardizedColOrder(self) -> TSDF:
     # default column name for constructed timeseries index struct columns
     __DEFAULT_TS_IDX_COL = "ts_idx"

+    @classmethod
+    def buildEmptyLattice(
+            cls,
+            spark: SparkSession,
+            start_time: dt,
+            end_time: Optional[dt] = None,
+            step_size: Optional[td] = None,
+            num_intervals: Optional[int] = None,
+            ts_col: Optional[str] = None,
+            series_ids: Optional[Any] = None,
+            series_schema: Optional[Union[AtomicType, StructType, str]] = None,
+            observation_cols: Optional[Union[Mapping[str, str], Iterable[str]]] = None,
+            num_partitions: Optional[int] = None) -> TSDF:
+        """
+        Construct an empty "lattice", i.e. a :class:`TSDF` with a time range
+        for each unique series and a set of observational columns (initialized to Nulls)
+
+        :param spark: the Spark session to use
+        :param start_time: the start time of the lattice
+        :param end_time: the end time of the lattice (optional)
+        :param step_size: the step size between each time interval (optional)
+        :param num_intervals: the number of intervals to create (optional)
+        :param ts_col: the name of the timestamp column (optional)
+        :param series_ids: the unique series identifiers (optional)
+        :param series_schema: the schema of the series identifiers (optional)
+        :param observation_cols: the observational columns to include (optional)
+        :param num_partitions: the number of partitions to create (optional)
+
+        :return: a :class:`TSDF` representing the empty lattice
+        """
+
+        # set a default timestamp column if not provided
+        if ts_col is None:
+            ts_col = cls.__DEFAULT_TS_IDX_COL
+
+        # initialize the lattice as a time range
+        lattice_df = t_utils.time_range(spark,
+                                        start_time,
+                                        end_time,
+                                        step_size,
+                                        num_intervals,
+                                        ts_colname=ts_col)
+        select_exprs = [sfn.col(ts_col)]
+
+        # handle construction of the series_ids DataFrame
+        series_df = None
+        if series_ids:
+            if isinstance(series_ids, DataFrame):
+                series_df = series_ids
+            elif isinstance(series_ids, (RDD, PandasDataFrame)):
+                series_df = spark.createDataFrame(series_ids)
+            elif isinstance(series_ids, dict):
+                series_df = spark.createDataFrame(pd.DataFrame(series_ids))
+            else:
+                series_df = spark.createDataFrame(data=series_ids, schema=series_schema)
+            # add the series columns to the select expressions
+            select_exprs += [sfn.col(c) for c in series_df.columns]
+            # lattice is the cross join of the time range and the series identifiers
+            lattice_df = lattice_df.crossJoin(series_df)
+
+        # set up select expressions for the observation columns
+        if observation_cols:
+            # convert to a dict if not already, mapping all columns to "double" types
+            if not isinstance(observation_cols, dict):
+                observation_cols = {col: "double" for col in observation_cols}
+            select_exprs += [sfn.lit(None).cast(coltype).alias(colname)
+                             for colname, coltype in observation_cols.items()]
+        lattice_df = lattice_df.select(*select_exprs)
+
+        # repartition the lattice in a more optimal way
+        if num_partitions is None:
+            num_partitions = lattice_df.rdd.getNumPartitions()
+        if series_df:
+            sort_cols = series_df.columns + [ts_col]
+            lattice_df = (lattice_df.repartition(num_partitions, *(series_df.columns))
+                          .sortWithinPartitions(*sort_cols))
+        else:
+            lattice_df = lattice_df.repartitionByRange(num_partitions, ts_col)
+
+        # construct the appropriate TSDF
+        return TSDF(lattice_df, ts_col=ts_col, series_ids=series_df.columns)
+
     @classmethod
     def fromSubsequenceCol(
         cls,

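A minimal usage sketch of the new buildEmptyLattice classmethod, based only on the signature and behavior visible in the diff above; the timestamp column name, series values, and observation column names are illustrative, not part of the commit.

```python
from datetime import datetime, timedelta

from pyspark.sql import SparkSession
from tempo.tsdf import TSDF

spark = SparkSession.builder.master("local[2]").getOrCreate()

# Build a lattice of 15-minute intervals over one day for two hypothetical series,
# with two observation columns initialized to NULL doubles.
lattice = TSDF.buildEmptyLattice(
    spark,
    start_time=datetime(2024, 1, 1),
    end_time=datetime(2024, 1, 2),
    step_size=timedelta(minutes=15),
    ts_col="event_ts",
    series_ids={"symbol": ["AAPL", "MSFT"]},  # dict -> pandas -> Spark DataFrame branch
    observation_cols={"bid": "double", "ask": "double"},
)

# one row per (symbol, 15-minute interval); bid/ask stay NULL until observations arrive
lattice.df.show(5)
```

Note that, as written in this commit, the final TSDF(...) call reads series_df.columns unconditionally, so this sketch assumes series_ids is supplied; leaving it out would hit that attribute access while series_df is still None.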
python/tempo/utils.py

Lines changed: 70 additions & 2 deletions
@@ -1,16 +1,20 @@
 from __future__ import annotations

 import logging
-import os
 import warnings
 from typing import Optional, Union, overload

+import os
+import math
+from datetime import datetime as dt, timedelta as td
+
 import pyspark.sql.functions as sfn
+from pyspark.sql import SparkSession, DataFrame
+
 from IPython import get_ipython
 from IPython.core.display import HTML
 from IPython.display import display as ipydisplay
 from pandas.core.frame import DataFrame as pandasDataFrame
-from pyspark.sql.dataframe import DataFrame

 import tempo.resample as t_resample
 import tempo.tsdf as t_tsdf
@@ -26,6 +30,70 @@
 """


+def time_range(spark: SparkSession,
+               start_time: dt,
+               end_time: Optional[dt] = None,
+               step_size: Optional[td] = None,
+               num_intervals: Optional[int] = None,
+               ts_colname: str = "ts",
+               include_interval_ends: bool = False) -> DataFrame:
+    """
+    Generate a DataFrame of a range of timestamps with a regular interval,
+    similar to pandas.date_range, but for Spark DataFrames.
+    The DataFrame will have a single column named `ts_colname` (default is "ts")
+    that contains timestamps starting at `start_time` and ending at `end_time`
+    (if provided), with a step size of `step_size` (if provided) or
+    `num_intervals` (if provided). At least 2 of the 3 arguments `end_time`,
+    `step_size`, and `num_intervals` must be provided. The third
+    argument can be computed based on the other two, if needed. Optionally, the end of
+    each time interval can be included as a separate column in the DataFrame.
+
+    :param spark: SparkSession object
+    :param start_time: start time of the range
+    :param end_time: end time of the range (optional)
+    :param step_size: time step size (optional)
+    :param num_intervals: number of intervals (optional)
+    :param ts_colname: name of the timestamp column, default is "ts"
+    :param include_interval_ends: whether to include the end of each time
+        interval as a separate column in the DataFrame
+
+    :return: DataFrame with a time range of timestamps
+    """
+
+    # compute step_size if not provided
+    if not step_size:
+        # must have both end_time and num_intervals defined
+        assert end_time and num_intervals, \
+            "must provide at least 2 of: end_time, step_size, num_intervals"
+        diff_time = end_time - start_time
+        step_size = diff_time / num_intervals
+
+    # compute the number of intervals if not provided
+    if not num_intervals:
+        # must have both end_time and num_intervals defined
+        assert end_time and step_size, \
+            "must provide at least 2 of: end_time, step_size, num_intervals"
+        diff_time = end_time - start_time
+        num_intervals = math.ceil(diff_time / step_size)
+
+    # define expressions for the time range
+    start_time_expr = sfn.to_timestamp(sfn.lit(str(start_time)))
+    step_fractional_seconds = step_size.seconds + (step_size.microseconds / 1E6)
+    interval_expr = sfn.make_dt_interval(days=sfn.lit(step_size.days),
+                                         secs=sfn.lit(step_fractional_seconds))
+
+    # create the DataFrame
+    range_df = spark.range(0, num_intervals) \
+        .withColumn(ts_colname,
+                    start_time_expr + sfn.col("id") * interval_expr)
+    if include_interval_ends:
+        interval_end_colname = ts_colname + "_interval_end"
+        range_df = range_df.withColumn(
+            interval_end_colname,
+            start_time_expr + (sfn.col("id") + sfn.lit(1)) * interval_expr)
+    return range_df.drop("id")
+
+
 class ResampleWarning(Warning):
     """
     This class is a warning that is raised when the interpolate or resample with fill methods are called.

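A similarly hypothetical sketch of calling the new time_range helper directly, again using only what the diff shows. With end_time and step_size given, num_intervals is derived as ceil((end_time - start_time) / step_size), here ceil(24h / 1h) = 24.

```python
from datetime import datetime, timedelta

from pyspark.sql import SparkSession
from tempo.utils import time_range

spark = SparkSession.builder.master("local[2]").getOrCreate()

# 24 hourly rows: ts = start_time + id * interval, for id in range(0, 24)
hourly = time_range(
    spark,
    start_time=datetime(2024, 1, 1),
    end_time=datetime(2024, 1, 2),
    step_size=timedelta(hours=1),
    ts_colname="ts",
    include_interval_ends=True,
)

# expected columns: "ts" plus "ts_interval_end" (ts shifted forward by one step);
# the helper drops the internal "id" column before returning
hourly.orderBy("ts").show(3)
```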