Skip to content

Commit

Permalink
feat(dev): implement IPipeValidation;
Browse files Browse the repository at this point in the history
- Implement experimental `IPipeValidation` implement class from ckanext-validation.
  • Loading branch information
JVickery-TBS committed Jul 23, 2024
1 parent ac9d2bf commit cc7e33c
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 3 deletions.
17 changes: 17 additions & 0 deletions ckanext/xloader/config_declaration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,23 @@ groups:
to True.
type: bool
required: false
- key: ckanext.xloader.validation.requires_successful_report
default: False
example: True
description: |
Resources are required to pass Validation from the ckanext-validation
plugin to be able to get XLoadered.
type: bool
required: false
- key: ckanext.xloader.validation.enforce_schema
default: True
example: False
description: |
Resources are expected to have a Validation Schema, or use the default ones if not.
If this option is set to `False`, Resources that do not have
a Validation Schema will be treated like they do not require Validation.
See https://github.com/frictionlessdata/ckanext-validation?tab=readme-ov-file#data-schema
for more details.
- key: ckanext.xloader.clean_datastore_tables
default: False
example: True
Expand Down
39 changes: 38 additions & 1 deletion ckanext/xloader/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
from . import action, auth, helpers as xloader_helpers, utils
from ckanext.xloader.utils import XLoaderFormats

try:
from ckanext.validation.interfaces import IPipeValidation
HAS_IPIPE_VALIDATION = True
except ImportError:
HAS_IPIPE_VALIDATION = False

try:
config_declarations = toolkit.blanket.config_declarations
except AttributeError:
Expand All @@ -34,6 +40,8 @@ class xloaderPlugin(plugins.SingletonPlugin):
plugins.implements(plugins.IResourceController, inherit=True)
plugins.implements(plugins.IClick)
plugins.implements(plugins.IBlueprint)
if HAS_IPIPE_VALIDATION:
plugins.implements(IPipeValidation)

# IClick
def get_commands(self):
Expand Down Expand Up @@ -68,6 +76,21 @@ def configure(self, config_):
)
)

# IPipeValidation

def receive_validation_report(self, validation_report):
if utils.requires_successful_validation_report():
res_dict = toolkit.get_action('resource_show')({'ignore_auth': True},
{'id': validation_report.get('resource_id')})
if (toolkit.asbool(toolkit.config.get('ckanext.xloader.validation.enforce_schema', True))
or res_dict.get('schema', None)) and validation_report.get('status') != 'success':
# A schema is present, or required to be present
return
# if validation is running in async mode, it is running from the redis workers.
# thus we need to do sync=True to have Xloader put the job at the front of the queue.
sync = toolkit.asbool(toolkit.config.get(u'ckanext.validation.run_on_update_async', True))
self._submit_to_xloader(res_dict, sync=sync)

# IDomainObjectModification

def notify(self, entity, operation):
Expand Down Expand Up @@ -95,7 +118,16 @@ def notify(self, entity, operation):
if _should_remove_unsupported_resource_from_datastore(resource_dict):
toolkit.enqueue_job(fn=_remove_unsupported_resource_from_datastore, args=[entity.id])

if not getattr(entity, 'url_changed', False):
if utils.requires_successful_validation_report():
# If the resource requires validation, stop here if validation
# has not been performed or did not succeed. The Validation
# extension will call resource_patch and this method should
# be called again. However, url_changed will not be in the entity
# once Validation does the patch.
log.debug("Deferring xloading resource %s because the "
"resource did not pass validation yet.", resource_dict.get('id'))
return
elif not getattr(entity, 'url_changed', False):
# do not submit to xloader if the url has not changed.
return

Expand All @@ -104,6 +136,11 @@ def notify(self, entity, operation):
# IResourceController

def after_resource_create(self, context, resource_dict):
if utils.requires_successful_validation_report():
log.debug("Deferring xloading resource %s because the "
"resource did not pass validation yet.", resource_dict.get('id'))
return

self._submit_to_xloader(resource_dict)

def before_resource_show(self, resource_dict):
Expand Down
87 changes: 87 additions & 0 deletions ckanext/xloader/tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,93 @@ def test_submit_when_url_changes(self, monkeypatch):

assert func.called

@pytest.mark.ckan_config("ckanext.xloader.validation.requires_successful_report", True)
def test_require_validation(self, monkeypatch):
func = mock.Mock()
monkeypatch.setitem(_actions, "xloader_submit", func)

mock_resource_validation_show = mock.Mock()
monkeypatch.setitem(_actions, "resource_validation_show", mock_resource_validation_show)

dataset = factories.Dataset()

resource = helpers.call_action(
"resource_create",
{},
package_id=dataset["id"],
url="http://example.com/file.csv",
format="CSV",
validation_status='failure',
)

# TODO: test IPipeValidation
assert not func.called # because of the validation_status not being `success`
func.called = None # reset

helpers.call_action(
"resource_update",
{},
id=resource["id"],
package_id=dataset["id"],
url="http://example.com/file2.csv",
format="CSV",
validation_status='success',
)

# TODO: test IPipeValidation
assert not func.called # because of the validation_status is `success`

@pytest.mark.ckan_config("ckanext.xloader.validation.requires_successful_report", True)
@pytest.mark.ckan_config("ckanext.xloader.validation.enforce_schema", False)
def test_enforce_validation_schema(self, monkeypatch):
func = mock.Mock()
monkeypatch.setitem(_actions, "xloader_submit", func)

mock_resource_validation_show = mock.Mock()
monkeypatch.setitem(_actions, "resource_validation_show", mock_resource_validation_show)

dataset = factories.Dataset()

resource = helpers.call_action(
"resource_create",
{},
package_id=dataset["id"],
url="http://example.com/file.csv",
schema='',
validation_status='',
)

# TODO: test IPipeValidation
assert not func.called # because of the schema being empty
func.called = None # reset

helpers.call_action(
"resource_update",
{},
id=resource["id"],
package_id=dataset["id"],
url="http://example.com/file2.csv",
schema='https://example.com/schema.json',
validation_status='failure',
)

# TODO: test IPipeValidation
assert not func.called # because of the validation_status not being `success` and there is a schema
func.called = None # reset

helpers.call_action(
"resource_update",
{},
package_id=dataset["id"],
id=resource["id"],
url="http://example.com/file3.csv",
schema='https://example.com/schema.json',
validation_status='success',
)

# TODO: test IPipeValidation
assert not func.called # because of the validation_status is `success` and there is a schema

@pytest.mark.parametrize("toolkit_config_value, mock_xloader_formats, url_type, datastore_active, expected_result", [
# Test1: Should pass as it is an upload with an active datastore entry but an unsupported format
(True, False, 'upload', True, True),
Expand Down
70 changes: 68 additions & 2 deletions ckanext/xloader/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@
from decimal import Decimal

import ckan.plugins as p
from ckan.plugins.toolkit import config
from ckan.plugins.toolkit import config, h, _

from .job_exceptions import JobError

from logging import getLogger


log = getLogger(__name__)

# resource.formats accepted by ckanext-xloader. Must be lowercase here.
DEFAULT_FORMATS = [
"csv",
Expand Down Expand Up @@ -46,9 +51,70 @@ def is_it_an_xloader_format(cls, format_):
return format_.lower() in cls._formats


def requires_successful_validation_report():
return p.toolkit.asbool(config.get('ckanext.xloader.validation.requires_successful_report', False))


def awaiting_validation(res_dict):
"""
Checks the existence of a logic action from the ckanext-validation
plugin, thus supporting any extending of the Validation Plugin class.
Checks ckanext.xloader.validation.requires_successful_report config
option value.
Checks ckanext.xloader.validation.enforce_schema config
option value. Then checks the Resource's validation_status.
"""
if not requires_successful_validation_report():
# validation.requires_successful_report is turned off, return right away
return False

try:
# check for one of the main actions from ckanext-validation
# in the case that users extend the Validation plugin class
# and rename the plugin entry-point.
p.toolkit.get_action('resource_validation_show')
is_validation_plugin_loaded = True
except KeyError:
is_validation_plugin_loaded = False

if not is_validation_plugin_loaded:
# the validation plugin is not loaded but required, log a warning
log.warning('ckanext.xloader.validation.requires_successful_report requires the ckanext-validation plugin to be activated.')
return False

if (p.toolkit.asbool(config.get('ckanext.xloader.validation.enforce_schema', True))
or res_dict.get('schema', None)) and res_dict.get('validation_status', None) != 'success':

# either validation.enforce_schema is turned on or it is off and there is a schema,
# we then explicitly check for the `validation_status` report to be `success``
return True

# at this point, we can assume that the Resource is not waiting for Validation.
# or that the Resource does not have a Validation Schema and we are not enforcing schemas.
return False


def resource_data(id, resource_id, rows=None):

if p.toolkit.request.method == "POST":

context = {
"ignore_auth": True,
}
resource_dict = p.toolkit.get_action("resource_show")(
context,
{
"id": resource_id,
},
)

if awaiting_validation(resource_dict):
h.flash_error(_("Cannot upload resource %s to the DataStore "
"because the resource did not pass validation yet.") % resource_id)
return p.toolkit.redirect_to(
"xloader.resource_data", id=id, resource_id=resource_id
)

try:
p.toolkit.get_action("xloader_submit")(
None,
Expand Down Expand Up @@ -231,7 +297,7 @@ def type_guess(rows, types=TYPES, strict=False):
else:
for i, row in enumerate(rows):
diff = len(row) - len(guesses)
for _ in range(diff):
for _i in range(diff):
guesses.append(defaultdict(int))
for i, cell in enumerate(row):
# add string guess so that we have at least one guess
Expand Down

0 comments on commit cc7e33c

Please sign in to comment.