Skip to content

Commit

Permalink
feat(dev): IPipeValidation interface;
Browse files Browse the repository at this point in the history
- Created new `IPipeValidation` interface which sends dictized validation reports to other plugins.
  • Loading branch information
JVickery-TBS committed May 16, 2024
1 parent 1073c80 commit c5a5242
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 18 deletions.
18 changes: 18 additions & 0 deletions ckanext/validation/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,21 @@ def can_validate(self, context, data_dict):
'''
return True


class IPipeValidation(Interface):
"""
Process data in a Data Pipeline.
Inherit this to subscribe to events in the Data Pipeline and be able to
broadcast the results for others to process next. In this way, a number of
IPipes can be linked up in sequence to build up a data processing pipeline.
When a resource is validated, it broadcasts its validation_report,
perhaps triggering a process which transforms the data to another format,
or loads it into a datastore. These processes can in turn put the resulting
validation reports into the pipeline
"""

def receive_validation_report(self, validation_report):
pass
7 changes: 6 additions & 1 deletion ckanext/validation/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
import ckantoolkit as t

from ckanext.validation.model import Validation
from ckanext.validation.utils import get_update_mode_from_config
from ckanext.validation.utils import (
get_update_mode_from_config,
send_validation_report,
validation_dictize,
)


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -132,6 +136,7 @@ def run_validation_job(resource):
'_validation_performed': True
}
t.get_action('resource_patch')(patch_context, data_dict)
send_validation_report(validation_dictize(validation))



Expand Down
19 changes: 2 additions & 17 deletions ckanext/validation/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
get_create_mode_from_config,
get_update_mode_from_config,
delete_local_uploaded_file,
validation_dictize,
)


Expand Down Expand Up @@ -172,7 +173,7 @@ def resource_validation_show(context, data_dict):
raise t.ObjectNotFound(
'No validation report exists for this resource')

return _validation_dictize(validation)
return validation_dictize(validation)


def resource_validation_delete(context, data_dict):
Expand Down Expand Up @@ -404,22 +405,6 @@ def _add_default_formats(search_data_dict):
search_data_dict['fq_list'].append(' OR '.join(filter_formats_query))


def _validation_dictize(validation):
out = {
'id': validation.id,
'resource_id': validation.resource_id,
'status': validation.status,
'report': validation.report,
'error': validation.error,
}
out['created'] = (
validation.created.isoformat() if validation.created else None)
out['finished'] = (
validation.finished.isoformat() if validation.finished else None)

return out


@t.chained_action
def resource_create(up_func, context, data_dict):
'''Appends a new resource to a datasets list of resources.
Expand Down
30 changes: 30 additions & 0 deletions ckanext/validation/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import os
import logging

from ckan.plugins import PluginImplementations
from ckan.lib.uploader import ResourceUpload
from ckantoolkit import config, asbool

from ckanext.validation.interfaces import IPipeValidation


log = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,3 +74,30 @@ def delete_local_uploaded_file(resource_id):

except OSError as e:
log.warning(u'Error deleting uploaded file: %s', e)


def validation_dictize(validation):
out = {
'id': validation.id,
'resource_id': validation.resource_id,
'status': validation.status,
'report': validation.report,
'error': validation.error,
}
out['created'] = (
validation.created.isoformat() if validation.created else None)
out['finished'] = (
validation.finished.isoformat() if validation.finished else None)

return out


def send_validation_report(validation_report):
for observer in PluginImplementations(IPipeValidation):
try:
observer.receive_validation_report(validation_report)
except Exception as ex:
log.exception(ex)
# We reraise all exceptions so they are obvious there
# is something wrong
raise

0 comments on commit c5a5242

Please sign in to comment.