diff --git a/src/iterative_ensemble_smoother/experimental.py b/src/iterative_ensemble_smoother/experimental.py
index 6005eb01..e25504f5 100644
--- a/src/iterative_ensemble_smoother/experimental.py
+++ b/src/iterative_ensemble_smoother/experimental.py
@@ -15,17 +15,6 @@
 )
 
 
-def correlation_threshold(ensemble_size: int) -> float:
-    """Decides whether or not to use user-defined or default threshold.
-
-    Default threshold taken from luo2022,
-    Continuous Hyper-parameter OPtimization (CHOP) in an ensemble Kalman filter
-    Section 2.3 - Localization in the CHOP problem
-    """
-    # return 3 / np.sqrt(ensemble_size)
-    return -1
-
-
 def compute_cross_covariance_multiplier(
     *,
     alpha: float,
@@ -33,21 +22,29 @@ def compute_cross_covariance_multiplier(
     D: npt.NDArray[np.double],
     Y: npt.NDArray[np.double],
 ) -> npt.NDArray[np.double]:
-    """The update equation is:
+    """Compute a transition matrix K such that the update can be written as:
+
+        X + cov_XY @ K
+
+    In the notation of Emerick et al., recall that the update equation is:
 
         X_posterior = X_prior + cov_XY @ inv(C_DD + alpha * C_D) @ (D - Y)
 
-    This function computes a matrix K such that
+    This function computes the transition matrix K, defined as:
 
         K := inv(C_DD + alpha * C_D) @ (D - Y)
 
     Note that this is not the most efficient way to compute the update equation,
     since cov_XY := center(X) @ center(Y).T / (N - 1), and we can avoid creating
-    this (huge) matrix by performing multiplications left-to-right, i.e.,
+    this (huge) covariance matrix by performing multiplications right-to-left:
 
         cov_XY @ inv(C_DD + alpha * C_D) @ (D - Y)
         center(X) @ center(Y).T / (N - 1) @ inv(C_DD + alpha * C_D) @ (D - Y)
         center(X) @ [center(Y).T / (N - 1) @ inv(C_DD + alpha * C_D) @ (D - Y)]
+
+    The advantage of forming K explicitly is that K does not depend on X, so
+    once K is computed, the same K can be applied to any subset of the rows
+    (parameters) of cov_XY.
     """
     C_DD = empirical_covariance_upper(Y)  # Only compute upper part
@@ -60,18 +57,33 @@ def compute_cross_covariance_multiplier(
     }
 
     # Compute K := sp.linalg.inv(C_DD + alpha * C_D) @ (D - Y)
+    # by solving the system (C_DD + alpha * C_D) @ K = (D - Y)
     if C_D.ndim == 2:
         # C_D is a covariance matrix
         C_DD += alpha * C_D  # Save memory by mutating
     elif C_D.ndim == 1:
         # C_D is an array, so add it to the diagonal without forming diag(C_D)
-        C_DD.flat[:: C_DD.shape[1] + 1] += alpha * C_D
+        np.fill_diagonal(C_DD, C_DD.diagonal() + alpha * C_D)
 
     return sp.linalg.solve(C_DD, D - Y, **solver_kwargs)
 
 
 class AdaptiveESMDA(ESMDA):
+    def correlation_threshold(self, ensemble_size: int) -> float:
+        """Decide whether to use a user-defined or default threshold.
+
+        Default threshold taken from luo2022,
+        Continuous Hyper-parameter OPtimization (CHOP) in an ensemble Kalman filter,
+        Section 2.3 - Localization in the CHOP problem.
+        """
+        # return 3 / np.sqrt(ensemble_size)
+        return -1e10
+
     def adaptive_transition_matrix(self, X, Y, D, alpha=1):
+        """Compute a transition matrix K such that:
+
+            X_posterior = X_prior + cov_XY @ K
+        """
         assert X.shape[1] == Y.shape[1]
         assert D.shape == Y.shape
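To make the row-batching advantage described in the compute_cross_covariance_multiplier docstring concrete, here is a minimal standalone sketch. The shapes and names are illustrative only, and np.cov stands in for the library's empirical covariance helpers:

    import numpy as np
    import scipy as sp

    rng = np.random.default_rng(0)
    N = 10                               # ensemble size
    X = rng.normal(size=(6, N))          # parameters, one realization per column
    Y = rng.normal(size=(4, N))          # responses
    D = rng.normal(size=(4, N))          # perturbed observations
    C_D = rng.uniform(0.5, 1.0, size=4)  # diagonal observation covariance
    alpha = 1.0

    # K := inv(C_DD + alpha * C_D) @ (D - Y), computed via a linear solve
    C_DD = np.cov(Y)
    K = sp.linalg.solve(C_DD + alpha * np.diag(C_D), D - Y)

    # Updating all rows at once agrees with updating row batches using the same K
    cov_XY = np.cov(X, Y)[:6, 6:]        # cross-covariance block of the joint covariance
    X_full = X + cov_XY @ K
    X_batched = np.vstack(
        [X[rows] + cov_XY[rows] @ K for rows in (slice(0, 3), slice(3, 6))]
    )
    assert np.allclose(X_full, X_batched)

This is what lets the demo below fetch one parameter group at a time from storage: K is computed once per assimilation step, and only the rows of cov_XY for the current batch are ever formed.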
@@ -79,42 +91,54 @@ def adaptive_transition_matrix(self, X, Y, D, alpha=1):
         return compute_cross_covariance_multiplier(alpha=alpha, C_D=self.C_D, D=D, Y=Y)
 
     def adaptive_assimilate(self, X, Y, transition_matrix):
-        assert X.shape[1] == Y.shape[1]
+        """Use X, or possibly a subset of the variables (rows) of X, together
+        with Y, to compute the cross-covariance matrix cov_XY.
+
+        Then compute the update as:
+
+            X + cov_XY @ transition_matrix
+        """
+        assert X.shape[1] == Y.shape[1] == transition_matrix.shape[1]
+        assert Y.shape[0] == transition_matrix.shape[0]
 
         # Step 1:
         # Compute cross-correlation between parameters X and responses Y
         # Note: let the number of parameters be n and the number of responses be m.
         # This step requires both O(mn) computation and O(mn) storage, which is
         # larger than the O(n + m^2) computation used in ESMDA, which never explicitly
-        # forms the cross-covariance matrix between X and Y
+        # forms the cross-covariance matrix between X and Y. However, if we batch
+        # X by parameter group (rows), the storage requirement decreases.
         cov_XY = empirical_cross_covariance(X, Y)
+        assert cov_XY.shape == (X.shape[0], Y.shape[0])
         stds_Y = np.std(Y, axis=1, ddof=1)
         stds_X = np.std(X, axis=1, ddof=1)
+
+        # Compute the correlation matrix from the covariance matrix
        corr_XY = (cov_XY / stds_X[:, np.newaxis]) / stds_Y[np.newaxis, :]
         assert corr_XY.max() <= 1
+        assert corr_XY.min() >= -1
 
-        # These will be set to zero
-        thres = correlation_threshold(ensemble_size=X.shape[1])
-        cov_XY[corr_XY < thres] = 0  # Set small values to zero
-        print("Number of entries in cov_XY set to zero", np.isclose(cov_XY, 0).sum())
+        # Determine which elements of the cross-covariance matrix will
+        # be set to zero
+        thres = self.correlation_threshold(ensemble_size=X.shape[1])
+        cov_XY[np.abs(corr_XY) < thres] = 0  # Set weakly correlated entries to zero
+        # print("  Entries in cov_XY set to zero", np.isclose(cov_XY, 0).sum())
 
         return X + cov_XY @ transition_matrix
 
 
 if __name__ == "__main__":
     import time
 
     # =============================================================================
     # CREATE A PROBLEM - LINEAR REGRESSION
     # =============================================================================
 
-    # Create a problem
+    # Create a problem with g(x) = A @ x
     rng = np.random.default_rng(42)
-    num_parameters = 100
-    num_observations = 1000
+    num_parameters = 10000
+    num_observations = 100
     num_ensemble = 25
 
-    A = np.exp(np.random.randn(num_observations, num_parameters))
+    A = np.exp(rng.standard_normal(size=(num_observations, num_parameters)))
 
     def g(X):
         """Forward model."""
@@ -128,19 +152,19 @@ def g(X):
     X = rng.normal(size=(num_parameters, num_ensemble))
     covariance = rng.triangular(0.1, 1, 1, size=num_observations)
 
-    # Split the parameters into two groups - simulating retrieving from storage
-    split_index = rng.choice(range(num_parameters))
-    parameters_groups = [
-        tuple(range(split_index)),
-        tuple(range(split_index, num_parameters)),
-    ]
+    # Split the parameters into equally sized groups,
+    # simulating retrieval from storage
+    num_groups = 10
+    assert num_parameters % num_groups == 0, "num_groups must divide num_parameters"
+    group_size = num_parameters // num_groups
+    parameters_groups = list(zip(*(iter(range(num_parameters)),) * group_size))
+    assert len(parameters_groups) == num_groups
 
     # =============================================================================
     # SETUP ESMDA FOR LOCALIZATION AND SOLVE PROBLEM
     # =============================================================================
     start_time = time.perf_counter()
     smoother = AdaptiveESMDA(
-        covariance=covariance, observations=observations, alpha=10, seed=1
+        covariance=covariance, observations=observations, alpha=5, seed=1
     )
 
     # Simulate realizations that die
     X_i = np.copy(X)
     for i, alpha_i in enumerate(smoother.alpha, 1):
         print(f"ESMDA iteration {i} with alpha_i={alpha_i}")
 
+        # Run forward model
         Y_i = g(X_i)
 
         # We simulate loss of realizations due to compute clusters going down.
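The list(zip(*(iter(range(num_parameters)),) * group_size)) expression above is the standard grouper idiom: zipping group_size references to the same iterator yields consecutive, equally sized index tuples. A tiny standalone illustration with toy sizes:

    it = iter(range(6))
    groups = list(zip(*(it,) * 3))  # three references to the SAME iterator
    assert groups == [(0, 1, 2), (3, 4, 5)]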
@@ -171,12 +196,16 @@ def g(X):
         )
 
         # Loop over parameter groups and update
-        for parameter_mask_j in parameters_groups:
-            print("  Updating parameter group.")
+        for j, parameter_mask_j in enumerate(parameters_groups, 1):
+            print(f"  Updating parameter group {j}/{len(parameters_groups)}")
 
-            # Update the relevant parameters
-            X_i[np.ix_(parameter_mask_j, alive_mask_i)] = smoother.adaptive_assimilate(
-                X_i[np.ix_(parameter_mask_j, alive_mask_i)],
+            # Mask out the rows in this parameter group and the columns of
+            # realizations that are still alive. This step simulates fetching
+            # from storage.
+            mask = np.ix_(parameter_mask_j, alive_mask_i)
+
+            # Update the relevant parameters and write them back to X_i (storage)
+            X_i[mask] = smoother.adaptive_assimilate(
+                X_i[mask],
                 Y_i[:, alive_mask_i],
                 transition_matrix,
             )
@@ -197,7 +226,6 @@ def g(X):
     X_i2 = np.copy(X)
 
     for i in range(smoother.num_assimilations()):
-
         # Run simulations
         Y_i = g(X_i2)
@@ -210,8 +238,12 @@ def g(X):
             X_i2[:, alive_mask_i], Y_i[:, alive_mask_i]
         )
 
-    # For this test to pass, correlation_threshold() should be be <= 0
-    assert np.allclose(X_i, X_i2)
+    # For this test to pass, correlation_threshold() should return <= 0
+    print(
+        "Norm difference between ESMDA with and without localization:",
+        np.linalg.norm(X_i - X_i2),
+    )
+    assert np.allclose(X_i, X_i2, atol=1e-4)
 
     print(f"ESMDA without localization - Ran in {time.perf_counter() - start_time} s")
@@ -298,7 +330,6 @@ def ensemble_smoother_update_step_row_scaling(
 
     # Loop over groups of rows (parameters)
     for X, row_scale in X_with_row_scaling:
-
         # In the C++ code, multiply() will transform the transition matrix F as
         #     F_new = F * alpha + I * (1 - alpha)
         # but the transition matrix F that we pass below is F := K + I, so:
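Completing the algebra in that last comment: with F := K + I, the multiply() transform gives F_new = (K + I) * alpha + I * (1 - alpha) = K * alpha + I, so row scaling leaves the identity part intact and simply damps the update term by alpha. A standalone numerical check of this identity (illustrative values, plain numpy):

    import numpy as np

    rng = np.random.default_rng(1)
    K = rng.normal(size=(5, 5))  # stand-in transition matrix
    I = np.eye(5)
    alpha = 0.3                  # stand-in row-scaling factor

    F = K + I
    F_new = F * alpha + I * (1 - alpha)  # the multiply() transform
    assert np.allclose(F_new, K * alpha + I)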