diff --git a/src/Accelerators.jl b/src/Accelerators.jl index fea306743..78ce9021b 100644 --- a/src/Accelerators.jl +++ b/src/Accelerators.jl @@ -63,32 +63,3 @@ function update_state!( ## push "v" state to EKP object push!(ekp.u, DataContainer(v, data_are_columns = true)) end - - -""" -State update method for UKI with no acceleration. -The Accelerator framework has not yet been integrated with UKI process; -UKI tracks its own states, so this method is empty. -""" -function update_state!( - ekp::EnsembleKalmanProcess{FT, IT, P, LRS, DefaultAccelerator}, - u::MA, -) where {FT <: AbstractFloat, IT <: Int, P <: Unscented, LRS <: LearningRateScheduler, MA <: AbstractMatrix{FT}} - -end - -""" -Placeholder state update method for UKI with Nesterov Accelerator. -The Accelerator framework has not yet been integrated with UKI process, so this -method throws an error. -""" -function update_state!( - ekp::EnsembleKalmanProcess{FT, IT, P, LRS, NesterovAccelerator{FT}}, - u::MA, -) where {FT <: AbstractFloat, IT <: Int, P <: Unscented, LRS <: LearningRateScheduler, MA <: AbstractMatrix{FT}} - throw( - ArgumentError( - "option `accelerator = NesterovAccelerator` is not implemented for UKI, please use `DefaultAccelerator`", - ), - ) -end diff --git a/src/EnsembleKalmanProcess.jl b/src/EnsembleKalmanProcess.jl index 803cf62f0..f5d1506da 100644 --- a/src/EnsembleKalmanProcess.jl +++ b/src/EnsembleKalmanProcess.jl @@ -696,11 +696,13 @@ include("SparseEnsembleKalmanInversion.jl") export Sampler include("EnsembleKalmanSampler.jl") +# struct Accelerator +# [Must go before Unscented include] +include("Accelerators.jl") + + # struct Unscented export Unscented export Gaussian_2d export construct_initial_ensemble, construct_mean, construct_cov include("UnscentedKalmanInversion.jl") - -# struct Accelerator -include("Accelerators.jl") diff --git a/src/UnscentedKalmanInversion.jl b/src/UnscentedKalmanInversion.jl index 99483822d..25b51d012 100644 --- a/src/UnscentedKalmanInversion.jl +++ b/src/UnscentedKalmanInversion.jl @@ -226,8 +226,12 @@ function EnsembleKalmanProcess( process::Unscented{FT, IT}; kwargs..., ) where {FT <: AbstractFloat, IT <: Int} + + u_mean = process.u_mean[end] + uu_cov = process.uu_cov[end] + # use the distribution stored in process to generate initial ensemble - init_params = update_ensemble_prediction!(process, 0.0) + init_params = update_ensemble_prediction!(process, u_mean, uu_cov, 0.0) return EnsembleKalmanProcess(init_params, obs_mean, obs_noise_cov, process; kwargs...) end @@ -235,10 +239,10 @@ end function FailureHandler(process::Unscented, method::IgnoreFailures) function failsafe_update(uki, u, g, failed_ens) #perform analysis on the model runs - update_ensemble_analysis!(uki, u, g) + u_mean, uu_cov = update_ensemble_analysis!(uki, u, g) #perform new prediction output to model parameters u_p - u = update_ensemble_prediction!(uki.process, uki.Δt[end]) - return u + u_p = update_ensemble_prediction!(uki.process, u_mean, uu_cov, uki.Δt[end]) + return u_p, u_mean, uu_cov end return FailureHandler{Unscented, IgnoreFailures}(failsafe_update) end @@ -290,19 +294,19 @@ function FailureHandler(process::Unscented, method::SampleSuccGauss) ########### Save results push!(uki.process.obs_pred, g_mean) # N_ens x N_data - push!(uki.process.u_mean, u_mean) # N_ens x N_params - push!(uki.process.uu_cov, uu_cov) # N_ens x N_data push!(uki.g, DataContainer(g, data_are_columns = true)) compute_error!(uki) + + return u_mean, uu_cov end function failsafe_update(uki, u, g, failed_ens) #perform analysis on the model runs - succ_gauss_analysis!(uki, u, g, failed_ens) + u_mean, uu_cov = succ_gauss_analysis!(uki, u, g, failed_ens) #perform new prediction output to model parameters u_p - u = update_ensemble_prediction!(uki.process, uki.Δt[end]) - return u + u = update_ensemble_prediction!(uki.process, u_mean, uu_cov, uki.Δt[end]) + return u, u_mean, uu_cov end return FailureHandler{Unscented, SampleSuccGauss}(failsafe_update) end @@ -355,6 +359,47 @@ function construct_sigma_ensemble( return x end +""" +State update method for UKI with no acceleration. Here it simply saves it. +""" +function update_state!( + uki::EnsembleKalmanProcess{FT, IT, P, LRS, DefaultAccelerator}, + u_tuple::TT, +) where {FT <: AbstractFloat, IT <: Int, P <: Unscented, LRS <: LearningRateScheduler, TT <: Tuple} + + (u, u_mean, uu_cov) = u_tuple + + ## push state to UKI object + push!(uki.u, DataContainer(u, data_are_columns = true)) + push!(uki.process.u_mean, u_mean) # N_ens x N_params + push!(uki.process.uu_cov, uu_cov) # N_ens x N_data +end + +""" +State update method for UKI with Nesterov Accelerator. +Performs identical update as with other methods, but requires reconstruction of mean and covariance of the accelerated positions prior to saving. +""" +function update_state!( + uki::EnsembleKalmanProcess{FT, IT, P, LRS, NesterovAccelerator{FT}}, + u_tuple::TT, +) where {FT <: AbstractFloat, IT <: Int, P <: Unscented, LRS <: LearningRateScheduler, TT <: Tuple} + + (u, u_mean, uu_cov) = u_tuple + + #identical update stage as before + ## update "v" state: + k = get_N_iterations(uki) + 2 + v = u .+ (1 - uki.accelerator.r / k) * (u .- uki.accelerator.u_prev) + + ## update "u" state: + uki.accelerator.u_prev = u + + ## push "v" state to UKI object + push!(uki.u, DataContainer(v, data_are_columns = true)) + push!(uki.process.u_mean, construct_mean(uki, v)) # N_ens x N_params + push!(uki.process.uu_cov, construct_cov(uki, v)) # N_ens x N_data + +end """ construct_mean( @@ -550,7 +595,12 @@ end UKI prediction step : generate sigma points. """ -function update_ensemble_prediction!(process::Unscented, Δt::FT) where {FT <: AbstractFloat} +function update_ensemble_prediction!( + process::Unscented, + u_mean::AV, + uu_cov::AM, + Δt::FT, +) where {FT <: AbstractFloat, AV <: AbstractVector, AM <: AbstractMatrix} process.iter += 1 # update evolution covariance matrix @@ -558,14 +608,14 @@ function update_ensemble_prediction!(process::Unscented, Δt::FT) where {FT <: A process.Σ_ω = (2 - process.α_reg^2) * process.uu_cov[end] end - u_mean = process.u_mean[end] - uu_cov = process.uu_cov[end] + # u_mean = process.u_mean[end] + # uu_cov = process.uu_cov[end] α_reg = process.α_reg r = process.r Σ_ω = process.Σ_ω - N_par = length(process.u_mean[1]) + N_par = length(u_mean[1]) ############# Prediction step: u_p_mean = α_reg * u_mean + (1 - α_reg) * r @@ -621,12 +671,11 @@ function update_ensemble_analysis!( ########### Save results push!(uki.process.obs_pred, g_mean) # N_ens x N_data - push!(uki.process.u_mean, u_mean) # N_ens x N_params - push!(uki.process.uu_cov, uu_cov) # N_ens x N_data push!(uki.g, DataContainer(g, data_are_columns = true)) compute_error!(uki) + return u_mean, uu_cov end """ @@ -675,16 +724,14 @@ function update_ensemble!( @info "$(length(failed_ens)) particle failure(s) detected. Handler used: $(nameof(typeof(fh).parameters[2]))." end - u_p = fh.failsafe_update(uki, u_p_old, g_in, failed_ens) + u_p, u_mean, uu_cov = fh.failsafe_update(uki, u_p_old, g_in, failed_ens) - push!(uki.u, DataContainer(u_p, data_are_columns = true)) if uki.verbose - cov_new = get_u_cov_final(uki) - @info "Covariance-weighted error: $(get_error(uki)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))" + @info "Covariance-weighted error: $(get_error(uki)[end])\nCovariance trace: $(tr(uu_cov))\nCovariance trace ratio (current/previous): $(tr(uu_cov)/tr(cov_init))" end - return u_p + return u_p, u_mean, uu_cov end """ diff --git a/test/EnsembleKalmanProcess/runtests.jl b/test/EnsembleKalmanProcess/runtests.jl index 519db3961..28af52be0 100644 --- a/test/EnsembleKalmanProcess/runtests.jl +++ b/test/EnsembleKalmanProcess/runtests.jl @@ -81,8 +81,9 @@ end @testset "Accelerators" begin # Get an inverse problem - y_obs, G, Γy, _ = inv_problems[end - 1] # additive noise inv problem + y_obs, G, Γy, _ = inv_problems[end - 2] # additive noise inv problem (deterministic map) rng = Random.MersenneTwister(rng_seed) + N_ens = 5 initial_ensemble = EKP.construct_initial_ensemble(rng, prior, N_ens) # build accelerated and non-accelerated processes @@ -104,36 +105,48 @@ end @test typeof(eksobj_noacc_specified.accelerator) <: DefaultAccelerator ## test NesterovAccelerators satisfy desired ICs - @test ekiobj.accelerator.r == 3.0 + @test ekiobj.accelerator.r ≈ 3.0 @test ekiobj.accelerator.u_prev == initial_ensemble - @test eksobj.accelerator.r == 3.0 + @test eksobj.accelerator.r ≈ 3.0 @test eksobj.accelerator.u_prev == initial_ensemble ## test method convergence # Note: this test only requires that the final ensemble is an improvement on the initial ensemble, # NOT that the accelerated processes are more effective than the default, as this is not guaranteed. # Specific cost values are printed to give an idea of acceleration. - processes = [Inversion(), Sampler(prior), TransformInversion(inv(Γy))] - T_end = 1 # (this could fail a test if N_iters is not enough to reach T_end) + processes = [Inversion(), TransformInversion(inv(Γy)), Unscented(prior; impose_prior = true), Sampler(prior)] + for process in processes accelerators = [DefaultAccelerator(), NesterovAccelerator()] - N_iters = [30, 30, 30] + N_iters = [5, 5, 5, 5] init_means = [] final_means = [] - + scheduler = DefaultScheduler(0.1)# DataMisfitController() for (accelerator, N_iter) in zip(accelerators, N_iters) - println("Accelerator: ", nameof(typeof(accelerator)), " Process: ", nameof(typeof(process))) + process_copy = deepcopy(process) + scheduler_copy = deepcopy(scheduler) + println("Accelerator: ", nameof(typeof(accelerator)), " Process: ", nameof(typeof(process_copy))) if !(nameof(typeof(process)) == Symbol(Unscented)) ekpobj = EKP.EnsembleKalmanProcess( initial_ensemble, y_obs, Γy, - process, + process_copy, + rng = copy(rng), + scheduler = scheduler_copy, + accelerator = accelerator, + verbose = true, + ) + else + ekpobj = EKP.EnsembleKalmanProcess( + y_obs, + Γy, + process_copy, rng = copy(rng), + scheduler = scheduler_copy, accelerator = accelerator, ) end - ## test get_accelerator function in EKP @test ekpobj.accelerator == get_accelerator(ekpobj) @@ -155,7 +168,7 @@ end norm(inv_sqrt_Γy * (y_obs .- G(transform_unconstrained_to_constrained(prior, final_means[end])))) @info "Convergence:" cost_initial cost_final - if typeof(process) <: Inversion + if typeof(process_copy) <: Inversion u_star = transform_constrained_to_unconstrained(prior, ϕ_star) @test norm( inv_sqrt_Γy * (y_obs .- G(transform_unconstrained_to_constrained(prior, final_means[end]))), @@ -165,15 +178,7 @@ end end end - ## test that error is thrown when applying acceleration to UKI - ukiobj = EKP.EnsembleKalmanProcess( - y_obs, - Γy, - Unscented(prior; impose_prior = true), - rng = copy(rng), - accelerator = NesterovAccelerator(), - ) - @test_throws ArgumentError EKP.update_ensemble!(ukiobj, zeros(length(y_obs), 5)) + end