From d178bf7fd44f67f2440aa800696d6ee60c055933 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 30 Apr 2024 15:40:38 -0400 Subject: [PATCH 1/5] Factor out CMIP recipe stages Moving more of the CMIP6 feedstock specific transforms into this repo --- leap_data_management_utils/cmip_transforms.py | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/leap_data_management_utils/cmip_transforms.py b/leap_data_management_utils/cmip_transforms.py index 3bb7ade..60c8078 100644 --- a/leap_data_management_utils/cmip_transforms.py +++ b/leap_data_management_utils/cmip_transforms.py @@ -192,3 +192,65 @@ def _log_to_bigquery(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: def expand(self, pcoll: beam.PCollection) -> beam.PCollection: return pcoll | beam.Map(self._log_to_bigquery) + + +@dataclass +class Preprocessor(beam.PTransform): + """ + Preprocessor for xarray datasets. + Set all data_variables except for `variable_id` attrs to coord + Add additional information + + """ + + @staticmethod + def _keep_only_variable_id(item: Indexed[T]) -> Indexed[T]: + """ + Many netcdfs contain variables other than the one specified in the `variable_id` facet. 
+ Set them all to coords + """ + index, ds = item + print(f"Preprocessing before {ds =}") + new_coords_vars = [ + var for var in ds.data_vars if var != ds.attrs["variable_id"] + ] + ds = ds.set_coords(new_coords_vars) + print(f"Preprocessing after {ds =}") + return index, ds + + @staticmethod + def _sanitize_attrs(item: Indexed[T]) -> Indexed[T]: + """Removes non-ascii characters from attributes see https://github.com/pangeo-forge/pangeo-forge-recipes/issues/586""" + index, ds = item + for att, att_value in ds.attrs.items(): + if isinstance(att_value, str): + new_value = att_value.encode("utf-8", "ignore").decode() + if new_value != att_value: + print( + f"Sanitized datasets attributes field {att}: \n {att_value} \n ----> \n {new_value}" + ) + ds.attrs[att] = new_value + return index, ds + + def expand(self, pcoll: beam.PCollection) -> beam.PCollection: + return ( + pcoll + | "Fix coordinates" >> beam.Map(self._keep_only_variable_id) + | "Sanitize Attrs" >> beam.Map(self._sanitize_attrs) + ) + + +@dataclass +class TestDataset(beam.PTransform): + """ + Test stage for data written to zarr store + """ + + iid: str + + def _test(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: + test_all(store, self.iid) + return store + + def expand(self, pcoll: beam.PCollection) -> beam.PCollection: + return pcoll | "Testing - Running all tests" >> beam.Map(self._test) From 1fcb4f923f230e0fcd7ba738b2a42aa3f16bfd61 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 30 Apr 2024 19:40:47 +0000 Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- leap_data_management_utils/cmip_transforms.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/leap_data_management_utils/cmip_transforms.py b/leap_data_management_utils/cmip_transforms.py index 60c8078..a9766b5 100644 --- 
a/leap_data_management_utils/cmip_transforms.py +++ b/leap_data_management_utils/cmip_transforms.py @@ -210,12 +210,10 @@ def _keep_only_variable_id(item: Indexed[T]) -> Indexed[T]: Set them all to coords """ index, ds = item - print(f"Preprocessing before {ds =}") - new_coords_vars = [ - var for var in ds.data_vars if var != ds.attrs["variable_id"] - ] + print(f'Preprocessing before {ds =}') + new_coords_vars = [var for var in ds.data_vars if var != ds.attrs['variable_id']] ds = ds.set_coords(new_coords_vars) - print(f"Preprocessing after {ds =}") + print(f'Preprocessing after {ds =}') return index, ds @staticmethod @@ -224,10 +222,10 @@ def _sanitize_attrs(item: Indexed[T]) -> Indexed[T]: index, ds = item for att, att_value in ds.attrs.items(): if isinstance(att_value, str): - new_value = att_value.encode("utf-8", "ignore").decode() + new_value = att_value.encode('utf-8', 'ignore').decode() if new_value != att_value: print( - f"Sanitized datasets attributes field {att}: \n {att_value} \n ----> \n {new_value}" + f'Sanitized datasets attributes field {att}: \n {att_value} \n ----> \n {new_value}' ) ds.attrs[att] = new_value return index, ds @@ -235,8 +233,8 @@ def _sanitize_attrs(item: Indexed[T]) -> Indexed[T]: def expand(self, pcoll: beam.PCollection) -> beam.PCollection: return ( pcoll - | "Fix coordinates" >> beam.Map(self._keep_only_variable_id) - | "Sanitize Attrs" >> beam.Map(self._sanitize_attrs) + | 'Fix coordinates' >> beam.Map(self._keep_only_variable_id) + | 'Sanitize Attrs' >> beam.Map(self._sanitize_attrs) ) @@ -253,4 +251,4 @@ def _test(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: return store def expand(self, pcoll: beam.PCollection) -> beam.PCollection: - return pcoll | "Testing - Running all tests" >> beam.Map(self._test) + return pcoll | 'Testing - Running all tests' >> beam.Map(self._test) From adcd48d074f7f848401b26ad075582287e529217 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 30 Apr 2024 15:42:50 -0400 Subject: 
[PATCH 3/5] Update cmip_transforms.py --- leap_data_management_utils/cmip_transforms.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/leap_data_management_utils/cmip_transforms.py b/leap_data_management_utils/cmip_transforms.py index a9766b5..4f11850 100644 --- a/leap_data_management_utils/cmip_transforms.py +++ b/leap_data_management_utils/cmip_transforms.py @@ -10,7 +10,9 @@ from google.cloud import bigquery from leap_data_management_utils.data_management_transforms import BQInterface +from leap_data_management_utils.cmip_testing import test_all +from pangeo_forge_recipes.transforms import Indexed,T @dataclass class IIDEntry: From e137d21d2f239477d2964d1e7e171517a8d36ba6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 30 Apr 2024 19:42:56 +0000 Subject: [PATCH 4/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- leap_data_management_utils/cmip_transforms.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/leap_data_management_utils/cmip_transforms.py b/leap_data_management_utils/cmip_transforms.py index 4f11850..a0e8f87 100644 --- a/leap_data_management_utils/cmip_transforms.py +++ b/leap_data_management_utils/cmip_transforms.py @@ -8,11 +8,11 @@ import apache_beam as beam import zarr from google.cloud import bigquery +from pangeo_forge_recipes.transforms import Indexed, T -from leap_data_management_utils.data_management_transforms import BQInterface from leap_data_management_utils.cmip_testing import test_all +from leap_data_management_utils.data_management_transforms import BQInterface -from pangeo_forge_recipes.transforms import Indexed,T @dataclass class IIDEntry: From 5e9fa6c554febd343e5f8f10a27282f4206ec65f Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 30 Apr 2024 15:51:40 -0400 Subject: [PATCH 5/5] Add CI and add dependencies --- .github/workflows/ci.yaml | 47 
+++++++++++++++++++++++++++++++++++++++ pyproject.toml | 8 ++++++- 2 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/ci.yaml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..de9446e --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,47 @@ +name: CI + +on: + push: + branches: + - "main" + pull_request: + branches: + - "*" + schedule: + - cron: "0 13 * * 1" + +jobs: + build: + defaults: + run: + shell: bash -l {0} + strategy: + fail-fast: false + matrix: + os: ["ubuntu-latest"] + python-version: ["3.9", "3.10", "3.11"] + runs-on: ${{ matrix.os }} + steps: + - name: 🫙 Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 # checkout tags (which is not done by default) + - name: 🔁 Setup Python + id: setup-python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: pip + cache-dependency-path: pyproject.toml + - name: 🎯 Check cache hit + run: echo '${{ steps.setup-python.outputs.cache-hit }}' + - name: 🌈 Install leap-data-management-utils package + shell: bash -l {0} + run: | + python -m pip install -e ".[test]" + - name: 🔎 Check current version + run: python -c "import leap_data_management_utils; print(leap_data_management_utils.__version__)" + - name: 🏄‍♂️ Run Tests + shell: bash -l {0} + run: | + py.test leap_data_management_utils/tests -v diff --git a/pyproject.toml b/pyproject.toml index b85a27d..f30a7ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "google-cloud-bigquery", "pandas", "pangeo-forge-esgf", + "pangeo-forge-recipes", "pydantic-core", "pydantic>=2", "pyyaml", @@ -40,7 +41,12 @@ dependencies = [ [project.optional-dependencies] test = [ - "pre-commit", + "pytest" +] + +dev = [ + "leap-data-management-utils[test]", + "pre-commit" ]