Skip to content
255 changes: 241 additions & 14 deletions azext_edge/edge/providers/check/deviceregistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from ..edge_api import (
DEVICEREGISTRY_API_V1,
NAMESPACED_DEVICEREGISTRY_API_V1,
DeviceRegistryResourceKinds,
)

Expand All @@ -43,21 +44,57 @@ def check_deviceregistry_deployment(
resource_kinds: List[str] = None,
resource_name: str = None,
) -> List[dict]:
evaluate_funcs = {
DeviceRegistryResourceKinds.ASSET: evaluate_assets,
DeviceRegistryResourceKinds.ASSETENDPOINTPROFILE: evaluate_asset_endpoint_profiles,
}
classic_resource_kinds = [
DeviceRegistryResourceKinds.ASSET.value,
DeviceRegistryResourceKinds.ASSETENDPOINTPROFILE.value,
]
namespaced_resource_kinds = [
DeviceRegistryResourceKinds.DEVICE.value,
]

# When resource_kinds is None or empty, run both API groups with no filter.
# When resource_kinds is specified, only run the API group whose kinds were requested.
if not resource_kinds:
classic_filter = None
namespaced_filter = None
else:
classic_filter = [r for r in resource_kinds if r in classic_resource_kinds]
namespaced_filter = [r for r in resource_kinds if r in namespaced_resource_kinds]

results = []

if classic_filter is None or classic_filter:
classic_evaluate_funcs = {
DeviceRegistryResourceKinds.ASSET: evaluate_assets,
DeviceRegistryResourceKinds.ASSETENDPOINTPROFILE: evaluate_asset_endpoint_profiles,
}
results = check_post_deployment(
api_info=DEVICEREGISTRY_API_V1,
check_name="enumerateDeviceRegistryApi",
check_desc="Enumerate Device Registry API resources",
resource_name=resource_name,
evaluate_funcs=classic_evaluate_funcs,
as_list=as_list,
detail_level=detail_level,
resource_kinds=classic_filter,
)

return check_post_deployment(
api_info=DEVICEREGISTRY_API_V1,
check_name="enumerateDeviceRegistryApi",
check_desc="Enumerate Device Registry API resources",
resource_name=resource_name,
evaluate_funcs=evaluate_funcs,
as_list=as_list,
detail_level=detail_level,
resource_kinds=resource_kinds,
)
if namespaced_filter is None or namespaced_filter:
namespaced_evaluate_funcs = {
DeviceRegistryResourceKinds.DEVICE: evaluate_devices,
}
results.extend(check_post_deployment(
api_info=NAMESPACED_DEVICEREGISTRY_API_V1,
check_name="enumerateNamespacedDeviceRegistryApi",
check_desc="Enumerate Namespaced Device Registry API resources",
resource_name=resource_name,
evaluate_funcs=namespaced_evaluate_funcs,
as_list=as_list,
detail_level=detail_level,
resource_kinds=namespaced_filter,
))

return results


def evaluate_assets(
Expand Down Expand Up @@ -602,6 +639,196 @@ def evaluate_asset_endpoint_profiles(
return check_manager.as_dict(as_list)


def evaluate_devices(
as_list: bool = False,
detail_level: int = ResourceOutputDetailLevel.summary.value,
resource_name: str = None,
) -> Dict[str, Any]:
check_manager = CheckManager(check_name="evalDevices", check_desc="Evaluate Devices")

device_namespace_conditions = ["spec.enabled", "spec.uuid", "spec.endpoints.inbound"]

target_devices = generate_target_resource_name(
api_info=NAMESPACED_DEVICEREGISTRY_API_V1, resource_kind=DeviceRegistryResourceKinds.DEVICE.value
)

all_devices = get_resources_by_name(
api_info=NAMESPACED_DEVICEREGISTRY_API_V1,
kind=DeviceRegistryResourceKinds.DEVICE,
resource_name=resource_name,
)

if not all_devices:
fetch_devices_warning_text = "Unable to fetch devices in any namespaces."
check_manager.add_target(target_name=target_devices)
check_manager.add_display(target_name=target_devices, display=Padding(fetch_devices_warning_text, (0, 0, 0, 8)))
check_manager.add_target_eval(
target_name=target_devices,
status=CheckTaskStatus.skipped.value,
value=fetch_devices_warning_text,
)
return check_manager.as_dict(as_list)

for (namespace, devices) in get_resources_grouped_by_namespace(all_devices):
check_manager.add_target(target_name=target_devices, namespace=namespace, conditions=device_namespace_conditions)
check_manager.add_display(
target_name=target_devices,
namespace=namespace,
display=Padding(
f"Devices in namespace {{[purple]{namespace}[/purple]}}",
(0, 0, 0, 8),
),
)

devices: List[dict] = list(devices)
added_status_conditions = False
for device in devices:
padding = 10
device_name = device["metadata"]["name"]

check_manager.add_display(
target_name=target_devices,
namespace=namespace,
display=Padding(f"- Device {{[bright_blue]{device_name}[/bright_blue]}} detected.", (0, 0, 0, padding)),
)

spec_padding = padding + PADDING_SIZE
device_spec = device["spec"]

# spec.enabled
device_enabled = device_spec.get("enabled", None)
device_enabled_value = {"spec.enabled": device_enabled}
if device_enabled is True:
device_enabled_text = "Device [green]enabled[/green]."
device_enabled_status = CheckTaskStatus.success.value
elif device_enabled is False:
device_enabled_text = "Device [yellow]disabled[/yellow]."
device_enabled_status = CheckTaskStatus.warning.value
else:
device_enabled_text = "Device enabled [red]not detected[/red]."
device_enabled_status = CheckTaskStatus.error.value

add_display_and_eval(
check_manager=check_manager,
target_name=target_devices,
display_text=device_enabled_text,
eval_status=device_enabled_status,
eval_value=device_enabled_value,
resource_name=device_name,
namespace=namespace,
padding=(0, 0, 0, spec_padding),
)

# spec.uuid
device_uuid = device_spec.get("uuid", "")
device_uuid_value = {"spec.uuid": device_uuid}
if device_uuid:
device_uuid_text = f"Uuid: {{[bright_blue]{device_uuid}[/bright_blue]}} [green]detected[/green]."
device_uuid_status = CheckTaskStatus.success.value
else:
device_uuid_text = "Uuid [red]not detected[/red]."
device_uuid_status = CheckTaskStatus.error.value

add_display_and_eval(
check_manager=check_manager,
target_name=target_devices,
display_text=device_uuid_text,
eval_status=device_uuid_status,
eval_value=device_uuid_value,
resource_name=device_name,
namespace=namespace,
padding=(0, 0, 0, spec_padding),
)

# spec.endpoints.inbound
inbound_endpoints = device_spec.get("endpoints", {}).get("inbound", {})
inbound_endpoints_value = {"spec.endpoints.inbound": len(inbound_endpoints)}
if inbound_endpoints:
inbound_endpoints_text = (
f"[bright_blue]{len(inbound_endpoints)}[/bright_blue] inbound endpoint(s) detected."
)
inbound_endpoints_status = CheckTaskStatus.success.value
else:
inbound_endpoints_text = "Inbound endpoints [red]not detected[/red]."
inbound_endpoints_status = CheckTaskStatus.error.value

add_display_and_eval(
check_manager=check_manager,
target_name=target_devices,
display_text=inbound_endpoints_text,
eval_status=inbound_endpoints_status,
eval_value=inbound_endpoints_value,
resource_name=device_name,
namespace=namespace,
padding=(0, 0, 0, spec_padding),
)

device_status = device.get("status") or {}
status_inbound = device_status.get("endpoints", {}).get("inbound", {})

if detail_level > ResourceOutputDetailLevel.summary.value and inbound_endpoints:
for ep_name, ep in inbound_endpoints.items():
ep_address = ep.get("address", "")
ep_health = status_inbound.get(ep_name, {}).get("healthState") or {}
health_status = ep_health.get("status", "")
health_color = {
"Available": "green",
"Degraded": "yellow",
"Unavailable": "red",
}.get(health_status, "white")

ep_addr_text = ep_address if ep_address else "[yellow]no address[/yellow]"
health_text = f" [[{health_color}]{health_status}[/{health_color}]]" if health_status else ""
ep_text = f"- Endpoint {{[bright_blue]{ep_name}[/bright_blue]}}: {ep_addr_text}{health_text}"
check_manager.add_display(
target_name=target_devices,
namespace=namespace,
display=Padding(ep_text, (0, 0, 0, spec_padding + PADDING_SIZE)),
)

if detail_level > ResourceOutputDetailLevel.detail.value and ep_health.get("message"):
check_manager.add_display(
target_name=target_devices,
namespace=namespace,
display=Padding(
f" {ep_health['message']}",
(0, 0, 0, spec_padding + PADDING_SIZE * 2),
),
)

# status.config.error
config_error = device_status.get("config", {}).get("error")
if config_error:
if not added_status_conditions:
check_manager.add_target_conditions(
target_name=target_devices,
namespace=namespace,
conditions=["status.config.error"],
)
added_status_conditions = True

error_code = config_error.get("code", "")
error_message = config_error.get("message", "")
error_value = {"status.config.error": error_code or error_message}
error_text = (
f"Config error [red]{error_code}[/red]: {error_message}"
if error_code
else f"Config error: {error_message}"
)
add_display_and_eval(
check_manager=check_manager,
target_name=target_devices,
display_text=error_text,
eval_status=CheckTaskStatus.error.value,
eval_value=error_value,
resource_name=device_name,
namespace=namespace,
padding=(0, 0, 0, spec_padding),
)

return check_manager.as_dict(as_list)


def _process_asset_status(
check_manager: CheckManager,
added_status_conditions: bool,
Expand Down
3 changes: 1 addition & 2 deletions azext_edge/edge/providers/support_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
DATAFLOW_API_V1,
DATAFLOW_API_V1B1,
DEVICEREGISTRY_API_V1,
DEVICEREGISTRY_API_V1B1,
META_API_V1,
META_API_V1B1,
MQTT_BROKER_API_V1,
Expand All @@ -42,7 +41,7 @@
COMPAT_CLUSTER_CONFIG_APIS = EdgeApiManager(resource_apis=[CLUSTER_CONFIG_API_V1])
COMPAT_MQTT_BROKER_APIS = EdgeApiManager(resource_apis=[MQTT_BROKER_API_V1, MQTT_BROKER_API_V1B1])
COMPAT_DEVICEREGISTRY_APIS = EdgeApiManager(
resource_apis=[DEVICEREGISTRY_API_V1, DEVICEREGISTRY_API_V1B1, NAMESPACED_DEVICEREGISTRY_API_V1]
resource_apis=[DEVICEREGISTRY_API_V1, NAMESPACED_DEVICEREGISTRY_API_V1]
)
COMPAT_DATAFLOW_APIS = EdgeApiManager(resource_apis=[DATAFLOW_API_V1, DATAFLOW_API_V1B1])
COMPAT_META_APIS = EdgeApiManager(resource_apis=[META_API_V1, META_API_V1B1])
Expand Down
10 changes: 9 additions & 1 deletion azext_edge/tests/edge/checks/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,20 @@ def mock_evaluate_opcua_pod_health(mocker):

@pytest.fixture
def mock_generate_deviceregistry_asset_target_resources(mocker):
def _side_effect(api_info, resource_kind):
return api_info.group

patched = mocker.patch(
"azext_edge.edge.providers.check.deviceregistry.generate_target_resource_name",
return_value="deviceregistry.microsoft.com",
side_effect=_side_effect,
)
yield patched


# alias so tests can use the more specific name, both patch the same function with the same logic
mock_generate_deviceregistry_device_target_resources = mock_generate_deviceregistry_asset_target_resources


@pytest.fixture
def mock_opcua_get_namespaced_pods_by_prefix(mocker):
patched = mocker.patch("azext_edge.edge.providers.check.opcua.get_namespaced_pods_by_prefix", return_value=[])
Expand Down Expand Up @@ -95,6 +102,7 @@ def mock_resource_types(mocker, ops_service):
"deviceregistry": {
"Asset": [{}],
"AssetEndpointProfile": [{}],
"Device": [{}],
},
"dataflow": {
"Dataflow": [{}],
Expand Down
Loading
Loading