|
| 1 | +from icon_validator.rules.validator import KomandPluginValidator |
| 2 | +from icon_validator.exceptions import ValidationException |
| 3 | +from icon_plugin_spec.plugin_spec import KomandPluginSpec |
| 4 | + |
| 5 | +import os |
| 6 | +import json |
| 7 | +import re |
| 8 | + |
| 9 | + |
| 10 | +class _Plugin(object): |
| 11 | + |
| 12 | + def __init__(self, name: str, version: str): |
| 13 | + self.plugin = name |
| 14 | + self.version = version |
| 15 | + |
| 16 | + def __eq__(self, other): |
| 17 | + return (self.plugin == other.plugin) and (self.version == other.version) |
| 18 | + |
| 19 | + def __hash__(self): |
| 20 | + return hash((self.plugin, self.version)) |
| 21 | + |
| 22 | + |
| 23 | +class WorkflowHelpPluginUtilizationValidator(KomandPluginValidator): |
| 24 | + |
| 25 | + @staticmethod |
| 26 | + def load_workflow_file(spec: KomandPluginSpec) -> dict: |
| 27 | + """ |
| 28 | + Load a workflow file as KomandPluginSpec into a dictionary |
| 29 | + :param spec: .icon workflow |
| 30 | + :return: Workflow spec as a dictionary |
| 31 | + """ |
| 32 | + workflow_directory = spec.directory |
| 33 | + |
| 34 | + for file_name in os.listdir(workflow_directory): |
| 35 | + if not (file_name.endswith(".icon")): |
| 36 | + continue |
| 37 | + |
| 38 | + with open(f"{workflow_directory}/{file_name}") as json_file: |
| 39 | + try: |
| 40 | + workflow_file = json.load(json_file) |
| 41 | + except json.JSONDecodeError: |
| 42 | + raise ValidationException( |
| 43 | + "The .icon file is not in JSON format. Try exporting the .icon file again") |
| 44 | + |
| 45 | + return workflow_file |
| 46 | + |
| 47 | + @staticmethod |
| 48 | + def extract_workflow(workflow_file: dict) -> dict: |
| 49 | + """ |
| 50 | + Returns a workflow with metadata and step information |
| 51 | + :param workflow_file: Dictionary containing workflow information |
| 52 | + :return: Workflow metadata and step information as a dict |
| 53 | + """ |
| 54 | + try: |
| 55 | + workflow = workflow_file["kom"]["workflowVersions"][0] |
| 56 | + except KeyError: |
| 57 | + raise ValidationException("The .icon file is not formatted correctly. Try exporting the .icon file again") |
| 58 | + |
| 59 | + return workflow |
| 60 | + |
| 61 | + @staticmethod |
| 62 | + def extract_plugins_used(workflow: dict) -> [dict]: |
| 63 | + |
| 64 | + # Raw list of plugins |
| 65 | + plugin_list = list() |
| 66 | + try: |
| 67 | + for step_id in workflow["steps"]: |
| 68 | + step: dict = workflow["steps"][step_id] |
| 69 | + |
| 70 | + # We only care about plugin steps, so continue if it is not |
| 71 | + if "plugin" not in step.keys(): |
| 72 | + continue |
| 73 | + |
| 74 | + plugin_name = step["plugin"]["name"] |
| 75 | + plugin_version = step["plugin"]["slugVersion"] |
| 76 | + |
| 77 | + plugin = _Plugin(name=plugin_name, version=plugin_version) |
| 78 | + plugin_list.append(plugin) |
| 79 | + |
| 80 | + # Once the loop is done, count unique items |
| 81 | + plugins_and_counts = [] |
| 82 | + |
| 83 | + for plugin in set(plugin_list): |
| 84 | + count = plugin_list.count(plugin) |
| 85 | + plugins_and_counts.append({**plugin.__dict__, "count": count}) |
| 86 | + |
| 87 | + return plugins_and_counts |
| 88 | + |
| 89 | + except KeyError: |
| 90 | + raise ValidationException("The .icon file is not formatted correctly. Try exporting the .icon file again") |
| 91 | + |
| 92 | + @staticmethod |
| 93 | + def extract_plugins_in_help(help_str: str) -> list: |
| 94 | + """ |
| 95 | + Takes the help.md file as a string and extracts plugin version and count as a list of dictionaries |
| 96 | + """ |
| 97 | + # regex to isolate the plugin utilization table |
| 98 | + regex = r"\|Plugin\|Version\|Count\|.*?#" |
| 99 | + |
| 100 | + plugins_utilized = re.findall(regex, help_str, re.DOTALL) |
| 101 | + # Split each line into a sub string |
| 102 | + plugins_list = plugins_utilized[0].split("\n") |
| 103 | + # remove trailing and leading lines so that only plugin utilization data is left |
| 104 | + plugins_list = list( |
| 105 | + filter(lambda item: item.startswith("|") and not (item.startswith("|Plugin") or item.startswith("|-")), |
| 106 | + plugins_list)) |
| 107 | + plugins_dict_list = list() |
| 108 | + # Build dictionary for each plugin e.g. {'Plugin': 'ExtractIt', 'Version': '1.1.6', 'Count': 1} |
| 109 | + # then append to a list |
| 110 | + for plugin in plugins_list: |
| 111 | + temp = plugin.split("|") |
| 112 | + plugins_dict_list.append({"plugin": temp[1], "version": temp[2], "count": int(temp[3])}) |
| 113 | + return plugins_dict_list |
| 114 | + |
| 115 | + def validate(self, spec): |
| 116 | + workflow_file = WorkflowHelpPluginUtilizationValidator.load_workflow_file(spec) |
| 117 | + workflow = WorkflowHelpPluginUtilizationValidator.extract_workflow(workflow_file) |
| 118 | + plugins_used = WorkflowHelpPluginUtilizationValidator.extract_plugins_used(workflow) |
| 119 | + plugin_in_help = WorkflowHelpPluginUtilizationValidator.extract_plugins_in_help(spec.raw_help()) |
| 120 | + for plugin in plugins_used: |
| 121 | + if plugin not in plugin_in_help: |
| 122 | + raise ValidationException("The following plugin was found in the .icon file," |
| 123 | + f" but not in the help file {plugin}") |
| 124 | + for plugin in plugin_in_help: |
| 125 | + if plugin not in plugins_used: |
| 126 | + raise ValidationException("The following plugin was found in the help file," |
| 127 | + f" but not in the .icon file {plugin}") |
0 commit comments