Skip to content

Commit

Permalink
Fix cancel task (#72)
Browse files Browse the repository at this point in the history
* Fix cancel task

* Fix failed tests

* Fix failed tests
  • Loading branch information
zamuzakki authored Oct 11, 2024
1 parent 9dfd53b commit 7e0c8e2
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 216 deletions.
2 changes: 1 addition & 1 deletion django_project/cplus_api/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
from .runner import * # noqa
from .remove_layers import * # noqa
from .verify_input_layer import * # noqa
from .sync_default_layers import *
from .sync_default_layers import sync_default_layers
212 changes: 5 additions & 207 deletions django_project/cplus_api/tasks/sync_default_layers.py
Original file line number Diff line number Diff line change
@@ -1,221 +1,19 @@
import logging
import os
import typing
import tempfile

import rasterio
from rasterio.errors import RasterioIOError
from datetime import datetime
from django.utils import timezone

from celery import shared_task
from django.contrib.auth.models import User
from storages.backends.s3 import S3Storage
from django.conf import settings
from django.core.files.storage import FileSystemStorage
from django.db.models import Q

from cplus_api.models import (
select_input_layer_storage,
InputLayer,
COMMON_LAYERS_DIR
)
from cplus_api.utils.api_helper import get_layer_type

logger = logging.getLogger(__name__)


class ProcessFile:
"""
Class to process a file dictionary into an Input Layer
:param storage: Django storage instance
:type storage: FileSystemStorage or S3Storage
:param owner: Owner of the input layer
:type owner: User
:param component_type: Component type of the input layer e.g. ncs_pathway
:type component_type: str
:param file: Dictionary of the file info to be processed
:type file: dict
:return: None
:rtype: None
"""
def __init__(
self,
storage: typing.Union[FileSystemStorage, S3Storage],
owner: User,
component_type: str,
file: dict
):
self.storage = storage
self.owner = owner
self.component_type = component_type
self.file = file
self.input_layer, self.created = InputLayer.objects.get_or_create(
owner=owner,
privacy_type=InputLayer.PrivacyTypes.COMMON,
component_type=component_type,
file=file['Key'],
defaults={
'created_on': timezone.now(),
'layer_type': get_layer_type(file['Key'])
}
)

def read_metadata(
self,
file_path: str
):
"""
Read metadata from layer file and save it to InputLayer object
:param file_path: path for input file
:type file_path: str
:return: None
:rtype: None
"""
with rasterio.open(file_path) as dataset:
transform = dataset.transform
res_x = abs(transform[0])
res_y = abs(transform[4])
crs = dataset.crs
nodata = dataset.nodata
unit = dataset.crs.units_factor[0]
unit = "m" if unit == "metre" else unit

metadata = {
"is_raster": get_layer_type(self.file['Key']) == 0,
"crs": str(crs),
"resolution": [res_x, res_y],
"unit": unit,
"nodata_value": nodata,
"is_geographic": dataset.crs.is_geographic
}
if not self.input_layer.name:
self.input_layer.name = os.path.basename(self.file['Key'])
if not self.input_layer.description:
self.input_layer.description = (
os.path.basename(self.file['Key'])
)
self.input_layer.metadata = metadata
self.input_layer.file.name = self.file['Key']
self.input_layer.size = os.path.getsize(file_path)
self.input_layer.save()

def run(self):
"""
Function to trigger file processing
:return: None
:rtype: None
"""
# Save layer if the file is modified after input layers last saved OR
# if input layer is a new record
if (
self.file['LastModified'] > self.input_layer.modified_on or
self.created
):
media_root = self.storage.location or settings.MEDIA_ROOT
if isinstance(self.storage, FileSystemStorage):
download_path = os.path.join(media_root, self.file['Key'])
os.makedirs(os.path.dirname(download_path), exist_ok=True)
iteration = 0
while iteration < 3:
try:
self.read_metadata(download_path)
except RasterioIOError:
iteration += 1
if iteration == 3 and (
self.input_layer.name == '' or
self.input_layer.file is None
):
self.input_layer.delete()
else:
os.remove(download_path)
break
else:
iteration = 0
while iteration < 3:
with tempfile.NamedTemporaryFile() as tmpfile:
boto3_client = self.storage.connection.meta.client
boto3_client.download_file(
self.storage.bucket_name,
self.file['Key'],
tmpfile.name,
Config=settings.AWS_TRANSFER_CONFIG
)
try:
self.read_metadata(tmpfile.name)
except RasterioIOError:
iteration += 1
if iteration == 3 and (
self.input_layer.name == '' or
self.input_layer.file is None
):
self.input_layer.delete()
else:
break


def delete_invalid_default_layers():
"""Delete invalid default layers in DB
:return: None
:rtype: None
"""
common_layers = InputLayer.objects.filter(
privacy_type=InputLayer.PrivacyTypes.COMMON
)
invalid_common_layers = common_layers.filter(
Q(name='') | Q(file='')
)
invalid_common_layers.delete()


@shared_task(name="sync_default_layers")
def sync_default_layers():
"""
Create Input Layers from default layers copied to S3/local directory
"""
from cplus_api.utils.layers import (
delete_invalid_default_layers,
sync_cplus_layers
)

delete_invalid_default_layers()

storage = select_input_layer_storage()
component_types = [c[0] for c in InputLayer.ComponentTypes.choices]
admin_username = os.getenv('ADMIN_USERNAME')
owner = User.objects.get(username=admin_username)
if isinstance(storage, FileSystemStorage):
media_root = storage.location or settings.MEDIA_ROOT
for component_type in component_types:
component_path = os.path.join(
media_root, COMMON_LAYERS_DIR, component_type
)
os.makedirs(component_path, exist_ok=True)
layers = os.listdir(component_path)
for layer in layers:
key = f"{COMMON_LAYERS_DIR}/{component_type}/{layer}"
download_path = os.path.join(media_root, key)
last_modified = datetime.fromtimestamp(
os.path.getmtime(download_path),
tz=timezone.now().tzinfo
)
file = {
"Key": key,
"LastModified": last_modified,
"Size": os.path.getsize(download_path)
}
ProcessFile(storage, owner, component_type, file).run()
else:
boto3_client = storage.connection.meta.client
for component_type in component_types:
response = boto3_client.list_objects(
Bucket=storage.bucket_name,
Prefix=f"{COMMON_LAYERS_DIR}/{component_type}"
)
for file in response.get('Contents', []):
ProcessFile(storage, owner, component_type, file).run()
sync_cplus_layers()
10 changes: 4 additions & 6 deletions django_project/cplus_api/tests/test_sync_default_layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
InputLayer,
COMMON_LAYERS_DIR
)
from cplus_api.tasks.sync_default_layers import (
sync_default_layers,
ProcessFile
)
from cplus_api.tasks.sync_default_layers import sync_default_layers
from cplus_api.tests.common import BaseAPIViewTransactionTest
from cplus_api.tests.factories import InputLayerF
from cplus_api.utils.layers import ProcessFile


class TestSyncDefaultLayer(BaseAPIViewTransactionTest):
Expand Down Expand Up @@ -209,12 +207,12 @@ def run_s3(self, mock_storage, mock_named_tmp_file=None):
__enter__.return_value).name = dest_path
sync_default_layers()

@patch('cplus_api.tasks.sync_default_layers.select_input_layer_storage')
@patch('cplus_api.utils.layers.select_input_layer_storage')
def test_invalid_input_layers_not_created_s3(self, mock_storage):
self.run_s3(mock_storage)
self.assertFalse(InputLayer.objects.exists())

@patch('cplus_api.tasks.sync_default_layers.select_input_layer_storage')
@patch('cplus_api.utils.layers.select_input_layer_storage')
@patch.object(tempfile, 'NamedTemporaryFile')
def test_invalid_input_layers_created_s3(
self,
Expand Down
Loading

0 comments on commit 7e0c8e2

Please sign in to comment.