Skip to content

Commit

Permalink
Merge pull request #214 from iurillilab/tsdframe-interp
Browse files Browse the repository at this point in the history
Added interpolation for TsDataframe
  • Loading branch information
gviejo authored Jan 26, 2024
2 parents 5ad2867 + 928d26c commit 9638d73
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 39 deletions.
146 changes: 107 additions & 39 deletions pynapple/core/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,70 @@ def smooth(self, std, size):
window = window / window.sum()
return self.convolve(window)

def interpolate(self, ts, ep=None, left=None, right=None):
"""Wrapper of the numpy linear interpolation method. See https://numpy.org/doc/stable/reference/generated/numpy.interp.html for an explanation of the parameters.
The argument ts should be Ts, Tsd, TsdFrame, TsdTensor to ensure interpolating from sorted timestamps in the right unit,
Parameters
----------
ts : Ts, Tsd or TsdFrame
The object holding the timestamps
ep : IntervalSet, optional
The epochs to use to interpolate. If None, the time support of Tsd is used.
left : None, optional
Value to return for ts < tsd[0], default is tsd[0].
right : None, optional
Value to return for ts > tsd[-1], default is tsd[-1].
"""
if not isinstance(ts, (Ts, Tsd, TsdFrame, TsdTensor)):
raise RuntimeError(
"First argument should be an instance of Ts, Tsd or TsdFrame"
)

if not isinstance(ep, IntervalSet):
ep = self.time_support

new_t = ts.restrict(ep).index

new_shape = (
len(new_t) if self.values.ndim == 1 else (len(new_t),) + self.shape[1:]
)
new_d = np.full(new_shape, np.nan)

start = 0
for i in range(len(ep)):
t = ts.get(ep.loc[i, "start"], ep.loc[i, "end"])
tmp = self.get(ep.loc[i, "start"], ep.loc[i, "end"])

if len(t) and len(tmp):
if self.values.ndim == 1:
new_d[start : start + len(t)] = np.interp(
t.index.values,
tmp.index.values,
tmp.values,
left=left,
right=right,
)
else:
interpolated_values = np.apply_along_axis(
lambda row: np.interp(
t.index.values,
tmp.index.values,
row,
left=left,
right=right,
),
0,
tmp.values,
)
new_d[start : start + len(t), ...] = interpolated_values

start += len(t)
kwargs_dict = dict(time_support=ep)
if hasattr(self, "columns"):
kwargs_dict["columns"] = self.columns
return self.__class__(t=new_t, d=new_d, **kwargs_dict)


class TsdTensor(NDArrayOperatorsMixin, _AbstractTsd):
"""
Expand Down Expand Up @@ -1404,6 +1468,49 @@ def save(self, filename):

return

# def interpolate(self, ts, ep=None, left=None, right=None):
# """Wrapper of the numpy linear interpolation method. See https://numpy.org/doc/stable/reference/generated/numpy.interp.html for an explanation of the parameters.
# The argument ts should be Ts, Tsd, TsdFrame, TsdTensor to ensure interpolating from sorted timestamps in the right unit,

# Parameters
# ----------
# ts : Ts, Tsd or TsdFrame
# The object holding the timestamps
# ep : IntervalSet, optional
# The epochs to use to interpolate. If None, the time support of Tsd is used.
# left : None, optional
# Value to return for ts < tsd[0], default is tsd[0].
# right : None, optional
# Value to return for ts > tsd[-1], default is tsd[-1].
# """
# if not isinstance(ts, (Ts, Tsd, TsdFrame)):
# raise RuntimeError(
# "First argument should be an instance of Ts, Tsd or TsdFrame"
# )

# if not isinstance(ep, IntervalSet):
# ep = self.time_support

# new_t = ts.restrict(ep).index
# new_d = np.empty((len(new_t), self.shape[1]))
# new_d.fill(np.nan)

# start = 0
# for i in range(len(ep)):
# t = ts.restrict(ep.loc[[i]])
# tmp = self.restrict(ep.loc[[i]])
# if len(t) and len(tmp):
# interpolated_values = np.apply_along_axis(
# lambda row: np.interp(t.index.values, tmp.index.values, row),
# 0,
# tmp.values,
# )
# new_d[start : start + len(t), :] = interpolated_values

# start += len(t)

# return TsdFrame(t=new_t, d=new_d, columns=self.columns, time_support=ep)


class Tsd(NDArrayOperatorsMixin, _AbstractTsd):
"""
Expand Down Expand Up @@ -1736,45 +1843,6 @@ def save(self, filename):

return

def interpolate(self, ts, ep=None, left=None, right=None):
"""Wrapper of the numpy linear interpolation method. See https://numpy.org/doc/stable/reference/generated/numpy.interp.html for an explanation of the parameters.
The argument ts should be Ts, Tsd, TsdFrame, TsdTensor to ensure interpolating from sorted timestamps in the right unit,
Parameters
----------
ts : Ts, Tsd or TsdFrame
The object holding the timestamps
ep : IntervalSet, optional
The epochs to use to interpolate. If None, the time support of Tsd is used.
left : None, optional
Value to return for ts < tsd[0], default is tsd[0].
right : None, optional
Value to return for ts > tsd[-1], default is tsd[-1].
"""
if not isinstance(ts, (Ts, Tsd, TsdFrame)):
raise RuntimeError(
"First argument should be an instance of Ts, Tsd or TsdFrame"
)

if not isinstance(ep, IntervalSet):
ep = self.time_support

new_t = ts.restrict(ep).index
new_d = np.empty(len(new_t))
new_d.fill(np.nan)

start = 0
for i in range(len(ep)):
t = ts.restrict(ep.loc[[i]])
tmp = self.restrict(ep.loc[[i]])
if len(t) and len(tmp):
new_d[start : start + len(t)] = np.interp(
t.index.values, tmp.index.values, tmp.values, left=left, right=right
)
start += len(t)

return Tsd(t=new_t, d=new_d, time_support=ep)


class Ts(_AbstractTsd):
"""
Expand Down
4 changes: 4 additions & 0 deletions tests/npzfilestest/tsd2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"time": "2024-01-09 08:48:11.597662",
"info": "Test description"
}
112 changes: 112 additions & 0 deletions tests/test_time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import pandas as pd
import pytest

from pynapple.core.time_series import TsdTensor


def test_create_tsd():
tsd = nap.Tsd(t=np.arange(100), d=np.arange(100))
Expand Down Expand Up @@ -929,6 +931,60 @@ def test_save_npz(self, tsdframe):
os.remove("tsdframe.npz")
os.remove("tsdframe2.npz")

def test_interpolate(self, tsdframe):

y = np.arange(0, 1001)
data_stack = np.stack([np.arange(0, 1001),]*4).T

tsdframe = nap.TsdFrame(t=np.arange(0, 101), d=data_stack[0::10, :])

# Ts
ts = nap.Ts(t=y/10)
tsdframe2 = tsdframe.interpolate(ts)
np.testing.assert_array_almost_equal(tsdframe2.values, data_stack)

# Tsd
ts = nap.Tsd(t=y/10, d=np.zeros_like(y))
tsdframe2 = tsdframe.interpolate(ts)
np.testing.assert_array_almost_equal(tsdframe2.values, data_stack)

# TsdFrame
ts = nap.TsdFrame(t=y/10, d=np.zeros((len(y), 2)))
tsdframe2 = tsdframe.interpolate(ts)
np.testing.assert_array_almost_equal(tsdframe2.values, data_stack)

with pytest.raises(RuntimeError) as e:
tsdframe.interpolate([0, 1, 2])
assert str(e.value) == "First argument should be an instance of Ts, Tsd or TsdFrame"

# Right left
ep = nap.IntervalSet(start=0, end=5)
tsdframe = nap.Tsd(t=np.arange(1,4), d=np.arange(3), time_support=ep)
ts = nap.Ts(t=np.arange(0, 5))
tsdframe2 = tsdframe.interpolate(ts, left=1234)
assert float(tsdframe2.values[0]) == 1234.0
tsdframe2 = tsdframe.interpolate(ts, right=4321)
assert float(tsdframe2.values[-1]) == 4321.0

def test_interpolate_with_ep(self, tsdframe):
y = np.arange(0, 1001)
data_stack = np.stack([np.arange(0, 1001),]*4).T

tsdframe = nap.TsdFrame(t=np.arange(0, 101), d=data_stack[0::10, :])
ts = nap.Ts(t=y/10)
ep = nap.IntervalSet(start=np.arange(0, 100, 20), end=np.arange(10, 110, 20))
tsdframe2 = tsdframe.interpolate(ts, ep)
tmp = ts.restrict(ep).index * 10
print(tmp, tsdframe2.values)
print(tmp.shape, tsdframe2.values.shape)
print(tmp.mean(0), tsdframe2.values.mean(0))
np.testing.assert_array_almost_equal(tmp, tsdframe2.values[:, 0])

# Empty ep
ep = nap.IntervalSet(start=200, end=300)
tsdframe2 = tsdframe.interpolate(ts, ep)
assert len(tsdframe2) == 0

####################################################
# Test for ts
####################################################
Expand Down Expand Up @@ -1174,3 +1230,59 @@ def test_save_npz(self, tsdtensor):

os.remove("tsdtensor.npz")
os.remove("tsdtensor2.npz")

def test_interpolate(self, tsdtensor):

y = np.arange(0, 1001)
data_stack = np.stack([np.stack([np.arange(0, 1001),] * 4)] * 3).T

tsdtensor = nap.TsdTensor(t=np.arange(0, 101), d= data_stack[0::10, ...])

# Ts
ts = nap.Ts(t = y / 10)
tsdtensor2 = tsdtensor.interpolate(ts)
np.testing.assert_array_almost_equal(tsdtensor2.values, data_stack)

# Tsd
ts = nap.Tsd(t=y/10, d=np.zeros_like(y))
tsdtensor2 = tsdtensor.interpolate(ts)
np.testing.assert_array_almost_equal(tsdtensor2.values, data_stack)

# TsdFrame
ts = nap.TsdFrame(t=y/10, d=np.zeros((len(y), 2)))
tsdtensor2 = tsdtensor.interpolate(ts)
np.testing.assert_array_almost_equal(tsdtensor2.values, data_stack)

with pytest.raises(RuntimeError) as e:
tsdtensor.interpolate([0, 1, 2])
assert str(e.value) == "First argument should be an instance of Ts, Tsd or TsdFrame"

# Right left
ep = nap.IntervalSet(start=0, end=5)
tsdtensor = nap.Tsd(t=np.arange(1,4), d=np.arange(3), time_support=ep)
ts = nap.Ts(t=np.arange(0, 5))
tsdtensor2 = tsdtensor.interpolate(ts, left=1234)
assert float(tsdtensor2.values[0]) == 1234.0
tsdtensor2 = tsdtensor.interpolate(ts, right=4321)
assert float(tsdtensor2.values[-1]) == 4321.0

def test_interpolate_with_ep(self, tsdtensor):
y = np.arange(0, 1001)
data_stack = np.stack([np.stack([np.arange(0, 1001),] * 4),] * 3).T

tsdtensor = nap.TsdTensor(t=np.arange(0, 101), d=data_stack[::10, ...])

ts = nap.Ts(t=y / 10)


ep = nap.IntervalSet(start=np.arange(0, 100, 20), end=np.arange(10, 110, 20))
tsdframe2 = tsdtensor.interpolate(ts, ep)
tmp = ts.restrict(ep).index * 10

np.testing.assert_array_almost_equal(tmp, tsdframe2.values[:, 0, 0])

# Empty ep
ep = nap.IntervalSet(start=200, end=300)
tsdframe2 = tsdtensor.interpolate(ts, ep)
assert len(tsdframe2) == 0

0 comments on commit 9638d73

Please sign in to comment.