From 40746ecc11cc2d9d033ec59bb21102e2bd619ddd Mon Sep 17 00:00:00 2001 From: Irwan Fathurrahman Date: Thu, 29 Feb 2024 08:29:37 +0700 Subject: [PATCH] Updates 4.4.6 (#117) * Update hubeau harvester API * Update 'download request' to just 'download' * Fix error aware * Update ehyd harvester --- harvesters/admin.py | 9 +- harvesters/harvester/base.py | 26 +- harvesters/harvester/ehyd.py | 328 ++++++++++-------- harvesters/harvester/hubeau.py | 84 ++++- harvesters/models/harvester.py | 4 +- .../update_number_of_measurements_well.py | 10 +- migrations/0072_views.py | 15 - .../0079_alter_harvesterattribute_value.py | 18 + models/well.py | 16 +- signals/well.py | 20 +- tasks/data_file_cache/country_recache.py | 5 +- templates/download/download-page.html | 2 +- templates/download/form.html | 2 +- urls.py | 6 +- utilities.py | 9 + 15 files changed, 350 insertions(+), 204 deletions(-) create mode 100644 migrations/0079_alter_harvesterattribute_value.py diff --git a/harvesters/admin.py b/harvesters/admin.py index 6f975ad9..f995fc72 100644 --- a/harvesters/admin.py +++ b/harvesters/admin.py @@ -94,12 +94,9 @@ class HarvesterAttributeInline(admin.TabularInline): model = HarvesterAttribute - readonly_fields = ('harvester', 'name') + readonly_fields = ('harvester',) extra = 0 - def has_add_permission(self, request, obj=None): - return False - class HarvesterLogInline(admin.TabularInline): model = HarvesterLog @@ -130,7 +127,9 @@ class HarvesterAdmin(admin.ModelAdmin): form = HarvesterForm inlines = [HarvesterAttributeInline, HarvesterLogInline] list_display = ( - 'id', 'name', 'organisation', 'is_run', 'active', 'harvester_class', 'last_run') + 'id', 'name', 'organisation', 'is_run', 'active', 'harvester_class', + 'last_run' + ) list_editable = ('active',) actions = (harvest_data,) diff --git a/harvesters/harvester/base.py b/harvesters/harvester/base.py index 8a3044ec..292c6f2d 100644 --- a/harvesters/harvester/base.py +++ b/harvesters/harvester/base.py @@ -11,8 +11,8 @@ from django.utils import timezone from gwml2.harvesters.models.harvester import ( - Harvester, HarvesterLog, RUNNING, - ERROR, DONE, HarvesterWellData + HarvesterLog, RUNNING, ERROR, + DONE, HarvesterWellData, Harvester, HarvesterAttribute ) from gwml2.models.general import Quantity, Unit from gwml2.models.term import TermFeatureType @@ -25,7 +25,7 @@ from gwml2.signals.well import post_save_measurement_for_cache from gwml2.tasks.data_file_cache import generate_data_well_cache from gwml2.tasks.well import generate_measurement_cache -from gwml2.utilities import temp_disconnect_signal +from gwml2.utilities import temp_disconnect_signal, make_aware_local User = get_user_model() @@ -239,7 +239,7 @@ def _save_measurement( try: obj, created = model.objects.get_or_create( well=harvester_well_data.well, - time=time, + time=make_aware_local(time), parameter=defaults.get('parameter', None), defaults=defaults ) @@ -267,8 +267,22 @@ def _save_measurement( obj.save() return obj - def post_processing_well(self, well: Well): + def post_processing_well( + self, well: Well, generate_country_cache: bool = True + ): """Specifically for processing cache after procesing well.""" print(f'Generate cache for {well.original_id}') + well.update_metadata() generate_measurement_cache(well.id) - generate_data_well_cache(well.id) + generate_data_well_cache( + well.id, generate_country_cache=generate_country_cache + ) + + def update_attribute(self, key: str, value): + """Update attribute.""" + attr, _ = HarvesterAttribute.objects.get_or_create( + 
harvester=self.harvester, + name=key + ) + attr.value = value + attr.save() diff --git a/harvesters/harvester/ehyd.py b/harvesters/harvester/ehyd.py index 723a5417..df6a60ab 100644 --- a/harvesters/harvester/ehyd.py +++ b/harvesters/harvester/ehyd.py @@ -16,8 +16,13 @@ from gwml2.models.general import Quantity, Unit from gwml2.models.term_measurement_parameter import TermMeasurementParameter from gwml2.models.well import ( - MEASUREMENT_PARAMETER_AMSL, Well, WellLevelMeasurement + MEASUREMENT_PARAMETER_AMSL, WellLevelMeasurement ) +from gwml2.tasks.data_file_cache.country_recache import ( + generate_data_country_cache +) + +last_file_key = 'last-file' class EHYD(BaseHarvester): @@ -26,6 +31,9 @@ class EHYD(BaseHarvester): """ updated = False folder = settings.SFTP_FOLDER + wells_by_original_id = {} + wells = [] + countries = [] def __init__( self, harvester: Harvester, @@ -40,152 +48,196 @@ def __init__( def _process(self): """Pass the process.""" + try: + self.last_file = self.attributes.get(last_file_key, None) + except Exception: + pass + if not os.path.exists(self.folder): raise HarvestingError( - f'Sftp folder not found or not setup. current: {self.folder}' + 'Sftp folder not found or not setup correctly. ' + f'Current: {self.folder}' ) ori_coord = SpatialReference(31287) target_coord = SpatialReference(4326) - trans = CoordTransform(ori_coord, target_coord) + self.trans = CoordTransform(ori_coord, target_coord) + + # Check the wells + well_file = os.path.join(self.folder, 'base data.csv') + if not os.path.exists(well_file): + raise HarvestingError( + '"base data.csv" is not exist. ' + 'Please put it to provide list of wells.' + ) # Get csv file that contains wells data - for (root, dirs, file) in os.walk(self.folder): - for f in file: - split_name = os.path.splitext(f) - if split_name[1] == '.csv': - _file = open( - os.path.join(root, f), 'r', - encoding="utf8", errors='ignore' - ) - reader = csv.DictReader(_file, delimiter=';') - - for row in list(reader): - try: - x = row['X (Lambert cone right)'] - y = row['Y (Lambert cone high)'] - point = Point(float(x), float(y), srid=31287) - point.transform(trans) - date = parser.parse( - row['date of construction'] - ) - well, harvester_well_data = self._save_well( - row['ID'], - row['name'], - longitude=point.coords[0], - latitude=point.coords[1], - ground_surface_elevation_masl=row[ - 'ground level (m above Adriatic Sea)' - ] - ) - if well.drilling: - drilling = well.drilling - else: - drilling = Drilling.objects.create( - year_of_drilling=date.year - ) - well.drilling = drilling - well.save() - except (KeyError, ValueError): - continue - - # Get all files that found - for (root, dirs, file) in os.walk(self.folder): - for f in file: - split_name = os.path.splitext(f) - if split_name[1] == '.csv': - continue - self._update(f'Check file {f}') - _file = open( - os.path.join(root, f), 'r', - encoding="utf8", errors='ignore' + _file = open( + well_file, 'r', encoding="utf8", errors='ignore' + ) + reader = csv.DictReader(_file, delimiter=';') + + for row in list(reader): + try: + x = row['X (Lambert cone right)'] + y = row['Y (Lambert cone high)'] + point = Point(float(x), float(y), srid=31287) + point.transform(self.trans) + date = parser.parse( + row['date of construction'] ) - lines = _file.readlines() + self._update(f'Checking well {row["ID"]}') + well, harvester_well_data = self._save_well( + row['ID'], + row['name'], + longitude=point.coords[0], + latitude=point.coords[1], + ground_surface_elevation_masl=row[ + 'ground level (m above 
Adriatic Sea)' + ] + ) + last_measurement = WellLevelMeasurement.objects.filter( + well=well, + ).order_by('-time').first() + self.wells_by_original_id[row['ID']] = { + 'harvester_well_data': harvester_well_data, + 'last_measurement': last_measurement + } + if well.drilling: + drilling = well.drilling + else: + drilling = Drilling.objects.create( + year_of_drilling=date.year + ) + well.drilling = drilling + well.save() + except (KeyError, ValueError): + continue + + files = os.listdir(self.folder) + files.sort() + last_file = self.last_file + for filename in files: + name, ext = os.path.splitext(filename) + if ext == '.asc': + try: + datetime.strptime(name, '%Y%m%d%H%M%S') + if not last_file or filename > last_file: + # Process it because of the file is new + last_file = filename + self.process_measurements( + os.path.join(self.folder, filename) + ) + self.update_attribute(last_file_key, filename) + except ValueError as e: + pass + + # Process cache + for well in self.wells: + self._update(f'Processing cache {well.original_id}') + self.post_processing_well(well, generate_country_cache=False) + + # Run country caches + self._update('Run country caches') + countries = list(set(self.countries)) + for country in countries: + generate_data_country_cache(country) + + self._done('Done') + def process_measurement(self, data): + """Process measurement.""" + updated = False + # Save the data + x = data['X'] + y = data['Y'] + point = Point(float(y), float(x), srid=31287) + point.transform(self.trans) + self._update(f'Saving measurement for well {data["Ort"]}') + well_metadata = self.wells_by_original_id[data['Ort']] + harvester_well_data = well_metadata['harvester_well_data'] + last_measurement = well_metadata['last_measurement'] + well = harvester_well_data.well + + for measurement in data['measurements']: + # Save measurements + if last_measurement and \ + measurement['time'] <= last_measurement.time: + continue + defaults = { + 'parameter': self.parameter, + 'value_in_m': measurement['value'] + } + obj = self._save_measurement( + WellLevelMeasurement, + measurement['time'], + defaults, + harvester_well_data + ) + if not obj.value: + obj.value = Quantity.objects.create( + unit=self.unit_m, + value=measurement['value'] + ) + obj.save() + updated = True + if updated: + self.wells.append(well) + self.countries.append(well.country.code) + + def process_measurements(self, path): + """Process measurements.""" + self._update(f'Check file {path}') + _file = open(path, 'r', encoding="utf8", errors='ignore') + lines = _file.readlines() + + data = {} + is_measurement = False + for line in lines: + line = line.replace('\n', '') + # Parse data per measurement + # Start with BEGIN + if line == 'BEGIN': + try: + self.process_measurement(data) + except KeyError: + pass data = {} is_measurement = False - for line in lines: - line = line.replace('\n', '') - # Parse data per measurement - # Start with BEGIN - if line == 'BEGIN': - try: - updated = False - # Save the data - x = data['X'] - y = data['Y'] - point = Point(float(y), float(x), srid=31287) - point.transform(trans) - self._update(f'Saving well {data["Ort"]}') - well, harvester_well_data = self._save_well( - data['Ort'], - '', - longitude=point.coords[1], - latitude=point.coords[0] - ) - last_measurement = WellLevelMeasurement.objects.filter( - well=harvester_well_data.well, - ).order_by('-time').first() - - for measurement in data['measurements']: - # Save measurements - if last_measurement and measurement[ - 'time'] <= last_measurement.time: - 
print('found') - continue - defaults = { - 'parameter': self.parameter, - 'value_in_m': measurement['value'] - } - obj = self._save_measurement( - WellLevelMeasurement, - measurement['time'], - defaults, - harvester_well_data - ) - if not obj.value: - obj.value = Quantity.objects.create( - unit=self.unit_m, - value=measurement['value'] - ) - obj.save() - updated = True - if updated: - self.post_processing_well(well) - - except KeyError: - pass - data = {} - is_measurement = False - continue - - # If it is measurements word - if line == 'Werte: ': - is_measurement = True - data['measurements'] = [] - continue - - # If measurements - if is_measurement: - columns = line.split(' ') - try: - measurement_time = datetime.strptime( - ' '.join([columns[0], columns[1]]), - '%d.%m.%Y %H:%M:%S' - ).replace(tzinfo=pytz.timezone(settings.TIME_ZONE)) - measurement_value = float(columns[2]) - data['measurements'].append({ - 'time': measurement_time, - 'value': measurement_value - }) - - continue - except (Well.DoesNotExist, ValueError): - continue - - row_data = line.split(': ') - try: - data[row_data[0]] = row_data[1] - except IndexError: - pass - self._done('Done') + continue + + # If it is measurements word + if line == 'Werte: ': + is_measurement = True + data['measurements'] = [] + continue + + # ------------------------------------------------ + # If measurements + # Save it to data['measurements'] + # ------------------------------------------------ + if is_measurement: + try: + columns = line.split(' ') + measurement_time = datetime.strptime( + ' '.join([columns[0], columns[1]]), + '%d.%m.%Y %H:%M:%S' + ).replace(tzinfo=pytz.timezone(settings.TIME_ZONE)) + measurement_value = float(columns[2]) + data['measurements'].append({ + 'time': measurement_time, + 'value': measurement_value + }) + except ValueError: + pass + continue + + # ------------------------------------------------ + # If not measurements + # ------------------------------------------------ + row_data = line.split(': ') + try: + data[row_data[0]] = row_data[1] + except IndexError: + pass + self.process_measurement(data) diff --git a/harvesters/harvester/hubeau.py b/harvesters/harvester/hubeau.py index e6dbf6ee..8162a83f 100644 --- a/harvesters/harvester/hubeau.py +++ b/harvesters/harvester/hubeau.py @@ -1,5 +1,7 @@ """Harvester of using hubeau.""" +import json + import requests from dateutil.parser import parse @@ -10,15 +12,25 @@ from gwml2.models.well import ( Well, MEASUREMENT_PARAMETER_GROUND, WellLevelMeasurement ) +from gwml2.tasks.data_file_cache.country_recache import ( + generate_data_country_cache +) class Hubeau(BaseHarvester): """ Harvester for https://hubeau.eaufrance.fr/page/api-piezometrie + + attributes : + - codes : List of code_bss + Limit the codes that will be fetched. + - re-fetch : boolean + Telling the harvester to try re fetching all measurement. """ domain = 'https://hubeau.eaufrance.fr/api/v1/niveaux_nappes' original_id_key = 'code_bss' updated = False + re_fetch_codes_done_key = 're-fetch-codes-done' def __init__( self, harvester: Harvester, replace: bool = False, @@ -31,27 +43,55 @@ def __init__( def _process(self): """Processing the data.""" + self.codes = None + try: + self.codes = json.loads(self.attributes['codes']) + except Exception: + pass + self.re_fetch = self.attributes.get('re-fetch', False) + self.re_fetch_codes_done = json.loads( + self.attributes.get(self.re_fetch_codes_done_key, '[]') + ) + self.countries = [] # Process the stations self._process_stations( f'{self.domain}/stations?' 
'format=json&nb_mesures_piezo_min=2&size=100' ) + + # Run country caches + self._update('Run country caches') + countries = list(set(self.countries)) + for country in countries: + generate_data_country_cache(country) self._done('Done') + def check_prefetch_wells(self, well): + """Get prefetch wells is needed or not""" + if self.re_fetch: + original_id = well.original_id + if original_id not in self.re_fetch_codes_done: + self._update(f'{original_id} - Deleting measurements') + well.welllevelmeasurement_set.all().delete() + self.re_fetch_codes_done.append(original_id) + self.update_attribute( + self.re_fetch_codes_done_key, + json.dumps(self.re_fetch_codes_done) + ) + def _measurement_url(self, params_dict: dict): """Construct url for measurement""" measurement_params = { 'size': 500, - 'fields': 'date_mesure,profondeur_nappe', - 'sort': 'asc', + 'fields': 'date_mesure,profondeur_nappe' } measurement_params.update(params_dict) params = [] for key, value in measurement_params.items(): params.append(f'{key}={value}') - return f'{self.domain}/chroniques_tr?{"&".join(params)}' + return f'{self.domain}/chroniques?{"&".join(params)}' def _process_stations(self, url: str): """Process the stations from the url. @@ -64,6 +104,10 @@ def _process_stations(self, url: str): data = response.json() stations = data['data'] for station in stations: + original_id = station[self.original_id_key] + if self.codes and original_id not in self.codes: + continue + self.updated = False geometry = station['geometry'] if not geometry or not geometry['coordinates']: @@ -71,31 +115,39 @@ def _process_stations(self, url: str): latitude = geometry['coordinates'][1] longitude = geometry['coordinates'][0] - params = { - 'bss_id': station['bss_id'] - } - original_id = station[self.original_id_key] + last_date = None well = self.get_well(original_id, latitude, longitude) + # We just filter by latest one if well: - last_measurement = well.welllevelmeasurement_set.first() - if last_measurement: - last_date = last_measurement.time.strftime("%Y-%m-%d") - params.update( - { - 'date_debut_mesure': last_date - } - ) + # Remove all measurements if re-fetch + self.check_prefetch_wells(well) + last = well.welllevelmeasurement_set.first() + if last: + last_date = last.time.strftime("%Y-%m-%d") # Process measurement try: + # ----------------------- + # Process after + params = { + 'code_bss': original_id, + 'sort': 'asc' + + } + if last_date: + params['date_debut_mesure'] = last_date self._process_measurements( self._measurement_url(params), station, None ) + # ----------------------- # Generate cache well = self.get_well(original_id, latitude, longitude) if well and self.updated: - self.post_processing_well(well) + self.post_processing_well( + well, generate_country_cache=False + ) + self.countries.append(well.country.code) except ( Well.DoesNotExist, requests.exceptions.ConnectionError ): diff --git a/harvesters/models/harvester.py b/harvesters/models/harvester.py index 6c9b1155..04d21555 100644 --- a/harvesters/models/harvester.py +++ b/harvesters/models/harvester.py @@ -1,6 +1,7 @@ from django.contrib.gis.db import models from django.utils.module_loading import import_string from django.utils.translation import ugettext_lazy as _ + from gwml2.models.term import TermFeatureType from gwml2.models.well import Well from gwml2.models.well_management.organisation import Organisation @@ -105,8 +106,7 @@ class HarvesterAttribute(models.Model): help_text=_( "The name of attribute") ) - value = models.CharField( - max_length=100, + value = 
models.TextField( null=True, default=True, help_text=_( "The value of attribute") diff --git a/management/commands/update_number_of_measurements_well.py b/management/commands/update_number_of_measurements_well.py index 116e8cf1..4f62c8c4 100644 --- a/management/commands/update_number_of_measurements_well.py +++ b/management/commands/update_number_of_measurements_well.py @@ -33,12 +33,4 @@ def handle(self, *args, **options): if idx + 1 < _from: continue print('{}/{}'.format(idx + 1, count)) - well.number_of_measurements_level = well.welllevelmeasurement_set.count() - well.number_of_measurements_quality = well.wellqualitymeasurement_set.count() - well.number_of_measurements_yield = well.wellyieldmeasurement_set.count() - well.number_of_measurements = ( - well.number_of_measurements_level + - well.number_of_measurements_quality + - well.number_of_measurements_yield - ) - well.save() + well.update_metadata() diff --git a/migrations/0072_views.py b/migrations/0072_views.py index 6af02d09..cabd4d55 100644 --- a/migrations/0072_views.py +++ b/migrations/0072_views.py @@ -2,26 +2,11 @@ from django.db import migrations -from gwml2.migrations.sql.utils import load_sql - class Migration(migrations.Migration): dependencies = [ ('gwml2', '0071_auto_20230608_0322'), ] - gwml_default = load_sql('gwml', 'default.sql') - gwml_views = load_sql('gwml', 'views.sql') - gwml_functions = load_sql('gwml', 'functions.sql') - - istsos_default = load_sql('istsos', 'default.sql') - istsos_views = load_sql('istsos', 'views.sql') - operations = [ - migrations.RunSQL(gwml_default, gwml_default), - migrations.RunSQL(gwml_views, gwml_default), - migrations.RunSQL(gwml_functions, gwml_default), - - migrations.RunSQL(istsos_default, istsos_default), - migrations.RunSQL(istsos_views, istsos_default), ] diff --git a/migrations/0079_alter_harvesterattribute_value.py b/migrations/0079_alter_harvesterattribute_value.py new file mode 100644 index 00000000..739562e9 --- /dev/null +++ b/migrations/0079_alter_harvesterattribute_value.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2.20 on 2024-02-16 03:50 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('gwml2', '0078_alter_uploadsession_category'), + ] + + operations = [ + migrations.AlterField( + model_name='harvesterattribute', + name='value', + field=models.TextField(default=True, help_text='The value of attribute', null=True), + ), + ] diff --git a/models/well.py b/models/well.py index a26e0f23..5b618faf 100644 --- a/models/well.py +++ b/models/well.py @@ -239,7 +239,9 @@ def measurement_data(self, measurement_name: str): parameter = measurement.parameter.name if MeasurementModel == WellLevelMeasurement: - if parameter in [MEASUREMENT_PARAMETER_AMSL, MEASUREMENT_PARAMETER_TOP, MEASUREMENT_PARAMETER_GROUND]: + if parameter in [MEASUREMENT_PARAMETER_AMSL, + MEASUREMENT_PARAMETER_TOP, + MEASUREMENT_PARAMETER_GROUND]: parameter = MEASUREMENT_PARAMETER_AMSL if measurement.parameter.name == MEASUREMENT_PARAMETER_TOP: if top_borehole_elevation: @@ -292,6 +294,18 @@ def generate_measurement_cache(self, model=None): file.write(json_bytes) file.close() + def update_metadata(self): + """Update metadata of well.""" + self.number_of_measurements_level = self.welllevelmeasurement_set.count() + self.number_of_measurements_quality = self.wellqualitymeasurement_set.count() + self.number_of_measurements_yield = self.wellyieldmeasurement_set.count() + self.number_of_measurements = ( + self.number_of_measurements_level + + 
self.number_of_measurements_quality + + self.number_of_measurements_yield + ) + self.save() + # documents class WellDocument(Document): diff --git a/signals/well.py b/signals/well.py index a7367e4a..6ee01173 100644 --- a/signals/well.py +++ b/signals/well.py @@ -7,6 +7,7 @@ Well, WellLevelMeasurement, WellQualityMeasurement, WellYieldMeasurement ) from gwml2.tasks.well import generate_measurement_cache +from gwml2.utilities import make_aware_local # -------------------- WELL -------------------- @@ -96,13 +97,20 @@ def pre_save_measurement(sender, instance, **kwargs): instance.well.number_of_measurements_yield += 1 instance.set_default_value() - if not instance.well.first_time_measurement or \ - instance.time < instance.well.first_time_measurement: - instance.well.first_time_measurement = instance.time - if not instance.well.last_time_measurement or \ - instance.time > instance.well.last_time_measurement: - instance.well.last_time_measurement = instance.time + # Check instance of time + instance_time = make_aware_local(instance.time) + first_time = make_aware_local( + instance.well.first_time_measurement + ) + if not first_time or instance_time < first_time: + instance.well.first_time_measurement = instance_time + + last_time = make_aware_local( + instance.well.last_time_measurement + ) + if not last_time or instance_time > last_time: + instance.well.last_time_measurement = instance_time except Well.DoesNotExist: pass diff --git a/tasks/data_file_cache/country_recache.py b/tasks/data_file_cache/country_recache.py index 8ae0bf63..4708d011 100644 --- a/tasks/data_file_cache/country_recache.py +++ b/tasks/data_file_cache/country_recache.py @@ -137,7 +137,10 @@ def __init__(self, country): ) zip_file.close() - shutil.rmtree(self.folder) + try: + shutil.rmtree(self.folder) + except FileNotFoundError: + pass self.log(f'----- Finish zipping : {country.code} -------') def merge_data_per_well( diff --git a/templates/download/download-page.html b/templates/download/download-page.html index 7990671e..04a240d9 100644 --- a/templates/download/download-page.html +++ b/templates/download/download-page.html @@ -28,7 +28,7 @@

Download Form.
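
Note on make_aware_local: the utilities.py hunk (9 insertions in the diffstat) is truncated above, so the helper that base.py and signals/well.py now import is not shown here. A minimal sketch of what such a helper could look like, assuming it only wraps Django's stock timezone utilities and passes None through unchanged (the signal handler above feeds it first_time_measurement / last_time_measurement, which may be null); this is an illustration, not the actual implementation:

    from django.utils import timezone

    def make_aware_local(time):
        """Return time as an aware datetime in the local timezone.

        None is passed through so callers can keep their "if not ..."
        checks; naive datetimes are assumed to already be expressed in
        the configured TIME_ZONE.
        """
        if time is None:
            return None
        if timezone.is_naive(time):
            return timezone.make_aware(time, timezone.get_current_timezone())
        return timezone.localtime(time)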
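
Note on the new harvester attributes: the Hubeau docstring above documents two optional attributes, 'codes' (a JSON list of code_bss identifiers that restricts the harvest) and 're-fetch' (delete and re-download all measurements once, with progress tracked under 're-fetch-codes-done'), and the ehyd harvester stores its resume point under 'last-file'. The admin change at the top re-enables adding attributes through the inline, so these can be set in the Django admin; for illustration only (the harvester lookup and the code_bss value below are made up), the same thing from a shell could look like:

    import json

    from gwml2.harvesters.models.harvester import Harvester, HarvesterAttribute

    harvester = Harvester.objects.get(id=1)  # hypothetical harvester id
    # Restrict the Hubeau harvest to a handful of stations.
    HarvesterAttribute.objects.update_or_create(
        harvester=harvester, name='codes',
        defaults={'value': json.dumps(['07548X0009/F'])},  # example code_bss
    )
    # Any non-empty string is truthy for attributes.get('re-fetch', False),
    # so a plain flag value is enough to trigger a one-off re-fetch.
    HarvesterAttribute.objects.update_or_create(
        harvester=harvester, name='re-fetch',
        defaults={'value': 'true'},
    )

Storing JSON lists such as 're-fetch-codes-done' in the value field is presumably why that field grows from CharField(max_length=100) to TextField in migration 0079.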
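
Note on the ehyd resume logic: measurement files are expected to be named with a %Y%m%d%H%M%S timestamp plus the .asc extension, so a plain string comparison against the stored 'last-file' attribute is enough to pick up only new files, because zero-padded timestamps sort lexicographically in chronological order. A condensed, self-contained illustration of that loop (the file names and the stored value are made up):

    import os
    from datetime import datetime

    files = sorted(['20240101120000.asc', '20240215090000.asc', 'base data.csv'])
    last_file = '20240101120000.asc'  # value previously stored under 'last-file'

    for filename in files:
        name, ext = os.path.splitext(filename)
        if ext != '.asc':
            continue
        try:
            datetime.strptime(name, '%Y%m%d%H%M%S')  # skip non-timestamp names
        except ValueError:
            continue
        if not last_file or filename > last_file:
            last_file = filename
            print('process', filename)  # stands in for process_measurements()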