feat(era5): add weekly and monthly aggregation
yannforget committed Nov 5, 2024
1 parent c79aac3 commit f1292a1
Showing 2 changed files with 112 additions and 0 deletions.
108 changes: 108 additions & 0 deletions openhexa/toolbox/era5/aggregate.py
@@ -9,6 +9,7 @@
import rasterio
import rasterio.transform
import xarray as xr
from epiweeks import Week


def clip_dataset(ds: xr.Dataset, xmin: float, ymin: float, xmax: float, ymax: float) -> xr.Dataset:
@@ -148,6 +149,23 @@ def _has_missing_data(da: xr.DataArray) -> bool:
    return missing


def _week(date: datetime) -> str:
    year = date.isocalendar()[0]
    week = date.isocalendar()[1]
    return f"{year}W{week}"


def _epi_week(date: datetime) -> str:
    epiweek = Week.fromdate(date)
    year = epiweek.year
    week = epiweek.week
    return f"{year}W{week}"


def _month(date: datetime) -> str:
    return date.strftime("%Y%m")

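# Illustrative only (not part of the commit): the three period helpers can
# disagree at year boundaries, because ISO weeks start on Monday while the
# default CDC system used by epiweeks starts weeks on Sunday. A minimal
# doctest-style sketch, assuming the epiweeks CDC default:
#
#     >>> from datetime import datetime
#     >>> d = datetime(2024, 12, 29)  # a Sunday
#     >>> _week(d)
#     '2024W52'
#     >>> _epi_week(d)  # Dec 29 opens CDC week 1 of 2025
#     '2025W1'
#     >>> _month(d)
#     '202412'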

def aggregate(ds: xr.Dataset, var: str, masks: np.ndarray, boundaries_id: list[str]) -> pl.DataFrame:
"""Aggregate hourly measurements in space and time.
@@ -210,4 +228,94 @@ def aggregate(ds: xr.Dataset, var: str, masks: np.ndarray, boundaries_id: list[str]) -> pl.DataFrame:

    df = pl.DataFrame(data=rows, schema=SCHEMA)

    # add week, month, and epi_week period columns
    df = df.with_columns(
        pl.col("date").map_elements(_week, str).alias("week"),
        pl.col("date").map_elements(_month, str).alias("month"),
        pl.col("date").map_elements(_epi_week, str).alias("epi_week"),
    )

    return df

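# A minimal sketch (illustrative, not part of the commit) of the period
# columns added above, applied to a toy single-row frame:
#
#     >>> import polars as pl
#     >>> from datetime import datetime
#     >>> toy = pl.DataFrame({"date": [datetime(2024, 11, 5)]})
#     >>> toy.with_columns(
#     ...     pl.col("date").map_elements(_week, str).alias("week"),
#     ...     pl.col("date").map_elements(_month, str).alias("month"),
#     ...     pl.col("date").map_elements(_epi_week, str).alias("epi_week"),
#     ... ).row(0)
#     (datetime.datetime(2024, 11, 5, 0, 0), '2024W45', '202411', '2024W45')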

def aggregate_per_week(daily: pl.DataFrame, column_uid: str, use_epidemiological_weeks: bool = False) -> pl.DataFrame:
    """Aggregate daily data per week.

    Parameters
    ----------
    daily : pl.DataFrame
        Daily data with "week" (or "epi_week"), "mean", "min", and "max" columns.
        Length of the dataframe should be (n_boundaries * n_days).
    column_uid : str
        Column containing the boundary ID.
    use_epidemiological_weeks : bool, optional
        If True, aggregate per epidemiological week (the "epi_week" column)
        instead of ISO week (the "week" column). Defaults to False.

    Returns
    -------
    pl.DataFrame
        Weekly aggregated data of length (n_boundaries * n_weeks).
    """
    if use_epidemiological_weeks:
        week_column = "epi_week"
    else:
        week_column = "week"

    df = daily.select([column_uid, pl.col(week_column).alias("week"), "mean", "min", "max"])

    # by default, we aggregate min & max values using the min() and max() functions
    # respectively; however, it might make sense to use mean() for some use cases
    df = df.group_by([column_uid, "week"]).agg(
        [
            pl.col("mean").mean().alias("mean"),
            pl.col("min").min().alias("min"),
            pl.col("max").max().alias("max"),
        ]
    )

    # sort by date: since the DHIS2 period format is "2012W9", we need to extract the
    # year and week number from the period string and cast them to int before sorting,
    # else "2012W9" would sort after "2012W32"
    df = df.sort(
        by=[
            pl.col("week").str.split("W").list.get(0).cast(int),
            pl.col("week").str.split("W").list.get(1).cast(int),
            pl.col(column_uid),
        ]
    )

    return df

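# A minimal usage sketch (illustrative, not part of the commit); "uid" and its
# values are hypothetical boundary IDs:
#
#     >>> daily = pl.DataFrame(
#     ...     {
#     ...         "uid": ["A", "A"],
#     ...         "week": ["2024W45", "2024W45"],
#     ...         "epi_week": ["2024W45", "2024W45"],
#     ...         "mean": [20.0, 22.0],
#     ...         "min": [15.0, 16.0],
#     ...         "max": [25.0, 28.0],
#     ...     }
#     ... )
#     >>> aggregate_per_week(daily, column_uid="uid").row(0)
#     ('A', '2024W45', 21.0, 15.0, 28.0)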

def aggregate_per_month(daily: pl.DataFrame, column_uid: str) -> pl.DataFrame:
    """Aggregate daily data per month.

    Parameters
    ----------
    daily : pl.DataFrame
        Daily data with "month", "mean", "min", and "max" columns.
        Length of the dataframe should be (n_boundaries * n_days).
    column_uid : str
        Column containing the boundary ID.

    Returns
    -------
    pl.DataFrame
        Monthly aggregated data of length (n_boundaries * n_months).
    """
    df = daily.select([column_uid, "month", "mean", "min", "max"])

    df = df.group_by([column_uid, "month"]).agg(
        [
            pl.col("mean").mean().alias("mean"),
            pl.col("min").min().alias("min"),
            pl.col("max").max().alias("max"),
        ]
    )

    df = df.sort(
        by=[
            pl.col("month").cast(int),
            pl.col(column_uid),
        ]
    )

    return df
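
# A minimal monthly counterpart (illustrative, not part of the commit),
# reusing the hypothetical "daily" frame from the weekly sketch above:
#
#     >>> aggregate_per_month(
#     ...     daily.with_columns(pl.lit("202411").alias("month")), column_uid="uid"
#     ... ).row(0)
#     ('A', '202411', 21.0, 15.0, 28.0)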
4 changes: 4 additions & 0 deletions pyproject.toml
@@ -27,6 +27,10 @@ dependencies = [
"pyjwt",
"cdsapi >=0.7.3",
"cads-api-client >=1.4.0",
"rasterio",
"cfgrib",
"xarray",
"epiweeks",
]

[project.optional-dependencies]
