From 2b92bf7e0009ed4b3a8ff5cbf3e2e0df5958747b Mon Sep 17 00:00:00 2001 From: Danang Massandy Date: Fri, 11 Oct 2024 09:23:41 +0100 Subject: [PATCH 1/4] add cache management for zarr file --- django_project/gap/ingestor/base.py | 14 ++++++- django_project/gap/ingestor/salient.py | 5 ++- django_project/gap/ingestor/tio_shortterm.py | 3 ++ .../0029_datasourcefilecache_and_more.py | 28 +++++++++++++ django_project/gap/models/dataset.py | 29 ++++++++++++++ django_project/gap/utils/zarr.py | 39 ++++++++++++++++++- 6 files changed, 115 insertions(+), 3 deletions(-) create mode 100644 django_project/gap/migrations/0029_datasourcefilecache_and_more.py diff --git a/django_project/gap/ingestor/base.py b/django_project/gap/ingestor/base.py index 5068efd6..ae7da77a 100644 --- a/django_project/gap/ingestor/base.py +++ b/django_project/gap/ingestor/base.py @@ -15,6 +15,7 @@ from django.utils import timezone from django.core.files.storage import default_storage +from django.db import transaction from core.models import BackgroundTask from gap.models import ( @@ -23,7 +24,8 @@ IngestorSessionStatus, Dataset, DatasetStore, - DataSourceFile + DataSourceFile, + DataSourceFileCache ) from gap.utils.zarr import BaseZarrReader @@ -188,6 +190,16 @@ def verify(self): self.zarr_ds = self._open_zarr_dataset() print(self.zarr_ds) + def _invalidate_zarr_cache(self): + """Invalidate existing zarr cache after ingestor is finished.""" + source_caches = DataSourceFileCache.objects.select_for_update().filter( + source_file=self.datasource_file + ) + with transaction.atomic(): + for source_cache in source_caches: + source_cache.expired_on = timezone.now() + source_cache.save() + def ingestor_revoked_handler(bg_task: BackgroundTask): """Event handler when ingestor task is cancelled by celery. diff --git a/django_project/gap/ingestor/salient.py b/django_project/gap/ingestor/salient.py index e3b2164f..2802a732 100644 --- a/django_project/gap/ingestor/salient.py +++ b/django_project/gap/ingestor/salient.py @@ -305,7 +305,7 @@ def _run(self): s3_storage = default_storage file_path = self._get_s3_filepath(source_file) if not s3_storage.exists(file_path): - logger.warn(f'DataSource {file_path} does not exist!') + logger.warning(f'DataSource {file_path} does not exist!') continue # open the dataset @@ -333,6 +333,9 @@ def _run(self): 'forecast_dates': forecast_dates } + # invalidate zarr cache + self._invalidate_zarr_cache() + def store_as_zarr(self, dataset: xrDataset, forecast_date: datetime.date): """Store xarray dataset from forecast_date into Salient zarr file. diff --git a/django_project/gap/ingestor/tio_shortterm.py b/django_project/gap/ingestor/tio_shortterm.py index 94f9cdd6..3f9abeaa 100644 --- a/django_project/gap/ingestor/tio_shortterm.py +++ b/django_project/gap/ingestor/tio_shortterm.py @@ -569,6 +569,9 @@ def _run(self): if remove_temp_file: self._remove_temporary_source_file(data_source, data_source.name) + # invalidate zarr cache + self._invalidate_zarr_cache() + def run(self): """Run TomorrowIO Ingestor.""" # Run the ingestion diff --git a/django_project/gap/migrations/0029_datasourcefilecache_and_more.py b/django_project/gap/migrations/0029_datasourcefilecache_and_more.py new file mode 100644 index 00000000..df0d7e93 --- /dev/null +++ b/django_project/gap/migrations/0029_datasourcefilecache_and_more.py @@ -0,0 +1,28 @@ +# Generated by Django 4.2.7 on 2024-10-11 08:07 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('gap', '0028_preferences_dask_threads_num'), + ] + + operations = [ + migrations.CreateModel( + name='DataSourceFileCache', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('hostname', models.CharField(max_length=512)), + ('created_on', models.DateTimeField()), + ('expired_on', models.DateTimeField(blank=True, null=True)), + ('source_file', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='gap.datasourcefile')), + ], + ), + migrations.AddConstraint( + model_name='datasourcefilecache', + constraint=models.UniqueConstraint(fields=('source_file', 'hostname'), name='source_hostname'), + ), + ] diff --git a/django_project/gap/models/dataset.py b/django_project/gap/models/dataset.py index 4bb0a398..d3d38ae9 100644 --- a/django_project/gap/models/dataset.py +++ b/django_project/gap/models/dataset.py @@ -107,3 +107,32 @@ class DataSourceFile(models.Model): ) is_latest = models.BooleanField(default=False) metadata = models.JSONField(blank=True, default=dict, null=True) + + def __str__(self): + return f'{self.name} - {self.id}' + + +class DataSourceFileCache(models.Model): + """Model representing cache for DataSourceFile.""" + + source_file = models.ForeignKey( + DataSourceFile, on_delete=models.CASCADE + ) + hostname = models.CharField( + max_length=512 + ) + created_on = models.DateTimeField() + expired_on = models.DateTimeField( + null=True, + blank=True + ) + + class Meta: + """Meta class for DataSourceFileCache.""" + + constraints = [ + models.UniqueConstraint( + fields=['source_file', 'hostname'], + name='source_hostname' + ) + ] diff --git a/django_project/gap/utils/zarr.py b/django_project/gap/utils/zarr.py index 8522ccef..b116dde7 100644 --- a/django_project/gap/utils/zarr.py +++ b/django_project/gap/utils/zarr.py @@ -14,11 +14,14 @@ from datetime import datetime import xarray as xr from xarray.core.dataset import Dataset as xrDataset +from django.db import IntegrityError, transaction +from django.utils import timezone from gap.models import ( Dataset, DatasetAttribute, - DataSourceFile + DataSourceFile, + DataSourceFileCache ) from gap.utils.reader import ( DatasetReaderInput @@ -136,6 +139,7 @@ def open_dataset(self, source_file: DataSourceFile) -> xrDataset: :return: xArray Dataset object :rtype: xrDataset """ + self._check_zarr_cache_expiry(source_file) # get zarr url zarr_url = self.get_zarr_base_url(self.s3) zarr_url += f'{source_file.name}' @@ -170,6 +174,39 @@ def open_dataset(self, source_file: DataSourceFile) -> xrDataset: return ds + def _check_zarr_cache_expiry(self, source_file: DataSourceFile): + """Validate cache directory for zarr. + + The cache dir will be cleared if there is update from ingestor. + :param source_file: zarr source file + :type source_file: DataSourceFile + """ + hostname = os.uname()[1] + cache_row = DataSourceFileCache.objects.filter( + source_file=source_file, + hostname=hostname + ).first() + if cache_row is None: + # no existing record yet, create first + try: + DataSourceFileCache.objects.create( + source_file=source_file, + hostname=hostname, + created_on=timezone.now() + ) + self.clear_cache(source_file) + except IntegrityError: + pass + elif cache_row.expired_on: + # when there is expired_on, we should remove the cache dir + update_row = DataSourceFileCache.objects.select_for_update().get( + id=cache_row.id + ) + with transaction.atomic(): + self.clear_cache(source_file) + update_row.expired_on = None + update_row.save() + def clear_cache(self, source_file: DataSourceFile): """Clear cache of zarr file. From f02f728d0e08ca11f2e6cdc389edfa70d614ed68 Mon Sep 17 00:00:00 2001 From: Danang Massandy Date: Fri, 11 Oct 2024 09:49:34 +0100 Subject: [PATCH 2/4] add admin for source file cache model --- django_project/gap/admin/main.py | 39 ++++++++++++++++++++++++++++- django_project/gap/ingestor/base.py | 2 +- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/django_project/gap/admin/main.py b/django_project/gap/admin/main.py index f8e2da8c..345926bc 100644 --- a/django_project/gap/admin/main.py +++ b/django_project/gap/admin/main.py @@ -13,7 +13,8 @@ from gap.models import ( Attribute, Country, Provider, Measurement, IngestorSession, IngestorSessionProgress, Dataset, DatasetAttribute, DataSourceFile, - DatasetType, Unit, Village, CollectorSession, DatasetStore + DatasetType, Unit, Village, CollectorSession, DatasetStore, + DataSourceFileCache ) from gap.tasks.collector import run_collector_session from gap.tasks.ingestor import run_ingestor_session @@ -204,6 +205,42 @@ class DataSourceFileAdmin(admin.ModelAdmin): actions = (load_source_zarr_cache, clear_source_zarr_cache,) +@admin.register(DataSourceFileCache) +class DataSourceFileCacheAdmin(admin.ModelAdmin): + """DataSourceFileCache admin.""" + + list_display = ( + 'get_name', 'get_dataset', 'hostname', + 'created_on', 'expired_on' + ) + list_filter = ('hostname',) + + def get_name(self, obj: DataSourceFileCache): + """Get name of data source. + + :param obj: data source object + :type obj: DataSourceFileCache + :return: name of the data source + :rtype: str + """ + return obj.source_file.name + + def get_dataset(self, obj: DataSourceFileCache): + """Get dataset of data source. + + :param obj: data source object + :type obj: DataSourceFileCache + :return: dataset of the data source + :rtype: str + """ + return obj.source_file.dataset.name + + get_name.short_description = 'Name' + get_name.admin_order_field = 'source_file__name' + get_dataset.short_description = 'Dataset' + get_dataset.admin_order_field = 'source_file__dataset__name' + + @admin.register(Village) class VillageAdmin(AbstractDefinitionAdmin): """Village admin.""" diff --git a/django_project/gap/ingestor/base.py b/django_project/gap/ingestor/base.py index ae7da77a..8323d424 100644 --- a/django_project/gap/ingestor/base.py +++ b/django_project/gap/ingestor/base.py @@ -98,7 +98,7 @@ def __init__(self, session, working_dir): 'datasourcefile_zarr_exists', True) else: datasourcefile_name = self.get_config( - 'datasourcefile_name', f'{self.default_zarr_name}.zarr') + 'datasourcefile_name', f'{self.default_zarr_name}') self.datasource_file, self.created = ( DataSourceFile.objects.get_or_create( name=datasourcefile_name, From f61f5eff6cca04494b18503ee4e6c27f0688a429 Mon Sep 17 00:00:00 2001 From: Danang Massandy Date: Fri, 11 Oct 2024 09:49:50 +0100 Subject: [PATCH 3/4] fix select_for_update transaction --- django_project/gap/utils/zarr.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/django_project/gap/utils/zarr.py b/django_project/gap/utils/zarr.py index b116dde7..d67b99d1 100644 --- a/django_project/gap/utils/zarr.py +++ b/django_project/gap/utils/zarr.py @@ -199,10 +199,12 @@ def _check_zarr_cache_expiry(self, source_file: DataSourceFile): pass elif cache_row.expired_on: # when there is expired_on, we should remove the cache dir - update_row = DataSourceFileCache.objects.select_for_update().get( - id=cache_row.id - ) with transaction.atomic(): + update_row = ( + DataSourceFileCache.objects.select_for_update().get( + id=cache_row.id + ) + ) self.clear_cache(source_file) update_row.expired_on = None update_row.save() From 32b431e3ef1496eb2157305b6b08fb88ba9118ff Mon Sep 17 00:00:00 2001 From: Danang Massandy Date: Fri, 11 Oct 2024 10:14:51 +0100 Subject: [PATCH 4/4] add tests for zarr cache --- django_project/gap/factories/main.py | 14 ++- .../gap/tests/ingestor/test_salient.py | 11 ++- django_project/gap/tests/utils/test_zarr.py | 91 +++++++++++++++++-- 3 files changed, 108 insertions(+), 8 deletions(-) diff --git a/django_project/gap/factories/main.py b/django_project/gap/factories/main.py index 1d60cf09..25c22937 100644 --- a/django_project/gap/factories/main.py +++ b/django_project/gap/factories/main.py @@ -20,7 +20,8 @@ DatasetTimeStep, DatasetStore, DataSourceFile, - Village + Village, + DataSourceFileCache ) @@ -131,6 +132,17 @@ class Meta: # noqa format = DatasetStore.NETCDF +class DataSourceFileCacheFactory(DjangoModelFactory): + """Factory class for DataSourceFileCache model.""" + + class Meta: # noqa + model = DataSourceFileCache + + source_file = factory.SubFactory(DataSourceFileFactory) + hostname = factory.Faker('text') + created_on = factory.Faker('date_time') + + class VillageFactory(DjangoModelFactory): """Factory class for Village model.""" diff --git a/django_project/gap/tests/ingestor/test_salient.py b/django_project/gap/tests/ingestor/test_salient.py index 55b1f3c6..ac0ca7fc 100644 --- a/django_project/gap/tests/ingestor/test_salient.py +++ b/django_project/gap/tests/ingestor/test_salient.py @@ -19,7 +19,7 @@ CollectorSession ) from gap.ingestor.salient import SalientIngestor, SalientCollector -from gap.factories import DataSourceFileFactory +from gap.factories import DataSourceFileFactory, DataSourceFileCacheFactory from gap.tasks.collector import run_salient_collector_session @@ -401,3 +401,12 @@ def test_run( self.ingestor._get_s3_filepath(self.datasourcefile)) mock_to_zarr.assert_called_once() mock_compute.assert_called_once() + + def test_invalidate_cache(self): + """Test invalidate cache function.""" + cache_file = DataSourceFileCacheFactory.create( + source_file=self.ingestor.datasource_file + ) + self.ingestor._invalidate_zarr_cache() + cache_file.refresh_from_db() + self.assertIsNotNone(cache_file.expired_on) diff --git a/django_project/gap/tests/utils/test_zarr.py b/django_project/gap/tests/utils/test_zarr.py index 58cd0eee..48acab4c 100644 --- a/django_project/gap/tests/utils/test_zarr.py +++ b/django_project/gap/tests/utils/test_zarr.py @@ -12,10 +12,11 @@ from django.contrib.gis.geos import ( Point ) +from django.utils import timezone from unittest.mock import MagicMock, patch from gap.models import ( - Provider, Dataset, DatasetAttribute, DataSourceFile, + Provider, Dataset, DatasetAttribute, DatasetStore ) from gap.utils.reader import ( @@ -28,7 +29,10 @@ load_source_zarr_cache, clear_source_zarr_cache ) -from gap.factories import DataSourceFileFactory +from gap.factories import ( + DataSourceFileFactory, + DataSourceFileCacheFactory +) class TestCBAMZarrReader(TestCase): @@ -121,10 +125,81 @@ def test_open_dataset( mock_dataset = MagicMock(spec=xrDataset) mock_open_zarr.return_value = mock_dataset - source_file = DataSourceFile(name='test_dataset.zarr') - source_file.metadata = { - 'drop_variables': ['test'] - } + source_file = DataSourceFileFactory.create( + name='test_dataset.zarr', + metadata={ + 'drop_variables': ['test'] + } + ) + self.reader.setup_reader() + result = self.reader.open_dataset(source_file) + + # Assertions to ensure the method is called correctly + assert result == mock_dataset + mock_s3fs.assert_called_once_with( + key='test_access_key', + secret='test_secret_key', + endpoint_url='https://test-endpoint.com' + ) + cache_filename = 'test_dataset_zarr' + mock_fsspec_filesystem.assert_called_once_with( + 'filecache', + target_protocol='s3', + target_options=self.reader.s3_options, + cache_storage=f'/tmp/{cache_filename}', + cache_check=3600, + expiry_time=86400, + target_kwargs={'s3': mock_s3fs_instance} + ) + mock_fs_instance.get_mapper.assert_called_once_with( + 's3://test-bucket/test-prefix/test_dataset.zarr') + mock_open_zarr.assert_called_once_with( + mock_fs_instance.get_mapper.return_value, + consolidated=True, drop_variables=['test']) + + @patch.dict('os.environ', { + 'MINIO_AWS_ACCESS_KEY_ID': 'test_access_key', + 'MINIO_AWS_SECRET_ACCESS_KEY': 'test_secret_key', + 'MINIO_AWS_ENDPOINT_URL': 'https://test-endpoint.com', + 'MINIO_AWS_REGION_NAME': 'us-test-1', + 'MINIO_GAP_AWS_BUCKET_NAME': 'test-bucket', + 'MINIO_GAP_AWS_DIR_PREFIX': 'test-prefix/' + }) + @patch('xarray.open_zarr') + @patch('fsspec.filesystem') + @patch('s3fs.S3FileSystem') + @patch('os.uname') + def test_open_dataset_with_cache( + self, mock_uname, mock_s3fs, mock_fsspec_filesystem, + mock_open_zarr + ): + """Test open zarr dataset function.""" + # Mock the s3fs.S3FileSystem constructor + mock_s3fs_instance = MagicMock() + mock_s3fs.return_value = mock_s3fs_instance + + # Mock the fsspec.filesystem constructor + mock_fs_instance = MagicMock() + mock_fsspec_filesystem.return_value = mock_fs_instance + + # Mock the xr.open_zarr function + mock_dataset = MagicMock(spec=xrDataset) + mock_open_zarr.return_value = mock_dataset + + # Mock uname to get hostname + mock_uname.return_value = [0, 'test-host'] + + source_file = DataSourceFileFactory.create( + name='test_dataset.zarr', + metadata={ + 'drop_variables': ['test'] + } + ) + source_file_cache = DataSourceFileCacheFactory.create( + source_file=source_file, + hostname='test-host', + expired_on=timezone.now() + ) self.reader.setup_reader() result = self.reader.open_dataset(source_file) @@ -150,6 +225,10 @@ def test_open_dataset( mock_open_zarr.assert_called_once_with( mock_fs_instance.get_mapper.return_value, consolidated=True, drop_variables=['test']) + mock_uname.assert_called_once() + # assert cache does not expired_on + source_file_cache.refresh_from_db() + self.assertIsNone(source_file_cache.expired_on) @patch('gap.utils.zarr.BaseZarrReader.get_s3_variables') @patch('gap.utils.zarr.BaseZarrReader.get_s3_client_kwargs')