Skip to content

Commit

Permalink
Tomorrow io ShortTerm forecast to zarr file (#177)
Browse files Browse the repository at this point in the history
* add script to create zarr for tio forecast

* fix update region by chunks

* refactor tio ingestor class

* fix failed build

* add tio ingestor class to ingestor session

* use dask utility function to execute delayed to_zarr

* add test for tio ingestor

* add periodic task for tio collector

* fix test run tio ingestor
  • Loading branch information
danangmassandy authored Oct 10, 2024
1 parent 4b5c5b1 commit 294ac32
Show file tree
Hide file tree
Showing 17 changed files with 1,203 additions and 130 deletions.
2 changes: 1 addition & 1 deletion deployment/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ RUN apt-get update -y && \
spatialite-bin libsqlite3-mod-spatialite \
python3-dev python3-gdal python3-psycopg2 python3-ldap \
python3-pip python3-pil python3-lxml python3-pylibmc \
uwsgi uwsgi-plugin-python3
uwsgi uwsgi-plugin-python3 build-essential

# Install pip packages
ADD deployment/docker/requirements.txt /requirements.txt
Expand Down
2 changes: 2 additions & 0 deletions deployment/docker/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,5 @@ openpyxl==3.1.5
# salient sdk
numpy==1.26.4
salientsdk==0.3.5

python-geohash==0.8.5
7 changes: 6 additions & 1 deletion django_project/core/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,12 @@
'salient-collector-session': {
'task': 'salient_collector_session',
        # Run every Monday 04:00 UTC
'schedule': crontab(minute='0', hour='2', day_of_week='1'),
'schedule': crontab(minute='0', hour='4', day_of_week='1'),
},
'tio-collector-session': {
'task': 'tio_collector_session',
        # Run every day at 00:30 UTC
'schedule': crontab(minute='30', hour='00'),
},
# Run all ingestor session daily
'run-daily-ingestor-session': {
Expand Down
138 changes: 137 additions & 1 deletion django_project/gap/ingestor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,29 @@
"""

from typing import Union
import logging
import datetime
import pytz
import uuid
import fsspec
import xarray as xr

from django.utils import timezone
from django.core.files.storage import default_storage

from core.models import BackgroundTask
from gap.models import (
CollectorSession,
IngestorSession,
IngestorSessionStatus
IngestorSessionStatus,
Dataset,
DatasetStore,
DataSourceFile
)
from gap.utils.zarr import BaseZarrReader


logger = logging.getLogger(__name__)


class BaseIngestor:
Expand Down Expand Up @@ -53,6 +69,126 @@ def get_config(self, name: str, default_value = None):
return self.session.additional_config.get(name, default_value)


class BaseZarrIngestor(BaseIngestor):
    """Base Ingestor class for Zarr product.

    Provides shared plumbing for Zarr-backed ingestors: resolving the
    target :class:`DataSourceFile`, S3 connection options, updating the
    zarr date range, and opening/verifying the resulting Zarr store.
    """

    # NOTE: evaluated once at import time, so every ingestor instance in
    # the same process shares this fallback name. Sessions normally
    # override it via the 'datasourcefile_name' config.
    default_zarr_name = f'{uuid.uuid4()}.zarr'

    def __init__(self, session, working_dir):
        """Initialize base zarr ingestor.

        :param session: ingestor session that owns this run
        :param working_dir: local working directory for temporary files
        """
        super().__init__(session, working_dir)
        self.dataset = self._init_dataset()

        self.s3 = BaseZarrReader.get_s3_variables()
        self.s3_options = {
            'key': self.s3.get('AWS_ACCESS_KEY_ID'),
            'secret': self.s3.get('AWS_SECRET_ACCESS_KEY'),
            'client_kwargs': BaseZarrReader.get_s3_client_kwargs()
        }
        self.metadata = {}

        # get zarr data source file
        datasourcefile_id = self.get_config('datasourcefile_id')
        if datasourcefile_id:
            self.datasource_file = DataSourceFile.objects.get(
                id=datasourcefile_id)
            self.created = not self.get_config(
                'datasourcefile_zarr_exists', True)
        else:
            # default_zarr_name already carries the '.zarr' suffix;
            # appending another one here produced '<uuid>.zarr.zarr'
            datasourcefile_name = self.get_config(
                'datasourcefile_name', self.default_zarr_name)
            self.datasource_file, self.created = (
                DataSourceFile.objects.get_or_create(
                    name=datasourcefile_name,
                    dataset=self.dataset,
                    format=DatasetStore.ZARR,
                    defaults={
                        'created_on': timezone.now(),
                        'start_date_time': timezone.now(),
                        'end_date_time': (
                            timezone.now()
                        )
                    }
                )
            )

    def _init_dataset(self) -> Dataset:
        """Fetch dataset for this ingestor.

        :raises NotImplementedError: should be implemented in child class
        :return: Dataset for this ingestor
        :rtype: Dataset
        """
        raise NotImplementedError

    def _update_zarr_source_file(self, updated_date: datetime.date):
        """Update zarr DataSourceFile start and end datetime.

        For a freshly created zarr both bounds are set to *updated_date*;
        otherwise the range is only widened, never shrunk.

        :param updated_date: Date that has been processed
        :type updated_date: datetime.date
        """
        if self.created:
            self.datasource_file.start_date_time = datetime.datetime(
                updated_date.year, updated_date.month, updated_date.day,
                0, 0, 0, tzinfo=pytz.UTC
            )
            self.datasource_file.end_date_time = (
                self.datasource_file.start_date_time
            )
        else:
            if self.datasource_file.start_date_time.date() > updated_date:
                self.datasource_file.start_date_time = datetime.datetime(
                    updated_date.year, updated_date.month,
                    updated_date.day,
                    0, 0, 0, tzinfo=pytz.UTC
                )
            if self.datasource_file.end_date_time.date() < updated_date:
                self.datasource_file.end_date_time = datetime.datetime(
                    updated_date.year, updated_date.month,
                    updated_date.day,
                    0, 0, 0, tzinfo=pytz.UTC
                )
        self.datasource_file.save()

    def _remove_temporary_source_file(
            self, source_file: DataSourceFile, file_path: str):
        """Remove temporary file from collector.

        Deletion of the DB record always happens, even when removing the
        underlying object from storage fails (best-effort cleanup).

        :param source_file: Temporary File
        :type source_file: DataSourceFile
        :param file_path: s3 file path
        :type file_path: str
        """
        try:
            default_storage.delete(file_path)
        except Exception:
            # logger.error('msg', ex) would treat ex as a %-format arg
            # with no placeholder; logger.exception logs the traceback.
            logger.exception(
                f'Failed to remove original source_file {file_path}!')
        finally:
            source_file.delete()

    def _open_zarr_dataset(self, drop_variables=None) -> xr.Dataset:
        """Open existing Zarr file.

        :param drop_variables: variables to exclude from reader
        :type drop_variables: list, optional
        :return: xarray dataset
        :rtype: xr.Dataset
        """
        # avoid a mutable default argument; None means "drop nothing"
        drop_variables = drop_variables or []
        zarr_url = (
            BaseZarrReader.get_zarr_base_url(self.s3) +
            self.datasource_file.name
        )
        s3_mapper = fsspec.get_mapper(zarr_url, **self.s3_options)
        return xr.open_zarr(
            s3_mapper, consolidated=True, drop_variables=drop_variables)

    def verify(self):
        """Verify the resulting zarr file by opening and printing it."""
        self.zarr_ds = self._open_zarr_dataset()
        print(self.zarr_ds)


def ingestor_revoked_handler(bg_task: BackgroundTask):
"""Event handler when ingestor task is cancelled by celery.
Expand Down
8 changes: 4 additions & 4 deletions django_project/gap/ingestor/cbam.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ def run(self):
try:
self._run()
except Exception as e:
logger.error('Collector CBAM failed!', e)
logger.error('Collector CBAM failed!')
logger.error(traceback.format_exc())
raise Exception(e)
raise e
finally:
pass

Expand Down Expand Up @@ -385,8 +385,8 @@ def run(self):
)
self.datasource_file.save()
except Exception as e:
logger.error('Ingestor CBAM failed!', e)
logger.error('Ingestor CBAM failed!')
logger.error(traceback.format_exc())
raise Exception(e)
raise e
finally:
pass
10 changes: 10 additions & 0 deletions django_project/gap/ingestor/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ class AdditionalConfigNotFoundException(Exception):
def __init__(self, key): # noqa
self.message = f'{key} is required in additional_config.'
super().__init__(self.message)


class MissingCollectorSessionException(Exception):
    """Raised when an IngestorSession has no collector session attached."""

    def __init__(self, session_id):  # noqa
        message = f'Missing collector session in IngestorSession {session_id}.'
        self.message = message
        super().__init__(message)
110 changes: 14 additions & 96 deletions django_project/gap/ingestor/salient.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import datetime
import pytz
import traceback
import fsspec
import s3fs
import numpy as np
import pandas as pd
Expand All @@ -28,7 +27,7 @@
Dataset, DataSourceFile, DatasetStore,
IngestorSession, CollectorSession, Preferences
)
from gap.ingestor.base import BaseIngestor
from gap.ingestor.base import BaseIngestor, BaseZarrIngestor
from gap.utils.netcdf import NetCDFMediaS3, find_start_latlng
from gap.utils.zarr import BaseZarrReader
from gap.utils.dask import execute_dask_compute
Expand Down Expand Up @@ -206,14 +205,14 @@ def run(self):
self._run()
self.session.notes = json.dumps(self.metadata, default=str)
except Exception as e:
logger.error('Collector Salient failed!', e)
logger.error('Collector Salient failed!')
logger.error(traceback.format_exc())
raise Exception(e)
raise e
finally:
pass


class SalientIngestor(BaseIngestor):
class SalientIngestor(BaseZarrIngestor):
"""Ingestor for Salient seasonal forecast data."""

default_chunks = {
Expand All @@ -227,39 +226,6 @@ class SalientIngestor(BaseIngestor):
def __init__(self, session: IngestorSession, working_dir: str = '/tmp'):
"""Initialize SalientIngestor."""
super().__init__(session, working_dir)
self.dataset = Dataset.objects.get(name='Salient Seasonal Forecast')
self.s3 = BaseZarrReader.get_s3_variables()
self.s3_options = {
'key': self.s3.get('AWS_ACCESS_KEY_ID'),
'secret': self.s3.get('AWS_SECRET_ACCESS_KEY'),
'client_kwargs': BaseZarrReader.get_s3_client_kwargs()
}
self.metadata = {}

# get zarr data source file
datasourcefile_id = self.get_config('datasourcefile_id')
if datasourcefile_id:
self.datasource_file = DataSourceFile.objects.get(
id=datasourcefile_id)
self.created = not self.get_config(
'datasourcefile_zarr_exists', True)
else:
datasourcefile_name = self.get_config(
'datasourcefile_name', 'salient.zarr')
self.datasource_file, self.created = (
DataSourceFile.objects.get_or_create(
name=datasourcefile_name,
dataset=self.dataset,
format=DatasetStore.ZARR,
defaults={
'created_on': timezone.now(),
'start_date_time': timezone.now(),
'end_date_time': (
timezone.now()
)
}
)
)

# min+max are the BBOX that GAP processes
# inc and original_min comes from Salient netcdf file
Expand All @@ -277,6 +243,14 @@ def __init__(self, session: IngestorSession, working_dir: str = '/tmp'):
}
self.reindex_tolerance = 0.01

def _init_dataset(self) -> Dataset:
"""Fetch dataset for this ingestor.
:return: Dataset for this ingestor
:rtype: Dataset
"""
return Dataset.objects.get(name='Salient Seasonal Forecast')

def _get_s3_filepath(self, source_file: DataSourceFile):
"""Get Salient NetCDF temporary file from Collector.
Expand Down Expand Up @@ -316,52 +290,6 @@ def _open_dataset(self, source_file: DataSourceFile) -> xrDataset:
return xr.open_dataset(
fs.open(netcdf_url), chunks=self.default_chunks)

def _update_zarr_source_file(self, forecast_date: datetime.date):
"""Update zarr DataSourceFile start and end datetime.
:param forecast_date: Forecast date that has been processed
:type forecast_date: datetime.date
"""
if self.created:
self.datasource_file.start_date_time = datetime.datetime(
forecast_date.year, forecast_date.month, forecast_date.day,
0, 0, 0, tzinfo=pytz.UTC
)
self.datasource_file.end_date_time = (
self.datasource_file.start_date_time
)
else:
if self.datasource_file.start_date_time.date() > forecast_date:
self.datasource_file.start_date_time = datetime.datetime(
forecast_date.year, forecast_date.month,
forecast_date.day,
0, 0, 0, tzinfo=pytz.UTC
)
if self.datasource_file.end_date_time.date() < forecast_date:
self.datasource_file.end_date_time = datetime.datetime(
forecast_date.year, forecast_date.month,
forecast_date.day,
0, 0, 0, tzinfo=pytz.UTC
)
self.datasource_file.save()

def _remove_temporary_source_file(
self, source_file: DataSourceFile, file_path: str):
"""Remove temporary NetCDFFile from collector.
:param source_file: Temporary NetCDF File
:type source_file: DataSourceFile
:param file_path: s3 file path
:type file_path: str
"""
try:
default_storage.delete(file_path)
except Exception as ex:
logger.error(
f'Failed to remove original source_file {file_path}!', ex)
finally:
source_file.delete()

def _run(self):
"""Run Salient ingestor."""
logger.info(f'Running data ingestor for Salient: {self.session.id}.')
Expand Down Expand Up @@ -538,18 +466,8 @@ def run(self):
self._run()
self.session.notes = json.dumps(self.metadata, default=str)
except Exception as e:
logger.error('Ingestor Salient failed!', e)
logger.error('Ingestor Salient failed!')
logger.error(traceback.format_exc())
raise Exception(e)
raise e
finally:
pass

def verify(self):
"""Verify the resulting zarr file."""
zarr_url = (
BaseZarrReader.get_zarr_base_url(self.s3) +
self.datasource_file.name
)
s3_mapper = fsspec.get_mapper(zarr_url, **self.s3_options)
self.zarr_ds = xr.open_zarr(s3_mapper, consolidated=True)
print(self.zarr_ds)
Loading

0 comments on commit 294ac32

Please sign in to comment.