Skip to content

Commit

Permalink
add cache management for zarr file
Browse files Browse the repository at this point in the history
  • Loading branch information
danangmassandy committed Oct 11, 2024
1 parent 913e337 commit 2b92bf7
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 3 deletions.
14 changes: 13 additions & 1 deletion django_project/gap/ingestor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from django.utils import timezone
from django.core.files.storage import default_storage
from django.db import transaction

from core.models import BackgroundTask
from gap.models import (
Expand All @@ -23,7 +24,8 @@
IngestorSessionStatus,
Dataset,
DatasetStore,
DataSourceFile
DataSourceFile,
DataSourceFileCache
)
from gap.utils.zarr import BaseZarrReader

Expand Down Expand Up @@ -188,6 +190,16 @@ def verify(self):
self.zarr_ds = self._open_zarr_dataset()
print(self.zarr_ds)

def _invalidate_zarr_cache(self):
    """Mark every cache record of the ingested zarr file as expired.

    Each DataSourceFileCache row linked to ``self.datasource_file`` gets
    its ``expired_on`` set to the current time. The rows are fetched with
    a row lock and saved inside a single transaction.
    """
    with transaction.atomic():
        # The queryset is lazy: it is only evaluated by the loop below,
        # i.e. inside the transaction required by select_for_update().
        locked_rows = DataSourceFileCache.objects.select_for_update().filter(
            source_file=self.datasource_file
        )
        for cache_entry in locked_rows:
            cache_entry.expired_on = timezone.now()
            cache_entry.save()


def ingestor_revoked_handler(bg_task: BackgroundTask):
"""Event handler when ingestor task is cancelled by celery.
Expand Down
5 changes: 4 additions & 1 deletion django_project/gap/ingestor/salient.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def _run(self):
s3_storage = default_storage
file_path = self._get_s3_filepath(source_file)
if not s3_storage.exists(file_path):
logger.warn(f'DataSource {file_path} does not exist!')
logger.warning(f'DataSource {file_path} does not exist!')
continue

# open the dataset
Expand Down Expand Up @@ -333,6 +333,9 @@ def _run(self):
'forecast_dates': forecast_dates
}

# invalidate zarr cache
self._invalidate_zarr_cache()

def store_as_zarr(self, dataset: xrDataset, forecast_date: datetime.date):
"""Store xarray dataset from forecast_date into Salient zarr file.
Expand Down
3 changes: 3 additions & 0 deletions django_project/gap/ingestor/tio_shortterm.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,9 @@ def _run(self):
if remove_temp_file:
self._remove_temporary_source_file(data_source, data_source.name)

# invalidate zarr cache
self._invalidate_zarr_cache()

def run(self):
"""Run TomorrowIO Ingestor."""
# Run the ingestion
Expand Down
28 changes: 28 additions & 0 deletions django_project/gap/migrations/0029_datasourcefilecache_and_more.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Generated by Django 4.2.7 on 2024-10-11 08:07

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    """Create DataSourceFileCache and its (source_file, hostname) constraint."""

    dependencies = [('gap', '0028_preferences_dask_threads_num')]

    operations = [
        # New table holding one cache record per source file and hostname.
        migrations.CreateModel(
            name='DataSourceFileCache',
            fields=[
                (
                    'id',
                    models.BigAutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name='ID',
                    ),
                ),
                (
                    'hostname',
                    models.CharField(max_length=512),
                ),
                (
                    'created_on',
                    models.DateTimeField(),
                ),
                (
                    'expired_on',
                    models.DateTimeField(blank=True, null=True),
                ),
                (
                    'source_file',
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        to='gap.datasourcefile',
                    ),
                ),
            ],
        ),
        # A source file may have at most one cache record per hostname.
        migrations.AddConstraint(
            model_name='datasourcefilecache',
            constraint=models.UniqueConstraint(
                fields=('source_file', 'hostname'),
                name='source_hostname',
            ),
        ),
    ]
29 changes: 29 additions & 0 deletions django_project/gap/models/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,32 @@ class DataSourceFile(models.Model):
)
is_latest = models.BooleanField(default=False)
metadata = models.JSONField(blank=True, default=dict, null=True)

def __str__(self):
    """Return the name and the id of the record joined by ' - '."""
    label = '{} - {}'.format(self.name, self.id)
    return label


class DataSourceFileCache(models.Model):
    """Cache record of a DataSourceFile on a given hostname.

    A unique constraint allows at most one record for each
    (source_file, hostname) pair. ``expired_on`` is optional and is empty
    when no expiry has been recorded.
    """

    # Deleting the DataSourceFile removes its cache records as well.
    source_file = models.ForeignKey(DataSourceFile, on_delete=models.CASCADE)
    hostname = models.CharField(max_length=512)
    created_on = models.DateTimeField()
    expired_on = models.DateTimeField(blank=True, null=True)

    class Meta:
        """Meta class for DataSourceFileCache."""

        constraints = [
            models.UniqueConstraint(
                name='source_hostname',
                fields=['source_file', 'hostname'],
            ),
        ]
39 changes: 38 additions & 1 deletion django_project/gap/utils/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
from datetime import datetime
import xarray as xr
from xarray.core.dataset import Dataset as xrDataset
from django.db import IntegrityError, transaction
from django.utils import timezone

from gap.models import (
Dataset,
DatasetAttribute,
DataSourceFile
DataSourceFile,
DataSourceFileCache
)
from gap.utils.reader import (
DatasetReaderInput
Expand Down Expand Up @@ -136,6 +139,7 @@ def open_dataset(self, source_file: DataSourceFile) -> xrDataset:
:return: xArray Dataset object
:rtype: xrDataset
"""
self._check_zarr_cache_expiry(source_file)
# get zarr url
zarr_url = self.get_zarr_base_url(self.s3)
zarr_url += f'{source_file.name}'
Expand Down Expand Up @@ -170,6 +174,39 @@ def open_dataset(self, source_file: DataSourceFile) -> xrDataset:

return ds

def _check_zarr_cache_expiry(self, source_file: DataSourceFile):
    """Validate cache directory for zarr.

    The cache dir will be cleared if there is update from ingestor, i.e.
    when the cache record of this host has ``expired_on`` set. When the
    host has no cache record yet, one is created and the cache dir is
    cleared as well.

    :param source_file: zarr source file
    :type source_file: DataSourceFile
    """
    hostname = os.uname()[1]
    cache_row = DataSourceFileCache.objects.filter(
        source_file=source_file,
        hostname=hostname
    ).first()
    if cache_row is None:
        # no existing record yet, create first
        try:
            # Run the insert in its own atomic block (a savepoint when a
            # transaction is already open) so that a unique-constraint
            # violation does not leave an enclosing transaction broken.
            with transaction.atomic():
                DataSourceFileCache.objects.create(
                    source_file=source_file,
                    hostname=hostname,
                    created_on=timezone.now()
                )
        except IntegrityError:
            # The (source_file, hostname) record was created concurrently
            # by someone else; nothing left to do here.
            pass
        else:
            self.clear_cache(source_file)
    elif cache_row.expired_on:
        # when there is expired_on, we should remove the cache dir
        with transaction.atomic():
            # select_for_update() must be evaluated inside a transaction:
            # in autocommit mode Django raises TransactionManagementError
            # and no row lock would be held.
            update_row = DataSourceFileCache.objects.select_for_update().get(
                id=cache_row.id
            )
            self.clear_cache(source_file)
            update_row.expired_on = None
            update_row.save()

def clear_cache(self, source_file: DataSourceFile):
"""Clear cache of zarr file.
Expand Down

0 comments on commit 2b92bf7

Please sign in to comment.