From bce08eb5bce4c5262f60669e1f9cadc13294d71f Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Wed, 30 Aug 2023 14:32:41 +0100 Subject: [PATCH] return format --- README.md | 2 +- earthkit/climate/shapes.py | 256 ++++++++++++++++++++++++------------- earthkit/climate/tools.py | 28 +++- 3 files changed, 193 insertions(+), 93 deletions(-) diff --git a/README.md b/README.md index 27e7235..b2bf898 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # earthkit-climate -A toolkit for statistical analysis of climate and related observational data. +A toolkit for statistical analysis of temporal-geospatial data. **DISCLAIMER** This project is **BETA** and will be **Experimental** for the foreseeable future. diff --git a/earthkit/climate/shapes.py b/earthkit/climate/shapes.py index 1bf345e..5f35b49 100644 --- a/earthkit/climate/shapes.py +++ b/earthkit/climate/shapes.py @@ -3,14 +3,14 @@ from copy import deepcopy import geopandas as gpd +import pandas as pd import numpy as np import xarray as xr from earthkit.climate.tools import ( WEIGHTS_DICT, - get_dim_key, get_how, - get_spatial_dims, + get_spatial_info, ) logger = logging.getLogger(__name__) @@ -143,11 +143,11 @@ def _geopandas_to_shape_list(geodataframe): return [row[1]["geometry"] for row in geodataframe.iterrows()] -def _shape_mask_iterator(shapes, target, regular_grid=True, **kwargs): +def _shape_mask_iterator(shapes, target, regular=True, **kwargs): """Method which iterates over shape mask methods.""" if isinstance(shapes, gpd.GeoDataFrame): shapes = _geopandas_to_shape_list(shapes) - if regular_grid: + if regular: mask_function = rasterize else: mask_function = mask_contains_points @@ -156,15 +156,15 @@ def _shape_mask_iterator(shapes, target, regular_grid=True, **kwargs): yield shape_da -def shapes_to_mask(shapes, target, regular_grid=True, **kwargs): +def shapes_to_masks(shapes, target, regular=True, **kwargs): """ - Method which creates a list of mask dataarrays. + Method which creates a list of masked dataarrays. If possible use the shape_mask_iterator. """ if isinstance(shapes, gpd.GeoDataFrame): shapes = _geopandas_to_shape_list(shapes) - if regular_grid: + if regular: mask_function = rasterize else: mask_function = mask_contains_points @@ -172,12 +172,93 @@ def shapes_to_mask(shapes, target, regular_grid=True, **kwargs): return [mask_function([shape], target.coords, **kwargs) for shape in shapes] +def shapes_to_mask(shapes, target, regular=True, **kwargs): + """ + Method which creates a single masked dataarray based on all features in shapes. + + If possible use the shape_mask_iterator. + """ + if isinstance(shapes, gpd.GeoDataFrame): + shapes = _geopandas_to_shape_list(shapes) + if regular: + mask_function = rasterize + else: + mask_function = mask_contains_points + + return mask_function(shapes, target.coords, **kwargs) + + +def get_mask_dim_index( + mask_dim: T.Union[str, None, T.Dict[str, T.Any]], + geodataframe: gpd.geodataframe.GeoDataFrame, + default_index_name: str = "index", +): + if isinstance(mask_dim, str): + if mask_dim in geodataframe: + mask_dim_index = pd.Index(geodataframe[mask_dim]) + else: + mask_dim_index = geodataframe.index.rename(mask_dim) + elif isinstance(mask_dim, dict): + assert ( + len(mask_dim) == 1 + ), "If provided as a dictionary, mask_dim should have onlly one key value pair" + _mask_dim, _mask_dim_values = mask_dim.items() + mask_dim_index = pd.Index(_mask_dim_values, name=_mask_dim) + elif mask_dim is None: + # Use the index of the data frame + mask_dim_index = geodataframe.index + if mask_dim_index.name is None: + mask_dim_index = mask_dim_index.rename(default_index_name) + else: + raise ValueError("Unrecognised mask_dim format") + + return mask_dim_index + + +def mask( + dataarray: T.Union[xr.Dataset, xr.DataArray], + geodataframe: gpd.geodataframe.GeoDataFrame, + lat_key: T.Union[None, str] = None, + lon_key: T.Union[None, str] = None, + **mask_kwargs, +): + """ + Apply shape mask to some gridded data. + + The geodataframe object is treated as a single mask, any points that lie outside of any + of the features are masked + + Parameters + ---------- + dataarray : + Xarray data object (must have geospatial coordinates). + geodataframe : + Geopandas Dataframe containing the polygons for aggregations + lat_key/lon_key : + key for latitude/longitude variable, default behaviour is to detect variable keys. + + Returns + ------- + A masked data array/dataset with same dimensions as the input dataarray/dataset. Any point that + does not lie in any of the features of geodataframe is masked. + """ + spatial_info = get_spatial_info( + dataarray, lat_key=lat_key, lon_key=lon_key + ) + # Get spatial info required by mask functions: + mask_kwargs.update({key: spatial_info[key] for key in ["lat_key", "lon_key", "regular"]}) + mask = shapes_to_mask(geodataframe, dataarray, **mask_kwargs) + out = dataarray.where(mask) + out.attrs.update(geodataframe.attrs) + return out + def masks( - dataarray: T.Union[xr.DataArray, xr.Dataset], + dataarray: T.Union[xr.Dataset, xr.DataArray], geodataframe: gpd.geodataframe.GeoDataFrame, - mask_dim: str = "FID", - # regular_grid: bool = True, - **kwargs, + mask_dim: T.Union[str, None] = None, + lat_key: T.Union[None, str] = None, + lon_key: T.Union[None, str] = None, + **mask_kwargs, ): """ Apply multiple shape masks to some gridded data. @@ -185,9 +266,8 @@ def masks( Each feauture in shape is treated as an individual mask to apply to data. The data provided is returned with an additional dimension equal in length to the number of features in the shape object, this can result in very - large files which will slow down your workflow. It may be better to loop - over individual features, or directly apply the mask with the ct.shapes.average - or ct.shapes.reduce functions. + large files which will slow down your script. It may be better to loop + over individual features, or directly apply the mask with the shapes.reduce. Parameters ---------- @@ -195,11 +275,11 @@ def masks( Xarray data object (must have geospatial coordinates). geodataframe : Geopandas Dataframe containing the polygons for aggregations - how : - method used to apply mask. Default='mean', which calls np.nanmean - weights : - Provide weights for aggregation, also accepts recognised keys for weights, e.g. - 'latitude' + mask_dim : + dimension that will be created to accomodate the masked arrays, default is the index + of the geodataframe + lat_key/lon_key : + key for latitude/longitude variable, default behaviour is to detect variable keys. Returns ------- @@ -207,25 +287,18 @@ def masks( Each slice of layer corresponds to a feature in layer. """ masked_arrays = [] - for mask in _shape_mask_iterator(geodataframe, dataarray, **kwargs): + spatial_info = get_spatial_info( + dataarray, lat_key=lat_key, lon_key=lon_key + ) + # Get spatial info required by mask functions: + mask_kwargs.update({key: spatial_info[key] for key in ["lat_key", "lon_key", "regular"]}) + + for mask in _shape_mask_iterator(geodataframe, dataarray, **mask_kwargs): masked_arrays.append(dataarray.where(mask)) - if isinstance(mask_dim, str): - mask_dim_values = geodataframe.get( - mask_dim, np.arange(len(masked_arrays)) - ).to_numpy() - elif isinstance(mask_dim, dict): - assert ( - len(mask_dim) == 1 - ), "If provided as a dictionary, mask_dim should have onlly one key value pair" - mask_dim, mask_dim_values = mask_dim.items() - else: - raise ValueError( - "Unrecognised format for mask_dim, should be a string or length one dictionary" - ) + mask_dim_index = get_mask_dim_index(mask_dim, geodataframe) - out = xr.concat(masked_arrays, dim=mask_dim) - out = out.assign_coords({mask_dim: mask_dim_values}) + out = xr.concat(masked_arrays, dim=mask_dim_index) out.attrs.update(geodataframe.attrs) @@ -233,7 +306,7 @@ def masks( def reduce( - dataarray: T.Union[xr.DataArray, xr.Dataset], + dataarray: T.Union[xr.Dataset, xr.DataArray], geodataframe: gpd.GeoDataFrame, **kwargs, ): @@ -258,7 +331,8 @@ def reduce( extra_reduce_dims : any additional dimensions to aggregate over when reducing over spatial dimensions mask_dim : - dimension that will be created after the reduction of the spatial dimensions, default = `FID` + dimension that will be created after the reduction of the spatial dimensions, default is the index + of the dataframe return_as : what format to return the data object, `pandas` or `xarray`. Work In Progress how_label : @@ -297,11 +371,12 @@ def _reduce_dataarray( lat_key: T.Union[None, str] = None, lon_key: T.Union[None, str] = None, extra_reduce_dims: T.Union[list, str] = [], - mask_dim: str = "FID", + mask_dim: T.Union[None, str] = None, return_as: str = "xarray", how_label: T.Union[str, None] = None, squeeze: bool = True, - **kwargs, + mask_kwargs: T.Dict[str, T.Any] = {}, + **reduce_kwargs, ): """ Apply a shape object to an xarray.DataArray object using the specified 'how' method. @@ -329,7 +404,9 @@ def _reduce_dataarray( what format to return the data object, `"pandas"` or `"xarray"`. Work In Progress how_label : label to append to variable name in returned object, default is `how` - kwargs : + mask_kwargs : + Any kwargs to pass into the mask method + reduce_kwargs : kwargs recognised by the how function Returns @@ -339,11 +416,24 @@ def _reduce_dataarray( """ extra_out_attrs = {} - # If how is string, fetch function from dictionary: - if isinstance(how, str): - how_label = deepcopy(how) - how = get_how(how) - assert isinstance(how, T.Callable), f"how must be a callable: {how}" + if weights is None: + # convert how string to a method to apply + if isinstance(how, str): + how_label = deepcopy(how) + how = get_how(how) + assert isinstance(how, T.Callable), f"how must be a callable: {how}" + if how_label is None: + # get label from how method + how_label = how.__name__ + else: + # Create any standard weights, i.e. latitude + if isinstance(weights, str): + weights = WEIGHTS_DICT[weights](dataarray) + # We ensure the callable is a string + if callable(how): + how = how.__name__ + if how_label is None: + how_label = how new_long_name = ( f"{how_label.title()} {dataarray.attrs.get('long_name', dataarray.name)}" @@ -353,70 +443,55 @@ def _reduce_dataarray( if isinstance(extra_reduce_dims, str): extra_reduce_dims = [extra_reduce_dims] - if lat_key is None: - lat_key = get_dim_key(dataarray, "y") - if lon_key is None: - lon_key = get_dim_key(dataarray, "x") - - spatial_dims = get_spatial_dims(dataarray, lat_key, lon_key) - - # Create any standard weights, i.e. latitude - if isinstance(weights, str): - weights = WEIGHTS_DICT[weights](dataarray) + spatial_info = get_spatial_info( + dataarray, lat_key=lat_key, lon_key=lon_key + ) + # Get spatial info required by mask functions: + mask_kwargs.update({key: spatial_info[key] for key in ["lat_key", "lon_key", "regular"]}) + spatial_dims = spatial_info.get("spatial_dims") reduce_dims = spatial_dims + extra_reduce_dims extra_out_attrs.update({"reduce_dims": reduce_dims}) - red_kwargs = {} + reduce_kwargs = {"dim": reduce_dims} reduced_list = [] - for mask in _shape_mask_iterator(geodataframe, dataarray, **kwargs): + for mask in _shape_mask_iterator(geodataframe, dataarray, **mask_kwargs): this = dataarray.where(mask, other=np.nan) # If weighted, use xarray weighted arrays which correctly handle missing values etc. if weights is not None: - dataarray.weighted(weights) - - reduced = this.reduce(how, dim=reduce_dims, **red_kwargs).compute() - reduced = reduced.assign_attrs(dataarray.attrs) - reduced_list.append(reduced) - # context.debug(f"Shapes.average reduced ({i}): {reduced} \n{i}") - - if isinstance(mask_dim, str): - mask_dim_values = geodataframe.get( - mask_dim, np.arange(len(reduced_list)) - ).to_numpy() - elif isinstance(mask_dim, dict): - assert ( - len(mask_dim) == 1 - ), "If provided as a dictionary, mask_dim should have only one key value pair" - mask_dim, mask_dim_values = mask_dim.items() - else: - raise ValueError( - "Unrecognised format for mask_dim, should be a string or length one dictionary" - ) + this = this.weighted(weights) + reduced_list.append(this.__getattribute__(how)(**reduce_kwargs)) + else: + reduced = this.reduce(how, **reduce_kwargs).compute() + reduced = reduced.assign_attrs(dataarray.attrs) + reduced_list.append(reduced) if squeeze: reduced_list = [red_data.squeeze() for red_data in reduced_list] - out_xr = xr.concat(reduced_list, dim=mask_dim) - out_xr = out_xr.assign_coords( - **{ - mask_dim: (mask_dim, mask_dim_values), - # TODO: the following creates an xarray that cannot be saved to netCDF - # "geometry": (mask_dim, [geom for geom in geodataframe["geometry"]]), - } - ) - out_xr = out_xr.assign_attrs() + mask_dim_index = get_mask_dim_index(mask_dim, geodataframe) + + out_xr = xr.concat(reduced_list, dim=mask_dim_index) + + # # TODO: the following creates an xarray that cannot be saved to netCDF + # out_xr = out_xr.assign_coords( + # **{ + # "geometry": (mask_dim, [geom for geom in geodataframe["geometry"]]), + # } + # ) if return_as in ["pandas"]: # Return as a fully expanded pandas.DataFrame logger.warn( - "Returning reduced data in pandas format is considered experimental and the format" + "Returning reduced data in pandas format is considered experimental and may change in future" + "versions of earthkit-climate" ) # Convert to DataFrame - out = geodataframe.set_index(mask_dim) + out = geodataframe.set_index(mask_dim_index) out = out.join(out_xr.to_dataframe()) out.attrs.update({**extra_out_attrs}) elif return_as in ["pandas_compact"]: logger.warn( - "Returning reduced data in pandas format is considered experimental and the format" + "Returning reduced data in pandas format is considered experimental and may change in future" + "versions of earthkit-climate" ) # Out dims for attributes: out_dims = { @@ -439,7 +514,8 @@ def _reduce_dataarray( }, } ) - out = geodataframe.assign(**{new_short_name: reduced_list}) + out = geodataframe.set_index(mask_dim_index) + out = out.assign(**{new_short_name: reduced_list}) out.attrs.update({"reduce_attrs": reduce_attrs}) else: out = out_xr.assign_attrs({**geodataframe.attrs, **extra_out_attrs}) diff --git a/earthkit/climate/tools.py b/earthkit/climate/tools.py index 02dac79..3a8e0c5 100644 --- a/earthkit/climate/tools.py +++ b/earthkit/climate/tools.py @@ -211,11 +211,19 @@ def get_dim_key( return axis -def get_spatial_dims(dataarray, lat_key, lon_key): +def get_spatial_info(dataarray, lat_key=None, lon_key=None): + + # Figure out the keys for the latitude and longitude variables + if lat_key is None: + lat_key = get_dim_key(dataarray, "y") + if lon_key is None: + lon_key = get_dim_key(dataarray, "x") + # Get the geospatial dimensions of the data. In the case of regular data this # will be 'lat' and 'lon'. For irregular data it could be any dimensions lat_dims = dataarray.coords[lat_key].dims lon_dims = dataarray.coords[lon_key].dims + spatial_dims = [dim for dim in lat_dims] + [dim for dim in lon_dims if dim not in lat_dims] # Assert that latitude and longitude have the same dimensions # (irregular data, e.g. x&y or obs) @@ -223,4 +231,20 @@ def get_spatial_dims(dataarray, lat_key, lon_key): assert (lat_dims == lon_dims) or ( (lat_dims == (lat_key,)) and (lon_dims) == (lon_key,) ) - return list(set(lat_dims + lon_dims)) + if (lat_dims == lon_dims): + regular = False + elif (lat_dims == (lat_key,)) and (lon_dims) == (lon_key,): + regular = True + else: + raise ValueError( + "The geospatial dimensions have not not been correctly detected:\n" + f"lat_key: {lat_key}; lat_dims: {lat_dims}\n" + f"lon_key: {lon_key}; lon_dims: {lon_dims}\n" + ) + spatial_info = { + "lat_key": lat_key, + "lon_key": lon_key, + "regular": regular, + "spatial_dims": spatial_dims + } + return spatial_info