Use in-place operations for cross-correlations #206

Merged · 3 commits · Jan 15, 2024
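The core idea of this PR: instead of allocating a separate correlation matrix, the cross-covariance matrix is rescaled into a correlation matrix in place, thresholded, and then rescaled back. A standalone NumPy sketch of that pattern (illustrative only, not code from the repository):

```python
import numpy as np

rng = np.random.default_rng(42)
X = rng.normal(size=(1000, 50))  # parameters: (num_parameters, ensemble_size)
Y = rng.normal(size=(200, 50))   # responses:  (num_responses, ensemble_size)

# Empirical cross-covariance of the centered ensembles
cov_XY = (X - X.mean(axis=1, keepdims=True)) @ (Y - Y.mean(axis=1, keepdims=True)).T
cov_XY /= X.shape[1] - 1

stds_X = np.std(X, axis=1, ddof=1)
stds_Y = np.std(Y, axis=1, ddof=1)

# Covariance -> correlation, in place (no second (1000, 200) array is allocated)
cov_XY /= stds_X[:, np.newaxis]
cov_XY /= stds_Y[np.newaxis, :]
significant = np.abs(cov_XY) > 0.3  # boolean mask of entries to keep

# Correlation -> covariance, in place
cov_XY *= stds_X[:, np.newaxis]
cov_XY *= stds_Y[np.newaxis, :]
```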
2 changes: 2 additions & 0 deletions docs/source/Adaptive.py
@@ -108,6 +108,7 @@
 # [Kullback–Leibler divergence](https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence),
 # but we do not pursue this here.

+
 # %%
 def get_prior(num_parameters, num_ensemble, prior_std):
     """Sample prior from N(0, prior_std)."""
@@ -289,6 +290,7 @@ def g(X):
 # - The draw of prior realizations $X \sim N(0, \sigma)$.
 # - The noise (given by the `covariance` argument) that ESMDA adds to the observations.

+
 # %%
 def corr_true(array):
     return sp.stats.pearsonr(x_true, array).statistic
1 change: 0 additions & 1 deletion docs/source/Polynomial.py
@@ -159,7 +159,6 @@ def poly(a, b, c, x):
 )
 n_ies_iter = 7
 for i in range(n_ies_iter):
-
     step_length = steplength_exponential(i + 1)
     X_IES_ert = smoother_ies.sies_iteration(Y_IES_ert, step_length=step_length)

71 changes: 42 additions & 29 deletions src/iterative_ensemble_smoother/experimental.py
@@ -139,33 +139,37 @@ def compute_cross_covariance_multiplier(

         return sp.linalg.solve(C_DD, D - Y, **solver_kwargs)  # type: ignore

-    def _correlation_matrix(
+    def _cov_to_corr_inplace(
         self,
         cov_XY: npt.NDArray[np.double],
-        X: npt.NDArray[np.double],
-        Y: npt.NDArray[np.double],
-    ) -> npt.NDArray[np.double]:
-        """Compute a correlation matrix given a covariance matrix."""
-        assert cov_XY.shape == (X.shape[0], Y.shape[0])
-
-        stds_Y = np.std(Y, axis=1, ddof=1)
-        stds_X = np.std(X, axis=1, ddof=1)
-
-        # Compute the correlation matrix from the covariance matrix
-        corr_XY: npt.NDArray[np.double] = (cov_XY / stds_X[:, np.newaxis]) / stds_Y[
-            np.newaxis, :
-        ]
+        stds_X: npt.NDArray[np.double],
+        stds_Y: npt.NDArray[np.double],
+    ) -> npt.NDArray[np.double]:
+        """Convert a covariance matrix to a correlation matrix in-place."""
+        # Divide each element of cov_XY by the corresponding standard deviations
+        cov_XY /= stds_X[:, np.newaxis]
+        cov_XY /= stds_Y[np.newaxis, :]

-        # Perform checks. There appears to be occasional numerical issues in
-        # the equation. With 2 ensemble members, we get e.g. a max value of
-        # 1.0000000000016778. We allow some leeway and clip the results.
+        # Perform checks and clip values to [-1, 1]
         eps = 1e-8
-        if not ((corr_XY.max() <= 1 + eps) and (corr_XY.min() >= -1 - eps)):
+        if not ((cov_XY.max() <= 1 + eps) and (cov_XY.min() >= -1 - eps)):
             msg = "Cross-correlation matrix has entries not in [-1, 1]."
-            msg += f"The min and max values are: {corr_XY.min()} and {corr_XY.max()}"
+            msg += f" The min and max values are: {cov_XY.min()} and {cov_XY.max()}"
             warnings.warn(msg)

-        corr_XY = np.clip(corr_XY, a_min=-1, a_max=1)
-        return corr_XY
+        return np.clip(cov_XY, a_min=-1, a_max=1, out=cov_XY)
+
+    def _corr_to_cov_inplace(
+        self,
+        corr_XY: npt.NDArray[np.double],
+        stds_X: npt.NDArray[np.double],
+        stds_Y: npt.NDArray[np.double],
+    ) -> npt.NDArray[np.double]:
+        """Convert a correlation matrix to a covariance matrix in-place."""
+        # Multiply each element of corr_XY by the corresponding standard deviations
+        corr_XY *= stds_X[:, np.newaxis]
+        corr_XY *= stds_Y[np.newaxis, :]
+        return corr_XY
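The `out=cov_XY` argument is what keeps the final clipping allocation-free: `np.clip` writes the result into the input buffer and returns it. A quick standalone check of that NumPy behavior:

```python
import numpy as np

a = np.array([1.5, -2.0, 0.3])
b = np.clip(a, a_min=-1, a_max=1, out=a)  # clips in place
assert b is a  # the returned array is the input buffer; nothing new was allocated
```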

@@ -174,6 +178,7 @@ def assimilate(
         X: npt.NDArray[np.double],
         Y: npt.NDArray[np.double],
         D: npt.NDArray[np.double],
+        overwrite: bool = False,
         alpha: float,
         correlation_threshold: Union[Callable[[int], float], float, None] = None,
         cov_YY: Optional[npt.NDArray[np.double]] = None,
@@ -207,6 +212,10 @@ def assimilate(
             The covariance inflation factor. The sequence of alphas should
             obey the equation sum_i (1/alpha_i) = 1. However, this is NOT
             enforced in this method. The user/caller is responsible for this.
+        overwrite: bool
+            If True, X will be overwritten and mutated.
+            If False, the method will not mutate inputs in any way.
+            Setting this to True saves memory.
         correlation_threshold : callable or float or None
             Either a callable with signature f(ensemble_size) -> float, or a
             float in the range [0, 1]. Entries in the covariance matrix that
@@ -239,6 +248,10 @@ def assimilate(
                 "`correlation_threshold` must be a callable or a float in [0, 1]"
             )

+        # Do not overwrite input arguments
+        if not overwrite:
+            X = np.copy(X)
+
         # Create `correlation_threshold` if the argument is a float
         if is_float:
             corr_threshold: float = correlation_threshold  # type: ignore
@@ -261,25 +274,27 @@ def correlation_threshold(ensemble_size: int) -> float:
         # X by parameter group (rows), the storage requirement decreases
         cov_XY = empirical_cross_covariance(X, Y)
         assert cov_XY.shape == (X.shape[0], Y.shape[0])
-        corr_XY = self._correlation_matrix(cov_XY, X, Y)
-        assert corr_XY.shape == cov_XY.shape
+
+        stds_X = np.std(X, axis=1, ddof=1)
+        stds_Y = np.std(Y, axis=1, ddof=1)
+
+        corr_XY = self._cov_to_corr_inplace(cov_XY, stds_X, stds_Y)

         # Determine which elements in the cross covariance matrix that will
         # be set to zero
         threshold = correlation_threshold(X.shape[1])
         significant_corr_XY = np.abs(corr_XY) > threshold

+        cov_XY = self._corr_to_cov_inplace(corr_XY, stds_X, stds_Y)
+
         # Pre-compute the covariance cov(Y, Y) here, and index on it later
         if cov_YY is None:
             cov_YY = empirical_cross_covariance(Y, Y)
         else:
             assert cov_YY.ndim == 2, "'cov_YY' must be a 2D array"
             assert cov_YY.shape == (Y.shape[0], Y.shape[0])

-        # TODO: memory could be saved by overwriting the input X
-        X_out: npt.NDArray[np.double] = np.copy(X)
-        for (unique_row, indices_of_row) in groupby_indices(significant_corr_XY):
-
+        for unique_row, indices_of_row in groupby_indices(significant_corr_XY):
             if verbose:
                 print(
                     f"    Assimilating {len(indices_of_row)} parameters"
@@ -294,8 +309,6 @@ def correlation_threshold(ensemble_size: int) -> float:
             if np.all(~unique_row):
                 continue

-            # Get the parameters (X) that have identical significant responses (Y)
-            X_subset = X[indices_of_row, :]
             Y_subset = Y[unique_row, :]

             # Compute the masked arrays for these variables
@@ -320,9 +333,9 @@ def correlation_threshold(ensemble_size: int) -> float:
                 Y=Y_subset,
                 cov_YY=cov_YY_subset,  # Passing cov(Y, Y) avoids re-computation
             )
-            X_out[indices_of_row, :] = X_subset + cov_XY_subset @ T
+            X[indices_of_row, :] += cov_XY_subset @ T

-        return X_out
+        return X


 class RowScaling:
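To make the new flag concrete, a hedged usage sketch (the variables `X`, `Y`, `D`, and `alpha_i` stand in for the caller's setup; only `assimilate` and its arguments come from the diff above):

```python
# Default behavior: inputs are left untouched; an updated copy of X is returned.
X_posterior = smoother.assimilate(X=X, Y=Y, D=D, alpha=alpha_i)

# Memory-saving path: X is updated in place (and also returned).
smoother.assimilate(X=X, Y=Y, D=D, alpha=alpha_i, overwrite=True)
```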
36 changes: 14 additions & 22 deletions tests/test_experimental.py
@@ -19,7 +19,6 @@

 @pytest.mark.parametrize("seed", range(25))
 def test_groupby_indices(seed):
-
     rng = np.random.default_rng(seed)
     rows = rng.integers(10, 100)
     columns = rng.integers(2, 9)
@@ -39,8 +38,7 @@ def test_groupby_indices(seed):
     assert intersection_idx == set()

     # Verify each entry in the groups
-    for (unique_row, indices_of_row) in groups:
-
+    for unique_row, indices_of_row in groups:
         # Repeat this unique row the number of times it occurs in X
         repeated = np.repeat(
             unique_row[np.newaxis, :], repeats=len(indices_of_row), axis=0
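The test above exercises `groupby_indices`, which partitions the row indices of a matrix by identical rows. A minimal sketch of such a function (illustrative only; the real implementation lives in `iterative_ensemble_smoother.experimental`):

```python
from collections import defaultdict

import numpy as np

def groupby_indices_sketch(X):
    """Yield (unique_row, indices_of_row) pairs grouping identical rows of X."""
    groups = defaultdict(list)
    for i, row in enumerate(np.asarray(X)):
        groups[tuple(row)].append(i)
    for row, indices in groups.items():
        yield np.array(row), np.array(indices)
```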
@@ -50,7 +48,6 @@

 @pytest.fixture()
 def linear_problem(request):
-
     # Seed the problem using indirect parametrization:
     # https://docs.pytest.org/en/latest/example/parametrize.html#indirect-parametrization
     rng = np.random.default_rng(request.param)
@@ -86,7 +83,6 @@ class TestAdaptiveESMDA:
     def test_that_adaptive_localization_with_cutoff_1_equals_ensemble_prior(
         self, linear_problem
     ):
-
         # Create a problem with g(x) = A @ x
         X, g, observations, covariance, _ = linear_problem

@@ -98,7 +94,6 @@ def test_that_adaptive_localization_with_cutoff_1_equals_ensemble_prior(
         X_i = np.copy(X)
         alpha = normalize_alpha(np.ones(5))
         for alpha_i in alpha:
-
             # Run forward model
             Y_i = g(X_i)

@@ -127,7 +122,6 @@ def test_that_adaptive_localization_with_cutoff_1_equals_ensemble_prior(
     def test_that_adaptive_localization_with_cutoff_0_equals_standard_ESMDA_update(
         self, linear_problem
     ):
-
         # Create a problem with g(x) = A @ x
         X, g, observations, covariance, rng = linear_problem

@@ -141,8 +135,7 @@ def test_that_adaptive_localization_with_cutoff_0_equals_standard_ESMDA_update(
         )

         X_i = np.copy(X)
-        for i, alpha_i in enumerate(alpha, 1):
-
+        for _, alpha_i in enumerate(alpha, 1):
             # Run forward model
             Y_i = g(X_i)

@@ -152,12 +145,13 @@ def test_that_adaptive_localization_with_cutoff_0_equals_standard_ESMDA_update(
             )

             # Update the relevant parameters and write to X
-            X_i = smoother.assimilate(
+            smoother.assimilate(
                 X=X_i,
                 Y=Y_i,
                 D=D_i,
+                overwrite=True,
                 alpha=alpha_i,
-                correlation_threshold=lambda ensemble_size: 0,
+                correlation_threshold=0,
             )

         # =============================================================================
@@ -171,7 +165,7 @@ def test_that_adaptive_localization_with_cutoff_0_equals_standard_ESMDA_update(
         )

         X_i2 = np.copy(X)
-        for i in range(smoother.num_assimilations()):
+        for _ in range(smoother.num_assimilations()):
             X_i2 = smoother.assimilate(X_i2, g(X_i2))

         assert np.allclose(X_i, X_i2)
@@ -194,7 +188,7 @@ def test_that_posterior_generalized_variance_increases_in_cutoff(
         members decrease, this test starts to fail more often."""

         # Create a problem with g(x) = A @ x
-        X, g, observations, covariance, rng = linear_problem
+        X, g, observations, covariance, _ = linear_problem
         if full_covariance:
             covariance = np.diag(covariance)

@@ -208,8 +202,7 @@ def test_that_posterior_generalized_variance_increases_in_cutoff(
         )

         X_i = np.copy(X)
-        for i, alpha_i in enumerate(alpha, 1):
-
+        for _, alpha_i in enumerate(alpha, 1):
             # Run forward model
             Y_i = g(X_i)

@@ -226,14 +219,14 @@ def test_that_posterior_generalized_variance_increases_in_cutoff(
             Y=Y_i,
             D=D_i,
             alpha=alpha_i,
-            correlation_threshold=lambda ensemble_size: cutoff_low,
+            correlation_threshold=cutoff_low,
         )
         X_i_high_cutoff = smoother.assimilate(
             X=X_i,
             Y=Y_i,
             D=D_i,
             alpha=alpha_i,
-            correlation_threshold=lambda ensemble_size: cutoff_high,
+            correlation_threshold=cutoff_high,
         )

         # Compute covariances
@@ -410,8 +403,8 @@ def test_that_cov_YY_can_be_computed_outside_of_assimilate(
         """

         # Create a problem with g(x) = A @ x
-        X, g, observations, covariance, rng = linear_problem
-        num_parameters, ensemble_size = X.shape
+        X, g, observations, covariance, _ = linear_problem
+        _, ensemble_size = X.shape

         alpha = normalize_alpha(np.array([5, 4, 3, 2, 1]))  # Vector of inflation values
         smoother = AdaptiveESMDA(
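As the test name suggests, `cov(Y, Y)` can be computed once outside `assimilate` and passed in to avoid recomputation. A hedged sketch (the import path for `empirical_cross_covariance` is assumed; the surrounding setup is illustrative):

```python
# Assumed import path; adjust to wherever empirical_cross_covariance is exposed.
from iterative_ensemble_smoother.experimental import empirical_cross_covariance

cov_YY = empirical_cross_covariance(Y_i, Y_i)
X_i = smoother.assimilate(X=X_i, Y=Y_i, D=D_i, alpha=alpha_i, cov_YY=cov_YY)
```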
@@ -535,7 +528,7 @@ def multiply(self, X, K):
     row_groups = [tuple(range(17)), tuple(range(17, 100))]
     # When alpha=1 in row scaling, we perform a full update
     X_with_row_scaling = [
-        (X[idx, :], RowScaling(alpha=1)) for i, idx in enumerate(row_groups)
+        (X[idx, :], RowScaling(alpha=1)) for _, idx in enumerate(row_groups)
     ]

     # Perform an update using row scaling
@@ -556,7 +549,7 @@ def multiply(self, X, K):
         inversion=inversion,
         seed=1,
     )
-    for iteration in range(smoother.num_assimilations()):
+    for _ in range(smoother.num_assimilations()):
         X_posterior = smoother.assimilate(X, Y)

     # The result should be the same
@@ -566,7 +559,6 @@


 if __name__ == "__main__":
-
     pytest.main(
         args=[
             __file__,