Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor out CMIP recipe stages #27

Merged
merged 5 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
name: CI

on:
push:
branches:
- "main"
pull_request:
branches:
- "*"
schedule:
- cron: "0 13 * * 1"

jobs:
build:
defaults:
run:
shell: bash -l {0}
strategy:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
python-version: ["3.9", "3.10", "3.11"]
runs-on: ${{ matrix.os }}
steps:
- name: 🫙 Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0 # checkout tags (which is not done by default)
- name: 🔁 Setup Python
id: setup-python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
cache-dependency-path: pyproject.toml
- name: 🎯 Check cache hit
run: echo '${{ steps.setup-python.outputs.cache-hit }}'
- name: 🌈 Install leap-data-management-utils package
shell: bash -l {0}
run: |
python -m pip install -e ".[test]"
- name: 🔎 Check current version
run: python -c "import leap_data_management_utils; print(leap_data_management_utils.__version__)"
- name: 🏄‍♂️ Run Tests
shell: bash -l {0}
run: |
py.test leap_data_management_utils/tests -v
62 changes: 62 additions & 0 deletions leap_data_management_utils/cmip_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import apache_beam as beam
import zarr
from google.cloud import bigquery
from pangeo_forge_recipes.transforms import Indexed, T

from leap_data_management_utils.cmip_testing import test_all
from leap_data_management_utils.data_management_transforms import BQInterface


Expand Down Expand Up @@ -192,3 +194,63 @@ def _log_to_bigquery(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | beam.Map(self._log_to_bigquery)


@dataclass
class Preprocessor(beam.PTransform):
"""
Preprocessor for xarray datasets.
Set all data_variables except for `variable_id` attrs to coord
Add additional information

"""

@staticmethod
def _keep_only_variable_id(item: Indexed[T]) -> Indexed[T]:
"""
Many netcdfs contain variables other than the one specified in the `variable_id` facet.
Set them all to coords
"""
index, ds = item
print(f'Preprocessing before {ds =}')
new_coords_vars = [var for var in ds.data_vars if var != ds.attrs['variable_id']]
ds = ds.set_coords(new_coords_vars)
print(f'Preprocessing after {ds =}')
return index, ds

@staticmethod
def _sanitize_attrs(item: Indexed[T]) -> Indexed[T]:
"""Removes non-ascii characters from attributes see https://github.com/pangeo-forge/pangeo-forge-recipes/issues/586"""
index, ds = item
for att, att_value in ds.attrs.items():
if isinstance(att_value, str):
new_value = att_value.encode('utf-8', 'ignore').decode()
if new_value != att_value:
print(
f'Sanitized datasets attributes field {att}: \n {att_value} \n ----> \n {new_value}'
)
ds.attrs[att] = new_value
return index, ds

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return (
pcoll
| 'Fix coordinates' >> beam.Map(self._keep_only_variable_id)
| 'Sanitize Attrs' >> beam.Map(self._sanitize_attrs)
)


@dataclass
class TestDataset(beam.PTransform):
"""
Test stage for data written to zarr store
"""

iid: str

def _test(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
test_all(store, self.iid)
return store

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | 'Testing - Running all tests' >> beam.Map(self._test)
8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies = [
"google-cloud-bigquery",
"pandas",
"pangeo-forge-esgf",
"pangeo-forge-recipes",
"pydantic-core",
"pydantic>=2",
"pyyaml",
Expand All @@ -40,7 +41,12 @@ dependencies = [

[project.optional-dependencies]
test = [
"pre-commit",
"pytest"
]

dev = [
"leap-data-leap_data_management_utils[test]",
"pre-commit"
]


Expand Down