diff --git a/Project.toml b/Project.toml index 9d88ea2cc..c3c52e53f 100644 --- a/Project.toml +++ b/Project.toml @@ -40,4 +40,4 @@ StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["StableRNGs", "Test", "Plots"] +test = ["StableRNGs", "Test", "Plots"] \ No newline at end of file diff --git a/src/Accelerators.jl b/src/Accelerators.jl new file mode 100644 index 000000000..fea306743 --- /dev/null +++ b/src/Accelerators.jl @@ -0,0 +1,94 @@ +# included in EnsembleKalmanProcess.jl + +export DefaultAccelerator, NesterovAccelerator +export update_state!, set_initial_acceleration! + +""" +$(TYPEDEF) + +Default accelerator provides no acceleration, runs traditional EKI +""" +struct DefaultAccelerator <: Accelerator end + +""" +$(TYPEDEF) + +Accelerator that adapts Nesterov's momentum method for EKI. +Stores a previous state value u_prev for computational purposes (note this is distinct from state returned as "ensemble value") + +$(TYPEDFIELDS) +""" +mutable struct NesterovAccelerator{FT <: AbstractFloat} <: Accelerator + r::FT + u_prev::Any +end + +function NesterovAccelerator(r = 3.0, initial = Float64[]) + return NesterovAccelerator(r, initial) +end + + +""" +Sets u_prev to the initial parameter values +""" +function set_ICs!(accelerator::NesterovAccelerator{FT}, u::MA) where {FT <: AbstractFloat, MA <: AbstractMatrix{FT}} + accelerator.u_prev = u +end + + +""" +Performs traditional state update with no momentum. +""" +function update_state!( + ekp::EnsembleKalmanProcess{FT, IT, P, LRS, DefaultAccelerator}, + u::MA, +) where {FT <: AbstractFloat, IT <: Int, P <: Process, LRS <: LearningRateScheduler, MA <: AbstractMatrix{FT}} + push!(ekp.u, DataContainer(u, data_are_columns = true)) +end + +""" +Performs state update with modified Nesterov momentum approach. +""" +function update_state!( + ekp::EnsembleKalmanProcess{FT, IT, P, LRS, NesterovAccelerator{FT}}, + u::MA, +) where {FT <: AbstractFloat, IT <: Int, P <: Process, LRS <: LearningRateScheduler, MA <: AbstractMatrix{FT}} + ## update "v" state: + k = get_N_iterations(ekp) + 2 + v = u .+ (1 - ekp.accelerator.r / k) * (u .- ekp.accelerator.u_prev) + + ## update "u" state: + ekp.accelerator.u_prev = u + + ## push "v" state to EKP object + push!(ekp.u, DataContainer(v, data_are_columns = true)) +end + + +""" +State update method for UKI with no acceleration. +The Accelerator framework has not yet been integrated with UKI process; +UKI tracks its own states, so this method is empty. +""" +function update_state!( + ekp::EnsembleKalmanProcess{FT, IT, P, LRS, DefaultAccelerator}, + u::MA, +) where {FT <: AbstractFloat, IT <: Int, P <: Unscented, LRS <: LearningRateScheduler, MA <: AbstractMatrix{FT}} + +end + +""" +Placeholder state update method for UKI with Nesterov Accelerator. +The Accelerator framework has not yet been integrated with UKI process, so this +method throws an error. +""" +function update_state!( + ekp::EnsembleKalmanProcess{FT, IT, P, LRS, NesterovAccelerator{FT}}, + u::MA, +) where {FT <: AbstractFloat, IT <: Int, P <: Unscented, LRS <: LearningRateScheduler, MA <: AbstractMatrix{FT}} + throw( + ArgumentError( + "option `accelerator = NesterovAccelerator` is not implemented for UKI, please use `DefaultAccelerator`", + ), + ) +end diff --git a/src/EnsembleKalmanInversion.jl b/src/EnsembleKalmanInversion.jl index 91da39379..35a986f17 100644 --- a/src/EnsembleKalmanInversion.jl +++ b/src/EnsembleKalmanInversion.jl @@ -138,17 +138,17 @@ function update_ensemble!( u = fh.failsafe_update(ekp, u, g, y, scaled_obs_noise_cov, failed_ens) - # store new parameters (and model outputs) - push!(ekp.u, DataContainer(u, data_are_columns = true)) push!(ekp.g, DataContainer(g, data_are_columns = true)) # Store error compute_error!(ekp) # Diagnostics - cov_new = cov(get_u_final(ekp), dims = 2) + cov_new = cov(u, dims = 2) if ekp.verbose @info "Covariance-weighted error: $(get_error(ekp)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))" end + + return u end diff --git a/src/EnsembleKalmanProcess.jl b/src/EnsembleKalmanProcess.jl index c901b37f3..803cf62f0 100644 --- a/src/EnsembleKalmanProcess.jl +++ b/src/EnsembleKalmanProcess.jl @@ -13,7 +13,7 @@ export get_u, get_g, get_ϕ export get_u_prior, get_u_final, get_g_final, get_ϕ_final export get_N_iterations, get_error, get_cov_blocks export get_u_mean, get_u_cov, get_g_mean, get_ϕ_mean -export get_u_mean_final, get_u_cov_prior, get_u_cov_final, get_g_mean_final, get_ϕ_mean_final +export get_u_mean_final, get_u_cov_prior, get_u_cov_final, get_g_mean_final, get_ϕ_mean_final, get_accelerator export compute_error! export update_ensemble! export sample_empirical_gaussian, split_indices_by_success @@ -29,6 +29,9 @@ abstract type LearningRateScheduler end # Failure handlers abstract type FailureHandlingMethod end +# Accelerators +abstract type Accelerator end + "Failure handling method that ignores forward model failures" @@ -104,7 +107,13 @@ Inputs: $(METHODLIST) """ -struct EnsembleKalmanProcess{FT <: AbstractFloat, IT <: Int, P <: Process, LRS <: LearningRateScheduler} +struct EnsembleKalmanProcess{ + FT <: AbstractFloat, + IT <: Int, + P <: Process, + LRS <: LearningRateScheduler, + ACC <: Accelerator, +} "array of stores for parameters (`u`), each of size [`N_par × N_ens`]" u::Array{DataContainer{FT}} "vector of the observed vector size [`N_obs`]" @@ -119,6 +128,8 @@ struct EnsembleKalmanProcess{FT <: AbstractFloat, IT <: Int, P <: Process, LRS < err::Vector{FT} "Scheduler to calculate the timestep size in each EK iteration" scheduler::LRS + "accelerator object that informs EK update steps, stores additional state variables as needed" + accelerator::ACC "stored vector of timesteps used in each EK iteration" Δt::Vector{FT} "the particular EK process (`Inversion` or `Sampler` or `Unscented` or `TransformInversion` or `SparseInversion`)" @@ -139,6 +150,7 @@ function EnsembleKalmanProcess( obs_noise_cov::Union{AbstractMatrix{FT}, UniformScaling{FT}}, process::P; scheduler::Union{Nothing, LRS} = nothing, + accelerator::Union{Nothing, ACC} = nothing, Δt = nothing, rng::AbstractRNG = Random.GLOBAL_RNG, failure_handler_method::FM = IgnoreFailures(), @@ -147,6 +159,7 @@ function EnsembleKalmanProcess( ) where { FT <: AbstractFloat, LRS <: LearningRateScheduler, + ACC <: Accelerator, P <: Process, FM <: FailureHandlingMethod, LM <: LocalizationMethod, @@ -193,16 +206,31 @@ function EnsembleKalmanProcess( # timestep store Δt = FT[] + # set up accelerator + if isnothing(accelerator) + acc = DefaultAccelerator() + else + acc = accelerator + end + AC = typeof(acc) + + if AC <: NesterovAccelerator + set_ICs!(acc, params) + if P <: Sampler + @warn "Acceleration is experimental for Sampler processes and may affect convergence." + end + end + # failure handler fh = FailureHandler(process, failure_handler_method) # localizer loc = Localizer(localization_method, N_par, N_obs, N_ens, FT) if verbose - @info "Initializing ensemble Kalman process of type $(nameof(typeof(process)))\nNumber of ensemble members: $(N_ens)\nLocalization: $(nameof(typeof(localization_method)))\nFailure handler: $(nameof(typeof(failure_handler_method)))\nScheduler: $(nameof(typeof(lrs)))" + @info "Initializing ensemble Kalman process of type $(nameof(typeof(process)))\nNumber of ensemble members: $(N_ens)\nLocalization: $(nameof(typeof(localization_method)))\nFailure handler: $(nameof(typeof(failure_handler_method)))\nScheduler: $(nameof(typeof(lrs)))\nAccelerator: $(nameof(typeof(acc)))" end - EnsembleKalmanProcess{FT, IT, P, RS}( + EnsembleKalmanProcess{FT, IT, P, RS, AC}( [init_params], obs_mean, obs_noise_cov, @@ -210,6 +238,7 @@ function EnsembleKalmanProcess( g, err, lrs, + acc, Δt, process, rng, @@ -222,7 +251,6 @@ end include("LearningRateSchedulers.jl") - """ get_u(ekp::EnsembleKalmanProcess, iteration::IT; return_array=true) where {IT <: Integer} @@ -423,6 +451,14 @@ function get_scheduler(ekp::EnsembleKalmanProcess) return ekp.scheduler end +""" + get_accelerator(ekp::EnsembleKalmanProcess) +Return accelerator type of EnsembleKalmanProcess. +""" +function get_accelerator(ekp::EnsembleKalmanProcess) + return ekp.accelerator +end + """ construct_initial_ensemble( @@ -628,7 +664,8 @@ function update_ensemble!( terminate = calculate_timestep!(ekp, g, Δt_new) if isnothing(terminate) - update_ensemble!(ekp, g, get_process(ekp); ekp_kwargs...) + u = update_ensemble!(ekp, g, get_process(ekp); ekp_kwargs...) + update_state!(ekp, u) if s > 0.0 multiplicative_inflation ? multiplicative_inflation!(ekp; s = s) : nothing additive_inflation ? additive_inflation!(ekp; use_prior_cov = use_prior_cov, s = s) : nothing @@ -664,3 +701,6 @@ export Unscented export Gaussian_2d export construct_initial_ensemble, construct_mean, construct_cov include("UnscentedKalmanInversion.jl") + +# struct Accelerator +include("Accelerators.jl") diff --git a/src/EnsembleKalmanSampler.jl b/src/EnsembleKalmanSampler.jl index fdccc1f55..fdda23393 100644 --- a/src/EnsembleKalmanSampler.jl +++ b/src/EnsembleKalmanSampler.jl @@ -139,7 +139,6 @@ function update_ensemble!( u = fh.failsafe_update(ekp, u_old, g, failed_ens) # store new parameters (and model outputs) - push!(ekp.u, DataContainer(u, data_are_columns = true)) push!(ekp.g, DataContainer(g, data_are_columns = true)) # u_old is N_ens × N_par, g is N_ens × N_obs, # but stored in data container with N_ens as the 2nd dim @@ -147,9 +146,11 @@ function update_ensemble!( compute_error!(ekp) # Diagnostics - cov_new = get_u_cov_final(ekp) + cov_new = cov(u, dims = 2) if ekp.verbose @info "Covariance-weighted error: $(get_error(ekp)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))" end + + return u end diff --git a/src/EnsembleTransformKalmanInversion.jl b/src/EnsembleTransformKalmanInversion.jl index 0640c09a7..64482a697 100644 --- a/src/EnsembleTransformKalmanInversion.jl +++ b/src/EnsembleTransformKalmanInversion.jl @@ -124,16 +124,17 @@ function update_ensemble!( u = fh.failsafe_update(ekp, u, g, y, scaled_obs_noise_cov, failed_ens) # store new parameters (and model outputs) - push!(ekp.u, DataContainer(u, data_are_columns = true)) push!(ekp.g, DataContainer(g, data_are_columns = true)) # Store error compute_error!(ekp) # Diagnostics - cov_new = cov(get_u_final(ekp), dims = 2) + cov_new = cov(u, dims = 2) if ekp.verbose @info "Covariance-weighted error: $(get_error(ekp)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))" end + + return u end diff --git a/src/SparseEnsembleKalmanInversion.jl b/src/SparseEnsembleKalmanInversion.jl index fc92ccb7a..974a36891 100644 --- a/src/SparseEnsembleKalmanInversion.jl +++ b/src/SparseEnsembleKalmanInversion.jl @@ -222,7 +222,6 @@ function update_ensemble!( u = fh.failsafe_update(ekp, u, g, y, scaled_obs_noise_cov, failed_ens) # store new parameters (and model outputs) - push!(ekp.u, DataContainer(u, data_are_columns = true)) push!(ekp.g, DataContainer(g, data_are_columns = true)) # Store error @@ -230,4 +229,6 @@ function update_ensemble!( # Check convergence cov_new = cov(get_u_final(ekp), dims = 2) + + return u end diff --git a/test/EnsembleKalmanProcess/runtests.jl b/test/EnsembleKalmanProcess/runtests.jl index 6c574111d..519db3961 100644 --- a/test/EnsembleKalmanProcess/runtests.jl +++ b/test/EnsembleKalmanProcess/runtests.jl @@ -79,6 +79,104 @@ inv_problems = [inv_problems..., nl_inv_problems...] end +@testset "Accelerators" begin + # Get an inverse problem + y_obs, G, Γy, _ = inv_problems[end - 1] # additive noise inv problem + rng = Random.MersenneTwister(rng_seed) + initial_ensemble = EKP.construct_initial_ensemble(rng, prior, N_ens) + + # build accelerated and non-accelerated processes + ekiobj = EKP.EnsembleKalmanProcess(initial_ensemble, y_obs, Γy, Inversion(), accelerator = NesterovAccelerator()) + eksobj = EKP.EnsembleKalmanProcess(initial_ensemble, y_obs, Γy, Sampler(prior), accelerator = NesterovAccelerator()) + ekiobj_noacc = EKP.EnsembleKalmanProcess(initial_ensemble, y_obs, Γy, Inversion()) + eksobj_noacc = EKP.EnsembleKalmanProcess(initial_ensemble, y_obs, Γy, Sampler(prior)) + ekiobj_noacc_specified = + EKP.EnsembleKalmanProcess(initial_ensemble, y_obs, Γy, Inversion(), accelerator = DefaultAccelerator()) + eksobj_noacc_specified = + EKP.EnsembleKalmanProcess(initial_ensemble, y_obs, Γy, Sampler(prior), accelerator = DefaultAccelerator()) + + ## test EKP object's accelerator type is consistent (EKP constructor reassigns object in some cases) + @test typeof(ekiobj.accelerator) <: NesterovAccelerator + @test typeof(eksobj.accelerator) <: NesterovAccelerator + @test typeof(ekiobj_noacc.accelerator) <: DefaultAccelerator + @test typeof(eksobj_noacc.accelerator) <: DefaultAccelerator + @test typeof(ekiobj_noacc_specified.accelerator) <: DefaultAccelerator + @test typeof(eksobj_noacc_specified.accelerator) <: DefaultAccelerator + + ## test NesterovAccelerators satisfy desired ICs + @test ekiobj.accelerator.r == 3.0 + @test ekiobj.accelerator.u_prev == initial_ensemble + @test eksobj.accelerator.r == 3.0 + @test eksobj.accelerator.u_prev == initial_ensemble + + ## test method convergence + # Note: this test only requires that the final ensemble is an improvement on the initial ensemble, + # NOT that the accelerated processes are more effective than the default, as this is not guaranteed. + # Specific cost values are printed to give an idea of acceleration. + processes = [Inversion(), Sampler(prior), TransformInversion(inv(Γy))] + T_end = 1 # (this could fail a test if N_iters is not enough to reach T_end) + for process in processes + accelerators = [DefaultAccelerator(), NesterovAccelerator()] + N_iters = [30, 30, 30] + init_means = [] + final_means = [] + + for (accelerator, N_iter) in zip(accelerators, N_iters) + println("Accelerator: ", nameof(typeof(accelerator)), " Process: ", nameof(typeof(process))) + if !(nameof(typeof(process)) == Symbol(Unscented)) + ekpobj = EKP.EnsembleKalmanProcess( + initial_ensemble, + y_obs, + Γy, + process, + rng = copy(rng), + accelerator = accelerator, + ) + end + + ## test get_accelerator function in EKP + @test ekpobj.accelerator == get_accelerator(ekpobj) + + for i in 1:N_iter + params_i = get_ϕ_final(prior, ekpobj) + g_ens = G(params_i) + terminated = EKP.update_ensemble!(ekpobj, g_ens) + if !isnothing(terminated) + break + end + end + push!(init_means, vec(mean(get_u_prior(ekpobj), dims = 2))) + push!(final_means, vec(mean(get_u_final(ekpobj), dims = 2))) + + inv_sqrt_Γy = sqrt(inv(Γy)) + cost_initial = + norm(inv_sqrt_Γy * (y_obs .- G(transform_unconstrained_to_constrained(prior, init_means[end])))) + cost_final = + norm(inv_sqrt_Γy * (y_obs .- G(transform_unconstrained_to_constrained(prior, final_means[end])))) + @info "Convergence:" cost_initial cost_final + + if typeof(process) <: Inversion + u_star = transform_constrained_to_unconstrained(prior, ϕ_star) + @test norm( + inv_sqrt_Γy * (y_obs .- G(transform_unconstrained_to_constrained(prior, final_means[end]))), + ) < norm(inv_sqrt_Γy * (y_obs .- G(transform_unconstrained_to_constrained(prior, init_means[end])))) + end + + end + + end + ## test that error is thrown when applying acceleration to UKI + ukiobj = EKP.EnsembleKalmanProcess( + y_obs, + Γy, + Unscented(prior; impose_prior = true), + rng = copy(rng), + accelerator = NesterovAccelerator(), + ) + @test_throws ArgumentError EKP.update_ensemble!(ukiobj, zeros(length(y_obs), 5)) +end + + @testset "LearningRateSchedulers" begin # Default Δt = 3