
Commit

adds deprecated streamflow module for backwards compatibility. corrects documentation generation (#32)
rileyhales authored Apr 12, 2024
1 parent 24e2c10 commit e80ca2d
Showing 8 changed files with 501 additions and 81 deletions.
17 changes: 1 addition & 16 deletions docs/api-documentation.rst
@@ -11,19 +11,4 @@ There are 3 modules in the geoglows package.
api-documentation/bias
api-documentation/plots
api-documentation/analyze


FAQ
~~~

How do I save streamflow data to csv?
-------------------------------------
By default, the results of most of the `geoglows.data` functions return a pandas DataFrame. You can save those to
a csv, json, pickle, or other file. For example, save to csv with the dataframe's ``.to_csv()`` method.

.. code-block:: python

    import geoglows

    # get some data from the geoglows streamflow model
    data = geoglows.streamflow.forecast_stats(12341234)
    # save it to a csv
    data.to_csv('/path/to/save/the/csv/file.csv')
api-documentation/streamflow
5 changes: 5 additions & 0 deletions docs/api-documentation/data.rst
@@ -13,6 +13,11 @@ To find a LINKNO (river ID number), please refer to https://data.geoglows.org an
Forecasted Streamflow
---------------------

.. automodule:: geoglows.data
:members:
forecast, forecast_stats, forecast_ensembles, forecast_records
:noindex:

Historical Simulation
---------------------

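For orientation, a minimal usage sketch of the forecast functions documented above. The 9 digit river ID is a made-up example, and the AWS/v2 defaults follow the ``geoglows/data.py`` changes later in this commit.

.. code-block:: python

    import geoglows

    river_id = 760000100  # hypothetical 9 digit river ID

    # each function returns a pandas DataFrame by default
    stats = geoglows.data.forecast_stats(river_id=river_id)
    ensembles = geoglows.data.forecast_ensembles(river_id=river_id)
    stats.to_csv('forecast_stats.csv')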
20 changes: 3 additions & 17 deletions docs/api-documentation/plots.rst
@@ -7,21 +7,7 @@ Plots for Streamflow Data

.. automodule:: geoglows.plots
:members:
hydroviewer, forecast_stats, forecast_records, forecast_ensembles, historic_simulation, flow_duration_curve
:noindex:

Tables for Streamflow Data
~~~~~~~~~~~~~~~~~~~~~~~~~~

.. automodule:: geoglows.plots
:members:
probabilities_table, return_periods_table
:noindex:

Plots for Bias Corrected Data
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. automodule:: geoglows.plots
:members:
corrected_historical, corrected_scatterplots, corrected_day_average, corrected_month_average, corrected_volume_compare
forecast, forecast_stats, forecast_ensembles,
retrospective, annual_averages, monthly_averages, daily_averages,
daily_variance, flow_duration_curve
:noindex:
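As a usage note, a hedged sketch of pairing these plot functions with the data module. It assumes, as in earlier releases of this package, that each plot function accepts the DataFrame returned by the matching ``geoglows.data`` function and returns a plotly figure; the river ID is a made-up example.

.. code-block:: python

    import geoglows

    df = geoglows.data.forecast(river_id=760000100)  # hypothetical river ID
    figure = geoglows.plots.forecast(df)  # assumed to return a plotly Figure
    figure.show()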
32 changes: 32 additions & 0 deletions docs/api-documentation/streamflow.rst
@@ -0,0 +1,32 @@
===================
geoglows.streamflow
===================

THIS MODULE IS DEPRECATED. Please update your code to use the new GEOGLOWS model and data services. Analogous functions
for everything in this module are found in the ``geoglows.data`` or ``geoglows.streams`` modules.

The streamflow module provides a series of functions for requesting forecasted and historical data from the GEOGloWS
ECMWF Streamflow Service (Model and Data Services Version 1).
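For illustration, a minimal migration sketch from this deprecated module to the v2 modules. The 9 digit river ID is a made-up example, and the exact signatures are documented under ``geoglows.data``.

.. code-block:: python

    import geoglows

    # before: deprecated v1 streamflow module
    df = geoglows.streamflow.forecast_stats(12341234)

    # after: v2 data module, which uses 9 digit river IDs
    df = geoglows.data.forecast_stats(river_id=760000100)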

Forecasted Streamflow
---------------------

.. automodule:: geoglows.streamflow
:members:
forecast_stats, forecast_ensembles, forecast_warnings, forecast_records

Historically Simulated Streamflow
---------------------------------

.. automodule:: geoglows.streamflow
:members:
historic_simulation, return_periods, daily_averages, monthly_averages
:noindex:

GEOGloWS Model Utilities
------------------------

.. automodule:: geoglows.streamflow
:members:
available_dates
:noindex:
3 changes: 2 additions & 1 deletion geoglows/__init__.py
@@ -4,13 +4,14 @@
import geoglows.analyze
import geoglows.streams
import geoglows.tables
import geoglows.streamflow

from ._constants import METADATA_TABLE_PATH

__all__ = [
'bias', 'plots', 'data', 'analyze', 'streams', 'tables',
'METADATA_TABLE_PATH'
]
__version__ = '1.0.4'
__version__ = '1.1.0'
__author__ = 'Riley Hales'
__license__ = 'BSD 3-Clause Clear License'
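Since ``geoglows.streamflow`` is imported in ``__init__.py`` again, existing user code keeps working. A minimal smoke-test sketch, assuming only what this diff shows:

.. code-block:: python

    import geoglows

    assert geoglows.__version__ == '1.1.0'
    from geoglows import streamflow  # importable again for backwards compatibility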
95 changes: 52 additions & 43 deletions geoglows/data.py
@@ -47,8 +47,8 @@ def from_aws(*args, **kwargs):
warnings.warn('forecast_records are not available from the AWS Open Data Program.')
return from_rest(*args, **kwargs)

reach_id = kwargs.get('reach_id', '')
reach_id = args[0] if len(args) > 0 else None
river_id = kwargs.get('river_id', '')
river_id = args[0] if len(args) > 0 else None

s3 = s3fs.S3FileSystem(anon=True, client_kwargs=dict(region_name=ODP_S3_BUCKET_REGION))
if kwargs.get('date', '') and not product_name == 'dates':
@@ -68,14 +68,14 @@
date = dates[-1]
s3store = s3fs.S3Map(root=f'{ODP_FORECAST_S3_BUCKET_URI}/{date}', s3=s3, check=False)

df = xr.open_zarr(s3store).sel(rivid=reach_id).to_dataframe().round(2).reset_index()
df = xr.open_zarr(s3store).sel(rivid=river_id).to_dataframe().round(2).reset_index()

# rename columns to match the REST API
if isinstance(reach_id, int):
if isinstance(river_id, int):
df = df.pivot(index='time', columns='ensemble', values='Qout')
else:
df = df.pivot(index=['time', 'rivid'], columns='ensemble', values='Qout')
df.index.names = ['time', 'LINKNO']
df.index.names = ['time', 'river_id']
df = df[sorted(df.columns)]
df.columns = [f'ensemble_{str(x).zfill(2)}' for x in df.columns]
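As an aside, a self-contained toy sketch of the pivot and column rename this hunk performs for the single-river case; the values below are made up and stand in for the opened zarr output.

.. code-block:: python

    import pandas as pd

    # toy stand-in for the zarr data: two ensembles, two timesteps
    df = pd.DataFrame({
        'time': pd.to_datetime(['2024-04-12', '2024-04-12', '2024-04-13', '2024-04-13']),
        'ensemble': [1, 2, 1, 2],
        'Qout': [10.0, 11.0, 12.0, 13.0],
    })
    df = df.pivot(index='time', columns='ensemble', values='Qout')
    df.columns = [f'ensemble_{str(x).zfill(2)}' for x in df.columns]
    # columns are now ensemble_01 and ensemble_02, indexed by time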

@@ -102,17 +102,24 @@ def from_rest(*args, **kwargs):
endpoint = f'https://{endpoint}' if not endpoint.startswith(('https://', 'http://')) else endpoint

version = kwargs.get('version', DEFAULT_REST_ENDPOINT_VERSION)
assert version in ('v1', 'v2', ), ValueError(f'Unrecognized model version parameter: {version}')

product_name = function.__name__.replace("_", "").lower()

reach_id = args[0] if len(args) > 0 else None
reach_id = kwargs.get('reach_id', '') if not reach_id else reach_id
river_id = args[0] if len(args) > 0 else None
river_id = kwargs.get('river_id', '') if not river_id else river_id
if isinstance(river_id, list):
raise ValueError('Multiple river_ids are not available via REST API or on v1. '
'Use data_source="aws" and version="v2" for multiple river_ids.')
river_id = int(river_id) if river_id else None
if river_id and version == 'v2':
assert river_id < 1_000_000_000 and river_id >= 110_000_000, ValueError('River ID must be a 9 digit integer')

return_format = kwargs.get('return_format', 'csv')
assert return_format in ('csv', 'json', 'url'), f'Unsupported return format requested: {return_format}'

# request parameter validation before submitting
for key in ('endpoint', 'version', 'reach_id'):
for key in ('endpoint', 'version', 'river_id'):
if key in kwargs:
del kwargs[key]
for key, value in kwargs.items():
@@ -129,7 +136,7 @@

# piece together the request url
request_url = f'{endpoint}/{version}/{product_name}' # build the base url
request_url = f'{request_url}/{reach_id}' if reach_id else request_url # add the reach_id if it exists
request_url = f'{request_url}/{river_id}' if river_id else request_url # add the river_id if it exists
request_url = f'{request_url}?{params}' # add the query parameters

if return_url:
@@ -158,6 +165,7 @@ def main(*args, **kwargs):
return from_rest(*args, **kwargs)
else:
return from_aws(*args, **kwargs)
main.__doc__ = function.__doc__ # necessary for code documentation auto generators
return main
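As a side note on the ``main.__doc__ = function.__doc__`` fix above: a common standard-library alternative is ``functools.wraps``, sketched below, which copies ``__doc__`` along with ``__name__`` and other metadata. This is a generic illustration, not part of this commit.

.. code-block:: python

    import functools

    def example_decorator(function):
        @functools.wraps(function)  # copies __doc__, __name__, __module__, etc.
        def main(*args, **kwargs):
            return function(*args, **kwargs)
        return main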


@@ -181,16 +189,17 @@ def dates(**kwargs) -> dict or str:


@_forecast_endpoint_decorator
def forecast(*, reach_id: int, date: str, return_format: str, data_source: str,
def forecast(*, river_id: int, date: str, return_format: str, data_source: str,
**kwargs) -> pd.DataFrame or dict or str:
"""
Gets the average forecasted flow for a certain reach_id on a certain date
Gets the average forecasted flow for a certain river_id on a certain date
Keyword Args:
reach_id: the ID of a stream, should be a 9 digit integer
date: a string specifying the date to request in YYYYMMDD format, returns the latest available if not specified
return_format: csv, json, or url, default csv
data_source: location to query for data, either 'rest' or 'aws'. default is aws.
river_id (int): the ID of a stream, should be a 9 digit integer
date (str): a string specifying the date to request in YYYYMMDD format, returns the latest available if not specified
return_format (str): csv, json, or url, default csv
data_source (str): location to query for data, either 'rest' or 'aws'. default is aws.
version (str): the version of the API and model data to retrieve. default is 'v2'. should be 'v1' or 'v2'
Returns:
pd.DataFrame or dict or str
@@ -199,14 +208,14 @@ def forecast(*, reach_id: int, date: str, return_format: str, data_source: str,


@_forecast_endpoint_decorator
def forecast_stats(*, reach_id: int, date: str, return_format: str, data_source: str,
def forecast_stats(*, river_id: int, date: str, return_format: str, data_source: str,
**kwargs) -> pd.DataFrame or dict or str:
"""
Retrieves the min, 25%, mean, median, 75%, and max river discharge of the 51 ensemble members for a reach_id
Retrieves the min, 25%, mean, median, 75%, and max river discharge of the 51 ensemble members for a river_id
The 52nd higher resolution member is excluded
Keyword Args:
reach_id: the ID of a stream, should be a 9 digit integer
river_id: the ID of a stream, should be a 9 digit integer
date: a string specifying the date to request in YYYYMMDD format, returns the latest available if not specified
return_format: csv, json, or url, default csv
data_source: location to query for data, either 'rest' or 'aws'. default is aws.
@@ -218,13 +227,13 @@ def forecast_stats(*, reach_id: int, date: str, return_format: str, data_source:


@_forecast_endpoint_decorator
def forecast_ensembles(*, reach_id: int, date: str, return_format: str, data_source: str,
def forecast_ensembles(*, river_id: int, date: str, return_format: str, data_source: str,
**kwargs) -> pd.DataFrame or dict or str:
"""
Retrieves each of 52 time series of forecasted discharge for a reach_id on a certain date
Retrieves each of 52 time series of forecasted discharge for a river_id on a certain date
Keyword Args:
reach_id: the ID of a stream, should be a 9 digit integer
river_id: the ID of a stream, should be a 9 digit integer
date: a string specifying the date to request in YYYYMMDD format, returns the latest available if not specified
return_format: csv, json, or url, default csv
data_source: location to query for data, either 'rest' or 'aws'. default is aws.
@@ -236,13 +245,13 @@ def forecast_ensembles(*, reach_id: int, date: str, return_format: str, data_sou


@_forecast_endpoint_decorator
def forecast_records(*, reach_id: int, start_date: str, end_date: str, return_format: str, data_source: str,
def forecast_records(*, river_id: int, start_date: str, end_date: str, return_format: str, data_source: str,
**kwargs) -> pd.DataFrame or dict or str:
"""
Retrieves a csv showing the ensemble average forecasted flow for the year from January 1 to the current date
Keyword Args:
reach_id: the ID of a stream, should be a 9 digit integer
river_id: the ID of a stream, should be a 9 digit integer
start_date: a YYYYMMDD string giving the earliest date this year to include, defaults to 14 days ago.
end_date: a YYYYMMDD string giving the latest date this year to include, defaults to latest available
data_source: location to query for data, either 'rest' or 'aws'. default is aws.
@@ -255,20 +264,20 @@ def forecast_records(*, reach_id: int, start_date: str, end_date: str, return_fo


# Retrospective simulation and derived products
def retrospective(reach_id: int or list) -> pd.DataFrame:
def retrospective(river_id: int or list) -> pd.DataFrame:
"""
Retrieves the retrospective simulation of streamflow for a given reach_id from the
Retrieves the retrospective simulation of streamflow for a given river_id from the
AWS Open Data Program GEOGloWS V2 S3 bucket
Args:
reach_id: the ID of a stream, should be a 9 digit integer
river_id: the ID of a stream, should be a 9 digit integer
Returns:
pd.DataFrame
"""
s3 = s3fs.S3FileSystem(anon=True, client_kwargs=dict(region_name=ODP_S3_BUCKET_REGION))
s3store = s3fs.S3Map(root=f'{ODP_RETROSPECTIVE_S3_BUCKET_URI}/retrospective.zarr', s3=s3, check=False)
return (xr.open_zarr(s3store).sel(rivid=reach_id).to_dataframe().reset_index().set_index('time')
return (xr.open_zarr(s3store).sel(rivid=river_id).to_dataframe().reset_index().set_index('time')
.pivot(columns='rivid', values='Qout'))
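For orientation, a hedged usage sketch of ``retrospective`` and one of its derived products; the river IDs are made-up 9 digit examples.

.. code-block:: python

    import geoglows

    # a single river, or a list of rivers, per the int or list annotation
    df = geoglows.data.retrospective(760000100)
    both = geoglows.data.retrospective([760000100, 760000200])

    # derived products fetch the retrospective data and then aggregate it
    daily = geoglows.data.daily_averages(760000100)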


Expand All @@ -277,61 +286,61 @@ def historical(*args, **kwargs):
return retrospective(*args, **kwargs)


def daily_averages(reach_id: int or list) -> pd.DataFrame:
def daily_averages(river_id: int or list) -> pd.DataFrame:
"""
Retrieves daily average streamflow for a given reach_id
Retrieves daily average streamflow for a given river_id
Args:
reach_id: the ID of a stream, should be a 9 digit integer
river_id: the ID of a stream, should be a 9 digit integer
Returns:
pd.DataFrame
"""
df = retrospective(reach_id)
df = retrospective(river_id)
return calc_daily_averages(df)


def monthly_averages(reach_id: int or list) -> pd.DataFrame:
def monthly_averages(river_id: int or list) -> pd.DataFrame:
"""
Retrieves monthly average streamflow for a given reach_id
Retrieves monthly average streamflow for a given river_id
Args:
reach_id: the ID of a stream, should be a 9 digit integer
river_id: the ID of a stream, should be a 9 digit integer
Returns:
pd.DataFrame
"""
df = retrospective(reach_id)
df = retrospective(river_id)
return calc_monthly_averages(df)


def annual_averages(reach_id: int or list) -> pd.DataFrame:
def annual_averages(river_id: int or list) -> pd.DataFrame:
"""
Retrieves annual average streamflow for a given reach_id
Retrieves annual average streamflow for a given river_id
Args:
reach_id: the ID of a stream, should be a 9 digit integer
river_id: the ID of a stream, should be a 9 digit integer
Returns:
pd.DataFrame
"""
df = retrospective(reach_id)
df = retrospective(river_id)
return calc_annual_averages(df)


def return_periods(reach_id: int or list) -> pd.DataFrame:
def return_periods(river_id: int or list) -> pd.DataFrame:
"""
Retrieves the return period thresholds based on a specified historic simulation forcing for a certain reach_id.
Retrieves the return period thresholds based on a specified historic simulation forcing for a certain river_id.
Args:
reach_id: the ID of a stream, should be a 9 digit integer
river_id: the ID of a stream, should be a 9 digit integer
Returns:
pd.DataFrame
"""
s3 = s3fs.S3FileSystem(anon=True, client_kwargs=dict(region_name=ODP_S3_BUCKET_REGION))
s3store = s3fs.S3Map(root=f'{ODP_RETROSPECTIVE_S3_BUCKET_URI}/return-periods.zarr', s3=s3, check=False)
return (xr.open_zarr(s3store).sel(rivid=reach_id)['return_period_flow'].to_dataframe().reset_index()
return (xr.open_zarr(s3store).sel(rivid=river_id)['return_period_flow'].to_dataframe().reset_index()
.pivot(index='rivid', columns='return_period', values='return_period_flow'))
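And a matching hedged sketch for ``return_periods``: given the pivot above, the result is a DataFrame indexed by river ID with one column per return period. The river ID is a made-up example.

.. code-block:: python

    import geoglows

    rp = geoglows.data.return_periods(760000100)  # hypothetical river ID
    # rp is indexed by rivid, with one column per return period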


