diff --git a/django_project/gap/ingestor/base.py b/django_project/gap/ingestor/base.py
index 5068efd6..ae7da77a 100644
--- a/django_project/gap/ingestor/base.py
+++ b/django_project/gap/ingestor/base.py
@@ -15,6 +15,7 @@
 
 from django.utils import timezone
 from django.core.files.storage import default_storage
+from django.db import transaction
 
 from core.models import BackgroundTask
 from gap.models import (
@@ -23,7 +24,8 @@
     IngestorSessionStatus,
     Dataset,
     DatasetStore,
-    DataSourceFile
+    DataSourceFile,
+    DataSourceFileCache
 )
 from gap.utils.zarr import BaseZarrReader
@@ -188,6 +190,16 @@ def verify(self):
         self.zarr_ds = self._open_zarr_dataset()
         print(self.zarr_ds)
 
+    def _invalidate_zarr_cache(self):
+        """Invalidate existing zarr cache after ingestor is finished."""
+        source_caches = DataSourceFileCache.objects.select_for_update().filter(
+            source_file=self.datasource_file
+        )
+        with transaction.atomic():
+            for source_cache in source_caches:
+                source_cache.expired_on = timezone.now()
+                source_cache.save()
+
 
 def ingestor_revoked_handler(bg_task: BackgroundTask):
     """Event handler when ingestor task is cancelled by celery.

diff --git a/django_project/gap/ingestor/salient.py b/django_project/gap/ingestor/salient.py
index e3b2164f..2802a732 100644
--- a/django_project/gap/ingestor/salient.py
+++ b/django_project/gap/ingestor/salient.py
@@ -305,7 +305,7 @@ def _run(self):
             s3_storage = default_storage
             file_path = self._get_s3_filepath(source_file)
             if not s3_storage.exists(file_path):
-                logger.warn(f'DataSource {file_path} does not exist!')
+                logger.warning(f'DataSource {file_path} does not exist!')
                 continue
 
             # open the dataset
@@ -333,6 +333,9 @@
             'forecast_dates': forecast_dates
         }
 
+        # invalidate zarr cache
+        self._invalidate_zarr_cache()
+
     def store_as_zarr(self, dataset: xrDataset, forecast_date: datetime.date):
         """Store xarray dataset from forecast_date into Salient zarr file.
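For context, a sketch of the behaviour the new _invalidate_zarr_cache() hook should produce once an ingestor run finishes: every per-host cache row for the zarr DataSourceFile gets stamped as expired. The ingestor and make_cache_row fixtures here are hypothetical test helpers, not part of this change.

import pytest


@pytest.mark.django_db
def test_invalidate_marks_all_host_caches_expired(ingestor, make_cache_row):
    # two reader hosts have already opened (and locally cached) this zarr
    row_a = make_cache_row(ingestor.datasource_file, hostname='reader-a')
    row_b = make_cache_row(ingestor.datasource_file, hostname='reader-b')
    assert row_a.expired_on is None
    assert row_b.expired_on is None

    # called at the end of _run() in the Salient and TomorrowIO ingestors
    ingestor._invalidate_zarr_cache()

    row_a.refresh_from_db()
    row_b.refresh_from_db()
    # each host clears its local cache dir on its next open_dataset()
    assert row_a.expired_on is not None
    assert row_b.expired_on is not None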
diff --git a/django_project/gap/ingestor/tio_shortterm.py b/django_project/gap/ingestor/tio_shortterm.py
index 94f9cdd6..3f9abeaa 100644
--- a/django_project/gap/ingestor/tio_shortterm.py
+++ b/django_project/gap/ingestor/tio_shortterm.py
@@ -569,6 +569,9 @@ def _run(self):
         if remove_temp_file:
             self._remove_temporary_source_file(data_source, data_source.name)
 
+        # invalidate zarr cache
+        self._invalidate_zarr_cache()
+
     def run(self):
         """Run TomorrowIO Ingestor."""
         # Run the ingestion

diff --git a/django_project/gap/migrations/0029_datasourcefilecache_and_more.py b/django_project/gap/migrations/0029_datasourcefilecache_and_more.py
new file mode 100644
index 00000000..df0d7e93
--- /dev/null
+++ b/django_project/gap/migrations/0029_datasourcefilecache_and_more.py
@@ -0,0 +1,28 @@
+# Generated by Django 4.2.7 on 2024-10-11 08:07
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('gap', '0028_preferences_dask_threads_num'),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name='DataSourceFileCache',
+            fields=[
+                ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('hostname', models.CharField(max_length=512)),
+                ('created_on', models.DateTimeField()),
+                ('expired_on', models.DateTimeField(blank=True, null=True)),
+                ('source_file', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='gap.datasourcefile')),
+            ],
+        ),
+        migrations.AddConstraint(
+            model_name='datasourcefilecache',
+            constraint=models.UniqueConstraint(fields=('source_file', 'hostname'), name='source_hostname'),
+        ),
+    ]

diff --git a/django_project/gap/models/dataset.py b/django_project/gap/models/dataset.py
index 4bb0a398..d3d38ae9 100644
--- a/django_project/gap/models/dataset.py
+++ b/django_project/gap/models/dataset.py
@@ -107,3 +107,32 @@ class DataSourceFile(models.Model):
     )
     is_latest = models.BooleanField(default=False)
     metadata = models.JSONField(blank=True, default=dict, null=True)
+
+    def __str__(self):
+        return f'{self.name} - {self.id}'
+
+
+class DataSourceFileCache(models.Model):
+    """Model representing cache for DataSourceFile."""
+
+    source_file = models.ForeignKey(
+        DataSourceFile, on_delete=models.CASCADE
+    )
+    hostname = models.CharField(
+        max_length=512
+    )
+    created_on = models.DateTimeField()
+    expired_on = models.DateTimeField(
+        null=True,
+        blank=True
+    )
+
+    class Meta:
+        """Meta class for DataSourceFileCache."""
+
+        constraints = [
+            models.UniqueConstraint(
+                fields=['source_file', 'hostname'],
+                name='source_hostname'
+            )
+        ]
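The new model keeps one cache row per (source_file, hostname) pair, enforced by the source_hostname unique constraint. A minimal sketch of the registration pattern the reader in zarr.py relies on below; the helper name register_host_cache is illustrative, not part of this change.

import os

from django.db import IntegrityError
from django.utils import timezone

from gap.models import DataSourceFile, DataSourceFileCache


def register_host_cache(source_file: DataSourceFile) -> DataSourceFileCache:
    # one row per (source_file, hostname); os.uname()[1] is the hostname,
    # matching what _check_zarr_cache_expiry uses
    hostname = os.uname()[1]
    try:
        return DataSourceFileCache.objects.create(
            source_file=source_file,
            hostname=hostname,
            created_on=timezone.now()
        )
    except IntegrityError:
        # the source_hostname constraint means another process on this
        # host created the row first; fall back to reading it
        return DataSourceFileCache.objects.get(
            source_file=source_file,
            hostname=hostname
        )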
diff --git a/django_project/gap/utils/zarr.py b/django_project/gap/utils/zarr.py
index 8522ccef..b116dde7 100644
--- a/django_project/gap/utils/zarr.py
+++ b/django_project/gap/utils/zarr.py
@@ -14,11 +14,14 @@
 from datetime import datetime
 import xarray as xr
 from xarray.core.dataset import Dataset as xrDataset
+from django.db import IntegrityError, transaction
+from django.utils import timezone
 
 from gap.models import (
     Dataset,
     DatasetAttribute,
-    DataSourceFile
+    DataSourceFile,
+    DataSourceFileCache
 )
 from gap.utils.reader import (
     DatasetReaderInput
@@ -136,6 +139,7 @@ def open_dataset(self, source_file: DataSourceFile) -> xrDataset:
         :return: xArray Dataset object
         :rtype: xrDataset
         """
+        self._check_zarr_cache_expiry(source_file)
         # get zarr url
         zarr_url = self.get_zarr_base_url(self.s3)
         zarr_url += f'{source_file.name}'
@@ -170,6 +174,43 @@
         return ds
 
+    def _check_zarr_cache_expiry(self, source_file: DataSourceFile):
+        """Validate cache directory for zarr.
+
+        The cache dir will be cleared if the ingestor has updated the zarr.
+        :param source_file: zarr source file
+        :type source_file: DataSourceFile
+        """
+        hostname = os.uname()[1]
+        cache_row = DataSourceFileCache.objects.filter(
+            source_file=source_file,
+            hostname=hostname
+        ).first()
+        if cache_row is None:
+            # no existing record yet, create first
+            try:
+                DataSourceFileCache.objects.create(
+                    source_file=source_file,
+                    hostname=hostname,
+                    created_on=timezone.now()
+                )
+                self.clear_cache(source_file)
+            except IntegrityError:
+                # another process on this host registered the row first
+                pass
+        elif cache_row.expired_on:
+            # expired_on is set by the ingestor; clear the cache dir and
+            # reset the flag. select_for_update() must be evaluated inside
+            # a transaction, so take the row lock within the atomic block.
+            with transaction.atomic():
+                update_row = (
+                    DataSourceFileCache.objects.select_for_update()
+                    .get(id=cache_row.id)
+                )
+                self.clear_cache(source_file)
+                update_row.expired_on = None
+                update_row.save()
+
     def clear_cache(self, source_file: DataSourceFile):
         """Clear cache of zarr file.
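To tie the pieces together, a sketch of the full cache lifecycle across a reader host and an ingestor, using only names introduced in this diff; the walkthrough function itself is illustrative, not part of this change.

from gap.models import DataSourceFile
from gap.utils.zarr import BaseZarrReader


def cache_lifecycle_walkthrough(
    reader: BaseZarrReader, ingestor, source_file: DataSourceFile
):
    # 1. first open on this host: _check_zarr_cache_expiry() creates the
    #    (source_file, hostname) cache row and clears any stale cache dir
    ds = reader.open_dataset(source_file)

    # 2. an ingestor finishes writing new data into the zarr and stamps
    #    expired_on on every host's cache row under select_for_update()
    ingestor._invalidate_zarr_cache()

    # 3. the next open sees expired_on, clears the local cache dir inside
    #    a transaction, resets expired_on to None, and re-reads the
    #    updated zarr
    ds = reader.open_dataset(source_file)
    return ds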