Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fourier #49

Closed
wants to merge 15 commits into from
46 changes: 24 additions & 22 deletions docs/examples/plot_1D_basis_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,14 @@
# Additionally, the Fourier basis has the advantage of being orthogonal, which simplifies the estimation and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably explain orthogonal here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still think this. at least a foot note or link

# interpretation of the model parameters, each of which will represent the relative contribution of a specific
# oscillation frequency to the overall signal.
#
# A Fourier basis can be instantiated with the following syntax:
# the user can provide the maximum frequency of the cosine and negative
# sine pairs by setting the `max_freq` parameter.
# The sinusoidal basis elements will have frequencies from 0 to `max_freq`.


# A Fourier basis can be instantiated with the usual syntax.
# The user can pass the desired frequencies for the basis or
# the frequencies will be set to np.arange(n_basis_funcs//2).
# The number of basis function is required to be even.
fourier_basis = nmo.basis.FourierBasis(n_freqs=4)
fourier_basis = nmo.basis.FourierBasis(max_freq=3)

# evaluate on equi-spaced samples
samples, eval_basis = fourier_basis.evaluate_on_grid(1000)
Expand All @@ -112,44 +113,45 @@
plt.tight_layout()

# %%
# !!! note "Fourier basis convolution and Fourier transform"
# The Fourier transform of a signal $ s(t) $ restricted to a temporal window $ [t_0,\;t_1] $ is
# $$ \\hat{x}(\\omega) = \\int_{t_0}^{t_1} s(\\tau) e^{-j\\omega \\tau} d\\tau. $$
# where $ e^{-j\\omega \\tau} = \\cos(\\omega \\tau) - j \\sin (\\omega \\tau) $.
# ## Fourier Basis Convolution and Fourier Transform
BalzaniEdoardo marked this conversation as resolved.
Show resolved Hide resolved
# The Fourier transform of a signal $ s(t) $ restricted to a temporal window $ [t_0,\;t_1] $ is
# $$ \\hat{x}(\\omega) = \\int_{t_0}^{t_1} s(\\tau) e^{-j\\omega \\tau} d\\tau. $$
# where $ e^{-j\\omega \\tau} = \\cos(\\omega \\tau) - j \\sin (\\omega \\tau) $.
#
# When computing the cross-correlation of a signal with the Fourier basis functions,
# we essentially measure how well the signal correlates with sinusoids of different frequencies,
# within a specified temporal window. This process mirrors the operation performed by the Fourier transform.
# Therefore, it becomes clear that computing the cross-correlation of a signal with the Fourier basis defined here
# is equivalent to computing the discrete Fourier transform on a sliding window of the same size
# as that of the basis.
# When computing the cross-correlation of a signal with the Fourier basis functions,
# we essentially measure how well the signal correlates with sinusoids of different frequencies,
# within a specified temporal window. This process mirrors the operation performed by the Fourier transform.
# Therefore, it becomes clear that computing the cross-correlation of a signal with the Fourier basis defined here
# is equivalent to computing the discrete Fourier transform on a sliding window of the same size
# as that of the basis.
BalzaniEdoardo marked this conversation as resolved.
Show resolved Hide resolved


n_samples = 1000
n_freqs = 20
max_freq = 20

# define a signal
signal = np.random.normal(size=n_samples)

# evaluate the basis
_, eval_basis = nmo.basis.FourierBasis(n_freqs=n_freqs).evaluate_on_grid(n_samples)
_, eval_basis = nmo.basis.FourierBasis(max_freq=max_freq).evaluate_on_grid(n_samples)

# compute the cross-corr with the signal and the basis
# Note that we are inverting the time axis of the basis because we are aiming
# for a cross-correlation, while np.convolve compute a convolution which would flip the time axis.
xcorr = np.array(
[
np.convolve(eval_basis[::-1, k], signal, mode="valid")[0]
for k in range(2 * n_freqs - 1)
for k in range(2 * max_freq + 1)
]
)

# compute the power (add back sin(0 * t) = 0)
fft_complex = np.fft.fft(signal)
fft_amplitude = np.abs(fft_complex[:n_freqs])
fft_phase = np.angle(fft_complex[:n_freqs])
fft_amplitude = np.abs(fft_complex[:max_freq + 1])
fft_phase = np.angle(fft_complex[:max_freq + 1])
# compute the phase and amplitude from the convolution
xcorr_phase = np.arctan2(np.hstack([[0], xcorr[n_freqs:]]), xcorr[:n_freqs])
xcorr_aplitude = np.sqrt(xcorr[:n_freqs] ** 2 + np.hstack([[0], xcorr[n_freqs:]]) ** 2)
xcorr_phase = np.arctan2(np.hstack([[0], xcorr[max_freq+1:]]), xcorr[:max_freq+1])
xcorr_aplitude = np.sqrt(xcorr[:max_freq+1] ** 2 + np.hstack([[0], xcorr[max_freq+1:]]) ** 2)

fig, ax = plt.subplots(1, 2)
ax[0].set_aspect("equal")
Expand Down
50 changes: 27 additions & 23 deletions src/nemos/basis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1182,16 +1182,20 @@ def evaluate_on_grid(self, n_samples: int) -> Tuple[NDArray, NDArray]:
class FourierBasis(Basis):
"""Set of 1D Fourier basis.

This class defines a cosine and negative sine basis (quadrature pair)
with frequencies ranging 0 to max_freq.

Parameters
----------
n_freqs
Number of frequencies. The number of basis function will be 2*n_freqs - 1.
max_freq
Highest frequency of the cosine, negative sine pairs.
The number of basis function will be 2*max_freq + 1.
"""

def __init__(self, n_freqs: int):
super().__init__(n_basis_funcs=2 * n_freqs - 1)
def __init__(self, max_freq: int):
super().__init__(n_basis_funcs=2 * max_freq + 1)

self._frequencies = np.arange(n_freqs, dtype=np.float32)
self._frequencies = np.arange(max_freq + 1, dtype=float)
self._n_input_dimensionality = 1

def _check_n_basis_min(self) -> None:
Expand All @@ -1202,15 +1206,15 @@ def _check_n_basis_min(self) -> None:
Raises
------
ValueError
If an insufficient number of basis element is requested for the basis type
If an insufficient number of basis element is requested for the basis type.
"""
if self.n_basis_funcs < 1:
raise ValueError(
f"Object class {self.__class__.__name__} requires >= 1 basis elements. "
f"Object class {self.__class__.__name__} requires >= 0 basis elements. "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the error message now matches the check, but shouldn't this actually at least 1 basis element? With max_freq=0 (just the DC term), you get a n_basis_funcs=1. And it doesn't make sense to have max_freq<0

f"{self.n_basis_funcs} basis elements specified instead"
)

def evaluate(self, sample_pts: NDArray) -> NDArray:
def evaluate(self, sample_pts: ArrayLike) -> NDArray:
"""Generate basis functions with given spacing.

Parameters
Expand All @@ -1225,25 +1229,25 @@ def evaluate(self, sample_pts: NDArray) -> NDArray:

Notes
-----
If the frequencies provided are np.arange(n_freq), convolving a signal
of length n_samples with this basis is equivalent, but slower,
then computing the FFT truncated to the first n_freq components.
The frequencies are set to np.arange(max_freq+1), convolving a signal
of length n_samples with this basis is equivalent, but slower,
then computing the FFT truncated to the first max_freq components.

Therefore, convolving a signal with this basis is equivalent
to compute the FFT over sliding window.
Therefore, convolving a signal with this basis is equivalent
to compute the FFT over a sliding window.

Examples
--------
>>> import nemos as nmo
>>> import numpy as np
>>> n_samples, n_freqs = 1000, 10
>>> basis = nmo.basis.FourierBasis(n_freqs*2)
>>> eval_basis = basis.evaluate(np.linspace(0, 1, n_samples))
>>> sinusoid = np.cos(3 * np.arange(0, 1000) * np.pi * 2 / 1000.)
>>> conv = [np.convolve(eval_basis[::-1, k], sinusoid, mode='valid')[0] for k in range(2*n_freqs-1)]
>>> fft = np.fft.fft(sinusoid)
>>> print('FFT power: ', np.round(np.real(fft[:10]), 4))
>>> print('Convolution: ', np.round(conv[:10], 4))
>>> import nemos as nmo
>>> import numpy as np
>>> n_samples, max_freq = 1000, 10
>>> basis = nmo.basis.FourierBasis(max_freq)
>>> eval_basis = basis.evaluate(np.linspace(0, 1, n_samples))
>>> sinusoid = np.cos(3 * np.arange(0, 1000) * np.pi * 2 / 1000.)
>>> conv = [np.convolve(eval_basis[::-1, k], sinusoid, mode='valid')[0] for k in range(2*max_freq+1)]
>>> fft = np.fft.fft(sinusoid)
>>> print('FFT power: ', np.round(np.real(fft[:max_freq]), 4))
>>> print('Convolution: ', np.round(conv[:max_freq], 4))
Comment on lines +1241 to +1250

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way in mkdocs to set this as a python codeblock? Can you remove the >>>? Because right now in the docs the >>> shows up and when you copy it copies with the >>> so a user can't just copy and paste and run from the docs 😄

for example at the bottom here the arrows render and get copied: https://nemos.readthedocs.io/en/latest/reference/nemos/utils/#nemos.utils.pytree_map_and_reduce

"""
(sample_pts,) = self._check_evaluate_input(sample_pts)
# assumes equi-spaced samples.
Expand Down
50 changes: 25 additions & 25 deletions tests/test_basis.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,15 +849,15 @@ def test_decay_rate_size_match_n_basis_func(self, decay_rates, n_basis_func):
class TestFourierBasis(BasisFuncsTesting):
cls = basis.FourierBasis

@pytest.mark.parametrize("n_freqs", [2, 4, 8])
@pytest.mark.parametrize("max_freq", [2, 4, 8])
@pytest.mark.parametrize("sample_size", [20, 1000])
def test_evaluate_returns_expected_number_of_basis(
self, n_freqs, sample_size
self, max_freq, sample_size
):
"""Tests whether the evaluate method returns the expected number of basis functions."""
basis_obj = self.cls(n_freqs=n_freqs)
basis_obj = self.cls(max_freq=max_freq)
eval_basis = basis_obj.evaluate(np.linspace(0, 1, sample_size))
assert(eval_basis.shape[1] == 2*n_freqs-1)
assert(eval_basis.shape[1] == 2*max_freq+1)

@pytest.mark.parametrize("samples, expectation",
[
Expand All @@ -882,19 +882,19 @@ def test_input_to_evaluate_is_arraylike(self, arraylike, expectation):
"""
Checks that the sample size of the output from the evaluate() method matches the input sample size.
"""
basis_obj = self.cls(n_freqs=5)
basis_obj = self.cls(max_freq=5)
with expectation:
basis_obj.evaluate(arraylike)

@pytest.mark.parametrize("sample_size", [100, 1000])
@pytest.mark.parametrize("n_freqs", [4, 10])
@pytest.mark.parametrize("max_freq", [4, 10])
def test_sample_size_of_evaluate_matches_that_of_input(
self, n_freqs, sample_size
self, max_freq, sample_size
):
"""
Checks that the sample size of the output from the evaluate() method matches the input sample size.
"""
basis_obj = self.cls(n_freqs)
basis_obj = self.cls(max_freq)
eval_basis = basis_obj.evaluate(np.linspace(0, 1, sample_size))
if eval_basis.shape[0] != sample_size:
raise ValueError(
Expand All @@ -903,39 +903,39 @@ def test_sample_size_of_evaluate_matches_that_of_input(
f"The second dimension of the evaluated basis is {eval_basis.shape[0]}",
)

@pytest.mark.parametrize("n_freqs, expectation",
@pytest.mark.parametrize("max_freq, expectation",
[
(-1, pytest.raises(ValueError,match=r"Object class FourierBasis requires >= 1 basis elements")),
(0, pytest.raises(ValueError,match=r"Object class FourierBasis requires >= 1 basis elements")),
(-1, pytest.raises(ValueError,match=r"Object class FourierBasis requires >= 0 basis elements")),
(0, does_not_raise()),
(1, does_not_raise()),
(3, does_not_raise())
]
)
def test_minimum_number_of_basis_required_is_matched(self, n_freqs, expectation):
def test_minimum_number_of_basis_required_is_matched(self, max_freq, expectation):
"""
Verifies that the minimum number of basis functions and order required (i.e., at least 1) and
order < #basis are enforced.
"""
n_samples = 10
with expectation:
basis_obj = self.cls(n_freqs=n_freqs)
basis_obj = self.cls(max_freq=max_freq)
basis_obj.evaluate(np.linspace(0, 1, n_samples))

@pytest.mark.parametrize("n_freqs, expectation",
@pytest.mark.parametrize("max_freq, expectation",
[
(3, does_not_raise()),
(6, does_not_raise()),
(6, pytest.raises(ValueError,match=rf"Not enough samples, aliasing likely to occur")),
(7, pytest.raises(ValueError,match=rf"Not enough samples, aliasing likely to occur")),
(10, pytest.raises(ValueError,match=rf"Not enough samples, aliasing likely to occur"))
]
)
def test_minimum_aliasing_detection(self, n_freqs, expectation):
def test_minimum_aliasing_detection(self, max_freq, expectation):
"""
Verifies that the minimum number of basis functions and order required (i.e., at least 1) and
order < #basis are enforced.
"""
n_samples = 10
basis_obj = self.cls(n_freqs=n_freqs)
basis_obj = self.cls(max_freq=max_freq)
with expectation:
basis_obj.evaluate(np.linspace(0, 1, n_samples))

Expand All @@ -946,7 +946,7 @@ def test_samples_range_matches_evaluate_requirements(self, sample_range: tuple):
"""
Verifies that the evaluate() method can handle input range.
"""
basis_obj = self.cls(n_freqs=5)
basis_obj = self.cls(max_freq=5)
basis_obj.evaluate(np.linspace(*sample_range, 100))

@pytest.mark.parametrize("n_input, expectation", [
Expand All @@ -960,7 +960,7 @@ def test_number_of_required_inputs_evaluate(self, n_input, expectation):
"""
Confirms that the evaluate() method correctly handles the number of input samples that are provided.
"""
basis_obj = self.cls(n_freqs=5)
basis_obj = self.cls(max_freq=5)
inputs = [np.linspace(0, 1, 20)] * n_input
with expectation:
basis_obj.evaluate(*inputs)
Expand All @@ -979,7 +979,7 @@ def test_evaluate_on_grid_meshgrid_valid_size(self, sample_size, expectation):
"""
Checks that the evaluate_on_grid() method returns a grid of the expected size.
"""
basis_obj = self.cls(n_freqs=5)
basis_obj = self.cls(max_freq=5)
with expectation:
basis_obj.evaluate_on_grid(sample_size)

Expand All @@ -995,7 +995,7 @@ def test_evaluate_on_grid_meshgrid_match_size(self, sample_size, expectation):
"""
Checks that the evaluate_on_grid() method returns a grid of the expected size.
"""
basis_obj = self.cls(n_freqs=5)
basis_obj = self.cls(max_freq=5)
with expectation:
grid, _ = basis_obj.evaluate_on_grid(sample_size)
assert grid.shape[0] == sample_size
Expand All @@ -1012,7 +1012,7 @@ def test_evaluate_on_grid_input_number(self, n_input, expectation):
"""
Validates that the evaluate_on_grid() method correctly handles the number of input samples that are provided.
"""
basis_obj = self.cls(n_freqs=5)
basis_obj = self.cls(max_freq=5)
inputs = [10] * n_input
with expectation:
basis_obj.evaluate_on_grid(*inputs)
Expand Down Expand Up @@ -1426,7 +1426,7 @@ def instantiate_basis(n_basis, basis_class):
n_basis_funcs=n_basis, decay_rates=np.arange(1, 1 + n_basis)
)
elif basis_class == basis.FourierBasis:
basis_obj = basis_class(n_freqs=n_basis)
basis_obj = basis_class(max_freq=n_basis)
elif basis_class == basis.BSplineBasis:
basis_obj = basis_class(n_basis_funcs=n_basis, order=3)
elif basis_class == basis.CyclicBSplineBasis:
Expand Down Expand Up @@ -1481,7 +1481,7 @@ def test_evaluate_input(self, eval_input):

@pytest.mark.parametrize("n_basis_a", [5, 6])
@pytest.mark.parametrize("n_basis_b", [5, 6])
@pytest.mark.parametrize("sample_size", [10, 1000])
@pytest.mark.parametrize("sample_size", [15, 1000])
@pytest.mark.parametrize(
"basis_a",
[class_obj for _, class_obj in utils_testing.get_non_abstract_classes(basis)],
Expand Down Expand Up @@ -1697,7 +1697,7 @@ def test_evaluate_input(self, eval_input):

@pytest.mark.parametrize("n_basis_a", [5, 6])
@pytest.mark.parametrize("n_basis_b", [5, 6])
@pytest.mark.parametrize("sample_size", [10, 1000])
@pytest.mark.parametrize("sample_size", [15, 1000])
@pytest.mark.parametrize(
"basis_a",
[class_obj for _, class_obj in utils_testing.get_non_abstract_classes(basis)],
Expand Down