Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean Datastore Tables Job #196

Merged
Merged
8 changes: 8 additions & 0 deletions ckanext/xloader/config_declaration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,13 @@ groups:
to True.
type: bool
required: false
- key: ckanext.xloader.clean_datastore_tables
default: False
example: True
description: |
After a Resource is updated, enqueue a job to remove its Datastore table
if the Resource's format is not listed in ckanext.xloader.formats.
type: bool
required: false


53 changes: 49 additions & 4 deletions ckanext/xloader/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from ckan.model.domain_object import DomainObjectOperation
from ckan.model.resource import Resource
from ckan.model.package import Package

from . import action, auth, helpers as xloader_helpers, utils
from ckanext.xloader.utils import XLoaderFormats
Expand Down Expand Up @@ -70,17 +71,17 @@ def configure(self, config_):
# IDomainObjectModification

def notify(self, entity, operation):
# type: (ckan.model.Package|ckan.model.Resource, DomainObjectOperation) -> None
# type: (Package|Resource, DomainObjectOperation) -> None
"""
Runs before_commit to database for Packages and Resources.
We only want to check for changed Resources for this.
We want to check if values have changed, namely the url.
We want to check if values have changed, namely the url and the format.
See: ckan/model/modification.py.DomainObjectModificationExtension
"""
if operation != DomainObjectOperation.changed \
or not isinstance(entity, Resource) \
or not getattr(entity, 'url_changed', False):
or not isinstance(entity, Resource):
return

context = {
"ignore_auth": True,
}
Expand All @@ -90,6 +91,14 @@ def notify(self, entity, operation):
"id": entity.id,
},
)

if _should_remove_unsupported_resource_from_datastore(resource_dict):
toolkit.enqueue_job(fn=_remove_unsupported_resource_from_datastore, args=[entity.id])

if not getattr(entity, 'url_changed', False):
# do not submit to xloader if the url has not changed.
return

self._submit_to_xloader(resource_dict)

# IResourceController
Expand Down Expand Up @@ -197,3 +206,39 @@ def get_helpers(self):
"xloader_status_description": xloader_helpers.xloader_status_description,
"is_resource_supported_by_xloader": xloader_helpers.is_resource_supported_by_xloader,
}


def _should_remove_unsupported_resource_from_datastore(res_dict):
    """
    Decide whether the datastore table of a resource should be cleaned up.

    Returns True only when every one of these conditions holds:

    * the ``ckanext.xloader.clean_datastore_tables`` config option is on;
    * ``XLoaderFormats.is_it_an_xloader_format()`` rejects the resource's
      ``format``;
    * the resource's ``url_type`` is ``'upload'`` or is empty/missing;
    * ``datastore_active`` is truthy, either at the top level of the
      resource dict or inside its ``extras`` dict.

    :param res_dict: resource dictionary to inspect
    :returns: True if the datastore table should be removed, else False
    :rtype: bool
    """
    cleanup_enabled = toolkit.asbool(
        toolkit.config.get('ckanext.xloader.clean_datastore_tables', False))
    if not cleanup_enabled:
        return False

    # Supported formats keep their datastore tables.
    if XLoaderFormats.is_it_an_xloader_format(res_dict.get('format', u'')):
        return False

    # Only uploads, or resources without any url_type, are candidates.
    url_type = res_dict.get('url_type')
    if url_type and url_type != 'upload':
        return False

    # The flag may live on the resource itself or within its extras.
    if toolkit.asbool(res_dict.get('datastore_active', False)):
        return True
    extras = res_dict.get('extras', {})
    return toolkit.asbool(extras.get('datastore_active', False))


def _remove_unsupported_resource_from_datastore(resource_id):
    """
    Callback that drops the datastore table of an unsupported resource.

    The resource is fetched with ``resource_show`` and re-checked with
    ``_should_remove_unsupported_resource_from_datastore()`` (which also
    honours the ``ckanext.xloader.clean_datastore_tables`` config value)
    before anything is deleted. Only resources in a supported XLoader
    format are meant to keep datastore tables, so when the check says
    "remove", ``datastore_delete`` is called with ``force`` set.

    A missing resource or a missing datastore table is logged as an error
    and otherwise ignored.

    :param resource_id: id of the resource whose table may be removed
    """
    context = {"ignore_auth": True}
    try:
        resource = toolkit.get_action('resource_show')(context, {"id": resource_id})
    except toolkit.ObjectNotFound:
        log.error('Resource %s does not exist.', resource_id)
        return

    # Nothing to do unless the resource still qualifies for clean-up.
    if not _should_remove_unsupported_resource_from_datastore(resource):
        return

    found_id = resource['id']
    log.info('Unsupported resource format "%s". Deleting datastore tables for resource %s',
             resource.get(u'format', u''), found_id)

    delete_request = {"resource_id": found_id, "force": True}
    try:
        toolkit.get_action('datastore_delete')(context, delete_request)
    except toolkit.ObjectNotFound:
        log.error('Datastore table for resource %s does not exist', found_id)
    else:
        log.info('Datastore table dropped for resource %s', found_id)
34 changes: 34 additions & 0 deletions ckanext/xloader/tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from six import text_type as str
from ckan.tests import helpers, factories
from ckan.logic import _actions
from ckanext.xloader.plugin import _should_remove_unsupported_resource_from_datastore


@pytest.mark.usefixtures("clean_db", "with_plugins")
Expand Down Expand Up @@ -58,6 +59,39 @@ def test_submit_when_url_changes(self, monkeypatch):

assert func.called

@pytest.mark.parametrize("toolkit_config_value, mock_xloader_formats, url_type, datastore_active, expected_result", [
    # Case 1: expects True - an upload with an active datastore entry and an unsupported format.
    (True, False, 'upload', True, True),
    # Case 2: expects False - the format is reported as a supported XLoader format.
    (True, True, 'upload', True, False),
    # Case 3: expects False - the config option is turned off.
    (False, False, 'upload', True, False),
    # Case 4: expects False - the url_type is neither 'upload' nor empty.
    (True, False, 'custom_type', True, False),
    # Case 5: expects False - the datastore is inactive.
    (True, False, 'upload', False, False),
    # Case 6: expects True - an empty url_type is accepted, datastore active, unsupported format.
    (True, False, '', True, True),
    # Case 7: expects True - a url_type of None is accepted, datastore active, unsupported format.
    (True, False, None, True, True),
])
def test_should_remove_unsupported_resource_from_datastore(
        self, toolkit_config_value, mock_xloader_formats, url_type, datastore_active, expected_result):
    """
    Exercise each branch of _should_remove_unsupported_resource_from_datastore:
    the config switch, the format check (mocked), the url_type check and
    the datastore_active flag.
    """
    # Resource dict carrying the datastore flag both at top level and in extras.
    resource = {
        'format': 'some_format',
        'url_type': url_type,
        'datastore_active': datastore_active,
        'extras': {'datastore_active': datastore_active},
    }

    # Override the config option and stub out the format check for this case.
    with helpers.changed_config('ckanext.xloader.clean_datastore_tables', toolkit_config_value), \
            mock.patch('ckanext.xloader.utils.XLoaderFormats.is_it_an_xloader_format') as format_check:
        format_check.return_value = mock_xloader_formats
        outcome = _should_remove_unsupported_resource_from_datastore(resource)
        assert outcome == expected_result

def _pending_task(self, resource_id):
return {
"entity_id": resource_id,
Expand Down
Loading