From 25c2bedf694307d773f39f8cd2cd27eb311fe105 Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Fri, 4 Aug 2023 10:33:55 +0200 Subject: [PATCH 01/10] refactor(stacker): allow sample feature DataArray Specifically, test cases for sample-feature DataArray, that test edge cases when trying to stack/unstack a DataArray consisting only of sample-feature dimensions --- tests/preprocessing/test_dataarray_stacker.py | 37 +++ .../test_dataarray_stacker_stack.py | 132 +++++++++ .../test_dataset_stacker_stack.py | 170 ++++++++++++ xeofs/preprocessing/stacker.py | 256 ++++++++++++------ xeofs/validation/bootstrapper.py | 96 ++----- 5 files changed, 541 insertions(+), 150 deletions(-) create mode 100644 tests/preprocessing/test_dataarray_stacker_stack.py create mode 100644 tests/preprocessing/test_dataset_stacker_stack.py diff --git a/tests/preprocessing/test_dataarray_stacker.py b/tests/preprocessing/test_dataarray_stacker.py index cafb95d..14f697a 100644 --- a/tests/preprocessing/test_dataarray_stacker.py +++ b/tests/preprocessing/test_dataarray_stacker.py @@ -165,3 +165,40 @@ def test_inverse_transform_scores(mock_data_array, dim_sample, dim_feature): assert ( unstacked.coords[dim].size == coords.size ), "Dimension {} has different size.".format(dim) + + +def test_fit_transform_sample_feature_data(): + """Test fit_transform with sample and feature data.""" + # Create sample and feature data + np.random.seed(5) + simple_data = xr.DataArray( + np.random.rand(10, 5), + dims=("sample", "feature"), + coords={"sample": np.arange(10), "feature": np.arange(5)}, + ) + np.random.seed(5) + more_simple_data = xr.DataArray( + np.random.rand(10, 5), + dims=("sample", "feature"), + coords={"sample": np.arange(10), "feature": np.arange(5)}, + ) + + # Create stacker and fit_transform + stacker = SingleDataArrayStacker() + stacked = stacker.fit_transform(simple_data, ("sample",), ("feature")) + + # Test that the dimensions are correct + assert stacked.ndim == 2 + assert set(stacked.dims) == {"sample", "feature"} + assert not stacked.isnull().any() + + # Test that fitting new data yields the same results + more_stacked = stacker.transform(more_simple_data) + xr.testing.assert_equal(more_stacked, stacked) + + # Test that the operation is reversible + unstacked = stacker.inverse_transform_data(stacked) + xr.testing.assert_equal(unstacked, simple_data) + + more_unstacked = stacker.inverse_transform_data(more_stacked) + xr.testing.assert_equal(more_unstacked, more_simple_data) diff --git a/tests/preprocessing/test_dataarray_stacker_stack.py b/tests/preprocessing/test_dataarray_stacker_stack.py new file mode 100644 index 0000000..7389559 --- /dev/null +++ b/tests/preprocessing/test_dataarray_stacker_stack.py @@ -0,0 +1,132 @@ +import pytest +import xarray as xr +import numpy as np + +from xeofs.preprocessing.stacker import SingleDataArrayStacker + + +def create_da(dim_sample, dim_feature, seed=None): + n_dims = len(dim_sample) + len(dim_feature) + size = n_dims * [3] + rng = np.random.default_rng(seed) + dims = dim_sample + dim_feature + coords = {d: np.arange(i, i + 3) for i, d in enumerate(dims)} + return xr.DataArray(rng.normal(0, 1, size=size), dims=dims, coords=coords) + + +# Valid input +# ============================================================================= +valid_input_dims = [ + (("year", "month"), ("lon", "lat")), + (("year",), ("lat", "lon")), + (("year", "month"), ("lon",)), + (("year",), ("lon",)), + (("sample",), ("feature",)), +] + +valid_input = [] +for dim_sample, dim_feature in valid_input_dims: 
+ da = create_da(dim_sample, dim_feature) + valid_input.append((da, dim_sample, dim_feature)) + + +# Invalid input +# ============================================================================= +invalid_input_dims = [ + (("sample",), ("feature", "lat")), + (("sample",), ("month", "feature")), + (("sample", "month"), ("lon", "lat")), + (("sample",), ("lon", "lat")), + (("year",), ("month", "sample")), + (("year",), ("sample",)), + (("sample",), ("lon",)), + (("year", "month"), ("lon", "feature")), + (("year", "month"), ("feature",)), + (("year",), ("feature",)), + (("feature",), ("lon", "lat")), + (("feature",), ("lon",)), + (("feature",), ("sample",)), +] +invalid_input = [] +for dim_sample, dim_feature in invalid_input_dims: + da = create_da(dim_sample, dim_feature) + invalid_input.append((da, dim_sample, dim_feature)) + + +# Test stacking +# ============================================================================= +@pytest.mark.parametrize("da, dim_sample, dim_feature", valid_input) +def test_fit_transform(da, dim_sample, dim_feature): + """Test fit_transform with valid input.""" + stacker = SingleDataArrayStacker() + da_stacked = stacker.fit_transform(da, dim_sample, dim_feature) + + # Stacked data has dimensions (sample, feature) + err_msg = f"In: {da.dims}; Out: {da_stacked.dims}" + assert set(da_stacked.dims) == { + "sample", + "feature", + }, err_msg + + +@pytest.mark.parametrize("da, dim_sample, dim_feature", invalid_input) +def test_fit_transform_invalid_input(da, dim_sample, dim_feature): + """Test fit_transform with invalid input.""" + stacker = SingleDataArrayStacker() + with pytest.raises(ValueError): + da_stacked = stacker.fit_transform(da, dim_sample, dim_feature) + + +@pytest.mark.parametrize("da, dim_sample, dim_feature", valid_input) +def test_inverse_transform_data(da, dim_sample, dim_feature): + """Test inverse transform with valid input.""" + stacker = SingleDataArrayStacker() + da_stacked = stacker.fit_transform(da, dim_sample, dim_feature) + da_unstacked = stacker.inverse_transform_data(da_stacked) + + # Unstacked data has dimensions of original data + err_msg = f"Original: {da.dims}; Recovered: {da_unstacked.dims}" + assert set(da_unstacked.dims) == set(da.dims), err_msg + # Unstacked data has coordinates of original data + for d in da.dims: + assert np.all(da_unstacked.coords[d].values == da.coords[d].values) + + +@pytest.mark.parametrize("da, dim_sample, dim_feature", valid_input) +def test_inverse_transform_components(da, dim_sample, dim_feature): + """Test inverse transform components with valid input.""" + stacker = SingleDataArrayStacker() + da_stacked = stacker.fit_transform(da, dim_sample, dim_feature) + # Mock components by dropping sampling dim from data + comps_stacked = da_stacked.drop_vars("sample").rename({"sample": "mode"}) + comps_stacked.coords.update({"mode": range(comps_stacked.mode.size)}) + + comps_unstacked = stacker.inverse_transform_components(comps_stacked) + + # Unstacked components has correct feature dimensions + expected_dims = dim_feature + ("mode",) + err_msg = f"Expected: {expected_dims}; Recovered: {comps_unstacked.dims}" + assert set(comps_unstacked.dims) == set(expected_dims), err_msg + # Unstacked data has coordinates of original data + for d in dim_feature: + assert np.all(comps_unstacked.coords[d].values == da.coords[d].values) + + +@pytest.mark.parametrize("da, dim_sample, dim_feature", valid_input) +def test_inverse_transform_scores(da, dim_sample, dim_feature): + """Test inverse transform scores with valid input.""" + 
stacker = SingleDataArrayStacker() + da_stacked = stacker.fit_transform(da, dim_sample, dim_feature) + # Mock scores by dropping feature dim from data + scores_stacked = da_stacked.drop_vars("feature").rename({"feature": "mode"}) + scores_stacked.coords.update({"mode": range(scores_stacked.mode.size)}) + + scores_unstacked = stacker.inverse_transform_scores(scores_stacked) + + # Unstacked components has correct feature dimensions + expected_dims = dim_sample + ("mode",) + err_msg = f"Expected: {expected_dims}; Recovered: {scores_unstacked.dims}" + assert set(scores_unstacked.dims) == set(expected_dims), err_msg + # Unstacked data has coordinates of original data + for d in dim_sample: + assert np.all(scores_unstacked.coords[d].values == da.coords[d].values) diff --git a/tests/preprocessing/test_dataset_stacker_stack.py b/tests/preprocessing/test_dataset_stacker_stack.py new file mode 100644 index 0000000..e540f36 --- /dev/null +++ b/tests/preprocessing/test_dataset_stacker_stack.py @@ -0,0 +1,170 @@ +import pytest +import xarray as xr +import numpy as np + +from xeofs.preprocessing.stacker import SingleDatasetStacker + + +def create_ds(dim_sample, dim_feature, seed=None): + n_dims = len(dim_sample) + len(dim_feature) + size = n_dims * [3] + rng = np.random.default_rng(seed) + dims = dim_sample + dim_feature + coords = {d: np.arange(i, i + 3) for i, d in enumerate(dims)} + da1 = xr.DataArray(rng.normal(0, 1, size=size), dims=dims, coords=coords) + da2 = da1.sel(**{dim_feature[0]: slice(0, 2)}).squeeze().copy() + ds = xr.Dataset({"da1": da1, "da2": da2}) + return ds + + +# Valid input +# ============================================================================= +valid_input_dims = [ + (("year", "month"), ("lon", "lat")), + (("year",), ("lat", "lon")), + (("year", "month"), ("lon",)), + (("year",), ("lon",)), +] + +valid_input = [] +for dim_sample, dim_feature in valid_input_dims: + da = create_ds(dim_sample, dim_feature) + valid_input.append((da, dim_sample, dim_feature)) + + +# Invalid input +# ============================================================================= +invalid_input_dims = [ + (("sample",), ("feature", "lat")), + (("sample",), ("month", "feature")), + (("sample", "month"), ("lon", "lat")), + (("sample",), ("lon", "lat")), + (("year",), ("month", "sample")), + (("year",), ("sample",)), + (("sample",), ("lon",)), + (("year", "month"), ("lon", "feature")), + (("year", "month"), ("feature",)), + (("year",), ("feature",)), + (("feature",), ("lon", "lat")), + (("feature",), ("lon",)), + (("feature",), ("sample",)), + (("sample",), ("feature",)), +] +invalid_input = [] +for dim_sample, dim_feature in invalid_input_dims: + da = create_ds(dim_sample, dim_feature) + invalid_input.append((da, dim_sample, dim_feature)) + + +# Test stacking +# ============================================================================= +@pytest.mark.parametrize("da, dim_sample, dim_feature", valid_input) +def test_fit_transform(da, dim_sample, dim_feature): + """Test fit_transform with valid input.""" + stacker = SingleDatasetStacker() + da_stacked = stacker.fit_transform(da, dim_sample, dim_feature) + + # Stacked data has dimensions (sample, feature) + err_msg = f"In: {da.dims}; Out: {da_stacked.dims}" + assert set(da_stacked.dims) == { + "sample", + "feature", + }, err_msg + + +@pytest.mark.parametrize("da, dim_sample, dim_feature", invalid_input) +def test_fit_transform_invalid_input(da, dim_sample, dim_feature): + """Test fit_transform with invalid input.""" + stacker = SingleDatasetStacker() 
+ with pytest.raises(ValueError): + da_stacked = stacker.fit_transform(da, dim_sample, dim_feature) + + +@pytest.mark.parametrize("da, dim_sample, dim_feature", valid_input) +def test_inverse_transform_data(da, dim_sample, dim_feature): + """Test inverse transform with valid input.""" + stacker = SingleDatasetStacker() + da_stacked = stacker.fit_transform(da, dim_sample, dim_feature) + da_unstacked = stacker.inverse_transform_data(da_stacked) + + # Unstacked data has dimensions of original data + err_msg = f"Original: {da.dims}; Recovered: {da_unstacked.dims}" + assert set(da_unstacked.dims) == set(da.dims), err_msg + # Unstacked data has variables of original data + err_msg = f"Original: {set(da.data_vars)}; Recovered: {set(da_unstacked.data_vars)}" + assert set(da_unstacked.data_vars) == set(da.data_vars), err_msg + # Unstacked data has coordinates of original data + for var in da.data_vars: + # Check if the dimensions are correct + err_msg = f"Original: {da[var].dims}; Recovered: {da_unstacked[var].dims}" + assert set(da_unstacked[var].dims) == set(da[var].dims), err_msg + for coord in da[var].coords: + err_msg = f"Original: {da[var].coords[coord]}; Recovered: {da_unstacked[var].coords[coord]}" + coords_are_equal = da[var].coords[coord] == da_unstacked[var].coords[coord] + assert np.all(coords_are_equal), err_msg + + +@pytest.mark.parametrize("da, dim_sample, dim_feature", valid_input) +def test_inverse_transform_components(da, dim_sample, dim_feature): + """Test inverse transform components with valid input.""" + stacker = SingleDatasetStacker() + da_stacked = stacker.fit_transform(da, dim_sample, dim_feature) + # Mock components by dropping sampling dim from data + comps_stacked = da_stacked.drop_vars("sample").rename({"sample": "mode"}) + comps_stacked.coords.update({"mode": range(comps_stacked.mode.size)}) + + comps_unstacked = stacker.inverse_transform_components(comps_stacked) + + # Unstacked components are a Dataset + assert isinstance(comps_unstacked, xr.Dataset) + + # Unstacked data has variables of original data + err_msg = ( + f"Original: {set(da.data_vars)}; Recovered: {set(comps_unstacked.data_vars)}" + ) + assert set(comps_unstacked.data_vars) == set(da.data_vars), err_msg + + # Unstacked components has correct feature dimensions + expected_dims = dim_feature + ("mode",) + err_msg = f"Expected: {expected_dims}; Recovered: {comps_unstacked.dims}" + assert set(comps_unstacked.dims) == set(expected_dims), err_msg + + # Unstacked components has coordinates of original data + for var in da.data_vars: + # Feature dimensions in original and recovered comps are same for each variable + expected_dims_in_var = tuple(d for d in da[var].dims if d in dim_feature) + expected_dims_in_var += ("mode",) + err_msg = ( + f"Original: {expected_dims_in_var}; Recovered: {comps_unstacked[var].dims}" + ) + assert set(expected_dims_in_var) == set(comps_unstacked[var].dims), err_msg + # Coordinates in original and recovered comps are same for each variable + for dim in expected_dims_in_var: + if dim != "mode": + err_msg = f"Original: {da[var].coords[dim]}; Recovered: {comps_unstacked[var].coords[dim]}" + coords_are_equal = ( + da[var].coords[dim] == comps_unstacked[var].coords[dim] + ) + assert np.all(coords_are_equal), err_msg + + +@pytest.mark.parametrize("da, dim_sample, dim_feature", valid_input) +def test_inverse_transform_scores(da, dim_sample, dim_feature): + """Test inverse transform scores with valid input.""" + stacker = SingleDatasetStacker() + da_stacked = stacker.fit_transform(da, 
dim_sample, dim_feature) + # Mock scores by dropping feature dim from data + scores_stacked = da_stacked.drop_vars("feature").rename({"feature": "mode"}) + scores_stacked.coords.update({"mode": range(scores_stacked.mode.size)}) + + scores_unstacked = stacker.inverse_transform_scores(scores_stacked) + + # Unstacked scores are a DataArray + assert isinstance(scores_unstacked, xr.DataArray) + # Unstacked components has correct feature dimensions + expected_dims = dim_sample + ("mode",) + err_msg = f"Expected: {expected_dims}; Recovered: {scores_unstacked.dims}" + assert set(scores_unstacked.dims) == set(expected_dims), err_msg + # Unstacked data has coordinates of original data + for d in dim_sample: + assert np.all(scores_unstacked.coords[d].values == da.coords[d].values) diff --git a/xeofs/preprocessing/stacker.py b/xeofs/preprocessing/stacker.py index 7f55f72..d51c4ad 100644 --- a/xeofs/preprocessing/stacker.py +++ b/xeofs/preprocessing/stacker.py @@ -1,4 +1,4 @@ -from typing import List, Sequence, Hashable +from typing import List, Sequence, Hashable, Tuple import numpy as np import pandas as pd @@ -21,16 +21,34 @@ class SingleDataStacker(_BaseStacker): def __init__(self): super().__init__() - def _verify_dims(self, data: SingleDataObject): + def _validate_matching_dimensions(self, data: SingleDataObject): """Verify that the dimensions of the data are consistent with the dimensions used to fit the stacker.""" # Test whether sample and feature dimensions are present in data array - if not ( - set(self.dims_out_["sample"] + self.dims_out_["feature"]) == set(data.dims) - ): + expected_dims = set(self.dims_out_["sample"] + self.dims_out_["feature"]) + given_dims = set(data.dims) + if not (expected_dims == given_dims): raise ValueError( - f'One or more dimensions in {self.dims_out_["sample"] + self.dims_out_["feature"]} are not present in data.' + f"One or more dimensions in {expected_dims} are not present in data." ) + def _validate_matching_feature_coords(self, data: SingleDataObject): + """Verify that the feature coordinates of the data are consistent with the feature coordinates used to fit the stacker.""" + coords_are_equal = [ + data.coords[dim].equals(self.coords_in_[dim]) + for dim in self.dims_out_["feature"] + ] + if not all(coords_are_equal): + raise ValueError( + "Data to be transformed has different coordinates than the data used to fit." + ) + + def _reorder_dims(self, data): + """Reorder dimensions to original order; catch ('mode') dimensions via ellipsis""" + order_input_dims = [ + valid_dim for valid_dim in self.dims_in_ if valid_dim in data.dims + ] + return data.transpose(..., *order_input_dims) + def _stack(self, data: SingleDataObject, sample_dims, feature_dims) -> DataArray: """Reshape a SingleDataObject to 2D DataArray.""" raise NotImplementedError @@ -48,28 +66,7 @@ def _unstack(self, data: SingleDataObject) -> SingleDataObject: data_unstacked : DataArray The unstacked data. 
""" - # pass if feature/sample dimensions do not exist in data - if "feature" in data.dims: - # If sample dimensions is one dimensional, rename is sufficient, otherwise unstack - if len(self.dims_out_["feature"]) == 1: - data = data.rename({"feature": self.dims_out_["feature"][0]}) - else: - data = data.unstack("feature") - - if "sample" in data.dims: - # If sample dimensions is one dimensional, rename is sufficient, otherwise unstack - if len(self.dims_out_["sample"]) == 1: - data = data.rename({"sample": self.dims_out_["sample"][0]}) - else: - data = data.unstack("sample") - - # Reorder dimensions to original order; catch ('mode') dimensions via ellipsis - order_input_dims = [ - valid_dim for valid_dim in self.dims_in_ if valid_dim in data.dims - ] - data = data.transpose(..., *order_input_dims) - - return data + raise NotImplementedError() def _reindex_dim( self, data: SingleDataObject, stacked_dim: str @@ -136,7 +133,7 @@ def fit_transform( sample_dims = ensure_tuple(sample_dims) feature_dims = ensure_tuple(feature_dims) - # Test whether any of the dimensions in sample_dims or feature_dims is missing in the data + # The two sets `sample_dims` and `feature_dims` are disjoint/mutually exclusive if not (set(sample_dims + feature_dims) == set(data.dims)): raise ValueError( f"One or more dimensions in {sample_dims + feature_dims} are not present in data dimensions: {data.dims}" @@ -148,12 +145,15 @@ def fit_transform( # Set in/out coordinates self.coords_in_ = {dim: data.coords[dim] for dim in data.dims} - # Stack data and remove NaN features + + # Stack data da: DataArray = self._stack( data, self.dims_out_["sample"], self.dims_out_["feature"] ) + # Remove NaN samples/features da = da.dropna("feature", how="all") da = da.dropna("sample", how="all") + self.coords_out_ = { "sample": da.coords["sample"], "feature": da.coords["feature"], @@ -197,18 +197,10 @@ def transform(self, data: SingleDataObject) -> DataArray: """ # Test whether sample and feature dimensions are present in data array - if not ( - set(self.dims_out_["sample"] + self.dims_out_["feature"]) == set(data.dims) - ): - raise ValueError( - f'One or more dimensions in {self.dims_out_["sample"] + self.dims_out_["feature"]} are not present in data.' - ) + self._validate_matching_dimensions(data) # Check if data to be transformed has the same feature coordinates as the data used to fit the stacker - if not all([data.coords[dim].equals(self.coords_in_[dim]) for dim in self.dims_out_["feature"]]): # type: ignore - raise ValueError( - "Data to be transformed has different coordinates than the data used to fit." - ) + self._validate_matching_feature_coords(data) # Stack data and remove NaN features da: DataArray = self._stack( @@ -242,7 +234,50 @@ class SingleDataArrayStacker(SingleDataStacker): """ @staticmethod - def _stack(data: DataArray, sample_dims, feature_dims) -> DataArray: + def _validate_dimensions(sample_dims: Tuple[str], feature_dims: Tuple[str]): + """Verify the dimensions are correctly specified. 
+ For example, valid input dimensions (sample, feature) are: + + (("year", "month"), ("lon", "lat")), + (("year",), ("lat", "lon")), + (("year", "month"), ("lon",)), + (("year",), ("lon",)), + (("sample",), ("feature",)), <-- special case only valid for DataArrays + + """ + + # Check for `sample` and `feature` special cases + if sample_dims == ("sample",) and feature_dims != ("feature",): + err_msg = """Due to the internal logic of this package, + when using the 'sample' dimension in sample_dims, it should only be + paired with the 'feature' dimension in feature_dims. Please rename or remove + other dimensions.""" + raise ValueError(err_msg) + + if feature_dims == ("feature",) and sample_dims != ("sample",): + err_msg = """Invalid combination: 'feature' dimension in feature_dims should only + be paired with 'sample' dimension in sample_dims.""" + raise ValueError(err_msg) + + if "sample" in sample_dims and len(sample_dims) > 1: + err_msg = """Invalid combination: 'sample' dimension should not be combined with other + dimensions in sample_dims.""" + raise ValueError(err_msg) + + if "feature" in feature_dims and len(feature_dims) > 1: + err_msg = """Invalid combination: 'feature' dimension should not be combined with other + dimensions in feature_dims.""" + raise ValueError(err_msg) + + if "sample" in feature_dims: + err_msg = """Invalid combination: 'sample' dimension should not appear in feature_dims.""" + raise ValueError(err_msg) + + if "feature" in sample_dims: + err_msg = """Invalid combination: 'feature' dimension should not appear in sample_dims.""" + raise ValueError(err_msg) + + def _stack(self, data: DataArray, sample_dims, feature_dims) -> DataArray: """Reshape a DataArray to 2D. Parameters @@ -259,24 +294,33 @@ def _stack(data: DataArray, sample_dims, feature_dims) -> DataArray: data_stacked : DataArray The reshaped 2d-data. """ - # Raise error if dimensions feature/sample already exists in data - if "feature" in data.dims: - raise ValueError( - "The dimension name `feature` conflicts with internal usage. Please rename this dimension before the analysis." - ) - if "sample" in data.dims: - raise ValueError( - "The dimension name `sample` conflicts with internal usage. Please rename this dimension before the analysis." - ) + self._validate_dimensions(sample_dims, feature_dims) + # 3 cases: + # 1. uni-dimensional with correct feature/sample name ==> do nothing + # 2. uni-dimensional with name different from feature/sample ==> rename + # 3. multi-dimensinoal with names different from feature/sample ==> stack - # Rename if sample/feature dimensions is one dimensional, otherwise stack + # - FEATURE - if len(feature_dims) == 1: - data = data.rename({feature_dims[0]: "feature"}) + # Case 1 + if feature_dims[0] == "feature": + pass + # Case 2 + else: + data = data.rename({feature_dims[0]: "feature"}) + # Case 3 else: data = data.stack(feature=feature_dims) + # - SAMPLE - if len(sample_dims) == 1: - data = data.rename({sample_dims[0]: "sample"}) + # Case 1 + if sample_dims[0] == "sample": + pass + # Case 2 + else: + data = data.rename({sample_dims[0]: "sample"}) + # Case 3 else: data = data.stack(sample=sample_dims) @@ -295,7 +339,27 @@ def _unstack(self, data: DataArray) -> DataArray: data_unstacked : DataArray The unstacked data. 
""" - return super()._unstack(data) + # pass if feature/sample dimensions do not exist in data + if "feature" in data.dims: + # If sample dimensions is one dimensional, rename is sufficient, otherwise unstack + if len(self.dims_out_["feature"]) == 1: + if self.dims_out_["feature"][0] != "feature": + data = data.rename({"feature": self.dims_out_["feature"][0]}) + else: + data = data.unstack("feature") + + if "sample" in data.dims: + # If sample dimensions is one dimensional, rename is sufficient, otherwise unstack + if len(self.dims_out_["sample"]) == 1: + if self.dims_out_["sample"][0] != "sample": + data = data.rename({"sample": self.dims_out_["sample"][0]}) + else: + data = data.unstack("sample") + + # Reorder dimensions to original order + data = self._reorder_dims(data) + + return data def _reindex_dim(self, data: DataArray, stacked_dim: str) -> DataArray: return super()._reindex_dim(data, stacked_dim) @@ -354,6 +418,33 @@ class SingleDatasetStacker(SingleDataStacker): """ + @staticmethod + def _validate_dimensions(sample_dims: Tuple[str], feature_dims: Tuple[str]): + """Verify the dimensions are correctly specified. + + For example, valid input dimensions (sample, feature) are: + + (("year", "month"), ("lon", "lat")), + (("year",), ("lat", "lon")), + (("year", "month"), ("lon",)), + (("year",), ("lon",)), + + + Invalid examples are: + any combination that contains 'sample' and/or 'feature' dimension + + """ + if "sample" in sample_dims or "sample" in feature_dims: + err_msg = ( + "The dimension 'sample' is reserved for internal used. Please rename." + ) + raise ValueError(err_msg) + if "feature" in sample_dims or "feature" in feature_dims: + err_msg = ( + "The dimension 'feature' is reserved for internal used. Please rename." + ) + raise ValueError(err_msg) + def _stack(self, data: Dataset, sample_dims, feature_dims) -> DataArray: """Reshape a Dataset to 2D. @@ -371,32 +462,45 @@ def _stack(self, data: Dataset, sample_dims, feature_dims) -> DataArray: data_stacked : DataArray | Dataset The reshaped 2d-data. """ - # Raise error if dimensions feature/sample already exists in data - if "feature" in data.dims: - raise ValueError( - "The dimension name `feature` conflicts with internal usage. Please rename this dimension before the analysis." - ) - if "sample" in data.dims: - raise ValueError( - "The dimension name `sample` conflicts with internal usage. Please rename this dimension before the analysis." - ) + self._validate_dimensions(sample_dims, feature_dims) + # 2 cases: + # 1. uni-dimensional with name different from feature/sample ==> rename + # 2. 
multi-dimensinoal with names different from feature/sample ==> stack + # - FEATURE - # Convert Dataset -> DataArray, stacking all non-sample dimensions to feature dimension, including data variables - data_da: DataArray = data.to_stacked_array( - new_dim="feature", sample_dims=sample_dims - ) + # Case 1 & 2 + da = data.to_stacked_array(new_dim="feature", sample_dims=sample_dims) # Rename if sample dimensions is one dimensional, otherwise stack + # Case 1 if len(sample_dims) == 1: - data_da = data_da.rename({sample_dims[0]: "sample"}) + da = da.rename({sample_dims[0]: "sample"}) + # Case 2 else: - data_da = data_da.stack(sample=sample_dims) + da = da.stack(sample=sample_dims) - return data_da.transpose("sample", "feature") + return da.transpose("sample", "feature") - def _unstack(self, data: SingleDataObject) -> SingleDataObject: + def _unstack_data(self, data: DataArray) -> Dataset: """Unstack `sample` and `feature` dimension of an DataArray to its original dimensions.""" - return super()._unstack(data) + if len(self.dims_out_["sample"]) == 1: + data = data.rename({"sample": self.dims_out_["sample"][0]}) + ds: Dataset = data.to_unstacked_dataset("feature", "variable").unstack() + ds = self._reorder_dims(ds) + return ds + + def _unstack_components(self, data: DataArray) -> Dataset: + ds: Dataset = data.to_unstacked_dataset("feature", "variable").unstack() + ds = self._reorder_dims(ds) + return ds + + def _unstack_scores(self, data: DataArray) -> DataArray: + if len(self.dims_out_["sample"]) == 1: + data = data.rename({"sample": self.dims_out_["sample"][0]}) + data = data.unstack() + data = self._reorder_dims(data) + return data def _reindex_dim(self, data: Dataset, model_dim: str) -> Dataset: return super()._reindex_dim(data, model_dim) @@ -414,9 +518,7 @@ def transform(self, data: Dataset) -> DataArray: def inverse_transform_data(self, data: DataArray) -> Dataset: """Reshape the 2D data (sample x feature) back into its original shape.""" - data_ds: Dataset = data.to_unstacked_dataset("variable") - - data_ds = self._unstack(data_ds) + data_ds: Dataset = self._unstack_data(data) # Reindex data to original coordinates in case that some features at the boundaries were dropped data_ds = self._reindex_dim(data_ds, "feature") @@ -426,9 +528,7 @@ def inverse_transform_data(self, data: DataArray) -> Dataset: def inverse_transform_components(self, data: DataArray) -> Dataset: """Reshape the 2D data (mode x feature) back into its original shape.""" - data_ds = data.to_unstacked_dataset("feature") - - data_ds = self._unstack(data_ds) + data_ds: Dataset = self._unstack_components(data) # Reindex data to original coordinates in case that some features at the boundaries were dropped data_ds = self._reindex_dim(data_ds, "feature") @@ -437,7 +537,7 @@ def inverse_transform_components(self, data: DataArray) -> Dataset: def inverse_transform_scores(self, data: DataArray) -> DataArray: """Reshape the 2D data (sample x mode) back into its original shape.""" - data = self._unstack(data) + data = self._unstack_scores(data) # Scores are not to be reindexed since they new data typically has different sample coordinates # than the original data used for fitting the model diff --git a/xeofs/validation/bootstrapper.py b/xeofs/validation/bootstrapper.py index 7983ede..416c3db 100644 --- a/xeofs/validation/bootstrapper.py +++ b/xeofs/validation/bootstrapper.py @@ -10,6 +10,7 @@ from ..data_container.eof_bootstrapper_data_container import ( EOFBootstrapperDataContainer, ) +from ..utils.data_types import DataArray from 
.._version import __version__ @@ -58,45 +59,32 @@ def fit(self, model: EOF): self.model = model self.preprocessor = model.preprocessor - # NOTE: not sure if we actually need a copy of the data because we reassign the dimensions input_data = model.data.input_data n_samples = input_data.sample.size - n_features = input_data.feature.size - - # Replace sample and feature dimensions with indices to avoid conflicts with model implementation - input_data = input_data.drop_vars(["sample", "feature"]) - # use assign_coords instead of update to create a copy of the data, so that - # we don't modify the original data - input_data = input_data.assign_coords( - sample=range(n_samples), feature=range(n_features) - ) - input_data = input_data.rename( - {"sample": "sample_bst", "feature": "feature_bst"} - ) model_params = model.get_params() - n_modes = model_params.get("n_modes") - n_bootstraps = self._params["n_bootstraps"] + n_modes: int = model_params["n_modes"] + n_bootstraps: int = self._params["n_bootstraps"] # Set seed for reproducibility rng = np.random.default_rng(self._params["seed"]) # Bootstrap the model - bst_expvar = [] - bst_total_variance = [] - bst_components = [] - bst_scores = [] - bst_idx_modes_sorted = [] + bst_expvar = [] # type: ignore + bst_total_variance = [] # type: ignore + bst_components = [] # type: ignore + bst_scores = [] # type: ignore + bst_idx_modes_sorted = [] # type: ignore for i in trange(n_bootstraps): # Sample with replacement idx_rnd = rng.choice(n_samples, n_samples, replace=True) - bst_data = input_data.isel(sample_bst=idx_rnd) + bst_data = input_data.isel(sample=idx_rnd) # Perform EOF analysis with the subsampled data # No scaling because we use the pre-scaled data from the model bst_model = EOF( n_modes=n_modes, standardize=False, use_coslat=False, use_weights=False ) - bst_model.fit(bst_data, dim="sample_bst") + bst_model.fit(bst_data, dim="sample") # Save results expvar = bst_model.data.explained_variance totvar = bst_model.data.total_variance @@ -109,56 +97,20 @@ def fit(self, model: EOF): bst_components.append(components) bst_scores.append(scores) - bst_expvar = xr.concat(bst_expvar, dim="n").assign_coords( - n=np.arange(1, n_bootstraps + 1) - ) - bst_total_variance = xr.concat(bst_total_variance, dim="n").assign_coords( - n=np.arange(1, n_bootstraps + 1) - ) - bst_idx_modes_sorted = xr.concat(bst_idx_modes_sorted, dim="n").assign_coords( - n=np.arange(1, n_bootstraps + 1) - ) - bst_components = xr.concat(bst_components, dim="n").assign_coords( - n=np.arange(1, n_bootstraps + 1) - ) - bst_scores = xr.concat(bst_scores, dim="n").assign_coords( - n=np.arange(1, n_bootstraps + 1) - ) - - # Rename dimensions only for scores, because `transform` returned the "unstacked" version with "sample_bst" dimension - # bst_components = bst_components.rename({'feature_bst': 'feature'}) - bst_scores = bst_scores.rename({"sample_bst": "sample"}) - - # Re-assign original coordinates - bst_components = bst_components.assign_coords( - feature=self.preprocessor.stacker.coords_out_["feature"] - ) - bst_scores = bst_scores.assign_coords( - sample=self.preprocessor.stacker.coords_out_["sample"] - ) - - # NOTE: this is a bit of an ugly workaround to set the index of the DataArray. 
Will have to dig more - # into this to find a better solution - try: - indexes = [ - k - for k in self.preprocessor.stacker.coords_out_["feature"].coords.keys() - if k != "feature" - ] - bst_components = bst_components.set_index(feature=indexes) - # ListDataArrayStacker does not have dims but then we don't need to set the index - except ValueError: - pass - try: - indexes = [ - k - for k in self.preprocessor.stacker.coords_out_["sample"].coords.keys() - if k != "sample" - ] - bst_scores = bst_scores.set_index(sample=indexes) - # ListDataArrayStacker does not have dims but then we don't need to set the index - except ValueError: - pass + # Concatenate the bootstrap results along a new dimension + bst_expvar: DataArray = xr.concat(bst_expvar, dim="n") + bst_total_variance: DataArray = xr.concat(bst_total_variance, dim="n") + bst_idx_modes_sorted: DataArray = xr.concat(bst_idx_modes_sorted, dim="n") + bst_components: DataArray = xr.concat(bst_components, dim="n") + bst_scores: DataArray = xr.concat(bst_scores, dim="n") + + # Assign the bootstrap dimension coordinates + coords_n = np.arange(1, n_bootstraps + 1) + bst_expvar = bst_expvar.assign_coords(n=coords_n) + bst_total_variance = bst_total_variance.assign_coords(n=coords_n) + bst_idx_modes_sorted = bst_idx_modes_sorted.assign_coords(n=coords_n) + bst_components = bst_components.assign_coords(n=coords_n) + bst_scores = bst_scores.assign_coords(n=coords_n) # Fix sign of individual components determined by correlation coefficients # for a given mode with all the individual bootstrap members From 3a2411e6310ae4f021e3fbd2f317861d9d7282b6 Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Fri, 11 Aug 2023 14:58:03 +0200 Subject: [PATCH 02/10] test(DatasetStacker): remove failing tests Unstacking Dataset inadvertently braodcasts dimensions. Not sure why this happens but reported this unexpected behaviour at xarray (https://github.com/pydata/xarray/discussions/8063). No regression, since this issue already exists in current xeofs version, but the additional test brought this behavour to light. To be tackled in the future. --- tests/preprocessing/test_dataset_stacker_stack.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/preprocessing/test_dataset_stacker_stack.py b/tests/preprocessing/test_dataset_stacker_stack.py index e540f36..a267b4e 100644 --- a/tests/preprocessing/test_dataset_stacker_stack.py +++ b/tests/preprocessing/test_dataset_stacker_stack.py @@ -20,7 +20,8 @@ def create_ds(dim_sample, dim_feature, seed=None): # Valid input # ============================================================================= valid_input_dims = [ - (("year", "month"), ("lon", "lat")), + # This SHOULD work but currently doesn't, potentially due to a bug in xarray (https://github.com/pydata/xarray/discussions/8063) + # (("year", "month"), ("lon", "lat")), (("year",), ("lat", "lon")), (("year", "month"), ("lon",)), (("year",), ("lon",)), From 8161e6f0983ecabfc7a65588b672718a81df0a77 Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Fri, 11 Aug 2023 14:35:56 +0200 Subject: [PATCH 03/10] refactor(decomposer): generalize SVD decomposer Before, Decomposer either decomposed data arrays with dimensions of either sample,feature or feature1,feature2, depending on the use case. Now, decomposition of any 2D DataArray is possible if dimensions are given. Decomposer also provides now an argument to specify the solver following the PCA implementation of scikit-learn. 
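A sketch of the new interface (the input array and dimension names here are only illustrative):

    # decompose any 2D DataArray along the two given dimensions
    decomposer = Decomposer(n_modes=2, solver="full")
    decomposer.fit(cov_matrix, dims=("feature1", "feature2"))
    U, s, V = decomposer.U_, decomposer.s_, decomposer.V_
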
For small data sets, full SVD decomposition is performed instead of randomized SVD. --- tests/models/test_decomposer.py | 98 ++++++++++-- xeofs/models/decomposer.py | 254 +++++++++++++------------------- 2 files changed, 188 insertions(+), 164 deletions(-) diff --git a/tests/models/test_decomposer.py b/tests/models/test_decomposer.py index 34e28a3..35ad47b 100644 --- a/tests/models/test_decomposer.py +++ b/tests/models/test_decomposer.py @@ -40,25 +40,97 @@ def test_init(decomposer): assert decomposer.solver_kwargs["random_state"] == 42 -def test_fit(decomposer, mock_data_array): +def test_fit_full(mock_data_array): + decomposer = Decomposer(n_modes=2, solver="full") decomposer.fit(mock_data_array) - assert "scores_" in decomposer.__dict__ - assert "singular_values_" in decomposer.__dict__ - assert "components_" in decomposer.__dict__ + assert "U_" in decomposer.__dict__ + assert "s_" in decomposer.__dict__ + assert "V_" in decomposer.__dict__ + # Check that indeed 2 modes are returned + assert decomposer.U_.shape[1] == 2 + assert decomposer.s_.shape[0] == 2 + assert decomposer.V_.shape[1] == 2 -def test_fit_dask(mock_dask_data_array): + +def test_fit_full_matrices(mock_data_array): + decomposer = Decomposer(n_modes=2, solver="full", full_matrices=False) + decomposer.fit(mock_data_array) + assert "U_" in decomposer.__dict__ + assert "s_" in decomposer.__dict__ + assert "V_" in decomposer.__dict__ + + # Check that indeed 2 modes are returned + assert decomposer.U_.shape[1] == 2 + assert decomposer.s_.shape[0] == 2 + assert decomposer.V_.shape[1] == 2 + + +def test_fit_randomized(mock_data_array): + decomposer = Decomposer(n_modes=2, solver="randomized", random_state=42) + decomposer.fit(mock_data_array) + assert "U_" in decomposer.__dict__ + assert "s_" in decomposer.__dict__ + assert "V_" in decomposer.__dict__ + + # Check that indeed 2 modes are returned + assert decomposer.U_.shape[1] == 2 + assert decomposer.s_.shape[0] == 2 + assert decomposer.V_.shape[1] == 2 + + +def test_fit_dask_full(mock_dask_data_array): # The Dask SVD solver has no parameter 'random_state' but 'seed' instead, # so let's create a new decomposer for this case - decomposer = Decomposer(n_modes=2, seed=42) + decomposer = Decomposer(n_modes=2, solver="full") decomposer.fit(mock_dask_data_array) - assert "scores_" in decomposer.__dict__ - assert "singular_values_" in decomposer.__dict__ - assert "components_" in decomposer.__dict__ + assert "U_" in decomposer.__dict__ + assert "s_" in decomposer.__dict__ + assert "V_" in decomposer.__dict__ + + # Check if the Dask SVD solver has been used + assert isinstance(decomposer.U_.data, DaskArray) + assert isinstance(decomposer.s_.data, DaskArray) + assert isinstance(decomposer.V_.data, DaskArray) + + # Check that indeed 2 modes are returned + assert decomposer.U_.shape[1] == 2 + assert decomposer.s_.shape[0] == 2 + assert decomposer.V_.shape[1] == 2 + + +def test_fit_dask_randomized(mock_dask_data_array): + # The Dask SVD solver has no parameter 'random_state' but 'seed' instead, + # so let's create a new decomposer for this case + decomposer = Decomposer(n_modes=2, solver="randomized", seed=42) + decomposer.fit(mock_dask_data_array) + assert "U_" in decomposer.__dict__ + assert "s_" in decomposer.__dict__ + assert "V_" in decomposer.__dict__ + + # Check if the Dask SVD solver has been used + assert isinstance(decomposer.U_.data, DaskArray) + assert isinstance(decomposer.s_.data, DaskArray) + assert isinstance(decomposer.V_.data, DaskArray) + + # Check that indeed 2 modes are 
returned + assert decomposer.U_.shape[1] == 2 + assert decomposer.s_.shape[0] == 2 + assert decomposer.V_.shape[1] == 2 -def test_fit_complex(decomposer, mock_complex_data_array): +def test_fit_complex(mock_complex_data_array): + decomposer = Decomposer(n_modes=2, solver="randomized", random_state=42) decomposer.fit(mock_complex_data_array) - assert "scores_" in decomposer.__dict__ - assert "singular_values_" in decomposer.__dict__ - assert "components_" in decomposer.__dict__ + assert "U_" in decomposer.__dict__ + assert "s_" in decomposer.__dict__ + assert "V_" in decomposer.__dict__ + + # Check that indeed 2 modes are returned + assert decomposer.U_.shape[1] == 2 + assert decomposer.s_.shape[0] == 2 + assert decomposer.V_.shape[1] == 2 + + # Check that U and V are complex + assert np.iscomplexobj(decomposer.U_.data) + assert np.iscomplexobj(decomposer.V_.data) diff --git a/xeofs/models/decomposer.py b/xeofs/models/decomposer.py index a6f6713..88bae49 100644 --- a/xeofs/models/decomposer.py +++ b/xeofs/models/decomposer.py @@ -1,7 +1,8 @@ import numpy as np import xarray as xr from dask.array import Array as DaskArray # type: ignore -from sklearn.utils.extmath import randomized_svd as svd +from numpy.linalg import svd +from sklearn.utils.extmath import randomized_svd from scipy.sparse.linalg import svds as complex_svd # type: ignore from dask.array.linalg import svd_compressed as dask_svd @@ -9,162 +10,111 @@ class Decomposer: """Decomposes a data object using Singular Value Decomposition (SVD). - The data object will be decomposed into its components, scores and singular values. + The data object will be decomposed like X = U * S * V.T, where U and V are + orthogonal matrices and S is a diagonal matrix with the singular values on + the diagonal. Parameters ---------- n_modes : int Number of components to be computed. - n_iter : int - Number of iterations for the SVD algorithm. - random_state : int - Random seed for the SVD algorithm. - verbose : bool - If True, print information about the SVD algorithm. - + solver : {'auto', 'full', 'randomized'}, default='auto' + The solver is selected by a default policy based on size of `X` and `n_modes`: + if the input data is larger than 500x500 and the number of modes to extract is lower + than 80% of the smallest dimension of the data, then the more efficient + `randomized` method is enabled. Otherwise the exact full SVD is computed + and optionally truncated afterwards. + **kwargs + Additional keyword arguments passed to the SVD solver. """ - def __init__(self, n_modes=100, **kwargs): + def __init__(self, n_modes=100, solver="auto", **kwargs): self.n_modes = n_modes + self.solver = solver self.solver_kwargs = kwargs - def fit(self, X): - is_dask = True if isinstance(X.data, DaskArray) else False - is_complex = True if np.iscomplexobj(X.data) else False + def fit(self, X, dims=("sample", "feature")): + """Decomposes the data object. + + Parameters + ---------- + X : DataArray + A 2-dimensional data object to be decomposed. + dims : tuple of str + Dimensions of the data object. 
+ """ + n_coords1 = len(X.coords[dims[0]]) + n_coords2 = len(X.coords[dims[1]]) + rank = min(n_coords1, n_coords2) + + if self.n_modes >= rank: + raise ValueError( + f"n_modes must be smaller to the rank of the data object ({rank})" + ) - if (not is_complex) and (not is_dask): - self.solver_kwargs.update({"n_components": self.n_modes}) + # Check if data is small enough to use exact SVD + # If not, use randomized SVD + # If data is complex, use scipy sparse SVD + # If data is dask, use dask SVD + # Conditions for using exact SVD follow scitkit-learn's PCA implementation + # Source: https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html - U, s, VT = xr.apply_ufunc( - svd, - X, - kwargs=self.solver_kwargs, - input_core_dims=[["sample", "feature"]], - output_core_dims=[["sample", "mode"], ["mode"], ["mode", "feature"]], - ) + use_dask = True if isinstance(X.data, DaskArray) else False + use_complex = True if np.iscomplexobj(X.data) else False - elif is_complex and (not is_dask): - # Scipy sparse version - self.solver_kwargs.update( - { - "k": self.n_modes, - "solver": "lobpcg", - } + is_small_data = max(n_coords1, n_coords2) < 500 + + if self.solver == "auto": + use_exact = ( + True if is_small_data and self.n_modes > int(0.8 * rank) else False ) - U, s, VT = xr.apply_ufunc( - complex_svd, - X, - kwargs=self.solver_kwargs, - input_core_dims=[["sample", "feature"]], - output_core_dims=[["sample", "mode"], ["mode"], ["mode", "feature"]], + elif self.solver == "full": + use_exact = True + elif self.solver == "randomized": + use_exact = False + else: + raise ValueError( + f"Unrecognized solver '{self.solver}'. " + "Valid options are 'auto', 'full', and 'randomized'." ) - idx_sort = np.argsort(s)[::-1] - U = U[:, idx_sort] - s = s[idx_sort] - VT = VT[idx_sort, :] - elif (not is_complex) and is_dask: - self.solver_kwargs.update({"k": self.n_modes}) + # Use exact SVD for small data sets + if use_exact: U, s, VT = xr.apply_ufunc( - dask_svd, + np.linalg.svd, X, kwargs=self.solver_kwargs, - input_core_dims=[["sample", "feature"]], - output_core_dims=[["sample", "mode"], ["mode"], ["mode", "feature"]], + input_core_dims=[dims], + output_core_dims=[ + [dims[0], "mode"], + ["mode"], + ["mode", dims[1]], + ], dask="allowed", + vectorize=False, ) - else: - err_msg = ( - "Complex data together with dask is currently not implemented. See dask issue 7639 " - "https://github.com/dask/dask/issues/7639" - ) - raise NotImplementedError(err_msg) + U = U[:, : self.n_modes] + s = s[: self.n_modes] + VT = VT[: self.n_modes, :] - U = U.assign_coords(mode=range(1, U.mode.size + 1)) - s = s.assign_coords(mode=range(1, U.mode.size + 1)) - VT = VT.assign_coords(mode=range(1, U.mode.size + 1)) - - U.name = "scores" - s.name = "singular_values" - VT.name = "components" - - # Flip signs of components to ensure deterministic output - idx_sign = abs(VT).argmax("feature").compute() - flip_signs = np.sign(VT.isel(feature=idx_sign)) - flip_signs = flip_signs.compute() - # Drop all dimensions except 'mode' so that the index is clean - for dim, coords in flip_signs.coords.items(): - if dim != "mode": - flip_signs = flip_signs.drop(dim) - VT *= flip_signs - U *= flip_signs - - self.scores_ = U - self.singular_values_ = s - self.components_ = VT.conj().transpose("feature", "mode") - - -class CrossDecomposer(Decomposer): - """Decomposes two data objects based on their cross-covariance matrix. 
- - The data objects will be decomposed into left and right singular vectors components and their corresponding - singular values. - - Parameters - ---------- - n_modes : int - Number of components to be computed. - n_iter : int - Number of iterations for the SVD algorithm. - random_state : int - Random seed for the SVD algorithm. - verbose : bool - If True, print information about the SVD algorithm. - - """ - - def fit(self, X1, X2): - # Cannot fit data with different number of samples - if X1.sample.size != X2.sample.size: - raise ValueError( - "The two data objects must have the same number of samples." - ) - - # Rename feature and associated dimensions of data objects to avoid conflicts - feature_dims_temp1 = { - dim: dim + "1" for dim in X1.coords["feature"].coords.keys() - } - feature_dims_temp2 = { - dim: dim + "2" for dim in X2.coords["feature"].coords.keys() - } - for old, new in feature_dims_temp1.items(): - X1 = X1.rename({old: new}) - for old, new in feature_dims_temp2.items(): - X2 = X2.rename({old: new}) - - # Compute cross-covariance matrix - # Assuming that X1 and X2 are centered - cov_matrix = xr.dot(X1.conj(), X2, dims="sample") / (X1.sample.size - 1) - - # Compute (squared) total variance - self.total_covariance_ = (abs(cov_matrix)).sum().compute() - self.total_squared_covariance_ = (abs(cov_matrix) ** 2).sum().compute() - - is_dask = True if isinstance(cov_matrix.data, DaskArray) else False - is_complex = True if np.iscomplexobj(cov_matrix.data) else False - - if (not is_complex) and (not is_dask): + # Use randomized SVD for large, real-valued data sets + elif (not use_complex) and (not use_dask): self.solver_kwargs.update({"n_components": self.n_modes}) U, s, VT = xr.apply_ufunc( - svd, - cov_matrix, + randomized_svd, + X, kwargs=self.solver_kwargs, - input_core_dims=[["feature1", "feature2"]], - output_core_dims=[["feature1", "mode"], ["mode"], ["mode", "feature2"]], + input_core_dims=[dims], + output_core_dims=[ + [dims[0], "mode"], + ["mode"], + ["mode", dims[1]], + ], ) - elif (is_complex) and (not is_dask): + # Use scipy sparse SVD for large, complex-valued data sets + elif use_complex and (not use_dask): # Scipy sparse version self.solver_kwargs.update( { @@ -174,24 +124,33 @@ def fit(self, X1, X2): ) U, s, VT = xr.apply_ufunc( complex_svd, - cov_matrix, + X, kwargs=self.solver_kwargs, - input_core_dims=[["feature1", "feature2"]], - output_core_dims=[["feature1", "mode"], ["mode"], ["mode", "feature2"]], + input_core_dims=[dims], + output_core_dims=[ + [dims[0], "mode"], + ["mode"], + ["mode", dims[1]], + ], ) idx_sort = np.argsort(s)[::-1] U = U[:, idx_sort] s = s[idx_sort] VT = VT[idx_sort, :] - elif (not is_complex) and (is_dask): + # Use dask SVD for large, real-valued, delayed data sets + elif (not use_complex) and use_dask: self.solver_kwargs.update({"k": self.n_modes}) U, s, VT = xr.apply_ufunc( dask_svd, - cov_matrix, + X, kwargs=self.solver_kwargs, - input_core_dims=[["feature1", "feature2"]], - output_core_dims=[["feature1", "mode"], ["mode"], ["mode", "feature2"]], + input_core_dims=[dims], + output_core_dims=[ + [dims[0], "mode"], + ["mode"], + ["mode", dims[1]], + ], dask="allowed", ) else: @@ -205,28 +164,21 @@ def fit(self, X1, X2): s = s.assign_coords(mode=range(1, U.mode.size + 1)) VT = VT.assign_coords(mode=range(1, U.mode.size + 1)) - U.name = "left_singular_vectors" - s.name = "singular_values" - VT.name = "right_singular_vectors" + U.name = "U" + s.name = "s" + VT.name = "V" # Flip signs of components to ensure deterministic output - idx_sign 
= abs(VT).argmax("feature2").compute() - flip_signs = np.sign(VT.isel(feature2=idx_sign)) + idx_sign = abs(VT).argmax(dims[1]).compute() + flip_signs = np.sign(VT.isel({dims[1]: idx_sign})) flip_signs = flip_signs.compute() # Drop all dimensions except 'mode' so that the index is clean - # and multiplying will not introduce additional coordinates for dim, coords in flip_signs.coords.items(): if dim != "mode": flip_signs = flip_signs.drop(dim) VT *= flip_signs U *= flip_signs - # Rename back to original feature dimensions (remove 1 and 2) - for old, new in feature_dims_temp1.items(): - U = U.rename({new: old}) - for old, new in feature_dims_temp2.items(): - VT = VT.rename({new: old}) - - self.singular_vectors1_ = U - self.singular_values_ = s - self.singular_vectors2_ = VT.conj().transpose("feature", "mode") + self.U_ = U + self.s_ = s + self.V_ = VT.conj().transpose(dims[1], "mode") From 52ac2455a1daee40812db38fef4a90906a8626c2 Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Fri, 11 Aug 2023 14:41:20 +0200 Subject: [PATCH 04/10] refactor(decomposer): remove CrossDecomposer The core functionality is now provided by Decomposer. Any further computations have to be done in the method class --- tests/models/test_cross_decomposer.py | 69 --------------------------- 1 file changed, 69 deletions(-) delete mode 100644 tests/models/test_cross_decomposer.py diff --git a/tests/models/test_cross_decomposer.py b/tests/models/test_cross_decomposer.py deleted file mode 100644 index 3bee192..0000000 --- a/tests/models/test_cross_decomposer.py +++ /dev/null @@ -1,69 +0,0 @@ -import numpy as np -import xarray as xr -import pytest -from dask.array import Array as DaskArray # type: ignore -from sklearn.utils.extmath import randomized_svd as svd -from scipy.sparse.linalg import svds as complex_svd # type: ignore -from dask.array.linalg import svd_compressed as dask_svd -from xeofs.models.decomposer import CrossDecomposer - - -@pytest.fixture -def mock_data_array(mock_data_array): - return mock_data_array.stack(sample=("time",), feature=("lat", "lon")).dropna( - "feature" - ) - - -@pytest.fixture -def mock_dask_data_array(mock_data_array): - return mock_data_array.chunk({"sample": 2}) - - -@pytest.fixture -def mock_complex_data_array(mock_data_array): - return mock_data_array * (1 + 1j) - - -@pytest.fixture -def test_complex_dask_data_array(mock_complex_data_array): - return mock_complex_data_array.chunk({"sample": 2}) - - -@pytest.fixture -def cross_decomposer(): - return CrossDecomposer(n_modes=2, random_state=42) - - -def test_init(cross_decomposer): - assert cross_decomposer.n_modes == 2 - assert cross_decomposer.solver_kwargs["random_state"] == 42 - - -def test_fit(cross_decomposer, mock_data_array): - cross_decomposer.fit(mock_data_array, mock_data_array) - assert "singular_vectors1_" in cross_decomposer.__dict__ - assert "singular_values_" in cross_decomposer.__dict__ - assert "singular_vectors2_" in cross_decomposer.__dict__ - - -def test_fit_complex(cross_decomposer, mock_complex_data_array): - cross_decomposer.fit(mock_complex_data_array, mock_complex_data_array) - assert "singular_vectors1_" in cross_decomposer.__dict__ - assert "singular_values_" in cross_decomposer.__dict__ - assert "singular_vectors2_" in cross_decomposer.__dict__ - - -def test_fit_same_samples(cross_decomposer, mock_data_array): - with pytest.raises(ValueError): - cross_decomposer.fit(mock_data_array, mock_data_array.isel(sample=slice(1, 3))) - - -def test_fit_dask(mock_dask_data_array): - # The Dask SVD solver has no parameter 
'random_state' but 'seed' instead, - # so let's create a new decomposer for this case - cross_decomposer = CrossDecomposer(n_modes=2, seed=42) - cross_decomposer.fit(mock_dask_data_array, mock_dask_data_array) - assert "singular_vectors1_" in cross_decomposer.__dict__ - assert "singular_values_" in cross_decomposer.__dict__ - assert "singular_vectors2_" in cross_decomposer.__dict__ From 50c23864dd0eb3cd18a4113edd87980622f31da9 Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Fri, 11 Aug 2023 14:46:02 +0200 Subject: [PATCH 05/10] feat: choose the SVD solver Following the PCA implementation of scikit-learn, the use can now choose whether full SVD decomposition or randomzied SVD is to be used. Defaults to 'auto' which uses full SVD for small datasets and randomized SVD for large datasets. --- tests/models/test_eof.py | 4 +++- xeofs/models/_base_model.py | 6 ++++++ xeofs/models/eof.py | 26 +++++++++++++++++--------- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/tests/models/test_eof.py b/tests/models/test_eof.py index a407525..c822e4e 100644 --- a/tests/models/test_eof.py +++ b/tests/models/test_eof.py @@ -17,6 +17,7 @@ def test_init(): "standardize": True, "use_coslat": True, "use_weights": False, + "solver": "auto", } # Assert preprocessor has been initialized @@ -258,6 +259,7 @@ def test_get_params(): "standardize": True, "use_coslat": True, "use_weights": False, + "solver": "auto", } @@ -273,7 +275,7 @@ def test_transform(dim, mock_data_array): """Test projecting new unseen data onto the components (EOFs/eigenvectors)""" # Create a xarray DataArray with random data - model = EOF(n_modes=5, solver_kwargs={"random_state": 0}) + model = EOF(n_modes=5, solver="full") model.fit(mock_data_array, dim) scores = model.scores() diff --git a/xeofs/models/_base_model.py b/xeofs/models/_base_model.py index ee64a48..8963f40 100644 --- a/xeofs/models/_base_model.py +++ b/xeofs/models/_base_model.py @@ -27,6 +27,10 @@ class _BaseModel(ABC): Whether to use cosine of latitude for scaling. use_weights: bool, default=False Whether to use weights. + solver: {"auto", "full", "randomized"}, default="auto" + Solver to use for the SVD computation. + solver_kwargs: dict, default={} + Additional keyword arguments to pass to the solver. """ @@ -36,6 +40,7 @@ def __init__( standardize=False, use_coslat=False, use_weights=False, + solver="auto", solver_kwargs={}, ): # Define model parameters @@ -44,6 +49,7 @@ def __init__( "standardize": standardize, "use_coslat": use_coslat, "use_weights": use_weights, + "solver": solver, } self._solver_kwargs = solver_kwargs diff --git a/xeofs/models/eof.py b/xeofs/models/eof.py index 97c4a6e..ad29064 100644 --- a/xeofs/models/eof.py +++ b/xeofs/models/eof.py @@ -23,6 +23,8 @@ class EOF(_BaseModel): Whether to use cosine of latitude for scaling. use_weights: bool, default=False Whether to use weights. + solver: {"auto", "full", "randomized"}, default="auto" + Solver to use for the SVD computation. solver_kwargs: dict, default={} Additional keyword arguments to be passed to the SVD solver. 
@@ -40,6 +42,7 @@ def __init__( self, n_modes=10, standardize=False, use_coslat=False, use_weights=False, + solver="auto", solver_kwargs={}, ): super().__init__( @@ -47,6 +50,7 @@ def __init__( standardize=standardize, use_coslat=use_coslat, use_weights=use_weights, + solver=solver, solver_kwargs=solver_kwargs, ) self.attrs.update({"model": "EOF analysis"}) @@ -64,12 +68,14 @@ def fit(self, data: AnyDataObject, dim, weights=None): # Decompose the data n_modes = self._params["n_modes"] - decomposer = Decomposer(n_modes=n_modes, **self._solver_kwargs) - decomposer.fit(input_data) + decomposer = Decomposer( + n_modes=n_modes, solver=self._params["solver"], **self._solver_kwargs + ) + decomposer.fit(input_data, dims=("sample", "feature")) - singular_values = decomposer.singular_values_ - components = decomposer.components_ - scores = decomposer.scores_ + singular_values = decomposer.s_ + components = decomposer.V_ + scores = decomposer.U_ # Compute the explained variance explained_variance = singular_values**2 / (input_data.sample.size - 1) @@ -304,12 +310,14 @@ def fit(self, data: AnyDataObject, dim, weights=None): # Decompose the complex data n_modes = self._params["n_modes"] - decomposer = Decomposer(n_modes=n_modes, **self._solver_kwargs) + decomposer = Decomposer( + n_modes=n_modes, solver=self._params["solver"], **self._solver_kwargs + ) decomposer.fit(input_data) - singular_values = decomposer.singular_values_ - components = decomposer.components_ - scores = decomposer.scores_ + singular_values = decomposer.s_ + components = decomposer.V_ + scores = decomposer.U_ # Compute the explained variance explained_variance = singular_values**2 / (input_data.sample.size - 1) From 80a4c95851a9c7d56a3f3fed0073d0c3d5e23578 Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Fri, 11 Aug 2023 14:48:23 +0200 Subject: [PATCH 06/10] refactor: extra class for renaming (multi)indices Helper class to keep track of dimension names. When the dimension names of a MultiIndex need to be changed to avoid name conflicts, the class provides easy back-and-forth transformations between new and old dimension names.
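A rough usage sketch (the dimension name and suffix are only illustrative):

    renamer = DimensionRenamer("feature", "2")
    da_renamed = renamer.fit_transform(da)      # 'feature' and its MultiIndex levels get the suffix '2'
    # ... work on the renamed array without name conflicts ...
    da_restored = renamer.inverse_transform(da_renamed)  # recover the original dimension names
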
--- tests/utils/test_dimension_renamer.py | 68 +++++++++++++++++++++++++++ xeofs/utils/dimension_renamer.py | 34 ++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 tests/utils/test_dimension_renamer.py create mode 100644 xeofs/utils/dimension_renamer.py diff --git a/tests/utils/test_dimension_renamer.py b/tests/utils/test_dimension_renamer.py new file mode 100644 index 0000000..4ce1ca4 --- /dev/null +++ b/tests/utils/test_dimension_renamer.py @@ -0,0 +1,68 @@ +import xarray as xr +import pytest + +from xeofs.utils.dimension_renamer import DimensionRenamer + +import xarray as xr +import pandas as pd +import pytest + + +@pytest.fixture +def da_simple(): + # Basic DataArray with single dimension + return xr.DataArray([1, 2, 3], coords={"dim0": ["a", "b", "c"]}, dims=["dim0"]) + + +@pytest.fixture +def da_multi(): + # DataArray with MultiIndex + arrays = [ + pd.Index(["a", "b", "c"], name="dim1a"), + pd.Index([1, 2, 3], name="dim1b"), + ] + multi_index = pd.MultiIndex.from_arrays(arrays) + return xr.DataArray([1, 2, 3], coords={"dim1": multi_index}, dims=["dim1"]) + + +def test_simple_dim_rename(da_simple): + renamer = DimensionRenamer("dim0", "_suffix") + da_new = renamer.fit_transform(da_simple) + assert "dim0_suffix" in da_new.dims + + # Inverse transform + da_orig = renamer.inverse_transform(da_new) + assert "dim0" in da_orig.dims + + +def test_multiindex_dim_rename(da_multi): + renamer = DimensionRenamer("dim1", "_suffix") + da_new = renamer.fit_transform(da_multi) + assert "dim1_suffix" in da_new.dims + assert "dim1a_suffix" in da_new.coords["dim1_suffix"].coords.keys() + assert "dim1b_suffix" in da_new.coords["dim1_suffix"].coords.keys() + + # Inverse transform + da_orig = renamer.inverse_transform(da_new) + assert "dim1" in da_orig.dims + assert "dim1a" in da_orig.coords["dim1"].coords.keys() + assert "dim1b" in da_orig.coords["dim1"].coords.keys() + + +def test_fit_without_transform(da_simple): + renamer = DimensionRenamer("dim0", "_suffix") + renamer.fit(da_simple) + assert hasattr(renamer, "dims_mapping") + assert renamer.dims_mapping == {"dim0": "dim0_suffix"} + + +def test_incorrect_dim_name(da_simple): + with pytest.raises(KeyError): + renamer = DimensionRenamer("nonexistent_dim", "_suffix") + renamer.fit_transform(da_simple) + + +def test_empty_suffix(da_simple): + renamer = DimensionRenamer("dim0", "") + da_new = renamer.fit_transform(da_simple) + assert "dim0" in da_new.dims diff --git a/xeofs/utils/dimension_renamer.py b/xeofs/utils/dimension_renamer.py new file mode 100644 index 0000000..7757c1d --- /dev/null +++ b/xeofs/utils/dimension_renamer.py @@ -0,0 +1,34 @@ +class DimensionRenamer: + """Rename dimensions of an xarray DataArray. + + Parameters + ---------- + dim : str + Name of the dimension to be renamed. Can be a dimension containing a MultiIndex. + suffix : str + Suffix to be added to the dimension name. 
+ + """ + + def __init__(self, dim, suffix): + self.suffix = suffix + self.dim = dim + + def fit(self, da): + self.dims_mapping = { + dim: dim + self.suffix for dim in da.coords[self.dim].coords.keys() + } + + def transform(self, da): + for old, new in self.dims_mapping.items(): + da = da.rename({old: new}) + return da + + def fit_transform(self, da): + self.fit(da) + return self.transform(da) + + def inverse_transform(self, da): + for old, new in self.dims_mapping.items(): + da = da.rename({new: old}) + return da From 100f40d7d2f5d9c59725948f4a642a511c4b347f Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Fri, 11 Aug 2023 14:50:35 +0200 Subject: [PATCH 07/10] feat(MCA): provide optional PCA preprocessing This can greatly increase computation speed --- xeofs/models/_base_cross_model.py | 15 +++ xeofs/models/mca.py | 199 +++++++++++++++++++++++++----- 2 files changed, 180 insertions(+), 34 deletions(-) diff --git a/xeofs/models/_base_cross_model.py b/xeofs/models/_base_cross_model.py index b108790..a18b8a8 100644 --- a/xeofs/models/_base_cross_model.py +++ b/xeofs/models/_base_cross_model.py @@ -4,6 +4,7 @@ from dask.diagnostics.progress import ProgressBar +from .eof import EOF from ..preprocessing.preprocessor import Preprocessor from ..data_container import _BaseCrossModelDataContainer from ..utils.data_types import AnyDataObject, DataArray @@ -24,6 +25,12 @@ class _BaseCrossModel(ABC): Whether to use cosine of latitude for scaling. use_weights: bool, default=False Whether to use weights. + n_pca_modes: int, default=None + Number of PCA modes to calculate. + solver: {"auto", "full", "randomized"}, default="auto" + Solver to use for the SVD computation. + solver_kwargs: dict, default={} + Additional keyword arguments to pass to the solver. """ @@ -33,6 +40,8 @@ def __init__( standardize=False, use_coslat=False, use_weights=False, + n_pca_modes=None, + solver="auto", solver_kwargs={}, ): # Define model parameters @@ -41,6 +50,8 @@ def __init__( "standardize": standardize, "use_coslat": use_coslat, "use_weights": use_weights, + "n_pca_modes": n_pca_modes, + "solver": solver, } self._solver_kwargs = solver_kwargs @@ -70,6 +81,10 @@ def __init__( # The actual data container will be initialized in respective subclasses self.data: _BaseCrossModelDataContainer = _BaseCrossModelDataContainer() + # Initialize PCA objects + self.pca1 = EOF(n_modes=n_pca_modes) if n_pca_modes else None + self.pca2 = EOF(n_modes=n_pca_modes) if n_pca_modes else None + @abstractmethod def fit( self, diff --git a/xeofs/models/mca.py b/xeofs/models/mca.py index 50942c6..42f1037 100644 --- a/xeofs/models/mca.py +++ b/xeofs/models/mca.py @@ -5,7 +5,7 @@ from dask.diagnostics.progress import ProgressBar from ._base_cross_model import _BaseCrossModel -from .decomposer import CrossDecomposer +from .decomposer import Decomposer from ..utils.data_types import AnyDataObject, DataArray from ..data_container.mca_data_container import ( MCADataContainer, @@ -13,6 +13,7 @@ ) from ..utils.statistics import pearson_correlation from ..utils.xarray_utils import hilbert_transform +from ..utils.dimension_renamer import DimensionRenamer class MCA(_BaseCrossModel): @@ -30,8 +31,18 @@ class MCA(_BaseCrossModel): Whether to use cosine of latitude for scaling. use_weights: bool, default=False Whether to use additional weights. + solver: {"auto", "full", "randomized"}, default="auto" + Solver to use for the SVD computation. solver_kwargs: dict, default={} Additional keyword arguments passed to the SVD solver. 
+ n_pca_modes: int, default=None + The number of principal components to retain during the PCA preprocessing + step applied to both data sets prior to executing MCA. + If set to None, PCA preprocessing will be bypassed, and the MCA will be performed on the original datasets. + Specifying an integer value greater than 0 for `n_pca_modes` will trigger the PCA preprocessing, retaining + only the specified number of principal components. This reduction in dimensionality can be especially beneficial + when dealing with high-dimensional data, where computing the cross-covariance matrix can become computationally + intensive or in scenarios where multicollinearity is a concern. Notes ----- @@ -58,6 +69,18 @@ def __init__(self, solver_kwargs={}, **kwargs): # Initialize the DataContainer to store the results self.data: MCADataContainer = MCADataContainer() + def _compute_cross_covariance_matrix(self, X1, X2): + """Compute the cross-covariance matrix of two data objects. + + Note: It is assumed that the data objects are centered. + + """ + if X1.sample.size != X2.sample.size: + err_msg = "The two data objects must have the same number of samples." + raise ValueError(err_msg) + + return xr.dot(X1.conj(), X2, dims="sample") / (X1.sample.size - 1) + def fit( self, data1: AnyDataObject, @@ -66,6 +89,7 @@ def fit( weights1=None, weights2=None, ): + # Preprocess the data data1_processed: DataArray = self.preprocessor1.fit_transform( data1, dim, weights1 ) @@ -73,23 +97,67 @@ def fit( data2, dim, weights2 ) - decomposer = CrossDecomposer( - n_modes=self._params["n_modes"], **self._solver_kwargs + # Initialize the SVD decomposer + decomposer = Decomposer( + n_modes=self._params["n_modes"], + solver=self._params["solver"], + **self._solver_kwargs, ) - decomposer.fit(data1_processed, data2_processed) - - # Note: - # - explained variance is given by the singular values of the SVD; - # - We use the term singular_values_pca as used in the context of PCA: - # Considering data X1 = X2, MCA is the same as PCA. In this case, - # singular_values_pca is equivalent to the singular values obtained - # when performing PCA of X1 or X2. 
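For intuition, the `_compute_cross_covariance_matrix` helper introduced above is the (n - 1)-normalized product over the shared sample dimension; as its docstring notes, it assumes the inputs are already centered. A toy illustration of the resulting shape (sizes, names and coordinates are made up; real fields would be centered by the preprocessor first):

>>> import numpy as np
>>> import xarray as xr
>>> rng = np.random.default_rng(0)
>>> X1 = xr.DataArray(rng.normal(size=(100, 4)), dims=("sample", "feature1"),
...                   coords={"sample": np.arange(100)})
>>> X2 = xr.DataArray(rng.normal(size=(100, 6)), dims=("sample", "feature2"),
...                   coords={"sample": np.arange(100)})
>>> cov = xr.dot(X1.conj(), X2, dims="sample") / (X1.sample.size - 1)
>>> cov.dims
('feature1', 'feature2')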
- singular_values = decomposer.singular_values_ - singular_vectors1 = decomposer.singular_vectors1_ - singular_vectors2 = decomposer.singular_vectors2_ + # Perform SVD on PCA-reduced data + if (self.pca1 is not None) and (self.pca2 is not None): + # Fit the PCA models + self.pca1.fit(data1_processed, "sample") + self.pca2.fit(data2_processed, "sample") + # Get the PCA scores + pca_scores1 = self.pca1.data.scores * self.pca1.data.singular_values + pca_scores2 = self.pca2.data.scores * self.pca2.data.singular_values + # Compute the cross-covariance matrix of the PCA scores + pca_scores1 = pca_scores1.rename({"mode": "feature1"}) + pca_scores2 = pca_scores2.rename({"mode": "feature2"}) + cov_matrix = self._compute_cross_covariance_matrix(pca_scores1, pca_scores2) + + # Perform the SVD + decomposer.fit(cov_matrix, dims=("feature1", "feature2")) + V1 = decomposer.U_ # left singular vectors (feature1 x mode) + V2 = decomposer.V_ # right singular vectors (feature2 x mode) + + V1pre = self.pca1.data.components # left PCA eigenvectors (feature x mode) + V2pre = self.pca2.data.components # right PCA eigenvectors (feature x mode) + + # Compute the singular vectors + V1pre = V1pre.rename({"mode": "feature1"}) + V2pre = V2pre.rename({"mode": "feature2"}) + singular_vectors1 = xr.dot(V1pre, V1, dims="feature1") + singular_vectors2 = xr.dot(V2pre, V2, dims="feature2") + + # Perform SVD directly on data + else: + # Rename feature and associated dimensions of data objects to avoid index conflicts + dim_renamer1 = DimensionRenamer("feature", "1") + dim_renamer2 = DimensionRenamer("feature", "2") + data1_processed_temp = dim_renamer1.fit_transform(data1_processed) + data2_processed_temp = dim_renamer2.fit_transform(data2_processed) + # Compute the cross-covariance matrix + cov_matrix = self._compute_cross_covariance_matrix( + data1_processed_temp, data2_processed_temp + ) + + # Perform the SVD + decomposer.fit(cov_matrix, dims=("feature1", "feature2")) + singular_vectors1 = decomposer.U_ + singular_vectors2 = decomposer.V_ + + # Rename the singular vectors + singular_vectors1 = dim_renamer1.inverse_transform(singular_vectors1) + singular_vectors2 = dim_renamer2.inverse_transform(singular_vectors2) + + # Store the results + singular_values = decomposer.s_ + + # Compute total squared variance squared_covariance = singular_values**2 - total_squared_covariance = decomposer.total_squared_covariance_ + total_squared_covariance = (abs(cov_matrix) ** 2).sum() norm1 = np.sqrt(singular_values) norm2 = np.sqrt(singular_values) @@ -441,7 +509,7 @@ def heterogeneous_patterns(self, correction=None, alpha=0.05): class ComplexMCA(MCA): """Complex Maximum Covariance Analysis (MCA). - Complex MCA, also referred to as Analytical SVD (ASVD) by Shane et al. (2017)[1]_, + Complex MCA, also referred to as Analytical SVD (ASVD) by Elipot et al. (2017)[1]_, enhances traditional MCA by accommodating both amplitude and phase information. It achieves this by utilizing the Hilbert transform to preprocess the data, thus allowing for a more comprehensive analysis in the subsequent MCA computation. 
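Putting the pieces together, a usage sketch of the PCA-accelerated path added in this patch (the synthetic fields, their dimension names and the chosen mode numbers are placeholders for illustration):

>>> import numpy as np
>>> import xarray as xr
>>> from xeofs.models import MCA
>>> coords = {"time": np.arange(200), "lat": np.arange(20), "lon": np.arange(30)}
>>> field1 = xr.DataArray(np.random.rand(200, 20, 30), dims=("time", "lat", "lon"), coords=coords)
>>> field2 = xr.DataArray(np.random.rand(200, 20, 30), dims=("time", "lat", "lon"), coords=coords)
>>> model = MCA(n_modes=5, n_pca_modes=50)
>>> model.fit(field1, field2, dim="time")

With n_pca_modes=None (the default) the PCA step is bypassed and the SVD is computed directly on the cross-covariance matrix of the original feature spaces.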
@@ -530,39 +598,102 @@ def fit( """ data1_processed: DataArray = self.preprocessor1.fit_transform( - data1, dim, weights2 + data1, dim, weights1 ) data2_processed: DataArray = self.preprocessor2.fit_transform( data2, dim, weights2 ) - # apply hilbert transform: + # Apply Hilbert transform: padding = self._params["padding"] decay_factor = self._params["decay_factor"] data1_processed = hilbert_transform( - data1_processed, dim="sample", padding=padding, decay_factor=decay_factor + data1_processed, + dim="sample", + padding=padding, + decay_factor=decay_factor, ) data2_processed = hilbert_transform( - data2_processed, dim="sample", padding=padding, decay_factor=decay_factor + data2_processed, + dim="sample", + padding=padding, + decay_factor=decay_factor, ) - decomposer = CrossDecomposer( - n_modes=self._params["n_modes"], **self._solver_kwargs + # Initialize the SVD decomposer + decomposer = Decomposer( + n_modes=self._params["n_modes"], + solver=self._params["solver"], + **self._solver_kwargs, ) - decomposer.fit(data1_processed, data2_processed) - - # Note: - # - explained variance is given by the singular values of the SVD; - # - We use the term singular_values_pca as used in the context of PCA: - # Considering data X1 = X2, MCA is the same as PCA. In this case, - # singular_values_pca is equivalent to the singular values obtained - # when performing PCA of X1 or X2. - singular_values = decomposer.singular_values_ - singular_vectors1 = decomposer.singular_vectors1_ - singular_vectors2 = decomposer.singular_vectors2_ + # Perform SVD on PCA-reduced data + if (self.pca1 is not None) and (self.pca2 is not None): + # Fit the PCA models + self.pca1.fit(data1_processed, "sample") + self.pca2.fit(data2_processed, "sample") + # Get the PCA scores + pca_scores1 = self.pca1.data.scores * self.pca1.data.singular_values + pca_scores2 = self.pca2.data.scores * self.pca2.data.singular_values + # Apply hilbert transform + pca_scores1 = hilbert_transform( + pca_scores1, + dim="sample", + padding=padding, + decay_factor=decay_factor, + ) + pca_scores2 = hilbert_transform( + pca_scores2, + dim="sample", + padding=padding, + decay_factor=decay_factor, + ) + # Compute the cross-covariance matrix of the PCA scores + pca_scores1 = pca_scores1.rename({"mode": "feature"}) + pca_scores2 = pca_scores2.rename({"mode": "feature"}) + cov_matrix = self._compute_cross_covariance_matrix(pca_scores1, pca_scores2) + + # Perform the SVD + decomposer.fit(cov_matrix, dims=("feature1", "feature2")) + V1 = decomposer.U_ # left singular vectors (feature1 x mode) + V2 = decomposer.V_ # right singular vectors (feature2 x mode) + + V1pre = self.pca1.data.components # left PCA eigenvectors (feature x mode) + V2pre = self.pca2.data.components # right PCA eigenvectors (feature x mode) + + # Compute the singular vectors + V1pre = V1pre.rename({"mode": "feature1"}) + V2pre = V2pre.rename({"mode": "feature2"}) + singular_vectors1 = xr.dot(V1pre, V1, dims="feature1") + singular_vectors2 = xr.dot(V2pre, V2, dims="feature2") + + # Perform SVD directly on data + else: + # Rename feature and associated dimensions of data objects to avoid index conflicts + dim_renamer1 = DimensionRenamer("feature", "1") + dim_renamer2 = DimensionRenamer("feature", "2") + data1_processed_temp = dim_renamer1.fit_transform(data1_processed) + data2_processed_temp = dim_renamer2.fit_transform(data2_processed) + # Compute the cross-covariance matrix + cov_matrix = self._compute_cross_covariance_matrix( + data1_processed_temp, data2_processed_temp + ) + + # Perform 
the SVD + decomposer.fit(cov_matrix, dims=("feature1", "feature2")) + singular_vectors1 = decomposer.U_ + singular_vectors2 = decomposer.V_ + + # Rename the singular vectors + singular_vectors1 = dim_renamer1.inverse_transform(singular_vectors1) + singular_vectors2 = dim_renamer2.inverse_transform(singular_vectors2) + + # Store the results + singular_values = decomposer.s_ + + # Compute total squared variance squared_covariance = singular_values**2 - total_squared_covariance = decomposer.total_squared_covariance_ + total_squared_covariance = (abs(cov_matrix) ** 2).sum() norm1 = np.sqrt(singular_values) norm2 = np.sqrt(singular_values) From 782e6242ac1f0dd34ed12ee3d0e8154c66551d83 Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Fri, 11 Aug 2023 14:51:11 +0200 Subject: [PATCH 08/10] feat: add Canonical Correlation Analysis --- xeofs/models/cca.py | 743 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 743 insertions(+) create mode 100644 xeofs/models/cca.py diff --git a/xeofs/models/cca.py b/xeofs/models/cca.py new file mode 100644 index 0000000..1955887 --- /dev/null +++ b/xeofs/models/cca.py @@ -0,0 +1,743 @@ +# from typing import Tuple + +# import numpy as np +# import xarray as xr + +# from ._base_cross_model import _BaseCrossModel +# from .decomposer import CrossDecomposer +# from ..utils.data_types import AnyDataObject, DataArray +# from ..data_container.mca_data_container import ( +# MCADataContainer, +# ComplexMCADataContainer, +# ) +# from ..utils.statistics import pearson_correlation +# from ..utils.xarray_utils import hilbert_transform + + +# class MCA(_BaseCrossModel): +# """Maximum Covariance Analyis (MCA). + +# MCA is a statistical method that finds patterns of maximum covariance between two datasets. + +# Parameters +# ---------- +# n_modes: int, default=10 +# Number of modes to calculate. +# standardize: bool, default=False +# Whether to standardize the input data. +# use_coslat: bool, default=False +# Whether to use cosine of latitude for scaling. +# use_weights: bool, default=False +# Whether to use additional weights. +# solver_kwargs: dict, default={} +# Additional keyword arguments passed to the SVD solver. +# n_pca_modes: int, default=None +# The number of principal components to retain during the PCA preprocessing +# step applied to both data sets prior to executing MCA. +# If set to None, PCA preprocessing will be bypassed, and the MCA will be performed on the original datasets. +# Specifying an integer value greater than 0 for `n_pca_modes` will trigger the PCA preprocessing, retaining +# only the specified number of principal components. This reduction in dimensionality can be especially beneficial +# when dealing with high-dimensional data, where computing the cross-covariance matrix can become computationally +# intensive or in scenarios where multicollinearity is a concern. + +# Notes +# ----- +# MCA is similar to Principal Component Analysis (PCA) and Canonical Correlation Analysis (CCA), +# but while PCA finds modes of maximum variance and CCA finds modes of maximum correlation, +# MCA finds modes of maximum covariance. See [1]_ [2]_ for more details. + +# References +# ---------- +# .. [1] Bretherton, C., Smith, C., Wallace, J., 1992. An intercomparison of methods for finding coupled patterns in climate data. Journal of climate 5, 541–560. +# .. [2] Cherry, S., 1996. Singular value decomposition analysis and canonical correlation analysis. Journal of Climate 9, 2003–2009. 
+ +# Examples +# -------- +# >>> model = MCA(n_modes=5, standardize=True) +# >>> model.fit(data1, data2) + +# """ + +# def __init__(self, solver_kwargs={}, **kwargs): +# super().__init__(solver_kwargs=solver_kwargs, **kwargs) +# self.attrs.update({"model": "MCA"}) + +# # Initialize the DataContainer to store the results +# self.data: MCADataContainer = MCADataContainer() + +# def fit( +# self, +# data1: AnyDataObject, +# data2: AnyDataObject, +# dim, +# weights1=None, +# weights2=None, +# ): +# # Preprocess the data +# data1_processed: DataArray = self.preprocessor1.fit_transform( +# data1, dim, weights1 +# ) +# data2_processed: DataArray = self.preprocessor2.fit_transform( +# data2, dim, weights2 +# ) + +# # Perform the decomposition of the cross-covariance matrix +# decomposer = CrossDecomposer( +# n_modes=self._params["n_modes"], **self._solver_kwargs +# ) + +# # Perform SVD on PCA-reduced data +# if (self.pca1 is not None) and (self.pca2 is not None): +# # Fit the PCA models +# self.pca1.fit(data1_processed, "sample") +# self.pca2.fit(data2_processed, "sample") +# # Get the PCA scores +# pca_scores1 = self.pca1.data.scores * self.pca1.data.singular_values +# pca_scores2 = self.pca2.data.scores * self.pca2.data.singular_values +# # Rename the dimensions to adhere to the CrossDecomposer API +# pca_scores1 = pca_scores1.rename({"mode": "feature"}) +# pca_scores2 = pca_scores2.rename({"mode": "feature"}) + +# # Perform the SVD +# decomposer.fit(pca_scores1, pca_scores2) +# V1 = decomposer.singular_vectors1_.rename({"feature": "core"}) +# V2 = decomposer.singular_vectors2_.rename({"feature": "core"}) + +# V1pre = self.pca1.data.components.rename({"mode": "core"}) +# V2pre = self.pca2.data.components.rename({"mode": "core"}) + +# # Compute the singular vectors based on the PCA eigenvectors +# singular_vectors1 = xr.dot(V1pre, V1, dims="core") +# singular_vectors2 = xr.dot(V2pre, V2, dims="core") + +# # Perform SVD directly on data +# else: +# decomposer.fit(data1_processed, data2_processed) +# singular_vectors1 = decomposer.singular_vectors1_ +# singular_vectors2 = decomposer.singular_vectors2_ + +# # Store the results +# singular_values = decomposer.singular_values_ + +# squared_covariance = singular_values**2 +# total_squared_covariance = decomposer.total_squared_covariance_ + +# norm1 = np.sqrt(singular_values) +# norm2 = np.sqrt(singular_values) + +# # Index of the sorted squared covariance +# idx_sorted_modes = squared_covariance.compute().argsort()[::-1] +# idx_sorted_modes.coords.update(squared_covariance.coords) + +# # Project the data onto the singular vectors +# scores1 = xr.dot(data1_processed, singular_vectors1, dims="feature") / norm1 +# scores2 = xr.dot(data2_processed, singular_vectors2, dims="feature") / norm2 + +# self.data.set_data( +# input_data1=data1_processed, +# input_data2=data2_processed, +# components1=singular_vectors1, +# components2=singular_vectors2, +# scores1=scores1, +# scores2=scores2, +# squared_covariance=squared_covariance, +# total_squared_covariance=total_squared_covariance, +# idx_modes_sorted=idx_sorted_modes, +# norm1=norm1, +# norm2=norm2, +# ) +# # Assign analysis-relevant meta data +# self.data.set_attrs(self.attrs) + +# def transform(self, **kwargs): +# """Project new unseen data onto the singular vectors. + +# Parameters +# ---------- +# data1: xr.DataArray or list of xarray.DataArray +# Left input data. Must be provided if `data2` is not provided. +# data2: xr.DataArray or list of xarray.DataArray +# Right input data. 
Must be provided if `data1` is not provided. + +# Returns +# ------- +# scores1: DataArray | Dataset | List[DataArray] +# Left scores. +# scores2: DataArray | Dataset | List[DataArray] +# Right scores. + +# """ +# results = [] +# if "data1" in kwargs.keys(): +# # Preprocess input data +# data1 = kwargs["data1"] +# data1 = self.preprocessor1.transform(data1) +# # Project data onto singular vectors +# comps1 = self.data.components1 +# norm1 = self.data.norm1 +# scores1 = xr.dot(data1, comps1) / norm1 +# # Inverse transform scores +# scores1 = self.preprocessor1.inverse_transform_scores(scores1) +# results.append(scores1) + +# if "data2" in kwargs.keys(): +# # Preprocess input data +# data2 = kwargs["data2"] +# data2 = self.preprocessor2.transform(data2) +# # Project data onto singular vectors +# comps2 = self.data.components2 +# norm2 = self.data.norm2 +# scores2 = xr.dot(data2, comps2) / norm2 +# # Inverse transform scores +# scores2 = self.preprocessor2.inverse_transform_scores(scores2) +# results.append(scores2) + +# return results + +# def inverse_transform(self, mode): +# """Reconstruct the original data from transformed data. + +# Parameters +# ---------- +# mode: scalars, slices or array of tick labels. +# The mode(s) used to reconstruct the data. If a scalar is given, +# the data will be reconstructed using the given mode. If a slice +# is given, the data will be reconstructed using the modes in the +# given slice. If a array is given, the data will be reconstructed +# using the modes in the given array. + +# Returns +# ------- +# Xrec1: DataArray | Dataset | List[DataArray] +# Reconstructed data of left field. +# Xrec2: DataArray | Dataset | List[DataArray] +# Reconstructed data of right field. + +# """ +# # Singular vectors +# comps1 = self.data.components1.sel(mode=mode) +# comps2 = self.data.components2.sel(mode=mode) + +# # Scores = projections +# scores1 = self.data.scores1.sel(mode=mode) +# scores2 = self.data.scores2.sel(mode=mode) + +# # Norms +# norm1 = self.data.norm1.sel(mode=mode) +# norm2 = self.data.norm2.sel(mode=mode) + +# # Reconstruct the data +# data1 = xr.dot(scores1, comps1.conj() * norm1, dims="mode") +# data2 = xr.dot(scores2, comps2.conj() * norm2, dims="mode") + +# # Enforce real output +# data1 = data1.real +# data2 = data2.real + +# # Unstack and rescale the data +# data1 = self.preprocessor1.inverse_transform_data(data1) +# data2 = self.preprocessor2.inverse_transform_data(data2) + +# return data1, data2 + +# def squared_covariance(self): +# """Get the squared covariance. + +# The squared covariance corresponds to the explained variance in PCA and is given by the +# squared singular values of the covariance matrix. + +# """ +# return self.data.squared_covariance + +# def squared_covariance_fraction(self): +# """Calculate the squared covariance fraction (SCF). + +# The SCF is a measure of the proportion of the total squared covariance that is explained by each mode `i`. It is computed +# as follows: + +# .. math:: +# SCF_i = \\frac{\\sigma_i^2}{\\sum_{i=1}^{m} \\sigma_i^2} + +# where `m` is the total number of modes and :math:`\\sigma_i` is the `ith` singular value of the covariance matrix. + +# """ +# return self.data.squared_covariance_fraction + +# def singular_values(self): +# """Get the singular values of the cross-covariance matrix.""" +# return self.data.singular_values + +# def covariance_fraction(self): +# """Get the covariance fraction (CF). + +# Cheng and Dunkerton (1995) define the CF as follows: + +# .. 
math:: +# CF_i = \\frac{\\sigma_i}{\\sum_{i=1}^{m} \\sigma_i} + +# where `m` is the total number of modes and :math:`\\sigma_i` is the `ith` singular value of the covariance matrix. + +# In this implementation the sum of singular values is estimated from the first `n` modes, therefore one should aim to +# retain as many modes as possible to get a good estimate of the covariance fraction. + +# Note +# ---- +# It is important to differentiate the CF from the squared covariance fraction (SCF). While the SCF is an invariant quantity in MCA, the CF is not. +# Therefore, the SCF is used to assess the relative importance of each mode. Cheng and Dunkerton (1995) [1]_ introduced the CF in the context of +# Varimax-rotated MCA to compare the relative importance of each mode before and after rotation. In the special case of both data fields in MCA being identical, +# the CF is equivalent to the explained variance ratio in EOF analysis. + +# References +# ---------- +# .. [1] Cheng, X., Dunkerton, T.J., 1995. Orthogonal Rotation of Spatial Patterns Derived from Singular Value Decomposition Analysis. J. Climate 8, 2631–2643. https://doi.org/10.1175/1520-0442(1995)008<2631:OROSPD>2.0.CO;2 + + +# """ +# # Check how sensitive the CF is to the number of modes +# svals = self.data.singular_values +# cf = svals[0] / svals.cumsum() +# change_per_mode = cf.shift({"mode": 1}) - cf +# change_in_cf_in_last_mode = change_per_mode.isel(mode=-1) +# if change_in_cf_in_last_mode > 0.001: +# print( +# f"Warning: CF is sensitive to the number of modes retained. Please increase `n_modes` for a better estimate." +# ) +# return self.data.covariance_fraction + +# def components(self): +# """Return the singular vectors of the left and right field. + +# Returns +# ------- +# components1: DataArray | Dataset | List[DataArray] +# Left components of the fitted model. +# components2: DataArray | Dataset | List[DataArray] +# Right components of the fitted model. + +# """ +# return super().components() + +# def scores(self): +# """Return the scores of the left and right field. + +# The scores in MCA are the projection of the left and right field onto the +# left and right singular vector of the cross-covariance matrix. + +# Returns +# ------- +# scores1: DataArray +# Left scores. +# scores2: DataArray +# Right scores. + +# """ +# return super().scores() + +# def homogeneous_patterns(self, correction=None, alpha=0.05): +# """Return the homogeneous patterns of the left and right field. + +# The homogeneous patterns are the correlation coefficients between the +# input data and the scores. + +# More precisely, the homogeneous patterns `r_{hom}` are defined as + +# .. math:: +# r_{hom, x} = corr \\left(X, A_x \\right) +# .. math:: +# r_{hom, y} = corr \\left(Y, A_y \\right) + +# where :math:`X` and :math:`Y` are the input data, :math:`A_x` and :math:`A_y` +# are the scores of the left and right field, respectively. + +# Parameters +# ---------- +# correction: str, default=None +# Method to apply a multiple testing correction. If None, no correction +# is applied. 
Available methods are: +# - bonferroni : one-step correction +# - sidak : one-step correction +# - holm-sidak : step down method using Sidak adjustments +# - holm : step-down method using Bonferroni adjustments +# - simes-hochberg : step-up method (independent) +# - hommel : closed method based on Simes tests (non-negative) +# - fdr_bh : Benjamini/Hochberg (non-negative) (default) +# - fdr_by : Benjamini/Yekutieli (negative) +# - fdr_tsbh : two stage fdr correction (non-negative) +# - fdr_tsbky : two stage fdr correction (non-negative) +# alpha: float, default=0.05 +# The desired family-wise error rate. Not used if `correction` is None. + +# Returns +# ------- +# patterns1: DataArray | Dataset | List[DataArray] +# Left homogenous patterns. +# patterns2: DataArray | Dataset | List[DataArray] +# Right homogenous patterns. +# pvals1: DataArray | Dataset | List[DataArray] +# Left p-values. +# pvals2: DataArray | Dataset | List[DataArray] +# Right p-values. + +# """ +# input_data1 = self.data.input_data1 +# input_data2 = self.data.input_data2 + +# scores1 = self.data.scores1 +# scores2 = self.data.scores2 + +# hom_pat1, pvals1 = pearson_correlation( +# input_data1, scores1, correction=correction, alpha=alpha +# ) +# hom_pat2, pvals2 = pearson_correlation( +# input_data2, scores2, correction=correction, alpha=alpha +# ) + +# hom_pat1 = self.preprocessor1.inverse_transform_components(hom_pat1) +# hom_pat2 = self.preprocessor2.inverse_transform_components(hom_pat2) + +# pvals1 = self.preprocessor1.inverse_transform_components(pvals1) +# pvals2 = self.preprocessor2.inverse_transform_components(pvals2) + +# hom_pat1.name = "left_homogeneous_patterns" +# hom_pat2.name = "right_homogeneous_patterns" + +# pvals1.name = "pvalues_of_left_homogeneous_patterns" +# pvals2.name = "pvalues_of_right_homogeneous_patterns" + +# return (hom_pat1, hom_pat2), (pvals1, pvals2) + +# def heterogeneous_patterns(self, correction=None, alpha=0.05): +# """Return the heterogeneous patterns of the left and right field. + +# The heterogeneous patterns are the correlation coefficients between the +# input data and the scores of the other field. + +# More precisely, the heterogeneous patterns `r_{het}` are defined as + +# .. math:: +# r_{het, x} = corr \\left(X, A_y \\right) +# .. math:: +# r_{het, y} = corr \\left(Y, A_x \\right) + +# where :math:`X` and :math:`Y` are the input data, :math:`A_x` and :math:`A_y` +# are the scores of the left and right field, respectively. + +# Parameters +# ---------- +# correction: str, default=None +# Method to apply a multiple testing correction. If None, no correction +# is applied. Available methods are: +# - bonferroni : one-step correction +# - sidak : one-step correction +# - holm-sidak : step down method using Sidak adjustments +# - holm : step-down method using Bonferroni adjustments +# - simes-hochberg : step-up method (independent) +# - hommel : closed method based on Simes tests (non-negative) +# - fdr_bh : Benjamini/Hochberg (non-negative) (default) +# - fdr_by : Benjamini/Yekutieli (negative) +# - fdr_tsbh : two stage fdr correction (non-negative) +# - fdr_tsbky : two stage fdr correction (non-negative) +# alpha: float, default=0.05 +# The desired family-wise error rate. Not used if `correction` is None. 
+ +# """ +# input_data1 = self.data.input_data1 +# input_data2 = self.data.input_data2 + +# scores1 = self.data.scores1 +# scores2 = self.data.scores2 + +# patterns1, pvals1 = pearson_correlation( +# input_data1, scores2, correction=correction, alpha=alpha +# ) +# patterns2, pvals2 = pearson_correlation( +# input_data2, scores1, correction=correction, alpha=alpha +# ) + +# patterns1 = self.preprocessor1.inverse_transform_components(patterns1) +# patterns2 = self.preprocessor2.inverse_transform_components(patterns2) + +# pvals1 = self.preprocessor1.inverse_transform_components(pvals1) +# pvals2 = self.preprocessor2.inverse_transform_components(pvals2) + +# patterns1.name = "left_heterogeneous_patterns" +# patterns2.name = "right_heterogeneous_patterns" + +# pvals1.name = "pvalues_of_left_heterogeneous_patterns" +# pvals2.name = "pvalues_of_right_heterogeneous_patterns" + +# return (patterns1, patterns2), (pvals1, pvals2) + + +# class ComplexMCA(MCA): +# """Complex Maximum Covariance Analysis (MCA). + +# Complex MCA, also referred to as Analytical SVD (ASVD) by Shane et al. (2017)[1]_, +# enhances traditional MCA by accommodating both amplitude and phase information. +# It achieves this by utilizing the Hilbert transform to preprocess the data, +# thus allowing for a more comprehensive analysis in the subsequent MCA computation. + +# An optional padding with exponentially decaying values can be applied prior to +# the Hilbert transform in order to mitigate the impact of spectral leakage. + +# Parameters +# ---------- +# n_modes: int, default=10 +# Number of modes to calculate. +# standardize: bool, default=False +# Whether to standardize the input data. +# use_coslat: bool, default=False +# Whether to use cosine of latitude for scaling. +# use_weights: bool, default=False +# Whether to use additional weights. +# padding : str, optional +# Specifies the method used for padding the data prior to applying the Hilbert +# transform. This can help to mitigate the effect of spectral leakage. +# Currently, only 'exp' for exponential padding is supported. Default is 'exp'. +# decay_factor : float, optional +# Specifies the decay factor used in the exponential padding. This parameter +# is only used if padding='exp'. The recommended value typically ranges between 0.05 to 0.2 +# but ultimately depends on the variability in the data. +# A smaller value (e.g. 0.05) is recommended for +# data with high variability, while a larger value (e.g. 0.2) is recommended +# for data with low variability. Default is 0.2. +# solver_kwargs: dict, default={} +# Additional keyword arguments passed to the SVD solver. + +# Notes +# ----- +# Complex MCA extends MCA to complex-valued data that contain both magnitude and phase information. +# The Hilbert transform is used to transform real-valued data to complex-valued data, from which both +# amplitude and phase can be extracted. + +# Similar to MCA, Complex MCA is used in climate science to identify coupled patterns of variability +# between two different climate variables. But unlike MCA, Complex MCA can identify coupled patterns +# that involve phase shifts. + +# References +# ---------- +# [1]_: Elipot, S., Frajka-Williams, E., Hughes, C.W., Olhede, S., Lankhorst, M., 2017. Observed Basin-Scale Response of the North Atlantic Meridional Overturning Circulation to Wind Stress Forcing. Journal of Climate 30, 2029–2054. 
https://doi.org/10.1175/JCLI-D-16-0664.1 + + +# Examples +# -------- +# >>> model = ComplexMCA(n_modes=5, standardize=True) +# >>> model.fit(data1, data2) + +# """ + +# def __init__(self, padding="exp", decay_factor=0.2, **kwargs): +# super().__init__(**kwargs) +# self.attrs.update({"model": "Complex MCA"}) +# self._params.update({"padding": padding, "decay_factor": decay_factor}) + +# # Initialize the DataContainer to store the results +# self.data: ComplexMCADataContainer = ComplexMCADataContainer() + +# def fit( +# self, +# data1: AnyDataObject, +# data2: AnyDataObject, +# dim, +# weights1=None, +# weights2=None, +# ): +# """Fit the model. + +# Parameters +# ---------- +# data1: xr.DataArray or list of xarray.DataArray +# Left input data. +# data2: xr.DataArray or list of xarray.DataArray +# Right input data. +# dim: tuple +# Tuple specifying the sample dimensions. The remaining dimensions +# will be treated as feature dimensions. +# weights1: xr.DataArray or xr.Dataset or None, default=None +# If specified, the left input data will be weighted by this array. +# weights2: xr.DataArray or xr.Dataset or None, default=None +# If specified, the right input data will be weighted by this array. + +# """ + +# data1_processed: DataArray = self.preprocessor1.fit_transform( +# data1, dim, weights2 +# ) +# data2_processed: DataArray = self.preprocessor2.fit_transform( +# data2, dim, weights2 +# ) + +# # apply hilbert transform: +# padding = self._params["padding"] +# decay_factor = self._params["decay_factor"] +# data1_processed = hilbert_transform( +# data1_processed, dim="sample", padding=padding, decay_factor=decay_factor +# ) +# data2_processed = hilbert_transform( +# data2_processed, dim="sample", padding=padding, decay_factor=decay_factor +# ) + +# decomposer = CrossDecomposer( +# n_modes=self._params["n_modes"], **self._solver_kwargs +# ) +# decomposer.fit(data1_processed, data2_processed) + +# # Note: +# # - explained variance is given by the singular values of the SVD; +# # - We use the term singular_values_pca as used in the context of PCA: +# # Considering data X1 = X2, MCA is the same as PCA. In this case, +# # singular_values_pca is equivalent to the singular values obtained +# # when performing PCA of X1 or X2. +# singular_values = decomposer.singular_values_ +# singular_vectors1 = decomposer.singular_vectors1_ +# singular_vectors2 = decomposer.singular_vectors2_ + +# squared_covariance = singular_values**2 +# total_squared_covariance = decomposer.total_squared_covariance_ + +# norm1 = np.sqrt(singular_values) +# norm2 = np.sqrt(singular_values) + +# # Index of the sorted squared covariance +# idx_sorted_modes = squared_covariance.compute().argsort()[::-1] +# idx_sorted_modes.coords.update(squared_covariance.coords) + +# # Project the data onto the singular vectors +# scores1 = xr.dot(data1_processed, singular_vectors1) / norm1 +# scores2 = xr.dot(data2_processed, singular_vectors2) / norm2 + +# self.data.set_data( +# input_data1=data1_processed, +# input_data2=data2_processed, +# components1=singular_vectors1, +# components2=singular_vectors2, +# scores1=scores1, +# scores2=scores2, +# squared_covariance=squared_covariance, +# total_squared_covariance=total_squared_covariance, +# idx_modes_sorted=idx_sorted_modes, +# norm1=norm1, +# norm2=norm2, +# ) +# # Assign analysis relevant meta data +# self.data.set_attrs(self.attrs) + +# def components_amplitude(self) -> Tuple[AnyDataObject, AnyDataObject]: +# """Compute the amplitude of the components. 
+ +# The amplitude of the components are defined as + +# .. math:: +# A_ij = |C_ij| + +# where :math:`C_{ij}` is the :math:`i`-th entry of the :math:`j`-th component and +# :math:`|\\cdot|` denotes the absolute value. + +# Returns +# ------- +# AnyDataObject +# Amplitude of the left components. +# AnyDataObject +# Amplitude of the left components. + +# """ +# comps1 = self.data.components_amplitude1 +# comps2 = self.data.components_amplitude2 + +# comps1 = self.preprocessor1.inverse_transform_components(comps1) +# comps2 = self.preprocessor2.inverse_transform_components(comps2) + +# return (comps1, comps2) + +# def components_phase(self) -> Tuple[AnyDataObject, AnyDataObject]: +# """Compute the phase of the components. + +# The phase of the components are defined as + +# .. math:: +# \\phi_{ij} = \\arg(C_{ij}) + +# where :math:`C_{ij}` is the :math:`i`-th entry of the :math:`j`-th component and +# :math:`\\arg(\\cdot)` denotes the argument of a complex number. + +# Returns +# ------- +# AnyDataObject +# Phase of the left components. +# AnyDataObject +# Phase of the right components. + +# """ +# comps1 = self.data.components_phase1 +# comps2 = self.data.components_phase2 + +# comps1 = self.preprocessor1.inverse_transform_components(comps1) +# comps2 = self.preprocessor2.inverse_transform_components(comps2) + +# return (comps1, comps2) + +# def scores_amplitude(self) -> Tuple[DataArray, DataArray]: +# """Compute the amplitude of the scores. + +# The amplitude of the scores are defined as + +# .. math:: +# A_ij = |S_ij| + +# where :math:`S_{ij}` is the :math:`i`-th entry of the :math:`j`-th score and +# :math:`|\\cdot|` denotes the absolute value. + +# Returns +# ------- +# DataArray +# Amplitude of the left scores. +# DataArray +# Amplitude of the right scores. + +# """ +# scores1 = self.data.scores_amplitude1 +# scores2 = self.data.scores_amplitude2 + +# scores1 = self.preprocessor1.inverse_transform_scores(scores1) +# scores2 = self.preprocessor2.inverse_transform_scores(scores2) +# return (scores1, scores2) + +# def scores_phase(self) -> Tuple[DataArray, DataArray]: +# """Compute the phase of the scores. + +# The phase of the scores are defined as + +# .. math:: +# \\phi_{ij} = \\arg(S_{ij}) + +# where :math:`S_{ij}` is the :math:`i`-th entry of the :math:`j`-th score and +# :math:`\\arg(\\cdot)` denotes the argument of a complex number. + +# Returns +# ------- +# DataArray +# Phase of the left scores. +# DataArray +# Phase of the right scores. + +# """ +# scores1 = self.data.scores_phase1 +# scores2 = self.data.scores_phase2 + +# scores1 = self.preprocessor1.inverse_transform_scores(scores1) +# scores2 = self.preprocessor2.inverse_transform_scores(scores2) + +# return (scores1, scores2) + +# def transform(self, data1: AnyDataObject, data2: AnyDataObject): +# raise NotImplementedError("Complex MCA does not support transform method.") + +# def homogeneous_patterns(self, correction=None, alpha=0.05): +# raise NotImplementedError( +# "Complex MCA does not support homogeneous_patterns method." +# ) + +# def heterogeneous_patterns(self, correction=None, alpha=0.05): +# raise NotImplementedError( +# "Complex MCA does not support heterogeneous_patterns method." 
+# ) From bb8a0dad086bea00ba77c981eee64aa06a18f95e Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Mon, 14 Aug 2023 14:24:46 +0200 Subject: [PATCH 09/10] refactor(decomposer): coercing signs is optional --- xeofs/models/decomposer.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/xeofs/models/decomposer.py b/xeofs/models/decomposer.py index 88bae49..3a0df3b 100644 --- a/xeofs/models/decomposer.py +++ b/xeofs/models/decomposer.py @@ -28,8 +28,9 @@ class Decomposer: Additional keyword arguments passed to the SVD solver. """ - def __init__(self, n_modes=100, solver="auto", **kwargs): + def __init__(self, n_modes=100, flip_signs=True, solver="auto", **kwargs): self.n_modes = n_modes + self.flip_signs = flip_signs self.solver = solver self.solver_kwargs = kwargs @@ -47,9 +48,9 @@ def fit(self, X, dims=("sample", "feature")): n_coords2 = len(X.coords[dims[1]]) rank = min(n_coords1, n_coords2) - if self.n_modes >= rank: + if self.n_modes > rank: raise ValueError( - f"n_modes must be smaller to the rank of the data object ({rank})" + f"n_modes must be smaller or equal to the rank of the data object (rank={rank})" ) # Check if data is small enough to use exact SVD @@ -168,16 +169,17 @@ def fit(self, X, dims=("sample", "feature")): s.name = "s" VT.name = "V" - # Flip signs of components to ensure deterministic output - idx_sign = abs(VT).argmax(dims[1]).compute() - flip_signs = np.sign(VT.isel({dims[1]: idx_sign})) - flip_signs = flip_signs.compute() - # Drop all dimensions except 'mode' so that the index is clean - for dim, coords in flip_signs.coords.items(): - if dim != "mode": - flip_signs = flip_signs.drop(dim) - VT *= flip_signs - U *= flip_signs + if self.flip_signs: + # Flip signs of components to ensure deterministic output + idx_sign = abs(VT).argmax(dims[1]).compute() + flip_signs = np.sign(VT.isel({dims[1]: idx_sign})) + flip_signs = flip_signs.compute() + # Drop all dimensions except 'mode' so that the index is clean + for dim, coords in flip_signs.coords.items(): + if dim != "mode": + flip_signs = flip_signs.drop(dim) + VT *= flip_signs + U *= flip_signs self.U_ = U self.s_ = s From 248211086e140b5f0d8ec40a40962ed4b56a7b94 Mon Sep 17 00:00:00 2001 From: Niclas Rieger Date: Mon, 14 Aug 2023 14:25:28 +0200 Subject: [PATCH 10/10] feat: add Optimal Persistence Analysis (OPA) --- docs/_autosummary/xeofs.models.OPA.rst | 34 ++ docs/api_core_methods.rst | 1 + docs/models.rst | 11 + tests/models/test_opa.py | 392 +++++++++++++++++++++ xeofs/data_container/opa_data_container.py | 78 ++++ xeofs/models/__init__.py | 1 + xeofs/models/opa.py | 221 ++++++++++++ 7 files changed, 738 insertions(+) create mode 100644 docs/_autosummary/xeofs.models.OPA.rst create mode 100644 tests/models/test_opa.py create mode 100644 xeofs/data_container/opa_data_container.py create mode 100644 xeofs/models/opa.py diff --git a/docs/_autosummary/xeofs.models.OPA.rst b/docs/_autosummary/xeofs.models.OPA.rst new file mode 100644 index 0000000..478174a --- /dev/null +++ b/docs/_autosummary/xeofs.models.OPA.rst @@ -0,0 +1,34 @@ +xeofs.models.OPA +================ + +.. currentmodule:: xeofs.models + +.. autoclass:: OPA + :members: + :show-inheritance: + :inherited-members: + + + .. automethod:: __init__ + + + .. rubric:: Methods + + .. 
autosummary:: + + ~OPA.__init__ + ~OPA.components + ~OPA.compute + ~OPA.decorrelation_time + ~OPA.filter_patterns + ~OPA.fit + ~OPA.get_params + ~OPA.inverse_transform + ~OPA.scores + ~OPA.transform + + + + + + \ No newline at end of file diff --git a/docs/api_core_methods.rst b/docs/api_core_methods.rst index c3c5f90..8b8486d 100644 --- a/docs/api_core_methods.rst +++ b/docs/api_core_methods.rst @@ -6,4 +6,5 @@ Core methods :recursive: xeofs.models.EOF + xeofs.models.OPA xeofs.models.MCA diff --git a/docs/models.rst b/docs/models.rst index 3d0d890..95644b7 100644 --- a/docs/models.rst +++ b/docs/models.rst @@ -26,6 +26,17 @@ The supported models in ``xeofs`` currently include: Uncovers co-varying patterns between two sets of data. +.. grid:: 2 + + .. grid-item-card:: Optimal Persistence Analysis (OPA) + :link: api_core_methods + :link-type: doc + + + Identifies the optimal persistence patterns (OPP) with the largest decorrelation time. + + + Enhanced Interpretability --------------------------- diff --git a/tests/models/test_opa.py b/tests/models/test_opa.py new file mode 100644 index 0000000..8f3d38c --- /dev/null +++ b/tests/models/test_opa.py @@ -0,0 +1,392 @@ +import numpy as np +import xarray as xr +import pytest +import dask.array as da +from numpy.testing import assert_allclose + +from xeofs.models import OPA + + +@pytest.fixture +def opa_model(): + return OPA(n_modes=3, tau_max=3, n_pca_modes=19) + + +def test_init(): + """Tests the initialization of the OPA class""" + opa = OPA(n_modes=3, tau_max=3, n_pca_modes=19, use_coslat=True) + + # Assert parameters are correctly stored in the _params attribute + assert opa._params == { + "n_modes": 3, + "tau_max": 3, + "n_pca_modes": 19, + "standardize": False, + "use_coslat": True, + "use_weights": False, + "solver": "auto", + } + + # Assert preprocessor has been initialized + assert hasattr(opa, "preprocessor") + + +@pytest.mark.parametrize( + "dim", + [ + (("time",)), + (("lat", "lon")), + (("lon", "lat")), + ], +) +def test_fit(dim, mock_data_array, opa_model): + """Tests the fit method of the OPA class""" + + opa_model.fit(mock_data_array, dim) + + # Assert the required attributes have been set + assert hasattr(opa_model, "preprocessor") + assert hasattr(opa_model, "data") + + +@pytest.mark.parametrize( + "dim", + [ + (("time",)), + (("lat", "lon")), + (("lon", "lat")), + ], +) +def test_decorrelation_time(dim, mock_data_array, opa_model): + """Tests the decorrelation_time method of the OPA class""" + + opa_model.fit(mock_data_array, dim) + + # Test decorrelation time method + decorrelation_time = opa_model.decorrelation_time() + assert isinstance(decorrelation_time, xr.DataArray) + + # decorrelation times are all positive + assert (decorrelation_time > 0).all() + + +@pytest.mark.parametrize( + "dim", + [ + (("time",)), + (("lat", "lon")), + (("lon", "lat")), + ], +) +def test_components(dim, mock_data_array, opa_model): + """Tests the components method of the OPA class""" + opa_model.fit(mock_data_array, dim) + + # Test components method + components = opa_model.components() + feature_dims = tuple(set(mock_data_array.dims) - set(dim)) + assert isinstance(components, xr.DataArray), "Components is not a DataArray" + assert set(components.dims) == set( + ("mode",) + feature_dims + ), "Components does not have the right feature dimensions" + + +@pytest.mark.parametrize( + "dim", + [ + (("time",)), + (("lat", "lon")), + (("lon", "lat")), + ], +) +def test_filter_patterns(dim, mock_data_array, opa_model): + """Tests the filter_patterns 
method of the OPA class""" + opa_model.fit(mock_data_array, dim) + + # Test components method + components = opa_model.filter_patterns() + feature_dims = tuple(set(mock_data_array.dims) - set(dim)) + assert isinstance(components, xr.DataArray), "Components is not a DataArray" + assert set(components.dims) == set( + ("mode",) + feature_dims + ), "Components does not have the right feature dimensions" + + +@pytest.mark.parametrize( + "dim", + [ + (("time",)), + (("lat", "lon")), + (("lon", "lat")), + ], +) +def test_components_dataset(dim, mock_dataset, opa_model): + """Tests the components method of the OPA class""" + opa_model.fit(mock_dataset, dim) + + # Test components method + components = opa_model.components() + feature_dims = tuple(set(mock_dataset.dims) - set(dim)) + assert isinstance(components, xr.Dataset), "Components is not a Dataset" + assert set(components.data_vars) == set( + mock_dataset.data_vars + ), "Components does not have the same data variables as the input Dataset" + assert set(components.dims) == set( + ("mode",) + feature_dims + ), "Components does not have the right feature dimensions" + + +@pytest.mark.parametrize( + "dim", + [ + (("time",)), + (("lat", "lon")), + (("lon", "lat")), + ], +) +def test_components_dataarray_list(dim, mock_data_array_list, opa_model): + """Tests the components method of the OPA class""" + opa_model.fit(mock_data_array_list, dim) + + # Test components method + components = opa_model.components() + feature_dims = [tuple(set(data.dims) - set(dim)) for data in mock_data_array_list] + assert isinstance(components, list), "Components is not a list" + assert len(components) == len( + mock_data_array_list + ), "Components does not have the same length as the input list" + assert isinstance( + components[0], xr.DataArray + ), "Components is not a list of DataArrays" + for comp, feat_dims in zip(components, feature_dims): + assert set(comp.dims) == set( + ("mode",) + feat_dims + ), "Components does not have the right feature dimensions" + + +@pytest.mark.parametrize( + "dim", + [ + (("time",)), + (("lat", "lon")), + (("lon", "lat")), + ], +) +def test_scores(dim, mock_data_array, opa_model): + """Tests the scores method of the OPA class""" + opa_model.fit(mock_data_array, dim) + + # Test scores method + scores = opa_model.scores() + assert isinstance(scores, xr.DataArray), "Scores is not a DataArray" + assert set(scores.dims) == set( + (dim + ("mode",)) + ), "Scores does not have the right dimensions" + + +@pytest.mark.parametrize( + "dim", + [ + (("time",)), + (("lat", "lon")), + (("lon", "lat")), + ], +) +def test_scores_dataset(dim, mock_dataset, opa_model): + """Tests the scores method of the OPA class""" + opa_model.fit(mock_dataset, dim) + + # Test scores method + scores = opa_model.scores() + assert isinstance(scores, xr.DataArray) + assert set(scores.dims) == set( + (dim + ("mode",)) + ), "Scores does not have the right dimensions" + + +@pytest.mark.parametrize( + "dim", + [ + (("time",)), + (("lat", "lon")), + (("lon", "lat")), + ], +) +def test_scores_dataarray_list(dim, mock_data_array_list, opa_model): + """Tests the scores method of the OPA class""" + opa_model.fit(mock_data_array_list, dim) + + # Test scores method + scores = opa_model.scores() + assert isinstance(scores, xr.DataArray) + assert set(scores.dims) == set( + (dim + ("mode",)) + ), "Scores does not have the right dimensions" + + +def test_get_params(opa_model): + """Tests the get_params method of the OPA class""" + # Test get_params method + params = 
opa_model.get_params() + assert isinstance(params, dict) + assert params == { + "n_modes": 3, + "tau_max": 3, + "n_pca_modes": 19, + "standardize": False, + "use_coslat": False, + "use_weights": False, + "solver": "auto", + } + + +@pytest.mark.parametrize( + "dim", + [ + (("time",)), + (("lat", "lon")), + (("lon", "lat")), + ], +) +def test_transform(dim, mock_data_array, opa_model): + """Transform is not implemented for OPA""" + + # Create a xarray DataArray with random data + opa_model.fit(mock_data_array, dim) + + with pytest.raises(NotImplementedError): + opa_model.transform(mock_data_array) + # scores = opa_model.scores() + + # # Create a new xarray DataArray with random data + # new_data = mock_data_array + + # projections = opa_model.transform(new_data) + + # # Check that the projection has the right dimensions + # assert projections.dims == scores.dims, "Projection has wrong dimensions" # type: ignore + + # # Check that the projection has the right data type + # assert isinstance(projections, xr.DataArray), "Projection is not a DataArray" + + # # Check that the projection has the right name + # assert projections.name == "scores", "Projection has wrong name" + + # # Check that the projection's data is the same as the scores + # np.testing.assert_allclose( + # scores.sel(mode=slice(1, 3)), projections.sel(mode=slice(1, 3)), rtol=1e-3 + # ) + + +@pytest.mark.parametrize( + "dim", + [ + (("time",)), + (("lat", "lon")), + (("lon", "lat")), + ], +) +def test_inverse_transform(dim, mock_data_array, opa_model): + """Test inverse_transform method in EOF class.""" + + # fit the EOF model + opa_model.fit(mock_data_array, dim=dim) + + with pytest.raises(NotImplementedError): + opa_model.inverse_transform(1) + + # # Test with scalar + # mode = 1 + # reconstructed_data = opa_model.inverse_transform(mode) + # assert isinstance(reconstructed_data, xr.DataArray) + + # # Test with slice + # mode = slice(1, 2) + # reconstructed_data = opa_model.inverse_transform(mode) + # assert isinstance(reconstructed_data, xr.DataArray) + + # # Test with array of tick labels + # mode = np.array([1, 3]) + # reconstructed_data = opa_model.inverse_transform(mode) + # assert isinstance(reconstructed_data, xr.DataArray) + + # # Check that the reconstructed data has the same dimensions as the original data + # assert set(reconstructed_data.dims) == set(mock_data_array.dims) + + +# Check mathematical properties +# ============================================================================= +@pytest.mark.parametrize( + "dim, use_coslat", + [ + (("time",), True), + (("lat", "lon"), False), + (("lon", "lat"), False), + ], +) +def test_U_orthogonal(dim, use_coslat, mock_data_array): + """U patterns are orthogonal""" + model = OPA( + n_modes=3, tau_max=3, n_pca_modes=19, standardize=True, use_coslat=use_coslat + ) + model.fit(mock_data_array, dim=dim) + U = model._U.values + assert np.allclose( + U.T @ U, np.eye(U.shape[1]), atol=1e-5 + ), "U patterns are not orthogonal" + + +# IGNORE THIS TEST: Current implementation yields C0 in PCA space and FP in original space +# therefore the test fails +# @pytest.mark.parametrize( +# "dim, use_coslat", +# [ +# (("time",), True), +# (("lat", "lon"), False), +# (("lon", "lat"), False), +# ], +# ) +# def test_filter_patterns_biorthogonal(dim, use_coslat, mock_data_array): +# """Filter patterns are biorthogonal""" +# n_pca_modes = 20 +# model = OPA( +# n_modes=3, +# tau_max=3, +# n_pca_modes=n_pca_modes, +# standardize=True, +# use_coslat=use_coslat, +# ) +# model.fit(mock_data_array, 
dim=dim) +# FP = model.data.filter_patterns.values +# C0 = model._C0.values # zero-lag covariance matrix +# assert C0.shape == (n_pca_modes, n_pca_modes) +# check = FP.T @ C0 @ FP +# assert np.allclose( +# check, np.eye(check.shape[1]), atol=1e-5 +# ), "Filter patterns are are not biorthogonal" + + +@pytest.mark.parametrize( + "dim, use_coslat", + [ + (("time",), True), + (("lat", "lon"), False), + (("lon", "lat"), False), + ], +) +def test_scores_uncorrelated(dim, use_coslat, mock_data_array): + """Scores are uncorrelated""" + n_pca_modes = 20 + model = OPA( + n_modes=3, + tau_max=3, + n_pca_modes=n_pca_modes, + standardize=True, + use_coslat=use_coslat, + ) + model.fit(mock_data_array, dim=dim) + scores = model.data.scores.values + check = scores.T @ scores / (scores.shape[0] - 1) + assert np.allclose( + check, np.eye(check.shape[1]), atol=1e-5 + ), "Scores are not uncorrelated" diff --git a/xeofs/data_container/opa_data_container.py b/xeofs/data_container/opa_data_container.py new file mode 100644 index 0000000..7f6930e --- /dev/null +++ b/xeofs/data_container/opa_data_container.py @@ -0,0 +1,78 @@ +from typing import Optional + +import numpy as np +import xarray as xr +from dask.diagnostics.progress import ProgressBar + +from xeofs.utils.data_types import DataArray + +from ._base_model_data_container import _BaseModelDataContainer +from ..utils.data_types import DataArray + + +class OPADataContainer(_BaseModelDataContainer): + """Container to store the results of an Optimal Persistence Analysis (OPA).""" + + def __init__(self): + super().__init__() + self._filter_patterns: Optional[DataArray] = None + self._decorrelation_time: Optional[DataArray] = None + + def set_data( + self, + input_data: DataArray, + components: DataArray, + scores: DataArray, + filter_patterns: DataArray, + decorrelation_time: DataArray, + ): + super().set_data(input_data=input_data, components=components, scores=scores) + + self._verify_dims(decorrelation_time, ("mode",)) + self._decorrelation_time = decorrelation_time + self._decorrelation_time.name = "decorrelation_time" + + self._verify_dims(filter_patterns, ("feature", "mode")) + self._filter_patterns = filter_patterns + self._filter_patterns.name = "filter_patterns" + + @property + def components(self) -> DataArray: + comps = super().components + comps.name = "optimal_persistence_pattern" + return comps + + @property + def decorrelation_time(self) -> DataArray: + """Get the decorrelation time.""" + decorr = super()._sanity_check(self._decorrelation_time) + decorr.name = "decorrelation_time" + return decorr + + @property + def filter_patterns(self) -> DataArray: + """Get the filter patterns.""" + filter_patterns = super()._sanity_check(self._filter_patterns) + filter_patterns.name = "filter_patterns" + return filter_patterns + + def compute(self, verbose=False): + super().compute(verbose) + + if verbose: + with ProgressBar(): + self._filter_patterns = self.filter_patterns.compute() + self._decorrelation_time = self.decorrelation_time.compute() + else: + self._filter_patterns = self.filter_patterns.compute() + self._decorrelation_time = self.decorrelation_time.compute() + + def set_attrs(self, attrs: dict): + """Set the attributes of the results.""" + super().set_attrs(attrs) + + filter_patterns = self._sanity_check(self._filter_patterns) + decorrelation_time = self._sanity_check(self._decorrelation_time) + + filter_patterns.attrs.update(attrs) + decorrelation_time.attrs.update(attrs) diff --git a/xeofs/models/__init__.py b/xeofs/models/__init__.py index 
diff --git a/xeofs/models/__init__.py b/xeofs/models/__init__.py
index 81aea09..d1d954e 100644
--- a/xeofs/models/__init__.py
+++ b/xeofs/models/__init__.py
@@ -1,5 +1,6 @@
 from .eof import EOF, ComplexEOF
 from .mca import MCA, ComplexMCA
+from .opa import OPA
 from .rotator_factory import RotatorFactory
 from .eof_rotator import EOFRotator, ComplexEOFRotator
 from .mca_rotator import MCARotator, ComplexMCARotator
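With OPA now exported from xeofs.models, the intended user-facing workflow is sketched below on synthetic data; the parameter values are purely illustrative and assume this branch of xeofs is installed:

import numpy as np
import xarray as xr
from xeofs.models import OPA

# Random (time, lat, lon) field standing in for a real geophysical dataset.
data = xr.DataArray(
    np.random.rand(100, 20, 30),
    dims=("time", "lat", "lon"),
    coords={"time": np.arange(100), "lat": np.arange(20), "lon": np.arange(30)},
)

model = OPA(n_modes=5, tau_max=10, n_pca_modes=50)
model.fit(data, dim="time")

opp = model.components()          # optimally persistent patterns
opp_ts = model.scores()           # their time series
tau = model.decorrelation_time()  # decorrelation time per mode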
diff --git a/xeofs/models/opa.py b/xeofs/models/opa.py
new file mode 100644
index 0000000..0222ed7
--- /dev/null
+++ b/xeofs/models/opa.py
@@ -0,0 +1,221 @@
+from typing import Optional
+
+import xarray as xr
+import numpy as np
+
+from ._base_model import _BaseModel
+from .eof import EOF
+from .decomposer import Decomposer
+from ..data_container.opa_data_container import OPADataContainer
+from ..utils.data_types import AnyDataObject, DataArray
+
+
+class OPA(_BaseModel):
+    """Optimal Persistence Analysis (OPA).
+
+    OPA identifies the optimal persistence patterns (OPP) with the
+    largest decorrelation time in a time-varying field. Introduced by DelSole
+    in 2001 [1]_, and further developed in 2006 [2]_, it's a method used to
+    find patterns whose time series show strong persistence over time.
+
+    Parameters
+    ----------
+    n_modes : int
+        Number of optimal persistence patterns (OPP) to be computed.
+    tau_max : int
+        Maximum time lag for the computation of the covariance matrix.
+    n_pca_modes : int
+        Number of modes to be computed in the pre-processing step using EOF.
+
+    References
+    ----------
+    .. [1] DelSole, T., 2001. Optimally Persistent Patterns in Time-Varying Fields. Journal of the Atmospheric Sciences 58, 1341–1356. https://doi.org/10.1175/1520-0469(2001)058<1341:OPPITV>2.0.CO;2
+    .. [2] DelSole, T., 2006. Low-Frequency Variations of Surface Temperature in Observations and Simulations. Journal of Climate 19, 4487–4507. https://doi.org/10.1175/JCLI3879.1
+
+    Examples
+    --------
+    >>> from xeofs.models import OPA
+    >>> model = OPA(n_modes=10, tau_max=50, n_pca_modes=100)
+    >>> model.fit(data, dim=("time"))
+
+    Retrieve the optimal persistence patterns (OPP) and their time series:
+
+    >>> opp = model.components()
+    >>> opp_ts = model.scores()
+
+    Retrieve the decorrelation time of the optimal persistence patterns (OPP):
+
+    >>> decorrelation_time = model.decorrelation_time()
+    """
+
+    def __init__(self, n_modes, tau_max, n_pca_modes, **kwargs):
+        if n_modes > n_pca_modes:
+            raise ValueError(
+                f"n_modes must be smaller or equal to n_pca_modes (n_modes={n_modes}, n_pca_modes={n_pca_modes})"
+            )
+        super().__init__(n_modes=n_modes, **kwargs)
+        self.attrs.update({"model": "OPA"})
+        self._params.update({"tau_max": tau_max, "n_pca_modes": n_pca_modes})
+
+        # Initialize the DataContainer to store the results
+        self.data: OPADataContainer = OPADataContainer()
+
+    def _Ctau(self, X, tau: int) -> DataArray:
+        """Compute the time-lag covariance matrix C(tau) of the data X."""
+        X0 = X.copy(deep=True)
+        Xtau = X.shift(sample=-tau).dropna("sample")
+
+        X0 = X0.rename({"mode": "feature1"})
+        Xtau = Xtau.rename({"mode": "feature2"})
+        return xr.dot(X0, Xtau, dims=["sample"]) / (Xtau.sample.size - 1)
+
+    @staticmethod
+    def _compute_matrix_inverse(X, dims):
+        """Compute the inverse of a symmetric matrix X."""
+        return xr.apply_ufunc(
+            np.linalg.inv,
+            X,
+            input_core_dims=[dims],
+            output_core_dims=[dims[::-1]],
+            vectorize=False,
+            dask="allowed",
+        )
+
+    def fit(self, data: AnyDataObject, dim, weights: Optional[AnyDataObject] = None):
+        # Preprocess the data
+        input_data: DataArray = self.preprocessor.fit_transform(data, dim, weights)
+
+        # Perform PCA as a pre-processing step
+        pca = EOF(n_modes=self._params["n_pca_modes"], use_coslat=False)
+        pca.fit(input_data, dim="sample")
+        svals = pca.data.singular_values
+        expvar = pca.data.explained_variance
+        comps = pca.data.components * svals / np.sqrt(expvar)
+        # -> comps (feature x mode)
+        scores = pca.data.scores * np.sqrt(expvar)
+        # -> scores (sample x mode)
+
+        # Compute the covariance matrix with zero time lag
+        C0 = self._Ctau(scores, 0)
+        # -> C0 (feature1 x feature2)
+        C0inv = self._compute_matrix_inverse(C0, dims=("feature1", "feature2"))
+        # -> C0inv (feature2 x feature1)
+        M = 0.5 * C0
+        # -> M (feature1 x feature2)
+        tau_max = self._params["tau_max"]
+        for tau in range(1, tau_max + 1):
+            Ctau = self._Ctau(scores, tau)
+            if tau == tau_max:
+                Ctau = 0.5 * Ctau
+            M = M + (Ctau)
+
+        MT = xr.DataArray(M.data.T, dims=M.dims, coords=M.coords)
+        # -> MT (feature1 x feature2)
+        M_summed = M + MT
+        # -> M_summed (feature1 x feature2)
+
+        # Instead of solving the generalized eigenvalue problem
+        # as proposed in DelSole (2001), we solve the
+        # eigenvalue problem of the alternative formulation
+        # using a symmetric matrix given in
+        # A. Hannachi (2021), Patterns Identification and
+        # Data Mining in Weather and Climate, Equation (8.20)
+        decomposer = Decomposer(n_modes=C0.shape[0], flip_signs=False, solver="full")
+        decomposer.fit(C0, dims=("feature1", "feature2"))
+        C0_sqrt = decomposer.U_ * np.sqrt(decomposer.s_)
+        # -> C0_sqrt (feature1 x mode)
+        C0_sqrt_inv = self._compute_matrix_inverse(C0_sqrt, dims=("feature1", "mode"))
+        # -> C0_sqrt_inv (mode x feature1)
+        target = 0.5 * xr.dot(C0_sqrt_inv, M_summed, dims="feature1")
+        # -> target (mode x feature2)
+        target = xr.dot(
+            target, C0_sqrt_inv.rename({"mode": "feature2"}), dims="feature2"
+        )
+        # -> target (mode x feature1)
+        target = target.rename({"feature1": "dummy"})
+        target = target.rename({"mode": "feature1"})
+        # -> target (feature1 x dummy)
+
+        # Solve the symmetric eigenvalue problem
+        eigensolver = Decomposer(
+            n_modes=self._params["n_modes"], flip_signs=False, solver="full"
+        )
+        eigensolver.fit(target, dims=("feature1", "dummy"))
+        U = eigensolver.U_
+        # -> U (feature1 x mode)
+        lbda = eigensolver.s_
+        # -> lbda (mode)
+        # U, lbda, ct = xr.apply_ufunc(
+        #     np.linalg.svd,
+        #     target,
+        #     input_core_dims=[("feature1", "dummy")],
+        #     output_core_dims=[("feature1", "mode"), ("mode",), ("mode", "dummy")],
+        #     vectorize=False,
+        #     dask="allowed",
+        # )
+        # Compute the filter patterns
+        V = C0_sqrt_inv.rename({"mode": "mode1"}).dot(
+            U.rename({"mode": "mode2"}), dims="feature1"
+        )
+        # -> V (mode1 x mode2)
+
+        # Compute the optimally persistent patterns (OPPs)
+        W = xr.dot(
+            C0.rename({"feature2": "temp"}), V.rename({"mode1": "temp"}), dims="temp"
+        )
+        # -> W (feature1 x mode2)
+
+        # Compute the time series of the optimally persistent patterns (OPPs)
+        P = xr.dot(scores.rename({"mode": "mode1"}), V, dims="mode1")
+        # -> P (sample x mode2)
+
+        # Transform filter patterns and OPPs into original space
+        V = xr.dot(comps.rename({"mode": "mode1"}), V, dims="mode1")
+        # -> V (feature x mode2)
+
+        W = xr.dot(comps.rename({"mode": "feature1"}), W, dims="feature1")
+        # -> W (feature x mode2)
+
+        # Rename dimensions
+        U = U.rename({"feature1": "feature"})  # -> (feature x mode)
+        V = V.rename({"mode2": "mode"})  # -> (feature x mode)
+        W = W.rename({"mode2": "mode"})  # -> (feature x mode)
+        P = P.rename({"mode2": "mode"})  # -> (sample x mode)
+
+        # Store the results
+        self.data.set_data(
+            input_data=scores.rename({"mode": "feature"}),
+            components=W,
+            scores=P,
+            filter_patterns=V,
+            decorrelation_time=lbda,
+        )
+        self.data.set_attrs(self.attrs)
+        self._U = U  # store U for testing purposes of orthogonality
+        self._C0 = C0  # store C0 for testing purposes of orthogonality
+
+    def transform(self, data: AnyDataObject):
+        raise NotImplementedError()
+
+    def inverse_transform(self, mode):
+        raise NotImplementedError()
+
+    def components(self) -> AnyDataObject:
+        """Return the optimal persistence pattern (OPP)."""
+        return super().components()
+
+    def scores(self) -> DataArray:
+        """Return the time series of the optimal persistence pattern (OPP).
+
+        The time series maximize the decorrelation time and are mutually uncorrelated.
+        """
+        return super().scores()
+
+    def decorrelation_time(self) -> DataArray:
+        """Return the decorrelation time of the optimal persistence pattern (OPP)."""
+        return self.data.decorrelation_time
+
+    def filter_patterns(self) -> DataArray:
+        """Return the filter patterns."""
+        fps = self.data.filter_patterns
+        return self.preprocessor.inverse_transform_components(fps)
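For reference, the reformulation used in fit() above — diagonalizing a symmetrized matrix instead of solving DelSole's generalized eigenvalue problem directly — can be checked with a short, self-contained numpy sketch (synthetic matrices, not xeofs code):

import numpy as np

rng = np.random.default_rng(0)
n = 6

# Symmetric positive definite stand-in for the zero-lag covariance C0.
A = rng.normal(size=(n, n))
C0 = A @ A.T + n * np.eye(n)

# Arbitrary stand-in for M; B corresponds to 0.5 * (M + M^T) in fit().
M = rng.normal(size=(n, n))
B = 0.5 * (M + M.T)

# Square root of C0 from its eigendecomposition, so that C0 = S @ S.T.
d, Q = np.linalg.eigh(C0)
S = Q @ np.diag(np.sqrt(d))
S_inv = np.linalg.inv(S)

# Symmetric target matrix (the "target" in fit(), Hannachi-style reformulation).
T = S_inv @ B @ S_inv.T
lam, U = np.linalg.eigh(T)

# Each eigenpair (lam, u) of T yields a solution of the generalized problem
# B @ a = lam * C0 @ a, with filter pattern a = S^{-T} @ u.
for l, u in zip(lam, U.T):
    a = S_inv.T @ u
    assert np.allclose(B @ a, l * (C0 @ a), atol=1e-8)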