diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 000000000..700707ced --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,7 @@ +# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" # Location of package manifests + schedule: + interval: "weekly" diff --git a/.github/workflows/DocPreviewCleanup.yml b/.github/workflows/DocPreviewCleanup.yml index bc29462c0..94b9f70ad 100644 --- a/.github/workflows/DocPreviewCleanup.yml +++ b/.github/workflows/DocPreviewCleanup.yml @@ -9,7 +9,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout gh-pages branch - uses: actions/checkout@v2 + uses: actions/checkout@v4 with: ref: gh-pages - name: Delete preview and history + push changes diff --git a/.github/workflows/Docs.yml b/.github/workflows/Docs.yml index 8ac100fd0..99b15e2bc 100644 --- a/.github/workflows/Docs.yml +++ b/.github/workflows/Docs.yml @@ -13,16 +13,16 @@ jobs: timeout-minutes: 60 steps: - name: Cancel Previous Runs - uses: styfle/cancel-workflow-action@0.9.1 + uses: styfle/cancel-workflow-action@0.12.1 with: access_token: ${{ github.token }} - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: julia-actions/setup-julia@v1 with: version: '1' show-versioninfo: 'true' - name: Cache artifacts - uses: actions/cache@v1 + uses: actions/cache@v4 env: cache-name: cache-artifacts with: @@ -37,5 +37,6 @@ jobs: - name: Build and deploy env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + DOCUMENTER_KEY: ${{ secrets.DOCUMENTER_KEY }} JULIA_DEBUG: Documenter run: julia --color=yes --project=docs/ docs/make.jl diff --git a/.github/workflows/JuliaFormatter.yml b/.github/workflows/JuliaFormatter.yml index c5083914a..3726c4f89 100644 --- a/.github/workflows/JuliaFormatter.yml +++ b/.github/workflows/JuliaFormatter.yml @@ -15,13 +15,13 @@ jobs: timeout-minutes: 30 steps: - name: Cancel Previous Runs - uses: styfle/cancel-workflow-action@0.9.1 + uses: styfle/cancel-workflow-action@0.12.1 with: access_token: ${{ github.token }} - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - - uses: dorny/paths-filter@v2.9.1 + - uses: dorny/paths-filter@v3.0.2 id: filter with: filters: | diff --git a/.github/workflows/Tests.yml b/.github/workflows/Tests.yml index 009960a38..333b39184 100644 --- a/.github/workflows/Tests.yml +++ b/.github/workflows/Tests.yml @@ -9,25 +9,23 @@ jobs: name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }} runs-on: ${{ matrix.os }} strategy: - matrix: + fail-fast: false #don't cancel all jobs if one fails + matrix: version: - - '1.6' # Long-Term Support release + - 'lts' # Long-Term Support release - '1' # Latest 1.x release of julia os: - ubuntu-latest - windows-latest - macOS-latest - arch: - - x64 steps: - - uses: styfle/cancel-workflow-action@0.9.1 + - uses: styfle/cancel-workflow-action@0.12.1 with: access_token: ${{ github.token }} - - uses: actions/checkout@v2 - - uses: julia-actions/setup-julia@v1 + - uses: actions/checkout@v4 + - uses: julia-actions/setup-julia@v2 with: version: ${{ matrix.version }} - arch: ${{ matrix.arch }} - run: julia --project -e 'using Pkg; Pkg.update()' #windows in particular sometimes doesnt update packages - uses: julia-actions/julia-buildpkg@v1 @@ -38,7 +36,7 @@ jobs: LCOV.writefile("coverage-lcov.info", Codecov.process_folder())' if: ${{ matrix.os == 'ubuntu-latest' }} - name: Submit coverage - uses: 
codecov/codecov-action@v1
+        uses: codecov/codecov-action@v5
         with:
           token: ${{secrets.CODECOV_TOKEN}}
         if: ${{ matrix.os == 'ubuntu-latest' }}
diff --git a/Project.toml b/Project.toml
index 11a02ef40..f154b055c 100644
--- a/Project.toml
+++ b/Project.toml
@@ -7,12 +7,12 @@ version = "2.0.1"
 Convex = "f65535da-76fb-5f13-bab9-19810c17039a"
 Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
 DocStringExtensions = "ffbed154-4ef7-542d-bbb7-c09d3a79fcae"
+FFMPEG = "c87230d0-a227-11e9-1b43-d7ebe4e7570a"
 GaussianRandomFields = "e4b2fa32-6e09-5554-b718-106ed5adafe9"
 Interpolations = "a98d9a8b-a2ab-59e6-89dd-64a1c18fca59"
 LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
 MathOptInterface = "b8f27783-ece8-5eb3-8dc8-9495eed66fee"
 Optim = "429524aa-4258-5aef-a3af-852621145aeb"
-Plots = "91a5bcdd-55d7-5caf-9e0b-520d859cae80"
 QuadGK = "1fd47b50-473d-5c70-9696-f719f8f3bcdc"
 Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
 RecipesBase = "3cdcf5f2-1ef4-517c-9805-6587b60abb01"
@@ -26,6 +26,7 @@ TOML = "fa267f1f-6049-4f14-aa54-33bafae1ed76"
 Convex = "0.15, 0.16"
 Distributions = "0.24.14, 0.25"
 DocStringExtensions = "0.8, 0.9"
+FFMPEG = "0.4"
 GaussianRandomFields = "2"
 Interpolations = "0.13, 0.14, 0.15"
 LinearAlgebra = "1"
diff --git a/README.md b/README.md
index 977d57aee..55fb8b183 100644
--- a/README.md
+++ b/README.md
@@ -34,7 +34,7 @@ If you use the examples or code, please cite our article at JOSS in your publish
 [joss-url]: https://joss.theoj.org/papers/5cb2d4c6af8840af61b44071ae1e672a

 ### Requirements
-Julia version 1.6+
+Julia LTS version or newer

 # Quick links!
@@ -51,4 +51,4 @@
 ![eki-getting-started](https://github.com/CliMA/EnsembleKalmanProcesses.jl/assets/45243236/e083ab8c-4f93-432f-9ad5-97aff22764ad)
\ No newline at end of file
+-->
diff --git a/docs/make.jl b/docs/make.jl
index f8272d426..a0608bd27 100644
--- a/docs/make.jl
+++ b/docs/make.jl
@@ -83,6 +83,7 @@ pages = [
         "Learning rate schedulers" => "learning_rate_scheduler.md",
         "Prior distributions" => "parameter_distributions.md",
         "Observations and Minibatching" => "observations.md",
+        "Update Groups" => "update_groups.md",
         "Localization and SEC" => "localization.md",
         "Inflation" => "inflation.md",
         "Parallelism and HPC" => "parallel_hpc.md",
diff --git a/docs/src/update_groups.md b/docs/src/update_groups.md
new file mode 100644
index 000000000..504e3dd17
--- /dev/null
+++ b/docs/src/update_groups.md
@@ -0,0 +1,101 @@
+# [Update Groups](@id update-groups)
+
+The `UpdateGroup` object facilitates blocked EKP updates, based on a series of user-defined pairings of parameters and data. This allows users to enforce any *known* (in)dependencies between different groups of parameters during the update. For example,
+```julia
+# update parameter 1 with data 1 and 2
+# update parameters 2 and 3 jointly with data 2, 3, and 4
+Dict(
+    ["parameter_1"] => ["data_1", "data_2"],
+    ["parameter_2", "parameter_3"] => ["data_2", "data_3", "data_4"],
+)
+```
+How to construct these groups and pass them into the `EnsembleKalmanProcess` is detailed below.
+
+!!! note "This improves scaling at the cost of user-imposed structure"
+    Many of the `Process` updates scale super-linearly in the data dimension ``d``, say as ``d^\alpha`` with ``\alpha > 1``. Splitting the update into ``K`` groups of equal size improves this scaling to ``K (\frac{d}{K})^\alpha``; for example, with ``\alpha = 2`` and ``K = 10``, the blocked update costs ``10 (\frac{d}{10})^2 = \frac{d^2}{10}``, a tenfold saving.
+
+## Recommended construction - shown by example
+
+The key component to construct update groups starts with constructing the prior and the observations.
+Parameter distributions and observations may be built up from named units, and these names are then used to build the update groups with the convenient constructor `create_update_groups`.
+
+For illustration, we take code snippets from the example found [here](https://github.com/CliMA/EnsembleKalmanProcesses.jl/blob/main/examples/UpdateGroups/). This example is concerned with learning several parameters in a coupled two-scale Lorenz 96 system:
+```math
+\begin{aligned}
+    \frac{\partial X_i}{\partial t} & = -X_{i-1}(X_{i-2} - X_{i+1}) - X_i - GY_i + F_1 + F_2\,\sin(2\pi t F_3)\\
+    \frac{\partial Y_i}{\partial t} & = -cbY_{i+1}(Y_{i+2} - Y_{i-1}) - cY_i + \frac{hc}{b} X_i
+\end{aligned}
+```
+Parameters are learnt by fitting estimated moments of a realized `X` and `Y` system to some target moments over a time interval.
+
+We create a prior by combining several *named* `ParameterDistribution`s.
+```julia
+param_names = ["F", "G", "h", "c", "b"]
+
+prior_F = ParameterDistribution(
+    Dict(
+        "name" => param_names[1],
+        "distribution" => Parameterized(MvNormal([1.0, 0.0, -2.0], I)),
+        "constraint" => repeat([bounded_below(0)], 3),
+    ),
+) # gives 3-D dist
+prior_G = constrained_gaussian(param_names[2], 5.0, 4.0, 0, Inf)
+prior_h = constrained_gaussian(param_names[3], 5.0, 4.0, 0, Inf)
+prior_c = constrained_gaussian(param_names[4], 5.0, 4.0, 0, Inf)
+prior_b = constrained_gaussian(param_names[5], 5.0, 4.0, 0, Inf)
+priors = combine_distributions([prior_F, prior_G, prior_h, prior_c, prior_b])
+```
+Now we likewise construct observed moments by combining several *named* `Observation`s
+```julia
+# given a list of vector statistics y and their covariances Γ
+data_block_names = ["<X>", "<Y>", "<X^2>", "<Y^2>", "<XY>"]
+
+observation_vec = []
+for i in 1:length(data_block_names)
+    push!(
+        observation_vec,
+        Observation(Dict(
+            "samples" => y[i],
+            "covariances" => Γ[i],
+            "names" => data_block_names[i]
+        )),
+    )
+end
+observation = combine_observations(observation_vec)
+```
+Finally, we are ready to define the update groups. We may specify our choice by partitioning the parameter names as keys of a dictionary, and their paired data names as values. Here we create two groups:
+```julia
+# update parameters F, G with data <X>, <X^2>, <XY>
+# update parameters h, c, b with data <Y>, <Y^2>, <XY>
+group_identifiers = Dict(
+    ["F", "G"] => ["<X>", "<X^2>", "<XY>"],
+    ["h", "c", "b"] => ["<Y>", "<Y^2>", "<XY>"],
+)
+```
+We then create the update groups with the convenient constructor
+```julia
+update_groups = create_update_groups(priors, observation, group_identifiers)
+```
+and this can then be passed into the `EnsembleKalmanProcess` object as a keyword argument
+```julia
+# initial_params = construct_initial_ensemble(rng, priors, N_ens)
+ekiobj = EnsembleKalmanProcess(
+    initial_params,
+    observation,
+    Inversion(),
+    update_groups = update_groups
+)
+```
+
+## What happens internally?
+
+We simply perform an independent `update_ensemble!` for each provided pairing, and combine the model outputs and updated parameters afterwards. Note that even if no update group is specified, EKP will always construct one under the hood by default.
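+
+A minimal sketch of this internal loop (paraphrasing the implementation in `src/EnsembleKalmanProcess.jl`; failure handling, inflation, and diagnostics are omitted here):
+```julia
+# index sets of each parameter/data pairing, extended over the current minibatch
+u_groups, g_groups = list_update_groups_over_minibatch(ekp)
+u = zeros(size(get_u_prior(ekp)))
+
+# each parameter block is updated using only its paired data block
+for (u_idx, g_idx) in zip(u_groups, g_groups)
+    u[u_idx, :] += update_ensemble!(ekp, g, get_process(ekp), u_idx, g_idx)
+end
+```
+Because each block update sees only the rows `g_idx` of `g`, and the corresponding entries of the observation and its noise covariance, the cross-covariances used in each update are those of the block alone.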
+
+## Advice for constructing blocks
+1. A parameter cannot appear in more than one block (i.e. a parameter cannot be updated more than once per iteration).
+2. The block structure is user-defined and directly assumes that there is no correlation between blocks. It is up to the user to confirm that different blocks truly are independent; otherwise convergence properties may suffer.
+3. Update groups can be used in conjunction with minibatching, so long as the named data objects are available in every `Observation` in the series.
+
+!!! note "In future..."
+    In theory this opens up the possibility of having different configurations, or even different processes, in different groups. This could be useful when parameter-data pairings are highly heterogeneous, and the user may wish to exploit, for example, the processes' different scaling properties. However, this has not yet been implemented.
\ No newline at end of file
diff --git a/examples/Localization/localization_example_lorenz96.jl b/examples/Localization/localization_example_lorenz96.jl
index 74b2bc52d..bb8cb4586 100644
--- a/examples/Localization/localization_example_lorenz96.jl
+++ b/examples/Localization/localization_example_lorenz96.jl
@@ -76,8 +76,10 @@ prior = combine_distributions(priors)

 initial_ensemble = EKP.construct_initial_ensemble(rng, prior, N_ens)

+
 # Solve problem without localization
-ekiobj_vanilla = EKP.EnsembleKalmanProcess(initial_ensemble, y, Γ, Inversion(); rng = rng)
+ekiobj_vanilla =
+    EKP.EnsembleKalmanProcess(initial_ensemble, y, Γ, Inversion(); rng = rng, scheduler = DefaultScheduler())
 for i in 1:N_iter
     g_ens_vanilla = G(get_ϕ_final(prior, ekiobj_vanilla))
     EKP.update_ensemble!(ekiobj_vanilla, g_ens_vanilla, deterministic_forward_map = true)
@@ -91,6 +93,7 @@ ekiobj_inflated = EKP.EnsembleKalmanProcess(
     Γ,
     Inversion();
     rng = rng,
+    scheduler = DefaultScheduler(),
     # localization_method = BernoulliDropout(0.98),
 )
@@ -108,7 +111,15 @@ end
 @info "EKI (inflated) - complete"

 # Test SEC
-ekiobj_sec = EKP.EnsembleKalmanProcess(initial_ensemble, y, Γ, Inversion(); rng = rng, localization_method = SEC(1.0))
+ekiobj_sec = EKP.EnsembleKalmanProcess(
+    initial_ensemble,
+    y,
+    Γ,
+    Inversion();
+    rng = rng,
+    localization_method = SEC(1.0),
+    scheduler = DefaultScheduler(),
+)

 for i in 1:N_iter
     g_ens = G(get_ϕ_final(prior, ekiobj_sec))
@@ -117,8 +128,15 @@ end
 @info "EKI (SEC) - complete"

 # Test SEC with cutoff
-ekiobj_sec_cutoff =
-    EKP.EnsembleKalmanProcess(initial_ensemble, y, Γ, Inversion(); rng = rng, localization_method = SEC(1.0, 0.1))
+ekiobj_sec_cutoff = EKP.EnsembleKalmanProcess(
+    initial_ensemble,
+    y,
+    Γ,
+    Inversion();
+    rng = rng,
+    localization_method = SEC(1.0, 0.1),
+    scheduler = DefaultScheduler(),
+)

 for i in 1:N_iter
     g_ens = G(get_ϕ_final(prior, ekiobj_sec_cutoff))
@@ -127,8 +145,15 @@ end
 @info "EKI (SEC cut-off) - complete"

 # Test SECFisher
-ekiobj_sec_fisher =
-    EKP.EnsembleKalmanProcess(initial_ensemble, y, Γ, Inversion(); rng = rng, localization_method = SECFisher())
+ekiobj_sec_fisher = EKP.EnsembleKalmanProcess(
+    initial_ensemble,
+    y,
+    Γ,
+    Inversion();
+    rng = rng,
+    localization_method = SECFisher(),
+    scheduler = DefaultScheduler(),
+)

 for i in 1:N_iter
     g_ens = G(get_ϕ_final(prior, ekiobj_sec_fisher))
@@ -137,8 +162,15 @@ end
 @info "EKI (SEC Fisher) - complete"

 # Test SECNice
-ekiobj_sec_nice =
-    EKP.EnsembleKalmanProcess(initial_ensemble, y, Γ, Inversion(); rng = rng, localization_method = SECNice())
+ekiobj_sec_nice = EKP.EnsembleKalmanProcess(
+    initial_ensemble,
+    y,
+    Γ,
+    Inversion();
+    rng = rng,
+    localization_method = SECNice(),
+    scheduler = DefaultScheduler(),
+)

 for i in 1:N_iter
     g_ens = G(get_ϕ_final(prior, ekiobj_sec_nice))
@@ -150,7 +182,9 @@ end
 u_final = get_u_final(ekiobj_sec)
 g_final = get_g_final(ekiobj_sec)
 cov_est = cov([u_final; g_final], [u_final; g_final], dims = 2, corrected = false)
-cov_localized = get_localizer(ekiobj_sec).localize(cov_est)
+# need dimension
args too +cov_localized = + get_localizer(ekiobj_sec).localize(cov_est, eltype(g_final), size(u_final, 1), size(g_final, 1), size(u_final, 2)) fig = plot( get_error(ekiobj_vanilla), diff --git a/examples/UpdateGroups/Project.toml b/examples/UpdateGroups/Project.toml new file mode 100644 index 000000000..4b9187be3 --- /dev/null +++ b/examples/UpdateGroups/Project.toml @@ -0,0 +1,7 @@ +[deps] +CairoMakie = "13f3f980-e62b-5c42-98c6-ff1f3baf88f0" +ColorSchemes = "35d6a980-a343-548e-a6ea-1d62b119f2f4" +Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f" +EnsembleKalmanProcesses = "aa8a2aa5-91d8-4396-bcef-d4f2ec43552d" +JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819" +Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2" diff --git a/examples/UpdateGroups/calibrate.jl b/examples/UpdateGroups/calibrate.jl new file mode 100644 index 000000000..cb6cf241c --- /dev/null +++ b/examples/UpdateGroups/calibrate.jl @@ -0,0 +1,480 @@ + +# Import modules +using Distributions # probability distributions and associated functions +using LinearAlgebra +using CairoMakie, ColorSchemes +using Random +using JLD2 + +# CES +using EnsembleKalmanProcesses +using EnsembleKalmanProcesses.DataContainers +using EnsembleKalmanProcesses.ParameterDistributions +using EnsembleKalmanProcesses.Localizers + + +const EKP = EnsembleKalmanProcesses + +# includes +include("lorenz_model.jl") # Contains coupled Lorenz 96 and integrator +include("data_processing.jl") # Contains processing to produce data and forward map + + +rng_seed = 79453 +rng = Random.seed!(Random.GLOBAL_RNG, rng_seed) + +# Output figure save directory +homedir = pwd() +println(homedir) +output_directory = homedir * "/output/" +if ~isdir(output_directory) + mkdir(output_directory) +end + +case = "test" + + +# A function to run the ensemble over ϕ_i, generating trajectories and data +function run_G_ensemble(state, lsettings, ϕ_i, window, data_dim) + N_ens = size(ϕ_i, 2) + data_sample = zeros(data_dim, N_ens) + + Threads.@threads for j in 1:N_ens + # ϕ_i is n_params x n_ens + params_i = LParams( + ϕ_i[1:3, j], # F + ϕ_i[1, j], # G + ϕ_i[2, j], # h + ϕ_i[3, j], # c + ϕ_i[4, j], # b + ) + new_state, new_t = lorenz_solve(state, lsettings, params_i) + data_sample[:, j] = process_trajectory_to_data(window, new_state, new_t) + end + return data_sample +end + + +function main() + # Lorenz 96 multiscale system with 1-slow timescale per slow timescale + # Fast: Atmosphere with timescale difference c, nonlinear strength b, coupling y_k + # dy_k/dt = -cby_{k+1}(y_{k+2}-y_{k-1}) - cy_k + (hc/b)x_k + # Slow: Ocean is just damping with e.g. 
seasonal forcing F and coupling strength G + # dx_k/dt = - x_k + (Fm + Fa*cos(Ff*2πt)) - Gy_k + # Inspired from the coupled L63 model + # Zhang, Liu, Rosati & Delworth (2012) + # https://doi.org/10.3402/tellusa.v64i0.10963 + + ### + ### Define the (true) parameters + ### + + # Time scaling + time_scale_factor = 0.05 + year = 360.0 * time_scale_factor + month = year / 12.0 * time_scale_factor + + # Fast + h_true = 10.0 + c_true = 10.0# timescale difference + b_true = 10.0 + + # Slow + F_true = [8.0, 2.0, 1 / year] # (mean,amplitude,frequency) const for now (maybe add seasonal cycle) + G_true = h_true * c_true / b_true # for consistency in coupling terms + + true_params = LParams(F_true, G_true, h_true, c_true, b_true) + param_names = ["F", "G", "h", "c", "b"] + + ### + ### Define the parameter priors (needed later for inversion) + ### + + prior_F = ParameterDistribution( + Dict( + "name" => "F", + "distribution" => Parameterized(MvNormal([1.0, 0.0, -2.0], I)), + "constraint" => repeat([bounded_below(0)], 3), + ), + ) # gives 3-D dist on the order of~ [12,1,0.08] + prior_G = constrained_gaussian(param_names[2], 5.0, 4.0, 0, Inf) + prior_h = constrained_gaussian(param_names[3], 5.0, 4.0, 0, Inf) + prior_c = constrained_gaussian(param_names[4], 5.0, 4.0, 0, Inf) + prior_b = constrained_gaussian(param_names[5], 5.0, 4.0, 0, Inf) + priors = combine_distributions([prior_F, prior_G, prior_h, prior_c, prior_b]) + + ### + ### Configuration of the forward map + ### + + ## State evolution + N = 40 # state size + dt = 0.005 # integration dt + + + ## Spin up + spinup = year + initial = rand(Normal(0.0, 0.01), 2 * N) # IC for fast and slow + + lsettings_spinup = LSettings(N, dt, spinup, initial) + + ## Gather statistics + # e.g. run for 2 years, using last year for data, (& batched monthly) + t_solve = 2 * year + window_start = t_solve - year + window_end = t_solve + subwindow_size = Int64(floor(month / dt)) + slide_or_batch = "batch" #sliding window or disjoint batches + + lsettings = LSettings(N, dt, t_solve, nothing) + window = ProcessingWindow(window_start, window_end, subwindow_size, slide_or_batch) + + ### + ### Spin up and plot + ### + + spunup_state, t = lorenz_solve(lsettings_spinup, true_params) + @info "spin-up complete" + + fig = Figure(size = (900, 450)) + + afast = Axis(fig[1, 1][1, 1], xlabel = "time", ylabel = "fast") + aslow = Axis(fig[2, 1][1, 1], xlabel = "time", ylabel = "slow") + + ss_freq = length(t) / 2010.0 + ss_rate = Int64(ceil(ss_freq)) + tplot = t[(10 * ss_rate + 1):ss_rate:end] + hm = + heatmap!(afast, tplot, 1:N, spunup_state[(N + 1):(2 * N), (10 * ss_rate + 1):ss_rate:end]', colormap = :Oranges) + Colorbar(fig[1, 1][1, 2], hm) + hm = heatmap!(aslow, tplot, 1:N, spunup_state[1:N, (10 * ss_rate + 1):ss_rate:end]', colormap = :Blues) + Colorbar(fig[2, 1][1, 2], hm) + + # save + save(joinpath(output_directory, case * "_spinup_allstate.png"), fig, px_per_unit = 3) + save(joinpath(output_directory, case * "_spinup_allstate.pdf"), fig, pt_per_unit = 3) + + + fig = Figure(size = (900, 450)) + + afast = Axis(fig[1, 1][1, 1], xlabel = "time", ylabel = "fast") + aslow = Axis(fig[2, 1][1, 1], xlabel = "time", ylabel = "slow") + + lines!(afast, tplot, spunup_state[N + 1, (10 * ss_rate + 1):ss_rate:end], color = :red) + lines!(aslow, tplot, spunup_state[1, (10 * ss_rate + 1):ss_rate:end], color = :blue) + + # save + save(joinpath(output_directory, case * "_spinup_state1.png"), fig, px_per_unit = 3) + save(joinpath(output_directory, case * "_spinup_state1.pdf"), fig, pt_per_unit = 3) + 
@info "plotted spin-up" + + ### + ### Generate perfect-model data + ### + + #estimate covariance + n_sample_cov = 100 + + new_state, new_t = lorenz_solve(spunup_state, lsettings, true_params) + data_sample = process_trajectory_to_data(window, new_state, new_t) + + data_samples = zeros(size(data_sample, 1), n_sample_cov) + data_samples[:, 1] = data_sample + + fig = Figure(size = (900, 450)) + afast = Axis(fig[1, 1][1, 1], xlabel = "time", ylabel = "fast") + aslow = Axis(fig[2, 1][1, 1], xlabel = "time", ylabel = "slow") + + for i in 2:n_sample_cov + # extend trajectory from end of new_state + new_state, new_t = lorenz_solve(new_state, lsettings, true_params) #integrate another window from last state + # calculate data sample + data_samples[:, i] = process_trajectory_to_data(window, new_state, new_t) # process the window + + #plot trajectory + ss_freq = length(new_t) / 2010.0 + ss_rate = Int64(ceil(ss_freq)) + tplot = new_t[Int64(floor(length(new_t) / 2)):ss_rate:end] + + lines!(afast, tplot, new_state[N + 1, Int64(floor(length(new_t) / 2)):ss_rate:end], color = :red, alpha = 0.1) + lines!( + aslow, + tplot, + new_state[1, Int64(floor(length(new_t) / 2)):ss_rate:end], + color = :blue, + alpha = 0.1, + label = "$(n_sample_cov) samples", + ) + axislegend(aslow, merge = true, unique = true) + end + # save + save(joinpath(output_directory, case * "_datatrajectory_state1.png"), fig, px_per_unit = 3) + save(joinpath(output_directory, case * "_datatrajectory_state1.pdf"), fig, pt_per_unit = 3) + + Γ = cov(data_samples, dims = 2) # estimate covariance from samples + # add a little additive and multiplicative inflation + Γ += 1e4 * eps() * I # 10^-12 just to make things nonzero + blocksize = Int64(size(Γ, 1) / 5) # known block structure + #meanblocks = [mean([Γ[i, i] for i in ((j - 1) * blocksize + 1):(j * blocksize)]) for j in 1:5] + #Γ += 1e-4* kron(Diagonal(meanblocks),I(blocksize)) # this will add scaled noise to the diagonal scaled by the block + + y_mean = mean(data_samples, dims = 2) + y = data_samples[:, shuffle(rng, 1:n_sample_cov)[1]] # random data point as the data + + # build a nice observation object from this (blocks assumption) + full_observation = Observation(Dict("samples" => y, "covariances" => Γ, "names" => "full")) + + observation_cases = [ + "all-block", + "fast-slow-block", + "fast-slow-block-inconsistent", # causes error when calc timestep + "fast-slow-noxy", + ] + observation_case = observation_cases[1] + + if observation_case == "all-block" + data_block_names = ["", "", "", "", ""] + + observation_vec = [] + for i in 1:length(data_block_names) + idx = ((i - 1) * blocksize + 1):(i * blocksize) + push!( + observation_vec, + Observation(Dict("samples" => y[idx], "covariances" => Γ[idx, idx], "names" => data_block_names[i])), + ) + end + observation = combine_observations(observation_vec) + elseif observation_case == "fast-slow-block" + + data_block_names = ["", ""] + idx_slow = reduce(vcat, [((i - 1) * blocksize + 1):(i * blocksize) for i in [1, 3]]) #slow + idx_fast = reduce(vcat, [((i - 1) * blocksize + 1):(i * blocksize) for i in [2, 4, 5]]) #fast + + observation_vec = [ + Observation( + Dict("samples" => y[idx_slow], "covariances" => Γ[idx_slow, idx_slow], "names" => data_block_names[1]), + ), + Observation( + Dict("samples" => y[idx_fast], "covariances" => Γ[idx_fast, idx_fast], "names" => data_block_names[2]), + ), + ] + elseif observation_case == "fast-slow-block-inconsistent" + + data_block_names = ["", ""] + idx_slow = reduce(vcat, [((i - 1) * blocksize + 1):(i * 
blocksize) for i in [1, 3, 5]]) #slow + idx_fast = reduce(vcat, [((i - 1) * blocksize + 1):(i * blocksize) for i in [2, 4, 5]]) #fast + + observation_vec = [ + Observation( + Dict("samples" => y[idx_slow], "covariances" => Γ[idx_slow, idx_slow], "names" => data_block_names[1]), + ), + Observation( + Dict("samples" => y[idx_fast], "covariances" => Γ[idx_fast, idx_fast], "names" => data_block_names[2]), + ), + ] + elseif observation_case == "fast-slow-noxy" + + data_block_names = ["", ""] + idx_slow = reduce(vcat, [((i - 1) * blocksize + 1):(i * blocksize) for i in [1, 3]]) #slow + idx_fast = reduce(vcat, [((i - 1) * blocksize + 1):(i * blocksize) for i in [2, 4]]) #fast + + observation_vec = [ + Observation( + Dict("samples" => y[idx_slow], "covariances" => Γ[idx_slow, idx_slow], "names" => data_block_names[1]), + ), + Observation( + Dict("samples" => y[idx_fast], "covariances" => Γ[idx_fast, idx_fast], "names" => data_block_names[2]), + ), + ] + end + observation = combine_observations(observation_vec) + + ## group parameter-observation pairs for second experiment + if observation_case == "all-block" + group_identifiers = Dict(["F", "G"] => ["", ""], ["h", "c", "b"] => ["", "", ""]) + elseif observation_case == "fast-slow-block" + group_identifiers = Dict(["F", "G"] => [""], ["h", "c", "b"] => [""]) + elseif observation_case == "fast-slow-block-inconsistent" + group_identifiers = Dict(["F", "G"] => [""], ["h", "c", "b"] => [""]) + elseif observation_case == "fast-slow-block-noxy" + group_identifiers = Dict(["F", "G"] => [""], ["h", "c", "b"] => [""]) + end + + update_groups = create_update_groups(priors, observation, group_identifiers) + + + + fig = Figure(size = (450, 450)) + aΓ = Axis(fig[1, 1][1, 1]) + adata = Axis(fig[2, 1][1, 1]) + + heatmap!(aΓ, Γ) + sqrt_inv_Γ = sqrt(inv(Γ)) + series!(adata, (sqrt_inv_Γ * data_samples)', solid_color = :black, label = "normalized data") #plots each row as new plot + # save + save(joinpath(output_directory, case * "_datasamples.png"), fig, px_per_unit = 3) + save(joinpath(output_directory, case * "_datasamples.pdf"), fig, pt_per_unit = 3) + + @info "constructed and plotted perfect experiment data and noise" + + # may need to condition Gamma for posdef etc. or add shrinkage estimation + + ### + ### (a) Configure and solve ensemble inversion + ### + + # EKP parameters + N_ens = 40 # number of ensemble members + N_iter = 5 # number of EKI iterations + # initial parameters: N_params x N_ens + initial_params = construct_initial_ensemble(rng, priors, N_ens) + + ekiobj = + EKP.EnsembleKalmanProcess(initial_params, full_observation, Inversion(), scheduler = DataMisfitController()) + @info "Built EKP object" + + # EKI iterations + println("EKP inversion error:") + err = zeros(N_iter) + for i in 1:N_iter + ϕ_i = get_ϕ_final(priors, ekiobj) # the `ϕ` indicates that the `params_i` are in the constrained space + g_ens = run_G_ensemble(spunup_state, lsettings, ϕ_i, window, length(y)) + EKP.update_ensemble!(ekiobj, g_ens) + end + @info "Calibrated parameters with EKP" + # EKI results: Has the ensemble collapsed toward the truth? 
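+    # (the means printed below are in the constrained space, via get_ϕ_mean_final,
+    #  so they can be compared entry-by-entry with true_params)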
+ println("True parameters: ") + println(true_params) + + println("\nEKI results:") + println(get_ϕ_mean_final(priors, ekiobj)) + + u_stored = get_u(ekiobj, return_array = false) + g_stored = get_g(ekiobj, return_array = false) + + @save output_directory * "parameter_storage.jld2" u_stored + @save output_directory * "output_storage.jld2" g_stored Γ + + # Plots + Γ = get_obs_noise_cov(full_observation) + y = get_obs(full_observation) + fig = Figure(size = (450, 450)) + aprior = Axis(fig[1, 1][1, 1]) + apost = Axis(fig[2, 1][1, 1]) + g_prior = get_g(ekiobj, 1) + g_post = get_g_final(ekiobj) + data_std = sqrt.([Γ[i, i] for i in 1:size(Γ, 1)]) + data_dim = length(y) + dplot = 1:data_dim + + + lines!(aprior, dplot, sqrt_inv_Γ * y, color = (:black, 0.5), label = "data") #plots each row as new plot + lines!(apost, dplot, sqrt_inv_Γ * y, color = (:black, 0.5), label = "data") #plots each row as new plot + band!( + aprior, + dplot, + sqrt_inv_Γ * (y_mean - 2 * data_std)[:], + sqrt_inv_Γ * (y_mean + 2 * data_std)[:], + color = (:grey, 0.2), + ) #estimated 2*std bands about a mean + band!( + apost, + dplot, + sqrt_inv_Γ * (y_mean - 2 * data_std)[:], + sqrt_inv_Γ * (y_mean + 2 * data_std)[:], + color = (:grey, 0.2), + ) + for idx in 1:N_ens + lines!(aprior, dplot, sqrt_inv_Γ * g_prior[:, idx], color = :orange, alpha = 0.1, label = "prior") #plots each row as new plot + lines!(apost, dplot, sqrt_inv_Γ * g_post[:, idx], color = :blue, alpha = 0.1, label = "posterior") #plots each row as new plot + end + axislegend(aprior, merge = true, unique = true) + axislegend(apost, merge = true, unique = true) + + # save + save(joinpath(output_directory, case * "_datasamples-prior-post.png"), fig, px_per_unit = 3) + save(joinpath(output_directory, case * "_datasamples-prior-post.pdf"), fig, pt_per_unit = 3) + + @info "plotted results of perfect experiment" + + ### + ### (b) Repeat exp (a) with UpdateGroups + ### + + # We see that + bs = Int64(size(Γ, 1) / 5) # known block structure + + + println("update groups:") + println("**************") + println(get_group_id.(update_groups)) + + ekiobj_grouped = EKP.EnsembleKalmanProcess(initial_params, observation, Inversion(), update_groups = update_groups) + @info "Built grouped EKP object" + + # EKI iterations + println("EKP inversion error:") + err = zeros(N_iter) + for i in 1:N_iter + ϕ_i = get_ϕ_final(priors, ekiobj_grouped) # the `ϕ` indicates that the `params_i` are in the constrained space + g_ens = run_G_ensemble(spunup_state, lsettings, ϕ_i, window, length(y)) + EKP.update_ensemble!(ekiobj_grouped, g_ens) + end + @info "Calibrated parameters with grouped EKP" + # EKI results: Has the ensemble collapsed toward the truth? 
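+    # (as in experiment (a), means are printed in constrained space; the grouped run
+    #  reuses the same initial_params ensemble)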
+ println("True parameters: ") + println(true_params) + + println("\nEKI results:") + println(get_ϕ_mean_final(priors, ekiobj_grouped)) + + u_stored = get_u(ekiobj_grouped, return_array = false) + g_stored = get_g(ekiobj_grouped, return_array = false) + + @save output_directory * "parameter_storage_grouped.jld2" u_stored + @save output_directory * "output_storage_grouped.jld2" g_stored + + + # Plots + Γ = get_obs_noise_cov(full_observation) # get the data/noise from the true statistics, despite our assumptions + y = get_obs(full_observation) + fig = Figure(size = (450, 450)) + aprior = Axis(fig[1, 1][1, 1]) + apost = Axis(fig[2, 1][1, 1]) + g_prior = get_g(ekiobj_grouped, 1) + g_post = get_g_final(ekiobj_grouped) + data_std = sqrt.([Γ[i, i] for i in 1:size(Γ, 1)]) + data_dim = length(y) + dplot = 1:data_dim + lines!(aprior, dplot, sqrt_inv_Γ * y, color = (:black, 0.5), label = "data") #plots each row as new plot + lines!(apost, dplot, sqrt_inv_Γ * y, color = (:black, 0.5), label = "data") #plots each row as new plot + band!( + aprior, + dplot, + sqrt_inv_Γ * (y_mean - 2 * data_std)[:], + sqrt_inv_Γ * (y_mean + 2 * data_std)[:], + color = (:grey, 0.2), + ) #estimated 2*std bands about a mean + band!( + apost, + dplot, + sqrt_inv_Γ * (y_mean - 2 * data_std)[:], + sqrt_inv_Γ * (y_mean + 2 * data_std)[:], + color = (:grey, 0.2), + ) + for idx in 1:N_ens + lines!(aprior, dplot, sqrt_inv_Γ * g_prior[:, idx], color = :orange, alpha = 0.1, label = "prior") #plots each row as new plot + lines!(apost, dplot, sqrt_inv_Γ * g_post[:, idx], color = :blue, alpha = 0.1, label = "posterior") #plots each row as new plot + end + axislegend(aprior, merge = true, unique = true) + axislegend(apost, merge = true, unique = true) + + # save + save(joinpath(output_directory, case * "_datasamples-prior-post-grouped.png"), fig, px_per_unit = 3) + save(joinpath(output_directory, case * "_datasamples-prior-post-grouped.pdf"), fig, pt_per_unit = 3) + + @info "plotted results of perfect experiment" + +end + +main() diff --git a/examples/UpdateGroups/data_processing.jl b/examples/UpdateGroups/data_processing.jl new file mode 100644 index 000000000..d07a53a97 --- /dev/null +++ b/examples/UpdateGroups/data_processing.jl @@ -0,0 +1,52 @@ + +struct ProcessingWindow{FT <: AbstractFloat, SS <: AbstractString} + "start time of processing window" + t_start::FT + "end time of processing window" + t_end::FT + "subwindow size" + sw_size::Int + "sliding subwindow, or batching subwindow" + slide_or_batch::SS +end + +function process_trajectory_to_data(pw::ProcessingWindow, xn, t) + + # Define averaging indices range - relative to trajectory start time + indices = findall(x -> (x > pw.t_start) && (x < pw.t_end), t .- minimum(t)) + + # Define the subwindows' starts and ends, and the number of them + if pw.slide_or_batch == "batch" + n_subwindow = Int64(floor(length(indices) / pw.sw_size)) + sw_start = 1:(pw.sw_size):(n_subwindow * pw.sw_size - 1) + sw_end = (pw.sw_size):(pw.sw_size):(n_subwindow * pw.sw_size) + elseif pw.slide_or_batch == "slide" + n_subwindow = length(indices) - pw.sw_size + sw_start = 1:n_subwindow + sw_end = (pw.sw_size):(n_subwindow + pw.sw_size) + else + throw( + ArgumentError, + "ProcessingWindow.slide_or_batch must be \"slide\" or \"batch\", received $pw.slide_or_batch", + ) + end + + # calculate first and second moments over the subwindows + N = Int64(size(xn, 1) / 2) + slow_id = 1:N + fast_id = (N + 1):(2 * N) + + slow_mean_sw = [mean(vcat(xn[slow_id, indices[sw_start[i]:sw_end[i]]])) for i in 1:n_subwindow] + 
fast_mean_sw = [mean(vcat(xn[fast_id, indices[sw_start[i]:sw_end[i]]])) for i in 1:n_subwindow] + + slow_meansq_sw = [mean(vcat(xn[slow_id, indices[sw_start[i]:sw_end[i]]] .^ 2)) for i in 1:n_subwindow] + fast_meansq_sw = [mean(vcat(xn[fast_id, indices[sw_start[i]:sw_end[i]]] .^ 2)) for i in 1:n_subwindow] + + slowfast_mean_sw = [ + mean(vcat(xn[fast_id, indices[sw_start[i]:sw_end[i]]] .* xn[slow_id, indices[sw_start[i]:sw_end[i]]])) for + i in 1:n_subwindow + ] + + # Combine (, , , , ) + return vcat(slow_mean_sw..., fast_mean_sw..., slow_meansq_sw..., fast_meansq_sw..., slowfast_mean_sw...) +end diff --git a/examples/UpdateGroups/lorenz_model.jl b/examples/UpdateGroups/lorenz_model.jl new file mode 100644 index 000000000..f72f998b3 --- /dev/null +++ b/examples/UpdateGroups/lorenz_model.jl @@ -0,0 +1,125 @@ +struct LSettings + # Number of longitude steps + N::Int32 + # Timestep + dt::Float64 + # end time + t_end::Float64 + # Initial perturbation + initial::Union{Array{Float64}, Nothing} + +end + +struct LParams{FT <: AbstractFloat, VV <: AbstractVector} + # Slow - Mean forcing + F::VV + # Slow - Coupling strength + G::FT + # Fast - Coupling strength + h::FT + # fast timescale separation + c::FT + # Fast - Nonlinearity + b::FT +end + + +# Forward pass of the Lorenz 96 model +#=function lorenz_forward(settings::LSettings, params::LParams) + # run the Lorenz simulation + xn, t = lorenz_solve(settings, params) + # Get statistics + gt = stats(settings, xn, t) + return gt +end +=# + +# initial run +function lorenz_solve(settings::LSettings, params::LParams) + X = fill(Float64(0.0), 2 * settings.N, 1) #fast+slow + if !isnothing(settings.initial) + X = X .+ settings.initial + end + return lorenz_solve(X, settings, params) +end + +# Solve the Lorenz 96 system +function lorenz_solve(X::Matrix, settings::LSettings, params::LParams) + X_next = X[:, end] #take the final state + # Initialize + nstep = Int32(ceil(settings.t_end / settings.dt)) + xn = zeros(Float64, 2 * settings.N, nstep) + t = zeros(Float64, nstep) + # March forward in time + for j in 1:nstep + t[j] = settings.dt * j + X_next = RK4(X_next, settings.dt, settings.N, params, t[j]) + xn[:, j] = X_next + end + # Output + return xn, t + +end + +# Lorenz-96 system with one substep +# f = dx/dt +# Inputs: x: state, N: longitude steps, F: forcing +function rk_inner(x, N, params, t) + + #get lorenz parameters + (F, G, h, c, b) = (params.F, params.G, params.h, params.c, params.b) + F_local = params.F[1] + params.F[2] * sin(params.F[3] * 2 * π * t) + + slow_id = 1:N + fast_id = (N + 1):(2 * N) + xs = x[slow_id] + xf = x[fast_id] + slow = zeros(Float64, N) + fast = zeros(Float64, N) + xnew = zeros(size(x)) + # Loop over N positions + + # dynamics - slow (F,G) + #= + # Damping system with coupling and force + for i = 1:N + slow[i] = - xs[i] + F_local - G*xf[i] + end + =# + # Damping system with coupling and force + for i in 3:(N - 1) + slow[i] = -xs[i - 1] * (xs[i - 2] - xs[i + 1]) - xs[i] + F_local - G * xf[i] + end + # Periodic BC + slow[2] = -xs[1] * (xs[N] - xs[3]) - xs[2] + F_local - G * xf[2] + slow[1] = -xs[N] * (xs[N - 1] - xs[2]) - xs[1] + F_local - G * xf[1] + slow[N] = -xs[N - 1] * (xs[N - 2] - xs[1]) - xs[N] + F_local - G * xf[N] + + # dynamics - fast (h,c,b) + # L96 system with coupling to slow + for i in 2:(N - 2) + fast[i] = -c * b * xf[i + 1] * (xf[i + 2] - xf[i - 1]) - c * xf[i] + h * c / b * xs[i] + end + # periodic boundary conditions + fast[1] = -c * b * xf[2] * (xf[3] - xf[N]) - c * xf[1] + h * c / b * xs[1] + fast[N - 1] = -c * b 
* xf[N] * (xf[1] - xf[N - 2]) - c * xf[N - 1] + h * c / b * xs[N - 1] + fast[N] = -c * b * xf[1] * (xf[2] - xf[N - 1]) - c * xf[N] + h * c / b * xs[N] + + # return + xnew[slow_id] = slow + xnew[fast_id] = fast + return xnew +end + +# RK4 solve +function RK4(xold, dt, N, F, t) + # Predictor steps + k1 = rk_inner(xold, N, F, t) + k2 = rk_inner(xold + k1 * dt / 2.0, N, F, t) + k3 = rk_inner(xold + k2 * dt / 2.0, N, F, t) + k4 = rk_inner(xold + k3 * dt, N, F, t) + # Step + xnew = xold + (dt / 6.0) * (k1 + 2.0 * k2 + 2.0 * k3 + k4) + # Output + return xnew +end diff --git a/examples/UpdateGroups/plot_covs.jl b/examples/UpdateGroups/plot_covs.jl new file mode 100644 index 000000000..939187864 --- /dev/null +++ b/examples/UpdateGroups/plot_covs.jl @@ -0,0 +1,40 @@ +using JLD2, LinearAlgebra, Statistics +using CairoMakie, ColorSchemes +using EnsembleKalmanProcesses.DataContainers + + +function main() + output_directory = "output" + + u_iters = JLD2.load(joinpath(output_directory, "parameter_storage.jld2"))["u_stored"] + g_iters = JLD2.load(joinpath(output_directory, "output_storage.jld2"))["g_stored"] + + iters = 1:5 + for iter in iters + U = get_data(u_iters[iter]) + G = get_data(g_iters[iter]) + + Cuu = cov(U, dims = 2) + Cgg = cov(G, dims = 2) + Um = mean(U, dims = 2) + Gm = mean(G, dims = 2) + Cug = 1 / (size(G, 2) - 1) * ((U .- Um) * (G .- Gm)')' # so u are rows + + + fig = Figure(size = (3 * 450, 450)) # size is backwards to what you expect + + auu = Axis(fig[1, 1], title = "Cuu") + aug = Axis(fig[1, 2], title = "Cug") + agg = Axis(fig[1, 3], title = "Cgg") + + heatmap!(auu, Cuu) + heatmap!(aug, Cug) + heatmap!(agg, Cgg) + + current_figure() + + save(joinpath(output_directory, "covs_iter_$iter.png"), fig, px_per_unit = 3) + end +end + +main() diff --git a/src/EnsembleKalmanInversion.jl b/src/EnsembleKalmanInversion.jl index 0228fdcbb..9b39e2b0d 100644 --- a/src/EnsembleKalmanInversion.jl +++ b/src/EnsembleKalmanInversion.jl @@ -58,8 +58,8 @@ function eki_update( cov_est = cov([u; g], dims = 2, corrected = false) # [(N_par + N_obs)×(N_par + N_obs)] - # Localization - cov_localized = get_localizer(ekp).localize(cov_est) + # Localization - a function taking in (cov, float-type, n_par, n_obs, n_ens) + cov_localized = get_localizer(ekp).localize(cov_est, FT, size(u, 1), size(g, 1), size(u, 2)) cov_uu, cov_ug, cov_gg = get_cov_blocks(cov_localized, size(u, 1)) # N_obs × N_obs \ [N_obs × N_ens] @@ -90,16 +90,20 @@ end Updates the ensemble according to an Inversion process. Inputs: - - ekp :: The EnsembleKalmanProcess to update. - - g :: Model outputs, they need to be stored as a `N_obs × N_ens` array (i.e data are columms). - - process :: Type of the EKP. - - deterministic_forward_map :: Whether output `g` comes from a deterministic model. - - failed_ens :: Indices of failed particles. If nothing, failures are computed as columns of `g` with NaN entries. + - `ekp` :: The EnsembleKalmanProcess to update. + - `g` :: Model outputs, they need to be stored as a `N_obs × N_ens` array (i.e data are columms). + - `process` :: Type of the EKP. + - `u_idx` :: indices of u to update (see `UpdateGroup`) + - `g_idx` :: indices of g,y,Γ with which to update u (see `UpdateGroup`) + - `deterministic_forward_map` :: Whether output `g` comes from a deterministic model. + - `failed_ens` :: Indices of failed particles. If nothing, failures are computed as columns of `g` with NaN entries. 
""" function update_ensemble!( ekp::EnsembleKalmanProcess{FT, IT, Inversion}, g::AbstractMatrix{FT}, - process::Inversion; + process::Inversion, + u_idx::Vector{Int}, + g_idx::Vector{Int}; deterministic_forward_map::Bool = true, failed_ens = nothing, ) where {FT, IT} @@ -111,28 +115,21 @@ function update_ensemble!( end # u: N_par × N_ens # g: N_obs × N_ens - u = get_u_final(ekp) - N_obs = size(g, 1) - cov_init = cov(u, dims = 2) - - if ekp.verbose - if get_N_iterations(ekp) == 0 - @info "Iteration 0 (prior)" - @info "Covariance trace: $(tr(cov_init))" - end - - @info "Iteration $(get_N_iterations(ekp)+1) (T=$(sum(get_Δt(ekp))))" - end + u = get_u_final(ekp)[u_idx, :] + g = g[g_idx, :] + N_obs = length(g_idx) + obs_noise_cov = get_obs_noise_cov(ekp)[g_idx, g_idx] + obs_mean = get_obs(ekp)[g_idx] fh = get_failure_handler(ekp) # Scale noise using Δt - scaled_obs_noise_cov = get_obs_noise_cov(ekp) / get_Δt(ekp)[end] + scaled_obs_noise_cov = obs_noise_cov / get_Δt(ekp)[end] noise = sqrt(scaled_obs_noise_cov) * rand(get_rng(ekp), MvNormal(zeros(N_obs), I), get_N_ens(ekp)) # Add obs (N_obs) to each column of noise (N_obs × N_ens) if # G is deterministic, else just repeat the observation - y = add_stochastic_perturbation ? (get_obs(ekp) .+ noise) : repeat(get_obs(ekp), 1, get_N_ens(ekp)) + y = add_stochastic_perturbation ? (obs_mean .+ noise) : repeat(obs_mean, 1, get_N_ens(ekp)) if isnothing(failed_ens) _, failed_ens = split_indices_by_success(g) @@ -143,17 +140,5 @@ function update_ensemble!( u = fh.failsafe_update(ekp, u, g, y, scaled_obs_noise_cov, failed_ens) - push!(ekp.g, DataContainer(g, data_are_columns = true)) - - # Store error - compute_error!(ekp) - - # Diagnostics - cov_new = cov(u, dims = 2) - - if ekp.verbose - @info "Covariance-weighted error: $(get_error(ekp)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))" - end - return u end diff --git a/src/EnsembleKalmanProcess.jl b/src/EnsembleKalmanProcess.jl index 3285f197e..3de19524c 100644 --- a/src/EnsembleKalmanProcess.jl +++ b/src/EnsembleKalmanProcess.jl @@ -168,6 +168,7 @@ struct EnsembleKalmanProcess{ P <: Process, LRS <: LearningRateScheduler, ACC <: Accelerator, + VV <: AbstractVector, } "array of stores for parameters (`u`), each of size [`N_par × N_ens`]" u::Array{DataContainer{FT}} @@ -185,6 +186,8 @@ struct EnsembleKalmanProcess{ accelerator::ACC "stored vector of timesteps used in each EK iteration" Δt::Vector{FT} + "vector of update groups, defining which parameters should be updated by which data" + update_groups::VV "the particular EK process (`Inversion` or `Sampler` or `Unscented` or `TransformInversion` or `SparseInversion`)" process::P "Random number generator object (algorithm + seed) used for sampling and noise, for reproducibility. Defaults to `Random.GLOBAL_RNG`." 
@@ -203,9 +206,10 @@ function EnsembleKalmanProcess( observation_series::OS, process::P, configuration::Dict; + update_groups::Union{Nothing, VV} = nothing, rng::AbstractRNG = Random.GLOBAL_RNG, verbose::Bool = false, -) where {FT <: AbstractFloat, P <: Process, OS <: ObservationSeries} +) where {FT <: AbstractFloat, P <: Process, OS <: ObservationSeries, VV <: AbstractVector} #initial parameters stored as columns init_params = DataContainer(params, data_are_columns = true) @@ -225,7 +229,6 @@ function EnsembleKalmanProcess( obs_for_minibatch = get_obs(observation_series) # get stacked observation over minibatch obs_size_for_minibatch = length(obs_for_minibatch) # number of dims in the stacked observation - IT = typeof(N_ens) #store for model evaluations g = [] @@ -234,6 +237,16 @@ function EnsembleKalmanProcess( # timestep store Δt = FT[] + # defined groups of parameters to be updated by groups of data + obs_size = length(get_obs(get_observations(observation_series)[1])) #deduce size just from first observation + if isnothing(update_groups) + groups = [UpdateGroup(1:N_par, 1:obs_size)] # vec length 1 + else + groups = update_groups + end + update_group_consistency(groups, N_par, obs_size) # consistency checks + VVV = typeof(groups) + scheduler = configuration["scheduler"] RS = typeof(scheduler) @@ -254,14 +267,15 @@ function EnsembleKalmanProcess( if isa(process, TransformInversion) && !(isa(configuration["localization_method"], NoLocalization)) throw(ArgumentError("`TransformInversion` cannot currently be used with localization.")) end - localizer = Localizer(configuration["localization_method"], N_par, obs_size_for_minibatch, N_ens, FT) + + localizer = Localizer(configuration["localization_method"], N_ens, FT) if verbose @info "Initializing ensemble Kalman process of type $(nameof(typeof(process)))\nNumber of ensemble members: $(N_ens)\nLocalization: $(nameof(typeof(localizer)))\nFailure handler: $(nameof(typeof(failure_handler)))\nScheduler: $(nameof(typeof(scheduler)))\nAccelerator: $(nameof(typeof(accelerator)))" end - EnsembleKalmanProcess{FT, IT, P, RS, AC}( + EnsembleKalmanProcess{FT, IT, P, RS, AC, VVV}( [init_params], observation_series, N_ens, @@ -270,6 +284,7 @@ function EnsembleKalmanProcess( scheduler, accelerator, Δt, + groups, process, rng, failure_handler, @@ -287,10 +302,12 @@ function EnsembleKalmanProcess( failure_handler_method::Union{Nothing, FM} = nothing, localization_method::Union{Nothing, LM} = nothing, Δt = nothing, + update_groups::Union{Nothing, VV} = nothing, rng::AbstractRNG = Random.GLOBAL_RNG, verbose::Bool = false, ) where { FT <: AbstractFloat, + VV <: AbstractVector, LRS <: LearningRateScheduler, ACC <: Accelerator, P <: Process, @@ -319,7 +336,15 @@ function EnsembleKalmanProcess( configuration["localization_method"] = localization_method end - return EnsembleKalmanProcess(params, observation_series, process, configuration, rng = rng, verbose = verbose) + return EnsembleKalmanProcess( + params, + observation_series, + process, + configuration, + update_groups = update_groups, + rng = rng, + verbose = verbose, + ) end function EnsembleKalmanProcess( @@ -554,6 +579,35 @@ function get_failure_handler(ekp::EnsembleKalmanProcess) return ekp.failure_handler end +""" + get_update_groups(ekp::EnsembleKalmanProcess) +Return update_groups type of EnsembleKalmanProcess. 
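+(this is the vector of `UpdateGroup`s defining which parameter indices are updated by which data indices)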
+""" +function get_update_groups(ekp::EnsembleKalmanProcess) + return ekp.update_groups +end + +""" + list_update_groups_over_minibatch(ekp::EnsembleKalmanProcess) +Return u_groups and g_groups for the current minibatch, i.e. the subset of +""" +function list_update_groups_over_minibatch(ekp::EnsembleKalmanProcess) + os = get_observation_series(ekp) + len_mb = length(get_current_minibatch(os)) # number of obs per batch + len_obs = Int(length(get_obs(os)) / len_mb) # length of obs in a batch + update_groups = get_update_groups(ekp) + u_groups = get_u_group.(update_groups) # update_group indices + g_groups = get_g_group.(update_groups) + # extend group indices from one obs to the minibatch of obs + new_u_groups = [reduce(vcat, [(i - 1) * len_obs .+ u_group for i in 1:len_mb]) for u_group in u_groups] + new_g_groups = [reduce(vcat, [(i - 1) * len_obs .+ g_group for i in 1:len_mb]) for g_group in g_groups] + + return new_u_groups, new_g_groups +end + + + + """ get_process(ekp::EnsembleKalmanProcess) Return `process` field of EnsembleKalmanProcess. @@ -877,13 +931,40 @@ function update_ensemble!( terminate = calculate_timestep!(ekp, g, Δt_new) if isnothing(terminate) - u = update_ensemble!(ekp, g, get_process(ekp); ekp_kwargs...) + if ekp.verbose + cov_init = get_u_cov_final(ekp) + if get_N_iterations(ekp) == 0 + @info "Iteration 0 (prior)" + @info "Covariance trace: $(tr(cov_init))" + end + + @info "Iteration $(get_N_iterations(ekp)+1) (T=$(sum(get_Δt(ekp))))" + end + + u_groups, g_groups = list_update_groups_over_minibatch(ekp) + u = zeros(size(get_u_prior(ekp))) + + # update each u_block with every g_block + for (u_idx, g_idx) in zip(u_groups, g_groups) + u[u_idx, :] += update_ensemble!(ekp, g, get_process(ekp), u_idx, g_idx; ekp_kwargs...) + end + accelerate!(ekp, u) + if s > 0.0 multiplicative_inflation ? multiplicative_inflation!(ekp; s = s) : nothing additive_inflation ? additive_inflation!(ekp, additive_inflation_cov, s = s) : nothing end + # wrapping up + push!(ekp.g, DataContainer(g, data_are_columns = true)) # store g + compute_error!(ekp) + + if ekp.verbose + cov_new = get_u_cov_final(ekp) + @info "Covariance-weighted error: $(get_error(ekp)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))" + end + else return terminate # true if scheduler has not stepped end @@ -924,6 +1005,5 @@ export Gaussian_2d export construct_initial_ensemble, construct_mean, construct_cov include("UnscentedKalmanInversion.jl") - # struct Accelerator include("Accelerators.jl") diff --git a/src/EnsembleKalmanProcesses.jl b/src/EnsembleKalmanProcesses.jl index 543aaf29c..998e7dd4e 100644 --- a/src/EnsembleKalmanProcesses.jl +++ b/src/EnsembleKalmanProcesses.jl @@ -8,7 +8,7 @@ include("DataContainers.jl") include("Observations.jl") include("Localizers.jl") include("TOMLInterface.jl") - +include("UpdateGroup.jl") # algorithmic updates include("EnsembleKalmanProcess.jl") diff --git a/src/EnsembleKalmanSampler.jl b/src/EnsembleKalmanSampler.jl index e2fbd6057..2e6ce7990 100644 --- a/src/EnsembleKalmanSampler.jl +++ b/src/EnsembleKalmanSampler.jl @@ -100,35 +100,29 @@ end Updates the ensemble according to a Sampler process. Inputs: - - ekp :: The EnsembleKalmanProcess to update. - - g :: Model outputs, they need to be stored as a `N_obs × N_ens` array (i.e data are columms). - - process :: Type of the EKP. - - failed_ens :: Indices of failed particles. If nothing, failures are computed as columns of `g` + - `ekp` :: The EnsembleKalmanProcess to update. 
+ - `g` :: Model outputs, they need to be stored as a `N_obs × N_ens` array (i.e data are columms). + - `process` :: Type of the EKP. + - `u_idx` :: indices of u to update (see `UpdateGroup`) + - `g_idx` :: indices of g,y,Γ with which to update u (see `UpdateGroup`) + - `failed_ens` :: Indices of failed particles. If nothing, failures are computed as columns of `g` with NaN entries. """ function update_ensemble!( ekp::EnsembleKalmanProcess{FT, IT, Sampler{FT}}, g::AbstractMatrix{FT}, - process::Sampler{FT}; + process::Sampler{FT}, + u_idx::Vector{Int}, + g_idx::Vector{Int}; failed_ens = nothing, ) where {FT, IT} # u: N_ens × N_par # g: N_ens × N_obs u_old = get_u_final(ekp) - cov_init = get_u_cov_final(ekp) fh = get_failure_handler(ekp) - if ekp.verbose - if get_N_iterations(ekp) == 0 - @info "Iteration 0 (prior)" - @info "Covariance trace: $(tr(cov_init))" - end - - @info "Iteration $(get_N_iterations(ekp)+1) (T=$(sum(get_Δt(ekp))))" - end - if isnothing(failed_ens) _, failed_ens = split_indices_by_success(g) end @@ -138,19 +132,5 @@ function update_ensemble!( u = fh.failsafe_update(ekp, u_old, g, failed_ens) - # store new parameters (and model outputs) - push!(ekp.g, DataContainer(g, data_are_columns = true)) - # u_old is N_ens × N_par, g is N_ens × N_obs, - # but stored in data container with N_ens as the 2nd dim - - compute_error!(ekp) - - # Diagnostics - cov_new = cov(u, dims = 2) - - if ekp.verbose - @info "Covariance-weighted error: $(get_error(ekp)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))" - end - return u end diff --git a/src/EnsembleTransformKalmanInversion.jl b/src/EnsembleTransformKalmanInversion.jl index c060f8341..65a820243 100644 --- a/src/EnsembleTransformKalmanInversion.jl +++ b/src/EnsembleTransformKalmanInversion.jl @@ -19,7 +19,8 @@ TransformInversion() = TransformInversion([]) get_buffer(p::TI) where {TI <: TransformInversion} = p.buffer function FailureHandler(process::TransformInversion, method::IgnoreFailures) - failsafe_update(ekp, u, g, failed_ens) = etki_update(ekp, u, g) + failsafe_update(ekp, u, g, y, obs_noise_cov_inv, onci_idx, failed_ens) = + etki_update(ekp, u, g, y, obs_noise_cov_inv, onci_idx) return FailureHandler{TransformInversion, IgnoreFailures}(failsafe_update) end @@ -31,10 +32,11 @@ Provides a failsafe update that - updates the failed ensemble by sampling from the updated successful ensemble. """ function FailureHandler(process::TransformInversion, method::SampleSuccGauss) - function failsafe_update(ekp, u, g, failed_ens) + function failsafe_update(ekp, u, g, y, obs_noise_cov_inv, onci_idx, failed_ens) successful_ens = filter(x -> !(x in failed_ens), collect(1:size(g, 2))) n_failed = length(failed_ens) - u[:, successful_ens] = etki_update(ekp, u[:, successful_ens], g[:, successful_ens]) + u[:, successful_ens] = + etki_update(ekp, u[:, successful_ens], g[:, successful_ens], y, obs_noise_cov_inv, onci_idx) if !isempty(failed_ens) u[:, failed_ens] = sample_empirical_gaussian(get_rng(ekp), u[:, successful_ens], n_failed) end @@ -46,8 +48,10 @@ end """ etki_update( ekp::EnsembleKalmanProcess{FT, IT, TransformInversion}, - u::AbstractMatrix{FT}, - g::AbstractMatrix{FT}, + u::AbstractMatrix, + g::AbstractMatrix, + y::AbstractVector, + obs_noise_cov_inv::AbstractVector, ) where {FT <: Real, IT, CT <: Real} Returns the updated parameter vectors given their current values and @@ -55,12 +59,20 @@ the corresponding forward model evaluations. 
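+The index map `onci_idx` selects, for each stored inverse-covariance block, the entries
+corresponding to the current update group.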
""" function etki_update( ekp::EnsembleKalmanProcess{FT, IT, TransformInversion}, - u::AbstractMatrix{FT}, - g::AbstractMatrix{FT}, -) where {FT <: Real, IT} - - y = get_obs(ekp) - inv_noise_scaling = get_Δt(ekp)[end] + u::AM1, + g::AM2, + y::AV1, + obs_noise_cov_inv::AV2, + onci_idx::AV3, +) where { + FT <: Real, + IT, + AM1 <: AbstractMatrix, + AM2 <: AbstractMatrix, + AV1 <: AbstractVector, + AV2 <: AbstractVector, + AV3 <: AbstractVector, +} m = size(u, 2) X = FT.((u .- mean(u, dims = 2)) / sqrt(m - 1)) @@ -83,13 +95,15 @@ function etki_update( end # construct I + Y' * Γ_inv * Y using only blocks γ_inv of Γ_inv - Γ_inv = get_obs_noise_cov_inv(ekp, build = false) # returns blocks of Γ_inv - γ_sizes = [size(γ_inv, 1) for γ_inv in Γ_inv] - shift = [0] - for (γs, γ_inv) in zip(γ_sizes, Γ_inv) - idx = (shift[1] + 1):(shift[1] + γs) - tmp[1][1:ys2, idx] = (inv_noise_scaling * γ_inv * Y[idx, :])' # NB: col(Y') * γ_inv = (γ_inv * row(Y))' - shift[1] = maximum(idx) + # this loop is very fast for diagonal, slow for nondiagonal + for (block_idx, local_idx, global_idx) in onci_idx + γ_inv = obs_noise_cov_inv[block_idx] + # This is cumbersome, but will retain e.g. diagonal type for matrix manipulations, else indexing converts back to matrix + if isa(γ_inv, Diagonal) # + tmp[1][1:ys2, global_idx] = (γ_inv.diag[local_idx] .* Y[global_idx, :])' # multiple each row of Y by γ_inv element + else #much slower + tmp[1][1:ys2, global_idx] = (γ_inv[local_idx, local_idx] * Y[global_idx, :])' # NB: col(Y') * γ_inv = (γ_inv * row(Y))' row-mult is faster + end end tmp[2][1:ys2, 1:ys2] = tmp[1][1:ys2, 1:ys1] * Y @@ -118,33 +132,49 @@ Inputs: - ekp :: The EnsembleKalmanProcess to update. - g :: Model outputs, they need to be stored as a `N_obs × N_ens` array (i.e data are columms). - process :: Type of the EKP. + - u_idx :: indices of u to update (see `UpdateGroup`) + - g_idx :: indices of g,y,Γ with which to update u (see `UpdateGroup`) - failed_ens :: Indices of failed particles. If nothing, failures are computed as columns of `g` with NaN entries. """ function update_ensemble!( ekp::EnsembleKalmanProcess{FT, IT, TransformInversion}, g::AbstractMatrix{FT}, - process::TransformInversion; + process::TransformInversion, + u_idx::Vector{Int}, + g_idx::Vector{Int}; failed_ens = nothing, kwargs..., ) where {FT, IT} - # u: N_par × N_ens - # g: N_obs × N_ens - u = get_u_final(ekp) - N_obs = size(g, 1) - cov_init = cov(u, dims = 2) - - if ekp.verbose - if get_N_iterations(ekp) == 0 - @info "Iteration 0 (prior)" - @info "Covariance trace: $(tr(cov_init))" + # update only u_idx parameters/ with g_idx data + # u: length(u_idx) × N_ens + # g: lenght(g_idx) × N_ens + u = get_u_final(ekp)[u_idx, :] + g = g[g_idx, :] + obs_mean = get_obs(ekp)[g_idx] + # get relevant inverse covariance blocks + obs_noise_cov_inv = get_obs_noise_cov_inv(ekp, build = false)# NEVER build=true for this - ruins scaling. 
+ + # need to sweep over local blocks + γ_sizes = [size(γ_inv, 1) for γ_inv in obs_noise_cov_inv] + onci_idx = [] + shift = 0 + for (block_id, γs) in enumerate(γ_sizes) + loc_idx = intersect(1:γs, g_idx .- shift) + if !(length(loc_idx) == 0) + push!(onci_idx, (block_id, loc_idx, loc_idx .+ shift)) # end - - @info "Iteration $(get_N_iterations(ekp)+1) (T=$(sum(get_Δt(ekp))))" + shift += γs end + # obs_noise_cov_inv = [obs_noise_cov_inv[pair[1]][pair[2],pair[2]] for pair in local_intersect] # SLOW + + N_obs = length(g_idx) fh = get_failure_handler(ekp) + # Scale noise using Δt + y = get_obs(ekp) + if isnothing(failed_ens) _, failed_ens = split_indices_by_success(g) end @@ -152,19 +182,7 @@ function update_ensemble!( @info "$(length(failed_ens)) particle failure(s) detected. Handler used: $(nameof(typeof(fh).parameters[2]))." end - u = fh.failsafe_update(ekp, u, g, failed_ens) - - # store new parameters (and model outputs) - push!(ekp.g, DataContainer(g, data_are_columns = true)) - # Store error - compute_error!(ekp) - - # Diagnostics - cov_new = cov(u, dims = 2) - - if ekp.verbose - @info "Covariance-weighted error: $(get_error(ekp)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))" - end + u = fh.failsafe_update(ekp, u, g, y, obs_noise_cov_inv, onci_idx, failed_ens) return u end diff --git a/src/GaussNewtonKalmanInversion.jl b/src/GaussNewtonKalmanInversion.jl index 7372a59e0..a658c91cc 100644 --- a/src/GaussNewtonKalmanInversion.jl +++ b/src/GaussNewtonKalmanInversion.jl @@ -73,7 +73,7 @@ function gnki_update( N_ens_successful = size(g, 2) cov_est = cov([u; g], dims = 2, corrected = false) # [(N_par + N_obs)×(N_par + N_obs)] - cov_localized = get_localizer(ekp).localize(cov_est) + cov_localized = get_localizer(ekp).localize(cov_est, FT, size(u, 1), size(g, 1), get_N_ens(ekp)) cov_uu, cov_ug, cov_gg = get_cov_blocks(cov_localized, size(u, 1)) process = get_process(ekp) prior_mean = process.prior_mean @@ -129,7 +129,9 @@ Inputs: function update_ensemble!( ekp::EnsembleKalmanProcess{FT, IT, GNI}, g::AbstractMatrix{FT}, - process::GNI; + process::GNI, + u_idx::Vector{Int}, + g_idx::Vector{Int}; deterministic_forward_map::Bool = true, failed_ens = nothing, ) where {FT, IT, GNI <: GaussNewtonInversion} @@ -141,18 +143,11 @@ function update_ensemble!( end # u: N_par × N_ens # g: N_obs × N_ens - u = get_u_final(ekp) - N_obs = size(g, 1) - cov_init = cov(u, dims = 2) - - if ekp.verbose - if get_N_iterations(ekp) == 0 - @info "Iteration 0 (prior)" - @info "Covariance trace: $(tr(cov_init))" - end - - @info "Iteration $(get_N_iterations(ekp)+1) (T=$(sum(get_Δt(ekp))))" - end + u = get_u_final(ekp)[u_idx, :] + g = g[g_idx, :] + N_obs = length(g_idx) + obs_noise_cov = get_obs_noise_cov(ekp)[g_idx, g_idx] + obs_mean = get_obs(ekp)[g_idx] fh = get_failure_handler(ekp) @@ -173,17 +168,5 @@ function update_ensemble!( u = fh.failsafe_update(ekp, u, g, y, scaled_obs_noise_cov, failed_ens) - push!(ekp.g, DataContainer(g, data_are_columns = true)) - - # Store error - compute_error!(ekp) - - # Diagnostics - cov_new = cov(u, dims = 2) - - if ekp.verbose - @info "Covariance-weighted error: $(get_error(ekp)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))" - end - return u end diff --git a/src/Localizers.jl b/src/Localizers.jl index ca792e93a..f11461f14 100644 --- a/src/Localizers.jl +++ b/src/Localizers.jl @@ -144,31 +144,36 @@ function kernel_function(kernel_ug, T, p, d) kernel = ones(T, p 
diff --git a/src/Localizers.jl b/src/Localizers.jl
index ca792e93a..f11461f14 100644
--- a/src/Localizers.jl
+++ b/src/Localizers.jl
@@ -144,31 +144,36 @@ function kernel_function(kernel_ug, T, p, d)
     kernel = ones(T, p + d, p + d)
     kernel[1:p, (p + 1):end] = kernel_ug
     kernel[(p + 1):end, 1:p] = kernel_ug'
-    return (cov) -> kernel .* cov
+    return kernel
 end

-"Uniform kernel constructor"
-function Localizer(localization::NoLocalization, p::IT, d::IT, J::IT, T = Float64) where {IT <: Int}
-    kernel_ug = ones(T, p, d)
-    return Localizer{NoLocalization, T}(kernel_function(kernel_ug, T, p, d))
+"Uniform kernel constructor" # p::IT, d::IT, J::IT, T = Float64
+function Localizer(localization::NoLocalization, J::Int, T = Float64)
+    #kernel_ug = ones(T,p,d)
+    return Localizer{NoLocalization, T}((cov, T, p, d, J) -> cov)
 end

 "Delta kernel localizer constructor"
-function Localizer(localization::Delta, p::IT, d::IT, J::IT, T = Float64) where {IT <: Int}
-    kernel_ug = T(1) * Matrix(I, p, d)
-    return Localizer{Delta, T}(kernel_function(kernel_ug, T, p, d))
+function Localizer(localization::Delta, J::Int, T = Float64)
+    #kernel_ug = T(1) * Matrix(I, p, d)
+    return Localizer{Delta, T}((cov, T, p, d, J) -> kernel_function(T(1) * Matrix(I, p, d), T, p, d) .* cov)
 end

-"RBF kernel localizer constructor"
-function Localizer(localization::RBF, p::IT, d::IT, J::IT, T = Float64) where {IT <: Int}
-    l = localization.lengthscale
+function create_rbf(l, T, p, d)
     kernel_ug = zeros(T, p, d)
     for i in 1:p
         for j in 1:d
             @inbounds kernel_ug[i, j] = exp(-(i - j) * (i - j) / (2 * l * l))
         end
     end
-    return Localizer{RBF, T}(kernel_function(kernel_ug, T, p, d))
+    return kernel_ug
+end
+"RBF kernel localizer constructor"
+function Localizer(localization::RBF, J::Int, T = Float64)
+    #kernel_ug = create_rbf(localization.lengthscale,T,p,d)
+    return Localizer{RBF, T}(
+        (cov, T, p, d, J) -> kernel_function(create_rbf(localization.lengthscale, T, p, d), T, p, d) .* cov,
+    )
 end

 "Localization kernel with Bernoulli trials as off-diagonal terms (symmetric)"
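This pattern repeats for every localization method below: a `Localizer` now stores a closure of the form `(cov, T, p, d, J) -> ...`, so only the ensemble size `J` is needed at construction, and the input/output dimensions `p` and `d` are supplied when `localize` is called. A sketch of the new calling convention (it assumes `RBF`'s positional constructor takes the lengthscale, as the `localization.lengthscale` field above suggests):

```julia
using Statistics, Random
using EnsembleKalmanProcesses.Localizers

p, d, J = 3, 8, 40                        # parameter dim, data dim, ensemble size
u, g = randn(p, J), randn(d, J)           # illustrative ensemble and forward-map output
loc = Localizer(RBF(2.0), J)              # dimensions are no longer fixed at build time
cov_est = cov([u; g], dims = 2, corrected = false)
cov_localized = loc.localize(cov_est, Float64, p, d, J)  # dims passed per call
```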
""" function bernoulli_kernel_function(prob, T, p, d) - function get_kernel() - kernel = ones(T, p + d, p + d) - kernel_ug = bernoulli_kernel(prob, T, p, d) - kernel[1:p, (p + 1):end] = kernel_ug - kernel[(p + 1):end, 1:p] = kernel_ug' - return kernel - end - return (cov) -> get_kernel() .* cov + + kernel = ones(T, p + d, p + d) + kernel_ug = bernoulli_kernel(prob, T, p, d) + kernel[1:p, (p + 1):end] = kernel_ug + kernel[(p + 1):end, 1:p] = kernel_ug' + return kernel end "Randomized Bernoulli dropout kernel localizer constructor" -function Localizer(localization::BernoulliDropout, p::IT, d::IT, J::IT, T = Float64) where {IT <: Int} - return Localizer{BernoulliDropout, T}(bernoulli_kernel_function(localization.prob, T, p, d)) +function Localizer(localization::BernoulliDropout, J::Int, T = Float64) + return Localizer{BernoulliDropout, T}( + (cov, T, p, d, J) -> bernoulli_kernel_function(localization.prob, T, p, d) .* cov, + ) end """ @@ -225,8 +230,8 @@ function sec(cov, α, r_0) end "Sampling error correction (Lee, 2021) constructor" -function Localizer(localization::SEC, p::IT, d::IT, J::IT, T = Float64) where {IT <: Int} - return Localizer{SEC, T}((cov) -> sec(cov, localization.α, localization.r_0)) +function Localizer(localization::SEC, J::Int, T = Float64) + return Localizer{SEC, T}((cov, T, p, d, J) -> sec(cov, localization.α, localization.r_0)) end """ @@ -265,8 +270,8 @@ function sec_fisher(cov, N_ens) end "Sampling error correction (Flowerdew, 2015) constructor" -function Localizer(localization::SECFisher, p::IT, d::IT, J::IT, T = Float64) where {IT <: Int} - return Localizer{SECFisher, T}((cov) -> sec_fisher(cov, J)) +function Localizer(localization::SECFisher, J::Int, T = Float64) + return Localizer{SECFisher, T}((cov, T, p, d, J) -> sec_fisher(cov, J)) end """ @@ -360,7 +365,7 @@ end "Sampling error correction of Vishny, Morzfeld, et al. (2024) constructor" -function Localizer(localization::SECNice, p::IT, d::IT, J::IT, T = Float64) where {IT <: Int} +function Localizer(localization::SECNice, J::Int, T = Float64) if length(localization.std_of_corr) == 0 #i.e. if the user hasn't provided an interpolation dr = 0.001 grid = LinRange(-1, 1, Int(1 / dr + 1)) @@ -369,7 +374,7 @@ function Localizer(localization::SECNice, p::IT, d::IT, J::IT, T = Float64) wher end return Localizer{SECNice, T}( - (cov) -> sec_nice(cov, localization.std_of_corr[1], localization.δ_ug, localization.δ_gg, J, p, d), + (cov, T, p, d, J) -> sec_nice(cov, localization.std_of_corr[1], localization.δ_ug, localization.δ_gg, J, p, d), ) end diff --git a/src/Observations.jl b/src/Observations.jl index 3b9feb206..943760c87 100644 --- a/src/Observations.jl +++ b/src/Observations.jl @@ -122,7 +122,7 @@ function Observation(obs_dict::Dict) if !isa(samples, AbstractVector) # 1 -> [[1]] snew = [[samples]] else - T = promote_type((typeof(s) for s in samples)...) 
diff --git a/src/Observations.jl b/src/Observations.jl
index 3b9feb206..943760c87 100644
--- a/src/Observations.jl
+++ b/src/Observations.jl
@@ -122,7 +122,7 @@ function Observation(obs_dict::Dict)
     if !isa(samples, AbstractVector) # 1 -> [[1]]
         snew = [[samples]]
     else
-        T = promote_type((typeof(s) for s in samples)...)
+        T = eltype(samples)
         samples_tmp = [convert(T, s) for s in samples] # to re-infer eltype (if the user makes an eltype of "Any")
         if !isa(samples_tmp, AbstractVector{<:AbstractVector}) # [1,2] -> [[1,2]]
             snew = [samples]
diff --git a/src/SparseEnsembleKalmanInversion.jl b/src/SparseEnsembleKalmanInversion.jl
index 752f66931..7df928d45 100644
--- a/src/SparseEnsembleKalmanInversion.jl
+++ b/src/SparseEnsembleKalmanInversion.jl
@@ -133,7 +133,7 @@ function sparse_eki_update(
     cov_est = cov([u; g], [u; g], dims = 2, corrected = false) # [(N_par + N_obs)×(N_par + N_obs)]
     process = get_process(ekp)
     # Localization
-    cov_localized = get_localizer(ekp).localize(cov_est)
+    cov_localized = get_localizer(ekp).localize(cov_est, FT, size(u, 1), size(g, 1), get_N_ens(ekp))
     cov_uu, cov_ug, cov_gg = get_cov_blocks(cov_localized, size(u, 1))

     v = hcat(u', g')
@@ -184,6 +184,8 @@ Inputs:
 - `ekp` :: The EnsembleKalmanProcess to update.
 - `g` :: Model outputs, they need to be stored as a `N_obs × N_ens` array (i.e. data are columns).
 - `process` :: Type of the EKP.
+ - `u_idx` :: indices of u to update (see `UpdateGroup`)
+ - `g_idx` :: indices of g,y,Γ with which to update u (see `UpdateGroup`)
 - `deterministic_forward_map` :: Whether output `g` comes from a deterministic model.
 - `failed_ens` :: Indices of failed particles. If nothing, failures are computed as columns of `g` with NaN entries.
@@ -191,7 +193,9 @@ Inputs:
 function update_ensemble!(
     ekp::EnsembleKalmanProcess{FT, IT, SparseInversion{FT}},
     g::AbstractMatrix{FT},
-    process::SparseInversion{FT};
+    process::SparseInversion{FT},
+    u_idx::Vector{Int},
+    g_idx::Vector{Int};
     deterministic_forward_map = true,
     failed_ens = nothing,
 ) where {FT, IT}
@@ -220,12 +224,6 @@ function update_ensemble!(

     u = fh.failsafe_update(ekp, u, g, y, scaled_obs_noise_cov, failed_ens)

-    # store new parameters (and model outputs)
-    push!(ekp.g, DataContainer(g, data_are_columns = true))
-
-    # Store error
-    compute_error!(ekp)
-
     # Check convergence
     cov_new = cov(get_u_final(ekp), dims = 2)
diff --git a/src/UnscentedKalmanInversion.jl b/src/UnscentedKalmanInversion.jl
index 6a029db9e..f55405795 100644
--- a/src/UnscentedKalmanInversion.jl
+++ b/src/UnscentedKalmanInversion.jl
@@ -293,7 +293,8 @@ function FailureHandler(process::Unscented, method::SampleSuccGauss)
         ]
         # Localization
-        cov_localized = get_localizer(uki).localize(cov_est)
+        FT = eltype(g_mean)
+        cov_localized = get_localizer(uki).localize(cov_est, FT, size(u_p, 1), size(g, 1), size(u_p, 2))
         uu_p_cov, ug_cov, gg_cov = get_cov_blocks(cov_localized, size(u_p, 1))

         if process.impose_prior
@@ -312,9 +313,6 @@ function FailureHandler(process::Unscented, method::SampleSuccGauss)
         push!(process.obs_pred, g_mean) # N_ens x N_data
         push!(process.u_mean, u_mean) # N_ens x N_params
         push!(process.uu_cov, uu_cov) # N_ens x N_data
-        push!(uki.g, DataContainer(g, data_are_columns = true))
-
-        compute_error!(uki)
     end

     function failsafe_update(uki, u, g, failed_ens)
@@ -632,7 +630,7 @@ function update_ensemble_analysis!(
         ug_cov' gg_cov
     ]
     # Localization
-    cov_localized = get_localizer(uki).localize(cov_est)
+    cov_localized = get_localizer(uki).localize(cov_est, FT, size(u_p, 1), size(g, 1), size(u_p, 2))
     uu_p_cov, ug_cov, gg_cov = get_cov_blocks(cov_localized, size(u_p)[1])

     tmp = ug_cov / gg_cov
@@ -644,9 +642,6 @@ function update_ensemble_analysis!(
     push!(process.obs_pred, g_mean) # N_ens x N_data
     push!(process.u_mean, u_mean) # N_ens x N_params
     push!(process.uu_cov, uu_cov) # N_ens x N_data
-    push!(uki.g, DataContainer(g, data_are_columns = true))
-
-    compute_error!(uki)
 end
@@ -664,29 +659,22 @@ Inputs:

 - `uki` :: The EnsembleKalmanProcess to update.
 - `g_in` :: Model outputs, they need to be stored as a `N_obs × N_ens` array (i.e. data are columns).
 - `process` :: Type of the EKP.
+ - `u_idx` :: indices of u to update (see `UpdateGroup`)
+ - `g_idx` :: indices of g,y,Γ with which to update u (see `UpdateGroup`)
 - `failed_ens` :: Indices of failed particles. If nothing, failures are computed as columns of `g` with NaN entries.
 """
 function update_ensemble!(
     uki::EnsembleKalmanProcess{FT, IT, U},
     g_in::AbstractMatrix{FT},
-    process::U;
+    process::U,
+    u_idx::Vector{Int},
+    g_idx::Vector{Int};
     failed_ens = nothing,
 ) where {FT <: AbstractFloat, IT <: Int, U <: Unscented}
     #catch works when g_in non-square
     u_p_old = get_u_final(uki)

-    if uki.verbose
-        cov_init = get_u_cov_final(uki)
-
-        if get_N_iterations(uki) == 0
-            @info "Iteration 0 (prior)"
-            @info "Covariance trace: $(tr(cov_init))"
-        end
-
-        @info "Iteration $(get_N_iterations(uki)+1) (T=$(sum(get_Δt(uki))))"
-    end
-
     fh = get_failure_handler(uki)

     if isnothing(failed_ens)
@@ -698,12 +686,6 @@ function update_ensemble!(

     u_p = fh.failsafe_update(uki, u_p_old, g_in, failed_ens)

-
-    if uki.verbose
-        cov_new = get_u_cov_final(uki)
-        @info "Covariance-weighted error: $(get_error(uki)[end])\nCovariance trace: $(tr(cov_new))\nCovariance trace ratio (current/previous): $(tr(cov_new)/tr(cov_init))"
-    end
-
     return u_p
 end
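The `u_idx`/`g_idx` pairs threaded through the methods above are produced from the new `UpdateGroup` container defined below. A minimal construction sketch (index values illustrative):

```julia
using EnsembleKalmanProcesses

# parameters 1, 3, 5 are to be updated using data entries 1:20
group = UpdateGroup([1, 3, 5], 1:20)
get_u_group(group)   # [1, 3, 5]
get_g_group(group)   # [1, 2, ..., 20]
get_group_id(group)  # Dict("[1,...,5]" => "[1,...,20]"), auto-generated from the index extrema
```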
+""" +$(TYPEDSIGNATURES) + +Check the consistency of sizing and partitioning of the `UpdateGroup` array if it contains indices +No consistency check if u,g has strings internally +""" +function update_group_consistency(groups::VV, input_dim::Int, output_dim::Int) where {VV <: AbstractVector} + + u_groups = get_u_group.(groups) + g_groups = get_g_group.(groups) + + # check for partition (only if indices passed) + u_flat = reduce(vcat, u_groups) + if !(1:input_dim == sort(u_flat)) + if eltype(sort(u_flat)) == Int + throw( + ArgumentError( + "The combined 'UpdateGroup.u_group's must partition the indices of the input parameters: 1:$(input_dim), received: $(sort(u_flat))", + ), + ) + end + end + + g_flat = reduce(vcat, g_groups) + if eltype(g_flat) == Int + if any(gf > output_dim for gf in g_flat) || any(gf <= 0 for gf in g_flat) + throw( + ArgumentError( + "The UpdateGroup.g_group must contains values in: 1:$(output_dim), found values outside this range", + ), + ) + end + end + # pass the tests + return true +end + +## Convience constructor for update_groups +""" +$(TYPEDSIGNATURES) + +To construct a list of update-groups populated by indices of parameter distributions and indices of observations, from a dictionary of `group_identifiers = Dict(..., group_k_input_names => group_k_output_names, ...)` +""" +function create_update_groups( + prior::PD, + observation::OB, + group_identifiers::Dict, +) where {PD <: ParameterDistribution, OB <: Observation} + + param_names = get_name(prior) + param_indices = batch(prior, function_parameter_opt = "dof") + + obs_names = get_names(observation) + obs_indices = get_indices(observation) + + update_groups = [] + for (key, val) in pairs(group_identifiers) + key_vec = isa(key, AbstractString) ? [key] : key # make it iterable + val_vec = isa(val, AbstractString) ? [val] : val + + u_group = [] + g_group = [] + for pn in key_vec + pi = param_indices[pn .== param_names] + if length(pi) == 0 + throw( + ArgumentError( + "For group identifiers Dict(X => ...), X should be listed in $(param_names). Got $(pn).", + ), + ) + end + + push!(u_group, isa(pi, Int) ? [pi] : pi) + end + for obn in val_vec + oi = obs_indices[obn .== obs_names] + if length(oi) == 0 + throw( + ArgumentError( + "For group identifiers Dict(... => Y), Y should be listed from $(obs_names). Instead got $(val).", + ), + ) + end + push!(g_group, isa(oi, Int) ? 
+
+## Convenience constructor for update_groups
+"""
+$(TYPEDSIGNATURES)
+
+Construct a list of update groups, populated by indices of parameter distributions and indices of observations, from a dictionary of `group_identifiers = Dict(..., group_k_input_names => group_k_output_names, ...)`
+"""
+function create_update_groups(
+    prior::PD,
+    observation::OB,
+    group_identifiers::Dict,
+) where {PD <: ParameterDistribution, OB <: Observation}
+
+    param_names = get_name(prior)
+    param_indices = batch(prior, function_parameter_opt = "dof")
+
+    obs_names = get_names(observation)
+    obs_indices = get_indices(observation)
+
+    update_groups = []
+    for (key, val) in pairs(group_identifiers)
+        key_vec = isa(key, AbstractString) ? [key] : key # make it iterable
+        val_vec = isa(val, AbstractString) ? [val] : val
+
+        u_group = []
+        g_group = []
+        for pn in key_vec
+            pi = param_indices[pn .== param_names]
+            if length(pi) == 0
+                throw(
+                    ArgumentError(
+                        "For group identifiers Dict(X => ...), X should be listed in $(param_names). Got $(pn).",
+                    ),
+                )
+            end
+
+            push!(u_group, isa(pi, Int) ? [pi] : pi)
+        end
+        for obn in val_vec
+            oi = obs_indices[obn .== obs_names]
+            if length(oi) == 0
+                throw(
+                    ArgumentError(
+                        "For group identifiers Dict(... => Y), Y should be listed in $(obs_names). Instead got $(val).",
+                    ),
+                )
+            end
+            push!(g_group, isa(oi, Int) ? [oi] : oi)
+        end
+        u_group = reduce(vcat, reduce(vcat, u_group))
+        g_group = reduce(vcat, reduce(vcat, g_group))
+        push!(update_groups, UpdateGroup(u_group, g_group, Dict(key_vec => val_vec)))
+    end
+    return update_groups
+
+end
+
+## Overload ==
+Base.:(==)(a::UG1, b::UG2) where {UG1 <: UpdateGroup, UG2 <: UpdateGroup} =
+    all([get_u_group(a) == get_u_group(b), get_g_group(a) == get_g_group(b), get_group_id(a) == get_group_id(b)],)
diff --git a/test/EnsembleKalmanProcess/runtests.jl b/test/EnsembleKalmanProcess/runtests.jl
index 09f337e47..49f661014 100644
--- a/test/EnsembleKalmanProcess/runtests.jl
+++ b/test/EnsembleKalmanProcess/runtests.jl
@@ -512,7 +512,7 @@ end
         if i_prob == 1
             scheduler = DataMisfitController(on_terminate = "continue")
         else
-            scheduler = DefaultScheduler()
+            scheduler = DefaultScheduler(0.1)
         end
         #remove localizers for now
         localization_method = Localizers.NoLocalization()
@@ -1066,7 +1066,7 @@ end
     # Get inverse problem
     y_obs, G, Γy, A = inv_problem
     if i_prob == 1
-        scheduler = DataMisfitController()
+        scheduler = DataMisfitController() # if terminated too early can miss out later tests
     else
         scheduler = DefaultScheduler(0.001)
     end
@@ -1111,6 +1111,7 @@ end
     # GNKI iterations
     u_i_vec = Array{Float64, 2}[]
     g_ens_vec = Array{Float64, 2}[]
+    terminated = nothing
    for i in 1:N_iter
         # Check SampleSuccGauss handler
         params_i = get_ϕ_final(prior, gnkiobj)
@@ -1120,7 +1121,11 @@ end
         if i in iters_with_failure
             g_ens[:, 1] .= NaN
         end
-        EKP.update_ensemble!(gnkiobj, g_ens)
+        terminated = EKP.update_ensemble!(gnkiobj, g_ens)
+        if !isnothing(terminated)
+            break
+        end
+
         push!(g_ens_vec, g_ens)
         if i == 1
             if !(size(g_ens, 1) == size(g_ens, 2))
@@ -1141,7 +1146,7 @@ end
         params_i_unsafe = get_ϕ_final(prior, gnkiobj_unsafe)
         g_ens_unsafe = G(params_i_unsafe)
         if i < iters_with_failure[1]
-            EKP.update_ensemble!(gnkiobj_unsafe, g_ens_unsafe)
+            terminated = EKP.update_ensemble!(gnkiobj_unsafe, g_ens_unsafe)
         elseif i == iters_with_failure[1]
             g_ens_unsafe[:, 1] .= NaN
             #inconsistent behaviour before/after v1.9 regarding NaNs in matrices
@@ -1157,12 +1162,15 @@ end
         end
     end
+    end
-    push!(u_i_vec, get_u_final(gnkiobj))
+    if isnothing(terminated)
+        push!(u_i_vec, get_u_final(gnkiobj))
+    end # if cancelled early then don't need "final iteration"

     @test get_u_prior(gnkiobj) == u_i_vec[1]
     @test get_u(gnkiobj) == u_i_vec
-    @test isequal(get_g(gnkiobj), g_ens_vec)
+    @test isequal(get_g(gnkiobj), g_ens_vec) # can deal with NaNs
     @test isequal(get_g_final(gnkiobj), g_ens_vec[end])
     @test isequal(get_error(gnkiobj), gnkiobj.error)
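The test changes above rely on `update_ensemble!` returning a non-`nothing` value when the scheduler terminates the algorithm early, so iteration loops should check for it. A minimal sketch of the pattern (`G`, `prior`, `ekiobj`, and `N_iter` are assumed to be set up as in the surrounding tests):

```julia
terminated = nothing
for i in 1:N_iter
    g_ens = G(get_ϕ_final(prior, ekiobj))            # run the forward map on current parameters
    terminated = EKP.update_ensemble!(ekiobj, g_ens)
    isnothing(terminated) || break                   # stop iterating on early termination
end
```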
diff --git a/test/Inflation/runtests.jl b/test/Inflation/runtests.jl
index 007050333..66e150ea0 100644
--- a/test/Inflation/runtests.jl
+++ b/test/Inflation/runtests.jl
@@ -58,6 +58,8 @@ initial_ensemble = EKP.construct_initial_ensemble(rng, prior, N_ens)
 g_ens = G(get_ϕ_final(prior, ekiobj))

 # ensure error is thrown when scaled time step >= 1
+idx_vec_in = collect(1:size(initial_ensemble, 1))
+idx_vec_out = collect(1:size(g_ens, 1))
 @test_throws ErrorException EKP.update_ensemble!(ekiobj, g_ens; multiplicative_inflation = true, s = 3.0)
 @test_throws ErrorException EKP.update_ensemble!(ekiobj, g_ens; additive_inflation = true, s = 3.0)
@@ -67,7 +69,7 @@ initial_ensemble = EKP.construct_initial_ensemble(rng, prior, N_ens)
 g_ens = G(get_ϕ_final(prior, ekiobj))

 # standard update
-EKP.update_ensemble!(ekiobj, g_ens, EKP.get_process(ekiobj))
+EKP.update_ensemble!(ekiobj, g_ens)
 eki_mult_inflation = deepcopy(ekiobj)
 eki_add_inflation = deepcopy(ekiobj)
 eki_add_inflation_prior = deepcopy(ekiobj)
diff --git a/test/Localizers/runtests.jl b/test/Localizers/runtests.jl
index 497d80d2e..ea654cd60 100644
--- a/test/Localizers/runtests.jl
+++ b/test/Localizers/runtests.jl
@@ -100,7 +100,9 @@ const EKP = EnsembleKalmanProcesses
     u_final = get_u_final(ekiobj)
     g_final = get_g_final(ekiobj)
     cov_est = cov([u_final; g_final], dims = 2, corrected = false)
-    cov_localized = ekiobj.localizer.localize(cov_est)
+    # The arguments for the localizer
+    T, p, d, J = (eltype(g_final), size(u_final, 1), size(g_final, 1), size(u_final, 2))
+    cov_localized = ekiobj.localizer.localize(cov_est, T, p, d, J)
     @test rank(cov_est) < rank(cov_localized)
     # Test localization getter method
     @test isa(loc_method, EKP.get_localizer_type(ekiobj))
diff --git a/test/ParameterDistributions/runtests.jl b/test/ParameterDistributions/runtests.jl
index ba83e85f2..44bfee391 100644
--- a/test/ParameterDistributions/runtests.jl
+++ b/test/ParameterDistributions/runtests.jl
@@ -54,10 +54,10 @@ using EnsembleKalmanProcesses.ParameterDistributions
     @test get_package(d) == pkg

     # and with another prior:
-    wide_pos_prior = constrained_gaussian("GRF_coefficients_wide_pos", 0.0, 10.0, -10, Inf, repeats = dofs)
+    wide_pos_prior = constrained_gaussian("GRF_coefficients_wide_pos", 0.0, 5.0, -5.0, Inf, repeats = dofs)
     d_wide_pos = GaussianRandomFieldInterface(grf, pkg, wide_pos_prior)
     @test get_distribution(d_wide_pos) == wide_pos_prior
-    err_prior = constrained_gaussian("GRF_coefficients_wide_pos", 0.0, 10.0, -10, Inf, repeats = dofs + 1) # wrong num dofs
+    err_prior = constrained_gaussian("GRF_coefficients_wide_pos", 0.0, 5.0, -5.0, Inf, repeats = dofs + 1) # wrong num dofs
     @test_throws ArgumentError GaussianRandomFieldInterface(grf, pkg, err_prior)
@@ -127,17 +127,14 @@ using EnsembleKalmanProcesses.ParameterDistributions

     # [b] building function samples from the prior (with explicit rng)
     rng1 = Random.MersenneTwister(s)
-    rng2 = copy(rng1)
-    coeff_mat = sample(rng2, wide_pos_prior, 1)
+    coeff_mat = sample(copy(rng1), wide_pos_prior, 1)
     constrained_coeff_mat = transform_unconstrained_to_constrained(wide_pos_prior, coeff_mat)
-    sample5 = reshape(GRF.sample(grf, xi = constrained_coeff_mat), :, 1)
-
-    rng3 = copy(rng1)
-    coeff_mat2 = sample(rng3, wide_pos_prior, n_sample)
+    sample5 = reshape(GRF.sample(grf, xi = constrained_coeff_mat), :, 1) # deterministic
+    coeff_mat2 = sample(copy(rng1), wide_pos_prior, n_sample)
     constrained_coeff_mat2 = transform_unconstrained_to_constrained(wide_pos_prior, coeff_mat2)
     sample6 = zeros(n_pts, n_sample)
     for i in 1:n_sample
-        sample6[:, i] = GRF.sample(grf, xi = constrained_coeff_mat2[:, i])[:]
+        sample6[:, i] = GRF.sample(grf, xi = constrained_coeff_mat2[:, i])[:] # deterministic
     end
     @test build_function_sample(copy(rng1), d_wide_pos) ≈ sample5 atol = tol
     @test build_function_sample(d_wide_pos, constrained_coeff_mat) ≈ sample5 atol = tol
@@ -164,12 +161,13 @@ using EnsembleKalmanProcesses.ParameterDistributions

     # Transforms:
     # u->c goes from coefficients to constrained function evaluations, by default
+    biggertol = 1e-6 # with sample5 (wide prior) can lead to rounding errors
     sample5_constrained = function_constraint.unconstrained_to_constrained.(sample5)
     sample6_constrained = function_constraint.unconstrained_to_constrained.(sample6)
     sample5_constrained_direct = transform_unconstrained_to_constrained(pd, vec(coeff_mat))
     sample6_constrained_direct = transform_unconstrained_to_constrained(pd, coeff_mat2)
-    @test sample5_constrained ≈ sample5_constrained_direct atol = tol
-    @test sample6_constrained ≈ sample6_constrained_direct atol = tol
+    @test all(isapprox.(sample5_constrained, sample5_constrained_direct, atol = biggertol))#
+    @test all(isapprox.(sample6_constrained, sample6_constrained_direct, atol = biggertol))

     # specifying from unc. to cons. function evaluations with flag
     @test sample5_constrained ≈ transform_unconstrained_to_constrained(pd, vec(sample5), build_flag = false)
@@ -177,13 +175,14 @@ using EnsembleKalmanProcesses.ParameterDistributions
         isapprox.(
             sample6_constrained,
             transform_unconstrained_to_constrained(pd, sample6, build_flag = false),
-            atol = tol,
+            atol = biggertol,
         ),
     )

     # c->u is the inverse, of the build_flag=false u->c ONLY
-    @test sample5 ≈ transform_constrained_to_unconstrained(pd, vec(sample5_constrained)) atol = tol
-    biggertol = 1e-5
+    @test all(
+        isapprox.(sample5, transform_constrained_to_unconstrained(pd, vec(sample5_constrained)), atol = biggertol),
+    )#
     @test all(isapprox.(sample6, transform_constrained_to_unconstrained(pd, sample6_constrained), atol = biggertol))
     #can be sensitive to sampling (sometimes throws a near "Inf" so inverse is less accurate)
@@ -493,48 +492,20 @@ using EnsembleKalmanProcesses.ParameterDistributions
     v = combine_distributions([u1, u2, u3, u4])

     # Tests for sample distribution
-    # Note from julia 1.8.5, rand(X,2) != [rand(X,1) rand(X,1)]
-    seed = 2020
-    testd = MvNormal(zeros(4), 0.1 * I)
-    Random.seed!(seed)
-    s1 = rand(testd, 1)
-    Random.seed!(seed)
-    @test sample(u1) == s1
-
-    Random.seed!(seed)
-    s1 = [rand(testd, 1) rand(testd, 1) rand(testd, 1)]
-    Random.seed!(seed)
-    @test sample(u1, 3) == s1
-
-    Random.seed!(seed)
-    idx = StatsBase.sample(collect(1:size(d2.distribution_samples)[2]), 1)
-    s2 = d2.distribution_samples[:, idx]
-    Random.seed!(seed)
-    @test sample(u2) == s2
-
-    Random.seed!(seed)
-    idx = StatsBase.sample(collect(1:size(d2.distribution_samples)[2]), 3)
-    s2 = d2.distribution_samples[:, idx]
-    Random.seed!(seed)
-    @test sample(u2, 3) == s2
-
-    Random.seed!(seed)
-    s3 = zeros(3, 1)
-    s3[1] = rand(Beta(2, 2))
-    s3[2:3] = rand(MvNormal(zeros(2), 0.1 * I))
-    Random.seed!(seed)
-    @test sample(u3) == s3
-
-    Random.seed!(seed)
-    s1 = sample(u1, 3)
-    s2 = sample(u2, 3)
-    s3 = sample(u3, 3)
-    s4 = sample(u4, 3)
-    Random.seed!(seed)
-    s = sample(v, 3)
-    @test s == cat([s1, s2, s3, s4]..., dims = 1)
+    # remove actual tests for global seeding, as julia v1.8.5, v1.11 etc change conventions
+    sample(u1)
+    sample(u1, 3)
+    sample(u2)
+    sample(u2, 3)
+    sample(u3)
+    sample(u3, 3)
+    sample(u4)
+    sample(u4, 3)
+    sample(v)
+    sample(v, 3)

     #Test for logpdf
+    seed = 2046
     @test_throws ErrorException logpdf(u, zeros(ndims(u)))
     x_in_bd = [0.5, 0.5, 0.5]
     Random.seed!(seed)
@@ -639,46 +610,6 @@ using EnsembleKalmanProcesses.ParameterDistributions
     @test sample(copy(rng2), u3, 3) ==
           cat([reshape(rand(rng2_new, test_d3a, 3), :, 3), rand(rng2_new, test_d3b, 3)]..., dims = 1)

-    # test that optional parameter defaults to Random.GLOBAL_RNG, for all methods.
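The removed tests above reseeded the global rng, which broke when Julia's global-sampling conventions changed; the replacement pattern (used in the TOMLInterface test below) copies an rng's state instead, which replays the same stream deterministically. A small sketch:

```julia
using Random

rng = MersenneTwister(1234)
a = rand(copy(rng), 3)  # draw from a copy of the current state
b = rand(copy(rng), 3)  # copy the same state again: identical draws
a == b                  # true, on any Julia version
```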
-    # reset the global seed instead of copying the rng object's state
-    # Note, from julia 1.8.5 rand(X,2) != [rand(X,1) rand(X,1)]
-    rng_seed = 2468
-    Random.seed!(rng_seed)
-    test_lhs = sample(d0)
-    Random.seed!(rng_seed)
-    @test test_lhs == rand(test_d, 1)
-
-    Random.seed!(rng_seed)
-    test_lhs = sample(d0, 3)
-    Random.seed!(rng_seed)
-    @test test_lhs == [rand(test_d, 1) rand(test_d, 1) rand(test_d, 1)]
-
-    Random.seed!(rng_seed)
-    test_lhs = sample(u1)
-    Random.seed!(rng_seed)
-    @test test_lhs == rand(test_d, 1)
-
-    Random.seed!(rng_seed)
-    test_lhs = sample(u1, 3)
-    Random.seed!(rng_seed)
-    @test test_lhs == [rand(test_d, 1) rand(test_d, 1) rand(test_d, 1)]
-
-    Random.seed!(rng_seed)
-    test_lhs = sample(d0)
-    Random.seed!(rng_seed)
-    @test test_lhs == rand(test_d, 1)
-
-    Random.seed!(rng_seed)
-    idx = StatsBase.sample(collect(1:size(d2.distribution_samples)[2]), 1)
-    test_lhs = d2.distribution_samples[:, idx]
-    Random.seed!(rng_seed)
-    @test test_lhs == sample(u2)
-
-    Random.seed!(rng_seed)
-    test_lhs = sample(d3)
-    Random.seed!(rng_seed)
-    @test test_lhs == cat([rand(test_d3a, 1), rand(test_d3b, 1)]..., dims = 1)
-
 end

 @testset "transform definitions" begin
diff --git a/test/TOMLInterface/runtests.jl b/test/TOMLInterface/runtests.jl
index cc351b3c4..775147edc 100644
--- a/test/TOMLInterface/runtests.jl
+++ b/test/TOMLInterface/runtests.jl
@@ -277,41 +277,27 @@ const EKP = EnsembleKalmanProcesses
     end

     # Test `save_parameter_samples`
-    uq_param_4_samples = [
-        [14.412514158048534, -9.990904722898303],
-        [14.877561432702601, -9.979758088554195],
-        [14.601045096347011, -9.988891003461758],
-        [14.877561432702601, -9.979758088554195],
-        [14.899607236135727, -9.995483419057388],
-        [14.412514158048534, -9.990904722898303],
-        [14.601045096347011, -9.988891003461758],
-        [14.899607236135727, -9.995483419057388],
-        [14.899607236135727, -9.995483419057388],
-        [14.877561432702601, -9.979758088554195],
-    ]
-    uq_param_5_samples = [
-        [1.0, 5.0, 8101.083927575384, 19.999997739670594],
-        [1.0, 5.0, 8101.083927575384, 19.999997739670594],
-        [3.0, 7.0, 59872.14171519782, 19.999999694097678],
-        [3.0, 7.0, 59872.14171519782, 19.999999694097678],
-        [1.0, 5.0, 8101.083927575384, 19.999997739670594],
-        [3.0, 7.0, 59872.14171519782, 19.999999694097678],
-        [3.0, 7.0, 59872.14171519782, 19.999999694097678],
-        [3.0, 7.0, 59872.14171519782, 19.999999694097678],
-        [1.0, 5.0, 8101.083927575384, 19.999997739670594],
-        [1.0, 5.0, 8101.083927575384, 19.999997739670594],
-    ]
+
+    rng = Random.MersenneTwister(1234)
+    n_samples = 10
+    names = ["uq_param_" * string(i) for i in 1:7]
+    pd = get_parameter_distribution(param_dict, names)
+    slices = batch(pd) # indices of kth distribution
+    # this calls the rng in the same manner as save_parameter_samples does, for reproducibility
+    samples = transform_unconstrained_to_constrained(pd, sample(copy(rng), pd, n_samples))
+    uq_param_4_samples = samples[slices[4], :]
+    uq_param_5_samples = samples[slices[5], :]
+
     mktempdir(@__DIR__) do save_path
         # Uncomment the line below to debug if the tests fail
         # save_path = "sample_tests"
         save_file = "parameters.toml"
-        pd = get_parameter_distribution(param_dict, ["uq_param_4", "uq_param_5"])
-        save_parameter_samples(pd, param_dict, 10, save_path; rng = Random.MersenneTwister(1234), save_file)
+        save_parameter_samples(pd, param_dict, n_samples, save_path; rng = copy(rng), save_file)

         for (i, fpath) in enumerate(readdir(save_path))
             toml_file = joinpath(save_path, fpath, save_file)
             param_dict = TOML.parsefile(toml_file)
-            @test uq_param_4_samples[i] == param_dict["uq_param_4"]["value"]
-            @test uq_param_5_samples[i] == param_dict["uq_param_5"]["value"]
+            @test uq_param_4_samples[:, i] == param_dict["uq_param_4"]["value"]
+            @test uq_param_5_samples[:, i] == param_dict["uq_param_5"]["value"]
         end
     end
 end
diff --git a/test/UpdateGroup/runtests.jl b/test/UpdateGroup/runtests.jl
new file mode 100644
index 000000000..ca8fbfee4
--- /dev/null
+++ b/test/UpdateGroup/runtests.jl
@@ -0,0 +1,108 @@
+using Distributions
+using LinearAlgebra
+using Random
+using Test
+
+using EnsembleKalmanProcesses
+# using EnsembleKalmanProcesses.ParameterDistributions
+const EKP = EnsembleKalmanProcesses
+using EnsembleKalmanProcesses.ParameterDistributions
+
+@testset "UpdateGroups" begin
+
+    input_dim = 10
+    output_dim = 20
+    u = 1:10
+    g = 1:20
+
+    # build a set of consistent update groups
+    u1 = [1, 3, 5, 6, 7, 8]
+    g1 = 1:20
+    identifier = Dict("1:8" => "1:20")
+    group1 = UpdateGroup(u1, g1, Dict("1:8" => "1:20"))
+    @test get_u_group(group1) == u1
+    @test get_g_group(group1) == g1
+    @test get_group_id(group1) == Dict("1:8" => "1:20")
+
+    u1c = [2, 4, 9, 10]
+    g2 = 3:9
+    group2 = UpdateGroup(u1c, g2)
+
+    groups = [group1, group2]
+
+    @test update_group_consistency(groups, input_dim, output_dim)
+
+    # break consistency in different ways
+    not_u1c = [2, 3, 4, 9, 10]
+    @test_throws ArgumentError UpdateGroup([], g2)
+    @test_throws ArgumentError group_bad = UpdateGroup(u1c, [])
+    group_bad = UpdateGroup(not_u1c, g2) # [u1, not_u1c] is not a partition of 1:10
+    @test_throws ArgumentError update_group_consistency([group1, group_bad], input_dim, output_dim)
+    not_inbd_g = [21] # outside of 1:20
+    group_bad = UpdateGroup(u1c, not_inbd_g)
+    @test_throws ArgumentError update_group_consistency([group1, group_bad], input_dim, output_dim)
+
+    # creation of update groups from prior and observations
+    param_names = ["F", "G"]
+
+    prior_F = ParameterDistribution(
+        Dict(
+            "name" => param_names[1],
+            "distribution" => Parameterized(MvNormal([1.0, 0.0, -2.0], I)),
+            "constraint" => repeat([bounded_below(0)], 3),
+        ),
+    ) # gives 3-D dist
+    prior_G = constrained_gaussian(param_names[2], 5.0, 4.0, 0, Inf)
+    priors = combine_distributions([prior_F, prior_G])
+
+    # data
+    # given a list of vector statistics y and their covariances Γ
+    data_block_names = ["", ""]
+
+    observation_vec = []
+    for i in 1:length(data_block_names)
+        push!(
+            observation_vec,
+            Observation(Dict("samples" => vec(ones(i)), "covariances" => I(i), "names" => data_block_names[i])),
+        )
+    end
+    observations = combine_observations(observation_vec)
+    group_identifiers = Dict(["F"] => [""], ["G"] => ["", ""])
+    update_groups = create_update_groups(priors, observations, group_identifiers)
+
+
+    param_names = get_name(priors)
+    param_indices = batch(priors, function_parameter_opt = "dof")
+
+    obs_names = get_names(observations)
+    obs_indices = get_indices(observations)
+
+    update_groups_test = []
+    for (key, val) in group_identifiers
+
+        key_vec = isa(key, AbstractString) ? [key] : key # make it iterable
+        val_vec = isa(val, AbstractString) ? [val] : val
+
+        u_group = []
+        g_group = []
+        for pn in key_vec
+            pi = param_indices[pn .== param_names]
+            push!(u_group, isa(pi, Int) ? [pi] : pi)
+        end
+        for obn in val_vec
+            oi = obs_indices[obn .== obs_names]
+            push!(g_group, isa(oi, Int) ? [oi] : oi)
+        end
+        u_group = reduce(vcat, reduce(vcat, u_group))
+        g_group = reduce(vcat, reduce(vcat, g_group))
+        push!(update_groups_test, UpdateGroup(u_group, g_group, Dict(key_vec => val_vec)))
+    end
+
+    @test update_groups == update_groups_test
+
+    # throw errors
+    bad_group_identifiers = Dict(["FF"] => [""], ["G"] => ["", ""])
+    @test_throws ArgumentError create_update_groups(priors, observations, bad_group_identifiers)
+    bad_group_identifiers = Dict(["F"] => [""], ["G"] => ["", ""])
+    @test_throws ArgumentError create_update_groups(priors, observations, bad_group_identifiers)
+end
diff --git a/test/runtests.jl b/test/runtests.jl
index 71ce1c39b..90665380f 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -29,6 +29,7 @@ end
         "ParameterDistributions",
         "PlotRecipes",
         "Observations",
+        "UpdateGroup",
        "EnsembleKalmanProcess",
         "Localizers",
         "TOMLInterface",
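Taken together, the intended workflow mirrors the new `test/UpdateGroup/runtests.jl`: build a prior and observations with named blocks, then map parameter names to observation names. A sketch (the observation names `obs_1`/`obs_2` are illustrative stand-ins for the elided names in the test above):

```julia
using Distributions, LinearAlgebra
using EnsembleKalmanProcesses
using EnsembleKalmanProcesses.ParameterDistributions

# prior with two named blocks: "F" (3-dimensional) and "G" (scalar)
prior_F = ParameterDistribution(
    Dict(
        "name" => "F",
        "distribution" => Parameterized(MvNormal([1.0, 0.0, -2.0], I)),
        "constraint" => repeat([bounded_below(0)], 3),
    ),
)
prior_G = constrained_gaussian("G", 5.0, 4.0, 0, Inf)
priors = combine_distributions([prior_F, prior_G])

# two named observation blocks, of sizes 1 and 2
observation_vec = [
    Observation(Dict("samples" => vec(ones(i)), "covariances" => I(i), "names" => "obs_$i")) for i in 1:2
]
observations = combine_observations(observation_vec)

# "F" is informed by obs_1 alone; "G" by both blocks
group_identifiers = Dict(["F"] => ["obs_1"], ["G"] => ["obs_1", "obs_2"])
update_groups = create_update_groups(priors, observations, group_identifiers)
```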