From a0ce4359ec32edf93e4f64b3251c2ce284e1f26a Mon Sep 17 00:00:00 2001
From: pkdash
Date: Mon, 27 Feb 2023 15:13:27 -0500
Subject: [PATCH] [#44] initial work - loading aggregation data to data processing object

---
 hsclient/hydroshare.py | 116 +++++++++++++++++++++++++++++++++++++----
 requirements.txt       |   4 ++
 2 files changed, 110 insertions(+), 10 deletions(-)

diff --git a/hsclient/hydroshare.py b/hsclient/hydroshare.py
index 4c843e8..0a92c59 100644
--- a/hsclient/hydroshare.py
+++ b/hsclient/hydroshare.py
@@ -1,6 +1,8 @@
 import getpass
 import os
+import pathlib
 import pickle
+import shutil
 import sqlite3
 import tempfile
 import time
@@ -13,8 +15,11 @@
 from urllib.parse import quote, unquote, urlparse
 from zipfile import ZipFile

+import fiona
 import pandas
+import rasterio
 import requests
+import xarray
 from hsmodels.schemas import load_rdf, rdf_string
 from hsmodels.schemas.base_models import BaseMetadata
 from hsmodels.schemas.enums import AggregationType
@@ -108,6 +113,7 @@ def __init__(self, map_path, hs_session, checksums=None):
         self._parsed_files = None
         self._parsed_aggregations = None
         self._parsed_checksums = checksums
+        self._data_object = None

     def __str__(self):
         return self._map_path
@@ -232,6 +238,10 @@ def main_file_path(self) -> str:
             return self.files()[0].folder
         return self.files()[0].path

+    @property
+    def data_object(self) -> Union[pandas.DataFrame, fiona.Collection, rasterio.DatasetReader, xarray.Dataset, None]:
+        return self._data_object
+
     @refresh
     def save(self) -> None:
         """
@@ -281,10 +291,10 @@ def aggregations(self, **kwargs) -> List[BaseMetadata]:
         aggregations = self._aggregations
         for key, value in kwargs.items():
             if key.startswith('file__'):
-                file_args = {key[len('file__') :]: value}
+                file_args = {key[len('file__'):]: value}
                 aggregations = [agg for agg in aggregations if agg.files(**file_args)]
             elif key.startswith('files__'):
-                file_args = {key[len('files__') :]: value}
+                file_args = {key[len('files__'):]: value}
                 aggregations = [agg for agg in aggregations if agg.files(**file_args)]
             else:
                 aggregations = filter(lambda agg: attribute_filter(agg.metadata, key, value), aggregations)
@@ -314,14 +324,15 @@ def refresh(self) -> None:
         self._parsed_files = None
         self._parsed_aggregations = None
         self._parsed_checksums = None
+        self._data_object = None

-    def as_series(self, series_id: str, agg_path: str = None) -> Dict[int, pandas.Series]:
+    def as_series(self, series_id: str, agg_path: str = None) -> pandas.DataFrame:
         """
-        Creates a pandas Series object out of an aggregation of type TimeSeries.
-        :param series_id: The series_id of the timeseries result to be converted to a Series object.
+        Creates a pandas DataFrame object out of an aggregation of type TimeSeries.
+        :param series_id: The series_id of the timeseries result to be converted to a DataFrame object.
         :param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
         it downloaded locally.
-        :return: A pandas.Series object
+        :return: A pandas.DataFrame object
         """

         def to_series(timeseries_file: str):
             con = sqlite3.connect(timeseries_file)
             return pandas.read_sql(
                 f'SELECT * FROM TimeSeriesResultValues WHERE ResultID IN '
                 f'(SELECT ResultID FROM Results WHERE ResultUUID = "{series_id}")',
                 con,
             ).squeeze()

+        return self._get_data_object(agg_path=agg_path, func=to_series)
+
+    def as_multi_dimensional_dataset(self, agg_path: str = None) -> xarray.Dataset:
+        """
+        Creates an xarray Dataset object out of an aggregation of type NetCDF.
+        :param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
+        it downloaded locally.
+        :return: A xarray.Dataset object
+        """
+        if self.metadata.type != AggregationType.MultidimensionalAggregation:
+            raise Exception("Aggregation is not of type NetCDF")
+
+        return self._get_data_object(agg_path=agg_path, func=xarray.open_dataset)
+
+    def as_feature_collection(self, agg_path: str = None) -> fiona.Collection:
+        """
+        Creates a fiona Collection object out of an aggregation of type GeoFeature.
+        :param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
+        it downloaded locally at agg_path.
+        :return: A fiona.Collection object
+        Note: The caller is responsible for closing the fiona.Collection object to free up the aggregation files used
+        to create this object.
+        """
+        if self.metadata.type != AggregationType.GeographicFeatureAggregation:
+            raise Exception("Aggregation is not of type GeoFeature")
+
+        return self._get_data_object(agg_path=agg_path, func=fiona.open)
+
+    def as_raster_dataset(self, agg_path: str = None) -> rasterio.DatasetReader:
+        """
+        Creates a rasterio DatasetReader object out of an aggregation of type GeoRaster.
+        :param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
+        it downloaded locally at agg_path.
+        :return: A rasterio.DatasetReader object
+        Note: The caller is responsible for closing the rasterio.DatasetReader object to free up the aggregation files
+        used to create this object.
+        """
+        if self.metadata.type != AggregationType.GeographicRasterAggregation:
+            raise Exception("Aggregation is not of type GeoRaster")
+
+        return self._get_data_object(agg_path=agg_path, func=rasterio.open)
+
+    def as_data_object(self, series_id: str = None, agg_path: str = None) -> \
+            Union[pandas.DataFrame, fiona.Collection, rasterio.DatasetReader, xarray.Dataset, None]:
+        """Loads the aggregation data into a relevant data object type."""
+
+        if self.metadata.type == AggregationType.TimeSeriesAggregation:
+            if not series_id:
+                raise Exception("Please specify series_id for which the timeseries data object is needed.")
+            return self.as_series(series_id=series_id, agg_path=agg_path)
+        if self.metadata.type == AggregationType.MultidimensionalAggregation:
+            return self.as_multi_dimensional_dataset(agg_path=agg_path)
+        if self.metadata.type == AggregationType.GeographicFeatureAggregation:
+            return self.as_feature_collection(agg_path=agg_path)
+        if self.metadata.type == AggregationType.GeographicRasterAggregation:
+            return self.as_raster_dataset(agg_path=agg_path)
+
+        raise Exception(f"Data object is not supported for '{self.metadata.type}' aggregation type")
+
+    def _get_data_object(self, agg_path, func):
+        if self._data_object is not None and self.metadata.type != AggregationType.TimeSeriesAggregation:
+            return self._data_object
+
+        main_file_ext = pathlib.Path(self.main_file_path).suffix
         if agg_path is None:
-            with tempfile.TemporaryDirectory() as td:
+            td = tempfile.mkdtemp()
+            try:
                 self._download(unzip_to=td)
                 # zip extracted to folder with main file name
-                file_name = self.file(extension=".sqlite").name
-                return to_series(urljoin(td, file_name, file_name))
+                file_name = self.file(extension=main_file_ext).name
+                file_path = urljoin(td, file_name, file_name)
+                data_object = func(file_path)
+                if self.metadata.type == AggregationType.MultidimensionalAggregation:
+                    # load the dataset into memory so it remains usable after the
+                    # backing file is closed and the temporary directory is removed
+                    data_object.load()
+                    data_object.close()
+            finally:
+                # we can delete the temporary directory for the data objects created
+                # for these two aggregation types only.
+                # For other aggregation types, the generated data object needs to have
+                # access to the aggregation files in the temporary directory, so it's
+                # the caller's responsibility to delete the temporary directory
+                if self.metadata.type in (AggregationType.TimeSeriesAggregation,
+                                          AggregationType.MultidimensionalAggregation):
+                    shutil.rmtree(td)
-        return to_series(urljoin(agg_path, self.file(extension=".sqlite").name))
+        else:
+            file_path = urljoin(agg_path, self.file(extension=main_file_ext).name)
+            data_object = func(file_path)
+            if self.metadata.type == AggregationType.MultidimensionalAggregation:
+                # load into memory before closing so the returned dataset remains usable
+                data_object.load()
+                data_object.close()
+
+        # cache the object for the aggregation
+        self._data_object = data_object
+
+        return data_object


 class Resource(Aggregation):
diff --git a/requirements.txt b/requirements.txt
index 93b1196..a6d02eb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,6 +3,10 @@ pytest == 6.0.2
 requests == 2.24.0
 email-validator
 pandas
+netCDF4
+xarray
+rasterio
+fiona
 isort
 black
 pytest-xdist
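Usage sketch (supplementary note, not part of the patch): the example below shows how the data object API added above is intended to be called. It uses only existing hsclient entry points (HydroShare, HydroShare.resource, aggregations) plus the new as_data_object method; the credentials, resource id, and series id are hypothetical placeholders.

    from hsclient import HydroShare
    from hsmodels.schemas.enums import AggregationType

    hs = HydroShare(username="my_user", password="my_password")  # hypothetical credentials
    res = hs.resource("0123456789abcdef0123456789abcdef")  # hypothetical resource id

    for agg in res.aggregations():
        if agg.metadata.type == AggregationType.MultidimensionalAggregation:
            # NetCDF data is loaded into memory and its temp files are cleaned up internally
            ds = agg.as_data_object()
            print(ds)
        elif agg.metadata.type == AggregationType.GeographicRasterAggregation:
            # raster (and feature) data objects keep their files open; close them when done
            raster = agg.as_data_object()
            print(raster.count, raster.crs)
            raster.close()
        elif agg.metadata.type == AggregationType.TimeSeriesAggregation:
            # timeseries loading requires the series_id of the result to fetch
            df = agg.as_data_object(series_id="hypothetical-series-uuid")
            print(df.head())

Because non-timeseries data objects are cached on the aggregation, a second as_data_object call on the same NetCDF, feature, or raster aggregation returns the cached object instead of re-downloading the aggregation files.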