Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cache management for zarr file #187

Merged
merged 4 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion django_project/gap/admin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from gap.models import (
Attribute, Country, Provider, Measurement, IngestorSession,
IngestorSessionProgress, Dataset, DatasetAttribute, DataSourceFile,
DatasetType, Unit, Village, CollectorSession, DatasetStore
DatasetType, Unit, Village, CollectorSession, DatasetStore,
DataSourceFileCache
)
from gap.tasks.collector import run_collector_session
from gap.tasks.ingestor import run_ingestor_session
Expand Down Expand Up @@ -204,6 +205,42 @@
actions = (load_source_zarr_cache, clear_source_zarr_cache,)


@admin.register(DataSourceFileCache)
class DataSourceFileCacheAdmin(admin.ModelAdmin):
    """Admin page for DataSourceFileCache rows.

    Shows which host holds a local zarr cache for which source file,
    plus when the cache was created and when it was marked expired.
    """

    list_display = (
        'get_name', 'get_dataset', 'hostname',
        'created_on', 'expired_on'
    )
    list_filter = ('hostname',)

    def get_name(self, obj: DataSourceFileCache):
        """Return the name of the cached source file.

        :param obj: cache row being rendered
        :type obj: DataSourceFileCache
        :return: name of the underlying data source file
        :rtype: str
        """
        return obj.source_file.name

    get_name.short_description = 'Name'
    get_name.admin_order_field = 'source_file__name'

    def get_dataset(self, obj: DataSourceFileCache):
        """Return the dataset name of the cached source file.

        :param obj: cache row being rendered
        :type obj: DataSourceFileCache
        :return: dataset name of the underlying data source file
        :rtype: str
        """
        return obj.source_file.dataset.name

    get_dataset.short_description = 'Dataset'
    get_dataset.admin_order_field = 'source_file__dataset__name'


@admin.register(Village)
class VillageAdmin(AbstractDefinitionAdmin):
"""Village admin."""
Expand Down
14 changes: 13 additions & 1 deletion django_project/gap/factories/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
DatasetTimeStep,
DatasetStore,
DataSourceFile,
Village
Village,
DataSourceFileCache
)


Expand Down Expand Up @@ -131,6 +132,17 @@ class Meta: # noqa
format = DatasetStore.NETCDF


class DataSourceFileCacheFactory(DjangoModelFactory):
    """Factory class for DataSourceFileCache model."""

    class Meta: # noqa
        model = DataSourceFileCache

    # Parent DataSourceFile row this cache entry belongs to
    source_file = factory.SubFactory(DataSourceFileFactory)
    # NOTE(review): Faker 'text' is not hostname-shaped, but the model
    # field allows up to 512 chars so any short text is acceptable here
    hostname = factory.Faker('text')
    # expired_on is intentionally left unset (null = cache still valid)
    created_on = factory.Faker('date_time')


class VillageFactory(DjangoModelFactory):
"""Factory class for Village model."""

Expand Down
16 changes: 14 additions & 2 deletions django_project/gap/ingestor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from django.utils import timezone
from django.core.files.storage import default_storage
from django.db import transaction

from core.models import BackgroundTask
from gap.models import (
Expand All @@ -23,7 +24,8 @@
IngestorSessionStatus,
Dataset,
DatasetStore,
DataSourceFile
DataSourceFile,
DataSourceFileCache
)
from gap.utils.zarr import BaseZarrReader

Expand Down Expand Up @@ -96,7 +98,7 @@ def __init__(self, session, working_dir):
'datasourcefile_zarr_exists', True)
else:
datasourcefile_name = self.get_config(
'datasourcefile_name', f'{self.default_zarr_name}.zarr')
'datasourcefile_name', f'{self.default_zarr_name}')
self.datasource_file, self.created = (
DataSourceFile.objects.get_or_create(
name=datasourcefile_name,
Expand Down Expand Up @@ -188,6 +190,16 @@ def verify(self):
self.zarr_ds = self._open_zarr_dataset()
print(self.zarr_ds)

def _invalidate_zarr_cache(self):
"""Invalidate existing zarr cache after ingestor is finished."""
source_caches = DataSourceFileCache.objects.select_for_update().filter(
source_file=self.datasource_file
)
with transaction.atomic():
for source_cache in source_caches:
source_cache.expired_on = timezone.now()
source_cache.save()


def ingestor_revoked_handler(bg_task: BackgroundTask):
"""Event handler when ingestor task is cancelled by celery.
Expand Down
5 changes: 4 additions & 1 deletion django_project/gap/ingestor/salient.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@
s3_storage = default_storage
file_path = self._get_s3_filepath(source_file)
if not s3_storage.exists(file_path):
logger.warn(f'DataSource {file_path} does not exist!')
logger.warning(f'DataSource {file_path} does not exist!')

Check warning on line 308 in django_project/gap/ingestor/salient.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/ingestor/salient.py#L308

Added line #L308 was not covered by tests
continue

# open the dataset
Expand Down Expand Up @@ -333,6 +333,9 @@
'forecast_dates': forecast_dates
}

# invalidate zarr cache
self._invalidate_zarr_cache()

def store_as_zarr(self, dataset: xrDataset, forecast_date: datetime.date):
"""Store xarray dataset from forecast_date into Salient zarr file.

Expand Down
3 changes: 3 additions & 0 deletions django_project/gap/ingestor/tio_shortterm.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,9 @@ def _run(self):
if remove_temp_file:
self._remove_temporary_source_file(data_source, data_source.name)

# invalidate zarr cache
self._invalidate_zarr_cache()

def run(self):
"""Run TomorrowIO Ingestor."""
# Run the ingestion
Expand Down
28 changes: 28 additions & 0 deletions django_project/gap/migrations/0029_datasourcefilecache_and_more.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Generated by Django 4.2.7 on 2024-10-11 08:07

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    """Create the DataSourceFileCache table (auto-generated)."""

    dependencies = [
        ('gap', '0028_preferences_dask_threads_num'),
    ]

    operations = [
        # One bookkeeping row per cached copy of a DataSourceFile
        migrations.CreateModel(
            name='DataSourceFileCache',
            fields=[
                ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('hostname', models.CharField(max_length=512)),
                ('created_on', models.DateTimeField()),
                # null while the cache is valid; set when it must be refreshed
                ('expired_on', models.DateTimeField(blank=True, null=True)),
                ('source_file', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='gap.datasourcefile')),
            ],
        ),
        # Each host may hold at most one cache entry per source file
        migrations.AddConstraint(
            model_name='datasourcefilecache',
            constraint=models.UniqueConstraint(fields=('source_file', 'hostname'), name='source_hostname'),
        ),
    ]
29 changes: 29 additions & 0 deletions django_project/gap/models/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,32 @@
)
is_latest = models.BooleanField(default=False)
metadata = models.JSONField(blank=True, default=dict, null=True)

def __str__(self):
return f'{self.name} - {self.id}'

Check warning on line 112 in django_project/gap/models/dataset.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/models/dataset.py#L112

Added line #L112 was not covered by tests


class DataSourceFileCache(models.Model):
    """Model representing cache for DataSourceFile.

    Tracks, per host, a local cached copy of a source file so stale
    copies can be invalidated by stamping ``expired_on``.
    """

    # The source file this cache entry tracks
    source_file = models.ForeignKey(
        DataSourceFile, on_delete=models.CASCADE
    )
    # Host that holds the local cached copy
    hostname = models.CharField(
        max_length=512
    )
    # When the local cached copy was created
    created_on = models.DateTimeField()
    # Set by the ingestor when the cache must be refreshed; null = valid
    expired_on = models.DateTimeField(
        null=True,
        blank=True
    )

    class Meta:
        """Meta class for DataSourceFileCache."""

        constraints = [
            # Each host may cache a given source file only once
            models.UniqueConstraint(
                fields=['source_file', 'hostname'],
                name='source_hostname'
            )
        ]
11 changes: 10 additions & 1 deletion django_project/gap/tests/ingestor/test_salient.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
CollectorSession
)
from gap.ingestor.salient import SalientIngestor, SalientCollector
from gap.factories import DataSourceFileFactory
from gap.factories import DataSourceFileFactory, DataSourceFileCacheFactory
from gap.tasks.collector import run_salient_collector_session


Expand Down Expand Up @@ -401,3 +401,12 @@ def test_run(
self.ingestor._get_s3_filepath(self.datasourcefile))
mock_to_zarr.assert_called_once()
mock_compute.assert_called_once()

def test_invalidate_cache(self):
"""Test invalidate cache function."""
cache_file = DataSourceFileCacheFactory.create(
source_file=self.ingestor.datasource_file
)
self.ingestor._invalidate_zarr_cache()
cache_file.refresh_from_db()
self.assertIsNotNone(cache_file.expired_on)
91 changes: 85 additions & 6 deletions django_project/gap/tests/utils/test_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
from django.contrib.gis.geos import (
Point
)
from django.utils import timezone
from unittest.mock import MagicMock, patch

from gap.models import (
Provider, Dataset, DatasetAttribute, DataSourceFile,
Provider, Dataset, DatasetAttribute,
DatasetStore
)
from gap.utils.reader import (
Expand All @@ -28,7 +29,10 @@
load_source_zarr_cache,
clear_source_zarr_cache
)
from gap.factories import DataSourceFileFactory
from gap.factories import (
DataSourceFileFactory,
DataSourceFileCacheFactory
)


class TestCBAMZarrReader(TestCase):
Expand Down Expand Up @@ -121,10 +125,81 @@ def test_open_dataset(
mock_dataset = MagicMock(spec=xrDataset)
mock_open_zarr.return_value = mock_dataset

source_file = DataSourceFile(name='test_dataset.zarr')
source_file.metadata = {
'drop_variables': ['test']
}
source_file = DataSourceFileFactory.create(
name='test_dataset.zarr',
metadata={
'drop_variables': ['test']
}
)
self.reader.setup_reader()
result = self.reader.open_dataset(source_file)

# Assertions to ensure the method is called correctly
assert result == mock_dataset
mock_s3fs.assert_called_once_with(
key='test_access_key',
secret='test_secret_key',
endpoint_url='https://test-endpoint.com'
)
cache_filename = 'test_dataset_zarr'
mock_fsspec_filesystem.assert_called_once_with(
'filecache',
target_protocol='s3',
target_options=self.reader.s3_options,
cache_storage=f'/tmp/{cache_filename}',
cache_check=3600,
expiry_time=86400,
target_kwargs={'s3': mock_s3fs_instance}
)
mock_fs_instance.get_mapper.assert_called_once_with(
's3://test-bucket/test-prefix/test_dataset.zarr')
mock_open_zarr.assert_called_once_with(
mock_fs_instance.get_mapper.return_value,
consolidated=True, drop_variables=['test'])

@patch.dict('os.environ', {
'MINIO_AWS_ACCESS_KEY_ID': 'test_access_key',
'MINIO_AWS_SECRET_ACCESS_KEY': 'test_secret_key',
'MINIO_AWS_ENDPOINT_URL': 'https://test-endpoint.com',
'MINIO_AWS_REGION_NAME': 'us-test-1',
'MINIO_GAP_AWS_BUCKET_NAME': 'test-bucket',
'MINIO_GAP_AWS_DIR_PREFIX': 'test-prefix/'
})
@patch('xarray.open_zarr')
@patch('fsspec.filesystem')
@patch('s3fs.S3FileSystem')
@patch('os.uname')
def test_open_dataset_with_cache(
self, mock_uname, mock_s3fs, mock_fsspec_filesystem,
mock_open_zarr
):
"""Test open zarr dataset function."""
# Mock the s3fs.S3FileSystem constructor
mock_s3fs_instance = MagicMock()
mock_s3fs.return_value = mock_s3fs_instance

# Mock the fsspec.filesystem constructor
mock_fs_instance = MagicMock()
mock_fsspec_filesystem.return_value = mock_fs_instance

# Mock the xr.open_zarr function
mock_dataset = MagicMock(spec=xrDataset)
mock_open_zarr.return_value = mock_dataset

# Mock uname to get hostname
mock_uname.return_value = [0, 'test-host']

source_file = DataSourceFileFactory.create(
name='test_dataset.zarr',
metadata={
'drop_variables': ['test']
}
)
source_file_cache = DataSourceFileCacheFactory.create(
source_file=source_file,
hostname='test-host',
expired_on=timezone.now()
)
self.reader.setup_reader()
result = self.reader.open_dataset(source_file)

Expand All @@ -150,6 +225,10 @@ def test_open_dataset(
mock_open_zarr.assert_called_once_with(
mock_fs_instance.get_mapper.return_value,
consolidated=True, drop_variables=['test'])
mock_uname.assert_called_once()
# assert cache does not expired_on
source_file_cache.refresh_from_db()
self.assertIsNone(source_file_cache.expired_on)

@patch('gap.utils.zarr.BaseZarrReader.get_s3_variables')
@patch('gap.utils.zarr.BaseZarrReader.get_s3_client_kwargs')
Expand Down
Loading
Loading