Skip to content

Add AveragingOp #716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 30, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions src/mrpro/operators/AveragingOp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Averaging operator."""

from collections.abc import Sequence
from warnings import warn

import torch

from mrpro.operators.LinearOperator import LinearOperator


class AveragingOp(LinearOperator):
"""Averaging operator.

This operator averages the input tensor along a specified dimension.
The averaging is performed over groups of elements defined by the `idx` parameter.
The output tensor will have the same shape as the input tensor, except for the `dim` dimension,
which will have a size equal to the number of groups specified in `idx`. For each group,
the average of the elements in that group is computed.

For example, this operator can be used to simulate the effect of a sliding window average
on a signal model.
"""

def __init__(
self,
dim: int,
idx: Sequence[Sequence[int] | torch.Tensor | slice] | torch.Tensor = (slice(None),), # noqa: B008
domain_size: int | None = None,
) -> None:
"""Initialize the averaging operator.

Parameters
----------
dim
The dimension along which to average.
idx
The indices of the input tensor to average over. Each element of the sequence will result in a
separate entry in the `dim` dimension of the output tensor.
The entries can be either a sequence of integers or an integer tensor, a slice object, or a boolean tensor.
domain_size
The size of the input along `dim`. It is only used in the `adjoint` method.
If not set, the size will be guessed from the input tensor during the forward pass.
"""
super().__init__()
self.domain_size = domain_size
self._last_domain_size = domain_size
self.idx = idx
self.dim = dim

def forward(self, x: torch.Tensor) -> tuple[torch.Tensor]:
"""Apply the averaging operator to the input tensor."""
if self.domain_size and self.domain_size != x.shape[self.dim]:
raise ValueError(f'Expected domain size {self.domain_size}, got {x.shape[self.dim]}')
self._last_domain_size = x.shape[self.dim]

placeholder = (slice(None),) * (self.dim % x.ndim)
averaged = torch.stack([x[(*placeholder, i)].mean(self.dim) for i in self.idx], self.dim)
return (averaged,)

def adjoint(self, x: torch.Tensor) -> tuple[torch.Tensor]:
"""Apply the adjoint of the averaging operator to the input tensor."""
if self.domain_size is None:
if self._last_domain_size is None:
raise ValueError('Domain size is not set. Please set it explicitly or run forward first.')
warn(
'Domain size is not set. Guessing the last used input size of the forward pass. '
'Consider setting the domain size explicitly.',
stacklevel=2,
)
self.domain_size = self._last_domain_size

adjoint = x.new_zeros(*x.shape[: self.dim], self.domain_size, *x.shape[self.dim + 1 :])
placeholder = (slice(None),) * (self.dim % x.ndim)
for i, group in enumerate(self.idx):
if isinstance(group, slice):
n = len(range(*group.indices(self.domain_size)))
elif isinstance(group, torch.Tensor) and group.dtype == torch.bool:
n = int(group.sum())
else:
n = len(group)

adjoint[(*placeholder, group)] += (
x[(*placeholder, i, None)].expand(*x.shape[: self.dim], n, *x.shape[self.dim + 1 :]) / n # type: ignore[index]
)

return (adjoint,)
4 changes: 3 additions & 1 deletion src/mrpro/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from mrpro.operators.LinearOperator import LinearOperator
from mrpro.operators.Functional import Functional, ProximableFunctional, ElementaryFunctional, ElementaryProximableFunctional, ScaledFunctional, ScaledProximableFunctional
from mrpro.operators import functionals, models
from mrpro.operators.AveragingOp import AveragingOp
from mrpro.operators.CartesianSamplingOp import CartesianSamplingOp
from mrpro.operators.ConstraintsOp import ConstraintsOp
from mrpro.operators.DensityCompensationOp import DensityCompensationOp
Expand All @@ -30,6 +31,7 @@
from mrpro.operators.ZeroOp import ZeroOp

__all__ = [
"AveragingOp",
"CartesianSamplingOp",
"ConstraintsOp",
"DensityCompensationOp",
Expand Down Expand Up @@ -64,4 +66,4 @@
"ZeroPadOp",
"functionals",
"models"
]
]
47 changes: 47 additions & 0 deletions tests/operators/test_averagingop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Test the averaging operator."""

import pytest
import torch
from mrpro.operators import AveragingOp

from tests import RandomGenerator
from tests.helper import dotproduct_adjointness_test


def test_averageingop_adjointness() -> None:
"""Test the adjointness of the averaging operator."""
rng = RandomGenerator(seed=0)
u = rng.complex64_tensor(size=(5, 15, 10))
v = rng.complex64_tensor(size=(5, 3, 10))
idx = [0, 1, 2], slice(0, 6, 2), torch.tensor([-1, -2, -3])
op = AveragingOp(dim=-2, idx=idx, domain_size=15)
dotproduct_adjointness_test(op, u, v)


def test_averageingop_forward() -> None:
"""Test the forward method of the averaging operator."""
rng = RandomGenerator(seed=1)
u = rng.complex64_tensor(size=(5, 10))
idx = [0, 1, 2], slice(0, 6, 2), torch.tensor([-1, -2, -3])
op = AveragingOp(dim=0, idx=idx, domain_size=5)
(actual,) = op(u)
torch.testing.assert_close(actual[0], u[(0, 1, 2),].mean(dim=0))
torch.testing.assert_close(actual[1], u[(0, 2, 4),].mean(dim=0))
torch.testing.assert_close(actual[2], u[(-1, -2, -3),].mean(dim=0))


def test_averageingop_no_domain_size() -> None:
"""Test the adjoint method of the averaging operator without domain size."""
rng = RandomGenerator(seed=0)
v = rng.float32_tensor(size=(5, 2))
idx = rng.int64_tensor(size=(2, 3), low=0, high=9)
op = AveragingOp(dim=1, idx=idx)

with pytest.raises(ValueError, match='Domain'):
op.adjoint(v)

u = rng.float32_tensor(size=(5, 10))
op(u) # this sets the domain size
with pytest.warns(match='Domain'):
(actual,) = op.adjoint(v)
assert actual.shape == u.shape