Skip to content

Commit

Permalink
Use in-place operations for cross-correlations
Browse files Browse the repository at this point in the history
* Calc stds_X and Y only once
---------

Co-authored-by: Tommy <10076072+tommyod@users.noreply.github.com>
  • Loading branch information
dafeda and tommyod authored Jan 15, 2024
1 parent 5e96924 commit 1ef022d
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 52 deletions.
2 changes: 2 additions & 0 deletions docs/source/Adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
# [Kullback–Leibler divergence](https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence),
# but we do not pursue this here.


# %%
def get_prior(num_parameters, num_ensemble, prior_std):
"""Sample prior from N(0, prior_std)."""
Expand Down Expand Up @@ -289,6 +290,7 @@ def g(X):
# - The draw of prior realizations $X \sim N(0, \sigma)$.
# - The noise (given by the `covariance` argument) that ESMDA adds to the observations.


# %%
def corr_true(array):
return sp.stats.pearsonr(x_true, array).statistic
Expand Down
1 change: 0 additions & 1 deletion docs/source/Polynomial.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ def poly(a, b, c, x):
)
n_ies_iter = 7
for i in range(n_ies_iter):

step_length = steplength_exponential(i + 1)
X_IES_ert = smoother_ies.sies_iteration(Y_IES_ert, step_length=step_length)

Expand Down
71 changes: 42 additions & 29 deletions src/iterative_ensemble_smoother/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,33 +139,37 @@ def compute_cross_covariance_multiplier(

return sp.linalg.solve(C_DD, D - Y, **solver_kwargs) # type: ignore

def _correlation_matrix(
def _cov_to_corr_inplace(
self,
cov_XY: npt.NDArray[np.double],
X: npt.NDArray[np.double],
Y: npt.NDArray[np.double],
) -> npt.NDArray[np.double]:
"""Compute a correlation matrix given a covariance matrix."""
assert cov_XY.shape == (X.shape[0], Y.shape[0])
stds_X: npt.NDArray[np.double],
stds_Y: npt.NDArray[np.double],
) -> None:
"""Convert a covariance matrix to a correlation matrix in-place."""

stds_Y = np.std(Y, axis=1, ddof=1)
stds_X = np.std(X, axis=1, ddof=1)

# Compute the correlation matrix from the covariance matrix
corr_XY: npt.NDArray[np.double] = (cov_XY / stds_X[:, np.newaxis]) / stds_Y[
np.newaxis, :
]
# Divide each element of cov_XY by the corresponding standard deviations
cov_XY /= stds_X[:, np.newaxis]
cov_XY /= stds_Y[np.newaxis, :]

# Perform checks. There appears to be occasional numerical issues in
# the equation. With 2 ensemble members, we get e.g. a max value of
# 1.0000000000016778. We allow some leeway and clip the results.
# Perform checks and clip values to [-1, 1]
eps = 1e-8
if not ((corr_XY.max() <= 1 + eps) and (corr_XY.min() >= -1 - eps)):
if not ((cov_XY.max() <= 1 + eps) and (cov_XY.min() >= -1 - eps)):
msg = "Cross-correlation matrix has entries not in [-1, 1]."
msg += f"The min and max values are: {corr_XY.min()} and {corr_XY.max()}"
msg += f"The min and max values are: {cov_XY.min()} and {cov_XY.max()}"
warnings.warn(msg)

corr_XY = np.clip(corr_XY, a_min=-1, a_max=1)
return np.clip(cov_XY, a_min=-1, a_max=1, out=cov_XY)

def _corr_to_cov_inplace(
self,
corr_XY: npt.NDArray[np.double],
stds_X: npt.NDArray[np.double],
stds_Y: npt.NDArray[np.double],
) -> None:
"""Convert a correlation matrix to a covariance matrix in-place."""
# Multiply each element of corr_XY by the corresponding standard deviations
corr_XY *= stds_X[:, np.newaxis]
corr_XY *= stds_Y[np.newaxis, :]
return corr_XY

def assimilate(
Expand All @@ -174,6 +178,7 @@ def assimilate(
X: npt.NDArray[np.double],
Y: npt.NDArray[np.double],
D: npt.NDArray[np.double],
overwrite: bool = False,
alpha: float,
correlation_threshold: Union[Callable[[int], float], float, None] = None,
cov_YY: Optional[npt.NDArray[np.double]] = None,
Expand Down Expand Up @@ -207,6 +212,10 @@ def assimilate(
The covariance inflation factor. The sequence of alphas should
obey the equation sum_i (1/alpha_i) = 1. However, this is NOT
enforced in this method. The user/caller is responsible for this.
overwrite: bool
If True, X will be overwritten and mutated.
If False, the method will not mutate inputs in any way.
Setting this to True saves memory.
correlation_threshold : callable or float or None
Either a callable with signature f(ensemble_size) -> float, or a
float in the range [0, 1]. Entries in the covariance matrix that
Expand Down Expand Up @@ -239,6 +248,10 @@ def assimilate(
"`correlation_threshold` must be a callable or a float in [0, 1]"
)

# Do not overwrite input arguments
if not overwrite:
X = np.copy(X)

# Create `correlation_threshold` if the argument is a float
if is_float:
corr_threshold: float = correlation_threshold # type: ignore
Expand All @@ -261,25 +274,27 @@ def correlation_threshold(ensemble_size: int) -> float:
# X by parameter group (rows), the storage requirement decreases
cov_XY = empirical_cross_covariance(X, Y)
assert cov_XY.shape == (X.shape[0], Y.shape[0])
corr_XY = self._correlation_matrix(cov_XY, X, Y)
assert corr_XY.shape == cov_XY.shape

stds_X = np.std(X, axis=1, ddof=1)
stds_Y = np.std(Y, axis=1, ddof=1)

corr_XY = self._cov_to_corr_inplace(cov_XY, stds_X, stds_Y)

# Determine which elements in the cross covariance matrix that will
# be set to zero
threshold = correlation_threshold(X.shape[1])
significant_corr_XY = np.abs(corr_XY) > threshold

cov_XY = self._corr_to_cov_inplace(corr_XY, stds_X, stds_Y)

# Pre-compute the covariance cov(Y, Y) here, and index on it later
if cov_YY is None:
cov_YY = empirical_cross_covariance(Y, Y)
else:
assert cov_YY.ndim == 2, "'cov_YY' must be a 2D array"
assert cov_YY.shape == (Y.shape[0], Y.shape[0])

# TODO: memory could be saved by overwriting the input X
X_out: npt.NDArray[np.double] = np.copy(X)
for (unique_row, indices_of_row) in groupby_indices(significant_corr_XY):

for unique_row, indices_of_row in groupby_indices(significant_corr_XY):
if verbose:
print(
f" Assimilating {len(indices_of_row)} parameters"
Expand All @@ -294,8 +309,6 @@ def correlation_threshold(ensemble_size: int) -> float:
if np.all(~unique_row):
continue

# Get the parameters (X) that have identical significant responses (Y)
X_subset = X[indices_of_row, :]
Y_subset = Y[unique_row, :]

# Compute the masked arrays for these variables
Expand All @@ -320,9 +333,9 @@ def correlation_threshold(ensemble_size: int) -> float:
Y=Y_subset,
cov_YY=cov_YY_subset, # Passing cov(Y, Y) avoids re-computation
)
X_out[indices_of_row, :] = X_subset + cov_XY_subset @ T
X[indices_of_row, :] += cov_XY_subset @ T

return X_out
return X


class RowScaling:
Expand Down
36 changes: 14 additions & 22 deletions tests/test_experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

@pytest.mark.parametrize("seed", range(25))
def test_groupby_indices(seed):

rng = np.random.default_rng(seed)
rows = rng.integers(10, 100)
columns = rng.integers(2, 9)
Expand All @@ -39,8 +38,7 @@ def test_groupby_indices(seed):
assert intersection_idx == set()

# Verify each entry in the groups
for (unique_row, indices_of_row) in groups:

for unique_row, indices_of_row in groups:
# Repeat this unique row the number of times it occurs in X
repeated = np.repeat(
unique_row[np.newaxis, :], repeats=len(indices_of_row), axis=0
Expand All @@ -50,7 +48,6 @@ def test_groupby_indices(seed):

@pytest.fixture()
def linear_problem(request):

# Seed the problem using indirect parametrization:
# https://docs.pytest.org/en/latest/example/parametrize.html#indirect-parametrization
rng = np.random.default_rng(request.param)
Expand Down Expand Up @@ -86,7 +83,6 @@ class TestAdaptiveESMDA:
def test_that_adaptive_localization_with_cutoff_1_equals_ensemble_prior(
self, linear_problem
):

# Create a problem with g(x) = A @ x
X, g, observations, covariance, _ = linear_problem

Expand All @@ -98,7 +94,6 @@ def test_that_adaptive_localization_with_cutoff_1_equals_ensemble_prior(
X_i = np.copy(X)
alpha = normalize_alpha(np.ones(5))
for alpha_i in alpha:

# Run forward model
Y_i = g(X_i)

Expand Down Expand Up @@ -127,7 +122,6 @@ def test_that_adaptive_localization_with_cutoff_1_equals_ensemble_prior(
def test_that_adaptive_localization_with_cutoff_0_equals_standard_ESMDA_update(
self, linear_problem
):

# Create a problem with g(x) = A @ x
X, g, observations, covariance, rng = linear_problem

Expand All @@ -141,8 +135,7 @@ def test_that_adaptive_localization_with_cutoff_0_equals_standard_ESMDA_update(
)

X_i = np.copy(X)
for i, alpha_i in enumerate(alpha, 1):

for _, alpha_i in enumerate(alpha, 1):
# Run forward model
Y_i = g(X_i)

Expand All @@ -152,12 +145,13 @@ def test_that_adaptive_localization_with_cutoff_0_equals_standard_ESMDA_update(
)

# Update the relevant parameters and write to X
X_i = smoother.assimilate(
smoother.assimilate(
X=X_i,
Y=Y_i,
D=D_i,
overwrite=True,
alpha=alpha_i,
correlation_threshold=lambda ensemble_size: 0,
correlation_threshold=0,
)

# =============================================================================
Expand All @@ -171,7 +165,7 @@ def test_that_adaptive_localization_with_cutoff_0_equals_standard_ESMDA_update(
)

X_i2 = np.copy(X)
for i in range(smoother.num_assimilations()):
for _ in range(smoother.num_assimilations()):
X_i2 = smoother.assimilate(X_i2, g(X_i2))

assert np.allclose(X_i, X_i2)
Expand All @@ -194,7 +188,7 @@ def test_that_posterior_generalized_variance_increases_in_cutoff(
members decrease, this test starts to fail more often."""

# Create a problem with g(x) = A @ x
X, g, observations, covariance, rng = linear_problem
X, g, observations, covariance, _ = linear_problem
if full_covariance:
covariance = np.diag(covariance)

Expand All @@ -208,8 +202,7 @@ def test_that_posterior_generalized_variance_increases_in_cutoff(
)

X_i = np.copy(X)
for i, alpha_i in enumerate(alpha, 1):

for _, alpha_i in enumerate(alpha, 1):
# Run forward model
Y_i = g(X_i)

Expand All @@ -226,14 +219,14 @@ def test_that_posterior_generalized_variance_increases_in_cutoff(
Y=Y_i,
D=D_i,
alpha=alpha_i,
correlation_threshold=lambda ensemble_size: cutoff_low,
correlation_threshold=cutoff_low,
)
X_i_high_cutoff = smoother.assimilate(
X=X_i,
Y=Y_i,
D=D_i,
alpha=alpha_i,
correlation_threshold=lambda ensemble_size: cutoff_high,
correlation_threshold=cutoff_high,
)

# Compute covariances
Expand Down Expand Up @@ -410,8 +403,8 @@ def test_that_cov_YY_can_be_computed_outside_of_assimilate(
"""

# Create a problem with g(x) = A @ x
X, g, observations, covariance, rng = linear_problem
num_parameters, ensemble_size = X.shape
X, g, observations, covariance, _ = linear_problem
_, ensemble_size = X.shape

alpha = normalize_alpha(np.array([5, 4, 3, 2, 1])) # Vector of inflation values
smoother = AdaptiveESMDA(
Expand Down Expand Up @@ -535,7 +528,7 @@ def multiply(self, X, K):
row_groups = [tuple(range(17)), tuple(range(17, 100))]
# When alpha=1 in row scaling, we perform a full update
X_with_row_scaling = [
(X[idx, :], RowScaling(alpha=1)) for i, idx in enumerate(row_groups)
(X[idx, :], RowScaling(alpha=1)) for _, idx in enumerate(row_groups)
]

# Perform an update using row scaling
Expand All @@ -556,7 +549,7 @@ def multiply(self, X, K):
inversion=inversion,
seed=1,
)
for iteration in range(smoother.num_assimilations()):
for _ in range(smoother.num_assimilations()):
X_posterior = smoother.assimilate(X, Y)

# The result should be the same
Expand All @@ -566,7 +559,6 @@ def multiply(self, X, K):


if __name__ == "__main__":

pytest.main(
args=[
__file__,
Expand Down

0 comments on commit 1ef022d

Please sign in to comment.