diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml
index feb1cc9c..717657bd 100644
--- a/ckanext/xloader/config_declaration.yaml
+++ b/ckanext/xloader/config_declaration.yaml
@@ -119,5 +119,13 @@ groups:
           to True.
         type: bool
         required: false
+      - key: ckanext.xloader.clean_datastore_tables
+        default: False
+        example: True
+        description: |
+          Enqueue jobs to remove Datastore tables from Resources that have a format
+          that is not in ckanext.xloader.formats after a Resource is updated.
+        type: bool
+        required: false
 
 
diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py
index 6e65e466..e0ce027e 100644
--- a/ckanext/xloader/plugin.py
+++ b/ckanext/xloader/plugin.py
@@ -7,6 +7,7 @@
 
 from ckan.model.domain_object import DomainObjectOperation
 from ckan.model.resource import Resource
+from ckan.model.package import Package
 
 from . import action, auth, helpers as xloader_helpers, utils
 from ckanext.xloader.utils import XLoaderFormats
@@ -70,17 +71,17 @@ def configure(self, config_):
     # IDomainObjectModification
 
     def notify(self, entity, operation):
-        # type: (ckan.model.Package|ckan.model.Resource, DomainObjectOperation) -> None
+        # type: (Package|Resource, DomainObjectOperation) -> None
         """
         Runs before_commit to database for Packages and Resources.
         We only want to check for changed Resources for this.
-        We want to check if values have changed, namely the url.
+        We want to check if values have changed, namely the url and the format.
         See: ckan/model/modification.py.DomainObjectModificationExtension
         """
         if operation != DomainObjectOperation.changed \
-                or not isinstance(entity, Resource) \
-                or not getattr(entity, 'url_changed', False):
+                or not isinstance(entity, Resource):
             return
+
         context = {
             "ignore_auth": True,
         }
@@ -90,6 +91,14 @@ def notify(self, entity, operation):
                 "id": entity.id,
             },
         )
+
+        if _should_remove_unsupported_resource_from_datastore(resource_dict):
+            toolkit.enqueue_job(fn=_remove_unsupported_resource_from_datastore, args=[entity.id])
+
+        if not getattr(entity, 'url_changed', False):
+            # do not submit to xloader if the url has not changed.
+            return
+
         self._submit_to_xloader(resource_dict)
 
     # IResourceController
@@ -197,3 +206,55 @@ def get_helpers(self):
             "xloader_status_description": xloader_helpers.xloader_status_description,
             "is_resource_supported_by_xloader": xloader_helpers.is_resource_supported_by_xloader,
         }
+
+
+def _should_remove_unsupported_resource_from_datastore(res_dict):
+    """
+    Decide whether the Datastore table of a Resource should be dropped.
+
+    Returns True only when ckanext.xloader.clean_datastore_tables is enabled,
+    the Resource format is not an XLoader format, the url_type is 'upload'
+    or empty, and the Resource is flagged as datastore_active.
+
+    :param res_dict: Resource dict, e.g. as returned by resource_show.
+    :returns: True if the Datastore table should be removed, else False.
+    :rtype: bool
+    """
+    if not toolkit.asbool(toolkit.config.get('ckanext.xloader.clean_datastore_tables', False)):
+        return False
+    # Guard against explicit None values for 'format' and 'extras'.
+    res_format = res_dict.get('format') or u''
+    extras = res_dict.get('extras') or {}
+    return (not XLoaderFormats.is_it_an_xloader_format(res_format)
+            and (res_dict.get('url_type') == 'upload'
+                 or not res_dict.get('url_type'))
+            and (toolkit.asbool(res_dict.get('datastore_active', False))
+                 or toolkit.asbool(extras.get('datastore_active', False))))
+
+
+def _remove_unsupported_resource_from_datastore(resource_id):
+    """
+    Callback to remove unsupported datastore tables.
+    Controlled by config value: ckanext.xloader.clean_datastore_tables.
+    Double check the resource format. Only supported XLoader formats should have datastore tables.
+    If the resource format is not supported, we should delete the datastore tables.
+
+    :param resource_id: id of the Resource to check.
+    """
+    context = {"ignore_auth": True}
+    try:
+        res = toolkit.get_action('resource_show')(context, {"id": resource_id})
+    except toolkit.ObjectNotFound:
+        log.error('Resource %s does not exist.', resource_id)
+        return
+
+    if _should_remove_unsupported_resource_from_datastore(res):
+        log.info('Unsupported resource format "%s". Deleting datastore tables for resource %s',
+                 res.get(u'format', u''), res['id'])
+        try:
+            toolkit.get_action('datastore_delete')(context, {
+                "resource_id": res['id'],
+                "force": True})
+            log.info('Datastore table dropped for resource %s', res['id'])
+        except toolkit.ObjectNotFound:
+            log.error('Datastore table for resource %s does not exist', res['id'])
diff --git a/ckanext/xloader/tests/test_plugin.py b/ckanext/xloader/tests/test_plugin.py
index 05b83b5b..8382e68b 100644
--- a/ckanext/xloader/tests/test_plugin.py
+++ b/ckanext/xloader/tests/test_plugin.py
@@ -7,8 +7,10 @@
 except ImportError:
     import mock
 from six import text_type as str
+
 from ckan.tests import helpers, factories
 from ckan.logic import _actions
+from ckanext.xloader.plugin import _should_remove_unsupported_resource_from_datastore
 
 
 @pytest.mark.usefixtures("clean_db", "with_plugins")
@@ -58,6 +60,39 @@ def test_submit_when_url_changes(self, monkeypatch):
 
         assert func.called
 
+    @pytest.mark.parametrize("toolkit_config_value, mock_xloader_formats, url_type, datastore_active, expected_result", [
+        # Test1: Expect True: an upload with an active datastore entry but an unsupported format.
+        (True, False, 'upload', True, True),
+        # Test2: Expect False: the format is a supported XLoader format.
+        (True, True, 'upload', True, False),
+        # Test3: Expect False: the config option is turned off.
+        (False, False, 'upload', True, False),
+        # Test4: Expect False: the url_type is neither 'upload' nor empty.
+        (True, False, 'custom_type', True, False),
+        # Test5: Expect False: the datastore is inactive.
+        (True, False, 'upload', False, False),
+        # Test6: Expect True: an empty url_type is accepted; active datastore entry, unsupported format.
+        (True, False, '', True, True),
+        # Test7: Expect True: a None url_type is accepted; active datastore entry, unsupported format.
+        (True, False, None, True, True),
+    ])
+    def test_should_remove_unsupported_resource_from_datastore(
+            self, toolkit_config_value, mock_xloader_formats, url_type, datastore_active, expected_result):
+
+        # Setup mock data
+        res_dict = {
+            'format': 'some_format',
+            'url_type': url_type,
+            'datastore_active': datastore_active,
+            'extras': {'datastore_active': datastore_active}
+        }
+
+        # Assert the result based on the logic paths covered
+        with helpers.changed_config('ckanext.xloader.clean_datastore_tables', toolkit_config_value):
+            with mock.patch('ckanext.xloader.utils.XLoaderFormats.is_it_an_xloader_format') as mock_is_xloader_format:
+                mock_is_xloader_format.return_value = mock_xloader_formats
+                assert _should_remove_unsupported_resource_from_datastore(res_dict) == expected_result
+
     def _pending_task(self, resource_id):
         return {
             "entity_id": resource_id,