Skip to content

Commit

Permalink
Refactored names of functions, improved consistency of strings
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-r-thorpe committed Oct 18, 2024
1 parent 958d962 commit 9673baa
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 81 deletions.
2 changes: 1 addition & 1 deletion nmostesting/IS14Utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def _do_request(self, test, method, url, **kwargs):
if not valid:
raise NMOSTestException(test.FAIL(f"{r} for {method}: {url}"))
try:
self.validate_reference_datatype_schema(test, r.json(), "NcMethodResult", f"{method} :{url} ")
self.reference_datatype_schema_validate(test, r.json(), "NcMethodResult", f"{method} :{url} ")
except ValueError as e:
raise NMOSTestException(test.FAIL(f"Error: {e.args[0]} for {method}: {url}"))
except NMOSTestException as e:
Expand Down
94 changes: 53 additions & 41 deletions nmostesting/MS05Utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def query_device_model(self, test):
self.device_model_metadata set on Device Model validation error.
NMOSTestException raised if unable to query Device Model """
if not self.device_model:
self.device_model = self._nc_object_factory(
self.device_model = self.create_device_model(
test,
StandardClassIds.NCBLOCK.value,
self.ROOT_BLOCK_OID,
Expand Down Expand Up @@ -302,33 +302,45 @@ def get_datatype_schema(self, test, type_name):

return self.datatype_schemas.get(type_name)

def validate_reference_datatype_schema(self, test, payload, datatype_name, context=""):
def queried_datatype_schema_validate(self, test, payload, datatype_name, context=""):
"""Validate payload against datatype schema queried from Node under Test Class Manager"""
datatype_schema = self.get_datatype_schema(test, datatype_name)
self._validate_schema(test, payload, datatype_schema, f"{context}{datatype_name}")

def reference_datatype_schema_validate(self, test, payload, datatype_name, context=""):
"""Validate payload against reference datatype schema"""
context += f"{datatype_name}: "
self.validate_schema(test, payload, self.reference_datatype_schemas[datatype_name], context)
self._validate_schema(test, payload, self.reference_datatype_schemas[datatype_name],
f"{context}{datatype_name}")

def validate_schema(self, test, payload, schema, context=""):
"""Delegates to validate. Raises NMOSTestExceptions on error"""
def _validate_schema(self, test, payload, schema, context=""):
"""Delegates to jsonschema validate. Raises NMOSTestExceptions on error"""
if not schema:
raise NMOSTestException(test.FAIL("Missing schema. Possible unknown type: " + context))
raise NMOSTestException(test.FAIL(f"Missing schema. Possible unknown type: {context}"))
try:
# Validate the JSON schema is correct
checker = FormatChecker(["ipv4", "ipv6", "uri"])
validate(payload, schema, format_checker=checker)
except ValidationError as e:
raise NMOSTestException(test.FAIL(context + "Schema validation error: " + e.message))
raise NMOSTestException(test.FAIL(f"{context}Schema validation error: {e.message}"))
except SchemaError as e:
raise NMOSTestException(test.FAIL(context + "Schema error: " + e.message))
raise NMOSTestException(test.FAIL(f"{context}Schema error: {e.message}"))

return

def validate_descriptor(self, test, reference, descriptor, context=""):
"""Validate descriptor against reference NcDescriptor. Raises NMOSTestException on error"""
def sort_key(e):
if isinstance(e, NcDescriptor):
return e.name
else:
return e["name"]

non_normative_keys = ["description"]

# Compare disctionaries
if isinstance(reference, dict):
# Remove the json field if present
# NcDescriptor objects have a json field that caches the json used to construct it
reference.pop("json", None)

reference_keys = set(reference.keys())
Expand All @@ -338,35 +350,32 @@ def validate_descriptor(self, test, reference, descriptor, context=""):
key_diff = (set(reference_keys) | set(descriptor_keys)) - (set(reference_keys) & set(descriptor_keys))
if len(key_diff) > 0:
error_description = "Missing keys " if set(key_diff) <= set(reference_keys) else "Additional keys "
raise NMOSTestException(test.FAIL(context + error_description + str(key_diff)))
raise NMOSTestException(test.FAIL(f"{context}{error_description}{str(key_diff)}"))
for key in reference_keys:
# Make sure to ignore non normative keys, but not fields that use those keys
if key in non_normative_keys and not isinstance(reference[key], dict):
# Ignore keys that contain non-normative information
if key in non_normative_keys:
continue
self.validate_descriptor(test, reference[key], descriptor[key], context=context + key + "->")
# We can't guarantee ordering of lists, so convert reference and descriptor to dictionaries
# Compare lists
elif isinstance(reference, list):
if len(reference) != len(descriptor):
raise NMOSTestException(test.FAIL(context + "List unexpected length. Expected: "
+ str(len(reference)) + ", actual: " + str(len(descriptor))))
raise NMOSTestException(test.FAIL(f"{context}List unexpected length. Expected: "
f"{str(len(reference))}, actual: {str(len(descriptor))}"))
if len(reference) > 0:
# If comparing lists of objects or dicts then sort by name first.
# Primitive arrays are unsorted as position is assumed to be important e.g. classId
if isinstance(reference[0], (dict, NcDescriptor)):
# Convert to dict and validate
if isinstance(reference[0], NcDescriptor):
references = {item.name: item.__dict__ for item in reference}
else:
references = {item["name"]: item for item in reference}
descriptors = {item["name"]: item for item in descriptor}
self.validate_descriptor(test, references, descriptors, context)
else:
for refvalue, value in zip(reference, descriptor):
self.validate_descriptor(test, refvalue, value, context)
reference.sort(key=sort_key)
descriptor.sort(key=sort_key)
for refvalue, value in zip(reference, descriptor):
self.validate_descriptor(test, refvalue, value, context)
# If the reference is an object then convert to a dict before comparison
elif isinstance(reference, (NcDescriptor, NcElementId)):
self.validate_descriptor(test, reference.__dict__, descriptor, context)
# Compare primitives and primitive arrays directly
elif reference != descriptor:
raise NMOSTestException(test.FAIL(context + "Expected value: "
+ str(reference) + ", actual value: " + str(descriptor)))
raise NMOSTestException(test.FAIL(f"{context}Expected value: "
f"{str(reference)}, actual value: {str(descriptor)}"))
return

def _get_class_manager_datatype_descriptors(self, test, class_manager_oid, role_path):
Expand All @@ -376,9 +385,9 @@ def _get_class_manager_datatype_descriptors(self, test, class_manager_oid, role_
if not response:
return None

# Validate descriptors
# Validate descriptors against schema
for r in response:
self.validate_reference_datatype_schema(test, r, NcDatatypeDescriptor.__name__,
self.reference_datatype_schema_validate(test, r, NcDatatypeDescriptor.__name__,
"/".join([str(r) for r in role_path]))

# Create NcDescriptor dictionary from response array
Expand All @@ -395,16 +404,16 @@ def _get_class_manager_class_descriptors(self, test, class_manager_oid, role_pat

# Validate descriptors
for r in response:
self.validate_reference_datatype_schema(test, r, NcClassDescriptor.__name__,
self.reference_datatype_schema_validate(test, r, NcClassDescriptor.__name__,
"/".join([str(r) for r in role_path]))

# Create NcClassDescriptor dictionary from response array
def key_lambda(classId): return ".".join(map(str, classId))
descriptors = {key_lambda(r.get("classId")): NcClassDescriptor(r) for r in response}
return descriptors

def _nc_object_factory(self, test, class_id, oid, role, _role_path=None):
"""Create NcObject or NcBlock based on class_id"""
def create_device_model(self, test, class_id, oid, role, _role_path=None):
"""Recursively create Device Model hierarchy"""
# will set self.device_model_error to True if problems encountered
if _role_path is None:
role_path = []
Expand All @@ -423,26 +432,25 @@ def _nc_object_factory(self, test, class_id, oid, role, _role_path=None):
oid=oid, role_path=role_path)

if member_descriptors is None:
raise NMOSTestException(test.FAIL("Unable to get members for object: oid={}, role Path={}"
.format(str(oid), str(role_path))))
raise NMOSTestException(test.FAIL("Unable to get members for object: "
f"oid={str(oid)}, role Path={str(role_path)}"))

block_member_descriptors = []
for m in member_descriptors:
self.validate_reference_datatype_schema(test, m, NcBlockMemberDescriptor.__name__,
self.reference_datatype_schema_validate(test, m, NcBlockMemberDescriptor.__name__,
"/".join([str(r) for r in role_path]))
block_member_descriptors.append(NcBlockMemberDescriptor(m))

nc_block = NcBlock(class_id, oid, role, role_path, block_member_descriptors, runtime_constraints)

for m in member_descriptors:
child_object = self._nc_object_factory(test, m["classId"], m["oid"], m["role"], role_path)
child_object = self.create_device_model(test, m["classId"], m["oid"], m["role"], role_path)
if child_object:
nc_block.add_child_object(child_object)

return nc_block
else:
# Check to determine if this is a Class Manager
if len(class_id) > 2 and class_id[0] == 1 and class_id[1] == 3 and class_id[2] == 2:
if self.is_class_manager(class_id):
class_descriptors = self._get_class_manager_class_descriptors(
test, class_manager_oid=oid, role_path=role_path)

Expand All @@ -460,7 +468,7 @@ def _nc_object_factory(self, test, class_id, oid, role, _role_path=None):
return NcObject(class_id, oid, role, role_path, runtime_constraints)

except NMOSTestException as e:
raise NMOSTestException(test.FAIL("Error in Device Model " + role + ": " + str(e.args[0].detail)))
raise NMOSTestException(test.FAIL(f"Error in Device Model {role}: {str(e.args[0].detail)}"))

def _get_object_by_class_id(self, test, class_id):
device_model = self.query_device_model(test)
Expand Down Expand Up @@ -516,11 +524,15 @@ def is_non_standard_class(self, class_id):
return len([v for v in dropwhile(lambda x: x > 0, class_id)]) > 1

def is_manager(self, class_id):
""" Check class id to determine if this is a manager """
""" Check class id to determine if this is a manager class_id"""
return len(class_id) > 1 and class_id[0] == 1 and class_id[1] == 3

def is_class_manager(self, class_id):
""" Check class id to determine is this is a class manager class_id """
return len(class_id) > 2 and class_id[0] == 1 and class_id[1] == 3 and class_id[2] == 2

def is_block(self, class_id):
""" Check class id to determine if this is a block """
""" Check class id to determine if this is a block class_id"""
return len(class_id) > 1 and class_id[0] == 1 and class_id[1] == 1

def resolve_datatype(self, test, datatype):
Expand Down
Loading

0 comments on commit 9673baa

Please sign in to comment.