diff --git a/src/iterative_ensemble_smoother/esmda.py b/src/iterative_ensemble_smoother/esmda.py
index e66f5cb3..9de8a13e 100644
--- a/src/iterative_ensemble_smoother/esmda.py
+++ b/src/iterative_ensemble_smoother/esmda.py
@@ -22,7 +22,7 @@
 """
 
 import numbers
-from typing import Union
+from typing import Tuple, Union
 
 import numpy as np
 import numpy.typing as npt
@@ -42,7 +42,7 @@ class ESMDA:
 
     Parameters
     ----------
-    covariance : npt.NDArray[np.double]
+    covariance : np.ndarray
        Either a 1D array of diagonal covariances, or a 2D covariance matrix.
        The shape is either (num_observations,) or (num_observations, num_observations).
        This is C_D in Emerick (2013), and represents observation or measurement
@@ -165,22 +165,26 @@ def assimilate(
         self,
         X: npt.NDArray[np.double],
         Y: npt.NDArray[np.double],
+        *,
         overwrite: bool = False,
         truncation: float = 1.0,
     ) -> npt.NDArray[np.double]:
         """Assimilate data and return an updated ensemble X_posterior.
 
-        num_parameters, ensemble_size
-
         Parameters
         ----------
         X : np.ndarray
-            2D array of shape (num_parameters, ensemble_size).
+            A 2D array of shape (num_parameters, ensemble_size). Each row
+            corresponds to a parameter in the model, and each column corresponds
+            to an ensemble member (realization).
         Y : np.ndarray
-            2D array of shape (num_parameters, ensemble_size).
+            A 2D array of shape (num_observations, ensemble_size), containing
+            responses from evaluating the model at X. In other words, Y = g(X),
+            where g is the forward model.
         overwrite : bool
-            If True, then arguments X and Y may be overwritten.
-            If False, then the method will not permute inputs in any way.
+            If True, then arguments X and Y may be overwritten and mutated.
+            If False, then the method will not mutate inputs in any way.
+            Setting this to True might save memory.
         truncation : float
             How large a fraction of the singular values to keep in the inversion
             routine. Must be a float in the range (0, 1]. A lower number means
@@ -220,14 +224,7 @@ def assimilate(
-        # a zero cented normal means that y := L @ z, where z ~ norm(0, 1)
+        # a zero-centered normal means that y := L @ z, where z ~ norm(0, 1)
         # Therefore, scaling C_D by alpha is equivalent to scaling L with sqrt(alpha)
         size = (num_outputs, num_ensemble)
-        if self.C_D.ndim == 2:
-            D = self.observations.reshape(-1, 1) + np.sqrt(
-                self.alpha[self.iteration]
-            ) * self.C_D_L @ self.rng.normal(size=size)
-        else:
-            D = self.observations.reshape(-1, 1) + np.sqrt(
-                self.alpha[self.iteration]
-            ) * self.rng.normal(size=size) * self.C_D_L.reshape(-1, 1)
+        D = self.perturb_observations(size=size, alpha=self.alpha[self.iteration])
         assert D.shape == (num_outputs, num_ensemble)
 
         # Line 2 (c) in the description of ES-MDA in the 2013 Emerick paper
@@ -248,6 +245,105 @@ def assimilate(
         self.iteration += 1
         return X
 
+    def compute_transition_matrix(
+        self,
+        Y: npt.NDArray[np.double],
+        *,
+        alpha: float,
+        truncation: float = 1.0,
+    ) -> npt.NDArray[np.double]:
+        """Return a matrix K such that X_posterior = X_prior + X_prior @ K.
+
+        The purpose of this method is to facilitate row-by-row, or batch-by-batch,
+        updates of X. This is useful if X is too large to fit in memory.
+
+        Parameters
+        ----------
+        Y : np.ndarray
+            A 2D array of shape (num_observations, ensemble_size), containing
+            responses from evaluating the model at X. In other words, Y = g(X),
+            where g is the forward model.
+        alpha : float
+            The covariance inflation factor. The sequence of alphas should
+            obey the equation sum_i (1/alpha_i) = 1. However, this is NOT enforced
+            in this method call. The user/caller is responsible for this.
+        truncation : float
+            How large a fraction of the singular values to keep in the inversion
+            routine. Must be a float in the range (0, 1]. A lower number means
+            a more approximate answer and a slightly faster computation.
+
+        Returns
+        -------
+        K : np.ndarray
+            A matrix K such that X_posterior = X_prior + X_prior @ K.
+            It has shape (num_ensemble_members, num_ensemble_members).
+        """
+
+        # Recall the update equation:
+        #     X += C_MD @ (C_DD + alpha * C_D)^(-1) @ (D - Y)
+        #     X += X @ center(Y).T / (N-1) @ (C_DD + alpha * C_D)^(-1) @ (D - Y)
+        # We form K := center(Y).T / (N-1) @ (C_DD + alpha * C_D)^(-1) @ (D - Y),
+        # so that
+        #     X_new = X_old + X_old @ K
+        # or
+        #     X += X @ K
+
+        D = self.perturb_observations(size=Y.shape, alpha=alpha)
+        inversion_func = self._inversion_methods[self.inversion]
+        return inversion_func(
+            alpha=alpha,
+            C_D=self.C_D,
+            D=D,
+            Y=Y,
+            X=None,  # We don't need X to compute the factor K
+            truncation=truncation,
+            return_K=True,  # Ensures that we don't need X
+        )
+
+    def perturb_observations(
+        self, *, size: Tuple[int, int], alpha: float
+    ) -> npt.NDArray[np.double]:
+        """Create a matrix D with perturbed observations.
+
+        In the Emerick (2013) paper, the matrix D is defined in section 6.
+        See section 2(b) of the ES-MDA algorithm in the paper.
+
+        Parameters
+        ----------
+        size : Tuple[int, int]
+            The size, a tuple with (num_observations, ensemble_size).
+        alpha : float
+            The covariance inflation factor. The sequence of alphas should
+            obey the equation sum_i (1/alpha_i) = 1. However, this is NOT enforced
+            in this method call. The user/caller is responsible for this.
+
+        Returns
+        -------
+        D : np.ndarray
+            Each column consists of the observations, perturbed by noise drawn
+            from a normal distribution with covariance alpha * C_D.
+
+        """
+        # Draw samples from a zero-centered multivariate normal with covariance
+        # alpha * C_D, and add them to the observations. Notice that if
+        # C_D = L @ L.T is the Cholesky factorization, then drawing y from a
+        # zero-centered normal means that y := L @ z, where z ~ norm(0, 1).
+        # Therefore, scaling C_D by alpha is equivalent to scaling L with sqrt(alpha).
+
+        D: npt.NDArray[np.double]
+
+        # Two cases, depending on whether C_D was given as a 1D or 2D array
+        if self.C_D.ndim == 2:
+            D = self.observations.reshape(-1, 1) + np.sqrt(alpha) * (
+                self.C_D_L @ self.rng.normal(size=size)
+            )
+        else:
+            D = self.observations.reshape(-1, 1) + np.sqrt(alpha) * (
+                self.rng.normal(size=size) * self.C_D_L.reshape(-1, 1)
+            )
+        assert D.shape == size
+
+        return D
+
 
 if __name__ == "__main__":
     import pytest
 
diff --git a/src/iterative_ensemble_smoother/esmda_inversion.py b/src/iterative_ensemble_smoother/esmda_inversion.py
index 5d1586d9..ac42551a 100644
--- a/src/iterative_ensemble_smoother/esmda_inversion.py
+++ b/src/iterative_ensemble_smoother/esmda_inversion.py
@@ -1,3 +1,5 @@
+from typing import Optional
+
 import numpy as np
 import numpy.typing as npt
 import scipy as sp  # type: ignore
@@ -64,9 +66,15 @@ def empirical_cross_covariance(
     assert X.shape[1] == Y.shape[1], "Ensemble size must be equal"
 
     # https://en.wikipedia.org/wiki/Estimation_of_covariance_matrices
-    # Subtract means
-    X = X - np.mean(X, axis=1, keepdims=True)
-    Y = Y - np.mean(Y, axis=1, keepdims=True)
+    # Subtract the mean. Even though the equation says E[(X - E[X])(Y - E[Y])^T],
+    # we only need to subtract the mean from one of the matrices, since
+    #     E[(X - E[X])(Y - E[Y])^T] = E[(X - E[X])Y^T] - E[X - E[X]] E[Y]^T
+    #                               = E[(X - E[X])Y^T], because E[X - E[X]] = 0
+    # We subtract the mean from the matrix with fewer rows, since that is cheaper
+    if X.shape[0] > Y.shape[0]:
+        Y = Y - np.mean(Y, axis=1, keepdims=True)
+    else:
+        X = X - np.mean(X, axis=1, keepdims=True)
 
     # Compute outer product and divide
     cov = X @ Y.T / (X.shape[1] - 1)
@@ -178,8 +186,9 @@ def inversion_exact_cholesky(
     C_D: npt.NDArray[np.double],
     D: npt.NDArray[np.double],
     Y: npt.NDArray[np.double],
-    X: npt.NDArray[np.double],
+    X: Optional[npt.NDArray[np.double]],
     truncation: float = 1.0,
+    return_K: bool = False,
 ) -> npt.NDArray[np.double]:
     """Computes an exact inversion using `sp.linalg.solve`, which uses a
     Cholesky factorization in the case of symmetric, positive definite matrices.
@@ -211,12 +220,15 @@ def inversion_exact_cholesky(
         C_DD.flat[:: C_DD.shape[1] + 1] += alpha * C_D
     K = sp.linalg.solve(C_DD, D - Y, **solver_kwargs)
 
-    # Form C_MD = center(X) @ center(Y).T / (num_ensemble - 1)
-    X = X - np.mean(X, axis=1, keepdims=True)
+    # Center matrix Y
     Y = Y - np.mean(Y, axis=1, keepdims=True)
-    _, num_ensemble = X.shape
+    _, num_ensemble = Y.shape
 
-    return np.linalg.multi_dot([X / (num_ensemble - 1), Y.T, K])  # type: ignore
+    # If return_K, skip the left-multiplication by X and return the factor K
+    if return_K:
+        return (Y.T @ K) / (num_ensemble - 1)  # type: ignore
+
+    return np.linalg.multi_dot([X, Y.T / (num_ensemble - 1), K])  # type: ignore
 
 
 def inversion_exact_lstsq(
@@ -247,10 +259,9 @@ def inversion_exact_lstsq(
         lhs, D - Y, overwrite_a=True, overwrite_b=True, lapack_driver="gelsy"
     )
 
-    # Compute C_MD := center(X) @ center(Y).T / (X.shape[1] - 1)
-    X_shift = (X - np.mean(X, axis=1, keepdims=True)) / (X.shape[1] - 1)
-    Y_shift = Y - np.mean(Y, axis=1, keepdims=True)
-    return np.linalg.multi_dot([X_shift, Y_shift.T, ans])  # type: ignore
+    # Compute C_MD := X @ center(Y).T / (Y.shape[1] - 1)
+    Y_shift = (Y - np.mean(Y, axis=1, keepdims=True)) / (Y.shape[1] - 1)
+    return np.linalg.multi_dot([X, Y_shift.T, ans])  # type: ignore
 
 
 def inversion_exact_rescaled(
@@ -311,11 +322,10 @@ def inversion_exact_rescaled(
     term = C_D_L_inv.T @ U_r if C_D.ndim == 2 else (C_D_L_inv * U_r.T).T
 
     # Compute the first factors, which make up C_MD
-    X_shift = (X - np.mean(X, axis=1, keepdims=True)) / (N_e - 1)
-    Y_shift = Y - np.mean(Y, axis=1, keepdims=True)
+    Y_shift = (Y - np.mean(Y, axis=1, keepdims=True)) / (N_e - 1)
 
     return np.linalg.multi_dot(  # type: ignore
-        [X_shift, Y_shift.T, term / s_r, term.T, (D - Y)]
+        [X, Y_shift.T, term / s_r, term.T, (D - Y)]
     )
 
 
@@ -352,7 +362,7 @@ def inversion_exact_subspace_woodbury(
     D_delta /= np.sqrt(N_e - 1)
 
     # Compute the first factors, which make up C_MD
-    X_shift = (X - np.mean(X, axis=1, keepdims=True)) / np.sqrt(N_e - 1)
+    # Note: X needs no centering here, since D_delta is already centered
 
     # A full covariance matrix was given
     if C_D.ndim == 2:
@@ -370,7 +380,7 @@ def inversion_exact_subspace_woodbury(
         # Compute the woodbury inversion, then return
         inverted = C_D_inv - np.linalg.multi_dot([term, sp.linalg.inv(center), term.T])
         return np.linalg.multi_dot(  # type: ignore
-            [X_shift, D_delta.T, inverted, (D - Y)]
+            [X, D_delta.T / np.sqrt(N_e - 1), inverted, (D - Y)]
         )
 
     # A diagonal covariance matrix was given as a 1D array.
@@ -384,7 +394,7 @@ def inversion_exact_subspace_woodbury(
         [UT_D.T, sp.linalg.inv(center), UT_D]
     )
     return np.linalg.multi_dot(  # type: ignore
-        [X_shift, D_delta.T, inverted, (D - Y)]
+        [X, D_delta.T / np.sqrt(N_e - 1), inverted, (D - Y)]
     )
 
 
@@ -396,6 +406,7 @@ def inversion_subspace(
     Y: npt.NDArray[np.double],
     X: npt.NDArray[np.double],
     truncation: float = 1.0,
+    return_K: bool = False,
 ) -> npt.NDArray[np.double]:
     """See Appendix A.2 in Emerick et al (2012)
 
@@ -468,10 +479,14 @@ def inversion_subspace(
-    # and finally we multiiply by (D - Y)
+    # and finally we multiply by (D - Y)
     term = U_r_w_inv @ Z
 
+    if return_K:
+        return np.linalg.multi_dot(  # type: ignore
+            [D_delta.T, (term / (1 + T)), term.T, (D - Y)]
+        )
+
-    # Compute C_MD = center(X) @ center(Y).T / (num_ensemble - 1)
-    X_shift = X - np.mean(X, axis=1, keepdims=True)
+    # Compute C_MD = X @ center(Y).T / (num_ensemble - 1)
     return np.linalg.multi_dot(  # type: ignore
-        [X_shift, D_delta.T, (term / (1 + T)), term.T, (D - Y)]
+        [X, D_delta.T, (term / (1 + T)), term.T, (D - Y)]
     )
 
 
@@ -530,9 +545,8 @@ def inversion_rescaled_subspace(
     diag = 1 / (1 + T_r)
 
     # Compute C_MD
-    X_shift = X - np.mean(X, axis=1, keepdims=True)
     return np.linalg.multi_dot(  # type: ignore
-        [X_shift, D_delta.T, (term * diag), term.T, (D - Y)]
+        [X, D_delta.T, (term * diag), term.T, (D - Y)]
     )
 
 
diff --git a/tests/test_esmda.py b/tests/test_esmda.py
index 74f48100..3fe83c0b 100644
--- a/tests/test_esmda.py
+++ b/tests/test_esmda.py
@@ -387,6 +387,62 @@ def test_that_float_dtypes_are_preserved(inversion, dtype, diagonal):
     assert X_posterior.dtype == dtype
 
 
+@pytest.mark.parametrize("inversion", ESMDA._inversion_methods.keys())
+def test_row_by_row_assimilation(inversion):
+    # Create problem instance
+    rng = np.random.default_rng(42)
+
+    num_outputs = 4
+    num_inputs = 5
+    num_ensemble = 3
+
+    A = rng.normal(size=(num_outputs, num_inputs))
+
+    def g(X):
+        return A @ X
+
+    # Prior is N(0, 1)
+    X_prior = rng.normal(size=(num_inputs, num_ensemble))
+
+    covariance = np.exp(rng.normal(size=num_outputs))
+    observations = A @ np.linspace(0, 1, num=num_inputs) + rng.normal(
+        size=num_outputs, scale=0.01
+    )
+
+    # =========== Use the high-level API ===========
+    smoother = ESMDA(
+        covariance=covariance,
+        observations=observations,
+        alpha=2,
+        inversion=inversion,
+        seed=1,
+    )
+    X = np.copy(X_prior)
+    for iteration in range(smoother.num_assimilations()):
+        X = smoother.assimilate(X, g(X))
+
+    X_posterior_highlevel_API = np.copy(X)
+
+    # =========== Use the low-level API ===========
+    smoother = ESMDA(
+        covariance=covariance,
+        observations=observations,
+        alpha=2,
+        inversion=inversion,
+        seed=1,
+    )
+    X = np.copy(X_prior)
+    for alpha_i in smoother.alpha:
+        K = smoother.compute_transition_matrix(Y=g(X), alpha=alpha_i)
+
+        # Here we could loop over each row in X and multiply by K
+        X += X @ K
+
+    X_posterior_lowlevel_API = np.copy(X)
+
+    assert np.allclose(X_posterior_highlevel_API, X_posterior_lowlevel_API)
+
+
 if __name__ == "__main__":
     import pytest
 
@@ -394,6 +450,6 @@ def test_that_float_dtypes_are_preserved(inversion, dtype, diagonal):
         args=[
             __file__,
             "-v",
-            "-k test_that_float_dtypes_are_preserved",
+            "-k test_row_by_row_assimilation",
         ]
     )
diff --git a/tests/test_esmda_inversion.py b/tests/test_esmda_inversion.py
index 465087a1..df534741 100644
--- a/tests/test_esmda_inversion.py
+++ b/tests/test_esmda_inversion.py
@@ -37,6 +37,39 @@
 
 
 class TestEsmdaInversion:
+    # Functions that take the parameter return_K
+    @pytest.mark.parametrize(
+        "function",
+        [inversion_exact_cholesky, inversion_subspace],
+    )
+    @pytest.mark.parametrize("ensemble_members", [5, 10, 15])
+    @pytest.mark.parametrize("num_outputs", [5, 10, 15])
+    @pytest.mark.parametrize("num_inputs", [5, 10, 15])
+    def test_that_returning_K_is_equivalent_to_full_computation(
+        self, function, ensemble_members, num_outputs, num_inputs
+    ):
+        # Make the test deterministic for each parametrization
+        np.random.seed(ensemble_members * 100 + num_outputs * 10 + num_inputs)
+
+        # Create a symmetric positive definite covariance matrix C_D
+        E = np.random.randn(num_outputs, num_outputs)
+        C_D = E.T @ E
+
+        # Set alpha to something other than 1 to test that it works
+        alpha = 3
+
+        # Create observations
+        D = np.random.randn(num_outputs, ensemble_members)
+        Y = np.random.randn(num_outputs, ensemble_members)
+        X = np.random.randn(num_inputs, ensemble_members)
+
+        # Test both with and without X / return_K
+        ans = function(alpha=alpha, C_D=C_D, D=D, Y=Y, X=X)
+        K = function(alpha=alpha, C_D=C_D, D=D, Y=Y, X=None, return_K=True)
+
+        assert np.allclose(X + ans, X + X @ K)
+
     @pytest.mark.parametrize("length", list(range(1, 101, 5)))
     def test_that_the_sum_of_normalize_alpha_is_one(self, length):
         # Generate random array
@@ -350,6 +383,6 @@ def test_timing(num_outputs=100, num_inputs=50, ensemble_members=25):
         args=[
             __file__,
             "-v",
-            "-k test_that_exact_inverions_are_equal_with_few_ensemble_members",
+            "-k test_that_returning_K_is_equivalent_to_full_computation",
         ]
     )
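Usage sketch: the low-level API added above exists so that X can be updated batch-by-batch when it is too large to fit in memory. The code below shows one way a caller might drive compute_transition_matrix. It is a minimal sketch, not a definitive recipe: the memmap file name "X_prior.dat", the batch size, and the linear forward model g are hypothetical stand-ins, while ESMDA, its constructor arguments, and compute_transition_matrix are used exactly as in test_row_by_row_assimilation above.

    import numpy as np

    from iterative_ensemble_smoother import ESMDA

    rng = np.random.default_rng(42)

    # Hypothetical linear forward model g(X) = A @ X
    num_outputs, num_inputs, num_ensemble = 4, 5, 3
    A = rng.normal(size=(num_outputs, num_inputs))

    def g(X):
        return A @ X

    # Pretend that X is too large to fit in memory: keep it on disk as a memmap
    X = np.memmap(
        "X_prior.dat", dtype=np.double, mode="w+", shape=(num_inputs, num_ensemble)
    )
    X[:] = rng.normal(size=(num_inputs, num_ensemble))

    covariance = np.exp(rng.normal(size=num_outputs))
    observations = A @ np.linspace(0, 1, num=num_inputs)

    smoother = ESMDA(
        covariance=covariance,
        observations=observations,
        alpha=2,
        seed=1,
    )

    batch_size = 2  # Hypothetical; in practice, as many rows as fit in memory
    for alpha_i in smoother.alpha:
        # The responses Y = g(X) are assumed to fit in memory
        K = smoother.compute_transition_matrix(Y=g(X), alpha=alpha_i)

        # X_posterior = X_prior + X_prior @ K updates each row of X
        # independently, so we can update a batch of rows at a time
        for start in range(0, num_inputs, batch_size):
            X[start : start + batch_size, :] += X[start : start + batch_size, :] @ K

The batched in-place update is valid because K has shape (ensemble_size, ensemble_size) and multiplies X from the right, so each row of X is updated independently of every other row.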