From dea2ee25227de0b7411e067f0baa4f1603d5933e Mon Sep 17 00:00:00 2001 From: kmdeck Date: Wed, 2 Oct 2024 16:13:46 -0700 Subject: [PATCH] wip --- .buildkite/Project.toml | 1 + .../global_bucket/calibrate_bucket.jl | 71 ++++ .../calibrate_bucket_function.jl | 328 ++++++++++++++++ ...alibrate_bucket_function_climacalibrate.jl | 364 ++++++++++++++++++ .../global_bucket/with_climacalibrate.jl | 45 +++ job_script.sh | 17 + src/Artifacts.jl | 12 + 7 files changed, 838 insertions(+) create mode 100644 experiments/calibration/global_bucket/calibrate_bucket.jl create mode 100644 experiments/calibration/global_bucket/calibrate_bucket_function.jl create mode 100644 experiments/calibration/global_bucket/calibrate_bucket_function_climacalibrate.jl create mode 100644 experiments/calibration/global_bucket/with_climacalibrate.jl create mode 100644 job_script.sh diff --git a/.buildkite/Project.toml b/.buildkite/Project.toml index a108f5659d..3cf384bbb5 100644 --- a/.buildkite/Project.toml +++ b/.buildkite/Project.toml @@ -17,6 +17,7 @@ ClimaUtilities = "b3f4f4ca-9299-4f7f-bd9b-81e1242a7513" DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" DelimitedFiles = "8bb1440f-4735-579b-a4ab-409b98df4dab" +EnsembleKalmanProcesses = "aa8a2aa5-91d8-4396-bcef-d4f2ec43552d" Flux = "587475ba-b771-5e3f-ad9e-33799f191a9c" Format = "1fa38f19-a742-5d3f-a2b9-30dd87b9d5f8" GeoMakie = "db073c08-6b98-4ee5-b6a4-5efafb3259c6" diff --git a/experiments/calibration/global_bucket/calibrate_bucket.jl b/experiments/calibration/global_bucket/calibrate_bucket.jl new file mode 100644 index 0000000000..26d79dc49a --- /dev/null +++ b/experiments/calibration/global_bucket/calibrate_bucket.jl @@ -0,0 +1,71 @@ +# Use the .buildkite environment +# bucket_turbulent_fluxes(params) returns lhf and shf as a function of 6 parameters +include("calibrate_bucket_function.jl") + +using Random + +#= Target (perfect model) +params = (; + κ_soil = FT(1.5), + ρc_soil = FT(2e6), + f_bucket = FT(0.75), + W_f = FT(0.2), + p = FT(1), + z_0m = FT(1e-2), +) +target = bucket_turbulent_fluxes(params)[1] +=# + +target = full_obs_era5 + +# Parameters prior +prior_κ_soil = EKP.constrained_gaussian("κ_soil", 2, 1, 0, Inf); +prior_ρc_soil = EKP.constrained_gaussian("ρc_soil", 4e6, 2e6, 0, Inf); +prior_f_bucket = EKP.constrained_gaussian("f_bucket", 0.5, 0.3, 0, 1); +prior_W_f = EKP.constrained_gaussian("W_f", 0.4, 0.4, 0, Inf); +prior_p = EKP.constrained_gaussian("p", 2, 1, 1, Inf); +prior_z_0m = EKP.constrained_gaussian("z_0m", 0.01, 0.1, 0, Inf); +prior = EKP.combine_distributions([ + prior_κ_soil, + prior_ρc_soil, + prior_f_bucket, + prior_W_f, + prior_p, + prior_z_0m, +]); + +rng_seed = 2 +rng = Random.MersenneTwister(rng_seed) + +N_ensemble = 10 +N_iterations = 5 + +initial_ensemble = EKP.construct_initial_ensemble(rng, prior, N_ensemble); +ensemble_kalman_process = EKP.EnsembleKalmanProcess( + initial_ensemble, + target, + EKP.Inversion(); + rng = rng, +); + +for i in 1:N_iterations + params_i = EKP.get_ϕ_final(prior, ensemble_kalman_process) + + G_ens = hcat( + [ + bucket_turbulent_fluxes((; + κ_soil = params_i[1, j], + ρc_soil = params_i[2, j], + f_bucket = params_i[3, j], + W_f = params_i[4, j], + p = params_i[5, j], + z_0m = params_i[6, j], + ))[2] for j in 1:N_ensemble + ]..., + ) + + EKP.update_ensemble!(ensemble_kalman_process, G_ens) +end + +final_ensemble = EKP.get_ϕ_final(prior, ensemble_kalman_process) +println(final_ensemble) diff --git a/experiments/calibration/global_bucket/calibrate_bucket_function.jl b/experiments/calibration/global_bucket/calibrate_bucket_function.jl new file mode 100644 index 0000000000..0b55f34390 --- /dev/null +++ b/experiments/calibration/global_bucket/calibrate_bucket_function.jl @@ -0,0 +1,328 @@ +# # Global bucket run + +# The code sets up and runs the bucket model on a spherical domain, +# using ERA5 data. + +# Simulation Setup +# Number of spatial elements: 101 in horizontal, 5 in vertical +# Soil depth: 3.5 m +# Simulation duration: 365 d +# Timestep: 3600 s +# Timestepper: RK4 +# Atmos forcing update: every 3 hours +import SciMLBase +import ClimaComms +ClimaComms.@import_required_backends +import ClimaTimeSteppers as CTS +using ClimaCore +using ClimaUtilities.ClimaArtifacts +import Interpolations +using Insolation + +using ClimaDiagnostics +using ClimaAnalysis +import ClimaAnalysis.Visualize as viz +using ClimaUtilities + +import ClimaUtilities.TimeVaryingInputs: + TimeVaryingInput, LinearInterpolation, PeriodicCalendar +import ClimaUtilities.SpaceVaryingInputs: SpaceVaryingInput +import ClimaUtilities.Regridders: InterpolationsRegridder +import ClimaUtilities.ClimaArtifacts: @clima_artifact +import ClimaParams as CP + +using ClimaLand +using ClimaLand.Bucket: + BucketModel, BucketModelParameters, PrescribedBaregroundAlbedo +import ClimaLand +import ClimaLand.Parameters as LP + +using Statistics +using CairoMakie +using Dates +using Random +import NCDatasets +using NCDatasets +using StaticArrays +using CUDA + +import EnsembleKalmanProcesses as EKP + +const FT = Float64; +context = ClimaComms.context() +device = ClimaComms.device() +device_suffix = device isa ClimaComms.CPUSingleThreaded ? "cpu" : "gpu" +root_path = "bucket_longrun_$(device_suffix)" + +# Returns a 2 SVectors of (lat, lon) tuples for n random locations on the land +# surface. The two sets of locations are designed to be used as a training and +# validation set. +function rand_locations(surface_space, regridder_type, n = 100) + # Load the landsea mask data + datapath = ClimaLand.Artifacts.topmodel_data_path() + landsea_mask = SpaceVaryingInput( + datapath, + "landsea_mask", + surface_space; + regridder_type, + ) + + # Get the corresponding latitude and longitude values + lat = ClimaCore.Fields.coordinate_field(surface_space).lat + lon = ClimaCore.Fields.coordinate_field(surface_space).long + + # Find the coordinates of 2n random land locations + land_inds = rand(findall(x -> x == 1.0, Array(parent(landsea_mask))), 2 * n) + + # Since this is run very rarely (once at start of calibration run), we don't + # mind scalar iteration when running on the GPU + CUDA.@allowscalar land_locs = StaticArrays.sacollect( + SVector{2 * n, Tuple{FT, FT}}, + zip(parent(lon)[land_inds], parent(lat)[land_inds]), + ) + + # Return a uniform random sample of n land locations + return ( + SVector{n}(land_locs[1:n]...), + SVector{n}(land_locs[(n + 1):end]...), + ) +end + +function setup_prob(t0, tf, Δt, params, outdir; nelements = (101, 7)) + time_interpolation_method = LinearInterpolation(PeriodicCalendar()) + regridder_type = :InterpolationsRegridder + earth_param_set = LP.LandParameters(FT) + radius = FT(6378.1e3) + depth = FT(3.5) + domain = ClimaLand.Domains.SphericalShell(; + radius = radius, + depth = depth, + nelements = nelements, + npolynomial = 1, + dz_tuple = FT.((1.0, 0.05)), + ) + surface_space = domain.space.surface + subsurface_space = domain.space.subsurface + + start_date = DateTime(2008) + # Forcing data + # Forcing data + era5_artifact_path = + ClimaLand.Artifacts.era5_land_forcing_data2008_folder_path(; context) + era5_ncdata_path = joinpath(era5_artifact_path, "era5_2008_1.0x1.0.nc") + atmos, radiation = ClimaLand.prescribed_forcing_era5( + era5_ncdata_path, + surface_space, + start_date, + earth_param_set, + FT; + time_interpolation_method = time_interpolation_method, + regridder_type = regridder_type, + ) + + # Set up parameters + (; κ_soil, ρc_soil, f_bucket, W_f, p, z_0m) = params + z_0b = z_0m + τc = FT(Δt) + α_snow = FT(0.8) + albedo = PrescribedBaregroundAlbedo{FT}(α_snow, surface_space) + bucket_parameters = BucketModelParameters( + FT; + albedo, + z_0m, + z_0b, + τc, + f_bucket, + p, + W_f, + κ_soil, + ρc_soil, + ) + bucket = BucketModel( + parameters = bucket_parameters, + domain = domain, + atmosphere = atmos, + radiation = radiation, + ) + + temp_anomaly_amip(coord) = 40 * cosd(coord.lat)^4 + Y, p, cds = initialize(bucket) + # Set temperature IC including anomaly, based on atmospheric setup + T_sfc_0 = FT(271.0) + @. Y.bucket.T = T_sfc_0 + temp_anomaly_amip(cds.subsurface) + Y.bucket.W .= FT(f_bucket * W_f) + Y.bucket.Ws .= FT(0.0) + Y.bucket.σS .= FT(0.0) + + set_initial_cache! = make_set_initial_cache(bucket) + set_initial_cache!(p, Y, t0) + exp_tendency! = make_exp_tendency(bucket) + + prob = SciMLBase.ODEProblem( + CTS.ClimaODEFunction(T_exp! = exp_tendency!, dss! = ClimaLand.dss!), + Y, + (t0, tf), + p, + ) + + updateat = Array(t0:(3600 * 3):tf) + drivers = ClimaLand.get_drivers(bucket) + updatefunc = ClimaLand.make_update_drivers(drivers) + + # ClimaDiagnostics + + nc_writer = ClimaDiagnostics.Writers.NetCDFWriter(subsurface_space, outdir) + + diags = ClimaLand.default_diagnostics( + bucket, + start_date; + output_writer = nc_writer, + average_period = :monthly, + ) + + diagnostic_handler = + ClimaDiagnostics.DiagnosticsHandler(diags, Y, p, t0; dt = Δt) + + diag_cb = ClimaDiagnostics.DiagnosticsCallback(diagnostic_handler) + + driver_cb = ClimaLand.DriverUpdateCallback(updateat, updatefunc) + return prob, SciMLBase.CallbackSet(driver_cb, diag_cb) +end + +# Define the locations longitudes and latitudes +# Needs to be defined once, both for g(θ) and ERA5 target +t0 = 0.0 +tf = 60 * 60.0 * 24 * 366 +Δt = 900.0 +nelements = (101, 7) +regridder_type = :InterpolationsRegridder +radius = FT(6378.1e3) +depth = FT(3.5) +domain = ClimaLand.Domains.SphericalShell(; + radius = radius, + depth = depth, + nelements = nelements, + npolynomial = 1, + dz_tuple = FT.((1.0, 0.05)), + ) +surface_space = domain.space.surface +training_locations, validation_locations = +rand_locations(surface_space, regridder_type, 25) + +# (; κ_soil, ρc_soil, f_bucket, W_f, p, z_0m) = params +# The truth params = (;κ_soil = FT(1.5), ρc_soil = FT(2e6), f_bucket = FT(0.75), W_f = FT(0.2), p = FT(1), z_0m = FT(1e-2)) +function bucket_turbulent_fluxes(params) + diagnostics_outdir = joinpath(root_path, "global_diagnostics") + outdir = ClimaUtilities.OutputPathGenerator.generate_output_path( + diagnostics_outdir, + ) + prob, cb = setup_prob(t0, tf, Δt, params, outdir; nelements) + + timestepper = CTS.RK4() + ode_algo = CTS.ExplicitAlgorithm(timestepper) + SciMLBase.solve(prob, ode_algo; dt = Δt, callback = cb, adaptive = false) + + simdir = ClimaAnalysis.SimDir( + joinpath(root_path, "global_diagnostics", "output_active"), + ) + lhf = get(simdir; short_name = "lhf") + shf = get(simdir; short_name = "shf") + + # Initialize an empty list to store observations + obs_list = [] + # Loop over each location + for (lon, lat) in training_locations + # Slice lhf and shf at the given longitude and latitude + lhf_loc = ClimaAnalysis.slice(lhf, lon = lon, lat = lat) + shf_loc = ClimaAnalysis.slice(shf, lon = lon, lat = lat) + + # Create Observation objects for lhf and shf + lhf_obs = EKP.Observation( + Dict( + "samples" => lhf_loc.data, + "covariances" => cov(lhf_loc.data) * EKP.I, + "names" => "lhf_$(lon)_$(lat)", + ), + ) + shf_obs = EKP.Observation( + Dict( + "samples" => shf_loc.data, + "covariances" => cov(shf_loc.data) * EKP.I, + "names" => "shf_$(lon)_$(lat)", + ), + ) + + # Add the observations to the list + push!(obs_list, lhf_obs) + push!(obs_list, shf_obs) + end + + # Combine all observations into a single observation + full_obs = EKP.combine_observations(obs_list) + + # Optional + #= + minibatcher = EKP.RandomFixedSizeMinibatcher(3) # batches the epoch of size 100, into batches of size 5 + observation_series = EKP.ObservationSeries( + Dict( + "observations" => full_obs, + "minibatcher" => minibatcher, + ), + ) + =# + + obs = EKP.get_obs(full_obs) + + return full_obs, obs +end + +# Read in the era5 datafile +era5_ds = Dataset(joinpath(ClimaLand.Artifacts.era5_surface_data2008_path(), + "era5_monthly_surface_fluxes_200801-200812.nc")) + +# Make the ERA5 target +ERA5_target = [] +close = (x, y) -> abs(x - y) < 5e-1 +for (lon, lat) in training_locations + # Fetch slices of lhf and shf era5 data from the era5 dataset + lat_ind, lon_ind = findall((x) -> close(x, lat), era5_ds["latitude"][:])[1], + findall((x) -> close(x, lon + 180), era5_ds["longitude"][:])[1] + lhf_loc = vec(era5_ds["mslhf"][lon_ind, lat_ind, :][:, end, :]) + shf_loc = vec(era5_ds["msshf"][lon_ind, lat_ind, :][:, end, :]) + + # Create Observation objects for lhf and shf + lhf_ERA5 = EKP.Observation( + Dict( + "samples" => lhf_loc, + "covariances" => cov(lhf_loc) * EKP.I, + "names" => "lhf_$(lon)_$(lat)", + ), + ) + + shf_ERA5 = EKP.Observation( + Dict( + "samples" => shf_loc, + "covariances" => cov(shf_loc) * EKP.I, + "names" => "shf_$(lon)_$(lat)", + ), + ) + + # Add the observations to the target + push!(ERA5_target, lhf_ERA5) + push!(ERA5_target, shf_ERA5) + +end + +full_obs_era5 = EKP.combine_observations(ERA5_target) + +# Things to consider: +# - masking out the ocean when comparing to data +# - output paths matching to parameters + +# Priors: +# κ_soil = FT(1.5): Gaussian with mean = 2, std dev = 1, cannot be negative [ or uniform from 0.5 to 2.5] +# ρc_soil = FT(2e6); Gaussian with mean = 4e6, std dev = 2e6, cannot be negative [ or uniform from 1e5 to 5e6] +# f_bucket = FT(0.75); Gaussian with mean = 0.5, std_dev = 0.3, cannot be negative, cannot be greater than 1 [ or uniform from 0.2 to 0.9] +# W_f = FT(0.2); Gaussian with mean = 0.4, std_dev = 0.4, cannot be negative [ or uniform from 0.05 to 1] +# p = FT(1); we can range uniform 1 to 2.5, cannot be smaller than 1 +#z_0m = FT(1e-2); can we sample from log normal ranging from z_0m = 1e-3 to z_0m = 0.1? diff --git a/experiments/calibration/global_bucket/calibrate_bucket_function_climacalibrate.jl b/experiments/calibration/global_bucket/calibrate_bucket_function_climacalibrate.jl new file mode 100644 index 0000000000..2b0eafef47 --- /dev/null +++ b/experiments/calibration/global_bucket/calibrate_bucket_function_climacalibrate.jl @@ -0,0 +1,364 @@ +# # Global bucket run + +# The code sets up and runs the bucket model on a spherical domain, +# using ERA5 data. + +# Simulation Setup +# Number of spatial elements: 101 in horizontal, 5 in vertical +# Soil depth: 3.5 m +# Simulation duration: 365 d +# Timestep: 3600 s +# Timestepper: RK4 +# Atmos forcing update: every 3 hours +import SciMLBase +import ClimaComms +ClimaComms.@import_required_backends +import ClimaTimeSteppers as CTS +using ClimaCore +using ClimaUtilities.ClimaArtifacts +import Interpolations +using Insolation + +using ClimaDiagnostics +using ClimaAnalysis +import ClimaAnalysis.Visualize as viz +using ClimaUtilities + +import ClimaUtilities.TimeVaryingInputs: + TimeVaryingInput, LinearInterpolation, PeriodicCalendar +import ClimaUtilities.SpaceVaryingInputs: SpaceVaryingInput +import ClimaUtilities.Regridders: InterpolationsRegridder +import ClimaUtilities.ClimaArtifacts: @clima_artifact +import ClimaParams as CP + +using ClimaLand +using ClimaLand.Bucket: + BucketModel, BucketModelParameters, PrescribedBaregroundAlbedo +import ClimaLand +import ClimaLand.Parameters as LP + +using Statistics +using CairoMakie +using Dates +using Random +import NCDatasets +using NCDatasets +using StaticArrays +using CUDA + +import EnsembleKalmanProcesses as EKP + +const FT = Float64; +context = ClimaComms.context() +device = ClimaComms.device() +device_suffix = device isa ClimaComms.CPUSingleThreaded ? "cpu" : "gpu" +root_path = "bucket_longrun_$(device_suffix)" + +# Returns a 2 SVectors of (lat, lon) tuples for n random locations on the land +# surface. The two sets of locations are designed to be used as a training and +# validation set. +function rand_locations(surface_space, regridder_type, n = 100) + # Load the landsea mask data + datapath = ClimaLand.Artifacts.topmodel_data_path() + landsea_mask = SpaceVaryingInput( + datapath, + "landsea_mask", + surface_space; + regridder_type, + ) + + # Get the corresponding latitude and longitude values + lat = ClimaCore.Fields.coordinate_field(surface_space).lat + lon = ClimaCore.Fields.coordinate_field(surface_space).long + + # Find the coordinates of 2n random land locations + land_inds = rand(findall(x -> x == 1.0, Array(parent(landsea_mask))), 2 * n) + + # Since this is run very rarely (once at start of calibration run), we don't + # mind scalar iteration when running on the GPU + CUDA.@allowscalar land_locs = StaticArrays.sacollect( + SVector{2 * n, Tuple{FT, FT}}, + zip(parent(lon)[land_inds], parent(lat)[land_inds]), + ) + + # Return a uniform random sample of n land locations + return ( + SVector{n}(land_locs[1:n]...), + SVector{n}(land_locs[(n + 1):end]...), + ) +end + +function setup_prob(t0, tf, Δt, params, outdir; nelements = (101, 7)) + time_interpolation_method = LinearInterpolation(PeriodicCalendar()) + regridder_type = :InterpolationsRegridder + earth_param_set = LP.LandParameters(FT) + radius = FT(6378.1e3) + depth = FT(3.5) + domain = ClimaLand.Domains.SphericalShell(; + radius = radius, + depth = depth, + nelements = nelements, + npolynomial = 1, + dz_tuple = FT.((1.0, 0.05)), + ) + surface_space = domain.space.surface + subsurface_space = domain.space.subsurface + + start_date = DateTime(2008) + # Forcing data + # Forcing data + era5_artifact_path = + ClimaLand.Artifacts.era5_land_forcing_data2008_folder_path(; context) + era5_ncdata_path = joinpath(era5_artifact_path, "era5_2008_1.0x1.0.nc") + atmos, radiation = ClimaLand.prescribed_forcing_era5( + era5_ncdata_path, + surface_space, + start_date, + earth_param_set, + FT; + time_interpolation_method = time_interpolation_method, + regridder_type = regridder_type, + ) + + # Set up parameters + (; κ_soil, ρc_soil, f_bucket, W_f, p, z_0m) = params + z_0b = z_0m + τc = FT(Δt) + α_snow = FT(0.8) + albedo = PrescribedBaregroundAlbedo{FT}(α_snow, surface_space) + bucket_parameters = BucketModelParameters( + FT; + albedo, + z_0m, + z_0b, + τc, + f_bucket, + p, + W_f, + κ_soil, + ρc_soil, + ) + bucket = BucketModel( + parameters = bucket_parameters, + domain = domain, + atmosphere = atmos, + radiation = radiation, + ) + + temp_anomaly_amip(coord) = 40 * cosd(coord.lat)^4 + Y, p, cds = initialize(bucket) + # Set temperature IC including anomaly, based on atmospheric setup + T_sfc_0 = FT(271.0) + @. Y.bucket.T = T_sfc_0 + temp_anomaly_amip(cds.subsurface) + Y.bucket.W .= FT(f_bucket * W_f) + Y.bucket.Ws .= FT(0.0) + Y.bucket.σS .= FT(0.0) + + set_initial_cache! = make_set_initial_cache(bucket) + set_initial_cache!(p, Y, t0) + exp_tendency! = make_exp_tendency(bucket) + + prob = SciMLBase.ODEProblem( + CTS.ClimaODEFunction(T_exp! = exp_tendency!, dss! = ClimaLand.dss!), + Y, + (t0, tf), + p, + ) + + updateat = Array(t0:(3600 * 3):tf) + drivers = ClimaLand.get_drivers(bucket) + updatefunc = ClimaLand.make_update_drivers(drivers) + + # ClimaDiagnostics + + nc_writer = ClimaDiagnostics.Writers.NetCDFWriter(subsurface_space, outdir) + + diags = ClimaLand.default_diagnostics( + bucket, + start_date; + output_writer = nc_writer, + average_period = :monthly, + ) + + diagnostic_handler = + ClimaDiagnostics.DiagnosticsHandler(diags, Y, p, t0; dt = Δt) + + diag_cb = ClimaDiagnostics.DiagnosticsCallback(diagnostic_handler) + + driver_cb = ClimaLand.DriverUpdateCallback(updateat, updatefunc) + return prob, SciMLBase.CallbackSet(driver_cb, diag_cb) +end + +# Define the locations longitudes and latitudes +# Needs to be defined once, both for g(θ) and ERA5 target +t0 = 0.0 +tf = 60 * 60.0 * 24 * 366 +Δt = 900.0 +nelements = (101, 7) +regridder_type = :InterpolationsRegridder +radius = FT(6378.1e3) +depth = FT(3.5) +domain = ClimaLand.Domains.SphericalShell(; + radius = radius, + depth = depth, + nelements = nelements, + npolynomial = 1, + dz_tuple = FT.((1.0, 0.05)), + ) +surface_space = domain.space.surface +training_locations, validation_locations = +rand_locations(surface_space, regridder_type, 25) + +# (; κ_soil, ρc_soil, f_bucket, W_f, p, z_0m) = params +# The truth params = (;κ_soil = FT(1.5), ρc_soil = FT(2e6), f_bucket = FT(0.75), W_f = FT(0.2), p = FT(1), z_0m = FT(1e-2)) +function forward_model(iteration, member) + + diagnostics_outdir = joinpath(root_path, "global_diagnostics") + outdir = ClimaUtilities.OutputPathGenerator.generate_output_path( + diagnostics_outdir, + ) + + parameter_path = parameter_path(outdir, iteration, member) + params = toml.parsefile(parameter_path) # should load a Dict, that needs to be converted to namedtuple + + prob, cb = setup_prob(t0, tf, Δt, params, outdir; nelements) + + timestepper = CTS.RK4() + ode_algo = CTS.ExplicitAlgorithm(timestepper) + SciMLBase.solve(prob, ode_algo; dt = Δt, callback = cb, adaptive = false) +end + +function observation_map(iteration) + + simdir = ClimaAnalysis.SimDir( + joinpath(root_path, "global_diagnostics", "output_active"), + ) + lhf = get(simdir; short_name = "lhf") + shf = get(simdir; short_name = "shf") + + # Initialize an empty list to store observations + obs_list = [] + # Loop over each location + for (lon, lat) in training_locations + # Slice lhf and shf at the given longitude and latitude + lhf_loc = ClimaAnalysis.slice(lhf, lon = lon, lat = lat) + shf_loc = ClimaAnalysis.slice(shf, lon = lon, lat = lat) + + # Create Observation objects for lhf and shf + lhf_obs = EKP.Observation( + Dict( + "samples" => lhf_loc.data, + "covariances" => cov(lhf_loc.data) * EKP.I, + "names" => "lhf_$(lon)_$(lat)", + ), + ) + shf_obs = EKP.Observation( + Dict( + "samples" => shf_loc.data, + "covariances" => cov(shf_loc.data) * EKP.I, + "names" => "shf_$(lon)_$(lat)", + ), + ) + + # Add the observations to the list + push!(obs_list, lhf_obs) + push!(obs_list, shf_obs) + end + + # Combine all observations into a single observation + full_obs = EKP.combine_observations(obs_list) + + # Optional + #= + minibatcher = EKP.RandomFixedSizeMinibatcher(3) # batches the epoch of size 100, into batches of size 5 + observation_series = EKP.ObservationSeries( + Dict( + "observations" => full_obs, + "minibatcher" => minibatcher, + ), + ) + =# + + obs = EKP.get_obs(full_obs) + + # make G_ensemble as in https://github.com/CliMA/ClimaAtmos.jl/blob/ne/slurm_workers/calibration/test/e2e_test.jl#L19 + # see below + #= + function CAL.observation_map(iteration) + single_member_dims = (1,) + G_ensemble = Array{Float64}(undef, single_member_dims..., ensemble_size) + + for m in 1:ensemble_size + member_path = CAL.path_to_ensemble_member(output_dir, iteration, m) + simdir_path = joinpath(member_path, "output_active") + if isdir(simdir_path) + simdir = SimDir(simdir_path) + G_ensemble[:, m] .= process_member_data(simdir) + else + G_ensemble[:, m] .= NaN + end + end + return G_ensemble + end + + function process_member_data(simdir::SimDir) + isempty(simdir.vars) && return NaN + rsut = + get(simdir; short_name = "rsut", reduction = "average", period = "30d") + return slice(average_xy(rsut); time = 30).data + end + =# + + return obs +end + +# Read in the era5 datafile +era5_ds = Dataset(joinpath(ClimaLand.Artifacts.era5_surface_data2008_path(), + "era5_monthly_surface_fluxes_200801-200812.nc")) + +# Make the ERA5 target +ERA5_target = [] +close = (x, y) -> abs(x - y) < 5e-1 +for (lon, lat) in training_locations + # Fetch slices of lhf and shf era5 data from the era5 dataset + lat_ind, lon_ind = findall((x) -> close(x, lat), era5_ds["latitude"][:])[1], + findall((x) -> close(x, lon + 180), era5_ds["longitude"][:])[1] + lhf_loc = vec(era5_ds["mslhf"][lon_ind, lat_ind, :][:, end, :]) + shf_loc = vec(era5_ds["msshf"][lon_ind, lat_ind, :][:, end, :]) + + # Create Observation objects for lhf and shf + lhf_ERA5 = EKP.Observation( + Dict( + "samples" => lhf_loc, + "covariances" => cov(lhf_loc) * EKP.I, + "names" => "lhf_$(lon)_$(lat)", + ), + ) + + shf_ERA5 = EKP.Observation( + Dict( + "samples" => shf_loc, + "covariances" => cov(shf_loc) * EKP.I, + "names" => "shf_$(lon)_$(lat)", + ), + ) + + # Add the observations to the target + push!(ERA5_target, lhf_ERA5) + push!(ERA5_target, shf_ERA5) + +end + +full_obs_era5 = EKP.combine_observations(ERA5_target) + +# Things to consider: +# - masking out the ocean when comparing to data +# - output paths matching to parameters + +# Priors: +# κ_soil = FT(1.5): Gaussian with mean = 2, std dev = 1, cannot be negative [ or uniform from 0.5 to 2.5] +# ρc_soil = FT(2e6); Gaussian with mean = 4e6, std dev = 2e6, cannot be negative [ or uniform from 1e5 to 5e6] +# f_bucket = FT(0.75); Gaussian with mean = 0.5, std_dev = 0.3, cannot be negative, cannot be greater than 1 [ or uniform from 0.2 to 0.9] +# W_f = FT(0.2); Gaussian with mean = 0.4, std_dev = 0.4, cannot be negative [ or uniform from 0.05 to 1] +# p = FT(1); we can range uniform 1 to 2.5, cannot be smaller than 1 +#z_0m = FT(1e-2); can we sample from log normal ranging from z_0m = 1e-3 to z_0m = 0.1? diff --git a/experiments/calibration/global_bucket/with_climacalibrate.jl b/experiments/calibration/global_bucket/with_climacalibrate.jl new file mode 100644 index 0000000000..434643ae47 --- /dev/null +++ b/experiments/calibration/global_bucket/with_climacalibrate.jl @@ -0,0 +1,45 @@ +import ClimaCalibrate as CAL + +# CAL.forward_model(iteration, member) +# --> just runs the model, doesn't return anything +# CAL.observation_map(iteration) +# --> returns the vector of obs + +# @everywhere, see https://github.com/CliMA/ClimaAtmos.jl/blob/30ac26aafbb27c66bc65201dec5397116588a9af/calibration/test/e2e_test.jl + +include("calibrate_bucket_function_climacalibrate.jl") + +observations = full_obs_era5 + +# Parameters prior +prior_κ_soil = EKP.constrained_gaussian("κ_soil", 2, 1, 0, Inf); +prior_ρc_soil = EKP.constrained_gaussian("ρc_soil", 4e6, 2e6, 0, Inf); +prior_f_bucket = EKP.constrained_gaussian("f_bucket", 0.5, 0.3, 0, 1); +prior_W_f = EKP.constrained_gaussian("W_f", 0.4, 0.4, 0, Inf); +prior_p = EKP.constrained_gaussian("p", 2, 1, 1, Inf); +prior_z_0m = EKP.constrained_gaussian("z_0m", 0.01, 0.1, 0, Inf); +prior = EKP.combine_distributions([ + prior_κ_soil, + prior_ρc_soil, + prior_f_bucket, + prior_W_f, + prior_p, + prior_z_0m, +]); + +ensemble_size = 10 # we could start with less +n_iterations = 5 # we could start with less +noise = EKP.I # not sure what to do here, I had noise embeded in my observation_map function + +output_dir = "calibration_output" # Should I create this folder? + +CAL.calibrate( + CAL.WorkerBackend, # not sure what to do here + ensemble_size, + n_iterations, + observations, + noise, + prior, + output_dir + ) + diff --git a/job_script.sh b/job_script.sh new file mode 100644 index 0000000000..4545f1ac72 --- /dev/null +++ b/job_script.sh @@ -0,0 +1,17 @@ +#!/bin/bash +#SBATCH --job-name=calibrate_bucket +#SBATCH --output=output_%j.txt # Output file (%j expands to job ID) +#SBATCH --error=error_%j.txt # Error file +#SBATCH --nodes=1 # Number of nodes +#SBATCH --ntasks=1 # Number of tasks (processes) +#SBATCH --time=05:00:00 # Time limit hrs:min:sec +#SBATCH --mem=8G # Memory per node +#SBATCH --partition=gpu +#SBATCH --gpus=1 + +# Run your command +export CLIMACOMMS_DEVICE="CUDA" +export CLIMACOMMS_CONTEXT="SINGLETON" + +julia --project=.buildkite/ experiments/calibration/global_bucket/calibrate_bucket.jl + diff --git a/src/Artifacts.jl b/src/Artifacts.jl index f342bc140d..49e3854a49 100644 --- a/src/Artifacts.jl +++ b/src/Artifacts.jl @@ -32,6 +32,18 @@ function era5_lai_forcing_data2008_folder_path(; context = nothing) return @clima_artifact("era5_land_forcing_data2008_lai", context) end +""" + era5_surface_data2008_path(; context) + +Return the path to the folder that contains the ERA5 monthly surface data + +NOTE: We should be able to use @clima_artifact here, but the folders on central +seem to be named differently than the artifact name +""" +function era5_surface_data2008_path(; context = nothing) + return "/groups/esm/ClimaArtifacts/artifacts/era5_surface_fluxes_2008" +end + """ clm_data__folder_path(; context)