From e80ca2d5d0b05756b9c43dfed83666acad256e23 Mon Sep 17 00:00:00 2001 From: Riley Hales PhD <39097632+rileyhales@users.noreply.github.com> Date: Fri, 12 Apr 2024 12:18:42 -0600 Subject: [PATCH] adds deprecated streamflow module for backwards compatibility. corrects documentation generation (#32) --- docs/api-documentation.rst | 17 +- docs/api-documentation/data.rst | 5 + docs/api-documentation/plots.rst | 20 +- docs/api-documentation/streamflow.rst | 32 ++ geoglows/__init__.py | 3 +- geoglows/data.py | 95 +++--- geoglows/streamflow.py | 402 ++++++++++++++++++++++++++ geoglows/streams.py | 8 +- 8 files changed, 501 insertions(+), 81 deletions(-) create mode 100644 docs/api-documentation/streamflow.rst create mode 100644 geoglows/streamflow.py diff --git a/docs/api-documentation.rst b/docs/api-documentation.rst index 5cd35dd..3abe0a0 100644 --- a/docs/api-documentation.rst +++ b/docs/api-documentation.rst @@ -11,19 +11,4 @@ There are 3 modules in the geoglows package. api-documentation/bias api-documentation/plots api-documentation/analyze - - -FAQ -~~~ - -How do I save streamflow data to csv? -------------------------------------- -By default, the results of most of the `geoglows.data` functions return a pandas DataFrame. You can save those to -a csv, json, pickle, or other file. For example, save to csv with the dataframe's ``.to_csv()`` method. - -.. code-block:: python - - # get some data from the geoglows streamflow model - data = geoglows.streamflow.forecast_stats(12341234) - # save it to a csv - data.to_csv('/path/to/save/the/csv/file.csv') \ No newline at end of file + api-documentation/streamflow diff --git a/docs/api-documentation/data.rst b/docs/api-documentation/data.rst index 611c52d..582aa7c 100644 --- a/docs/api-documentation/data.rst +++ b/docs/api-documentation/data.rst @@ -13,6 +13,11 @@ To find a LINKNO (river ID number), please refer to https://data.geoglows.org an Forecasted Streamflow --------------------- +.. 
automodule:: geoglows.data + :members: + forecast, forecast_stats, forecast_ensembles, forecast_records + :noindex: + Historical Simulation --------------------- diff --git a/docs/api-documentation/plots.rst b/docs/api-documentation/plots.rst index 4d4e268..6ecd745 100644 --- a/docs/api-documentation/plots.rst +++ b/docs/api-documentation/plots.rst @@ -7,21 +7,7 @@ Plots for Streamflow Data .. automodule:: geoglows.plots :members: - hydroviewer, forecast_stats, forecast_records, forecast_ensembles, historic_simulation, flow_duration_curve - :noindex: - -Tables for Streamflow Data -~~~~~~~~~~~~~~~~~~~~~~~~~~ - -.. automodule:: geoglows.plots - :members: - probabilities_table, return_periods_table - :noindex: - -Plots for Bias Corrected Data -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -.. automodule:: geoglows.plots - :members: - corrected_historical, corrected_scatterplots, corrected_day_average, corrected_month_average, corrected_volume_compare + forecast, forecast_stats, forecast_ensembles, + retrospective, annual_averages, monthly_averages, daily_averages, + daily_variance, flow_duration_curve :noindex: diff --git a/docs/api-documentation/streamflow.rst b/docs/api-documentation/streamflow.rst new file mode 100644 index 0000000..fc31bdd --- /dev/null +++ b/docs/api-documentation/streamflow.rst @@ -0,0 +1,32 @@ +=================== +geoglows.streamflow +=================== + +THIS MODULE IS DEPRECATED. Please update your code to use the new GEOGLOWS model and data services. Analogous functions +to everything in this module is found in the `geoglows.data` or `geoglows.streams` modules. + +The streamflow module provides a series of functions for requesting forecasted and historical data from the GEOGloWS +ECMWF Streamflow Service for Model and Data Services Version 1. + +Forecasted Streamflow +--------------------- + +.. 
automodule:: geoglows.streamflow + :members: + forecast_stats, forecast_ensembles, forecast_warnings, forecast_records + +Historically Simulated Streamflow +--------------------------------- + +.. automodule:: geoglows.streamflow + :members: + historic_simulation, return_periods, daily_averages, monthly_averages + :noindex: + +GEOGloWS Model Utilities +------------------------ + +.. automodule:: geoglows.streamflow + :members: + available_dates + :noindex: \ No newline at end of file diff --git a/geoglows/__init__.py b/geoglows/__init__.py index db3a090..cb963de 100644 --- a/geoglows/__init__.py +++ b/geoglows/__init__.py @@ -4,6 +4,7 @@ import geoglows.analyze import geoglows.streams import geoglows.tables +import geoglows.streamflow from ._constants import METADATA_TABLE_PATH @@ -11,6 +12,6 @@ 'bias', 'plots', 'data', 'analyze', 'streams', 'tables', 'METADATA_TABLE_PATH' ] -__version__ = '1.0.4' +__version__ = '1.1.0' __author__ = 'Riley Hales' __license__ = 'BSD 3-Clause Clear License' diff --git a/geoglows/data.py b/geoglows/data.py index 57cc5e7..e385658 100644 --- a/geoglows/data.py +++ b/geoglows/data.py @@ -47,8 +47,8 @@ def from_aws(*args, **kwargs): warnings.warn('forecast_records are not available from the AWS Open Data Program.') return from_rest(*args, **kwargs) - reach_id = kwargs.get('reach_id', '') - reach_id = args[0] if len(args) > 0 else None + river_id = kwargs.get('river_id', '') + river_id = args[0] if len(args) > 0 else None s3 = s3fs.S3FileSystem(anon=True, client_kwargs=dict(region_name=ODP_S3_BUCKET_REGION)) if kwargs.get('date', '') and not product_name == 'dates': @@ -68,14 +68,14 @@ def from_aws(*args, **kwargs): date = dates[-1] s3store = s3fs.S3Map(root=f'{ODP_FORECAST_S3_BUCKET_URI}/{date}', s3=s3, check=False) - df = xr.open_zarr(s3store).sel(rivid=reach_id).to_dataframe().round(2).reset_index() + df = xr.open_zarr(s3store).sel(rivid=river_id).to_dataframe().round(2).reset_index() # rename columns to match the REST API - if 
isinstance(reach_id, int): + if isinstance(river_id, int): df = df.pivot(index='time', columns='ensemble', values='Qout') else: df = df.pivot(index=['time', 'rivid'], columns='ensemble', values='Qout') - df.index.names = ['time', 'LINKNO'] + df.index.names = ['time', 'river_id'] df = df[sorted(df.columns)] df.columns = [f'ensemble_{str(x).zfill(2)}' for x in df.columns] @@ -102,17 +102,24 @@ def from_rest(*args, **kwargs): endpoint = f'https://{endpoint}' if not endpoint.startswith(('https://', 'http://')) else endpoint version = kwargs.get('version', DEFAULT_REST_ENDPOINT_VERSION) + assert version in ('v1', 'v2', ), ValueError(f'Unrecognized model version parameter: {version}') product_name = function.__name__.replace("_", "").lower() - reach_id = args[0] if len(args) > 0 else None - reach_id = kwargs.get('reach_id', '') if not reach_id else reach_id + river_id = args[0] if len(args) > 0 else None + river_id = kwargs.get('river_id', '') if not river_id else river_id + if isinstance(river_id, list): + raise ValueError('Multiple river_ids are not available via REST API or on v1. 
' + 'Use data_source="aws" and version="v2" for multiple river_ids.') + river_id = int(river_id) if river_id else None + if river_id and version == 'v2': + assert river_id < 1_000_000_000 and river_id >= 110_000_000, ValueError('River ID must be a 9 digit integer') return_format = kwargs.get('return_format', 'csv') assert return_format in ('csv', 'json', 'url'), f'Unsupported return format requested: {return_format}' # request parameter validation before submitting - for key in ('endpoint', 'version', 'reach_id'): + for key in ('endpoint', 'version', 'river_id'): if key in kwargs: del kwargs[key] for key, value in kwargs.items(): @@ -129,7 +136,7 @@ def from_rest(*args, **kwargs): # piece together the request url request_url = f'{endpoint}/{version}/{product_name}' # build the base url - request_url = f'{request_url}/{reach_id}' if reach_id else request_url # add the reach_id if it exists + request_url = f'{request_url}/{river_id}' if river_id else request_url # add the river_id if it exists request_url = f'{request_url}?{params}' # add the query parameters if return_url: @@ -158,6 +165,7 @@ def main(*args, **kwargs): return from_rest(*args, **kwargs) else: return from_aws(*args, **kwargs) + main.__doc__ = function.__doc__ # necessary for code documentation auto generators return main @@ -181,16 +189,17 @@ def dates(**kwargs) -> dict or str: @_forecast_endpoint_decorator -def forecast(*, reach_id: int, date: str, return_format: str, data_source: str, +def forecast(*, river_id: int, date: str, return_format: str, data_source: str, **kwargs) -> pd.DataFrame or dict or str: """ - Gets the average forecasted flow for a certain reach_id on a certain date + Gets the average forecasted flow for a certain river_id on a certain date Keyword Args: - reach_id: the ID of a stream, should be a 9 digit integer - date: a string specifying the date to request in YYYYMMDD format, returns the latest available if not specified - return_format: csv, json, or url, default csv - 
data_source: location to query for data, either 'rest' or 'aws'. default is aws. + river_id (str): the ID of a stream, should be a 9 digit integer + date (str): a string specifying the date to request in YYYYMMDD format, returns the latest available if not specified + return_format (str): csv, json, or url, default csv + data_source (str): location to query for data, either 'rest' or 'aws'. default is aws. + version (str): the version of the API and model data to retrieve. default is 'v2'. should be 'v1' or 'v2' Returns: pd.DataFrame or dict or str @@ -199,14 +208,14 @@ def forecast(*, reach_id: int, date: str, return_format: str, data_source: str, @_forecast_endpoint_decorator -def forecast_stats(*, reach_id: int, date: str, return_format: str, data_source: str, +def forecast_stats(*, river_id: int, date: str, return_format: str, data_source: str, **kwargs) -> pd.DataFrame or dict or str: """ - Retrieves the min, 25%, mean, median, 75%, and max river discharge of the 51 ensembles members for a reach_id + Retrieves the min, 25%, mean, median, 75%, and max river discharge of the 51 ensembles members for a river_id The 52nd higher resolution member is excluded Keyword Args: - reach_id: the ID of a stream, should be a 9 digit integer + river_id: the ID of a stream, should be a 9 digit integer date: a string specifying the date to request in YYYYMMDD format, returns the latest available if not specified return_format: csv, json, or url, default csv data_source: location to query for data, either 'rest' or 'aws'. default is aws. 
@@ -218,13 +227,13 @@ def forecast_stats(*, reach_id: int, date: str, return_format: str, data_source: @_forecast_endpoint_decorator -def forecast_ensembles(*, reach_id: int, date: str, return_format: str, data_source: str, +def forecast_ensembles(*, river_id: int, date: str, return_format: str, data_source: str, **kwargs) -> pd.DataFrame or dict or str: """ - Retrieves each of 52 time series of forecasted discharge for a reach_id on a certain date + Retrieves each of 52 time series of forecasted discharge for a river_id on a certain date Keyword Args: - reach_id: the ID of a stream, should be a 9 digit integer + river_id: the ID of a stream, should be a 9 digit integer date: a string specifying the date to request in YYYYMMDD format, returns the latest available if not specified return_format: csv, json, or url, default csv data_source: location to query for data, either 'rest' or 'aws'. default is aws. @@ -236,13 +245,13 @@ def forecast_ensembles(*, reach_id: int, date: str, return_format: str, data_sou @_forecast_endpoint_decorator -def forecast_records(*, reach_id: int, start_date: str, end_date: str, return_format: str, data_source: str, +def forecast_records(*, river_id: int, start_date: str, end_date: str, return_format: str, data_source: str, **kwargs) -> pd.DataFrame or dict or str: """ Retrieves a csv showing the ensemble average forecasted flow for the year from January 1 to the current date Keyword Args: - reach_id: the ID of a stream, should be a 9 digit integer + river_id: the ID of a stream, should be a 9 digit integer start_date: a YYYYMMDD string giving the earliest date this year to include, defaults to 14 days ago. end_date: a YYYYMMDD string giving the latest date this year to include, defaults to latest available data_source: location to query for data, either 'rest' or 'aws'. default is aws. 
@@ -255,20 +264,20 @@ def forecast_records(*, reach_id: int, start_date: str, end_date: str, return_fo # Retrospective simulation and derived products -def retrospective(reach_id: int or list) -> pd.DataFrame: +def retrospective(river_id: int or list) -> pd.DataFrame: """ - Retrieves the retrospective simulation of streamflow for a given reach_id from the + Retrieves the retrospective simulation of streamflow for a given river_id from the AWS Open Data Program GEOGloWS V2 S3 bucket Args: - reach_id: the ID of a stream, should be a 9 digit integer + river_id: the ID of a stream, should be a 9 digit integer Returns: pd.DataFrame """ s3 = s3fs.S3FileSystem(anon=True, client_kwargs=dict(region_name=ODP_S3_BUCKET_REGION)) s3store = s3fs.S3Map(root=f'{ODP_RETROSPECTIVE_S3_BUCKET_URI}/retrospective.zarr', s3=s3, check=False) - return (xr.open_zarr(s3store).sel(rivid=reach_id).to_dataframe().reset_index().set_index('time') + return (xr.open_zarr(s3store).sel(rivid=river_id).to_dataframe().reset_index().set_index('time') .pivot(columns='rivid', values='Qout')) @@ -277,61 +286,61 @@ def historical(*args, **kwargs): return retrospective(*args, **kwargs) -def daily_averages(reach_id: int or list) -> pd.DataFrame: +def daily_averages(river_id: int or list) -> pd.DataFrame: """ - Retrieves daily average streamflow for a given reach_id + Retrieves daily average streamflow for a given river_id Args: - reach_id: the ID of a stream, should be a 9 digit integer + river_id: the ID of a stream, should be a 9 digit integer Returns: pd.DataFrame """ - df = retrospective(reach_id) + df = retrospective(river_id) return calc_daily_averages(df) -def monthly_averages(reach_id: int or list) -> pd.DataFrame: +def monthly_averages(river_id: int or list) -> pd.DataFrame: """ - Retrieves monthly average streamflow for a given reach_id + Retrieves monthly average streamflow for a given river_id Args: - reach_id: the ID of a stream, should be a 9 digit integer + river_id: the ID of a stream, should 
be a 9 digit integer Returns: pd.DataFrame """ - df = retrospective(reach_id) + df = retrospective(river_id) return calc_monthly_averages(df) -def annual_averages(reach_id: int or list) -> pd.DataFrame: +def annual_averages(river_id: int or list) -> pd.DataFrame: """ - Retrieves annual average streamflow for a given reach_id + Retrieves annual average streamflow for a given river_id Args: - reach_id: the ID of a stream, should be a 9 digit integer + river_id: the ID of a stream, should be a 9 digit integer Returns: pd.DataFrame """ - df = retrospective(reach_id) + df = retrospective(river_id) return calc_annual_averages(df) -def return_periods(reach_id: int or list) -> pd.DataFrame: +def return_periods(river_id: int or list) -> pd.DataFrame: """ - Retrieves the return period thresholds based on a specified historic simulation forcing on a certain reach_id. + Retrieves the return period thresholds based on a specified historic simulation forcing on a certain river_id. Args: - reach_id: the ID of a stream, should be a 9 digit integer + river_id: the ID of a stream, should be a 9 digit integer Returns: pd.DataFrame """ s3 = s3fs.S3FileSystem(anon=True, client_kwargs=dict(region_name=ODP_S3_BUCKET_REGION)) s3store = s3fs.S3Map(root=f'{ODP_RETROSPECTIVE_S3_BUCKET_URI}/return-periods.zarr', s3=s3, check=False) - return (xr.open_zarr(s3store).sel(rivid=reach_id)['return_period_flow'].to_dataframe().reset_index() + return (xr.open_zarr(s3store).sel(rivid=river_id)['return_period_flow'].to_dataframe().reset_index() .pivot(index='rivid', columns='return_period', values='return_period_flow')) diff --git a/geoglows/streamflow.py b/geoglows/streamflow.py new file mode 100644 index 0000000..a0cb642 --- /dev/null +++ b/geoglows/streamflow.py @@ -0,0 +1,402 @@ +import json +import warnings +from io import StringIO + +import pandas as pd +import requests + +ENDPOINT = 'https://geoglows.ecmwf.int/api/' + +DEPRECATIONWARNING = """ +The streamflow module is deprecated and will be 
removed early 2025 when GEOGLOWS Model V1 is removed. These functions +will no longer be updated and they will not work with the latest model and datasets. Please upgrade to GEOGLOWS Model V2 +and update your code to use the geoglows.data module's analogous functions. Visit https://data.geoglows.org for more +information and tutorials to help you transition. +""" + +__all__ = ['forecast_stats', 'forecast_ensembles', 'forecast_warnings', 'forecast_records', 'historic_simulation', + 'daily_averages', 'monthly_averages', 'return_periods', 'available_dates', ] + + +# FUNCTIONS THAT CALL THE GLOBAL STREAMFLOW PREDICTION API +def forecast_stats(reach_id: int, return_format: str = 'csv', forecast_date: str = None, + endpoint: str = ENDPOINT, s: requests.Session = False) -> pd.DataFrame: + """ + Retrieves statistics that summarize the ensemble streamflow forecast for a certain reach_id + + Args: + reach_id: the ID of a stream + return_format: 'csv', 'json', 'waterml', 'url' + forecast_date: a string specifying the date to request in YYYYMMDD format + endpoint: the endpoint of an api instance + s: requests.Session instance connected to the api's root url + + Return Format: + - return_format='csv' returns a pd.DataFrame() + - return_format='json' returns a json + - return_format='waterml' returns a waterml string + - return_format='url' returns a url string for using in a request or web browser + + Example: + .. 
code-block:: python + + data = streamflow.rst.forecast_stats(12341234) + """ + warnings.warn(DEPRECATIONWARNING, DeprecationWarning) + method = 'ForecastStats/' + + # if you only wanted the url, quit here + if return_format == 'url': + return f'{endpoint}{method}?reach_id={reach_id}' + params = {'reach_id': reach_id, 'return_format': return_format} + if forecast_date is not None: + params["date"] = forecast_date + # return the requested data + return _make_request(endpoint, method, params, return_format, s) + + +def forecast_ensembles(reach_id: int, return_format: str = 'csv', forecast_date: str = None, + endpoint: str = ENDPOINT, s: requests.Session = False) -> pd.DataFrame: + """ + Retrieves each ensemble from the most recent streamflow forecast for a certain reach_id + + Args: + reach_id: the ID of a stream + return_format: 'csv', 'json', 'waterml', 'url' + forecast_date: a string specifying the date to request in YYYYMMDD format + endpoint: the endpoint of an api instance + s: requests.Session instance connected to the api's root url + + Return Format: + - return_format='csv' returns a pd.DataFrame() + - return_format='json' returns a json + - return_format='waterml' returns a waterml string + - return_format='url' returns a url string for using in a request or web browser + + Example: + .. 
code-block:: python + + data = streamflow.rst.forecast_ensembles(12341234) + """ + warnings.warn(DEPRECATIONWARNING, DeprecationWarning, stacklevel=2) + method = 'ForecastEnsembles/' + + # if you only wanted the url, quit here + if return_format == 'url': + return f'{endpoint}{method}?reach_id={reach_id}' + + params = {'reach_id': reach_id, 'return_format': return_format} + if forecast_date is not None: + params["date"] = forecast_date + + # return the requested data + return _make_request(endpoint, method, params, return_format, s) + + +def forecast_warnings(region: str = 'all', return_format='csv', + endpoint=ENDPOINT, s: requests.Session = False) -> pd.DataFrame: + """ + Retrieves a csv listing streams likely to experience a return period level flow during the forecast period. + + Args: + region: the name of a region as shown in the available_regions request + return_format: 'csv', 'json', 'waterml', 'request', 'url' + endpoint: the endpoint of an api instance + s: requests.Session instance connected to the api's root url + + Return Format: + - return_format='csv' returns a pd.DataFrame() + - return_format='url' returns a url string for using in a request or web browser + + Example: + .. 
code-block:: python + + data = streamflow.rst.forecast_warnings('australia-geoglows') + """ + warnings.warn(DEPRECATIONWARNING, DeprecationWarning) + method = 'ForecastWarnings/' + + # if you only wanted the url, quit here + if return_format == 'url': + return endpoint + method + f'?region={region}' + + # return the requested data + return _make_request(endpoint, method, {'region': region, 'return_format': return_format}, return_format, s) + + +def forecast_records(reach_id: int, start_date: str = None, end_date: str = None, return_format='csv', + endpoint=ENDPOINT, s: requests.Session = False) -> pd.DataFrame: + """ + Retrieves a csv showing the ensemble average forecasted flow for the year from January 1 to the current date + + Args: + reach_id: the ID of a stream + return_format: 'csv', 'json', 'waterml', 'url' + start_date: a string specifying the earliest date to request in YYYYMMDD format + end_date: a string specifying the latest date to request in YYYYMMDD format + endpoint: the endpoint of an api instance + s: requests.Session instance connected to the api's root url + + Return Format: + - return_format='csv' returns a pd.DataFrame() + - return_format='json' returns a json + - return_format='waterml' returns a waterml string + - return_format='url' returns a url string for using in a request or web browser + + Example: + .. 
code-block:: python + + data = streamflow.rst.forecast_records(12341234) + """ + warnings.warn(DEPRECATIONWARNING, DeprecationWarning) + method = 'ForecastRecords/' + + # if you only wanted the url, quit here + if return_format == 'url': + return f'{endpoint}{method}?reach_id={reach_id}' + + params = {'reach_id': reach_id, 'return_format': return_format} + if start_date is not None: + params["start_date"] = start_date + if end_date is not None: + params["end_date"] = end_date + + # return the requested data + return _make_request(endpoint, method, params, return_format, s) + + +def historic_simulation(reach_id: int, return_format='csv', forcing='era_5', + endpoint=ENDPOINT, s: requests.Session = False) -> pd.DataFrame: + """ + Retrieves a historical streamflow simulation derived from a specified forcing for a certain reach_id + + Args: + reach_id: the ID of a stream + return_format: 'csv', 'json', 'waterml', 'url' + forcing: the runoff dataset used to drive the historic simulation (era_interim or era_5) + endpoint: the endpoint of an api instance + s: requests.Session instance connected to the api's root url + + Return Format: + - return_format='csv' returns a pd.DataFrame() + - return_format='json' returns a json + - return_format='waterml' returns a waterml string + - return_format='url' returns a url string for using in a request or web browser + + Example: + .. 
code-block:: python + + data = streamflow.rst.historic_simulation(12341234) + """ + warnings.warn(DEPRECATIONWARNING, DeprecationWarning) + method = 'HistoricSimulation/' + + # if you only wanted the url, quit here + if return_format == 'url': + return f'{endpoint}{method}?reach_id={reach_id}&forcing={forcing}' + + # return the requested data + params = {'reach_id': reach_id, 'forcing': forcing, 'return_format': return_format} + return _make_request(endpoint, method, params, return_format, s) + + +def daily_averages(reach_id: int, return_format='csv', forcing='era_5', + endpoint=ENDPOINT, s: requests.Session = False) -> pd.DataFrame: + """ + Retrieves the average flow for every day of the year at a certain reach_id. + + Args: + reach_id: the ID of a stream + return_format: 'csv', 'json', 'waterml', 'url' + forcing: the runoff dataset used to drive the historic simulation (era_interim or era_5) + endpoint: the endpoint of an api instance + s: requests.Session instance connected to the api's root url + + Return Format: + - return_format='csv' returns a pd.DataFrame() + - return_format='json' returns a json + - return_format='waterml' returns a waterml string + - return_format='url' returns a url string for using in a request or web browser + + Example: + .. code-block:: python + + data = streamflow.rst.seasonal_average(12341234) + """ + warnings.warn(DEPRECATIONWARNING, DeprecationWarning) + method = 'DailyAverages/' + + # if you only wanted the url, quit here + if return_format == 'url': + return f'{endpoint}{method}?reach_id={reach_id}&forcing={forcing}' + + # return the requested data + params = {'reach_id': reach_id, 'forcing': forcing, 'return_format': return_format} + return _make_request(endpoint, method, params, return_format, s) + + +def monthly_averages(reach_id: int, return_format='csv', forcing='era_5', + endpoint=ENDPOINT, s: requests.Session = False) -> pd.DataFrame: + """ + Retrieves the average flow for each month at a certain reach_id. 
+ + Args: + reach_id: the ID of a stream + forcing: the runoff dataset used to drive the historic simulation (era_interim or era_5) + return_format: 'csv', 'json', 'waterml', 'url' + endpoint: the endpoint of an api instance + s: requests.Session instance connected to the api's root url + + Return Format: + - return_format='csv' returns a pd.DataFrame() + - return_format='json' returns a json + - return_format='waterml' returns a waterml string + - return_format='url' returns a url string for using in a request or web browser + + Example: + .. code-block:: python + + data = streamflow.rst.seasonal_average(12341234) + """ + warnings.warn(DEPRECATIONWARNING, DeprecationWarning) + method = 'MonthlyAverages/' + + # if you only wanted the url, quit here + if return_format == 'url': + return f'{endpoint}{method}?reach_id={reach_id}&forcing={forcing}' + + # return the requested data + params = {'reach_id': reach_id, 'forcing': forcing, 'return_format': return_format} + return _make_request(endpoint, method, params, return_format, s) + + +def return_periods(reach_id: int, return_format='csv', forcing='era_5', + endpoint=ENDPOINT, s: requests.Session = False) -> pd.DataFrame: + """ + Retrieves the return period thresholds based on a specified historic simulation forcing on a certain reach_id. + + Args: + reach_id: the ID of a stream + forcing: the runoff dataset used to drive the historic simulation (era_interim or era_5) + return_format: 'csv', 'json', 'waterml', 'url' + endpoint: the endpoint of an api instance + s: requests.Session instance connected to the api's root url + + Return Format: + - return_format='csv' returns a pd.DataFrame() + - return_format='json' returns a json + - return_format='waterml' returns a waterml string + - return_format='url' returns a url string for using in a request or web browser + + Example: + .. 
code-block:: python + + data = streamflow.rst.return_periods(12341234) + """ + warnings.warn(DEPRECATIONWARNING, DeprecationWarning) + method = 'ReturnPeriods/' + + # if you only wanted the url, quit here + if return_format == 'url': + return f'{endpoint}{method}?reach_id={reach_id}&forcing={forcing}' + + # return the requested data + params = {'reach_id': reach_id, 'forcing': forcing, 'return_format': return_format} + + return _make_request(endpoint, method, params, return_format, s) + + +def available_data(endpoint: str = ENDPOINT, return_format='json', s: requests.Session = False) -> dict or str: + """ + Returns a dictionary with a key for each available_regions containing the available_dates for that region + + Args: + endpoint: the endpoint of an api instance + return_format: 'json' or 'url' + s: requests.Session instance connected to the api's root url + + Returns: + dict + + Example: + .. code-block:: python + + data = streamflow.rst.available_data() + + """ + warnings.warn(DEPRECATIONWARNING, DeprecationWarning) + method = 'AvailableData/' + + # if you only wanted the url, quit here + if return_format == 'url': + return endpoint + method + + # return the requested data + return _make_request(endpoint, method, {}, return_format, s) + + +def available_dates(reach_id: int = None, region: str = None, return_format: str = 'json', + endpoint: str = ENDPOINT, s: requests.Session = False) -> dict or str: + """ + Retrieves the list of dates of stored streamflow forecasts. You need to specify either a reach_id or a region. + + Args: + reach_id: the ID of a stream + region: the name of a hydrologic region used in the model + endpoint: the endpoint of an api instance + return_format: 'json' or 'url' + s: requests.Session instance connected to the api's root url + + Return Format: + - return_format='json' *(default)* returns {'available_dates': ['list_of_dates']} + - return_format='url' returns a url string for using in a request or web browser + + Example: + .. 
code-block:: python + + data = streamflow.rst.available_dates(12341234) + """ + warnings.warn(DEPRECATIONWARNING, DeprecationWarning) + method = 'AvailableDates/' + + # you need a region for the api call, so the user needs to provide one or a valid reach_id to get it from + params = {'region': 'africa-geoglows'} + # if you only wanted the url, quit here + if return_format == 'url': + return endpoint + method + + # return the requested data + return _make_request(endpoint, method, params, return_format, s) + + +# API AUXILIARY FUNCTION +def _make_request(endpoint: str, method: str, params: dict, return_format: str, s: requests.Session = False): + if return_format == 'request': + params['return_format'] = 'csv' + + # request the data from the API + if s: + data = s.get(endpoint + method, params=params) + else: + data = requests.get(endpoint + method, params=params) + if data.status_code != 200: + raise RuntimeError('Received an error from the Streamflow REST API: ' + data.text) + + # process the response from the API as appropriate to make the corresponding python object + if return_format == 'csv': + tmp = pd.read_csv(StringIO(data.text), index_col=0) + if 'z' in tmp.columns: + del tmp['z'] + if method in ('ForecastWarnings/', 'ReturnPeriods/', 'DailyAverages/', 'MonthlyAverages/'): + return tmp + if method == 'SeasonalAverage/': + tmp.index = pd.to_datetime(tmp.index + 1, format='%j').strftime('%b %d') + return tmp + tmp.index = pd.to_datetime(tmp.index) + return tmp + elif return_format == 'json': + return json.loads(data.text) + elif return_format == 'waterml': + return data.text + else: + raise ValueError(f'Unsupported return format requested: {return_format}') diff --git a/geoglows/streams.py b/geoglows/streams.py index f83ee71..6ce2064 100644 --- a/geoglows/streams.py +++ b/geoglows/streams.py @@ -2,10 +2,10 @@ from .data import metadata_tables -__all__ = ['reach_to_vpu', 'latlon_to_reach', 'reach_to_latlon', ] +__all__ = ['river_to_vpu', 'latlon_to_river', 
'river_to_latlon', ] -def reach_to_vpu(reach_id: int) -> str or int: +def river_to_vpu(reach_id: int) -> str or int: return ( metadata_tables(columns=['LINKNO', 'VPUCode']) .loc[lambda x: x['LINKNO'] == reach_id, 'VPUCode'] @@ -13,13 +13,13 @@ def reach_to_vpu(reach_id: int) -> str or int: ) -def latlon_to_reach(lat: float, lon: float) -> int: +def latlon_to_river(lat: float, lon: float) -> int: df = metadata_tables(columns=['LINKNO', 'lat', 'lon']) df['dist'] = ((df['lat'] - lat) ** 2 + (df['lon'] - lon) ** 2) ** 0.5 return df.loc[lambda x: x['dist'] == df['dist'].min(), 'LINKNO'].values[0] -def reach_to_latlon(reach_id: int) -> np.ndarray: +def river_to_latlon(reach_id: int) -> np.ndarray: return ( metadata_tables(columns=['LINKNO', 'lat', 'lon']) .loc[lambda x: x['LINKNO'] == reach_id, ['lat', 'lon']]