Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Facilitate streaming (row-by-row update) in ESMDA #162

Merged
merged 9 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 112 additions & 16 deletions src/iterative_ensemble_smoother/esmda.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

"""
import numbers
from typing import Union
from typing import Tuple, Union

import numpy as np
import numpy.typing as npt
Expand All @@ -42,7 +42,7 @@ class ESMDA:

Parameters
----------
covariance : npt.NDArray[np.double]
covariance : np.ndarray
Either a 1D array of diagonal covariances, or a 2D covariance matrix.
The shape is either (num_observations,) or (num_observations, num_observations).
This is C_D in Emerick (2013), and represents observation or measurement
Expand Down Expand Up @@ -165,22 +165,26 @@ def assimilate(
self,
X: npt.NDArray[np.double],
Y: npt.NDArray[np.double],
*,
overwrite: bool = False,
truncation: float = 1.0,
) -> npt.NDArray[np.double]:
"""Assimilate data and return an updated ensemble X_posterior.

num_parameters, ensemble_size

Parameters
----------
X : np.ndarray
2D array of shape (num_parameters, ensemble_size).
A 2D array of shape (num_parameters, ensemble_size). Each row
corresponds to a parameter in the model, and each column corresponds
to an ensemble member (realization).
Y : np.ndarray
2D array of shape (num_parameters, ensemble_size).
2D array of shape (num_parameters, ensemble_size), containing
responses when evaluating the model at X. In other words, Y = g(X),
where g is the forward model.
overwrite : bool
If True, then arguments X and Y may be overwritten.
If False, then the method will not permute inputs in any way.
If True, then arguments X and Y may be overwritten and mutated.
If False, then the method will not mutate inputs in any way.
Setting this to True might save memory.
truncation : float
How large a fraction of the singular values to keep in the inversion
routine. Must be a float in the range (0, 1]. A lower number means
Expand Down Expand Up @@ -220,14 +224,7 @@ def assimilate(
# a zero cented normal means that y := L @ z, where z ~ norm(0, 1)
# Therefore, scaling C_D by alpha is equivalent to scaling L with sqrt(alpha)
size = (num_outputs, num_ensemble)
if self.C_D.ndim == 2:
D = self.observations.reshape(-1, 1) + np.sqrt(
self.alpha[self.iteration]
) * self.C_D_L @ self.rng.normal(size=size)
else:
D = self.observations.reshape(-1, 1) + np.sqrt(
self.alpha[self.iteration]
) * self.rng.normal(size=size) * self.C_D_L.reshape(-1, 1)
D = self.perturb_observations(size=size, alpha=self.alpha[self.iteration])
assert D.shape == (num_outputs, num_ensemble)

# Line 2 (c) in the description of ES-MDA in the 2013 Emerick paper
Expand All @@ -248,6 +245,105 @@ def assimilate(
self.iteration += 1
return X

def compute_transition_matrix(
self,
Y: npt.NDArray[np.double],
*,
alpha: float,
truncation: float = 1.0,
) -> npt.NDArray[np.double]:
"""Return a matrix K such that X_posterior = X_prior + X_prior @ K.

The purpose of this method is to facilitate row-by-row, or batch-by-batch,
updates of X. This is useful if X is too large to fit in memory.

Parameters
----------
Y : np.ndarray
2D array of shape (num_parameters, ensemble_size), containing
responses when evaluating the model at X. In other words, Y = g(X),
where g is the forward model.
alpha : float
The covariance inflation factor. The sequence of alphas should
obey the equation sum_i (1/alpha_i) = 1. However, this is NOT enforced
in this method call. The user/caller is responsible for this.
truncation : float
How large a fraction of the singular values to keep in the inversion
routine. Must be a float in the range (0, 1]. A lower number means
a more approximate answer and a slightly faster computation.

Returns
-------
K : np.ndarray
A matrix K such that X_posterior = X_prior + X_prior @ K.
It has shape (num_ensemble_members, num_ensemble_members).
"""

# Recall the update equation:
# X += C_MD @ (C_DD + alpha * C_D)^(-1) @ (D - Y)
# X += X @ center(Y).T / (N-1) @ (C_DD + alpha * C_D)^(-1) @ (D - Y)
# We form K := center(Y).T / (N-1) @ (C_DD + alpha * C_D)^(-1) @ (D - Y),
# so that
# X_new = X_old + X_old @ K
# or
# X += X @ K

D = self.perturb_observations(size=Y.shape, alpha=alpha)
inversion_func = self._inversion_methods[self.inversion]
return inversion_func(
alpha=alpha,
C_D=self.C_D,
D=D,
Y=Y,
X=None, # We don't need X to compute the factor K
truncation=truncation,
return_K=True, # Ensures that we don't need X
)

def perturb_observations(
self, *, size: Tuple[int, int], alpha: float
) -> npt.NDArray[np.double]:
"""Create a matrix D with perturbed observations.

In the Emerick (2013) paper, the matrix D is defined in section 6.
See section 2(b) of the ES-MDA algorithm in the paper.

Parameters
----------
size : Tuple[int, int]
The size, a tuple with (num_observations, ensemble_size).
alpha : float
The covariance inflation factor. The sequence of alphas should
obey the equation sum_i (1/alpha_i) = 1. However, this is NOT enforced
in this method call. The user/caller is responsible for this.

Returns
-------
D : np.ndarray
Each column consists of perturbed observations, scaled by alpha.

"""
# Draw samples from zero-centered multivariate normal with cov=alpha * C_D,
# and add them to the observations. Notice that
# if C_D = L @ L.T by the cholesky factorization, then drawing y from
# a zero cented normal means that y := L @ z, where z ~ norm(0, 1).
# Therefore, scaling C_D by alpha is equivalent to scaling L with sqrt(alpha).

D: npt.NDArray[np.double]

# Two cases, depending on whether C_D was given as 1D or 2D array
if self.C_D.ndim == 2:
D = self.observations.reshape(-1, 1) + np.sqrt(
self.alpha[self.iteration]
) * self.C_D_L @ self.rng.normal(size=size)
else:
D = self.observations.reshape(-1, 1) + np.sqrt(
self.alpha[self.iteration]
) * self.rng.normal(size=size) * self.C_D_L.reshape(-1, 1)
assert D.shape == size

return D


if __name__ == "__main__":
import pytest
Expand Down
58 changes: 36 additions & 22 deletions src/iterative_ensemble_smoother/esmda_inversion.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

import numpy as np
import numpy.typing as npt
import scipy as sp # type: ignore
Expand Down Expand Up @@ -64,9 +66,15 @@ def empirical_cross_covariance(
assert X.shape[1] == Y.shape[1], "Ensemble size must be equal"

# https://en.wikipedia.org/wiki/Estimation_of_covariance_matrices
# Subtract means
X = X - np.mean(X, axis=1, keepdims=True)
Y = Y - np.mean(Y, axis=1, keepdims=True)
# Subtract mean. Even though the equation says E[(X - E[X])(Y - E[Y])^T],
# we actually only need to subtract the mean value from one matrix, since
# (X - E[X])(Y - E[Y])^T = E[(X - E[X])Y] - E[(X - E[X])E[Y]^T]
# = E[(X - E[X])Y] - E[(0)E[Y]^T] = E[(X - E[X])Y]
# We choose to subtract from the matrix with the smaller number of rows
if X.shape[0] > Y.shape[0]:
Y = Y - np.mean(Y, axis=1, keepdims=True)
else:
X = X - np.mean(X, axis=1, keepdims=True)

# Compute outer product and divide
cov = X @ Y.T / (X.shape[1] - 1)
Expand Down Expand Up @@ -178,8 +186,9 @@ def inversion_exact_cholesky(
C_D: npt.NDArray[np.double],
D: npt.NDArray[np.double],
Y: npt.NDArray[np.double],
X: npt.NDArray[np.double],
X: Optional[npt.NDArray[np.double]],
truncation: float = 1.0,
return_K: bool = False,
) -> npt.NDArray[np.double]:
"""Computes an exact inversion using `sp.linalg.solve`, which uses a
Cholesky factorization in the case of symmetric, positive definite matrices.
Expand Down Expand Up @@ -211,12 +220,15 @@ def inversion_exact_cholesky(
C_DD.flat[:: C_DD.shape[1] + 1] += alpha * C_D
K = sp.linalg.solve(C_DD, D - Y, **solver_kwargs)

# Form C_MD = center(X) @ center(Y).T / (num_ensemble - 1)
X = X - np.mean(X, axis=1, keepdims=True)
# Center matrix
Y = Y - np.mean(Y, axis=1, keepdims=True)
_, num_ensemble = X.shape
_, num_ensemble = Y.shape

return np.linalg.multi_dot([X / (num_ensemble - 1), Y.T, K]) # type: ignore
# Don't left-multiply the X
if return_K:
return (Y.T @ K) / (num_ensemble - 1) # type: ignore

return np.linalg.multi_dot([X, Y.T / (num_ensemble - 1), K]) # type: ignore


def inversion_exact_lstsq(
Expand Down Expand Up @@ -247,10 +259,9 @@ def inversion_exact_lstsq(
lhs, D - Y, overwrite_a=True, overwrite_b=True, lapack_driver="gelsy"
)

# Compute C_MD := center(X) @ center(Y).T / (X.shape[1] - 1)
X_shift = (X - np.mean(X, axis=1, keepdims=True)) / (X.shape[1] - 1)
Y_shift = Y - np.mean(Y, axis=1, keepdims=True)
return np.linalg.multi_dot([X_shift, Y_shift.T, ans]) # type: ignore
# Compute C_MD := X @ center(Y).T / (Y.shape[1] - 1)
Y_shift = (Y - np.mean(Y, axis=1, keepdims=True)) / (Y.shape[1] - 1)
return np.linalg.multi_dot([X, Y_shift.T, ans]) # type: ignore


def inversion_exact_rescaled(
Expand Down Expand Up @@ -311,11 +322,10 @@ def inversion_exact_rescaled(
term = C_D_L_inv.T @ U_r if C_D.ndim == 2 else (C_D_L_inv * U_r.T).T

# Compute the first factors, which make up C_MD
X_shift = (X - np.mean(X, axis=1, keepdims=True)) / (N_e - 1)
Y_shift = Y - np.mean(Y, axis=1, keepdims=True)
Y_shift = (Y - np.mean(Y, axis=1, keepdims=True)) / (N_e - 1)

return np.linalg.multi_dot( # type: ignore
[X_shift, Y_shift.T, term / s_r, term.T, (D - Y)]
[X, Y_shift.T, term / s_r, term.T, (D - Y)]
)


Expand Down Expand Up @@ -352,7 +362,7 @@ def inversion_exact_subspace_woodbury(
D_delta /= np.sqrt(N_e - 1)

# Compute the first factors, which make up C_MD
X_shift = (X - np.mean(X, axis=1, keepdims=True)) / np.sqrt(N_e - 1)
# X_shift = (X - np.mean(X, axis=1, keepdims=True)) / np.sqrt(N_e - 1)

# A full covariance matrix was given
if C_D.ndim == 2:
Expand All @@ -370,7 +380,7 @@ def inversion_exact_subspace_woodbury(
# Compute the woodbury inversion, then return
inverted = C_D_inv - np.linalg.multi_dot([term, sp.linalg.inv(center), term.T])
return np.linalg.multi_dot( # type: ignore
[X_shift, D_delta.T, inverted, (D - Y)]
[X, D_delta.T / np.sqrt(N_e - 1), inverted, (D - Y)]
)

# A diagonal covariance matrix was given as a 1D array.
Expand All @@ -384,7 +394,7 @@ def inversion_exact_subspace_woodbury(
[UT_D.T, sp.linalg.inv(center), UT_D]
)
return np.linalg.multi_dot( # type: ignore
[X_shift, D_delta.T, inverted, (D - Y)]
[X, D_delta.T / np.sqrt(N_e - 1), inverted, (D - Y)]
)


Expand All @@ -396,6 +406,7 @@ def inversion_subspace(
Y: npt.NDArray[np.double],
X: npt.NDArray[np.double],
truncation: float = 1.0,
return_K: bool = False,
) -> npt.NDArray[np.double]:
"""See Appendix A.2 in Emerick et al (2012)

Expand Down Expand Up @@ -468,10 +479,14 @@ def inversion_subspace(
# and finally we multiiply by (D - Y)
term = U_r_w_inv @ Z

if return_K:
return np.linalg.multi_dot( # type: ignore
[D_delta.T, (term / (1 + T)), term.T, (D - Y)]
)

# Compute C_MD = center(X) @ center(Y).T / (num_ensemble - 1)
X_shift = X - np.mean(X, axis=1, keepdims=True)
return np.linalg.multi_dot( # type: ignore
[X_shift, D_delta.T, (term / (1 + T)), term.T, (D - Y)]
[X, D_delta.T, (term / (1 + T)), term.T, (D - Y)]
)


Expand Down Expand Up @@ -530,9 +545,8 @@ def inversion_rescaled_subspace(
diag = 1 / (1 + T_r)

# Compute C_MD
X_shift = X - np.mean(X, axis=1, keepdims=True)
return np.linalg.multi_dot( # type: ignore
[X_shift, D_delta.T, (term * diag), term.T, (D - Y)]
[X, D_delta.T, (term * diag), term.T, (D - Y)]
)


Expand Down
58 changes: 57 additions & 1 deletion tests/test_esmda.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,13 +387,69 @@ def test_that_float_dtypes_are_preserved(inversion, dtype, diagonal):
assert X_posterior.dtype == dtype


@pytest.mark.parametrize("inversion", ESMDA._inversion_methods.keys())
def test_row_by_row_assimilation(inversion):
# Create problem instance
rng = np.random.default_rng(42)

num_outputs = 4
num_inputs = 5
num_ensemble = 3

A = rng.normal(size=(num_outputs, num_inputs))

def g(X):
return A @ X

# Prior is N(0, 1)
X_prior = rng.normal(size=(num_inputs, num_ensemble))

covariance = np.exp(rng.normal(size=num_outputs))
observations = A @ np.linspace(0, 1, num=num_inputs) + rng.normal(
size=num_outputs, scale=0.01
)

# =========== Use the high level API ===========
smoother = ESMDA(
covariance=covariance,
observations=observations,
alpha=2,
inversion=inversion,
seed=1,
)
X = np.copy(X_prior)
for iteration in range(smoother.num_assimilations()):
X = smoother.assimilate(X, g(X))

X_posterior_highlevel_API = np.copy(X)

# =========== Use the low-level level API ===========
smoother = ESMDA(
covariance=covariance,
observations=observations,
alpha=2,
inversion=inversion,
seed=1,
)
X = np.copy(X_prior)
for alpha_i in smoother.alpha:
K = smoother.compute_transition_matrix(Y=g(X), alpha=alpha_i)

# Here we could loop over each row in X and multiply by K
X += X @ K

X_posterior_lowlevel_API = np.copy(X)

assert np.allclose(X_posterior_highlevel_API, X_posterior_lowlevel_API)


if __name__ == "__main__":
import pytest

pytest.main(
args=[
__file__,
"-v",
"-k test_that_float_dtypes_are_preserved",
"-k test_row_by_row_assimilation",
]
)
Loading