Skip to content

Commit

Permalink
Merge pull request #316 from google/before-after-operator
Browse files Browse the repository at this point in the history
Added before and after operators
  • Loading branch information
javiber authored Nov 28, 2023
2 parents a713f27 + a406ec6 commit 69867bd
Show file tree
Hide file tree
Showing 7 changed files with 438 additions and 4 deletions.
1 change: 1 addition & 0 deletions docs/src/reference/temporian/operators/after.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
::: temporian.EventSet.after
1 change: 1 addition & 0 deletions docs/src/reference/temporian/operators/before.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
::: temporian.EventSet.before
119 changes: 119 additions & 0 deletions temporian/core/event_set_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union

from temporian.core.data.duration import Duration
Expand Down Expand Up @@ -4436,3 +4437,121 @@ def assign(
from temporian.core.operators.glue import assign

return assign(self, **others)

def before(
    self: EventSetOrNode,
    timestamp: Union[int, float, datetime],
) -> EventSetOrNode:
    """Filters events from an [`EventSet`][temporian.EventSet] that happened
    before a particular timestamp.

    The timestamp can be a datetime if the EventSet's timestamps are unix
    timestamps.

    The comparison is strict, meaning that the obtained timestamps would be
    less than (`<`) the provided timestamp.

    This operation is equivalent to:
    `input.filter(input.timestamps() < timestamp)`

    Usage example:
        ```python
        >>> a = tp.event_set(
        ...     timestamps=[0, 1, 5, 6],
        ...     features={"f1": [0, 10, 50, 60]},
        ... )

        >>> a.before(5)
        indexes: []
        features: [('f1', int64)]
        events:
            (2 events):
                timestamps: [0. 1.]
                'f1': [ 0 10]
        ...

        >>> from datetime import datetime
        >>> a = tp.event_set(
        ...     timestamps=[datetime(2022, 1, 1), datetime(2022, 1, 2)],
        ...     features={"f1": [1, 2]},
        ... )

        >>> a.before(datetime(2022, 1, 1, 12))
        indexes: []
        features: [('f1', int64)]
        events:
            (1 events):
                timestamps: ['2022-01-01T00:00:00']
                'f1': [1]
        ...

        ```

    Args:
        timestamp: Exclusive upper bound; only events whose timestamp is
            strictly less than this value are kept. Pass a number when the
            EventSet's timestamps are not unix timestamps, and a datetime
            when they are.

    Returns:
        Filtered EventSet.

    Raises:
        ValueError: If a datetime is given for an EventSet whose timestamps
            are not unix timestamps, or a number is given for an EventSet
            whose timestamps are unix timestamps.
    """
    # Imported lazily, as the other operator methods in this class do.
    from temporian.core.operators.filter import before

    return before(self, timestamp=timestamp)

def after(
    self: EventSetOrNode,
    timestamp: Union[int, float, datetime],
) -> EventSetOrNode:
    """Filters events from an [`EventSet`][temporian.EventSet] that happened
    after a particular timestamp.

    The timestamp can be a datetime if the EventSet's timestamps are unix
    timestamps.

    The comparison is strict, meaning that the obtained timestamps would be
    greater than (`>`) the provided timestamp.

    This operation is equivalent to:
    `input.filter(input.timestamps() > timestamp)`

    Usage example:
        ```python
        >>> a = tp.event_set(
        ...     timestamps=[0, 1, 5, 6],
        ...     features={"f1": [0, 10, 50, 60]},
        ... )

        >>> a.after(4)
        indexes: []
        features: [('f1', int64)]
        events:
            (2 events):
                timestamps: [5. 6.]
                'f1': [50 60]
        ...

        >>> from datetime import datetime
        >>> a = tp.event_set(
        ...     timestamps=[datetime(2022, 1, 1), datetime(2022, 1, 2)],
        ...     features={"f1": [1, 2]},
        ... )

        >>> a.after(datetime(2022, 1, 1, 12))
        indexes: []
        features: [('f1', int64)]
        events:
            (1 events):
                timestamps: ['2022-01-02T00:00:00']
                'f1': [2]
        ...

        ```

    Args:
        timestamp: Exclusive lower bound; only events whose timestamp is
            strictly greater than this value are kept. Pass a number when
            the EventSet's timestamps are not unix timestamps, and a
            datetime when they are.

    Returns:
        Filtered EventSet.

    Raises:
        ValueError: If a datetime is given for an EventSet whose timestamps
            are not unix timestamps, or a number is given for an EventSet
            whose timestamps are unix timestamps.
    """
    # Imported lazily, as the other operator methods in this class do.
    from temporian.core.operators.filter import after

    return after(self, timestamp=timestamp)
56 changes: 53 additions & 3 deletions temporian/core/operators/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

"""Filter operator class and public API function definition."""

from typing import Optional
from datetime import datetime
from typing import Optional, Union

from temporian.core import operator_lib
from temporian.core.compilation import compile
from temporian.core.compilation import (
compile,
) # pylint: disable=redefined-builtin
from temporian.core.data.dtype import DType
from temporian.core.data.node import (
EventSetNode,
Expand Down Expand Up @@ -80,7 +83,6 @@ def build_op_definition(cls) -> pb.OperatorDef:
operator_lib.register_operator(FilterOperator)


# pylint: disable=redefined-builtin
@compile
def filter(
input: EventSetOrNode,
Expand All @@ -93,3 +95,51 @@ def filter(
assert isinstance(condition, EventSetNode)

return FilterOperator(input, condition).outputs["output"]


@compile
def before(
    input: EventSetOrNode, timestamp: Union[int, float, datetime]
) -> EventSetOrNode:
    """Keeps the events of `input` whose timestamp is strictly less than
    `timestamp`.

    Args:
        input: Node to filter.
        timestamp: Exclusive upper bound. Must be a datetime when `input` has
            unix timestamps, and a number otherwise. A naive datetime is
            interpreted as UTC.

    Returns:
        Node holding the events of `input` that pass
        `input.timestamps() < timestamp`.

    Raises:
        ValueError: If the kind of `timestamp` (datetime vs. number) does not
            match `input.schema.is_unix_timestamp`.
    """
    from datetime import timezone

    assert isinstance(input, EventSetNode)

    is_unix = input.schema.is_unix_timestamp
    if isinstance(timestamp, datetime):
        if not is_unix:
            raise ValueError(
                "Cannot use a datetime timestamp to filter timestamps that are"
                " not unix timestamp. Set `is_unix_timestamp=True` on the"
                " EventSet or use a float when calling `before`"
            )
        # datetime.timestamp() reads a naive datetime in the machine's local
        # timezone; pin it to UTC so the threshold does not depend on where
        # the code runs.
        # NOTE(review): assumes EventSets store naive datetimes as UTC --
        # confirm against the timestamp normalization in `event_set`.
        if timestamp.tzinfo is None:
            timestamp = timestamp.replace(tzinfo=timezone.utc)
        timestamp = timestamp.timestamp()
    elif is_unix:
        raise ValueError(
            "Cannot use a float timestamp to filter unix timestamp. Set"
            " `is_unix_timestamp=False` on the EventSet or use a datetime"
            " when calling `before`"
        )
    return filter(input, input.timestamps() < timestamp)


@compile
def after(
    input: EventSetOrNode, timestamp: Union[int, float, datetime]
) -> EventSetOrNode:
    """Keeps the events of `input` whose timestamp is strictly greater than
    `timestamp`.

    Args:
        input: Node to filter.
        timestamp: Exclusive lower bound. Must be a datetime when `input` has
            unix timestamps, and a number otherwise. A naive datetime is
            interpreted as UTC.

    Returns:
        Node holding the events of `input` that pass
        `input.timestamps() > timestamp`.

    Raises:
        ValueError: If the kind of `timestamp` (datetime vs. number) does not
            match `input.schema.is_unix_timestamp`.
    """
    from datetime import timezone

    assert isinstance(input, EventSetNode)

    is_unix = input.schema.is_unix_timestamp
    if isinstance(timestamp, datetime):
        if not is_unix:
            raise ValueError(
                "Cannot use a datetime timestamp to filter timestamps that are"
                " not unix timestamp. Set `is_unix_timestamp=True` on the"
                " EventSet or use a float when calling `after`"
            )
        # datetime.timestamp() reads a naive datetime in the machine's local
        # timezone; pin it to UTC so the threshold does not depend on where
        # the code runs.
        # NOTE(review): assumes EventSets store naive datetimes as UTC --
        # confirm against the timestamp normalization in `event_set`.
        if timestamp.tzinfo is None:
            timestamp = timestamp.replace(tzinfo=timezone.utc)
        timestamp = timestamp.timestamp()
    elif is_unix:
        raise ValueError(
            "Cannot use a float timestamp to filter unix timestamp. Set"
            " `is_unix_timestamp=False` on the EventSet or use a datetime"
            " when calling `after`"
        )
    return filter(input, input.timestamps() > timestamp)
21 changes: 20 additions & 1 deletion temporian/core/operators/test/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,26 @@ py_test(
srcs_version = "PY3",
deps = [
"//temporian/implementation/numpy/data:io",
# "//temporian/core/data:duration",
"//temporian/test:utils",
],
)

# Test target that runs test_before.py (presumably the unit tests for the
# `before` operator -- the test file is not visible here).
py_test(
name = "test_before",
srcs = ["test_before.py"],
srcs_version = "PY3",
deps = [
"//temporian/implementation/numpy/data:io",
"//temporian/test:utils",
],
)

# Test target that runs test_after.py, the unit tests for the `after`
# operator. The deps provide `event_set` (data:io) and
# `assertOperatorResult` (test:utils), which that file imports.
py_test(
name = "test_after",
srcs = ["test_after.py"],
srcs_version = "PY3",
deps = [
"//temporian/implementation/numpy/data:io",
"//temporian/test:utils",
],
)
122 changes: 122 additions & 0 deletions temporian/core/operators/test/test_after.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Copyright 2021 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime

import numpy as np
from absl.testing import absltest
from absl.testing.parameterized import TestCase

from temporian.implementation.numpy.data.io import event_set
from temporian.test.utils import assertOperatorResult


class AfterTest(TestCase):
    """Checks `EventSet.after`, which keeps events strictly later than a
    given timestamp."""

    def _assert_matches(self, actual, wanted):
        # Single place for the comparison settings shared by every test here.
        assertOperatorResult(self, actual, wanted, check_sampling=False)

    def test_basic(self):
        # The event sitting exactly on the threshold is dropped.
        source = event_set(timestamps=[1, 2, 3], features={"x": [4, 5, 6]})

        self._assert_matches(
            source.after(2),
            event_set(timestamps=[3], features={"x": [6]}),
        )

    def test_empty(self):
        # A threshold past the last event leaves nothing.
        source = event_set(timestamps=[1, 2, 3], features={"x": [4, 5, 6]})
        filtered = source.after(4)

        # Explicit numpy arrays keep the dtypes that empty lists would lose.
        no_values = np.array([], dtype=np.int64)
        no_timestamps = np.array([], dtype=np.int64)

        self._assert_matches(
            filtered,
            event_set(timestamps=no_timestamps, features={"x": no_values}),
        )

    def test_all(self):
        # A threshold before the first event keeps everything.
        source = event_set(timestamps=[1, 2, 3], features={"x": [4, 5, 6]})

        self._assert_matches(
            source.after(0),
            event_set(timestamps=[1, 2, 3], features={"x": [4, 5, 6]}),
        )

    def test_floats_and_int(self):
        # Float timestamps filtered with an int threshold.
        float_source = event_set(
            timestamps=[1.1, 1.9, 2.1],
            features={"x": [4, 5, 6]},
        )
        self._assert_matches(
            float_source.after(2),
            event_set(timestamps=[2.1], features={"x": [6]}),
        )

        # Int timestamps filtered with a float threshold.
        int_source = event_set(
            timestamps=[1, 2, 3],
            features={"x": [4, 5, 6]},
        )
        self._assert_matches(
            int_source.after(1.9999),
            event_set(timestamps=[2, 3], features={"x": [5, 6]}),
        )

    def test_datetime(self):
        # Datetime timestamps filtered with a datetime threshold.
        minutes = [
            datetime(2023, 11, 16, 10, 15),
            datetime(2023, 11, 16, 10, 16),
            datetime(2023, 11, 16, 10, 17),
        ]
        unix_source = event_set(timestamps=minutes, features={"x": [4, 5, 6]})

        self._assert_matches(
            unix_source.after(datetime(2023, 11, 16, 10, 16)),
            event_set(
                timestamps=[datetime(2023, 11, 16, 10, 17)],
                features={"x": [6]},
            ),
        )

        # A datetime threshold is rejected when timestamps are plain numbers.
        numeric_source = event_set(
            timestamps=[1.1, 1.9, 2.1],
            features={"x": [4, 5, 6]},
        )
        with self.assertRaisesRegex(ValueError, "unix"):
            numeric_source.after(datetime(2023, 11, 16, 10, 16, 0))


if __name__ == "__main__":
absltest.main()
Loading

0 comments on commit 69867bd

Please sign in to comment.