Commit
[#44] preventing aggregation delete as part of aggregation update via data object
pkdash committed Mar 29, 2023
1 parent 2e52e0e commit 469c4e2
Showing 1 changed file with 43 additions and 56 deletions.
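
For context: before this change, save_data_object deleted the original aggregation before uploading its replacement (see the removed TODO comments in the diff below), so a failed upload could permanently lose the aggregation from HydroShare. The new _update_aggregation helper stages the replacement in a temporary folder, verifies it formed an aggregation, and only then deletes and swaps out the original. Below is a minimal, hedged sketch of the workflow this diff targets; the credentials, resource id, file paths, and the aggregation_download call are illustrative assumptions and not part of this diff, while resource.aggregation(file__path=...), as_data_object(), and save_data_object() are the APIs exercised in hsclient/hydroshare.py.

# Hedged usage sketch -- placeholder credentials, ids, and paths throughout.
from hsclient import HydroShare

hs = HydroShare(username="my_user", password="my_password")
res = hs.resource("<resource_id>")

# locate the multidimensional (NetCDF) aggregation by its main file path
nc_aggr = res.aggregation(file__path="sample.nc")

# download and unzip the aggregation locally; aggregation_download is assumed
# from the wider hsclient API rather than shown in this diff
res.aggregation_download(aggregation=nc_aggr, save_path="downloads", unzip_to="downloads/unzipped")
local_agg_path = "downloads/unzipped"  # local folder holding the aggregation files

# open as an xarray.Dataset, edit, and save back in place; with this commit the
# replacement is staged in a temp folder before the original is deleted
ds = nc_aggr.as_data_object(agg_path=local_agg_path)
ds.attrs["source"] = "updated via hsclient"
nc_aggr = nc_aggr.save_data_object(resource=res, agg_path=local_agg_path, as_new_aggr=False)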
99 changes: 43 additions & 56 deletions hsclient/hydroshare.py
@@ -15,6 +15,7 @@
from pprint import pformat
from typing import Callable, Dict, List, TYPE_CHECKING, Union
from urllib.parse import quote, unquote, urlparse
from uuid import uuid4
from zipfile import ZipFile

if TYPE_CHECKING:
@@ -179,9 +180,6 @@ def _files(self):
@property
def _aggregations(self):

def populate_files(_aggr):
_aggr._files

def populate_metadata(_aggr):
_aggr._metadata

@@ -191,9 +189,8 @@ def populate_metadata(_aggr):
if is_aggregation(str(file)):
self._parsed_aggregations.append(Aggregation(unquote(file.path), self._hs_session, self._checksums))

# load files (instances of File) and metadata for all aggregations
# load metadata for all aggregations (metadata is needed to create a typed aggregation)
with ThreadPoolExecutor() as executor:
executor.map(populate_files, self._parsed_aggregations)
executor.map(populate_metadata, self._parsed_aggregations)

# convert aggregations to aggregation type supporting data object
@@ -204,11 +201,10 @@ def populate_metadata(_aggr):
AggregationType.GeographicFeatureAggregation: GeoFeatureAggregation,
}
for aggr in aggregations_copy:
typed_aggr = None
typed_aggr_cls = typed_aggregation_classes.get(aggr.metadata.type, None)
if typed_aggr_cls:
typed_aggr = typed_aggr_cls.create(base_aggr=aggr)
if typed_aggr:
# swap the generic aggregation for the typed aggregation in the aggregation list
self._parsed_aggregations.remove(aggr)
self._parsed_aggregations.append(typed_aggr)

@@ -476,6 +472,24 @@ def _validate_aggregation_for_update(self, resource: 'Resource', agg_type: Aggre
if aggr is None:
raise Exception("This aggregation is not part of the specified resource.")

def _update_aggregation(self, resource, *files):
temp_folder = uuid4().hex
resource.folder_create(temp_folder)
resource.file_upload(*files, destination_path=temp_folder)
# check that the aggregation was created in the temp folder
file_path = os.path.join(temp_folder, os.path.basename(self.main_file_path))
original_aggr_dir_path = dirname(self.main_file_path)
aggr = resource.aggregation(file__path=file_path)
if aggr is not None:
# delete this aggregation; it will be replaced with the updated aggregation
self.delete()
# move the aggregation from the temp folder to the location of the deleted aggregation
resource.aggregation_move(aggr, dst_path=original_aggr_dir_path)

resource.folder_delete(temp_folder)
if aggr is None:
raise Exception("Failed to update aggregation")


class NetCDFAggregation(DataObjectSupportingAggregation):

@@ -484,11 +498,8 @@ def create(cls, base_aggr):
return super().create(aggr_cls=cls, base_aggr=base_aggr)

def as_data_object(self, agg_path: str) -> 'xarray.Dataset':
if self.metadata.type != AggregationType.MultidimensionalAggregation:
raise Exception("Aggregation is not of type NetCDF")
if xarray is None:
raise Exception("xarray package was not found")

return self._get_data_object(agg_path=agg_path, func=xarray.open_dataset)

def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
@@ -498,18 +509,14 @@ def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: boo
file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
self._data_object.to_netcdf(file_path, format="NETCDF4")
aggr_main_file_path = self.main_file_path
data_object = self._data_object
if not as_new_aggr:
destination_path = dirname(self.main_file_path)

# cache some of the metadata fields of the original aggregation to update the metadata of the
# updated aggregation
keywords = self.metadata.subjects
additional_meta = self.metadata.additional_metadata

# TODO: keep a local backup copy of the aggregation before deleting it
self.delete()
resource.file_upload(file_path, destination_path=destination_path)
# upload the updated aggregation file and replace the original aggregation
self._update_aggregation(resource, file_path)

# retrieve the updated aggregation
aggr = resource.aggregation(file__path=aggr_main_file_path)
@@ -521,15 +528,14 @@ def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: boo
aggr.metadata.additional_metadata = additional_meta
aggr.save()
else:
# creating a new aggregation by uploading the updated data files
# creating a new aggregation
resource.file_upload(file_path, destination_path=destination_path)

# retrieve the new aggregation
agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
aggr = resource.aggregation(file__path=agg_path)
data_object = None

aggr._data_object = data_object
aggr._data_object = None
return aggr
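
Note that the refreshed aggregation is returned with its cached data object cleared (aggr._data_object = None above), so the pre-save xarray.Dataset should not be reused. A short hedged sketch, with the same placeholder names as earlier:

# Reopen the data object after saving if further edits are needed; the returned
# aggregation no longer carries the previous xarray.Dataset.
nc_aggr = nc_aggr.save_data_object(resource=res, agg_path=local_agg_path, as_new_aggr=False)
ds = nc_aggr.as_data_object(agg_path=local_agg_path)  # fresh Dataset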


@@ -571,20 +577,15 @@ def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: boo
aggr_main_file_path = self.main_file_path
data_object = self._data_object
if not as_new_aggr:
destination_path = dirname(self.main_file_path)

# cache some of the metadata fields of the original aggregation to update the metadata of the
# updated aggregation
keywords = self.metadata.subjects
additional_meta = self.metadata.additional_metadata
title = self.metadata.title
abstract = self.metadata.abstract

# TODO: If the creation of the replacement aggregation fails for some reason, then with the following
# delete action we will lose this aggregation from HydroShare. Need to keep a copy of the
# original aggregation locally so that we can upload that to HydroShare if needed.
self.delete()
resource.file_upload(file_path, destination_path=destination_path)
# upload the updated aggregation file to a temp folder to create the updated aggregation
self._update_aggregation(resource, file_path)
# retrieve the updated aggregation
aggr = resource.aggregation(file__path=aggr_main_file_path)

@@ -643,40 +644,38 @@ def upload_shape_files(main_file_path, dst_path=""):
if item.startswith(filename_starts_with):
file_full_path = os.path.join(shp_file_dir_path, item)
shape_files.append(file_full_path)
resource.file_upload(*shape_files, destination_path=dst_path)

if not dst_path:
self._update_aggregation(resource, *shape_files)
else:
resource.file_upload(*shape_files, destination_path=dst_path)

self._validate_aggregation_for_update(resource, AggregationType.GeographicFeatureAggregation)
file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
aggr_main_file_path = self.main_file_path
data_object = self._data_object
# need to close the fiona.Collection object to free up access to all the original shape files
data_object.close()
if not as_new_aggr:
destination_path = dirname(self.main_file_path)

# cache some of the metadata fields of the original aggregation to update the metadata of the
# updated aggregation
keywords = self.metadata.subjects
additional_meta = self.metadata.additional_metadata

# TODO: keep a local backup copy of the aggregation before deleting it
self.delete()
# copy the updated shape files to the original location where the user previously
# downloaded the aggregation
src_shp_file_dir_path = os.path.dirname(file_path)
tgt_shp_file_dir_path = os.path.dirname(data_object.path)
agg_path = tgt_shp_file_dir_path
filename_starts_with = f"{pathlib.Path(file_path).stem}."

# need to close the fiona.Collection object to free up access to all the original shape files
data_object.close()

for item in os.listdir(src_shp_file_dir_path):
if item.startswith(filename_starts_with):
src_file_full_path = os.path.join(src_shp_file_dir_path, item)
tgt_file_full_path = os.path.join(tgt_shp_file_dir_path, item)
shutil.copyfile(src_file_full_path, tgt_file_full_path)

# upload the updated shape files to replace this aggregation
upload_shape_files(main_file_path=data_object.path, dst_path=destination_path)
upload_shape_files(main_file_path=data_object.path)

# retrieve the updated aggregation
aggr = resource.aggregation(file__path=aggr_main_file_path)
@@ -687,23 +686,15 @@ def upload_shape_files(main_file_path, dst_path=""):
aggr.metadata.subjects.append(kw)
aggr.metadata.additional_metadata = additional_meta
aggr.save()

# load the aggregation data into a fiona Collection object
data_object = aggr.as_data_object(agg_path=agg_path)
else:
# creating a new aggregation
# close the original fiona Collection object
data_object.close()

# upload the updated shape files to create a new geo feature aggregation
upload_shape_files(main_file_path=file_path, dst_path=destination_path)

# retrieve the new aggregation
agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
aggr = resource.aggregation(file__path=agg_path)
data_object = None

aggr._data_object = data_object
aggr._data_object = None
return aggr
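
For geographic feature aggregations the data object is a fiona.Collection, and an update must carry the whole sidecar file set (.shp, .shx, .dbf, ...), which upload_shape_files gathers by matching the main file's stem. A hedged sketch with placeholder names, assuming the aggregation was downloaded to local_agg_path as in the first sketch:

# Hedged sketch: open, inspect, and save back a geo feature aggregation.
# save_data_object() closes the Collection to release the shapefile set first.
gf_aggr = res.aggregation(file__path="watersheds.shp")   # placeholder path
shp = gf_aggr.as_data_object(agg_path=local_agg_path)    # fiona.Collection
print(shp.schema)                                        # fields and geometry type
# ...edit the downloaded shapefile set on disk (e.g. with fiona), then:
gf_aggr = gf_aggr.save_data_object(resource=res, agg_path=local_agg_path, as_new_aggr=False)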


@@ -753,7 +744,6 @@ def _validate_aggregation_path(self, agg_path: str, for_save_data: bool = False)
def as_data_object(self, agg_path: str) -> 'rasterio.DatasetReader':
if rasterio is None:
raise Exception("rasterio package was not found")

return self._get_data_object(agg_path=agg_path, func=rasterio.open)

def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
@@ -764,7 +754,11 @@ def upload_raster_files(dst_path=""):
item_full_path = os.path.join(agg_path, item)
if os.path.isfile(item_full_path):
raster_files.append(item_full_path)
resource.file_upload(*raster_files, destination_path=dst_path)

if not dst_path:
self._update_aggregation(resource, *raster_files)
else:
resource.file_upload(*raster_files, destination_path=dst_path)

def get_main_file_path():
main_file_name = os.path.basename(file_path)
@@ -778,23 +772,17 @@ def get_main_file_path():

self._validate_aggregation_for_update(resource, AggregationType.GeographicRasterAggregation)
file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
# aggr_main_file_path = self.main_file_path
# data_object = self._data_object
if not as_new_aggr:
destination_path = dirname(self.main_file_path)

# cache some of the metadata fields of the original aggregation to update the metadata of the
# updated aggregation
keywords = self.metadata.subjects
additional_meta = self.metadata.additional_metadata

# TODO: keep a local backup copy of the aggregation before deleting it
self.delete()
upload_raster_files(dst_path=destination_path)

# retrieve the updated aggregation
# compute the main file name
aggr_main_file_path = get_main_file_path()
# retrieve the updated aggregation
aggr = resource.aggregation(file__path=aggr_main_file_path)

# update metadata
Expand All @@ -812,8 +800,7 @@ def get_main_file_path():
agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
aggr = resource.aggregation(file__path=agg_path)

data_object = None
aggr._data_object = data_object
aggr._data_object = None
return aggr
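
The raster variant follows the same pattern: upload_raster_files collects every file in the downloaded aggregation folder, and the main file name is recomputed via get_main_file_path after the upload. A hedged sketch with placeholder names:

# Hedged sketch: rasterio.DatasetReader is read-only, so edits go to the local
# files under local_agg_path before saving back to HydroShare.
gr_aggr = res.aggregation(file__path="logan.vrt")         # placeholder path
raster = gr_aggr.as_data_object(agg_path=local_agg_path)  # rasterio.DatasetReader
print(raster.count, raster.crs)                           # band count and CRS
gr_aggr = gr_aggr.save_data_object(resource=res, agg_path=local_agg_path, as_new_aggr=False)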

