From cd0df606bfbd28458f0bbecd46ee88883f38022e Mon Sep 17 00:00:00 2001 From: sydneyvernon Date: Tue, 15 Aug 2023 15:45:36 -0700 Subject: [PATCH] Define Accelerator type Move EKP state update to Accelerator function initial setup of Nesterov momentum fixes for indexing, typos, constructors undo accidental test changes accelerator struct fixes sanity-check comparison on simple problem fixed index shift, added function to set accelerator ICs add exp sin example convergence plots reproduced multi-trial convergence results on exp sin IP fix bug with accelerator setup in default case visualize momentum acceleration in EKI,EKS processes darcy in progress accelerator work and unit tests formatting undo formatting Move EKP state update to Accelerator function initial setup of Nesterov momentum fixes for indexing, typos, constructors undo accidental test changes accelerator struct fixes sanity-check comparison on simple problem fixed index shift, added function to set accelerator ICs add exp sin example convergence plots reproduced multi-trial convergence results on exp sin IP fix bug with accelerator setup in default case visualize momentum acceleration in EKI,EKS processes darcy in progress accelerator work and unit tests ignore UKI, fix tests Delete examples/LearningRateSchedulers/compare_schedulers_accelerated.jl Delete examples/Sinusoid/exp_sin_multi_comparison.pdf Delete examples/Sinusoid/exp_sin.pdf Delete examples/Sinusoid/exp_sin_.pdf Delete examples/Sinusoid/exp_sin_eki.pdf Delete examples/Sinusoid/exp_sin_eks.pdf Delete examples/Sinusoid/exp_sin_multi_comparison_a.pdf Delete examples/Sinusoid/exp_sin_multi_comparison_b.pdf Delete examples/Sinusoid/exp_sin_multi_comparison_c.pdf Delete examples/Sinusoid/exp_sin_narrow.pdf Delete examples/Sinusoid/exp_sin_targeted.pdf Delete examples/Sinusoid/exp_sin_shifted.pdf Delete examples/Sinusoid/exp_sin_wide.pdf Delete examples/Sinusoid/exp_sinusoid_example_accelerated.jl Delete examples/Sinusoid/exp_sinusoid_example_comparison.jl Delete examples/Sinusoid/sinusoid_example_accelerated.jl Delete test/Accelerators directory Accelerator tests will be condensed with EKP tests Delete output/ensembles_acc.pdf Delete output/error_vs_spread_over_iteration_acc.pdf Delete output/error_vs_spread_over_time_acc.pdf Delete exp_sin_.pdf restore original Project.toml Delete examples/Darcy/darcy_accelerated.jl changed test @info printing, formatting Delete output/ensembles.pdf Delete output/error_vs_spread_over_iteration.pdf Delete output/error_vs_spread_over_time.pdf Delete examples/Darcy/output/data_storage.jld2 Delete examples/Darcy/output/parameter_storage.jld2 fix test file include EKTI test code coverage fixes code cleanup Warning for accelerated EKS process --- Project.toml | 2 +- src/Accelerators.jl | 94 ++++++++++++++++++++++++ src/EnsembleKalmanInversion.jl | 6 +- src/EnsembleKalmanProcess.jl | 52 +++++++++++-- src/EnsembleKalmanSampler.jl | 5 +- src/EnsembleTransformKalmanInversion.jl | 5 +- src/SparseEnsembleKalmanInversion.jl | 3 +- test/EnsembleKalmanProcess/runtests.jl | 98 +++++++++++++++++++++++++ 8 files changed, 250 insertions(+), 15 deletions(-) create mode 100644 src/Accelerators.jl diff --git a/Project.toml b/Project.toml index 9d88ea2cc..c3c52e53f 100644 --- a/Project.toml +++ b/Project.toml @@ -40,4 +40,4 @@ StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["StableRNGs", "Test", "Plots"] +test = ["StableRNGs", "Test", "Plots"] \ No newline at end of file diff --git a/src/Accelerators.jl b/src/Accelerators.jl new file mode 100644 index 000000000..fea306743 --- /dev/null +++ b/src/Accelerators.jl @@ -0,0 +1,94 @@ +# included in EnsembleKalmanProcess.jl + +export DefaultAccelerator, NesterovAccelerator +export update_state!, set_initial_acceleration! + +""" +$(TYPEDEF) + +Default accelerator provides no acceleration, runs traditional EKI +""" +struct DefaultAccelerator <: Accelerator end + +""" +$(TYPEDEF) + +Accelerator that adapts Nesterov's momentum method for EKI. +Stores a previous state value u_prev for computational purposes (note this is distinct from state returned as "ensemble value") + +$(TYPEDFIELDS) +""" +mutable struct NesterovAccelerator{FT <: AbstractFloat} <: Accelerator + r::FT + u_prev::Any +end + +function NesterovAccelerator(r = 3.0, initial = Float64[]) + return NesterovAccelerator(r, initial) +end + + +""" +Sets u_prev to the initial parameter values +""" +function set_ICs!(accelerator::NesterovAccelerator{FT}, u::MA) where {FT <: AbstractFloat, MA <: AbstractMatrix{FT}} + accelerator.u_prev = u +end + + +""" +Performs traditional state update with no momentum. +""" +function update_state!( + ekp::EnsembleKalmanProcess{FT, IT, P, LRS, DefaultAccelerator}, + u::MA, +) where {FT <: AbstractFloat, IT <: Int, P <: Process, LRS <: LearningRateScheduler, MA <: AbstractMatrix{FT}} + push!(ekp.u, DataContainer(u, data_are_columns = true)) +end + +""" +Performs state update with modified Nesterov momentum approach. +""" +function update_state!( + ekp::EnsembleKalmanProcess{FT, IT, P, LRS, NesterovAccelerator{FT}}, + u::MA, +) where {FT <: AbstractFloat, IT <: Int, P <: Process, LRS <: LearningRateScheduler, MA <: AbstractMatrix{FT}} + ## update "v" state: + k = get_N_iterations(ekp) + 2 + v = u .+ (1 - ekp.accelerator.r / k) * (u .- ekp.accelerator.u_prev) + + ## update "u" state: + ekp.accelerator.u_prev = u + + ## push "v" state to EKP object + push!(ekp.u, DataContainer(v, data_are_columns = true)) +end + + +""" +State update method for UKI with no acceleration. +The Accelerator framework has not yet been integrated with UKI process; +UKI tracks its own states, so this method is empty. +""" +function update_state!( + ekp::EnsembleKalmanProcess{FT, IT, P, LRS, DefaultAccelerator}, + u::MA, +) where {FT <: AbstractFloat, IT <: Int, P <: Unscented, LRS <: LearningRateScheduler, MA <: AbstractMatrix{FT}} + +end + +""" +Placeholder state update method for UKI with Nesterov Accelerator. +The Accelerator framework has not yet been integrated with UKI process, so this +method throws an error. +""" +function update_state!( + ekp::EnsembleKalmanProcess{FT, IT, P, LRS, NesterovAccelerator{FT}}, + u::MA, +) where {FT <: AbstractFloat, IT <: Int, P <: Unscented, LRS <: LearningRateScheduler, MA <: AbstractMatrix{FT}} + throw( + ArgumentError( + "option `accelerator = NesterovAccelerator` is not implemented for UKI, please use `DefaultAccelerator`", + ), + ) +end diff --git a/src/EnsembleKalmanInversion.jl b/src/EnsembleKalmanInversion.jl index 91da39379..35a986f17 100644 --- a/src/EnsembleKalmanInversion.jl +++ b/src/EnsembleKalmanInversion.jl @@ -138,17 +138,17 @@ function update_ensemble!( u = fh.failsafe_update(ekp, u, g, y, scaled_obs_noise_cov, failed_ens) - # store new parameters (and model outputs) - push!(ekp.u, DataContainer(u, data_are_columns = true)) push!(ekp.g, DataContainer(g, data_are_columns = true)) # Store error compute_error!(ekp) # Diagnostics - cov_new = cov(get_u_final(ekp), dims = 2) + cov_new = cov(u, dims = 2) if ekp.verbose @info "Covariance-weighted error: $(get_error(ekp)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))" end + + return u end diff --git a/src/EnsembleKalmanProcess.jl b/src/EnsembleKalmanProcess.jl index c901b37f3..803cf62f0 100644 --- a/src/EnsembleKalmanProcess.jl +++ b/src/EnsembleKalmanProcess.jl @@ -13,7 +13,7 @@ export get_u, get_g, get_ϕ export get_u_prior, get_u_final, get_g_final, get_ϕ_final export get_N_iterations, get_error, get_cov_blocks export get_u_mean, get_u_cov, get_g_mean, get_ϕ_mean -export get_u_mean_final, get_u_cov_prior, get_u_cov_final, get_g_mean_final, get_ϕ_mean_final +export get_u_mean_final, get_u_cov_prior, get_u_cov_final, get_g_mean_final, get_ϕ_mean_final, get_accelerator export compute_error! export update_ensemble! export sample_empirical_gaussian, split_indices_by_success @@ -29,6 +29,9 @@ abstract type LearningRateScheduler end # Failure handlers abstract type FailureHandlingMethod end +# Accelerators +abstract type Accelerator end + "Failure handling method that ignores forward model failures" @@ -104,7 +107,13 @@ Inputs: $(METHODLIST) """ -struct EnsembleKalmanProcess{FT <: AbstractFloat, IT <: Int, P <: Process, LRS <: LearningRateScheduler} +struct EnsembleKalmanProcess{ + FT <: AbstractFloat, + IT <: Int, + P <: Process, + LRS <: LearningRateScheduler, + ACC <: Accelerator, +} "array of stores for parameters (`u`), each of size [`N_par × N_ens`]" u::Array{DataContainer{FT}} "vector of the observed vector size [`N_obs`]" @@ -119,6 +128,8 @@ struct EnsembleKalmanProcess{FT <: AbstractFloat, IT <: Int, P <: Process, LRS < err::Vector{FT} "Scheduler to calculate the timestep size in each EK iteration" scheduler::LRS + "accelerator object that informs EK update steps, stores additional state variables as needed" + accelerator::ACC "stored vector of timesteps used in each EK iteration" Δt::Vector{FT} "the particular EK process (`Inversion` or `Sampler` or `Unscented` or `TransformInversion` or `SparseInversion`)" @@ -139,6 +150,7 @@ function EnsembleKalmanProcess( obs_noise_cov::Union{AbstractMatrix{FT}, UniformScaling{FT}}, process::P; scheduler::Union{Nothing, LRS} = nothing, + accelerator::Union{Nothing, ACC} = nothing, Δt = nothing, rng::AbstractRNG = Random.GLOBAL_RNG, failure_handler_method::FM = IgnoreFailures(), @@ -147,6 +159,7 @@ function EnsembleKalmanProcess( ) where { FT <: AbstractFloat, LRS <: LearningRateScheduler, + ACC <: Accelerator, P <: Process, FM <: FailureHandlingMethod, LM <: LocalizationMethod, @@ -193,16 +206,31 @@ function EnsembleKalmanProcess( # timestep store Δt = FT[] + # set up accelerator + if isnothing(accelerator) + acc = DefaultAccelerator() + else + acc = accelerator + end + AC = typeof(acc) + + if AC <: NesterovAccelerator + set_ICs!(acc, params) + if P <: Sampler + @warn "Acceleration is experimental for Sampler processes and may affect convergence." + end + end + # failure handler fh = FailureHandler(process, failure_handler_method) # localizer loc = Localizer(localization_method, N_par, N_obs, N_ens, FT) if verbose - @info "Initializing ensemble Kalman process of type $(nameof(typeof(process)))\nNumber of ensemble members: $(N_ens)\nLocalization: $(nameof(typeof(localization_method)))\nFailure handler: $(nameof(typeof(failure_handler_method)))\nScheduler: $(nameof(typeof(lrs)))" + @info "Initializing ensemble Kalman process of type $(nameof(typeof(process)))\nNumber of ensemble members: $(N_ens)\nLocalization: $(nameof(typeof(localization_method)))\nFailure handler: $(nameof(typeof(failure_handler_method)))\nScheduler: $(nameof(typeof(lrs)))\nAccelerator: $(nameof(typeof(acc)))" end - EnsembleKalmanProcess{FT, IT, P, RS}( + EnsembleKalmanProcess{FT, IT, P, RS, AC}( [init_params], obs_mean, obs_noise_cov, @@ -210,6 +238,7 @@ function EnsembleKalmanProcess( g, err, lrs, + acc, Δt, process, rng, @@ -222,7 +251,6 @@ end include("LearningRateSchedulers.jl") - """ get_u(ekp::EnsembleKalmanProcess, iteration::IT; return_array=true) where {IT <: Integer} @@ -423,6 +451,14 @@ function get_scheduler(ekp::EnsembleKalmanProcess) return ekp.scheduler end +""" + get_accelerator(ekp::EnsembleKalmanProcess) +Return accelerator type of EnsembleKalmanProcess. +""" +function get_accelerator(ekp::EnsembleKalmanProcess) + return ekp.accelerator +end + """ construct_initial_ensemble( @@ -628,7 +664,8 @@ function update_ensemble!( terminate = calculate_timestep!(ekp, g, Δt_new) if isnothing(terminate) - update_ensemble!(ekp, g, get_process(ekp); ekp_kwargs...) + u = update_ensemble!(ekp, g, get_process(ekp); ekp_kwargs...) + update_state!(ekp, u) if s > 0.0 multiplicative_inflation ? multiplicative_inflation!(ekp; s = s) : nothing additive_inflation ? additive_inflation!(ekp; use_prior_cov = use_prior_cov, s = s) : nothing @@ -664,3 +701,6 @@ export Unscented export Gaussian_2d export construct_initial_ensemble, construct_mean, construct_cov include("UnscentedKalmanInversion.jl") + +# struct Accelerator +include("Accelerators.jl") diff --git a/src/EnsembleKalmanSampler.jl b/src/EnsembleKalmanSampler.jl index fdccc1f55..fdda23393 100644 --- a/src/EnsembleKalmanSampler.jl +++ b/src/EnsembleKalmanSampler.jl @@ -139,7 +139,6 @@ function update_ensemble!( u = fh.failsafe_update(ekp, u_old, g, failed_ens) # store new parameters (and model outputs) - push!(ekp.u, DataContainer(u, data_are_columns = true)) push!(ekp.g, DataContainer(g, data_are_columns = true)) # u_old is N_ens × N_par, g is N_ens × N_obs, # but stored in data container with N_ens as the 2nd dim @@ -147,9 +146,11 @@ function update_ensemble!( compute_error!(ekp) # Diagnostics - cov_new = get_u_cov_final(ekp) + cov_new = cov(u, dims = 2) if ekp.verbose @info "Covariance-weighted error: $(get_error(ekp)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))" end + + return u end diff --git a/src/EnsembleTransformKalmanInversion.jl b/src/EnsembleTransformKalmanInversion.jl index 0640c09a7..64482a697 100644 --- a/src/EnsembleTransformKalmanInversion.jl +++ b/src/EnsembleTransformKalmanInversion.jl @@ -124,16 +124,17 @@ function update_ensemble!( u = fh.failsafe_update(ekp, u, g, y, scaled_obs_noise_cov, failed_ens) # store new parameters (and model outputs) - push!(ekp.u, DataContainer(u, data_are_columns = true)) push!(ekp.g, DataContainer(g, data_are_columns = true)) # Store error compute_error!(ekp) # Diagnostics - cov_new = cov(get_u_final(ekp), dims = 2) + cov_new = cov(u, dims = 2) if ekp.verbose @info "Covariance-weighted error: $(get_error(ekp)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))" end + + return u end diff --git a/src/SparseEnsembleKalmanInversion.jl b/src/SparseEnsembleKalmanInversion.jl index fc92ccb7a..974a36891 100644 --- a/src/SparseEnsembleKalmanInversion.jl +++ b/src/SparseEnsembleKalmanInversion.jl @@ -222,7 +222,6 @@ function update_ensemble!( u = fh.failsafe_update(ekp, u, g, y, scaled_obs_noise_cov, failed_ens) # store new parameters (and model outputs) - push!(ekp.u, DataContainer(u, data_are_columns = true)) push!(ekp.g, DataContainer(g, data_are_columns = true)) # Store error @@ -230,4 +229,6 @@ function update_ensemble!( # Check convergence cov_new = cov(get_u_final(ekp), dims = 2) + + return u end diff --git a/test/EnsembleKalmanProcess/runtests.jl b/test/EnsembleKalmanProcess/runtests.jl index 6c574111d..519db3961 100644 --- a/test/EnsembleKalmanProcess/runtests.jl +++ b/test/EnsembleKalmanProcess/runtests.jl @@ -79,6 +79,104 @@ inv_problems = [inv_problems..., nl_inv_problems...] end +@testset "Accelerators" begin + # Get an inverse problem + y_obs, G, Γy, _ = inv_problems[end - 1] # additive noise inv problem + rng = Random.MersenneTwister(rng_seed) + initial_ensemble = EKP.construct_initial_ensemble(rng, prior, N_ens) + + # build accelerated and non-accelerated processes + ekiobj = EKP.EnsembleKalmanProcess(initial_ensemble, y_obs, Γy, Inversion(), accelerator = NesterovAccelerator()) + eksobj = EKP.EnsembleKalmanProcess(initial_ensemble, y_obs, Γy, Sampler(prior), accelerator = NesterovAccelerator()) + ekiobj_noacc = EKP.EnsembleKalmanProcess(initial_ensemble, y_obs, Γy, Inversion()) + eksobj_noacc = EKP.EnsembleKalmanProcess(initial_ensemble, y_obs, Γy, Sampler(prior)) + ekiobj_noacc_specified = + EKP.EnsembleKalmanProcess(initial_ensemble, y_obs, Γy, Inversion(), accelerator = DefaultAccelerator()) + eksobj_noacc_specified = + EKP.EnsembleKalmanProcess(initial_ensemble, y_obs, Γy, Sampler(prior), accelerator = DefaultAccelerator()) + + ## test EKP object's accelerator type is consistent (EKP constructor reassigns object in some cases) + @test typeof(ekiobj.accelerator) <: NesterovAccelerator + @test typeof(eksobj.accelerator) <: NesterovAccelerator + @test typeof(ekiobj_noacc.accelerator) <: DefaultAccelerator + @test typeof(eksobj_noacc.accelerator) <: DefaultAccelerator + @test typeof(ekiobj_noacc_specified.accelerator) <: DefaultAccelerator + @test typeof(eksobj_noacc_specified.accelerator) <: DefaultAccelerator + + ## test NesterovAccelerators satisfy desired ICs + @test ekiobj.accelerator.r == 3.0 + @test ekiobj.accelerator.u_prev == initial_ensemble + @test eksobj.accelerator.r == 3.0 + @test eksobj.accelerator.u_prev == initial_ensemble + + ## test method convergence + # Note: this test only requires that the final ensemble is an improvement on the initial ensemble, + # NOT that the accelerated processes are more effective than the default, as this is not guaranteed. + # Specific cost values are printed to give an idea of acceleration. + processes = [Inversion(), Sampler(prior), TransformInversion(inv(Γy))] + T_end = 1 # (this could fail a test if N_iters is not enough to reach T_end) + for process in processes + accelerators = [DefaultAccelerator(), NesterovAccelerator()] + N_iters = [30, 30, 30] + init_means = [] + final_means = [] + + for (accelerator, N_iter) in zip(accelerators, N_iters) + println("Accelerator: ", nameof(typeof(accelerator)), " Process: ", nameof(typeof(process))) + if !(nameof(typeof(process)) == Symbol(Unscented)) + ekpobj = EKP.EnsembleKalmanProcess( + initial_ensemble, + y_obs, + Γy, + process, + rng = copy(rng), + accelerator = accelerator, + ) + end + + ## test get_accelerator function in EKP + @test ekpobj.accelerator == get_accelerator(ekpobj) + + for i in 1:N_iter + params_i = get_ϕ_final(prior, ekpobj) + g_ens = G(params_i) + terminated = EKP.update_ensemble!(ekpobj, g_ens) + if !isnothing(terminated) + break + end + end + push!(init_means, vec(mean(get_u_prior(ekpobj), dims = 2))) + push!(final_means, vec(mean(get_u_final(ekpobj), dims = 2))) + + inv_sqrt_Γy = sqrt(inv(Γy)) + cost_initial = + norm(inv_sqrt_Γy * (y_obs .- G(transform_unconstrained_to_constrained(prior, init_means[end])))) + cost_final = + norm(inv_sqrt_Γy * (y_obs .- G(transform_unconstrained_to_constrained(prior, final_means[end])))) + @info "Convergence:" cost_initial cost_final + + if typeof(process) <: Inversion + u_star = transform_constrained_to_unconstrained(prior, ϕ_star) + @test norm( + inv_sqrt_Γy * (y_obs .- G(transform_unconstrained_to_constrained(prior, final_means[end]))), + ) < norm(inv_sqrt_Γy * (y_obs .- G(transform_unconstrained_to_constrained(prior, init_means[end])))) + end + + end + + end + ## test that error is thrown when applying acceleration to UKI + ukiobj = EKP.EnsembleKalmanProcess( + y_obs, + Γy, + Unscented(prior; impose_prior = true), + rng = copy(rng), + accelerator = NesterovAccelerator(), + ) + @test_throws ArgumentError EKP.update_ensemble!(ukiobj, zeros(length(y_obs), 5)) +end + + @testset "LearningRateSchedulers" begin # Default Δt = 3