Add 'compute_transition_matrix' method to ESMDA (#162)
To support row-by-row (parameter by parameter) computation.

* created method: compute_transition_matrix
* created method: perturb_observations
* updated docstrings
* we don't need to center X when computing cov(X, Y)
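
To illustrate, here is a minimal sketch of the row-by-row update this commit enables. The toy forward model, dimensions, and seeds are invented for illustration, and the import assumes ESMDA is exported at the package top level; the API calls mirror the new test added in this commit.

import numpy as np
from iterative_ensemble_smoother import ESMDA

rng = np.random.default_rng(42)
A = rng.normal(size=(4, 5))   # toy linear forward model g(X) = A @ X
X = rng.normal(size=(5, 3))   # (num_parameters, ensemble_size)

smoother = ESMDA(
    covariance=np.ones(4),        # diagonal observation covariance
    observations=A @ np.ones(5),  # synthetic observations
    alpha=2,
    seed=1,
)

for alpha_i in smoother.alpha:
    K = smoother.compute_transition_matrix(Y=A @ X, alpha=alpha_i)
    # Each row of X is updated independently of the others, so X never
    # has to be held in memory all at once.
    for i in range(X.shape[0]):
        X[i, :] += X[i, :] @ K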
tommyod authored Oct 23, 2023
1 parent dab58e8 commit b526c49
Showing 4 changed files with 239 additions and 40 deletions.
128 changes: 112 additions & 16 deletions src/iterative_ensemble_smoother/esmda.py
@@ -22,7 +22,7 @@
"""
import numbers
from typing import Union
from typing import Tuple, Union

import numpy as np
import numpy.typing as npt
@@ -42,7 +42,7 @@ class ESMDA:
Parameters
----------
covariance : npt.NDArray[np.double]
covariance : np.ndarray
Either a 1D array of diagonal covariances, or a 2D covariance matrix.
The shape is either (num_observations,) or (num_observations, num_observations).
This is C_D in Emerick (2013), and represents observation or measurement
@@ -165,22 +165,26 @@ def assimilate(
self,
X: npt.NDArray[np.double],
Y: npt.NDArray[np.double],
*,
overwrite: bool = False,
truncation: float = 1.0,
) -> npt.NDArray[np.double]:
"""Assimilate data and return an updated ensemble X_posterior.
num_parameters, ensemble_size
Parameters
----------
X : np.ndarray
2D array of shape (num_parameters, ensemble_size).
A 2D array of shape (num_parameters, ensemble_size). Each row
corresponds to a parameter in the model, and each column corresponds
to an ensemble member (realization).
Y : np.ndarray
2D array of shape (num_parameters, ensemble_size).
2D array of shape (num_observations, ensemble_size), containing
responses when evaluating the model at X. In other words, Y = g(X),
where g is the forward model.
overwrite : bool
If True, then arguments X and Y may be overwritten.
If False, then the method will not permute inputs in any way.
If True, then arguments X and Y may be overwritten and mutated.
If False, then the method will not mutate inputs in any way.
Setting this to True might save memory.
truncation : float
How large a fraction of the singular values to keep in the inversion
routine. Must be a float in the range (0, 1]. A lower number means
Expand Down Expand Up @@ -220,14 +224,7 @@ def assimilate(
# a zero-centered normal means that y := L @ z, where z ~ norm(0, 1)
# Therefore, scaling C_D by alpha is equivalent to scaling L with sqrt(alpha)
size = (num_outputs, num_ensemble)
if self.C_D.ndim == 2:
D = self.observations.reshape(-1, 1) + np.sqrt(
self.alpha[self.iteration]
) * self.C_D_L @ self.rng.normal(size=size)
else:
D = self.observations.reshape(-1, 1) + np.sqrt(
self.alpha[self.iteration]
) * self.rng.normal(size=size) * self.C_D_L.reshape(-1, 1)
D = self.perturb_observations(size=size, alpha=self.alpha[self.iteration])
assert D.shape == (num_outputs, num_ensemble)

# Line 2 (c) in the description of ES-MDA in the 2013 Emerick paper
Expand All @@ -248,6 +245,105 @@ def assimilate(
self.iteration += 1
return X
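
For comparison with the low-level path below, the high-level API updates the whole ensemble in one call per assimilation step. A minimal loop, mirroring the new test at the bottom of this commit (assuming a smoother, a prior X_prior, and a forward model g as in that test):

X = np.copy(X_prior)
for _ in range(smoother.num_assimilations()):
    X = smoother.assimilate(X, g(X))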

def compute_transition_matrix(
self,
Y: npt.NDArray[np.double],
*,
alpha: float,
truncation: float = 1.0,
) -> npt.NDArray[np.double]:
"""Return a matrix K such that X_posterior = X_prior + X_prior @ K.
The purpose of this method is to facilitate row-by-row, or batch-by-batch,
updates of X. This is useful if X is too large to fit in memory.
Parameters
----------
Y : np.ndarray
2D array of shape (num_observations, ensemble_size), containing
responses when evaluating the model at X. In other words, Y = g(X),
where g is the forward model.
alpha : float
The covariance inflation factor. The sequence of alphas should
obey the equation sum_i (1/alpha_i) = 1. However, this is NOT enforced
in this method call. The user/caller is responsible for this.
truncation : float
How large a fraction of the singular values to keep in the inversion
routine. Must be a float in the range (0, 1]. A lower number means
a more approximate answer and a slightly faster computation.

Returns
-------
K : np.ndarray
A matrix K such that X_posterior = X_prior + X_prior @ K.
It has shape (num_ensemble_members, num_ensemble_members).
"""

# Recall the update equation:
# X += C_MD @ (C_DD + alpha * C_D)^(-1) @ (D - Y)
# X += X @ center(Y).T / (N-1) @ (C_DD + alpha * C_D)^(-1) @ (D - Y)
# We form K := center(Y).T / (N-1) @ (C_DD + alpha * C_D)^(-1) @ (D - Y),
# so that
# X_new = X_old + X_old @ K
# or
# X += X @ K

D = self.perturb_observations(size=Y.shape, alpha=alpha)
inversion_func = self._inversion_methods[self.inversion]
return inversion_func(
alpha=alpha,
C_D=self.C_D,
D=D,
Y=Y,
X=None, # We don't need X to compute the factor K
truncation=truncation,
return_K=True, # Ensures that we don't need X
)

def perturb_observations(
self, *, size: Tuple[int, int], alpha: float
) -> npt.NDArray[np.double]:
"""Create a matrix D with perturbed observations.
In the Emerick (2013) paper, the matrix D is defined in section 6.
See section 2(b) of the ES-MDA algorithm in the paper.
Parameters
----------
size : Tuple[int, int]
The shape of the returned matrix, a tuple (num_observations, ensemble_size).
alpha : float
The covariance inflation factor. The sequence of alphas should
obey the equation sum_i (1/alpha_i) = 1. However, this is NOT enforced
in this method call. The user/caller is responsible for this.

Returns
-------
D : np.ndarray
Each column contains the observations, perturbed by noise drawn with covariance alpha * C_D.
"""
# Draw samples from zero-centered multivariate normal with cov=alpha * C_D,
# and add them to the observations. Notice that
# if C_D = L @ L.T by the Cholesky factorization, then drawing y from
# a zero-centered normal means that y := L @ z, where z ~ norm(0, 1).
# Therefore, scaling C_D by alpha is equivalent to scaling L with sqrt(alpha).

D: npt.NDArray[np.double]

# Two cases, depending on whether C_D was given as 1D or 2D array
if self.C_D.ndim == 2:
D = self.observations.reshape(-1, 1) + np.sqrt(alpha) * self.C_D_L @ self.rng.normal(size=size)
else:
D = self.observations.reshape(-1, 1) + np.sqrt(alpha) * self.rng.normal(size=size) * self.C_D_L.reshape(-1, 1)
assert D.shape == size

return D
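
As a standalone sanity check of the comment above (illustrative, not part of the diff): the Cholesky factor of alpha * C_D is sqrt(alpha) times the factor of C_D, so scaling the factor is equivalent to scaling the covariance.

import numpy as np

C_D = np.array([[2.0, 0.5], [0.5, 1.0]])  # example 2D covariance
alpha = 4.0
L = np.linalg.cholesky(C_D)

# (sqrt(alpha) * L) @ (sqrt(alpha) * L).T == alpha * C_D
assert np.allclose(np.linalg.cholesky(alpha * C_D), np.sqrt(alpha) * L)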


if __name__ == "__main__":
import pytest
58 changes: 36 additions & 22 deletions src/iterative_ensemble_smoother/esmda_inversion.py
@@ -1,3 +1,5 @@
from typing import Optional

import numpy as np
import numpy.typing as npt
import scipy as sp # type: ignore
@@ -64,9 +66,15 @@ def empirical_cross_covariance(
assert X.shape[1] == Y.shape[1], "Ensemble size must be equal"

# https://en.wikipedia.org/wiki/Estimation_of_covariance_matrices
# Subtract means
X = X - np.mean(X, axis=1, keepdims=True)
Y = Y - np.mean(Y, axis=1, keepdims=True)
# Subtract the mean. Even though the equation says E[(X - E[X])(Y - E[Y])^T],
# we actually only need to subtract the mean from one of the matrices, since
# E[(X - E[X])(Y - E[Y])^T] = E[(X - E[X])Y^T] - E[X - E[X]] E[Y]^T
#                           = E[(X - E[X])Y^T] - 0 = E[(X - E[X])Y^T]
# We choose to subtract from the matrix with the smaller number of rows
if X.shape[0] > Y.shape[0]:
Y = Y - np.mean(Y, axis=1, keepdims=True)
else:
X = X - np.mean(X, axis=1, keepdims=True)

# Compute outer product and divide
cov = X @ Y.T / (X.shape[1] - 1)
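
A quick standalone check of this claim (illustrative only, not library code): centering Y alone yields the same cross-covariance as centering both matrices, since the row means of the centered matrix are zero.

import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(5, 100))
Y = rng.normal(size=(3, 100))

Xc = X - X.mean(axis=1, keepdims=True)
Yc = Y - Y.mean(axis=1, keepdims=True)

# The term involving the mean of X vanishes against the centered Y
assert np.allclose(Xc @ Yc.T / 99, X @ Yc.T / 99)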
@@ -178,8 +186,9 @@ def inversion_exact_cholesky(
C_D: npt.NDArray[np.double],
D: npt.NDArray[np.double],
Y: npt.NDArray[np.double],
X: npt.NDArray[np.double],
X: Optional[npt.NDArray[np.double]],
truncation: float = 1.0,
return_K: bool = False,
) -> npt.NDArray[np.double]:
"""Computes an exact inversion using `sp.linalg.solve`, which uses a
Cholesky factorization in the case of symmetric, positive definite matrices.
@@ -211,12 +220,15 @@
C_DD.flat[:: C_DD.shape[1] + 1] += alpha * C_D
K = sp.linalg.solve(C_DD, D - Y, **solver_kwargs)

# Form C_MD = center(X) @ center(Y).T / (num_ensemble - 1)
X = X - np.mean(X, axis=1, keepdims=True)
# Center the Y matrix
Y = Y - np.mean(Y, axis=1, keepdims=True)
_, num_ensemble = X.shape
_, num_ensemble = Y.shape

return np.linalg.multi_dot([X / (num_ensemble - 1), Y.T, K]) # type: ignore
# When only K is needed, don't left-multiply by X
if return_K:
return (Y.T @ K) / (num_ensemble - 1) # type: ignore

return np.linalg.multi_dot([X, Y.T / (num_ensemble - 1), K]) # type: ignore


def inversion_exact_lstsq(
@@ -247,10 +259,9 @@
lhs, D - Y, overwrite_a=True, overwrite_b=True, lapack_driver="gelsy"
)

# Compute C_MD := center(X) @ center(Y).T / (X.shape[1] - 1)
X_shift = (X - np.mean(X, axis=1, keepdims=True)) / (X.shape[1] - 1)
Y_shift = Y - np.mean(Y, axis=1, keepdims=True)
return np.linalg.multi_dot([X_shift, Y_shift.T, ans]) # type: ignore
# Compute C_MD := X @ center(Y).T / (Y.shape[1] - 1)
Y_shift = (Y - np.mean(Y, axis=1, keepdims=True)) / (Y.shape[1] - 1)
return np.linalg.multi_dot([X, Y_shift.T, ans]) # type: ignore


def inversion_exact_rescaled(
@@ -311,11 +322,10 @@
term = C_D_L_inv.T @ U_r if C_D.ndim == 2 else (C_D_L_inv * U_r.T).T

# Compute the first factors, which make up C_MD
X_shift = (X - np.mean(X, axis=1, keepdims=True)) / (N_e - 1)
Y_shift = Y - np.mean(Y, axis=1, keepdims=True)
Y_shift = (Y - np.mean(Y, axis=1, keepdims=True)) / (N_e - 1)

return np.linalg.multi_dot( # type: ignore
[X_shift, Y_shift.T, term / s_r, term.T, (D - Y)]
[X, Y_shift.T, term / s_r, term.T, (D - Y)]
)


@@ -352,7 +362,7 @@ def inversion_exact_subspace_woodbury(
D_delta /= np.sqrt(N_e - 1)

# Compute the first factors, which make up C_MD
X_shift = (X - np.mean(X, axis=1, keepdims=True)) / np.sqrt(N_e - 1)
# X_shift = (X - np.mean(X, axis=1, keepdims=True)) / np.sqrt(N_e - 1)

# A full covariance matrix was given
if C_D.ndim == 2:
@@ -370,7 +380,7 @@
# Compute the woodbury inversion, then return
inverted = C_D_inv - np.linalg.multi_dot([term, sp.linalg.inv(center), term.T])
return np.linalg.multi_dot( # type: ignore
[X_shift, D_delta.T, inverted, (D - Y)]
[X, D_delta.T / np.sqrt(N_e - 1), inverted, (D - Y)]
)

# A diagonal covariance matrix was given as a 1D array.
@@ -384,7 +394,7 @@
[UT_D.T, sp.linalg.inv(center), UT_D]
)
return np.linalg.multi_dot( # type: ignore
[X_shift, D_delta.T, inverted, (D - Y)]
[X, D_delta.T / np.sqrt(N_e - 1), inverted, (D - Y)]
)
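
For reference, a minimal standalone check of the Woodbury matrix identity that this inversion relies on (the example matrices are invented for illustration):

import numpy as np

rng = np.random.default_rng(0)
n, k = 6, 2
d = rng.uniform(1.0, 2.0, size=n)  # diagonal part, cheap to invert
U = rng.normal(size=(n, k))

# Woodbury: (D + U U^T)^-1 = D^-1 - D^-1 U (I + U^T D^-1 U)^-1 U^T D^-1
D_inv = np.diag(1.0 / d)
center = np.eye(k) + U.T @ D_inv @ U
woodbury = D_inv - np.linalg.multi_dot(
    [D_inv @ U, np.linalg.inv(center), U.T @ D_inv]
)
assert np.allclose(woodbury, np.linalg.inv(np.diag(d) + U @ U.T))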


@@ -396,6 +406,7 @@ def inversion_subspace(
Y: npt.NDArray[np.double],
X: npt.NDArray[np.double],
truncation: float = 1.0,
return_K: bool = False,
) -> npt.NDArray[np.double]:
"""See Appendix A.2 in Emerick et al (2012)
@@ -468,10 +479,14 @@
# and finally we multiply by (D - Y)
term = U_r_w_inv @ Z

if return_K:
return np.linalg.multi_dot( # type: ignore
[D_delta.T, (term / (1 + T)), term.T, (D - Y)]
)

# Compute C_MD = center(X) @ center(Y).T / (num_ensemble - 1)
X_shift = X - np.mean(X, axis=1, keepdims=True)
return np.linalg.multi_dot( # type: ignore
[X_shift, D_delta.T, (term / (1 + T)), term.T, (D - Y)]
[X, D_delta.T, (term / (1 + T)), term.T, (D - Y)]
)


@@ -530,9 +545,8 @@ def inversion_rescaled_subspace(
diag = 1 / (1 + T_r)

# Compute C_MD
X_shift = X - np.mean(X, axis=1, keepdims=True)
return np.linalg.multi_dot( # type: ignore
[X_shift, D_delta.T, (term * diag), term.T, (D - Y)]
[X, D_delta.T, (term * diag), term.T, (D - Y)]
)


58 changes: 57 additions & 1 deletion tests/test_esmda.py
@@ -387,13 +387,69 @@ def test_that_float_dtypes_are_preserved(inversion, dtype, diagonal):
assert X_posterior.dtype == dtype


@pytest.mark.parametrize("inversion", ESMDA._inversion_methods.keys())
def test_row_by_row_assimilation(inversion):
# Create problem instance
rng = np.random.default_rng(42)

num_outputs = 4
num_inputs = 5
num_ensemble = 3

A = rng.normal(size=(num_outputs, num_inputs))

def g(X):
return A @ X

# Prior is N(0, 1)
X_prior = rng.normal(size=(num_inputs, num_ensemble))

covariance = np.exp(rng.normal(size=num_outputs))
observations = A @ np.linspace(0, 1, num=num_inputs) + rng.normal(
size=num_outputs, scale=0.01
)

# =========== Use the high level API ===========
smoother = ESMDA(
covariance=covariance,
observations=observations,
alpha=2,
inversion=inversion,
seed=1,
)
X = np.copy(X_prior)
for iteration in range(smoother.num_assimilations()):
X = smoother.assimilate(X, g(X))

X_posterior_highlevel_API = np.copy(X)

# =========== Use the low-level API ===========
smoother = ESMDA(
covariance=covariance,
observations=observations,
alpha=2,
inversion=inversion,
seed=1,
)
X = np.copy(X_prior)
for alpha_i in smoother.alpha:
K = smoother.compute_transition_matrix(Y=g(X), alpha=alpha_i)

# Here we could loop over each row in X and multiply by K
X += X @ K

X_posterior_lowlevel_API = np.copy(X)

assert np.allclose(X_posterior_highlevel_API, X_posterior_lowlevel_API)


if __name__ == "__main__":
import pytest

pytest.main(
args=[
__file__,
"-v",
"-k test_that_float_dtypes_are_preserved",
"-k test_row_by_row_assimilation",
]
)