diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..de9446e --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,47 @@ +name: CI + +on: + push: + branches: + - "main" + pull_request: + branches: + - "*" + schedule: + - cron: "0 13 * * 1" + +jobs: + build: + defaults: + run: + shell: bash -l {0} + strategy: + fail-fast: false + matrix: + os: ["ubuntu-latest"] + python-version: ["3.9", "3.10", "3.11"] + runs-on: ${{ matrix.os }} + steps: + - name: 🫙 Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 # checkout tags (which is not done by default) + - name: 🔁 Setup Python + id: setup-python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: pip + cache-dependency-path: pyproject.toml + - name: 🎯 Check cache hit + run: echo '${{ steps.setup-python.outputs.cache-hit }}' + - name: 🌈 Install leap-data-management-utils package + shell: bash -l {0} + run: | + python -m pip install -e ".[test]" + - name: 🔎 Check current version + run: python -c "import leap_data_management_utils; print(leap_data_management_utils.__version__)" + - name: 🏄‍♂️ Run Tests + shell: bash -l {0} + run: | + py.test leap_data_management_utils/tests -v diff --git a/leap_data_management_utils/cmip_transforms.py b/leap_data_management_utils/cmip_transforms.py index 3bb7ade..a0e8f87 100644 --- a/leap_data_management_utils/cmip_transforms.py +++ b/leap_data_management_utils/cmip_transforms.py @@ -8,7 +8,9 @@ import apache_beam as beam import zarr from google.cloud import bigquery +from pangeo_forge_recipes.transforms import Indexed, T +from leap_data_management_utils.cmip_testing import test_all from leap_data_management_utils.data_management_transforms import BQInterface @@ -192,3 +194,63 @@ def _log_to_bigquery(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: def expand(self, pcoll: beam.PCollection) -> beam.PCollection: return pcoll | beam.Map(self._log_to_bigquery) + + 
@dataclass
class Preprocessor(beam.PTransform):
    """Preprocess indexed xarray datasets.

    Two steps are applied to every ``(index, dataset)`` element, in this order:

    1. Every data variable other than the one named by the dataset's
       ``variable_id`` attribute is turned into a coordinate.
    2. String-valued dataset attributes are stripped of characters that cannot
       be encoded as UTF-8.
    """

    @staticmethod
    def _keep_only_variable_id(item: Indexed[T]) -> Indexed[T]:
        """Set every data variable except ``attrs['variable_id']`` as a coordinate.

        Many netcdfs contain variables other than the one specified in the
        `variable_id` facet; all of those are moved to the coordinates.

        Args:
            item: ``(index, dataset)`` pair.

        Returns:
            The same index paired with the dataset returned by ``set_coords``.

        Note:
            ``ds.attrs['variable_id']`` is looked up once per data variable, so a
            dataset that has data variables but no ``variable_id`` attribute
            fails on that lookup.
        """
        index, ds = item
        print(f'Preprocessing before {ds =}')
        new_coords_vars = [var for var in ds.data_vars if var != ds.attrs['variable_id']]
        ds = ds.set_coords(new_coords_vars)
        print(f'Preprocessing after {ds =}')
        return index, ds

    @staticmethod
    def _sanitize_attrs(item: Indexed[T]) -> Indexed[T]:
        """Drop characters that cannot be encoded as UTF-8 from string attributes.

        See https://github.com/pangeo-forge/pangeo-forge-recipes/issues/586

        Only dataset-level ``attrs`` whose value is a ``str`` are examined.
        Because the codec is UTF-8 (not ASCII), ordinary non-ASCII text is kept;
        only code points UTF-8 cannot represent (e.g. lone surrogates) are removed.
        NOTE(review): an earlier description said "non-ascii" -- confirm that
        keeping valid non-ASCII text is the intended behaviour.

        Args:
            item: ``(index, dataset)`` pair.

        Returns:
            The same index paired with the dataset. When at least one attribute
            had to be changed, a new dataset produced by ``assign_attrs`` is
            returned and the incoming dataset's ``attrs`` are left untouched;
            otherwise the incoming dataset is returned as is.
        """
        index, ds = item
        sanitized = {}
        for att, att_value in ds.attrs.items():
            if not isinstance(att_value, str):
                continue
            new_value = att_value.encode('utf-8', 'ignore').decode()
            if new_value != att_value:
                # `!r` keeps this message printable: the original value contains
                # exactly the characters a UTF-8 output stream would refuse.
                print(
                    f'Sanitized datasets attributes field {att}: \n {att_value!r} \n ----> \n {new_value!r}'
                )
                sanitized[att] = new_value
        if sanitized:
            # Collect first and assign afterwards: avoids writing into `attrs`
            # while iterating it and avoids mutating the pipeline's input element.
            ds = ds.assign_attrs(sanitized)
        return index, ds

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        """Apply the coordinate fix, then the attribute sanitisation, to ``pcoll``."""
        return (
            pcoll
            | 'Fix coordinates' >> beam.Map(self._keep_only_variable_id)
            | 'Sanitize Attrs' >> beam.Map(self._sanitize_attrs)
        )


@dataclass
class TestDataset(beam.PTransform):
    """
    Test stage for data written to zarr store.

    Each store in the input collection is handed to ``test_all`` together with
    ``iid`` and is then emitted unchanged.
    """

    # Identifier passed to `test_all` alongside each store.
    iid: str

    def _test(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
        """Run ``test_all(store, self.iid)`` and pass ``store`` through.

        NOTE(review): presumably ``test_all`` raises when a check fails, which
        would fail this pipeline step -- confirm against ``cmip_testing``.
        """
        test_all(store, self.iid)
        return store

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        """Map ``_test`` over every element of ``pcoll``."""
        return pcoll | 'Testing - Running all tests' >> beam.Map(self._test)


# NOTE: the collapsed source line also carried the start of the next diff
# section (pyproject.toml). It is preserved here so no content is lost:
#   diff --git a/pyproject.toml b/pyproject.toml
#   index b85a27d..f30a7ad 100644
#   --- a/pyproject.toml
#   +++ b/pyproject.toml
#   @@ -29,6 +29,7 @@ dependencies = [
#        "google-cloud-bigquery",
#        "pandas",
#        "pangeo-forge-esgf",
#   +    "pangeo-forge-recipes",
#        "pydantic-core",
"pydantic>=2", "pyyaml", @@ -40,7 +41,12 @@ dependencies = [ [project.optional-dependencies] test = [ - "pre-commit", + "pytest" +] + +dev = [ + "leap-data-management-utils[test]", + "pre-commit" ]