diff --git a/.github/workflows/workflow.yaml b/.github/workflows/workflow.yaml
index 7ec3fc23..51800757 100644
--- a/.github/workflows/workflow.yaml
+++ b/.github/workflows/workflow.yaml
@@ -19,7 +19,7 @@ jobs:
       max-parallel: 3
       matrix:
         os: ["ubuntu-latest"]
-        python-version: ["3.9"]
+        python-version: ["3.10"]
         # python-version: ["3.8", "3.9", "3.10"]
     steps:
     - name: Checkout
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 07cda8f9..b937934e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,17 @@
 # OpenGHG Inversions Change Log
 
-# Version 0.2 (current devel)
+# Version 0.1.3
+
+- Reorganised the basis functions code into its own submodule, `openghg_inversions.basis`. This submodule contains the basis function algorithms, the functions that call those algorithms, and the basis function wrapper that was previously in `get_data.py`. [PR #87](https://github.com/openghg/openghg_inversions/pull/87)
+
+- `combine_datasets` now loads data before reindexing, to avoid a performance problem (`reindex_like` is very slow if the dataset is not loaded into memory; see pydata/xarray#8945). The default method has also been changed from `ffill` to `nearest`, since `ffill` tends to create NaNs in the first lat/lon coordinates. [PR #87](https://github.com/openghg/openghg_inversions/pull/87)
+
+- If the basis functions don't have a "region" dimension (which is the case for all of the basis functions created by our algorithms), the projection onto basis functions is now done by creating a sparse matrix that maps from lat/lon to basis regions and multiplying the footprint by this matrix. This requires the `sparse` package. [PR #87](https://github.com/openghg/openghg_inversions/pull/87)
+
+- The required version of Python has been increased to 3.10. Changes in scipy forced changes in arviz, and those arviz changes were released at the same time that arviz raised its required Python version to 3.10. Pip doesn't catch this, so on Python 3.9 we end up with an old version of arviz that is incompatible with the most recent version of scipy. On Blue Pebble, you can load `lang/python/miniconda/3.10.10.cuda-12` instead of `lang/python/anaconda` to get Python 3.10 (`lang/python/anaconda` gives you Python 3.9, even though it says you get Python 3.10). [PR #87](https://github.com/openghg/openghg_inversions/pull/87)
+
+- Added an option to use InTEM outer regions for basis functions. This can be selected by setting `fixed_outer_basis_regions = True` in an .ini file. [PR #87](https://github.com/openghg/openghg_inversions/pull/87)
 
 - Refactored basis functions so that they return an xr.Dataset, rather than writing to temporary files. If an output directory is specified, they will save the basis functions as a side effect.
 
@@ -8,6 +19,9 @@
 
 - Added tests to test `get_data.py`, including creating, saving, and loading merged data. Refactored inversions tests to reload merged data, instead of creating merged data.
 
+# Version 0.1.2
+
+- Bugfix: fixed a problem with error handling in `config.version` that caused inversions to fail if git wasn't loaded on Blue Pebble. [PR #91](https://github.com/openghg/openghg_inversions/pull/91)
 
 # Version 0.1.1
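[Editor's note: a minimal sketch, not part of the patch, of the sparse lat/lon-to-region projection described in the changelog entry above. All sizes and names are invented for illustration; it assumes only `numpy` and the `sparse` package.]

import numpy as np
import sparse

nlat, nlon, nregion = 4, 5, 3
rng = np.random.default_rng(0)
region_of_cell = rng.integers(nregion, size=nlat * nlon)  # region label for each flattened grid cell

# 0-1 matrix with one nonzero per grid cell: entry (cell, region) is 1 if the cell is in the region
proj = sparse.COO(
    coords=np.stack([np.arange(nlat * nlon), region_of_cell]),
    data=np.ones(nlat * nlon),
    shape=(nlat * nlon, nregion),
)

fp = sparse.COO.from_numpy(rng.random(nlat * nlon))  # stand-in for a flattened mean footprint
per_region = (proj.T @ fp).todense()  # footprint summed within each basis region

# agrees with a dense bincount over the region labels
assert np.allclose(per_region, np.bincount(region_of_cell, weights=fp.todense(), minlength=nregion))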
diff --git a/openghg_inversions/array_ops.py b/openghg_inversions/array_ops.py
new file mode 100644
index 00000000..c216a096
--- /dev/null
+++ b/openghg_inversions/array_ops.py
@@ -0,0 +1,159 @@
+"""
+General methods for xarray Datasets and DataArrays.
+
+`get_xr_dummies` applies pandas `get_dummies` to xarray DataArrays.
+
+`sparse_xr_dot` multiplies a Dataset or DataArray by a DataArray
+with sparse underlying values. The built-in xarray functionality
+doesn't work correctly when the underlying arrays are `sparse.COO`.
+"""
+from typing import Any, Optional, Sequence, Union, TypeVar
+
+import numpy as np
+import pandas as pd
+import sparse
+import xarray as xr
+from sparse import COO
+from xarray.core.common import DataWithCoords
+
+
+# type for xr.Dataset *or* xr.DataArray
+DataSetOrArray = TypeVar("DataSetOrArray", bound=DataWithCoords)
+
+
+def get_xr_dummies(
+    da: xr.DataArray,
+    categories: Optional[Union[Sequence[Any], pd.Index, xr.DataArray, np.ndarray]] = None,
+    cat_dim: str = "categories",
+    return_sparse: bool = True,
+):
+    """Create 0-1 dummy matrix from DataArray with values that correspond to categories.
+
+    If the values of `da` are integers 0-N, then the result has N + 1 columns, and the (i, j) coordinate
+    of the result is 1 if `da[i] == j`, and is 0 otherwise.
+
+    Args:
+        da: DataArray encoding categories.
+        categories: optional coordinates for categories.
+        cat_dim: dimension for categories coordinate
+        return_sparse: if True, store values in a sparse.COO matrix
+
+    Returns:
+        Dummy matrix corresponding to the input vector. Its dimensions are the same as the
+        input DataArray, plus an additional "categories" dimension, which has one value for each
+        distinct value in the input DataArray.
+    """
+    # stack if `da` is not one dimensional
+    stack_dim = ""
+    if len(da.dims) > 1:
+        stack_dim = "".join([str(dim) for dim in da.dims])
+        da = da.stack({stack_dim: da.dims})
+
+    dummies = pd.get_dummies(da.values, dtype=int, sparse=return_sparse)
+
+    # put dummies into DataArray with the right coords and dims
+    values = COO.from_scipy_sparse(dummies.sparse.to_coo()) if return_sparse else dummies.values
+    if categories is None:
+        categories = np.arange(values.shape[1])
+    coords = da.coords.merge({cat_dim: categories}).coords  # coords.merge returns Dataset, we want the coords
+    result = xr.DataArray(values, coords=coords)
+
+    # if we stacked `da`, unstack result before returning
+    return result.unstack(stack_dim) if stack_dim else result
+
+
+def sparse_xr_dot(
+    da1: xr.DataArray,
+    da2: DataSetOrArray,
+    debug: bool = False,
+    broadcast_dims: Optional[Sequence[str]] = None,
+) -> DataSetOrArray:
+    """Compute the matrix "dot" of two DataArrays (or a DataArray and a Dataset) with sparse.COO values.
+
+    This multiplies and sums over all common dimensions of the input DataArrays, and
+    preserves the coordinates and dimensions that are not summed over.
+
+    Common dimensions are automatically selected by name. The input arrays must have at
+    least one dimension in common. All matching dimensions will be used for multiplication.
+
+    NOTE: this function shouldn't be necessary, but `da1 @ da2` doesn't work properly if the
+    values of `da1` and `da2` are `sparse.COO` arrays.
+
+    Args:
+        da1, da2: xr.DataArrays to multiply and sum along common dimensions.
+        debug: if true, will print the dimensions of the inputs to `sparse.tensordot`
+            as well as the dimension of the result.
+        broadcast_dims: optional list of common dimensions to broadcast over, rather
+            than summing over.
+
+    Returns:
+        xr.DataArray containing the result of matrix/tensor multiplication
+
+    Raises:
+        ValueError if the input DataArrays have no common dimensions to multiply.
+    """
+    common_dims = set(da1.dims).intersection(set(da2.dims))
+    nc = len(common_dims)
+
+    if nc == 0:
+        raise ValueError(f"DataArrays \n{da1}\n{da2}\n have no common dimensions. Cannot compute `dot`.")
+
+    if broadcast_dims is not None:
+        _broadcast_dims = set(broadcast_dims).intersection(common_dims)
+    else:
+        _broadcast_dims = set([])
+
+    contract_dims = common_dims.difference(_broadcast_dims)
+    ncontract = len(contract_dims)
+
+    tensor_dot_axes = tuple([tuple(range(-ncontract, 0))] * 2)
+    input_core_dims = [list(contract_dims)] * 2
+
+    # compute tensor dot on last ncontract dimensions (because core dims are moved to end)
+    # and then drop 1D coordinates resulting from summing
+    def _func(x, y, debug=False):
+        result = sparse.tensordot(x, y, axes=tensor_dot_axes)  # type: ignore
+
+        xs = list(x.shape[:-ncontract])
+        nxs = len(xs)
+        ys = list(y.shape[:-ncontract])
+        nys = len(ys)
+        pad_y = nxs - nys
+        if pad_y > 0:
+            ys = [1] * pad_y + ys
+
+        idx1, idx2 = [], []
+        for i, j in zip(xs, ys):
+            if j in (i, 1):
+                idx1.append(slice(None))
+                idx2.append(0)
+            elif i == 1:
+                # x broadcasted to match y's dim
+                idx1.append(0)
+                idx2.append(slice(None))
+
+        if debug:
+            print("pad y", pad_y)
+            print(xs, ys)
+            print(idx1, idx2)
+
+        idx2 = idx2[pad_y:]
+        idx3 = [0] * (result.ndim - len(idx1) - len(idx2))
+        idx = tuple(idx1 + idx3 + idx2)
+
+        if debug:
+            print("x.shape", x.shape, "y.shape", y.shape)
+            print("idx", idx)
+            print("result shape:", result.shape)
+
+        return result[idx]  # type: ignore
+
+    def wrapper(da1, da2):
+        for arr in [da1, da2]:
+            print(f"_func received array of type {type(arr)}, shape {arr.shape}")
+        result = _func(da1, da2, debug=True)
+        print(f"_func result shape: {result.shape}\n")
+        return result
+
+    func = wrapper if debug else _func
+
+    return xr.apply_ufunc(func, da1, da2, input_core_dims=input_core_dims, join="outer")
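[Editor's note: a small usage sketch, not part of the patch, showing how the two helpers above combine to project a footprint onto basis regions, mirroring their use in `utils.fp_sensitivity`. All coordinate values here are toy data.]

import numpy as np
import pandas as pd
import xarray as xr

from openghg_inversions.array_ops import get_xr_dummies, sparse_xr_dot

lat = [50.0, 51.0, 52.0]
lon = [-1.0, 0.0, 1.0]
times = pd.date_range("2019-01-01", periods=2)

# toy basis: each lat/lon cell is labelled with a region number 1-3
basis = xr.DataArray(
    [[1, 1, 2], [1, 3, 2], [3, 3, 2]], coords={"lat": lat, "lon": lon}, dims=["lat", "lon"]
)

# toy footprint with a time dimension
fp = xr.DataArray(
    np.ones((3, 3, 2)), coords={"lat": lat, "lon": lon, "time": times}, dims=["lat", "lon", "time"]
)

basis_mat = get_xr_dummies(basis, cat_dim="region")  # dims: region, lat, lon (sparse 0-1 values)
H = sparse_xr_dot(basis_mat, fp)                     # sums fp over the cells of each region
print(H.transpose("region", "time").shape)           # (3, 2)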
diff --git a/openghg_inversions/basis/__init__.py b/openghg_inversions/basis/__init__.py
new file mode 100644
index 00000000..1288cc72
--- /dev/null
+++ b/openghg_inversions/basis/__init__.py
@@ -0,0 +1,2 @@
+from ._functions import bucketbasisfunction, quadtreebasisfunction
+from ._wrapper import basis_functions_wrapper
diff --git a/openghg_inversions/basis/_functions.py b/openghg_inversions/basis/_functions.py
new file mode 100644
index 00000000..70ab42a6
--- /dev/null
+++ b/openghg_inversions/basis/_functions.py
@@ -0,0 +1,207 @@
+"""
+Functions to create basis datasets from fluxes and footprints.
+""" +import getpass +from collections import namedtuple +from functools import partial +from pathlib import Path +from typing import cast, Optional + +import pandas as pd +import xarray as xr + +from .algorithms import quadtree_algorithm, weighted_algorithm + + +def _flux_fp_from_fp_all( + fp_all: dict, emissions_name: Optional[list[str]] = None +) -> tuple[xr.DataArray, list[xr.DataArray]]: + """Get flux and list of footprints from `fp_all` dictionary and optional list of emissions names.""" + if emissions_name is not None: + flux = fp_all[".flux"][emissions_name[0]].data.flux + else: + first_flux = next(iter(fp_all[".flux"].values())) + flux = first_flux.data.flux + + flux = cast(xr.DataArray, flux) + + footprints: list[xr.DataArray] = [v.fp for k, v in fp_all.items() if not k.startswith(".")] + + return flux, footprints + + +def _mean_fp_times_mean_flux( + flux: xr.DataArray, + footprints: list[xr.DataArray], + abs_flux: bool = False, + mask: Optional[xr.DataArray] = None, +) -> xr.DataArray: + """Multiply mean flux by mean of footprints, optionally restricted to a Boolean mask.""" + if abs_flux is True: + print("Using absolute value of flux array.") + flux = abs(flux) + + mean_flux = flux.mean("time") + + fp_total = sum(footprints) # this seems to be faster than concatentating and summing over new axis + n_measure = sum(len(fp.time) for fp in footprints) + + fp_total = cast(xr.DataArray, fp_total) # otherwise mypy complains about the next line + mean_fp = fp_total.sum("time") / n_measure + + if mask is not None: + # align to footprint lat/lon + mean_fp, mean_flux, mask = xr.align(mean_fp, mean_flux, mask, join="override") + return (mean_fp * mean_flux).where(mask, drop=True) + + mean_fp, mean_flux = xr.align(mean_fp, mean_flux, join="override") + return mean_fp * mean_flux + + +def quadtreebasisfunction( + fp_all: dict, + start_date: str, + emissions_name: Optional[list[str]] = None, + nbasis: int = 100, + abs_flux: bool = False, + seed: Optional[int] = None, + mask: Optional[xr.DataArray] = None, +) -> xr.DataArray: + """ + Creates a basis function with nbasis grid cells using a quadtree algorithm. + + The domain is split with smaller grid cells for regions which contribute + more to the a priori (above basline) mole fraction. This is based on the + average footprint over the inversion period and the a priori emissions field. + + The number of basis functions is optimised using dual annealing. Probably + not the best or fastest method as there should only be one minima, but doesn't + require the Jacobian or Hessian for optimisation. + + Args: + fp_all: + Output from footprints_data_merge() function. Dictionary of datasets. + start_date: + String of start date of inversion + emissions_name: + List of "source" key words as used for retrieving specific emissions + from the object store. + nbasis: + Number of basis functions that you want. This will optimise to + closest value that fits with quadtree splitting algorithm, + i.e. nbasis % 4 = 1. + abs_flux: + If True this will take the absolute value of the flux + seed: + Optional seed to pass to scipy.optimize.dual_annealing. Used for testing. + mask: + Boolean mask on lat/lon coordinates. Used to find basis on sub-region. + + Returns: + xr.DataArray with lat/lon dimensions and basis regions encoded by integers. 
+ """ + flux, footprints = _flux_fp_from_fp_all(fp_all, emissions_name) + fps = _mean_fp_times_mean_flux(flux, footprints, abs_flux=abs_flux, mask=mask) + + # use xr.apply_ufunc to keep xarray coords + func = partial(quadtree_algorithm, nbasis=nbasis, seed=seed) + quad_basis = xr.apply_ufunc(func, fps) + + quad_basis = quad_basis.expand_dims({"time": [pd.to_datetime(start_date)]}, axis=-1) + quad_basis = quad_basis.rename("basis") # this will be used in merges + + quad_basis.attrs["creator"] = getpass.getuser() + quad_basis.attrs["date created"] = str(pd.Timestamp.today()) + + return quad_basis + + +def bucketbasisfunction( + fp_all: dict, + start_date: str, + emissions_name: Optional[list[str]] = None, + nbasis: int = 100, + abs_flux: bool = False, + mask: Optional[xr.DataArray] = None, +) -> xr.DataArray: + """ + Basis functions calculated using a weighted region approach + where each basis function / scaling region contains approximately + the same value + + Args: + fp_all: + fp_all dictionary object as produced from get_data functions + start_date: + Start date of period of inference + emissions_name: + List of keyword "source" args used for retrieving emissions files + from the Object store. + nbasis: + Desired number of basis function regions + abs_flux: + When set to True uses absolute values of a flux array + mask: + Boolean mask on lat/lon coordinates. Used to find basis on sub-region. + + Returns: + xr.DataArray with lat/lon dimensions and basis regions encoded by integers. + """ + flux, footprints = _flux_fp_from_fp_all(fp_all, emissions_name) + fps = _mean_fp_times_mean_flux(flux, footprints, abs_flux=abs_flux, mask=mask) + + # use xr.apply_ufunc to keep xarray coords + func = partial(weighted_algorithm, nregion=nbasis, bucket=1) + bucket_basis = xr.apply_ufunc(func, fps) + + bucket_basis = bucket_basis.expand_dims({"time": [pd.to_datetime(start_date)]}, axis=-1) + bucket_basis = bucket_basis.rename("basis") # this will be used in merges + + bucket_basis.attrs["creator"] = getpass.getuser() + bucket_basis.attrs["date created"] = str(pd.Timestamp.today()) + + return bucket_basis + + +# dict to retrieve basis function and description by algorithm name +BasisFunction = namedtuple("BasisFunction", ["description", "algorithm"]) +basis_functions = { + "quadtree": BasisFunction("quadtree algorithm", quadtreebasisfunction), + "weighted": BasisFunction("weighted by data algorithm", bucketbasisfunction), +} + + +def fixed_outer_regions_basis( + fp_all: dict, + start_date: str, + basis_algorithm: str, + emissions_name: Optional[list[str]] = None, + nbasis: int = 100, + abs_flux: bool = False, +) -> xr.DataArray: + """Fix outer region of basis functions to InTEM regions, and fit the inner regions using `basis_algorithm`.""" + intem_regions_path = Path(__file__).parent / "intem_region_definition.nc" + intem_regions = xr.open_dataset(intem_regions_path).region + + # force intem_regions to use flux coordinates + flux, _ = _flux_fp_from_fp_all(fp_all, emissions_name) + _, intem_regions = xr.align(flux, intem_regions, join="override") + + mask = intem_regions == 6 + + basis_function = basis_functions[basis_algorithm].algorithm + inner_region = basis_function(fp_all, start_date, emissions_name, nbasis, abs_flux, mask=mask) + + basis = intem_regions.rename("basis") + + loc_dict = { + "lat": slice(inner_region.lat.min(), inner_region.lat.max() + 0.1), + "lon": slice(inner_region.lon.min(), inner_region.lon.max() + 0.1), + } + basis.loc[loc_dict] = (inner_region + 5).squeeze().values + + basis += 1 
diff --git a/openghg_inversions/basis/_wrapper.py b/openghg_inversions/basis/_wrapper.py
new file mode 100644
index 00000000..01d0d2f5
--- /dev/null
+++ b/openghg_inversions/basis/_wrapper.py
@@ -0,0 +1,170 @@
+"""
+Functions for calling basis function algorithms and applying basis functions to data.
+"""
+from pathlib import Path
+from typing import Optional
+
+import xarray as xr
+
+from ._functions import basis_functions, fixed_outer_regions_basis
+from .. import utils
+
+
+def basis_functions_wrapper(
+    fp_all: dict,
+    species: str,
+    domain: str,
+    start_date: str,
+    emissions_name: list[str],
+    nbasis: int,
+    use_bc: bool,
+    basis_algorithm: Optional[str] = None,
+    fix_outer_regions: bool = False,
+    fp_basis_case: Optional[str] = None,
+    bc_basis_case: Optional[str] = None,
+    basis_directory: Optional[str] = None,
+    bc_basis_directory: Optional[str] = None,
+    outputname: Optional[str] = None,
+    output_path: Optional[str] = None,
+):
+    """
+    Wrapper function for selecting the basis function algorithm.
+
+    Args:
+        basis_algorithm (str):
+            One of "quadtree" (for using the quadtree algorithm) or
+            "weighted" (for using an algorithm that splits the region
+            by input data).
+
+            NB. Land-sea separation is not imposed in the quadtree
+            basis functions, but is imposed by default in "weighted"
+
+        fix_outer_regions (bool):
+            If True, fix the outer basis regions to the InTEM regions
+            and only fit the inner region with `basis_algorithm`.
+        nbasis (int):
+            Number of basis function regions to calculate in the domain
+        fp_basis_case (str):
+            Name of basis function to use for emissions.
+        bc_basis_case (str, optional):
+            Name of basis case type for boundary conditions (NOTE, I don't
+            think that currently you can do anything apart from scaling NSEW
+            boundary conditions if you want to scale these monthly.)
+        basis_directory (str, optional):
+            Directory containing the basis function, if not default.
+        bc_basis_directory (str, optional):
+            Directory containing the boundary condition basis functions
+            (e.g. files starting with "NESW")
+        use_bc (bool):
+            Option to include/exclude boundary conditions in inversion
+        fp_all (dict):
+            Dictionary object produced from get_data functions
+        species (str):
+            Atmospheric trace gas species of interest
+        domain (str):
+            Model domain
+        start_date (str):
+            Start date of period of inference
+        emissions_name (str/list):
+            Emissions dataset key words for retrieving from object store
+        outputname (str):
+            File output name
+        output_path (str):
+            Root directory for saving basis functions (see `_save_basis`). Used for testing.
+
+    Returns:
+        fp_data (dict):
+            Dictionary object similar to fp_all but with information
+            on basis functions and sensitivities
+    """
+    if fp_basis_case is not None:
+        if basis_algorithm:
+            print(
+                f"Basis algorithm {basis_algorithm} and basis case {fp_basis_case} supplied; using {fp_basis_case}."
+            )
+        basis_data_array = utils.basis(
+            domain=domain, basis_case=fp_basis_case, basis_directory=basis_directory
+        ).basis
+
+    elif basis_algorithm is None:
+        raise ValueError("One of `fp_basis_case` or `basis_algorithm` must be specified.")
+
+    elif fix_outer_regions is True:
+        try:
+            basis_data_array = fixed_outer_regions_basis(
+                fp_all, start_date, basis_algorithm, emissions_name, nbasis
+            )
+        except KeyError as e:
+            raise ValueError(
+                "Basis algorithm not recognised. Please use either 'quadtree' or 'weighted', or input a basis function file"
+            ) from e
+        print(f"Using InTEM regions with {basis_algorithm} to derive basis functions for the inner region.")
+
+    else:
+        try:
+            basis_function = basis_functions[basis_algorithm]
+        except KeyError as e:
+            raise ValueError(
+                "Basis algorithm not recognised. Please use either 'quadtree' or 'weighted', or input a basis function file"
+            ) from e
+        print(f"Using {basis_function.description} to derive basis functions.")
+        basis_data_array = basis_function.algorithm(fp_all, start_date, emissions_name, nbasis)
+
+    fp_data = utils.fp_sensitivity(fp_all, basis_func=basis_data_array)
+
+    if use_bc is True:
+        fp_data = utils.bc_sensitivity(
+            fp_data,
+            domain=domain,
+            basis_case=bc_basis_case,
+            bc_basis_directory=bc_basis_directory,
+        )
+
+    if output_path is not None and basis_algorithm is not None and fp_basis_case is None:
+        _save_basis(
+            basis=basis_data_array,
+            basis_algorithm=basis_algorithm,
+            output_dir=output_path,
+            domain=domain,
+            species=species,
+            output_name=outputname,
+        )
+
+    return fp_data
+
+
+def _save_basis(
+    basis: xr.DataArray,
+    basis_algorithm: str,
+    output_dir: str,
+    domain: str,
+    species: str,
+    output_name: Optional[str] = None,
+) -> None:
+    """Save basis functions to netCDF.
+
+    Args:
+        basis: basis dataset to save
+        basis_algorithm: name of basis algorithm (e.g. "quadtree" or "weighted")
+        output_dir: root directory to save basis functions
+        domain: domain of inversion; basis is saved in a "domain" directory inside `output_dir`
+        species: species of inversion
+        output_name: optional identifier inserted into the output file name
+
+    Returns:
+        None. Saves basis dataset to netCDF.
+    """
+    basis_out_path = Path(output_dir, domain.upper())
+
+    if not basis_out_path.exists():
+        basis_out_path.mkdir(parents=True)
+
+    start_date = str(basis.time.min().values)[:7]  # year and month
+
+    if output_name is None:
+        output_name = f"{basis_algorithm}_{species}_{domain}_{start_date}.nc"
+    else:
+        output_name = f"{basis_algorithm}_{species}-{output_name}_{domain}_{start_date}.nc"
+
+    basis.to_netcdf(basis_out_path / output_name, mode="w")
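[Editor's note: an illustrative call of the wrapper, not part of the patch; it assumes `fp_all` comes from the get_data functions and that "total" is a valid flux source name (hypothetical).]

from openghg_inversions.basis import basis_functions_wrapper

fp_data = basis_functions_wrapper(
    fp_all,
    species="ch4",
    domain="EUROPE",
    start_date="2019-01-01",
    emissions_name=["total"],
    nbasis=100,
    use_bc=False,
    basis_algorithm="quadtree",
)
# fp_data[site]["H"] now holds the (region x time) sensitivity matrix for each site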
diff --git a/openghg_inversions/basis/algorithms/__init__.py b/openghg_inversions/basis/algorithms/__init__.py
new file mode 100644
index 00000000..476ce160
--- /dev/null
+++ b/openghg_inversions/basis/algorithms/__init__.py
@@ -0,0 +1,2 @@
+from ._quadtree import get_quadtree_basis as quadtree_algorithm
+from ._weighted import nregion_landsea_basis as weighted_algorithm
diff --git a/openghg_inversions/basis/algorithms/_quadtree.py b/openghg_inversions/basis/algorithms/_quadtree.py
new file mode 100644
index 00000000..5a8bb5bf
--- /dev/null
+++ b/openghg_inversions/basis/algorithms/_quadtree.py
@@ -0,0 +1,123 @@
+from typing import Optional
+
+import numpy as np
+import scipy.optimize
+
+
+class quadTreeNode:
+    def __init__(self, xStart, xEnd, yStart, yEnd):
+        self.xStart = xStart
+        self.xEnd = xEnd
+        self.yStart = yStart
+        self.yEnd = yEnd
+
+        self.child1 = None  # top left
+        self.child2 = None  # top right
+        self.child3 = None  # bottom left
+        self.child4 = None  # bottom right
+
+    def isLeaf(self):
+        if self.child1 or self.child2 or self.child3 or self.child4:
+            return False
+        else:
+            return True
+
+    def createChildren(self, grid, limit):
+        value = np.sum(grid[self.xStart : self.xEnd, self.yStart : self.yEnd])
+
+        # stop subdividing if finest resolution or bucket level reached
+        if value < limit or (self.xEnd - self.xStart < 2) or (self.yEnd - self.yStart < 2):
+            return
+
+        dx = self.xEnd - self.xStart
+        dy = self.yEnd - self.yStart
+
+        # create 4 children for subdivision
+        self.child1 = quadTreeNode(self.xStart, self.xStart + dx // 2, self.yStart, self.yStart + dy // 2)
+        self.child2 = quadTreeNode(
+            self.xStart + dx // 2, self.xStart + dx, self.yStart, self.yStart + dy // 2
+        )
+        self.child3 = quadTreeNode(
+            self.xStart, self.xStart + dx // 2, self.yStart + dy // 2, self.yStart + dy
+        )
+        self.child4 = quadTreeNode(
+            self.xStart + dx // 2, self.xStart + dx, self.yStart + dy // 2, self.yStart + dy
+        )
+
+        # apply recursion on all child nodes
+        self.child1.createChildren(grid, limit)
+        self.child2.createChildren(grid, limit)
+        self.child3.createChildren(grid, limit)
+        self.child4.createChildren(grid, limit)
+
+    def appendLeaves(self, leafList):
+        # recursively append all leaves/end nodes to leafList
+        if self.isLeaf():
+            leafList.append(self)
+        else:
+            self.child1.appendLeaves(leafList)
+            self.child2.appendLeaves(leafList)
+            self.child3.appendLeaves(leafList)
+            self.child4.appendLeaves(leafList)
+
+
+def quadTreeGrid(grid, limit):
+    """
+    Apply the quadtree division algorithm.
+
+    Args:
+        grid (array):
+            2d numpy array to apply quadtree division to
+        limit (float):
+            Use value as bucket level for defining maximum subdivision
+
+    Returns:
+        outputGrid (array):
+            2d numpy grid, same shape as grid, with values corresponding to
+            each leaf of the quadtree
+    """
+    # start with a single node the size of the entire input grid:
+    parentNode = quadTreeNode(0, grid.shape[0], 0, grid.shape[1])
+    parentNode.createChildren(grid, limit)
+
+    leafList = []
+    parentNode.appendLeaves(leafList)
+
+    outputGrid = np.zeros_like(grid)
+
+    for i, leaf in enumerate(leafList):
+        outputGrid[leaf.xStart : leaf.xEnd, leaf.yStart : leaf.yEnd] = i
+
+    return outputGrid
+
+
+def get_quadtree_basis(fps: np.ndarray, nbasis: int, seed: Optional[int] = None) -> np.ndarray:
+    """Given an array and a specified number of basis functions, return basis regions specified by
+    the quadtree algorithm.
+
+    Args:
+        fps: array (mean flux times mean footprints) to use to calculate basis regions
+        nbasis: target number of basis regions
+        seed: optional random seed to use (for testing or reproducing results)
+
+    Returns:
+        2D numpy array with positive integer values representing basis regions.
+    """
+
+    def qtoptim(x):
+        basisQuad = quadTreeGrid(fps, x)
+        return (nbasis - np.max(basisQuad) - 1) ** 2
+
+    cost = 1e6
+    pwr = 0
+    search_max = 10 * np.sum(fps)
+    while cost > 3.0:
+        optim = scipy.optimize.dual_annealing(
+            qtoptim, np.expand_dims([0, search_max / 10**pwr], axis=0), seed=seed
+        )
+        cost = np.sqrt(optim.fun)
+        pwr += 1
+        if pwr > 10:
+            raise RuntimeError("Quadtree did not converge after max iterations.")
+
+    return quadTreeGrid(fps, optim.x[0]) + 1
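[Editor's note: a quick check, not part of the patch, of why achievable region counts satisfy nbasis % 3 == 1 -- each subdivision replaces one leaf with four, adding three regions. The grid here is random toy data, and dual annealing may take a few seconds.]

import numpy as np
from openghg_inversions.basis.algorithms import quadtree_algorithm

rng = np.random.default_rng(0)
fps = rng.random((32, 32))  # stand-in for mean footprint * mean flux

basis = quadtree_algorithm(fps, nbasis=100, seed=1)
print(int(basis.max()) % 3 == 1)  # True: leaf counts are 1, 4, 7, ...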
diff --git a/openghg_inversions/basis/algorithms/_weighted.py b/openghg_inversions/basis/algorithms/_weighted.py
new file mode 100644
index 00000000..cc67d353
--- /dev/null
+++ b/openghg_inversions/basis/algorithms/_weighted.py
@@ -0,0 +1,158 @@
+import numpy as np
+import xarray as xr
+
+
+# BUCKET BASIS FUNCTIONS
+def load_landsea_indices():
+    """
+    Load UKMO array with indices that separate
+    land and sea regions in the EUROPE domain
+    --------------
+    land = 1
+    sea = 0
+    """
+    landsea_indices = xr.open_dataset("../countries/country-EUROPE-UKMO-landsea-2023.nc")
+    return landsea_indices["country"].values
+
+
+def bucket_value_split(grid, bucket, offset_x=0, offset_y=0):
+    """
+    Algorithm that splits the input grid (e.g. fp * flux)
+    such that the sum over each basis function region is at most
+    the bucket value, or the region is a single array element.
+
+    The number of regions is determined by the bucket value,
+    i.e. smaller bucket value ==> more regions,
+         larger bucket value ==> fewer regions
+    ------------------------------------
+    Args:
+        grid: np.array
+            2D grid of footprints * flux, or whatever
+            grid you want to split. Could be: population
+            data, spatial distribution of bakeries, you choose!
+
+        bucket: float
+            Maximum value for each basis function region
+
+    Returns:
+        array of tuples that define the indices for each basis function region
+        [(ymin0, ymax0, xmin0, xmax0), ..., (yminN, ymaxN, xminN, xmaxN)]
+    """
+
+    if np.sum(grid) <= bucket or grid.shape == (1, 1):
+        return [(offset_y, offset_y + grid.shape[0], offset_x, offset_x + grid.shape[1])]
+
+    else:
+        if grid.shape[0] >= grid.shape[1]:
+            half_y = grid.shape[0] // 2
+            return bucket_value_split(grid[0:half_y, :], bucket, offset_x, offset_y) + bucket_value_split(
+                grid[half_y:, :], bucket, offset_x, offset_y + half_y
+            )
+
+        elif grid.shape[0] < grid.shape[1]:
+            half_x = grid.shape[1] // 2
+            return bucket_value_split(grid[:, 0:half_x], bucket, offset_x, offset_y) + bucket_value_split(
+                grid[:, half_x:], bucket, offset_x + half_x, offset_y
+            )
+
+
+# Optimize bucket value to number of desired regions
+def get_nregions(bucket, grid):
+    """Return the number of basis functions for a given bucket value."""
+    return np.max(bucket_split_landsea_basis(grid, bucket))
+
+
+def optimize_nregions(bucket, grid, nregion, tol):
+    """
+    Optimize the bucket value to obtain nregion basis functions,
+    within +/- tol.
+    """
+    n = get_nregions(bucket, grid)
+    # print(bucket, n)
+    if nregion - tol <= n <= nregion + tol:
+        return bucket
+
+    if n < nregion + tol:
+        bucket = bucket * 0.995
+        return optimize_nregions(bucket, grid, nregion, tol)
+
+    elif n > nregion - tol:
+        bucket = bucket * 1.005
+        return optimize_nregions(bucket, grid, nregion, tol)
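# [Editor's note] Tiny illustration, not part of the patch: splitting a uniform 4x4 grid
# (total 16) with bucket=4 recurses down to four 2x2 blocks, each summing to exactly 4.
#
#     import numpy as np
#     regions = bucket_value_split(np.ones((4, 4)), bucket=4.0)
#     print(regions)       # [(0, 2, 0, 2), (0, 2, 2, 4), (2, 4, 0, 2), (2, 4, 2, 4)]
#     print(len(regions))  # 4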
+
+
+def bucket_split_landsea_basis(grid, bucket):
+    """
+    Split the grid like bucket_value_split, but with a
+    land-sea split, i.e. basis functions cannot overlap sea and land.
+    ------------------------------------
+    Args:
+        grid: np.array
+            2D grid of footprints * flux, or whatever
+            grid you want to split. Could be: population
+            data, spatial distribution of bakeries, you choose!
+
+        bucket: float
+            Maximum value for each basis function region
+
+    Returns:
+        2D array with basis function values
+
+    """
+    landsea_indices = load_landsea_indices()
+    myregions = bucket_value_split(grid, bucket)
+
+    mybasis_function = np.zeros(shape=grid.shape)
+
+    for region in myregions:
+        ymin, ymax = region[0], region[1]
+        xmin, xmax = region[2], region[3]
+
+        inds_y0, inds_x0 = np.where(landsea_indices[ymin:ymax, xmin:xmax] == 0)
+        inds_y1, inds_x1 = np.where(landsea_indices[ymin:ymax, xmin:xmax] == 1)
+
+        count = np.max(mybasis_function)
+
+        if len(inds_y0) != 0:
+            count += 1
+            for j in range(len(inds_y0)):
+                mybasis_function[inds_y0[j] + ymin, inds_x0[j] + xmin] = count
+
+        if len(inds_y1) != 0:
+            count += 1
+            for j in range(len(inds_y1)):
+                mybasis_function[inds_y1[j] + ymin, inds_x1[j] + xmin] = count
+
+    return mybasis_function
+
+
+def nregion_landsea_basis(grid, bucket=1, nregion=100, tol=1):
+    """
+    Obtain basis function with nregion regions (for land-sea split)
+    ------------------------------------
+    Args:
+        grid: np.array
+            2D grid of footprints * flux, or whatever
+            grid you want to split. Could be: population
+            data, spatial distribution of bakeries, you choose!
+
+        bucket: float
+            Initial bucket value for each basis function region.
+            Defaults to 1
+
+        nregion: int
+            Number of desired basis function regions
+            Defaults to 100
+
+        tol: int
+            Tolerance to find number of basis function regions.
+            i.e. optimizes nregions to +/- tol
+            Defaults to 1
+
+    Returns:
+        basis_function np.array
+            2D basis function array
+
+    """
+    bucket_opt = optimize_nregions(bucket, grid, nregion, tol)
+    basis_function = bucket_split_landsea_basis(grid, bucket_opt)
+    return basis_function
diff --git a/openghg_inversions/basis/intem_region_definition.nc b/openghg_inversions/basis/intem_region_definition.nc
new file mode 100644
index 00000000..9f7d7cbc
Binary files /dev/null and b/openghg_inversions/basis/intem_region_definition.nc differ
diff --git a/openghg_inversions/basis_functions.py b/openghg_inversions/basis_functions.py
deleted file mode 100644
index 43ab55d4..00000000
--- a/openghg_inversions/basis_functions.py
+++ /dev/null
@@ -1,517 +0,0 @@
-# *****************************************************************************
-# Created: 7 Nov. 2022
-# Author: Eric Saboya, School of Geographical Sciences, University of Bristol
-# Contact: eric.saboya@bristol.ac.uk
-# *****************************************************************************
-# About
-# Basis functions for used for HBMCMC inversions. Originally created by
-# Anita Ganesan and updated, here, by Eric Saboya.
-# HBMCMC uses the QuadTree algorithm for creating basis functions for
-# the inversion runs.
-# ***************************************************************************** - -import getpass -import os -from typing import Optional - -import numpy as np -import pandas as pd -import scipy.optimize -import xarray as xr - - -class quadTreeNode: - def __init__(self, xStart, xEnd, yStart, yEnd): - self.xStart = xStart - self.xEnd = xEnd - self.yStart = yStart - self.yEnd = yEnd - - self.child1 = None # top left - self.child2 = None # top right - self.child3 = None # bottom left - self.child4 = None # bottom right - - def isLeaf(self): - if self.child1 or self.child2 or self.child3 or self.child4: - return False - else: - return True - - def createChildren(self, grid, limit): - value = np.sum(grid[self.xStart : self.xEnd, self.yStart : self.yEnd]) # .values - - # stop subdividing if finest resolution or bucket level reached - if value < limit or (self.xEnd - self.xStart < 2) or (self.yEnd - self.yStart < 2): - return - - dx = self.xEnd - self.xStart - dy = self.yEnd - self.yStart - - # create 4 children for subdivison - self.child1 = quadTreeNode(self.xStart, self.xStart + dx // 2, self.yStart, self.yStart + dy // 2) - self.child2 = quadTreeNode( - self.xStart + dx // 2, self.xStart + dx, self.yStart, self.yStart + dy // 2 - ) - self.child3 = quadTreeNode( - self.xStart, self.xStart + dx // 2, self.yStart + dy // 2, self.yStart + dy - ) - self.child4 = quadTreeNode( - self.xStart + dx // 2, self.xStart + dx, self.yStart + dy // 2, self.yStart + dy - ) - - # apply recursion on all child nodes - self.child1.createChildren(grid, limit) - self.child2.createChildren(grid, limit) - self.child3.createChildren(grid, limit) - self.child4.createChildren(grid, limit) - - def appendLeaves(self, leafList): - # recursively append all leaves/end nodes to leafList - if self.isLeaf(): - leafList.append(self) - else: - self.child1.appendLeaves(leafList) - self.child2.appendLeaves(leafList) - self.child3.appendLeaves(leafList) - self.child4.appendLeaves(leafList) - - -def quadTreeGrid(grid, limit): - """ - Apply quadtree division algorithm - ----------------------------------- - Args: - grid (array): - 2d numpy array to apply quadtree division to - limit (float): - Use value as bucket level for defining maximum subdivision - - Returns: - outputGrid (array): - 2d numpy grid, same shape as grid, with values correpsonding to - each box from boxList - boxList: (list of lists) - Each sublist describes the corners of a quadtree leaf - ----------------------------------- - """ - # start with a single node the size of the entire input grid: - parentNode = quadTreeNode(0, grid.shape[0], 0, grid.shape[1]) - parentNode.createChildren(grid, limit) - - leafList = [] - boxList = [] - parentNode.appendLeaves(leafList) - - outputGrid = np.zeros_like(grid) - - for i, leaf in enumerate(leafList): - outputGrid[leaf.xStart : leaf.xEnd, leaf.yStart : leaf.yEnd] = i - boxList.append([leaf.xStart, leaf.xEnd, leaf.yStart, leaf.yEnd]) - - return outputGrid - - -def quadtreebasisfunction( - emissions_name: list[str], - fp_all: dict, - sites: list[str], - start_date: str, - domain: str, - species: str, - outputname: Optional[str] = None, - outputdir: Optional[str] = None, - nbasis: int = 100, - abs_flux: bool = False, - seed: Optional[int] = None, -) -> xr.Dataset: - """ - Creates a basis function with nbasis grid cells using a quadtree algorithm. - The domain is split with smaller grid cells for regions which contribute - more to the a priori (above basline) mole fraction. 
This is based on the - average footprint over the inversion period and the a priori emissions field. - Output is a netcdf file saved to /Temp/ in the current directory - if no outputdir is specified or to outputdir if specified. - The number of basis functions is optimised using dual annealing. Probably - not the best or fastest method as there should only be one minima, but doesn't - require the Jacobian or Hessian for optimisation. - ----------------------------------- - Args: - emissions_name (list): - List of "source" key words as used for retrieving specific emissions - from the object store. - fp_all (dict): - Output from footprints_data_merge() function. Dictionary of datasets. - sites (list): - List of site names (This could probably be found elsewhere) - start_date (str): - String of start date of inversion - domain (str): - The inversion domain - species (str): - Atmospheric trace gas species of interest (e.g. 'co2') - outputname (str): - Identifier or run name - outputdir (str, optional): - Path to output directory where the basis function file will be saved. - Basis function will automatically be saved in outputdir/DOMAIN - Default of None makes a temp directory. - nbasis (int): - Number of basis functions that you want. This will optimise to - closest value that fits with quadtree splitting algorithm, - i.e. nbasis % 4 = 1. - abs_flux (bool): - If True this will take the absolute value of the flux - seed: - Optional seed to pass to scipy.optimize.dual_annealing. Used for testing. - - Returns: - xr.Dataset with lat/lon dimensions and basis regions encoded by integers. - If outputdir is not None, then saves the basis function in outputdir. - ----------------------------------- - """ - if abs_flux: - print("Using absolute values of flux array") - if emissions_name is None: - flux = ( - np.absolute(fp_all[".flux"]["all"].data.flux.values) - if abs_flux - else fp_all[".flux"]["all"].data.flux.values - ) - meanflux = np.squeeze(flux) - else: - if isinstance(fp_all[".flux"][emissions_name[0]], dict): - arr = fp_all[".flux"][emissions_name[0]] - flux = np.absolute(arr.data.flux.values) if abs_flux else arr.data.flux.values - meanflux = np.squeeze(flux) - else: - flux = ( - np.absolute(fp_all[".flux"][emissions_name[0]].data.flux.values) - if abs_flux - else fp_all[".flux"][emissions_name[0]].data.flux.values - ) - meanflux = np.squeeze(flux) - meanfp = np.zeros((fp_all[sites[0]].fp.shape[0], fp_all[sites[0]].fp.shape[1])) - div = 0 - for site in sites: - meanfp += np.sum(fp_all[site].fp.values, axis=2) - div += fp_all[site].fp.shape[2] - meanfp /= div - - if meanflux.shape != meanfp.shape: - meanflux = np.mean(meanflux, axis=2) - fps = meanfp * meanflux - - def qtoptim(x): - basisQuad = quadTreeGrid(fps, x) - return (nbasis - np.max(basisQuad) - 1) ** 2 - - cost = 1e6 - pwr = 0 - while cost > 3.0: - optim = scipy.optimize.dual_annealing( - qtoptim, np.expand_dims([0, 100 / 10**pwr], axis=0), seed=seed - ) - cost = np.sqrt(optim.fun) - pwr += 1 - if pwr > 10: - raise RuntimeError("Quadtree did not converge after max iterations.") - basisQuad = quadTreeGrid(fps, optim.x[0]) - - lon = fp_all[sites[0]].lon.values - lat = fp_all[sites[0]].lat.values - - base = np.expand_dims(basisQuad + 1, axis=2) - - time = [pd.to_datetime(start_date)] - newds = xr.Dataset( - {"basis": (["lat", "lon", "time"], base)}, - coords={"time": (["time"], time), "lat": (["lat"], lat), "lon": (["lon"], lon)}, - ) - newds.lat.attrs["long_name"] = "latitude" - newds.lon.attrs["long_name"] = "longitude" - 
newds.lat.attrs["units"] = "degrees_north" - newds.lon.attrs["units"] = "degrees_east" - newds.attrs["creator"] = getpass.getuser() - newds.attrs["date created"] = str(pd.Timestamp.today()) - - if outputdir is not None: - basisoutpath = os.path.join(outputdir, domain) - if outputname is None: - outputname = "output_name" - if not os.path.exists(basisoutpath): - os.makedirs(basisoutpath) - newds.to_netcdf( - os.path.join( - basisoutpath, f"quadtree_{species}-{outputname}_{domain}_{start_date.split('-')[0]}.nc" - ), - mode="w", - ) - - return newds - - -# BUCKET BASIS FUNCTIONS -def load_landsea_indices(): - """ - Load UKMO array with indices that separate - land and sea regions in EUROPE domain - -------------- - land = 1 - sea = 0 - """ - landsea_indices = xr.open_dataset("../countries/country-EUROPE-UKMO-landsea-2023.nc") - return landsea_indices["country"].values - - -def bucket_value_split(grid, bucket, offset_x=0, offset_y=0): - """ - Algorithm that will split the input grid (e.g. fp * flux) - such that the sum of each basis function region will - equal the bucket value or by a single array element. - - The number of regions will be determined by the bucket value - i.e. smaller bucket value ==> more regions - larger bucket value ==> fewer regions - ------------------------------------ - Args: - grid: np.array - 2D grid of footprints * flux, or whatever - grid you want to split. Could be: population - data, spatial distribution of bakeries, you chose! - - bucket: float - Maximum value for each basis function region - - Returns: - array of tuples that define the indices for each basis function region - [(ymin0, ymax0, xmin0, xmax0), ..., (yminN, ymaxN, xminN, xmaxN)] - """ - - if np.sum(grid) <= bucket or grid.shape == (1, 1): - return [(offset_y, offset_y + grid.shape[0], offset_x, offset_x + grid.shape[1])] - - else: - if grid.shape[0] >= grid.shape[1]: - half_y = grid.shape[0] // 2 - return bucket_value_split(grid[0:half_y, :], bucket, offset_x, offset_y) + bucket_value_split( - grid[half_y:, :], bucket, offset_x, offset_y + half_y - ) - - elif grid.shape[0] < grid.shape[1]: - half_x = grid.shape[1] // 2 - return bucket_value_split(grid[:, 0:half_x], bucket, offset_x, offset_y) + bucket_value_split( - grid[:, half_x:], bucket, offset_x + half_x, offset_y - ) - - -# Optimize bucket value to number of desired regions -def get_nregions(bucket, grid): - """Returns no. of basis functions for bucket value""" - return np.max(bucket_split_landsea_basis(grid, bucket)) - - -def optimize_nregions(bucket, grid, nregion, tol): - """ - Optimize bucket value to obtain nregion basis functions - within +/- tol. - """ - # print(bucket, get_nregions(bucket, grid)) - if get_nregions(bucket, grid) <= nregion + tol and get_nregions(bucket, grid) >= nregion - tol: - return bucket - - if get_nregions(bucket, grid) < nregion + tol: - bucket = bucket * 0.995 - return optimize_nregions(bucket, grid, nregion, tol) - - elif get_nregions(bucket, grid) > nregion - tol: - bucket = bucket * 1.005 - return optimize_nregions(bucket, grid, nregion, tol) - - -def bucket_split_landsea_basis(grid, bucket): - """ - Same as bucket_split_basis but includes - land-sea split. i.e. basis functions cannot overlap sea and land - ------------------------------------ - Args: - grid: np.array - 2D grid of footprints * flux, or whatever - grid you want to split. Could be: population - data, spatial distribution of bakeries, you chose! 
- - bucket: float - Maximum value for each basis function region - - Returns: - 2D array with basis function values - - """ - landsea_indices = load_landsea_indices() - myregions = bucket_value_split(grid, bucket) - - mybasis_function = np.zeros(shape=grid.shape) - - for i in range(len(myregions)): - ymin, ymax = myregions[i][0], myregions[i][1] - xmin, xmax = myregions[i][2], myregions[i][3] - - inds_y0, inds_x0 = np.where(landsea_indices[ymin:ymax, xmin:xmax] == 0) - inds_y1, inds_x1 = np.where(landsea_indices[ymin:ymax, xmin:xmax] == 1) - - count = np.max(mybasis_function) - - if len(inds_y0) != 0: - count += 1 - for i in range(len(inds_y0)): - mybasis_function[inds_y0[i] + ymin, inds_x0[i] + xmin] = count - - if len(inds_y1) != 0: - count += 1 - for i in range(len(inds_y1)): - mybasis_function[inds_y1[i] + ymin, inds_x1[i] + xmin] = count - - return mybasis_function - - -def nregion_landsea_basis(grid, bucket=1, nregion=100, tol=1): - """ - Obtain basis function with nregions (for land-sea split) - ------------------------------------ - Args: - grid: np.array - 2D grid of footprints * flux, or whatever - grid you want to split. Could be: population - data, spatial distribution of bakeries, you chose! - - bucket: float - Initial bucket value for each basis function region. - Defaults to 1 - - nregion: int - Number of desired basis function regions - Defaults to 100 - - tol: int - Tolerance to find number of basis function regions. - i.e. optimizes nregions to +/- tol - Defaults to 1 - - Returns: - basis_function np.array - 2D basis function array - - """ - bucket_opt = optimize_nregions(bucket, grid, nregion, tol) - basis_function = bucket_split_landsea_basis(grid, bucket_opt) - return basis_function - - -def bucketbasisfunction( - emissions_name, - fp_all, - sites, - start_date, - domain, - species, - outputname, - outputdir=None, - nbasis=100, - abs_flux=False, -) -> xr.Dataset: - """ - Basis functions calculated using a weighted region approach - where each basis function / scaling region contains approximately - the same value - ----------------------------------- - Args: - emissions_name (str/list): - List of keyword "source" args used for retrieving emissions files - from the Object store. - fp_all (dict): - fp_all dictionary object as produced from get_data functions - sites (str/list): - List of measurements sites being used. - start_date (str): - Start date of period of inference - domain (str): - Name of model domain - species (str): - Name of atmospheric species of interest - outputname (str): - Name of inversion run - outputdir (str): - Directory where inversion run outputs are saved - nbasis (int): - Desired number of basis function regions - abs_flux (bool): - When set to True uses absolute values of a flux array - - Returns: - xr.Dataset with lat/lon dimensions and basis regions encoded by integers. - If outputdir is not None, then saves the basis function in outputdir. 
- """ - if abs_flux: - print("Using absolute values of flux array") - if emissions_name is None: - flux = ( - np.absolute(fp_all[".flux"]["all"].data.flux.values) - if abs_flux - else fp_all[".flux"]["all"].data.flux.values - ) - meanflux = np.squeeze(flux) - else: - if isinstance(fp_all[".flux"][emissions_name[0]], dict): - arr = fp_all[".flux"][emissions_name[0]] - flux = np.absolute(arr.data.flux.values) if abs_flux else arr.data.flux.values - meanflux = np.squeeze(flux) - else: - flux = ( - np.absolute(fp_all[".flux"][emissions_name[0]].data.flux.values) - if abs_flux - else fp_all[".flux"][emissions_name[0]].data.flux.values - ) - meanflux = np.squeeze(flux) - meanfp = np.zeros((fp_all[sites[0]].fp.shape[0], fp_all[sites[0]].fp.shape[1])) - div = 0 - for site in sites: - meanfp += np.sum(fp_all[site].fp.values, axis=2) - div += fp_all[site].fp.shape[2] - meanfp /= div - - if meanflux.shape != meanfp.shape: - meanflux = np.mean(meanflux, axis=2) - fps = meanfp * meanflux - - bucket_basis = nregion_landsea_basis(fps, 1, nbasis) - - lon = fp_all[sites[0]].lon.values - lat = fp_all[sites[0]].lat.values - - base = np.expand_dims(bucket_basis, axis=2) - - time = [pd.to_datetime(start_date)] - newds = xr.Dataset( - {"basis": (["lat", "lon", "time"], base)}, - coords={"time": (["time"], time), "lat": (["lat"], lat), "lon": (["lon"], lon)}, - ) - newds.lat.attrs["long_name"] = "latitude" - newds.lon.attrs["long_name"] = "longitude" - newds.lat.attrs["units"] = "degrees_north" - newds.lon.attrs["units"] = "degrees_east" - newds.attrs["creator"] = getpass.getuser() - newds.attrs["date created"] = str(pd.Timestamp.today()) - - if outputdir is not None: - basisoutpath = os.path.join(outputdir, domain) - if not os.path.exists(basisoutpath): - os.makedirs(basisoutpath) - newds.to_netcdf( - os.path.join( - basisoutpath, - f"weighted_{species}-{outputname}_{domain}_{start_date.split('-')[0]}{start_date.split('-')[1]}.nc", - ), - mode="w", - ) - - return newds diff --git a/openghg_inversions/hbmcmc/config/openghg_hbmcmc_input_template.ini b/openghg_inversions/hbmcmc/config/openghg_hbmcmc_input_template.ini index 75a8e08f..7bb385f7 100644 --- a/openghg_inversions/hbmcmc/config/openghg_hbmcmc_input_template.ini +++ b/openghg_inversions/hbmcmc/config/openghg_hbmcmc_input_template.ini @@ -176,10 +176,12 @@ tune = ; (required) nchain = 2 -[MCMC.ADD_ERROR] +[MCMC.OPTIONS] ; averagingerror (bool): Add variability in averaging period to the measurement error averagingerror = True +min_error = 1.0 +fix_basis_outer_regions = False [MCMC.OUTPUT] ; Details of where to write the output diff --git a/openghg_inversions/hbmcmc/config/openghg_hbmcmc_input_template_example.ini b/openghg_inversions/hbmcmc/config/openghg_hbmcmc_input_template_example.ini index 2e985cc1..056f5de3 100644 --- a/openghg_inversions/hbmcmc/config/openghg_hbmcmc_input_template_example.ini +++ b/openghg_inversions/hbmcmc/config/openghg_hbmcmc_input_template_example.ini @@ -147,10 +147,13 @@ tune = 2000 nchain = 2 -[MCMC.ADD_ERROR] +[MCMC.OPTIONS] ; averagingerror (bool): Add variability in averaging period to the measurement error -averaging_error = True +averagingerror = True +min_error = 20.0 +fix_basis_outer_regions = False + [MCMC.OUTPUT] ; Details of where to write the output diff --git a/openghg_inversions/hbmcmc/hbmcmc.py b/openghg_inversions/hbmcmc/hbmcmc.py index d9383a48..fc520725 100644 --- a/openghg_inversions/hbmcmc/hbmcmc.py +++ b/openghg_inversions/hbmcmc/hbmcmc.py @@ -33,137 +33,12 @@ import os import pickle from pathlib import 
Path -from typing import Optional import numpy as np -import openghg_inversions.basis_functions as basis import openghg_inversions.hbmcmc.inversion_pymc as mcmc import openghg_inversions.hbmcmc.inversionsetup as setup from openghg_inversions import get_data, utils - - -def basis_functions_wrapper( - fp_all: dict, - species: str, - sites: list[str], - domain: str, - start_date: str, - emissions_name: list[str], - nbasis: int, - use_bc: bool, - basis_algorithm: Optional[str] = None, - fp_basis_case: Optional[str] = None, - bc_basis_case: Optional[str] = None, - basis_directory: Optional[str] = None, - bc_basis_directory: Optional[str] = None, - outputname: Optional[str] = None, - output_path: Optional[str] = None, -): - """ - Wrapper function for selecting basis function - algorithm. - ----------------------------------- - Args: - basis_algorithm (str): - One of "quadtree" (for using Quadtree algorithm) or - "weighted" (for using an algorihtm that splits region - by input data). - - NB. Land-sea separation is not imposed in the quadtree - basis functions, but is imposed by default in "weighted" - - nbasis (int): - Number of basis function regions to calculated in domain - fp_basis_case (str): - Name of basis function to use for emissions. - bc_basis_case (str, optional): - Name of basis case type for boundary conditions (NOTE, I don't - think that currently you can do anything apart from scaling NSEW - boundary conditions if you want to scale these monthly.) - basis_directory (str, optional): - Directory containing the basis function - if not default. - bc_basis_directory (str, optional): - Directory containing the boundary condition basis functions - (e.g. files starting with "NESW") - use_bc (bool): - Option to include/exclude boundary conditions in inversion - fp_all (dict): - Dictionary object produced from get_data functions - species (str): - Atmospheric trace gas species of interest - sites (str/list): - List of sites of interest - domain (str): - Model domain - start_date (str): - Start date of period of inference - emissions_name (str/list): - Emissions dataset key words for retrieving from object store - outputname (str): - File output name - output_path (str): - Passed to `outputdir` argument of `quadtreebasisfunction`. Used for testing. - - - Returns: - fp_data (dict): - Dictionary object similar to fp_all but with information - on basis functions and sensitivities - """ - if fp_basis_case is not None: - if basis_algorithm: - print( - f"Basis algorithm {basis_algorithm} and basis case {fp_basis_case} supplied; using {fp_basis_case}." - ) - basis_func = utils.basis(domain=domain, basis_case=fp_basis_case, basis_directory=basis_directory) - - elif basis_algorithm is None: - raise ValueError("One of `fp_basis_case` or `basis_algorithm` must be specified.") - - elif basis_algorithm == "quadtree": - print("Using Quadtree algorithm to derive basis functions") - basis_func = basis.quadtreebasisfunction( - emissions_name, - fp_all, - sites, - start_date, - domain, - species, - outputname, - nbasis=nbasis, - outputdir=output_path, - ) - - elif basis_algorithm == "weighted": - print("Using weighted by data algorithm to derive basis functions") - basis_func = basis.bucketbasisfunction( - emissions_name, - fp_all, - sites, - start_date, - domain, - species, - outputname, - nbasis=nbasis, - ) - - else: - raise ValueError( - "Basis algorithm not recognised. 
Please use either 'quadtree' or 'weighted', or input a basis function file"
-        )
-
-    fp_data = utils.fp_sensitivity(fp_all, basis_func=basis_func)
-
-    if use_bc is True:
-        fp_data = utils.bc_sensitivity(
-            fp_data,
-            domain=domain,
-            basis_case=bc_basis_case,
-            bc_basis_directory=bc_basis_directory,
-        )
-
-    return fp_data
+from openghg_inversions.basis import basis_functions_wrapper
 
 
 def fixedbasisMCMC(
@@ -206,6 +81,7 @@
     burn=50000,
     tune=1.25e5,
     nchain=2,
+    fix_basis_outer_regions: bool = False,
     averaging_error=True,
     bc_freq=None,
     sigma_freq=None,
@@ -494,9 +370,9 @@
         fp_all=fp_all,
         use_bc=use_bc,
         species=species,
-        sites=sites,
         domain=domain,
         start_date=start_date,
+        fix_outer_regions=fix_basis_outer_regions,
         emissions_name=emissions_name,
         outputname=outputname,
         output_path=basis_output_path,
diff --git a/openghg_inversions/utils.py b/openghg_inversions/utils.py
index 84744376..5e4cf0f2 100644
--- a/openghg_inversions/utils.py
+++ b/openghg_inversions/utils.py
@@ -11,17 +11,21 @@
 # ****************************************************************************
 import glob
 import json
-from pathlib import Path
 import os
+from pathlib import Path
 from types import SimpleNamespace
+from typing import Optional, Union
 
-import pandas as pd
+import dask.array as da
 import numpy as np
+import pandas as pd
 import xarray as xr
+from openghg.analyse import ModelScenario
 from tqdm import tqdm
-import dask.array as da
+
 from openghg_inversions import convert
 from openghg_inversions.config.paths import Paths
+from openghg_inversions.array_ops import get_xr_dummies, sparse_xr_dot
 
 openghginv_path = Paths.openghginv
 
@@ -606,44 +610,42 @@ def indexesMatch(dsa, dsb):
     return True
 
 
-def combine_datasets(dsa, dsb, method="ffill", tolerance=None):
+def combine_datasets(dsa, dsb, method="nearest", tolerance: Optional[float] = None) -> xr.Dataset:
     """
-    The combine_datasets function merges two datasets and re-indexes
-    to the FIRST dataset. If "fp" variable is found within the combined
-    dataset, the "time" values where the "lat","lon"dimensions didn't
-    match are removed.
+    Merge two datasets, re-indexing to the first dataset (within an optional tolerance).
+
+    If the "fp" variable is found within the combined dataset, the "time" values where the "lat", "lon"
+    dimensions didn't match are removed.
 
     Example:
         ds = combine_datasets(dsa, dsb)
-    -----------------------------------
+
     Args:
         dsa (xarray.Dataset):
             First dataset to merge
         dsb (xarray.Dataset):
             Second dataset to merge
-        method (str, optional):
-            One of {None, ‘nearest’, ‘pad’/’ffill’, ‘backfill’/’bfill’}
+        method: One of {None, ‘nearest’, ‘pad’/’ffill’, ‘backfill’/’bfill’}
             See xarray.DataArray.reindex_like for list of options and meaning.
-            Default = "ffill" (forward fill)
-        tolerance (int/float??):
-            Maximum allowed tolerance between matches.
+            Default = "nearest"
+        tolerance: Maximum allowed (absolute) tolerance between matches.
 
     Returns:
-        xarray.Dataset:
-            Combined dataset indexed to dsa
-    -----------------------------------
+        xarray.Dataset: combined dataset indexed to dsa
     """
     # merge the two datasets within a tolerance and remove times that are NaN (i.e.
when FPs don't exist) if not indexesMatch(dsa, dsb): - dsb_temp = dsb.reindex_like(dsa, method, tolerance=tolerance) + dsb_temp = dsb.load().reindex_like(dsa, method, tolerance=tolerance) else: dsb_temp = dsb ds_temp = dsa.merge(dsb_temp) - if "fp" in list(ds_temp.keys()): - flag = np.where(np.isfinite(ds_temp.fp.mean(dim=["lat", "lon"]).values)) - ds_temp = ds_temp[dict(time=flag[0])] + + if "fp" in ds_temp: + flag = np.isfinite(ds_temp.fp.sum(dim=["lat", "lon"], skipna=False)) + ds_temp = ds_temp.where(flag, drop=True) + return ds_temp @@ -961,7 +963,9 @@ def timeseries_HiTRes( return timeseries -def fp_sensitivity(fp_and_data, basis_func, verbose=True): +def fp_sensitivity( + fp_and_data: dict, basis_func: Union[xr.DataArray, dict[str, xr.DataArray]], verbose: bool = True +): """ The fp_sensitivity function adds a sensitivity matrix, H, to each site xarray dataframe in fp_and_data. @@ -970,7 +974,7 @@ def fp_sensitivity(fp_and_data, basis_func, verbose=True): region and 0 outside region. Region numbering must start from 1 - ----------------------------------- + Args: fp_and_data (dict): Output from footprints_data_merge() function. Dictionary of datasets. @@ -983,9 +987,8 @@ def fp_sensitivity(fp_and_data, basis_func, verbose=True): reflect keys in emissions_name dict used in fp_data_merge. Returns: - dict (xarray.Dataset): + dict: Same format as fp_and_data with sensitivity matrix and basis function grid added. - ----------------------------------- """ sites = [key for key in list(fp_and_data.keys()) if key[0] != "."] @@ -1027,13 +1030,17 @@ def fp_sensitivity(fp_and_data, basis_func, verbose=True): site_sensitivities.append(sensitivity) fp_and_data[site]["H"] = xr.concat(site_sensitivities, dim="region") - fp_and_data[".basis"] = site_bf.basis[:, :, 0] + fp_and_data[".basis"] = ( + current_basis_func.squeeze("time") if site_bf is None else site_bf.basis[:, :, 0] + ) # TODO: this will only contain the last value in the loop... return fp_and_data -def fp_sensitivity_single_site_basis_func(scenario, flux, source, basis_func, verbose=True): +def fp_sensitivity_single_site_basis_func( + scenario: ModelScenario, flux, source: str, basis_func: xr.DataArray, verbose: bool = True +): """ The fp_sensitivity function adds a sensitivity matrix, H, to each site xarray dataframe in fp_and_data. @@ -1042,7 +1049,7 @@ def fp_sensitivity_single_site_basis_func(scenario, flux, source, basis_func, ve region and 0 outside region. Region numbering must start from 1 - ----------------------------------- + Args: scenario: Output from footprints_data_merge() function; e.g. 
`fp_all["TAC"]` @@ -1057,7 +1064,6 @@ def fp_sensitivity_single_site_basis_func(scenario, flux, source, basis_func, ve Returns: sensitivity ("H") xr.DataArray and site_bf xr.Dataset - ----------------------------------- """ if isinstance(flux, dict): if "fp_HiTRes" in list(scenario.keys()): @@ -1086,8 +1092,8 @@ def fp_sensitivity_single_site_basis_func(scenario, flux, source, basis_func, ve H_all_v = H_all.values.reshape((len(site_bf.lat) * len(site_bf.lon), len(site_bf.time))) - if "region" in list(basis_func.dims.keys()): - if "time" in basis_func.basis.dims: + if "region" in basis_func.dims: + if "time" in basis_func.dims: basis_func = basis_func.isel(time=0) site_bf = xr.merge([site_bf, basis_func]) @@ -1107,31 +1113,10 @@ def fp_sensitivity_single_site_basis_func(scenario, flux, source, basis_func, ve sensitivity = xr.DataArray(H, coords=[("region", region_name), ("time", scenario.coords["time"])]) else: - print("Warning: Using basis functions without a region dimension may be deprecated shortly.") - - site_bf = combine_datasets(site_bf, basis_func, method="ffill") - - H = np.zeros((int(np.max(site_bf.basis)), len(site_bf.time))) - - basis_scale = xr.Dataset( - {"basis_scale": (["lat", "lon", "time"], np.zeros(np.shape(site_bf.basis)))}, - coords=site_bf.coords, - ) - site_bf = site_bf.merge(basis_scale) - - base_v = np.ravel(site_bf.basis.values[:, :, 0]) - for i in range(int(np.max(site_bf.basis))): - wh_ri = np.where(base_v == i + 1) - H[i, :] = np.nansum(H_all_v[wh_ri[0], :], axis=0) - - if source == "all": - region_name = list(range(1, np.max(site_bf.basis.values) + 1)) - else: - region_name = [source + "-" + str(reg) for reg in range(1, int(np.max(site_bf.basis.values) + 1))] - - sensitivity = xr.DataArray( - H.data, coords=[("region", region_name), ("time", scenario.coords["time"].data)] - ) + _, basis_aligned = xr.align(H_all.isel(time=0), basis_func, join="override") + basis_mat = get_xr_dummies(basis_aligned.squeeze("time"), cat_dim="region") + sensitivity = sparse_xr_dot(basis_mat, H_all.fillna(0.0)).transpose("region", "time") + site_bf = None return sensitivity, site_bf diff --git a/pyproject.toml b/pyproject.toml index 211a4e78..b087ca56 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "openghg_inversions" -version = "0.1.1" +version = "0.1.3" authors = [{name = "Eric Saboya", email = "eric.saboya@bristol.ac.uk"}] maintainers = [{name = "Brendan Murphy", email = "brendan.murphy@bristol.ac.uk"}] license = {file = "LICENSE"} @@ -15,7 +15,7 @@ classifiers = [ ] description = "OpenGHG Inversions" readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.10" dependencies = [ "pymc", "xarray", @@ -23,7 +23,8 @@ dependencies = [ "matplotlib", "scipy", "numpy", - "openghg" + "openghg", + "sparse" ] [project.urls] @@ -38,4 +39,4 @@ openghg_inversions = ["data/*"] [tool.black] line-length = 110 -target-version = ["py38", "py39"] +target-version = ["py310"] diff --git a/requirements.txt b/requirements.txt index 85386bfb..30c419dc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ matplotlib scipy numpy openghg == 0.6.2 +sparse diff --git a/tests/test_basis_functions.py b/tests/test_basis_functions.py index 8dcb64b3..d4b0a14d 100644 --- a/tests/test_basis_functions.py +++ b/tests/test_basis_functions.py @@ -1,6 +1,6 @@ import xarray as xr from openghg_inversions import utils -from openghg_inversions.basis_functions import quadtreebasisfunction +from openghg_inversions.basis 
import quadtreebasisfunction from openghg_inversions.get_data import data_processing_surface_notracer @@ -16,10 +16,7 @@ def test_quadtree_basis_function(tac_ch4_data_args, raw_data_path): basis_func = quadtreebasisfunction( emissions_name=[emissions_name], fp_all=fp_all, - sites=["TAC"], start_date="2019-01-01", - domain="EUROPE", - species="ch4", seed=42, ) @@ -27,4 +24,6 @@ def test_quadtree_basis_function(tac_ch4_data_args, raw_data_path): domain="EUROPE", basis_case="quadtree_ch4-test_basis", basis_directory=raw_data_path / "basis" ) - xr.testing.assert_allclose(basis_func, basis_func_reloaded) + # TODO: create new "fixed" basis function file, since we've switched basis functions from + # dataset to data array + xr.testing.assert_allclose(basis_func, basis_func_reloaded.basis) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 00000000..7a20fb6c --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,24 @@ +import pytest +import xarray as xr +from openghg.retrieve import get_footprint, get_flux + +from openghg_inversions.utils import combine_datasets + + +def test_combine_datasets(): + fp = get_footprint(site="tac", domain="europe").data + flux = get_flux(species="ch4", source="total-ukghg-edgar7", domain="europe").data + + comb = combine_datasets(fp, flux) + + with pytest.raises(AssertionError) as exc_info: + xr.testing.assert_allclose(flux.flux.squeeze("time").drop_vars("time"), comb.flux.isel(time=0)) + + # coordinates should be different because we aligned the flux to the footprint + assert exc_info.match("Differing coordinates") + + # values should not be different + with pytest.raises(AssertionError): + # the match fails, so this raises an assertion error; if the match is found + # no error is raised and pytest complains that it did not see an AssertionError + assert exc_info.match("Differing values")
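[Editor's note: a small sketch, not part of the patch, of the `combine_datasets` change tested above -- reindexing with method="nearest" instead of "ffill", after loading into memory -- using toy datasets with slightly offset grids.]

import numpy as np
import pandas as pd
import xarray as xr

from openghg_inversions.utils import combine_datasets

dsa = xr.Dataset(
    {"fp": (("lat", "lon", "time"), np.ones((3, 3, 2)))},
    coords={
        "lat": [50.0, 51.0, 52.0],
        "lon": [0.0, 1.0, 2.0],
        "time": pd.date_range("2019-01-01", periods=2),
    },
)
dsb = xr.Dataset(
    {"flux": (("lat", "lon"), np.ones((3, 3)))},
    coords={"lat": [50.01, 51.01, 52.01], "lon": [0.01, 1.01, 2.01]},
)

# "nearest" matches dsb's slightly offset grid to dsa's coordinates; "ffill"
# would have produced NaNs at the first lat/lon values, since no dsb coordinate
# lies below dsa's first grid point
combined = combine_datasets(dsa, dsb)
print(bool(combined.flux.notnull().all()))  # True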