diff --git a/deployment/docker/Dockerfile b/deployment/docker/Dockerfile
index 5799515f..22e41042 100644
--- a/deployment/docker/Dockerfile
+++ b/deployment/docker/Dockerfile
@@ -6,7 +6,7 @@ RUN apt-get update -y && \
     spatialite-bin libsqlite3-mod-spatialite \
    python3-dev python3-gdal python3-psycopg2 python3-ldap \
    python3-pip python3-pil python3-lxml python3-pylibmc \
-    uwsgi uwsgi-plugin-python3
+    uwsgi uwsgi-plugin-python3 build-essential

 # Install pip packages
 ADD deployment/docker/requirements.txt /requirements.txt
diff --git a/deployment/docker/requirements.txt b/deployment/docker/requirements.txt
index 73b1065b..e1f217bc 100644
--- a/deployment/docker/requirements.txt
+++ b/deployment/docker/requirements.txt
@@ -67,3 +67,5 @@ openpyxl==3.1.5
 # salient sdk
 numpy==1.26.4
 salientsdk==0.3.5
+
+python-geohash==0.8.5
diff --git a/django_project/core/celery.py b/django_project/core/celery.py
index 316d54e4..34093501 100644
--- a/django_project/core/celery.py
+++ b/django_project/core/celery.py
@@ -56,7 +56,12 @@
     'salient-collector-session': {
         'task': 'salient_collector_session',
-        # Run every Monday 02:00 UTC
-        'schedule': crontab(minute='0', hour='2', day_of_week='1'),
+        # Run every Monday 04:00 UTC
+        'schedule': crontab(minute='0', hour='4', day_of_week='1'),
+    },
+    'tio-collector-session': {
+        'task': 'tio_collector_session',
+        # Run every day at 00:30 UTC
+        'schedule': crontab(minute='30', hour='00'),
     },
     # Run all ingestor session daily
     'run-daily-ingestor-session': {
diff --git a/django_project/gap/ingestor/base.py b/django_project/gap/ingestor/base.py
index 53e090f2..5068efd6 100644
--- a/django_project/gap/ingestor/base.py
+++ b/django_project/gap/ingestor/base.py
@@ -6,13 +6,29 @@
 """

 from typing import Union
+import logging
+import datetime
+import pytz
+import uuid
+import fsspec
+import xarray as xr
+
+from django.utils import timezone
+from django.core.files.storage import default_storage

 from core.models import BackgroundTask
 from gap.models import (
     CollectorSession,
     IngestorSession,
-    IngestorSessionStatus
+    IngestorSessionStatus,
+    Dataset,
+    DatasetStore,
+    DataSourceFile
 )
+from gap.utils.zarr import BaseZarrReader
+
+
+logger = logging.getLogger(__name__)


 class BaseIngestor:
@@ -53,6 +69,126 @@ def get_config(self, name: str, default_value = None):
         return self.session.additional_config.get(name, default_value)


+class BaseZarrIngestor(BaseIngestor):
+    """Base Ingestor class for Zarr product."""
+
+    default_zarr_name = f'{uuid.uuid4()}.zarr'
+
+    def __init__(self, session, working_dir):
+        """Initialize base zarr ingestor."""
+        super().__init__(session, working_dir)
+        self.dataset = self._init_dataset()
+
+        self.s3 = BaseZarrReader.get_s3_variables()
+        self.s3_options = {
+            'key': self.s3.get('AWS_ACCESS_KEY_ID'),
+            'secret': self.s3.get('AWS_SECRET_ACCESS_KEY'),
+            'client_kwargs': BaseZarrReader.get_s3_client_kwargs()
+        }
+        self.metadata = {}
+
+        # get zarr data source file
+        datasourcefile_id = self.get_config('datasourcefile_id')
+        if datasourcefile_id:
+            self.datasource_file = DataSourceFile.objects.get(
+                id=datasourcefile_id)
+            self.created = not self.get_config(
+                'datasourcefile_zarr_exists', True)
+        else:
+            datasourcefile_name = self.get_config(
+                'datasourcefile_name', self.default_zarr_name)
+            self.datasource_file, self.created = (
+                DataSourceFile.objects.get_or_create(
+                    name=datasourcefile_name,
+                    dataset=self.dataset,
+                    format=DatasetStore.ZARR,
+                    defaults={
+                        'created_on': timezone.now(),
+                        'start_date_time': timezone.now(),
+                        'end_date_time': (
+                            timezone.now()
+                        )
+                    }
+                )
+            )
+
+ def _init_dataset(self) -> Dataset: + """Fetch dataset for this ingestor. + + :raises NotImplementedError: should be implemented in child class + :return: Dataset for this ingestor + :rtype: Dataset + """ + raise NotImplementedError + + def _update_zarr_source_file(self, updated_date: datetime.date): + """Update zarr DataSourceFile start and end datetime. + + :param updated_date: Date that has been processed + :type updated_date: datetime.date + """ + if self.created: + self.datasource_file.start_date_time = datetime.datetime( + updated_date.year, updated_date.month, updated_date.day, + 0, 0, 0, tzinfo=pytz.UTC + ) + self.datasource_file.end_date_time = ( + self.datasource_file.start_date_time + ) + else: + if self.datasource_file.start_date_time.date() > updated_date: + self.datasource_file.start_date_time = datetime.datetime( + updated_date.year, updated_date.month, + updated_date.day, + 0, 0, 0, tzinfo=pytz.UTC + ) + if self.datasource_file.end_date_time.date() < updated_date: + self.datasource_file.end_date_time = datetime.datetime( + updated_date.year, updated_date.month, + updated_date.day, + 0, 0, 0, tzinfo=pytz.UTC + ) + self.datasource_file.save() + + def _remove_temporary_source_file( + self, source_file: DataSourceFile, file_path: str): + """Remove temporary file from collector. + + :param source_file: Temporary File + :type source_file: DataSourceFile + :param file_path: s3 file path + :type file_path: str + """ + try: + default_storage.delete(file_path) + except Exception as ex: + logger.error( + f'Failed to remove original source_file {file_path}!', ex) + finally: + source_file.delete() + + def _open_zarr_dataset(self, drop_variables = []) -> xr.Dataset: + """Open existing Zarr file. + + :param drop_variables: variables to exclude from reader + :type drop_variables: list, optional + :return: xarray dataset + :rtype: xr.Dataset + """ + zarr_url = ( + BaseZarrReader.get_zarr_base_url(self.s3) + + self.datasource_file.name + ) + s3_mapper = fsspec.get_mapper(zarr_url, **self.s3_options) + return xr.open_zarr( + s3_mapper, consolidated=True, drop_variables=drop_variables) + + def verify(self): + """Verify the resulting zarr file.""" + self.zarr_ds = self._open_zarr_dataset() + print(self.zarr_ds) + + def ingestor_revoked_handler(bg_task: BackgroundTask): """Event handler when ingestor task is cancelled by celery. diff --git a/django_project/gap/ingestor/cbam.py b/django_project/gap/ingestor/cbam.py index 584f8d2d..9e5fd0b7 100644 --- a/django_project/gap/ingestor/cbam.py +++ b/django_project/gap/ingestor/cbam.py @@ -130,9 +130,9 @@ def run(self): try: self._run() except Exception as e: - logger.error('Collector CBAM failed!', e) + logger.error('Collector CBAM failed!') logger.error(traceback.format_exc()) - raise Exception(e) + raise e finally: pass @@ -385,8 +385,8 @@ def run(self): ) self.datasource_file.save() except Exception as e: - logger.error('Ingestor CBAM failed!', e) + logger.error('Ingestor CBAM failed!') logger.error(traceback.format_exc()) - raise Exception(e) + raise e finally: pass diff --git a/django_project/gap/ingestor/exceptions.py b/django_project/gap/ingestor/exceptions.py index 859589fa..b3bc3ae1 100644 --- a/django_project/gap/ingestor/exceptions.py +++ b/django_project/gap/ingestor/exceptions.py @@ -43,3 +43,13 @@ class AdditionalConfigNotFoundException(Exception): def __init__(self, key): # noqa self.message = f'{key} is required in additional_config.' 
super().__init__(self.message) + + +class MissingCollectorSessionException(Exception): + """Collector session not found.""" + + def __init__(self, session_id): # noqa + self.message = ( + f'Missing collector session in IngestorSession {session_id}.' + ) + super().__init__(self.message) diff --git a/django_project/gap/ingestor/salient.py b/django_project/gap/ingestor/salient.py index 55a20cfd..e3b2164f 100644 --- a/django_project/gap/ingestor/salient.py +++ b/django_project/gap/ingestor/salient.py @@ -12,7 +12,6 @@ import datetime import pytz import traceback -import fsspec import s3fs import numpy as np import pandas as pd @@ -28,7 +27,7 @@ Dataset, DataSourceFile, DatasetStore, IngestorSession, CollectorSession, Preferences ) -from gap.ingestor.base import BaseIngestor +from gap.ingestor.base import BaseIngestor, BaseZarrIngestor from gap.utils.netcdf import NetCDFMediaS3, find_start_latlng from gap.utils.zarr import BaseZarrReader from gap.utils.dask import execute_dask_compute @@ -206,14 +205,14 @@ def run(self): self._run() self.session.notes = json.dumps(self.metadata, default=str) except Exception as e: - logger.error('Collector Salient failed!', e) + logger.error('Collector Salient failed!') logger.error(traceback.format_exc()) - raise Exception(e) + raise e finally: pass -class SalientIngestor(BaseIngestor): +class SalientIngestor(BaseZarrIngestor): """Ingestor for Salient seasonal forecast data.""" default_chunks = { @@ -227,39 +226,6 @@ class SalientIngestor(BaseIngestor): def __init__(self, session: IngestorSession, working_dir: str = '/tmp'): """Initialize SalientIngestor.""" super().__init__(session, working_dir) - self.dataset = Dataset.objects.get(name='Salient Seasonal Forecast') - self.s3 = BaseZarrReader.get_s3_variables() - self.s3_options = { - 'key': self.s3.get('AWS_ACCESS_KEY_ID'), - 'secret': self.s3.get('AWS_SECRET_ACCESS_KEY'), - 'client_kwargs': BaseZarrReader.get_s3_client_kwargs() - } - self.metadata = {} - - # get zarr data source file - datasourcefile_id = self.get_config('datasourcefile_id') - if datasourcefile_id: - self.datasource_file = DataSourceFile.objects.get( - id=datasourcefile_id) - self.created = not self.get_config( - 'datasourcefile_zarr_exists', True) - else: - datasourcefile_name = self.get_config( - 'datasourcefile_name', 'salient.zarr') - self.datasource_file, self.created = ( - DataSourceFile.objects.get_or_create( - name=datasourcefile_name, - dataset=self.dataset, - format=DatasetStore.ZARR, - defaults={ - 'created_on': timezone.now(), - 'start_date_time': timezone.now(), - 'end_date_time': ( - timezone.now() - ) - } - ) - ) # min+max are the BBOX that GAP processes # inc and original_min comes from Salient netcdf file @@ -277,6 +243,14 @@ def __init__(self, session: IngestorSession, working_dir: str = '/tmp'): } self.reindex_tolerance = 0.01 + def _init_dataset(self) -> Dataset: + """Fetch dataset for this ingestor. + + :return: Dataset for this ingestor + :rtype: Dataset + """ + return Dataset.objects.get(name='Salient Seasonal Forecast') + def _get_s3_filepath(self, source_file: DataSourceFile): """Get Salient NetCDF temporary file from Collector. @@ -316,52 +290,6 @@ def _open_dataset(self, source_file: DataSourceFile) -> xrDataset: return xr.open_dataset( fs.open(netcdf_url), chunks=self.default_chunks) - def _update_zarr_source_file(self, forecast_date: datetime.date): - """Update zarr DataSourceFile start and end datetime. 
- - :param forecast_date: Forecast date that has been processed - :type forecast_date: datetime.date - """ - if self.created: - self.datasource_file.start_date_time = datetime.datetime( - forecast_date.year, forecast_date.month, forecast_date.day, - 0, 0, 0, tzinfo=pytz.UTC - ) - self.datasource_file.end_date_time = ( - self.datasource_file.start_date_time - ) - else: - if self.datasource_file.start_date_time.date() > forecast_date: - self.datasource_file.start_date_time = datetime.datetime( - forecast_date.year, forecast_date.month, - forecast_date.day, - 0, 0, 0, tzinfo=pytz.UTC - ) - if self.datasource_file.end_date_time.date() < forecast_date: - self.datasource_file.end_date_time = datetime.datetime( - forecast_date.year, forecast_date.month, - forecast_date.day, - 0, 0, 0, tzinfo=pytz.UTC - ) - self.datasource_file.save() - - def _remove_temporary_source_file( - self, source_file: DataSourceFile, file_path: str): - """Remove temporary NetCDFFile from collector. - - :param source_file: Temporary NetCDF File - :type source_file: DataSourceFile - :param file_path: s3 file path - :type file_path: str - """ - try: - default_storage.delete(file_path) - except Exception as ex: - logger.error( - f'Failed to remove original source_file {file_path}!', ex) - finally: - source_file.delete() - def _run(self): """Run Salient ingestor.""" logger.info(f'Running data ingestor for Salient: {self.session.id}.') @@ -538,18 +466,8 @@ def run(self): self._run() self.session.notes = json.dumps(self.metadata, default=str) except Exception as e: - logger.error('Ingestor Salient failed!', e) + logger.error('Ingestor Salient failed!') logger.error(traceback.format_exc()) - raise Exception(e) + raise e finally: pass - - def verify(self): - """Verify the resulting zarr file.""" - zarr_url = ( - BaseZarrReader.get_zarr_base_url(self.s3) + - self.datasource_file.name - ) - s3_mapper = fsspec.get_mapper(zarr_url, **self.s3_options) - self.zarr_ds = xr.open_zarr(s3_mapper, consolidated=True) - print(self.zarr_ds) diff --git a/django_project/gap/ingestor/tio_shortterm.py b/django_project/gap/ingestor/tio_shortterm.py index cf38f614..7d0aed3a 100644 --- a/django_project/gap/ingestor/tio_shortterm.py +++ b/django_project/gap/ingestor/tio_shortterm.py @@ -10,21 +10,39 @@ import os import traceback import uuid -from datetime import timedelta +import zipfile +import fsspec +import numpy as np +import pandas as pd +import xarray as xr +import dask.array as da +import geohash +from typing import List +from datetime import timedelta, date from django.conf import settings from django.core.files.base import ContentFile from django.core.files.storage import default_storage +from django.contrib.gis.db.models.functions import Centroid from django.utils import timezone from core.utils.s3 import zip_folder_in_s3 -from gap.ingestor.base import BaseIngestor +from gap.ingestor.base import BaseIngestor, BaseZarrIngestor +from gap.ingestor.exceptions import ( + MissingCollectorSessionException, FileNotFoundException, + AdditionalConfigNotFoundException +) from gap.models import ( - CastType, CollectorSession, DataSourceFile, DatasetStore, Grid + CastType, CollectorSession, DataSourceFile, DatasetStore, Grid, + IngestorSession, Dataset ) from gap.providers import TomorrowIODatasetReader from gap.providers.tio import tomorrowio_shortterm_forecast_dataset from gap.utils.reader import DatasetReaderInput +from gap.utils.zarr import BaseZarrReader +from gap.utils.netcdf import find_start_latlng +from gap.utils.dask import execute_dask_compute 
+ logger = logging.getLogger(__name__) @@ -48,9 +66,10 @@ def __init__(self, session: CollectorSession, working_dir: str = '/tmp'): # Total days: 21 self.start_dt = today - timedelta(days=6) self.end_dt = today + timedelta(days=15) + self.forecast_date = today def _run(self): - """Run Salient ingestor.""" + """Run TomorrowIO ingestor.""" s3_storage = default_storage zip_file = path(f"{uuid.uuid4()}.zip") dataset = self.dataset @@ -63,7 +82,10 @@ def _run(self): format=DatasetStore.ZIP_FILE, defaults={ 'name': zip_file, - 'created_on': timezone.now() + 'created_on': timezone.now(), + 'metadata': { + 'forecast_date': self.forecast_date.date().isoformat() + } } ) filename = data_source_file.name.split('/')[-1] @@ -109,6 +131,9 @@ def _run(self): s3_storage, folder_path=folder, zip_file_name=zip_file ) + # Add data source file to collector result + self.session.dataset_files.set([data_source_file]) + def run(self): """Run Tio Short Term Ingestor.""" # Run the ingestion @@ -120,3 +145,536 @@ def run(self): raise Exception(e) finally: pass + + +class CoordMapping: + """Mapping coordinate between Grid and Zarr.""" + + def __init__(self, value, nearest_idx, nearest_val) -> None: + """Initialize coordinate mapping class. + + :param value: lat/lon value from Grid + :type value: float + :param nearest_idx: nearest index in Zarr + :type nearest_idx: int + :param nearest_val: nearest value in Zarr + :type nearest_val: float + """ + self.value = value + self.nearest_idx = nearest_idx + self.nearest_val = nearest_val + + +class TioShortTermIngestor(BaseZarrIngestor): + """Ingestor Tio Short Term data into Zarr.""" + + default_chunks = { + 'forecast_date': 10, + 'forecast_day_idx': 21, + 'lat': 150, + 'lon': 110 + } + + variables = [ + 'total_rainfall', + 'total_evapotranspiration_flux', + 'max_temperature', + 'min_temperature', + 'precipitation_probability', + 'humidity_maximum', + 'humidity_minimum', + 'wind_speed_avg' + ] + + def __init__(self, session: IngestorSession, working_dir: str = '/tmp'): + """Initialize TioShortTermIngestor.""" + super().__init__(session, working_dir) + + self.metadata = { + 'chunks': [], + 'total_json_processed': 0 + } + + # min+max are the BBOX that GAP processes + self.lat_metadata = { + 'min': -27, + 'max': 16, + 'inc': 0.03586314, + 'original_min': -4.65013565 + } + self.lon_metadata = { + 'min': 21.8, + 'max': 52, + 'inc': 0.036353, + 'original_min': 33.91823667 + } + self.reindex_tolerance = 0.001 + self.existing_dates = None + + def _init_dataset(self) -> Dataset: + """Fetch dataset for this ingestor. + + :return: Dataset for this ingestor + :rtype: Dataset + """ + return Dataset.objects.get( + name='Tomorrow.io Short-term Forecast', + store_type=DatasetStore.ZARR + ) + + def _is_date_in_zarr(self, date: date) -> bool: + """Check whether a date has been added to zarr file. + + :param date: date to check + :type date: date + :return: True if date exists in zarr file. + :rtype: bool + """ + if self.created: + return False + if self.existing_dates is None: + ds = self._open_zarr_dataset(self.variables) + self.existing_dates = ds.forecast_date.values + ds.close() + np_date = np.datetime64(f'{date.isoformat()}') + return np_date in self.existing_dates + + def _append_new_forecast_date( + self, forecast_date: date, is_new_dataset=False): + """Append a new forecast date to the zarr structure. + + The dataset will be initialized with empty values. 
+ :param forecast_date: forecast date + :type forecast_date: date + """ + # expand lat and lon + min_lat = find_start_latlng(self.lat_metadata) + min_lon = find_start_latlng(self.lon_metadata) + new_lat = np.arange( + min_lat, self.lat_metadata['max'] + self.lat_metadata['inc'], + self.lat_metadata['inc'] + ) + new_lon = np.arange( + min_lon, self.lon_metadata['max'] + self.lon_metadata['inc'], + self.lon_metadata['inc'] + ) + + # create empty data variables + empty_shape = ( + 1, + self.default_chunks['forecast_day_idx'], + len(new_lat), + len(new_lon) + ) + chunks = ( + 1, + self.default_chunks['forecast_day_idx'], + self.default_chunks['lat'], + self.default_chunks['lon'] + ) + + # Create the Dataset + forecast_date_array = pd.date_range( + forecast_date.isoformat(), periods=1) + forecast_day_indices = np.arange(-6, 15, 1) + data_vars = {} + encoding = { + 'forecast_date': { + 'chunks': self.default_chunks['forecast_date'] + } + } + for var in self.variables: + empty_data = da.empty(empty_shape, chunks=chunks) + data_vars[var] = ( + ['forecast_date', 'forecast_day_idx', 'lat', 'lon'], + empty_data + ) + encoding[var] = { + 'chunks': ( + self.default_chunks['forecast_date'], + self.default_chunks['forecast_day_idx'], + self.default_chunks['lat'], + self.default_chunks['lon'] + ) + } + ds = xr.Dataset( + data_vars=data_vars, + coords={ + 'forecast_date': ('forecast_date', forecast_date_array), + 'forecast_day_idx': ( + 'forecast_day_idx', forecast_day_indices), + 'lat': ('lat', new_lat), + 'lon': ('lon', new_lon) + } + ) + + # write/append to zarr + # note: when writing to a new chunk of forecast_date, + # the memory usage will be higher than the rest + zarr_url = ( + BaseZarrReader.get_zarr_base_url(self.s3) + + self.datasource_file.name + ) + if is_new_dataset: + # write + x = ds.to_zarr( + zarr_url, mode='w', consolidated=True, + encoding=encoding, + storage_options=self.s3_options, + compute=False + ) + else: + # append + x = ds.to_zarr( + zarr_url, mode='a', append_dim='forecast_date', + consolidated=True, + storage_options=self.s3_options, + compute=False + ) + execute_dask_compute(x) + + # close dataset and remove empty_data + ds.close() + del ds + del empty_data + + def _is_sorted_and_incremented(self, arr): + """Check if array is sorted ascending and incremented by 1. + + :param arr: array + :type arr: List + :return: True if array is sorted and incremented by 1 + :rtype: bool + """ + if not arr: + return False + if len(arr) == 1: + return True + return all(arr[i] + 1 == arr[i + 1] for i in range(len(arr) - 1)) + + def _transform_coordinates_array( + self, coord_arr, coord_type) -> List[CoordMapping]: + """Find nearest in Zarr for array of lat/lon. 
+ + :param coord_arr: array of lat/lon + :type coord_arr: List[float] + :param coord_type: lat or lon + :type coord_type: str + :return: List CoordMapping with nearest val/idx + :rtype: List[CoordMapping] + """ + # open existing zarr + ds = self._open_zarr_dataset() + + # find nearest coordinate for each item + results: List[CoordMapping] = [] + for target_coord in coord_arr: + if coord_type == 'lat': + nearest_coord = ds['lat'].sel( + lat=target_coord, method='nearest', + tolerance=self.reindex_tolerance + ).item() + else: + nearest_coord = ds['lon'].sel( + lon=target_coord, method='nearest', + tolerance=self.reindex_tolerance + ).item() + + coord_idx = np.where(ds[coord_type].values == nearest_coord)[0][0] + results.append( + CoordMapping(target_coord, coord_idx, nearest_coord) + ) + + # close dataset + ds.close() + + return results + + def _find_chunk_slices( + self, arr_length: int, chunk_size: int) -> List: + """Create chunk slices for processing Tio data. + + Given arr with length 300 and chunk_size 150, + this method will return [slice(0, 150), slice(150, 300)]. + :param arr_length: length of array + :type arr_length: int + :param chunk_size: chunk size + :type chunk_size: int + :return: list of slice + :rtype: List + """ + coord_slices = [] + for coord_range in range(0, arr_length, chunk_size): + max_idx = coord_range + chunk_size + coord_slices.append( + slice( + coord_range, + max_idx if max_idx < arr_length else arr_length + ) + ) + return coord_slices + + def _update_by_region( + self, forecast_date: date, lat_arr: List[CoordMapping], + lon_arr: List[CoordMapping], new_data: dict): + """Update new_data to the zarr by its forecast_date. + + The lat_arr and lon_arr should already be chunked + before calling this method. + :param forecast_date: forecast date of the new data + :type forecast_date: date + :param lat_arr: list of lat coordinate mapping + :type lat_arr: List[CoordMapping] + :param lon_arr: list of lon coordinate mapping + :type lon_arr: List[CoordMapping] + :param new_data: dictionary of new data + :type new_data: dict + """ + # open existing zarr + ds = self._open_zarr_dataset() + + # find index of forecast_date + forecast_date_array = pd.date_range( + forecast_date.isoformat(), periods=1) + new_forecast_date = forecast_date_array[0] + forecast_date_idx = ( + np.where(ds['forecast_date'].values == new_forecast_date)[0][0] + ) + + # find nearest lat and lon and its indices + nearest_lat_arr = [lat.nearest_val for lat in lat_arr] + nearest_lat_indices = [lat.nearest_idx for lat in lat_arr] + + nearest_lon_arr = [lon.nearest_val for lon in lon_arr] + nearest_lon_indices = [lon.nearest_idx for lon in lon_arr] + + # ensure that the lat/lon indices are in correct order + assert self._is_sorted_and_incremented(nearest_lat_indices) + assert self._is_sorted_and_incremented(nearest_lon_indices) + + # Create the dataset with updated data for the region + data_vars = { + var: ( + ['forecast_date', 'forecast_day_idx', 'lat', 'lon'], + new_data[var] + ) for var in new_data + } + new_ds = xr.Dataset( + data_vars=data_vars, + coords={ + 'forecast_date': [new_forecast_date], + 'forecast_day_idx': ds['forecast_day_idx'], + 'lat': nearest_lat_arr, + 'lon': nearest_lon_arr + } + ) + + # write the updated data to zarr + zarr_url = ( + BaseZarrReader.get_zarr_base_url(self.s3) + + self.datasource_file.name + ) + x = new_ds.to_zarr( + zarr_url, + mode='a', + region={ + 'forecast_date': slice( + forecast_date_idx, forecast_date_idx + 1), + 'forecast_day_idx': slice(None), + 'lat': slice( + 
nearest_lat_indices[0], nearest_lat_indices[-1] + 1), + 'lon': slice( + nearest_lon_indices[0], nearest_lon_indices[-1] + 1) + }, + storage_options=self.s3_options, + consolidated=True, + compute=False + ) + execute_dask_compute(x) + + def _run(self): + """Process the tio shortterm data into Zarr.""" + collector = self.session.collectors.first() + if not collector: + raise MissingCollectorSessionException(self.session.id) + data_source = collector.dataset_files.first() + if not data_source: + raise FileNotFoundException() + + # find forecast date + if 'forecast_date' not in data_source.metadata: + raise AdditionalConfigNotFoundException('metadata.forecast_date') + self.metadata['forecast_date'] = data_source.metadata['forecast_date'] + forecast_date = date.fromisoformat( + data_source.metadata['forecast_date']) + if not self._is_date_in_zarr(forecast_date): + self._append_new_forecast_date(forecast_date, self.created) + + # get lat and lon array from grids + lat_arr = set() + lon_arr = set() + grid_dict = {} + + # query grids + grids = Grid.objects.annotate( + centroid=Centroid('geometry') + ) + for grid in grids: + lat = round(grid.centroid.y, 8) + lon = round(grid.centroid.x, 8) + grid_hash = geohash.encode(lat, lon, precision=8) + lat_arr.add(lat) + lon_arr.add(lon) + grid_dict[grid_hash] = grid.id + lat_arr = sorted(lat_arr) + lon_arr = sorted(lon_arr) + + # transform lat lon arrays + lat_arr = self._transform_coordinates_array(lat_arr, 'lat') + lon_arr = self._transform_coordinates_array(lon_arr, 'lon') + + lat_indices = [lat.nearest_idx for lat in lat_arr] + lon_indices = [lon.nearest_idx for lon in lon_arr] + assert self._is_sorted_and_incremented(lat_indices) + assert self._is_sorted_and_incremented(lon_indices) + + # create slices for chunks + lat_slices = self._find_chunk_slices( + len(lat_arr), self.default_chunks['lat']) + lon_slices = self._find_chunk_slices( + len(lon_arr), self.default_chunks['lon']) + + # open zip file and process the data by chunks + with default_storage.open(data_source.name) as _file: + with zipfile.ZipFile(_file, 'r') as zip_file: + for lat_slice in lat_slices: + for lon_slice in lon_slices: + lat_chunks = lat_arr[lat_slice] + lon_chunks = lon_arr[lon_slice] + warnings, count = self._process_tio_shortterm_data( + forecast_date, lat_chunks, lon_chunks, + grid_dict, zip_file + ) + self.metadata['chunks'].append({ + 'lat_slice': str(lat_slice), + 'lon_slice': str(lon_slice), + 'warnings': warnings + }) + self.metadata['total_json_processed'] += count + + # update end date of zarr datasource file + self._update_zarr_source_file(forecast_date) + + # remove temporary source file + remove_temp_file = self.get_config('remove_temp_file', True) + if remove_temp_file: + self._remove_temporary_source_file(data_source, data_source.name) + + def run(self): + """Run TomorrowIO Ingestor.""" + # Run the ingestion + try: + self._run() + self.session.notes = json.dumps(self.metadata, default=str) + except Exception as e: + logger.error('Ingestor TomorrowIO failed!') + logger.error(traceback.format_exc()) + raise e + finally: + pass + + def _process_tio_shortterm_data( + self, forecast_date: date, lat_arr: List[CoordMapping], + lon_arr: List[CoordMapping], grids: dict, + zip_file: zipfile.ZipFile) -> dict: + """Process Tio data and update into zarr. 
+ + :param forecast_date: forecast date + :type forecast_date: date + :param lat_arr: list of latitude + :type lat_arr: List[CoordMapping] + :param lon_arr: list of longitude + :type lon_arr: List[CoordMapping] + :param grids: dictionary for geohash and grid id + :type grids: dict + :param zip_file: zip file from collector + :type zip_file: zipfile.ZipFile + :return: dictionary of warnings + :rtype: dict + """ + zip_file_list = zip_file.namelist() + print(f'name list {zip_file_list}') + count = 0 + data_shape = ( + 1, + self.default_chunks['forecast_day_idx'], + len(lat_arr), + len(lon_arr) + ) + warnings = { + 'missing_hash': 0, + 'missing_json': 0, + 'invalid_json': 0 + } + + # initialize empty new data for each variable + new_data = {} + for variable in self.variables: + new_data[variable] = np.empty(data_shape) + + for idx_lat, lat in enumerate(lat_arr): + for idx_lon, lon in enumerate(lon_arr): + # find grid id by geohash of lat and lon + grid_hash = geohash.encode(lat.value, lon.value, precision=8) + if grid_hash not in grids: + warnings['missing_hash'] += 1 + continue + + # open the grid json file using grid id from grid_hash + json_filename = f'grid-{grids[grid_hash]}.json' + if json_filename not in zip_file_list: + warnings['missing_json'] += 1 + continue + + with zip_file.open(json_filename) as _file: + data = json.loads(_file.read().decode('utf-8')) + + # there might be invalid json (e.g. API returns error) + if 'data' not in data: + warnings['invalid_json'] += 1 + continue + + # iterate for each item in data + assert ( + len(data['data']) == + self.default_chunks['forecast_day_idx'] + ) + forecast_day_idx = 0 + for item in data['data']: + values = item['values'] + for var in values: + if var not in new_data: + continue + # assign the variable value into new data + new_data[var][ + 0, forecast_day_idx, idx_lat, idx_lon] = ( + values[var] + ) + forecast_day_idx += 1 + count += 1 + + # update new data to zarr using region + self._update_by_region(forecast_date, lat_arr, lon_arr, new_data) + del new_data + + return warnings, count + + def verify(self): + """Verify the resulting zarr file.""" + zarr_url = ( + BaseZarrReader.get_zarr_base_url(self.s3) + + self.datasource_file.name + ) + s3_mapper = fsspec.get_mapper(zarr_url, **self.s3_options) + self.zarr_ds = xr.open_zarr(s3_mapper, consolidated=True) + print(self.zarr_ds) diff --git a/django_project/gap/models/ingestor.py b/django_project/gap/models/ingestor.py index c1d038df..7a75548d 100644 --- a/django_project/gap/models/ingestor.py +++ b/django_project/gap/models/ingestor.py @@ -192,6 +192,7 @@ def _run(self, working_dir): from gap.ingestor.arable import ArableIngestor from gap.ingestor.tahmo_api import TahmoAPIIngestor from gap.ingestor.wind_borne_systems import WindBorneSystemsIngestor + from gap.ingestor.tio_shortterm import TioShortTermIngestor ingestor = None if self.ingestor_type == IngestorType.TAHMO: @@ -210,6 +211,8 @@ def _run(self, working_dir): ingestor = TahmoAPIIngestor elif self.ingestor_type == IngestorType.WIND_BORNE_SYSTEMS_API: ingestor = WindBorneSystemsIngestor + elif self.ingestor_type == IngestorType.TOMORROWIO: + ingestor = TioShortTermIngestor if ingestor: ingestor(self, working_dir).run() diff --git a/django_project/gap/tasks/collector.py b/django_project/gap/tasks/collector.py index d1844c63..bc85a8de 100644 --- a/django_project/gap/tasks/collector.py +++ b/django_project/gap/tasks/collector.py @@ -48,14 +48,18 @@ def run_cbam_collector_session(): ) -@app.task(name='salient_collector_session') -def 
run_salient_collector_session(): - """Run Collector for Salient Dataset.""" - dataset = Dataset.objects.get(name='Salient Seasonal Forecast') - # create the collector object - collector_session = CollectorSession.objects.create( - ingestor_type=IngestorType.SALIENT - ) +def _do_run_zarr_collector( + dataset: Dataset, collector_session: CollectorSession, + ingestor_type): + """Run collector for zarr file. + + :param dataset: dataset + :type dataset: Dataset + :param collector_session: collector session to be run + :type collector_session: CollectorSession + :param ingestor_type: ingestor type + :type ingestor_type: IngestorType + """ # run collector collector_session.run() @@ -76,9 +80,33 @@ def run_salient_collector_session(): 'datasourcefile_zarr_exists': True } session = IngestorSession.objects.create( - ingestor_type=IngestorType.SALIENT, + ingestor_type=ingestor_type, trigger_task=False, additional_config=additional_conf ) session.collectors.add(collector_session) run_ingestor_session.delay(session.id) + + +@app.task(name='salient_collector_session') +def run_salient_collector_session(): + """Run Collector for Salient Dataset.""" + dataset = Dataset.objects.get(name='Salient Seasonal Forecast') + collector_session = CollectorSession.objects.create( + ingestor_type=IngestorType.SALIENT + ) + _do_run_zarr_collector(dataset, collector_session, IngestorType.SALIENT) + + +@app.task(name='tio_collector_session') +def run_tio_collector_session(): + """Run Collector for Tomorrow.io Dataset.""" + dataset = Dataset.objects.get( + name='Tomorrow.io Short-term Forecast', + store_type=DatasetStore.ZARR + ) + # create the collector object + collector_session = CollectorSession.objects.create( + ingestor_type=IngestorType.TIO_FORECAST_COLLECTOR + ) + _do_run_zarr_collector(dataset, collector_session, IngestorType.TOMORROWIO) diff --git a/django_project/gap/tests/ingestor/data/tio_shorterm_collector/grid_data.zip b/django_project/gap/tests/ingestor/data/tio_shorterm_collector/grid_data.zip new file mode 100644 index 00000000..e15c7e9e Binary files /dev/null and b/django_project/gap/tests/ingestor/data/tio_shorterm_collector/grid_data.zip differ diff --git a/django_project/gap/tests/ingestor/test_cbam.py b/django_project/gap/tests/ingestor/test_cbam.py index 2ee96e73..6ed5def0 100644 --- a/django_project/gap/tests/ingestor/test_cbam.py +++ b/django_project/gap/tests/ingestor/test_cbam.py @@ -160,7 +160,10 @@ def test_init(self, mock_get_s3_client_kwargs, mock_get_s3_variables): mock_get_s3_client_kwargs.return_value = { 'endpoint_url': 'https://test-endpoint.com' } - session = IngestorSession.objects.create() + session = IngestorSession.objects.create( + ingestor_type=IngestorType.CBAM, + trigger_task=False + ) ingestor = CBAMIngestor(session) self.assertEqual(ingestor.s3['AWS_ACCESS_KEY_ID'], 'test_access_key') self.assertEqual(ingestor.s3_options['key'], 'test_access_key') @@ -191,7 +194,8 @@ def test_init_with_existing_source( additional_config={ 'datasourcefile_id': datasource.id, 'datasourcefile_zarr_exists': True - } + }, + trigger_task=False ) ingestor = CBAMIngestor(session) self.assertEqual(ingestor.s3['AWS_ACCESS_KEY_ID'], 'test_access_key') @@ -218,7 +222,10 @@ def test_is_date_in_zarr(self, mock_open_dataset): mock_ds.date.values = np.array([np.datetime64('2023-01-01')]) mock_open_dataset.return_value = mock_ds - session = IngestorSession.objects.create() + session = IngestorSession.objects.create( + ingestor_type=IngestorType.CBAM, + trigger_task=False + ) ingestor = CBAMIngestor(session) 
ingestor.created = False ingestor.existing_dates = None @@ -232,7 +239,10 @@ def test_run( self, mock_setup_reader, mock_open_dataset, mock_open_dataset_reader ): """Test run ingestor.""" - session = IngestorSession.objects.create() + session = IngestorSession.objects.create( + ingestor_type=IngestorType.CBAM, + trigger_task=False + ) ingestor = CBAMIngestor(session) mock_ds = MagicMock(spec=xrDataset) mock_open_dataset.return_value = mock_ds @@ -260,7 +270,10 @@ def test_run( @patch('json.dumps') def test_run_success(self, mock_json_dumps): """Test successful run.""" - session = IngestorSession.objects.create() + session = IngestorSession.objects.create( + ingestor_type=IngestorType.CBAM, + trigger_task=False + ) ingestor = CBAMIngestor(session) ingestor._run = MagicMock() ingestor.metadata = { @@ -292,7 +305,10 @@ def test_cancel_run( self, mock_setup_reader, mock_open_dataset, mock_open_dataset_reader ): """Test cancel run.""" - session = IngestorSession.objects.create() + session = IngestorSession.objects.create( + ingestor_type=IngestorType.CBAM, + trigger_task=False + ) session.is_cancelled = True session.save() ingestor = CBAMIngestor(session) @@ -356,7 +372,10 @@ def test_store_as_zarr( mock_get_zarr_base_url.return_value = 's3://bucket/' # Create instance of CBAMIngestor - session = IngestorSession.objects.create() + session = IngestorSession.objects.create( + ingestor_type=IngestorType.CBAM, + trigger_task=False + ) instance = CBAMIngestor(session=session) instance.created = True # Simulate that Zarr file doesn't exist yet instance.s3 = { diff --git a/django_project/gap/tests/ingestor/test_salient.py b/django_project/gap/tests/ingestor/test_salient.py index a811ccf3..55b1f3c6 100644 --- a/django_project/gap/tests/ingestor/test_salient.py +++ b/django_project/gap/tests/ingestor/test_salient.py @@ -37,7 +37,7 @@ class SalientIngestorBaseTest(TestCase): ] def setUp(self): - """Set CBAMIngestorBaseTest.""" + """Set SalientIngestorBaseTest.""" self.dataset = Dataset.objects.get(name='Salient Seasonal Forecast') @@ -168,7 +168,7 @@ def test_run_salient_collector_session( # assert session = IngestorSession.objects.filter( ingestor_type=IngestorType.SALIENT, - ).last() + ).order_by('id').last() self.assertTrue(session) self.assertEqual(session.collectors.count(), 1) mock_collector.assert_called_once() @@ -201,7 +201,8 @@ def test_init_with_existing_source( additional_config={ 'datasourcefile_id': datasource.id, 'datasourcefile_zarr_exists': True - } + }, + trigger_task=False ) ingestor = SalientIngestor(session) self.assertEqual(ingestor.s3['AWS_ACCESS_KEY_ID'], 'test_access_key') @@ -224,7 +225,8 @@ def setUp(self, mock_zarr_reader, mock_netcdf_media_s3): ) self.collector.dataset_files.set([self.datasourcefile]) self.session = IngestorSession.objects.create( - ingestor_type=IngestorType.SALIENT + ingestor_type=IngestorType.SALIENT, + trigger_task=False ) self.session.collectors.set([self.collector]) diff --git a/django_project/gap/tests/ingestor/test_tio_shortterm_collector.py b/django_project/gap/tests/ingestor/test_tio_shortterm_collector.py index d1d82d31..e0b6e712 100644 --- a/django_project/gap/tests/ingestor/test_tio_shortterm_collector.py +++ b/django_project/gap/tests/ingestor/test_tio_shortterm_collector.py @@ -124,9 +124,14 @@ def test_collector_one_grid(self, mock_timezone): ) session.run() session.refresh_from_db() + self.assertEqual(session.dataset_files.count(), 1) self.assertEqual(session.status, IngestorSessionStatus.SUCCESS) 
         self.assertEqual(DataSourceFile.objects.count(), 1)
-        _file = default_storage.open(DataSourceFile.objects.first().name)
+        data_source = DataSourceFile.objects.first()
+        self.assertIn('forecast_date', data_source.metadata)
+        self.assertEqual(
+            data_source.metadata['forecast_date'], today.date().isoformat())
+        _file = default_storage.open(data_source.name)
         with zipfile.ZipFile(_file, 'r') as zip_file:
             self.assertEqual(len(zip_file.filelist), 1)
             _file = zip_file.open(f'grid-{grid.id}.json')
diff --git a/django_project/gap/tests/ingestor/test_tio_shortterm_ingestor.py b/django_project/gap/tests/ingestor/test_tio_shortterm_ingestor.py
new file mode 100644
index 00000000..eebe1405
--- /dev/null
+++ b/django_project/gap/tests/ingestor/test_tio_shortterm_ingestor.py
@@ -0,0 +1,377 @@
+# coding=utf-8
+"""
+Tomorrow Now GAP.
+
+.. note:: Unit tests for Tio Shortterm Ingestor.
+"""
+
+import os
+from unittest.mock import patch, MagicMock
+from datetime import date
+import zipfile
+import numpy as np
+import pandas as pd
+import dask.array as da
+from xarray.core.dataset import Dataset as xrDataset
+from django.test import TestCase
+from django.contrib.gis.geos import Polygon
+
+from gap.models import Dataset, DatasetStore
+from gap.models.ingestor import (
+    IngestorSession,
+    IngestorType,
+    CollectorSession
+)
+from gap.ingestor.tio_shortterm import TioShortTermIngestor, CoordMapping
+from gap.ingestor.exceptions import (
+    MissingCollectorSessionException, FileNotFoundException,
+    AdditionalConfigNotFoundException
+)
+from gap.factories import DataSourceFileFactory, GridFactory
+from gap.tasks.collector import run_tio_collector_session
+
+
+LAT_METADATA = {
+    'min': -4.65013565,
+    'max': 5.46326983,
+    'inc': 0.03586314,
+    'original_min': -4.65013565
+}
+LON_METADATA = {
+    'min': 33.91823667,
+    'max': 41.84325607,
+    'inc': 0.036353,
+    'original_min': 33.91823667
+}
+
+
+def mock_open_zarr_dataset():
+    """Mock open zarr dataset."""
+    new_lat = np.arange(
+        LAT_METADATA['min'], LAT_METADATA['max'] + LAT_METADATA['inc'],
+        LAT_METADATA['inc']
+    )
+    new_lon = np.arange(
+        LON_METADATA['min'], LON_METADATA['max'] + LON_METADATA['inc'],
+        LON_METADATA['inc']
+    )
+
+    # Create the Dataset
+    forecast_date_array = pd.date_range(
+        '2024-10-02', periods=1)
+    forecast_day_indices = np.arange(-6, 15, 1)
+    empty_shape = (1, 21, len(new_lat), len(new_lon))
+    chunks = (1, 21, 150, 110)
+    data_vars = {
+        'max_temperature': (
+            ['forecast_date', 'forecast_day_idx', 'lat', 'lon'],
+            da.empty(empty_shape, chunks=chunks)
+        )
+    }
+    return xrDataset(
+        data_vars=data_vars,
+        coords={
+            'forecast_date': ('forecast_date', forecast_date_array),
+            'forecast_day_idx': (
+                'forecast_day_idx', forecast_day_indices),
+            'lat': ('lat', new_lat),
+            'lon': ('lon', new_lon)
+        }
+    )
+
+
+def create_polygon():
+    """Create mock polygon for Grid."""
+    return Polygon.from_bbox([
+        LON_METADATA['min'],
+        LAT_METADATA['min'],
+        LON_METADATA['min'] + 2 * LON_METADATA['inc'],
+        LAT_METADATA['min'] + 2 * LON_METADATA['inc'],
+    ])
+
+
+class TestTioIngestor(TestCase):
+    """Tio Short Term ingestor test case."""
+
+    fixtures = [
+        '2.provider.json',
+        '3.station_type.json',
+        '4.dataset_type.json',
+        '5.dataset.json',
+        '6.unit.json',
+        '7.attribute.json',
+        '8.dataset_attribute.json'
+    ]
+
+    @patch('gap.utils.netcdf.NetCDFMediaS3')
+    @patch('gap.utils.zarr.BaseZarrReader')
+    def setUp(
+        self, mock_zarr_reader,
+        mock_netcdf_media_s3):
+        """Initialize TestTioIngestor."""
+        super().setUp()
+        self.dataset = Dataset.objects.get(
+            name='Tomorrow.io
Short-term Forecast', + store_type=DatasetStore.ZARR + ) + self.collector = CollectorSession.objects.create( + ingestor_type=IngestorType.TIO_FORECAST_COLLECTOR + ) + self.datasourcefile = DataSourceFileFactory.create( + dataset=self.dataset, + name='2024-10-02.zip', + format=DatasetStore.ZIP_FILE, + metadata={ + 'forecast_date': '2024-10-02' + } + ) + self.collector.dataset_files.set([self.datasourcefile]) + self.zarr_source = DataSourceFileFactory.create( + dataset=self.dataset, + format=DatasetStore.ZARR, + name='tio.zarr' + ) + self.session = IngestorSession.objects.create( + ingestor_type=IngestorType.TOMORROWIO, + trigger_task=False, + additional_config={ + 'datasourcefile_id': self.zarr_source.id, + 'datasourcefile_zarr_exists': True + } + ) + self.session.collectors.set([self.collector]) + + self.mock_zarr_reader = mock_zarr_reader + self.mock_netcdf_media_s3 = mock_netcdf_media_s3 + # self.mock_dask_compute = mock_dask_compute + + self.ingestor = TioShortTermIngestor(self.session) + self.ingestor.lat_metadata = LAT_METADATA + self.ingestor.lon_metadata = LON_METADATA + + @patch('gap.utils.zarr.BaseZarrReader.get_s3_variables') + @patch('gap.utils.zarr.BaseZarrReader.get_s3_client_kwargs') + def test_init_with_existing_source( + self, mock_get_s3_client_kwargs, mock_get_s3_variables + ): + """Test init method with existing DataSourceFile.""" + datasource = DataSourceFileFactory.create( + dataset=self.dataset, + format=DatasetStore.ZARR, + name='tio_test.zarr' + ) + mock_get_s3_variables.return_value = { + 'AWS_ACCESS_KEY_ID': 'test_access_key', + 'AWS_SECRET_ACCESS_KEY': 'test_secret_key' + } + mock_get_s3_client_kwargs.return_value = { + 'endpoint_url': 'https://test-endpoint.com' + } + session = IngestorSession.objects.create( + ingestor_type=IngestorType.TOMORROWIO, + additional_config={ + 'datasourcefile_id': datasource.id, + 'datasourcefile_zarr_exists': True, + 'remove_temp_file': False + }, + trigger_task=False + ) + ingestor = TioShortTermIngestor(session) + self.assertEqual(ingestor.s3['AWS_ACCESS_KEY_ID'], 'test_access_key') + self.assertEqual(ingestor.s3_options['key'], 'test_access_key') + self.assertTrue(ingestor.datasource_file) + self.assertEqual(ingestor.datasource_file.name, datasource.name) + self.assertFalse(ingestor.created) + + def test_run_with_exception(self): + """Test exception during run.""" + session = IngestorSession.objects.create( + ingestor_type=IngestorType.TOMORROWIO, + trigger_task=False + ) + ingestor = TioShortTermIngestor(session) + with self.assertRaises(MissingCollectorSessionException) as context: + ingestor._run() + self.assertTrue( + 'Missing collector session' in context.exception.message) + + collector = CollectorSession.objects.create( + ingestor_type=IngestorType.TIO_FORECAST_COLLECTOR + ) + session.collectors.set([collector]) + with self.assertRaises(FileNotFoundException) as context: + ingestor._run() + self.assertTrue('File not found.' 
in context.exception.message) + + datasource = DataSourceFileFactory.create( + dataset=self.dataset, + name='2024-10-02.zip', + metadata={} + ) + collector.dataset_files.set([datasource]) + with self.assertRaises(AdditionalConfigNotFoundException) as context: + ingestor.run() + self.assertTrue('metadata.forecast_date' in context.exception.message) + + @patch('gap.ingestor.tio_shortterm.execute_dask_compute') + def test_append_new_forecast_date(self, mock_dask_compute): + """Test append new forecast date method.""" + forecast_date = date(2024, 10, 1) + self.ingestor._append_new_forecast_date(forecast_date, True) + mock_dask_compute.assert_called_once() + + mock_dask_compute.reset_mock() + forecast_date = date(2024, 10, 2) + self.ingestor._append_new_forecast_date(forecast_date, False) + mock_dask_compute.assert_called_once() + + def test_is_date_in_zarr(self): + """Test check date in zarr function.""" + with patch.object(self.ingestor, '_open_zarr_dataset') as mock_open: + mock_open.return_value = mock_open_zarr_dataset() + self.assertTrue(self.ingestor._is_date_in_zarr(date(2024, 10, 2))) + mock_open.assert_called_once() + mock_open.reset_mock() + self.assertFalse(self.ingestor._is_date_in_zarr(date(2024, 10, 1))) + mock_open.assert_not_called() + # created is True + self.ingestor.created = True + self.assertFalse(self.ingestor._is_date_in_zarr(date(2024, 10, 2))) + self.ingestor.created = False + + def test_is_sorted_and_incremented(self): + """Test is_sorted_and_incremented function.""" + arr = None + self.assertFalse(self.ingestor._is_sorted_and_incremented(arr)) + arr = [1] + self.assertTrue(self.ingestor._is_sorted_and_incremented(arr)) + arr = [1, 2, 5, 7] + self.assertFalse(self.ingestor._is_sorted_and_incremented(arr)) + arr = [1, 2, 3, 4, 5, 6] + self.assertTrue(self.ingestor._is_sorted_and_incremented(arr)) + + def test_transform_coordinates_array(self): + """Test transform_coordinates_array function.""" + with patch.object(self.ingestor, '_open_zarr_dataset') as mock_open: + mock_open.return_value = mock_open_zarr_dataset() + lat_arr = [] + for i in range(10): + lat_arr.append(LAT_METADATA['min'] + i * LAT_METADATA['inc']) + coords = self.ingestor._transform_coordinates_array(lat_arr, 'lat') + mock_open.assert_called_once() + self.assertEqual(len(coords), 10) + self.assertTrue( + self.ingestor._is_sorted_and_incremented( + [c.nearest_idx for c in coords] + ) + ) + + def test_find_chunk_slices(self): + """Test find_chunk_slices function.""" + coords = self.ingestor._find_chunk_slices(1, 1) + self.assertEqual(len(coords), 1) + coords = self.ingestor._find_chunk_slices(150, 60) + self.assertEqual(len(coords), 3) + self.assertEqual( + coords, + [slice(0, 60), slice(60, 120), slice(120, 150)] + ) + + @patch('gap.ingestor.tio_shortterm.execute_dask_compute') + def test_update_by_region(self, mock_dask_compute): + """Test update_by_region function.""" + with patch.object(self.ingestor, '_open_zarr_dataset') as mock_open: + mock_open.return_value = mock_open_zarr_dataset() + forecast_date = date(2024, 10, 2) + new_data = { + 'max_temperature': np.random.rand(1, 21, 1, 1) + } + lat_arr = [CoordMapping( + LAT_METADATA['min'], 0, LAT_METADATA['min'] + )] + lon_arr = [CoordMapping( + LON_METADATA['min'], 0, LON_METADATA['min'] + )] + self.ingestor._update_by_region( + forecast_date, lat_arr, lon_arr, new_data) + mock_dask_compute.assert_called_once() + + @patch('django.core.files.storage.default_storage.open') + @patch('zipfile.ZipFile.namelist') + 
@patch('gap.ingestor.tio_shortterm.execute_dask_compute') + def test_run_success( + self, mock_dask_compute, mock_namelist, mock_default_storage): + """Test run ingestor succesfully.""" + filepath = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + 'data', + 'tio_shorterm_collector', + 'grid_data.zip' + ) + # mock default_storage.open + f = open(filepath, "rb") + mock_default_storage.return_value = f + json_f = zipfile.ZipFile(f, 'r').open('grid-1.json') + + # create a grid + grid = GridFactory(geometry=create_polygon()) + # mock zip_file.namelist + mock_namelist.return_value = [f'grid-{grid.id}.json'] + with patch.object(self.ingestor, '_open_zarr_dataset') as mock_open: + with patch('zipfile.ZipFile.open') as mock_zip_open: + mock_open.return_value = mock_open_zarr_dataset() + mock_zip_open.return_value = json_f + self.ingestor._run() + mock_zip_open.assert_called_once() + + mock_default_storage.assert_called_once() + mock_dask_compute.assert_called_once() + self.assertEqual(self.ingestor.metadata['total_json_processed'], 1) + self.assertEqual(len(self.ingestor.metadata['chunks']), 1) + f.close() + + @patch('gap.utils.zarr.BaseZarrReader.get_zarr_base_url') + @patch('xarray.open_zarr') + @patch('fsspec.get_mapper') + def test_verify( + self, mock_get_mapper, mock_open_zarr, mock_get_zarr_base_url + ): + """Test verify Tio zarr file in s3.""" + # Set up mocks + mock_open_zarr.return_value = MagicMock(spec=xrDataset) + + # Call the method + self.ingestor.verify() + + # Assertions + mock_get_zarr_base_url.assert_called_once() + mock_get_mapper.assert_called_once() + mock_open_zarr.assert_called_once() + + @patch('gap.models.ingestor.CollectorSession.dataset_files') + @patch('gap.models.ingestor.CollectorSession.run') + @patch('gap.tasks.ingestor.run_ingestor_session.delay') + def test_run_tio_collector_session( + self, mock_ingestor, mock_collector, mock_count + ): + """Test run tio collector session.""" + mock_count.count.return_value = 0 + run_tio_collector_session() + # assert + mock_collector.assert_called_once() + mock_ingestor.assert_not_called() + + mock_collector.reset_mock() + mock_ingestor.reset_mock() + # test with collector result + mock_count.count.return_value = 1 + run_tio_collector_session() + + # assert + session = IngestorSession.objects.filter( + ingestor_type=IngestorType.TOMORROWIO, + ).order_by('id').last() + self.assertTrue(session) + self.assertEqual(session.collectors.count(), 1) + mock_collector.assert_called_once() + mock_ingestor.assert_called_once_with(session.id) diff --git a/django_project/gap/utils/salient.py b/django_project/gap/utils/salient.py index 87a2fe5e..e8e54e56 100644 --- a/django_project/gap/utils/salient.py +++ b/django_project/gap/utils/salient.py @@ -92,7 +92,6 @@ def patch_download_query( chunk_size=CHUNK_DOWNLOAD_SIZE ): if chunk: # Filter out keep-alive new chunks - print(f'chunk here {chunk}') if format == "nc": f.write(chunk) else: diff --git a/django_project/gap/utils/zarr.py b/django_project/gap/utils/zarr.py index 532fabfa..8522ccef 100644 --- a/django_project/gap/utils/zarr.py +++ b/django_project/gap/utils/zarr.py @@ -9,6 +9,7 @@ import logging import s3fs import fsspec +import shutil from typing import List from datetime import datetime import xarray as xr @@ -168,3 +169,13 @@ def open_dataset(self, source_file: DataSourceFile) -> xrDataset: s3_mapper, consolidated=True, drop_variables=drop_variables) return ds + + def clear_cache(self, source_file: DataSourceFile): + """Clear cache of zarr file. 
+ + :param source_file: DataSourceFile for the zarr + :type source_file: DataSourceFile + """ + cache_dir = self.get_zarr_cache_dir(source_file.name) + if os.path.exists(cache_dir): + shutil.rmtree(cache_dir)