diff --git a/pynapple/core/time_series.py b/pynapple/core/time_series.py index 51e11fc3..f048834c 100644 --- a/pynapple/core/time_series.py +++ b/pynapple/core/time_series.py @@ -927,6 +927,70 @@ def smooth(self, std, size): window = window / window.sum() return self.convolve(window) + def interpolate(self, ts, ep=None, left=None, right=None): + """Wrapper of the numpy linear interpolation method. See https://numpy.org/doc/stable/reference/generated/numpy.interp.html for an explanation of the parameters. + The argument ts should be Ts, Tsd, TsdFrame, TsdTensor to ensure interpolating from sorted timestamps in the right unit, + + Parameters + ---------- + ts : Ts, Tsd or TsdFrame + The object holding the timestamps + ep : IntervalSet, optional + The epochs to use to interpolate. If None, the time support of Tsd is used. + left : None, optional + Value to return for ts < tsd[0], default is tsd[0]. + right : None, optional + Value to return for ts > tsd[-1], default is tsd[-1]. + """ + if not isinstance(ts, (Ts, Tsd, TsdFrame, TsdTensor)): + raise RuntimeError( + "First argument should be an instance of Ts, Tsd or TsdFrame" + ) + + if not isinstance(ep, IntervalSet): + ep = self.time_support + + new_t = ts.restrict(ep).index + + new_shape = ( + len(new_t) if self.values.ndim == 1 else (len(new_t),) + self.shape[1:] + ) + new_d = np.full(new_shape, np.nan) + + start = 0 + for i in range(len(ep)): + t = ts.get(ep.loc[i, "start"], ep.loc[i, "end"]) + tmp = self.get(ep.loc[i, "start"], ep.loc[i, "end"]) + + if len(t) and len(tmp): + if self.values.ndim == 1: + new_d[start : start + len(t)] = np.interp( + t.index.values, + tmp.index.values, + tmp.values, + left=left, + right=right, + ) + else: + interpolated_values = np.apply_along_axis( + lambda row: np.interp( + t.index.values, + tmp.index.values, + row, + left=left, + right=right, + ), + 0, + tmp.values, + ) + new_d[start : start + len(t), ...] = interpolated_values + + start += len(t) + kwargs_dict = dict(time_support=ep) + if hasattr(self, "columns"): + kwargs_dict["columns"] = self.columns + return self.__class__(t=new_t, d=new_d, **kwargs_dict) + class TsdTensor(NDArrayOperatorsMixin, _AbstractTsd): """ @@ -1404,6 +1468,49 @@ def save(self, filename): return + # def interpolate(self, ts, ep=None, left=None, right=None): + # """Wrapper of the numpy linear interpolation method. See https://numpy.org/doc/stable/reference/generated/numpy.interp.html for an explanation of the parameters. + # The argument ts should be Ts, Tsd, TsdFrame, TsdTensor to ensure interpolating from sorted timestamps in the right unit, + + # Parameters + # ---------- + # ts : Ts, Tsd or TsdFrame + # The object holding the timestamps + # ep : IntervalSet, optional + # The epochs to use to interpolate. If None, the time support of Tsd is used. + # left : None, optional + # Value to return for ts < tsd[0], default is tsd[0]. + # right : None, optional + # Value to return for ts > tsd[-1], default is tsd[-1]. + # """ + # if not isinstance(ts, (Ts, Tsd, TsdFrame)): + # raise RuntimeError( + # "First argument should be an instance of Ts, Tsd or TsdFrame" + # ) + + # if not isinstance(ep, IntervalSet): + # ep = self.time_support + + # new_t = ts.restrict(ep).index + # new_d = np.empty((len(new_t), self.shape[1])) + # new_d.fill(np.nan) + + # start = 0 + # for i in range(len(ep)): + # t = ts.restrict(ep.loc[[i]]) + # tmp = self.restrict(ep.loc[[i]]) + # if len(t) and len(tmp): + # interpolated_values = np.apply_along_axis( + # lambda row: np.interp(t.index.values, tmp.index.values, row), + # 0, + # tmp.values, + # ) + # new_d[start : start + len(t), :] = interpolated_values + + # start += len(t) + + # return TsdFrame(t=new_t, d=new_d, columns=self.columns, time_support=ep) + class Tsd(NDArrayOperatorsMixin, _AbstractTsd): """ @@ -1736,45 +1843,6 @@ def save(self, filename): return - def interpolate(self, ts, ep=None, left=None, right=None): - """Wrapper of the numpy linear interpolation method. See https://numpy.org/doc/stable/reference/generated/numpy.interp.html for an explanation of the parameters. - The argument ts should be Ts, Tsd, TsdFrame, TsdTensor to ensure interpolating from sorted timestamps in the right unit, - - Parameters - ---------- - ts : Ts, Tsd or TsdFrame - The object holding the timestamps - ep : IntervalSet, optional - The epochs to use to interpolate. If None, the time support of Tsd is used. - left : None, optional - Value to return for ts < tsd[0], default is tsd[0]. - right : None, optional - Value to return for ts > tsd[-1], default is tsd[-1]. - """ - if not isinstance(ts, (Ts, Tsd, TsdFrame)): - raise RuntimeError( - "First argument should be an instance of Ts, Tsd or TsdFrame" - ) - - if not isinstance(ep, IntervalSet): - ep = self.time_support - - new_t = ts.restrict(ep).index - new_d = np.empty(len(new_t)) - new_d.fill(np.nan) - - start = 0 - for i in range(len(ep)): - t = ts.restrict(ep.loc[[i]]) - tmp = self.restrict(ep.loc[[i]]) - if len(t) and len(tmp): - new_d[start : start + len(t)] = np.interp( - t.index.values, tmp.index.values, tmp.values, left=left, right=right - ) - start += len(t) - - return Tsd(t=new_t, d=new_d, time_support=ep) - class Ts(_AbstractTsd): """ diff --git a/tests/npzfilestest/tsd2.json b/tests/npzfilestest/tsd2.json new file mode 100644 index 00000000..001b401d --- /dev/null +++ b/tests/npzfilestest/tsd2.json @@ -0,0 +1,4 @@ +{ + "time": "2024-01-09 08:48:11.597662", + "info": "Test description" +} \ No newline at end of file diff --git a/tests/test_time_series.py b/tests/test_time_series.py index 5d267c06..e81023c8 100755 --- a/tests/test_time_series.py +++ b/tests/test_time_series.py @@ -12,6 +12,8 @@ import pandas as pd import pytest +from pynapple.core.time_series import TsdTensor + def test_create_tsd(): tsd = nap.Tsd(t=np.arange(100), d=np.arange(100)) @@ -929,6 +931,60 @@ def test_save_npz(self, tsdframe): os.remove("tsdframe.npz") os.remove("tsdframe2.npz") + def test_interpolate(self, tsdframe): + + y = np.arange(0, 1001) + data_stack = np.stack([np.arange(0, 1001),]*4).T + + tsdframe = nap.TsdFrame(t=np.arange(0, 101), d=data_stack[0::10, :]) + + # Ts + ts = nap.Ts(t=y/10) + tsdframe2 = tsdframe.interpolate(ts) + np.testing.assert_array_almost_equal(tsdframe2.values, data_stack) + + # Tsd + ts = nap.Tsd(t=y/10, d=np.zeros_like(y)) + tsdframe2 = tsdframe.interpolate(ts) + np.testing.assert_array_almost_equal(tsdframe2.values, data_stack) + + # TsdFrame + ts = nap.TsdFrame(t=y/10, d=np.zeros((len(y), 2))) + tsdframe2 = tsdframe.interpolate(ts) + np.testing.assert_array_almost_equal(tsdframe2.values, data_stack) + + with pytest.raises(RuntimeError) as e: + tsdframe.interpolate([0, 1, 2]) + assert str(e.value) == "First argument should be an instance of Ts, Tsd or TsdFrame" + + # Right left + ep = nap.IntervalSet(start=0, end=5) + tsdframe = nap.Tsd(t=np.arange(1,4), d=np.arange(3), time_support=ep) + ts = nap.Ts(t=np.arange(0, 5)) + tsdframe2 = tsdframe.interpolate(ts, left=1234) + assert float(tsdframe2.values[0]) == 1234.0 + tsdframe2 = tsdframe.interpolate(ts, right=4321) + assert float(tsdframe2.values[-1]) == 4321.0 + + def test_interpolate_with_ep(self, tsdframe): + y = np.arange(0, 1001) + data_stack = np.stack([np.arange(0, 1001),]*4).T + + tsdframe = nap.TsdFrame(t=np.arange(0, 101), d=data_stack[0::10, :]) + ts = nap.Ts(t=y/10) + ep = nap.IntervalSet(start=np.arange(0, 100, 20), end=np.arange(10, 110, 20)) + tsdframe2 = tsdframe.interpolate(ts, ep) + tmp = ts.restrict(ep).index * 10 + print(tmp, tsdframe2.values) + print(tmp.shape, tsdframe2.values.shape) + print(tmp.mean(0), tsdframe2.values.mean(0)) + np.testing.assert_array_almost_equal(tmp, tsdframe2.values[:, 0]) + + # Empty ep + ep = nap.IntervalSet(start=200, end=300) + tsdframe2 = tsdframe.interpolate(ts, ep) + assert len(tsdframe2) == 0 + #################################################### # Test for ts #################################################### @@ -1174,3 +1230,59 @@ def test_save_npz(self, tsdtensor): os.remove("tsdtensor.npz") os.remove("tsdtensor2.npz") + + def test_interpolate(self, tsdtensor): + + y = np.arange(0, 1001) + data_stack = np.stack([np.stack([np.arange(0, 1001),] * 4)] * 3).T + + tsdtensor = nap.TsdTensor(t=np.arange(0, 101), d= data_stack[0::10, ...]) + + # Ts + ts = nap.Ts(t = y / 10) + tsdtensor2 = tsdtensor.interpolate(ts) + np.testing.assert_array_almost_equal(tsdtensor2.values, data_stack) + + # Tsd + ts = nap.Tsd(t=y/10, d=np.zeros_like(y)) + tsdtensor2 = tsdtensor.interpolate(ts) + np.testing.assert_array_almost_equal(tsdtensor2.values, data_stack) + + # TsdFrame + ts = nap.TsdFrame(t=y/10, d=np.zeros((len(y), 2))) + tsdtensor2 = tsdtensor.interpolate(ts) + np.testing.assert_array_almost_equal(tsdtensor2.values, data_stack) + + with pytest.raises(RuntimeError) as e: + tsdtensor.interpolate([0, 1, 2]) + assert str(e.value) == "First argument should be an instance of Ts, Tsd or TsdFrame" + + # Right left + ep = nap.IntervalSet(start=0, end=5) + tsdtensor = nap.Tsd(t=np.arange(1,4), d=np.arange(3), time_support=ep) + ts = nap.Ts(t=np.arange(0, 5)) + tsdtensor2 = tsdtensor.interpolate(ts, left=1234) + assert float(tsdtensor2.values[0]) == 1234.0 + tsdtensor2 = tsdtensor.interpolate(ts, right=4321) + assert float(tsdtensor2.values[-1]) == 4321.0 + + def test_interpolate_with_ep(self, tsdtensor): + y = np.arange(0, 1001) + data_stack = np.stack([np.stack([np.arange(0, 1001),] * 4),] * 3).T + + tsdtensor = nap.TsdTensor(t=np.arange(0, 101), d=data_stack[::10, ...]) + + ts = nap.Ts(t=y / 10) + + + ep = nap.IntervalSet(start=np.arange(0, 100, 20), end=np.arange(10, 110, 20)) + tsdframe2 = tsdtensor.interpolate(ts, ep) + tmp = ts.restrict(ep).index * 10 + + np.testing.assert_array_almost_equal(tmp, tsdframe2.values[:, 0, 0]) + + # Empty ep + ep = nap.IntervalSet(start=200, end=300) + tsdframe2 = tsdtensor.interpolate(ts, ep) + assert len(tsdframe2) == 0 +