Skip to content

Commit

Permalink
[#44] initial work - loading aggregation data to data processing object
Browse files Browse the repository at this point in the history
  • Loading branch information
pkdash committed Feb 27, 2023
1 parent 89f6c69 commit a0ce435
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 10 deletions.
116 changes: 106 additions & 10 deletions hsclient/hydroshare.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import getpass
import os
import pathlib
import pickle
import shutil
import sqlite3
import tempfile
import time
Expand All @@ -13,8 +15,11 @@
from urllib.parse import quote, unquote, urlparse
from zipfile import ZipFile

import fiona
import pandas
import rasterio
import requests
import xarray
from hsmodels.schemas import load_rdf, rdf_string
from hsmodels.schemas.base_models import BaseMetadata
from hsmodels.schemas.enums import AggregationType
Expand Down Expand Up @@ -108,6 +113,7 @@ def __init__(self, map_path, hs_session, checksums=None):
self._parsed_files = None
self._parsed_aggregations = None
self._parsed_checksums = checksums
self._data_object = None

def __str__(self):
return self._map_path
Expand Down Expand Up @@ -232,6 +238,10 @@ def main_file_path(self) -> str:
return self.files()[0].folder
return self.files()[0].path

@property
def data_object(self) -> Union[pandas.Series, fiona.Collection, rasterio.DatasetReader, xarray.Dataset, None]:
return self._data_object

@refresh
def save(self) -> None:
"""
Expand Down Expand Up @@ -281,10 +291,10 @@ def aggregations(self, **kwargs) -> List[BaseMetadata]:
aggregations = self._aggregations
for key, value in kwargs.items():
if key.startswith('file__'):
file_args = {key[len('file__') :]: value}
file_args = {key[len('file__'):]: value}
aggregations = [agg for agg in aggregations if agg.files(**file_args)]
elif key.startswith('files__'):
file_args = {key[len('files__') :]: value}
file_args = {key[len('files__'):]: value}
aggregations = [agg for agg in aggregations if agg.files(**file_args)]
else:
aggregations = filter(lambda agg: attribute_filter(agg.metadata, key, value), aggregations)
Expand Down Expand Up @@ -314,14 +324,15 @@ def refresh(self) -> None:
self._parsed_files = None
self._parsed_aggregations = None
self._parsed_checksums = None
self._data_object = None

def as_series(self, series_id: str, agg_path: str = None) -> Dict[int, pandas.Series]:
def as_series(self, series_id: str, agg_path: str = None) -> pandas.DataFrame:
"""
Creates a pandas Series object out of an aggregation of type TimeSeries.
:param series_id: The series_id of the timeseries result to be converted to a Series object.
Creates a pandas DataFrame object out of an aggregation of type TimeSeries.
:param series_id: The series_id of the timeseries result to be converted to a Dataframe object.
:param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
it downloaded locally.
:return: A pandas.Series object
:return: A pandas.DataFrame object
"""

def to_series(timeseries_file: str):
Expand All @@ -332,13 +343,98 @@ def to_series(timeseries_file: str):
con,
).squeeze()

return self._get_data_object(agg_path=agg_path, func=to_series)

def as_multi_dimensional_dataset(self, agg_path: str = None) -> xarray.Dataset:
"""
Creates a xarray Dataset object out of an aggregation of type NetCDF.
:param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
it downloaded locally.
:return: A xarray.Dataset object
"""
if self.metadata.type != AggregationType.MultidimensionalAggregation:
raise Exception("Aggregation is not of type NetCDF")

return self._get_data_object(agg_path=agg_path, func=xarray.open_dataset)

def as_feature_collection(self, agg_path: str = None) -> fiona.Collection:
"""
Creates a fiona Collection object out of an aggregation of type GeoFeature.
:param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
it downloaded locally at aggr_path.
:return: A fiona.Collection object
Note: The caller is responsible for closing the fiona.Collection object to free up aggregation files used to
create this object.
"""
if self.metadata.type != AggregationType.GeographicFeatureAggregation:
raise Exception("Aggregation is not of type GeoFeature")

return self._get_data_object(agg_path=agg_path, func=fiona.open)

def as_raster_dataset(self, agg_path: str = None) -> rasterio.DatasetReader:
"""
Creates a rasterio DatasetReader object out of an aggregation of type GeoRaster
:param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
it downloaded locally at aggr_path.
:return: A rasterio.DatasetReader object
Note: The caller is responsible for closing the rasterio.DatasetReader object to free up aggregation files
used to create this object.
"""
if self.metadata.type != AggregationType.GeographicRasterAggregation:
raise Exception("Aggregation is not of type GeoRaster")

return self._get_data_object(agg_path=agg_path, func=rasterio.open)

def as_data_object(self, series_id: str = None, agg_path: str = None) -> \
Union[pandas.DataFrame, fiona.Collection, rasterio.DatasetReader, xarray.Dataset, None]:
"""Load aggregation data to a relevant data object tyoe"""

if self.metadata.type == AggregationType.TimeSeriesAggregation:
if not series_id:
raise Exception("Please specify series_id for which the timeseries data object is needed.")
return self.as_series(series_id=series_id, agg_path=agg_path)
if self.metadata.type == AggregationType.MultidimensionalAggregation:
return self.as_multi_dimensional_dataset(agg_path=agg_path)
if self.metadata.type == AggregationType.GeographicFeatureAggregation:
return self.as_feature_collection(agg_path=agg_path)
if self.metadata.type == AggregationType.GeographicRasterAggregation:
return self.as_raster_dataset(agg_path=agg_path)

raise Exception(f"Data object is not supported for '{self.metadata.type}' aggregation type")

def _get_data_object(self, agg_path, func):
if self._data_object is not None and self.metadata.type != AggregationType.TimeSeriesAggregation:
return self._data_object

main_file_ext = pathlib.Path(self.main_file_path).suffix
if agg_path is None:
with tempfile.TemporaryDirectory() as td:
td = tempfile.mkdtemp()
try:
self._download(unzip_to=td)
# zip extracted to folder with main file name
file_name = self.file(extension=".sqlite").name
return to_series(urljoin(td, file_name, file_name))
return to_series(urljoin(agg_path, self.file(extension=".sqlite").name))
file_name = self.file(extension=main_file_ext).name
file_path = urljoin(td, file_name, file_name)
data_object = func(file_path)
if self.metadata.type == AggregationType.MultidimensionalAggregation:
data_object.close()
finally:
# we can delete the temporary directory for the data object created
# for these 2 aggregation types only. For other aggregation types, the generated data object
# needs to have access to the aggregation files in the temporary directory - so it's the caller's
# responsibility to delete the temporary directory
if self.metadata.type in (AggregationType.TimeSeriesAggregation,
AggregationType.MultidimensionalAggregation):
shutil.rmtree(td)
else:
file_path = urljoin(agg_path, self.file(extension=main_file_ext).name)
data_object = func(file_path)
if self.metadata.type == AggregationType.MultidimensionalAggregation:
data_object.close()

# cache the object for the aggregation
self._data_object = data_object

return data_object


class Resource(Aggregation):
Expand Down
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ pytest == 6.0.2
requests == 2.24.0
email-validator
pandas
netCDF4
xarray
rasterio
fiona
isort
black
pytest-xdist
Expand Down

0 comments on commit a0ce435

Please sign in to comment.