diff --git a/nmostesting/IS14Utils.py b/nmostesting/IS14Utils.py index 7e303666..e28ad109 100644 --- a/nmostesting/IS14Utils.py +++ b/nmostesting/IS14Utils.py @@ -55,7 +55,7 @@ def _do_request(self, test, method, url, **kwargs): if not valid: raise NMOSTestException(test.FAIL(f"{r} for {method}: {url}")) try: - self.validate_reference_datatype_schema(test, r.json(), "NcMethodResult", f"{method} :{url} ") + self.reference_datatype_schema_validate(test, r.json(), "NcMethodResult", f"{method} :{url} ") except ValueError as e: raise NMOSTestException(test.FAIL(f"Error: {e.args[0]} for {method}: {url}")) except NMOSTestException as e: diff --git a/nmostesting/MS05Utils.py b/nmostesting/MS05Utils.py index 22c87b79..a414e871 100644 --- a/nmostesting/MS05Utils.py +++ b/nmostesting/MS05Utils.py @@ -109,7 +109,7 @@ def query_device_model(self, test): self.device_model_metadata set on Device Model validation error. NMOSTestException raised if unable to query Device Model """ if not self.device_model: - self.device_model = self._nc_object_factory( + self.device_model = self.create_device_model( test, StandardClassIds.NCBLOCK.value, self.ROOT_BLOCK_OID, @@ -302,33 +302,45 @@ def get_datatype_schema(self, test, type_name): return self.datatype_schemas.get(type_name) - def validate_reference_datatype_schema(self, test, payload, datatype_name, context=""): + def queried_datatype_schema_validate(self, test, payload, datatype_name, context=""): + """Validate payload against datatype schema queried from Node under Test Class Manager""" + datatype_schema = self.get_datatype_schema(test, datatype_name) + self._validate_schema(test, payload, datatype_schema, f"{context}{datatype_name}") + + def reference_datatype_schema_validate(self, test, payload, datatype_name, context=""): """Validate payload against reference datatype schema""" context += f"{datatype_name}: " - self.validate_schema(test, payload, self.reference_datatype_schemas[datatype_name], context) + self._validate_schema(test, payload, self.reference_datatype_schemas[datatype_name], + f"{context}{datatype_name}") - def validate_schema(self, test, payload, schema, context=""): - """Delegates to validate. Raises NMOSTestExceptions on error""" + def _validate_schema(self, test, payload, schema, context=""): + """Delegates to jsonschema validate. Raises NMOSTestExceptions on error""" if not schema: - raise NMOSTestException(test.FAIL("Missing schema. Possible unknown type: " + context)) + raise NMOSTestException(test.FAIL(f"Missing schema. Possible unknown type: {context}")) try: # Validate the JSON schema is correct checker = FormatChecker(["ipv4", "ipv6", "uri"]) validate(payload, schema, format_checker=checker) except ValidationError as e: - raise NMOSTestException(test.FAIL(context + "Schema validation error: " + e.message)) + raise NMOSTestException(test.FAIL(f"{context}Schema validation error: {e.message}")) except SchemaError as e: - raise NMOSTestException(test.FAIL(context + "Schema error: " + e.message)) + raise NMOSTestException(test.FAIL(f"{context}Schema error: {e.message}")) return def validate_descriptor(self, test, reference, descriptor, context=""): """Validate descriptor against reference NcDescriptor. Raises NMOSTestException on error""" + def sort_key(e): + if isinstance(e, NcDescriptor): + return e.name + else: + return e["name"] + non_normative_keys = ["description"] # Compare disctionaries if isinstance(reference, dict): - # Remove the json field if present + # NcDescriptor objects have a json field that caches the json used to construct it reference.pop("json", None) reference_keys = set(reference.keys()) @@ -338,35 +350,32 @@ def validate_descriptor(self, test, reference, descriptor, context=""): key_diff = (set(reference_keys) | set(descriptor_keys)) - (set(reference_keys) & set(descriptor_keys)) if len(key_diff) > 0: error_description = "Missing keys " if set(key_diff) <= set(reference_keys) else "Additional keys " - raise NMOSTestException(test.FAIL(context + error_description + str(key_diff))) + raise NMOSTestException(test.FAIL(f"{context}{error_description}{str(key_diff)}")) for key in reference_keys: - # Make sure to ignore non normative keys, but not fields that use those keys - if key in non_normative_keys and not isinstance(reference[key], dict): + # Ignore keys that contain non-normative information + if key in non_normative_keys: continue self.validate_descriptor(test, reference[key], descriptor[key], context=context + key + "->") - # We can't guarantee ordering of lists, so convert reference and descriptor to dictionaries + # Compare lists elif isinstance(reference, list): if len(reference) != len(descriptor): - raise NMOSTestException(test.FAIL(context + "List unexpected length. Expected: " - + str(len(reference)) + ", actual: " + str(len(descriptor)))) + raise NMOSTestException(test.FAIL(f"{context}List unexpected length. Expected: " + f"{str(len(reference))}, actual: {str(len(descriptor))}")) if len(reference) > 0: + # If comparing lists of objects or dicts then sort by name first. + # Primitive arrays are unsorted as position is assumed to be important e.g. classId if isinstance(reference[0], (dict, NcDescriptor)): - # Convert to dict and validate - if isinstance(reference[0], NcDescriptor): - references = {item.name: item.__dict__ for item in reference} - else: - references = {item["name"]: item for item in reference} - descriptors = {item["name"]: item for item in descriptor} - self.validate_descriptor(test, references, descriptors, context) - else: - for refvalue, value in zip(reference, descriptor): - self.validate_descriptor(test, refvalue, value, context) + reference.sort(key=sort_key) + descriptor.sort(key=sort_key) + for refvalue, value in zip(reference, descriptor): + self.validate_descriptor(test, refvalue, value, context) # If the reference is an object then convert to a dict before comparison elif isinstance(reference, (NcDescriptor, NcElementId)): self.validate_descriptor(test, reference.__dict__, descriptor, context) + # Compare primitives and primitive arrays directly elif reference != descriptor: - raise NMOSTestException(test.FAIL(context + "Expected value: " - + str(reference) + ", actual value: " + str(descriptor))) + raise NMOSTestException(test.FAIL(f"{context}Expected value: " + f"{str(reference)}, actual value: {str(descriptor)}")) return def _get_class_manager_datatype_descriptors(self, test, class_manager_oid, role_path): @@ -376,9 +385,9 @@ def _get_class_manager_datatype_descriptors(self, test, class_manager_oid, role_ if not response: return None - # Validate descriptors + # Validate descriptors against schema for r in response: - self.validate_reference_datatype_schema(test, r, NcDatatypeDescriptor.__name__, + self.reference_datatype_schema_validate(test, r, NcDatatypeDescriptor.__name__, "/".join([str(r) for r in role_path])) # Create NcDescriptor dictionary from response array @@ -395,7 +404,7 @@ def _get_class_manager_class_descriptors(self, test, class_manager_oid, role_pat # Validate descriptors for r in response: - self.validate_reference_datatype_schema(test, r, NcClassDescriptor.__name__, + self.reference_datatype_schema_validate(test, r, NcClassDescriptor.__name__, "/".join([str(r) for r in role_path])) # Create NcClassDescriptor dictionary from response array @@ -403,8 +412,8 @@ def key_lambda(classId): return ".".join(map(str, classId)) descriptors = {key_lambda(r.get("classId")): NcClassDescriptor(r) for r in response} return descriptors - def _nc_object_factory(self, test, class_id, oid, role, _role_path=None): - """Create NcObject or NcBlock based on class_id""" + def create_device_model(self, test, class_id, oid, role, _role_path=None): + """Recursively create Device Model hierarchy""" # will set self.device_model_error to True if problems encountered if _role_path is None: role_path = [] @@ -423,26 +432,25 @@ def _nc_object_factory(self, test, class_id, oid, role, _role_path=None): oid=oid, role_path=role_path) if member_descriptors is None: - raise NMOSTestException(test.FAIL("Unable to get members for object: oid={}, role Path={}" - .format(str(oid), str(role_path)))) + raise NMOSTestException(test.FAIL("Unable to get members for object: " + f"oid={str(oid)}, role Path={str(role_path)}")) block_member_descriptors = [] for m in member_descriptors: - self.validate_reference_datatype_schema(test, m, NcBlockMemberDescriptor.__name__, + self.reference_datatype_schema_validate(test, m, NcBlockMemberDescriptor.__name__, "/".join([str(r) for r in role_path])) block_member_descriptors.append(NcBlockMemberDescriptor(m)) nc_block = NcBlock(class_id, oid, role, role_path, block_member_descriptors, runtime_constraints) for m in member_descriptors: - child_object = self._nc_object_factory(test, m["classId"], m["oid"], m["role"], role_path) + child_object = self.create_device_model(test, m["classId"], m["oid"], m["role"], role_path) if child_object: nc_block.add_child_object(child_object) return nc_block else: - # Check to determine if this is a Class Manager - if len(class_id) > 2 and class_id[0] == 1 and class_id[1] == 3 and class_id[2] == 2: + if self.is_class_manager(class_id): class_descriptors = self._get_class_manager_class_descriptors( test, class_manager_oid=oid, role_path=role_path) @@ -460,7 +468,7 @@ def _nc_object_factory(self, test, class_id, oid, role, _role_path=None): return NcObject(class_id, oid, role, role_path, runtime_constraints) except NMOSTestException as e: - raise NMOSTestException(test.FAIL("Error in Device Model " + role + ": " + str(e.args[0].detail))) + raise NMOSTestException(test.FAIL(f"Error in Device Model {role}: {str(e.args[0].detail)}")) def _get_object_by_class_id(self, test, class_id): device_model = self.query_device_model(test) @@ -516,11 +524,15 @@ def is_non_standard_class(self, class_id): return len([v for v in dropwhile(lambda x: x > 0, class_id)]) > 1 def is_manager(self, class_id): - """ Check class id to determine if this is a manager """ + """ Check class id to determine if this is a manager class_id""" return len(class_id) > 1 and class_id[0] == 1 and class_id[1] == 3 + def is_class_manager(self, class_id): + """ Check class id to determine is this is a class manager class_id """ + return len(class_id) > 2 and class_id[0] == 1 and class_id[1] == 3 and class_id[2] == 2 + def is_block(self, class_id): - """ Check class id to determine if this is a block """ + """ Check class id to determine if this is a block class_id""" return len(class_id) > 1 and class_id[0] == 1 and class_id[1] == 1 def resolve_datatype(self, test, datatype): diff --git a/nmostesting/suites/MS0501Test.py b/nmostesting/suites/MS0501Test.py index 327e1f91..6d585439 100644 --- a/nmostesting/suites/MS0501Test.py +++ b/nmostesting/suites/MS0501Test.py @@ -101,7 +101,7 @@ def validate_model_definitions(self, descriptors, reference_descriptors): descriptor = descriptors[key] # Validate descriptor obeys the JSON schema - self.ms05_utils.validate_reference_datatype_schema(test, descriptor.json, + self.ms05_utils.reference_datatype_schema_validate(test, descriptor.json, descriptor.__class__.__name__) # Validate the descriptor is correct @@ -201,8 +201,7 @@ def _validate_property_type(self, test, value, data_type, is_nullable, context=" if not isinstance(value, self.ms05_utils.primitive_to_python_type(data_type)): raise NMOSTestException(test.FAIL(f"{context}{str(value)} is not of type {str(data_type)}")) else: - self.ms05_utils.validate_schema(test, value, self.ms05_utils.get_datatype_schema(test, data_type), - f"{context}{data_type}") + self.ms05_utils.queried_datatype_schema_validate(test, value, data_type, f"{context}{data_type}") return @@ -350,14 +349,10 @@ def check_touchpoints(self, test, oid, role_path, context): try: for touchpoint_json in touchpoints: touchpoint = NcTouchpoint(touchpoint_json) - schema = self.ms05_utils.get_datatype_schema(test, NcTouchpointNmos.__name__) \ - if touchpoint.context_namespace == "x-nmos" \ - else self.ms05_utils.get_datatype_schema(test, NcTouchpointNmosChannelMapping.__name__) - self.ms05_utils.validate_schema( - test, - touchpoint_json, - schema, - context=f"{context}{schema["title"]}") + datatype_name = NcTouchpointNmos.__name__ \ + if touchpoint.context_namespace == "x-nmos" else NcTouchpointNmosChannelMapping.__name__ + self.ms05_utils.queried_datatype_schema_validate(test, touchpoint_json, datatype_name, + f"{context}{datatype_name}") except NMOSTestException as e: self.touchpoints_metadata.error = True @@ -658,7 +653,7 @@ def test_ms05_12(self, test): expected_descriptor = class_manager.get_control_class(class_descriptor.classId, include_inherited) context = f"Class: {str(class_descriptor.classId)}: " - self.ms05_utils.validate_reference_datatype_schema( + self.ms05_utils.reference_datatype_schema_validate( test, actual_descriptor, expected_descriptor.__class__.__name__, @@ -689,7 +684,7 @@ def test_ms05_13(self, test): expected_descriptor = class_manager.get_datatype(datatype_descriptor.name, include_inherited) context = f"Datatype: {datatype_descriptor.name}: " - self.ms05_utils.validate_reference_datatype_schema( + self.ms05_utils.reference_datatype_schema_validate( test, actual_descriptor, expected_descriptor.__class__.__name__, @@ -818,7 +813,7 @@ def do_get_member_descriptors_test(self, test, block, context=""): expected_members_oids = [m.oid for m in expected_members] for queried_member in queried_members: - self.ms05_utils.validate_reference_datatype_schema( + self.ms05_utils.reference_datatype_schema_validate( test, queried_member, NcBlockMemberDescriptor.__name__, @@ -862,7 +857,7 @@ def do_find_member_by_path_test(self, test, block, context=""): f": Did not return an array of results for query: {str(path)}")) for queried_member in queried_members: - self.ms05_utils.validate_reference_datatype_schema( + self.ms05_utils.reference_datatype_schema_validate( test, queried_member, NcBlockMemberDescriptor.__name__, @@ -939,7 +934,7 @@ def do_find_member_by_role_test(self, test, block, context=""): f"recurse={str(condition["recurse"])}")) for actual_result in actual_results: - self.ms05_utils.validate_reference_datatype_schema( + self.ms05_utils.reference_datatype_schema_validate( test, actual_result, NcBlockMemberDescriptor.__name__, @@ -1024,7 +1019,6 @@ def test_ms05_20(self, test): def check_constraint(self, test, constraint, type_name, is_sequence, test_metadata, context): if constraint.get("defaultValue"): - datatype_schema = self.ms05_utils.get_datatype_schema(test, type_name) if isinstance(constraint.get("defaultValue"), list) is not is_sequence: test_metadata.error = True test_metadata.error_msg = f"{context} {"a default value sequence was expected" @@ -1032,13 +1026,11 @@ def check_constraint(self, test, constraint, type_name, is_sequence, test_metada return if is_sequence: for value in constraint.get("defaultValue"): - self.ms05_utils.validate_schema(test, value, datatype_schema, f"{context}: defaultValue ") + self.ms05_utils.queried_datatype_schema_validate(test, value, type_name, + f"{context}: defaultValue ") else: - self.ms05_utils.validate_schema( - test, - constraint.get("defaultValue"), - datatype_schema, - f"{context}: defaultValue ") + self.ms05_utils.queried_datatype_schema_validate(test, constraint.get("defaultValue"), type_name, + f"{context}: defaultValue ") datatype = self.ms05_utils.resolve_datatype(test, type_name) # check NcXXXConstraintsNumber @@ -1312,7 +1304,7 @@ def _check_constraints(self, test, block, test_metadata, context=""): self._check_constraints_hierarchy(test, property_descriptor, class_manager.datatype_descriptors, object_runtime_constraints, f"{context}: {class_descriptor['name']}: " - + f"{property_descriptor.name}: ") + f"{property_descriptor.name}: ") except NMOSTestException as e: test_metadata.error = True test_metadata.error_msg += f"{str(e.args[0].detail)}; " @@ -1361,7 +1353,7 @@ def test_ms05_25(self, test): return test.FAIL("Read only properties error expected.", f"https://specs.amwa.tv/ms-05-02/branches/{self.apis[MS05_API_KEY]['spec_branch']}" - + "/docs/Framework.html#ncmethodresult") + "/docs/Framework.html#ncmethodresult") except NMOSTestException as e: error_msg = e.args[0].detail @@ -1376,12 +1368,12 @@ def test_ms05_25(self, test): if error_msg['status'] != NcMethodStatus.Readonly.value: return test.WARNING(f"Unexpected status. Expected: {NcMethodStatus.Readonly.name}" - + f" ({str(NcMethodStatus.Readonly)})" - + f", actual: {NcMethodStatus(error_msg['status']).name}" - + f" ({str(error_msg['status'])})", - + "https://specs.amwa.tv/ms-05-02/branches/" - + f"{self.apis[MS05_API_KEY]['spec_branch']}" - + "/docs/Framework.html#ncmethodresult") + f" ({str(NcMethodStatus.Readonly)})" + f", actual: {NcMethodStatus(error_msg['status']).name}" + f" ({str(error_msg['status'])})", + "https://specs.amwa.tv/ms-05-02/branches/" + f"{self.apis[MS05_API_KEY]['spec_branch']}" + "/docs/Framework.html#ncmethodresult") return test.PASS() @@ -1406,7 +1398,7 @@ def test_ms05_26(self, test): return test.FAIL("Sequence out of bounds error expected.", f"https://specs.amwa.tv/ms-05-02/branches/{self.apis[MS05_API_KEY]['spec_branch']}" - + "/docs/Framework.html#ncmethodresult") + "/docs/Framework.html#ncmethodresult") except NMOSTestException as e: error_msg = e.args[0].detail @@ -1421,11 +1413,11 @@ def test_ms05_26(self, test): if error_msg['status'] != NcMethodStatus.IndexOutOfBounds.value: return test.WARNING(f"Unexpected status. Expected: {NcMethodStatus.IndexOutOfBounds.name}" - + f" ({str(NcMethodStatus.IndexOutOfBounds)})" - + f", actual: {NcMethodStatus(error_msg['status']).name}" - + f" ({str(error_msg['status'])})", - + "https://specs.amwa.tv/ms-05-02/branches/" - + f"{self.apis[MS05_API_KEY]['spec_branch']}" - + "/docs/Framework.html#ncmethodresult") + f" ({str(NcMethodStatus.IndexOutOfBounds)})" + f", actual: {NcMethodStatus(error_msg['status']).name}" + f" ({str(error_msg['status'])})", + "https://specs.amwa.tv/ms-05-02/branches/" + f"{self.apis[MS05_API_KEY]['spec_branch']}" + "/docs/Framework.html#ncmethodresult") return test.PASS() diff --git a/nmostesting/suites/MS0502Test.py b/nmostesting/suites/MS0502Test.py index 8ea52169..fcbb52e2 100644 --- a/nmostesting/suites/MS0502Test.py +++ b/nmostesting/suites/MS0502Test.py @@ -777,7 +777,7 @@ def _do_check_methods_test(self, test, question): """.format(method.get("name"), result.status) except NMOSTestException as e: # ignore 4xx errors - self.ms05_utils.validate_reference_datatype_schema(test, e.args[0].detail, "NcMethodResult") + self.ms05_utils.reference_datatype_schema_validate(test, e.args[0].detail, "NcMethodResult") if e.args[0].detail['status'] >= 500: self.invoke_methods_metadata.error = True self.invoke_methods_metadata.error_msg += """