From f4054c9d9f980821af64856c102705aec2fed292 Mon Sep 17 00:00:00 2001 From: ShriramS-Emerson Date: Tue, 3 Feb 2026 16:52:12 +0530 Subject: [PATCH 1/6] feat: add example notebook for asset calibration alarm and notification --- ... Calibration Alarms and Notification.ipynb | 870 ++++++++++++++++++ poetry.lock | 27 +- pyproject.toml | 2 +- 3 files changed, 881 insertions(+), 18 deletions(-) create mode 100644 examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb diff --git a/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb b/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb new file mode 100644 index 0000000..1937373 --- /dev/null +++ b/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb @@ -0,0 +1,870 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "885b9804", + "metadata": {}, + "source": [ + "# Calibration Status to Alarm Automation\n", + "\n", + "This notebook monitors asset calibration status and automatically creates or updates alarms in SystemLink based on the calibration state of managed assets.\n", + "\n", + "## Severity Mapping\n", + "\n", + "The notebook maps asset calibration statuses to alarm severity levels:\n", + "\n", + "- **PAST_RECOMMENDED_DUE_DATE** → Severity 3 (High) \n", + " Asset is past its recommended calibration due date - creates or updates alarm \"Calibration Overdue\" with high severity\n", + "\n", + "- **APPROACHING_RECOMMENDED_DUE_DATE** → Severity 1 (Low) \n", + " Asset is approaching its recommended calibration due date - creates or updates alarm \"Calibration Due\" with low severity\n", + "\n", + "- **OK** → Severity -1 (Clear) \n", + " Asset calibration is up to date - clears any existing alarms" + ] + }, + { + "cell_type": "markdown", + "id": "9257d5f4-c6bb-4fd4-a110-b99fa5b4042e", + "metadata": {}, + "source": [ + "## Imports and Setup\n", + "This section imports required libraries" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "022ea191", + "metadata": {}, + "outputs": [], + "source": [ + "import importlib.util\n", + "\n", + "if importlib.util.find_spec(\"nisystemlink.clients.notification\") is None:\n", + " %pip install --upgrade nisystemlink-clients" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5e9408df-2552-4d0b-bdd1-34412b5a51c9", + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import datetime, timezone\n", + "from typing import Any, Dict, List\n", + "from urllib.parse import urlsplit, urlunsplit\n", + "\n", + "import scrapbook as sb\n", + "from nisystemlink.clients.alarm import AlarmClient\n", + "from nisystemlink.clients.alarm.models import (\n", + " Alarm,\n", + " ClearAlarmTransition,\n", + " CreateOrUpdateAlarmRequest,\n", + " QueryAlarmsWithFilterRequest,\n", + " SetAlarmTransition,\n", + ")\n", + "from nisystemlink.clients.assetmanagement import AssetManagementClient\n", + "from nisystemlink.clients.assetmanagement.models import (\n", + " Asset,\n", + " CalibrationStatus,\n", + " QueryAssetsRequest,\n", + ")\n", + "from nisystemlink.clients.notification import NotificationClient\n", + "from nisystemlink.clients.notification.models import (\n", + " DynamicNotificationConfiguration,\n", + " DynamicNotificationStrategy,\n", + " DynamicStrategyRequest,\n", + " SmtpAddressFields,\n", + " SmtpAddressGroup,\n", + " SmtpMessageTemplate,\n", + " SmtpMessageTemplateFields,\n", + ")\n", + "from nisystemlink.clients.systems import SystemsClient\n", + "from nisystemlink.clients.systems.models import QuerySystemsRequest" + ] + }, + { + "cell_type": "markdown", + "id": "92b40e11-e565-4981-828b-c08fd75fe5e5", + "metadata": {}, + "source": [ + "## Configuration and Parameters\n", + "\n", + "In this section, we define key parameters and configuration values that will be used throughout the notebook.\n", + "\n", + "**Configuration:**\n", + "- **`workspace_ids`**: List of workspace IDs to monitor (empty list = all workspaces)\n", + " - Example: `[\"workspace-123\", \"workspace-456\"]` or `[]` for all workspaces\n", + "- **`calibration_subscribers`**: List of email addresses to receive notifications about calibration status changes\n", + " - Example: `[\"user@example.com\", \"admin@example.com\"]`\n", + "- **`http_url`**: Base SystemLink URL for building asset links in email notifications\n", + "- **`CALIBRATION_STATUS_TO_SEVERITY`**: Mapping of calibration status to alarm severity levels (1=Low, 3=High, -1=Clear)\n", + "- **`CALIBRATION_STATUS_TO_LABEL`**: Mapping of calibration status to human-readable alarm labels" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aba8f5b7-cbd3-4f63-bc52-da707f60e940", + "metadata": { + "papermill": { + "parameters": { + "calibration_subscribers": [], + "workspaces": [] + } + }, + "systemlink": { + "namespaces": [], + "parameters": [ + { + "display_name": "Workspaces", + "id": "workspaces", + "type": "string[]" + }, + { + "display_name": "Calibration Subscribers", + "id": "calibration_subscribers", + "type": "string[]" + } + ], + "version": 2 + }, + "tags": [ + "parameters" + ], + "trusted": true + }, + "outputs": [], + "source": [ + "workspace_ids: List[str] = []\n", + "calibration_subscribers: List[str] = []\n", + "\n", + "# Base SystemLink URL for building asset links in email notifications\n", + "http_url = \"https://test.lifecyclesolutions.ni.com/\"\n", + "\n", + "# Calibration status -> desired severity mapping\n", + "CALIBRATION_STATUS_TO_SEVERITY = {\n", + " CalibrationStatus.PAST_RECOMMENDED_DUE_DATE.value: 3,\n", + " CalibrationStatus.APPROACHING_RECOMMENDED_DUE_DATE.value: 1,\n", + " CalibrationStatus.OK.value: -1,\n", + "}\n", + "\n", + "CALIBRATION_STATUS_TO_LABEL = {\n", + " CalibrationStatus.PAST_RECOMMENDED_DUE_DATE.value: \"Calibration Overdue\",\n", + " CalibrationStatus.APPROACHING_RECOMMENDED_DUE_DATE.value: \"Calibration Due\",\n", + " CalibrationStatus.OK.value: \"Calibration up to date\",\n", + "}" + ] + }, + { + "cell_type": "markdown", + "id": "192287c7-1294-4c18-b951-c65960c4d2ab", + "metadata": {}, + "source": [ + "## SLE API Utilities" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dbb81746-38e0-47b7-8b3b-0f54296f0b93", + "metadata": {}, + "outputs": [], + "source": [ + "asset_client = AssetManagementClient()\n", + "system_client = SystemsClient()\n", + "alarm_client = AlarmClient()\n", + "notification_client = NotificationClient()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e13a3263-8522-414c-8242-0e8734cb9a45", + "metadata": {}, + "outputs": [], + "source": [ + "def query_all_assets(filter: str = \"\", take: int = 100) -> List[Asset]:\n", + " \"\"\"\n", + " Query all assets from SystemLink by paginating through all results.\n", + "\n", + " Args:\n", + " filter (str): Filter string to apply when querying assets\n", + " take (int): Number of assets to fetch per page\n", + "\n", + " Returns:\n", + " List[Asset]: List of all assets matching the filter criteria\n", + " \"\"\"\n", + " query_assets_request = QueryAssetsRequest(\n", + " take=take,\n", + " skip=0,\n", + " filter=filter,\n", + " )\n", + "\n", + " all_assets: List[Asset] = []\n", + "\n", + " while True:\n", + " query_assets_response = asset_client.query_assets(query=query_assets_request)\n", + " assets = query_assets_response.assets\n", + " if not assets:\n", + " break\n", + "\n", + " all_assets.extend(assets)\n", + " query_assets_request.skip = (query_assets_request.skip or 0) + len(assets)\n", + "\n", + " return all_assets\n", + "\n", + "\n", + "def get_asset_path(asset: Asset) -> str:\n", + " \"\"\"\n", + " Generate a unique asset path for alarm identification.\n", + "\n", + " Constructs a hierarchical path using vendor, model, and serial number.\n", + "\n", + " Args:\n", + " asset (Asset): Asset object containing vendor, model, and serial information\n", + "\n", + " Returns:\n", + " str: Formatted asset path string\n", + " \"\"\"\n", + " vendor = asset.vendor_name or str(asset.vendor_number or \"\")\n", + " model = asset.model_name or str(asset.model_number or \"\")\n", + " serial_number = str(asset.serial_number or \"\")\n", + "\n", + " return f\"Assets.{vendor}.{model}.{serial_number}.Calibration\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "28e91d5e-286e-46d0-9fd4-f78899c72e13", + "metadata": {}, + "outputs": [], + "source": [ + "def query_systems(filter: str = \"\", take: int = 1000) -> List[Dict[str, Any]]:\n", + " \"\"\"\n", + " Query all systems from SystemLink through batch query.\n", + "\n", + " Args:\n", + " filter (str): Filter string to apply when querying systems\n", + " take (int): Number of systems to fetch per batch\n", + "\n", + " Returns:\n", + " List[Dict[str, Any]]: List of dictionaries containing system id and alias\n", + " \"\"\"\n", + " request = QuerySystemsRequest(\n", + " skip=0,\n", + " take=take,\n", + " filter=filter,\n", + " projection=\"new(id,alias)\",\n", + " )\n", + "\n", + " all_systems: List[Dict[str, Any]] = []\n", + "\n", + " while True:\n", + " response = system_client.query_systems(request)\n", + " systems = response.data\n", + " if not systems:\n", + " break\n", + "\n", + " all_systems.extend(systems)\n", + " request.skip = (request.skip or 0) + len(systems)\n", + "\n", + " return all_systems" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cb2f078e-d2b2-4f67-9083-9f5b7796b5ed", + "metadata": {}, + "outputs": [], + "source": [ + "def send_email_notification(\n", + " to_addresses: List[str] | None = None,\n", + " subject: str = \"subject\",\n", + " body: str = \"body\",\n", + ") -> None:\n", + " \"\"\"\n", + " Send email notifications using SystemLink's notification service.\n", + "\n", + " Args:\n", + " to_addresses (List[str] | None): List of recipient email addresses\n", + " subject (str): Email subject line\n", + " body (str): Email body content\n", + "\n", + " Returns:\n", + " None\n", + " \"\"\"\n", + " if not to_addresses:\n", + " print(\"No email recipients configured - skipping email notification\")\n", + " return\n", + "\n", + " # Build SMTP address group with recipient addresses\n", + " address_group = SmtpAddressGroup(\n", + " fields=SmtpAddressFields(toAddresses=to_addresses)\n", + " )\n", + "\n", + " # Build SMTP message template with subject and body\n", + " message_template = SmtpMessageTemplate(\n", + " fields=SmtpMessageTemplateFields(subject_template=subject, body_template=body)\n", + " )\n", + "\n", + " # Create notification configuration\n", + " notification_config = DynamicNotificationConfiguration(\n", + " address_group=address_group, message_template=message_template\n", + " )\n", + "\n", + " # Create notification strategy\n", + " notification_strategy = DynamicNotificationStrategy(\n", + " notification_configurations=[notification_config]\n", + " )\n", + "\n", + " # Create request and send notification\n", + " request = DynamicStrategyRequest(notification_strategy=notification_strategy)\n", + " notification_client.apply_dynamic_notification_strategy(request)\n", + " print(\"Email notification sent successfully\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1a02a6f8-c170-4525-839e-c096306c2e95", + "metadata": {}, + "outputs": [], + "source": [ + "def query_alarms(\n", + " alarm_filter: str = \"\",\n", + " take: int = 1000,\n", + ") -> List[Alarm]:\n", + " \"\"\"\n", + " Query all alarms matching the filter by batching through all pages using continuation tokens.\n", + "\n", + " Args:\n", + " alarm_filter (str): Filter string to apply when querying alarms\n", + " take (int): Number of alarms to fetch per page\n", + "\n", + " Returns:\n", + " List[Alarm]: List of all matching alarms\n", + " \"\"\"\n", + " # First query\n", + " request = QueryAlarmsWithFilterRequest(\n", + " filter=alarm_filter,\n", + " continuation_token=None,\n", + " take=take,\n", + " return_count=False,\n", + " )\n", + " response = alarm_client.query_alarms(request)\n", + "\n", + " all_alarms = list(response.alarms or [])\n", + " continuation_token = response.continuation_token\n", + "\n", + " # Continue with remaining pages if continuation token exists\n", + " while continuation_token:\n", + " request = QueryAlarmsWithFilterRequest(\n", + " filter=alarm_filter,\n", + " continuation_token=continuation_token,\n", + " take=take,\n", + " return_count=False,\n", + " )\n", + " response = alarm_client.query_alarms(request)\n", + "\n", + " alarms = response.alarms or []\n", + " all_alarms.extend(alarms)\n", + "\n", + " continuation_token = response.continuation_token\n", + "\n", + " return all_alarms" + ] + }, + { + "cell_type": "markdown", + "id": "39a3b118-b3bd-41b9-860c-c7bfbe78038a", + "metadata": {}, + "source": [ + "## Alarm helpers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "71c28551-40c3-4056-a0e6-5dc23deb426d", + "metadata": {}, + "outputs": [], + "source": [ + "def get_alarm_severity(alarm: Alarm) -> int | None:\n", + " \"\"\"\n", + " Extract severity level from an alarm.\n", + "\n", + " Args:\n", + " alarm (Alarm): Alarm object containing severity information\n", + "\n", + " Returns:\n", + " int | None: Severity level as an integer, or None if not found\n", + " \"\"\"\n", + " # Direct fields\n", + " for key in (\"current_severity_level\", \"severity_level\"):\n", + " value = getattr(alarm, key, None)\n", + " if value is not None:\n", + " if isinstance(value, int):\n", + " return value\n", + " if isinstance(value, str):\n", + " try:\n", + " return int(value)\n", + " except ValueError:\n", + " pass\n", + "\n", + " # Nested current state\n", + " for state_key in (\"current_state\", \"state\"):\n", + " state = getattr(alarm, state_key, None)\n", + " if state is not None:\n", + " severity_level = getattr(state, \"severity_level\", None)\n", + " if severity_level is not None:\n", + " if isinstance(severity_level, int):\n", + " return severity_level\n", + " if isinstance(severity_level, str):\n", + " try:\n", + " return int(severity_level)\n", + " except ValueError:\n", + " pass\n", + "\n", + " return None\n", + "\n", + "\n", + "def resolve_asset_location(asset: Asset) -> tuple[str | None, str | None, str | None]:\n", + " \"\"\"\n", + " Resolve the location information for an asset.\n", + "\n", + " Prioritizes system location (via minion_id) over physical location.\n", + "\n", + " Args:\n", + " asset (Asset): Asset object containing location information\n", + "\n", + " Returns:\n", + " tuple[str | None, str | None, str | None]: A tuple containing:\n", + " - resolved_location: Human-readable location (system alias or physical location)\n", + " - system_id: System ID if asset is in a system, None otherwise\n", + " - system_alias: System alias if asset is in a system, None otherwise\n", + " \"\"\"\n", + " resolved_location = None\n", + " system_id = None\n", + " system_alias = None\n", + "\n", + " location = asset.location\n", + " if location:\n", + " # First check for system location via minion_id (preferred if available)\n", + " if location.minion_id:\n", + " minion_id = location.minion_id\n", + " filter_str = f'id == \"{minion_id}\"'\n", + " systems = query_systems(filter=filter_str)\n", + "\n", + " if systems:\n", + " system_id = systems[0][\"id\"]\n", + " system_alias = systems[0][\"alias\"]\n", + " resolved_location = system_alias\n", + " # Fall back to physical location\n", + " elif location.physical_location:\n", + " resolved_location = location.physical_location\n", + "\n", + " return resolved_location, system_id, system_alias\n", + "\n", + "\n", + "def build_calibration_description(\n", + " alarm_display_name: str,\n", + " due_verb_phrase: str,\n", + " resolved_location: str | None,\n", + " asset: Asset,\n", + ") -> str:\n", + " \"\"\"\n", + " Build a calibration alarm description with optional date and location information.\n", + "\n", + " Args:\n", + " alarm_display_name (str): The asset display name for the alarm\n", + " due_verb_phrase (str): The verb phrase (e.g., \"has exceeded\", \"is approaching\")\n", + " resolved_location (str | None): The resolved location (system alias or physical location, or None)\n", + " asset (Asset): The asset object containing calibration information\n", + "\n", + " Returns:\n", + " str: Formatted description string\n", + " \"\"\"\n", + " date_phrase = \"\"\n", + " if asset.external_calibration and asset.external_calibration.resolved_due_date:\n", + " try:\n", + " calibration_date = asset.external_calibration.resolved_due_date\n", + " calibration_date_str = calibration_date.strftime(\"%B %d, %Y\")\n", + " date_phrase = f\" of {calibration_date_str}\"\n", + " except Exception:\n", + " pass # Leave date_phrase empty if formatting fails\n", + "\n", + " description = f\"The {alarm_display_name} {due_verb_phrase} the next recommended calibration date{date_phrase}.\"\n", + " \n", + " # Only add location sentence if we have a location\n", + " if resolved_location:\n", + " description += f\" You can find this device in the following location: {resolved_location}.\"\n", + " \n", + " return description\n", + "\n", + "\n", + "def build_transition_payload(\n", + " asset: Asset,\n", + " alarm_id: str,\n", + " target_severity: int,\n", + " calibration_status: str,\n", + ") -> CreateOrUpdateAlarmRequest:\n", + " \"\"\"\n", + " Build alarm transition payload for creating or updating an alarm.\n", + "\n", + " Uses template strings for display_name and description with placeholders that are\n", + " dynamically resolved using properties from the transition.\n", + "\n", + " Args:\n", + " asset (Asset): Asset object containing asset details\n", + " alarm_id (str): Unique identifier for the alarm\n", + " target_severity (int): Desired severity level (-1 for clear, 1 for low, 3 for high)\n", + " calibration_status (str): Current calibration status string\n", + "\n", + " Returns:\n", + " CreateOrUpdateAlarmRequest: Request object for creating or updating an alarm\n", + " \"\"\"\n", + " display_name = (\n", + " f\"{asset.model_name} ({asset.serial_number})\"\n", + " if asset.model_name and asset.serial_number\n", + " else asset.model_name or asset.serial_number or alarm_id\n", + " )\n", + " now_utc = datetime.now(timezone.utc)\n", + " iso_with_z = now_utc.replace(tzinfo=timezone.utc).isoformat().replace(\"+00:00\", \"Z\")\n", + "\n", + " detail_text = (\n", + " f\"{display_name} calibration status = {calibration_status}, \"\n", + " f\"severity = {target_severity} at {iso_with_z}\"\n", + " )\n", + "\n", + " # Resolve asset location information\n", + " resolved_location, system_id, system_alias = resolve_asset_location(asset)\n", + "\n", + " # Build properties base\n", + " properties: Dict[str, Any] = {\n", + " \"Model name\": asset.model_name,\n", + " \"Serial number\": asset.serial_number,\n", + " \"Vendor name\": asset.vendor_name,\n", + " }\n", + "\n", + " # Only add minionId and system if we resolved them\n", + " if system_id is not None and system_alias is not None:\n", + " properties[\"minionId\"] = system_id\n", + " properties[\"system\"] = system_alias\n", + "\n", + " # Build description based on calibration status\n", + " if calibration_status == CalibrationStatus.PAST_RECOMMENDED_DUE_DATE.value:\n", + " description = build_calibration_description(\n", + " alarm_display_name=display_name,\n", + " due_verb_phrase=\"has exceeded\",\n", + " resolved_location=resolved_location,\n", + " asset=asset,\n", + " )\n", + " elif calibration_status == CalibrationStatus.APPROACHING_RECOMMENDED_DUE_DATE.value:\n", + " description = build_calibration_description(\n", + " alarm_display_name=display_name,\n", + " due_verb_phrase=\"is approaching\",\n", + " resolved_location=resolved_location,\n", + " asset=asset,\n", + " )\n", + " else:\n", + " # Fallback for OK status\n", + " description = f\"{display_name} {CALIBRATION_STATUS_TO_LABEL[calibration_status]}\"\n", + "\n", + " # Build dynamic transition properties for template resolution\n", + " transition_properties: Dict[str, Any] = {\n", + " \"CALIBRATION_STATUS\": CALIBRATION_STATUS_TO_LABEL[calibration_status],\n", + " \"ALARM_DESCRIPTION\": description,\n", + " }\n", + "\n", + " # Template strings\n", + " display_name_template = f\"{display_name} - \"\n", + " description_template = \"\"\n", + "\n", + " # Create appropriate transition based on severity\n", + " if target_severity == -1:\n", + " transition = ClearAlarmTransition(\n", + " occurred_at=iso_with_z,\n", + " value=calibration_status,\n", + " condition=calibration_status,\n", + " detail_text=detail_text,\n", + " properties=transition_properties,\n", + " )\n", + " else:\n", + " transition = SetAlarmTransition(\n", + " occurred_at=iso_with_z,\n", + " severity_level=target_severity,\n", + " value=calibration_status,\n", + " condition=calibration_status,\n", + " detail_text=detail_text,\n", + " properties=transition_properties,\n", + " )\n", + "\n", + " return CreateOrUpdateAlarmRequest(\n", + " alarm_id=alarm_id,\n", + " workspace=asset.workspace,\n", + " channel=alarm_id,\n", + " display_name=display_name_template,\n", + " description=description_template,\n", + " transition=transition,\n", + " properties=properties,\n", + " )\n", + "\n", + "\n", + "def upsert_alarms_for_assets(\n", + " assets: List[Asset],\n", + " target_severity: int,\n", + " calibration_status: str,\n", + " chunk_size: int = 500,\n", + ") -> List[Asset]:\n", + " \"\"\"\n", + " Create or update alarms for a list of assets based on calibration status.\n", + "\n", + " Args:\n", + " assets (List[Asset]): List of Asset objects to process\n", + " target_severity (int): Desired severity level (-1 to clear, 1 for low, 3 for high)\n", + " calibration_status (str): Calibration status string\n", + " chunk_size (int): Number of assets to process per batch\n", + "\n", + " Returns:\n", + " List[Asset]: List of assets that had their alarm state updated\n", + " \"\"\"\n", + " assets_with_updated_state: List[Asset] = []\n", + " if not assets:\n", + " return assets_with_updated_state\n", + "\n", + " for i in range(0, len(assets), chunk_size):\n", + " chunk = assets[i : i + chunk_size]\n", + "\n", + " # Map asset paths -> asset\n", + " asset_paths: List[str] = []\n", + " asset_path_to_asset: Dict[str, Asset] = {}\n", + " for asset in chunk:\n", + " alarm_id = get_asset_path(asset)\n", + " asset_paths.append(alarm_id)\n", + " asset_path_to_asset[alarm_id] = asset\n", + "\n", + " if not asset_paths:\n", + " continue\n", + "\n", + " alarm_filter = \" or \".join(f'alarmId = \"{p}\"' for p in asset_paths)\n", + "\n", + " # Query existing alarms (now automatically batches through all pages)\n", + " alarms = query_alarms(alarm_filter=alarm_filter)\n", + "\n", + " alarm_map: Dict[str, Alarm] = {alarm.alarm_id: alarm for alarm in alarms}\n", + "\n", + " for alarm_id, asset in asset_path_to_asset.items():\n", + " existing_alarm = alarm_map.get(alarm_id)\n", + " should_upsert = False\n", + "\n", + " if existing_alarm:\n", + " current_severity = get_alarm_severity(existing_alarm)\n", + " # If current severity is the same as desired, skip\n", + " if current_severity == target_severity:\n", + " continue\n", + " # Update to new severity level\n", + " print(\n", + " f\"Updating alarm for {alarm_id}: \"\n", + " f\"current_severity={current_severity}, target={target_severity}\"\n", + " )\n", + " should_upsert = True\n", + " else:\n", + " # No existing alarm\n", + " if target_severity == -1:\n", + " # For OK (-1) we do NOT create a new \"clean\" alarm - just skip\n", + " continue\n", + " # Create alarm with severity\n", + " print(f\"Creating alarm for {alarm_id} with severity {target_severity}\")\n", + " should_upsert = True\n", + "\n", + " if should_upsert:\n", + " request = build_transition_payload(\n", + " asset=asset,\n", + " alarm_id=alarm_id,\n", + " target_severity=target_severity,\n", + " calibration_status=calibration_status,\n", + " )\n", + " try:\n", + " alarm_client.create_or_update_alarm(request)\n", + " print(\n", + " f\"Alarm {'update' if existing_alarm else 'create'} OK for {alarm_id}\"\n", + " )\n", + " assets_with_updated_state.append(asset)\n", + " except Exception as e:\n", + " print(\n", + " f\"Error {'updating' if existing_alarm else 'creating'} alarm for {alarm_id}: {e}\"\n", + " )\n", + "\n", + " return assets_with_updated_state" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bfdf36a3", + "metadata": {}, + "outputs": [], + "source": [ + "def build_email_body(\n", + " http_url: str,\n", + " assets_with_past_date: List[Asset],\n", + " assets_with_approaching_due_date: List[Asset],\n", + ") -> str:\n", + " \"\"\"\n", + " Build email notification body with asset calibration status information.\n", + "\n", + " Generates a formatted email body containing links to assets grouped by calibration status:\n", + " - Assets with past calibration due date\n", + " - Assets with approaching calibration due date\n", + "\n", + " Args:\n", + " http_url (str): Base SystemLink HTTP URL\n", + " assets_with_past_date (List[Asset]): List of assets past their calibration due date\n", + " assets_with_approaching_due_date (List[Asset]): List of assets approaching their calibration due date\n", + "\n", + " Returns:\n", + " str: Formatted email body as a string with asset links\n", + " \"\"\"\n", + " parsed = urlsplit(http_url)\n", + " host_without_api = parsed.netloc.replace(\"-api\", \"\")\n", + " base_url = urlunsplit((parsed.scheme, host_without_api, \"\", \"\", \"\"))\n", + "\n", + " def asset_link(asset: Asset) -> str:\n", + " asset_id = asset.id\n", + " return f\"{base_url}/assets/{asset_id}\"\n", + "\n", + " lines: List[str] = []\n", + "\n", + " # Assets with past date\n", + " lines.append(\"Assets with past calibration due date: \")\n", + " if assets_with_past_date:\n", + " for idx, asset in enumerate(assets_with_past_date, start=1):\n", + " lines.append(f\"{idx}. {asset_link(asset)}\")\n", + " else:\n", + " lines.append(\"No new assets with past calibration due date.\")\n", + "\n", + " lines.append(\"\") # blank line between sections\n", + " \n", + " # Assets with approaching due date\n", + " lines.append(\"Assets with approaching calibration due date: \")\n", + " if assets_with_approaching_due_date:\n", + " for idx, asset in enumerate(assets_with_approaching_due_date, start=1):\n", + " lines.append(f\"{idx}. {asset_link(asset)}\")\n", + " else:\n", + " lines.append(\"No new assets with approaching calibration due date.\")\n", + "\n", + " return \"\\n\".join(lines)" + ] + }, + { + "cell_type": "markdown", + "id": "d2f56483", + "metadata": {}, + "source": [ + "## Implementation" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f822784e-5a5c-48cb-b206-8e19069d92e6", + "metadata": {}, + "outputs": [], + "source": [ + "filter_str = \"\"\n", + "if workspace_ids:\n", + " conditions = \" OR \".join(f'workspace = \"{ws}\"' for ws in workspace_ids)\n", + " filter_str = f\"({conditions})\"\n", + "\n", + "print(\"Querying all assets...\")\n", + "all_assets = query_all_assets(filter=filter_str)\n", + "print(f\"Total assets: {len(all_assets)}\")\n", + "\n", + "# Divide assets into 3 categories based on calibrationStatus\n", + "assets_with_past_date: List[Asset] = []\n", + "assets_with_approaching_due_date: List[Asset] = []\n", + "calibrated_assets: List[Asset] = []\n", + "\n", + "for asset in all_assets:\n", + " status = asset.calibration_status\n", + " if status == CalibrationStatus.PAST_RECOMMENDED_DUE_DATE:\n", + " assets_with_past_date.append(asset)\n", + " elif status == CalibrationStatus.APPROACHING_RECOMMENDED_DUE_DATE:\n", + " assets_with_approaching_due_date.append(asset)\n", + " elif status == CalibrationStatus.OK:\n", + " calibrated_assets.append(asset)\n", + "\n", + "sb.glue(\"Assets with calibration past due date\", len(assets_with_past_date))\n", + "sb.glue(\"Assets with approaching due date\", len(assets_with_approaching_due_date))\n", + "sb.glue(\"Calibrated assets\", len(calibrated_assets))\n", + "\n", + "# PAST_RECOMMENDED_DUE_DATE -> severity 3\n", + "new_assets_with_past_date = upsert_alarms_for_assets(\n", + " assets=assets_with_past_date,\n", + " target_severity=CALIBRATION_STATUS_TO_SEVERITY[\n", + " CalibrationStatus.PAST_RECOMMENDED_DUE_DATE.value\n", + " ],\n", + " calibration_status=CalibrationStatus.PAST_RECOMMENDED_DUE_DATE.value,\n", + ")\n", + "\n", + "# APPROACHING_RECOMMENDED_DUE_DATE -> severity 1\n", + "new_assets_with_approaching_due_date = upsert_alarms_for_assets(\n", + " assets=assets_with_approaching_due_date,\n", + " target_severity=CALIBRATION_STATUS_TO_SEVERITY[\n", + " CalibrationStatus.APPROACHING_RECOMMENDED_DUE_DATE.value\n", + " ],\n", + " calibration_status=CalibrationStatus.APPROACHING_RECOMMENDED_DUE_DATE.value,\n", + ")\n", + "\n", + "mail_body = build_email_body(\n", + " http_url,\n", + " new_assets_with_past_date,\n", + " new_assets_with_approaching_due_date,\n", + ")\n", + "\n", + "# Send email notification if there are assets to report\n", + "if not new_assets_with_past_date and not new_assets_with_approaching_due_date:\n", + " print(\"No assets with calibration status changes to report - skipping email notification\")\n", + "else:\n", + " send_email_notification(\n", + " calibration_subscribers,\n", + " \"Asset due-date alerts\",\n", + " mail_body,\n", + " )\n", + "\n", + "# OK -> severity -1\n", + "upsert_alarms_for_assets(\n", + " assets=calibrated_assets,\n", + " target_severity=CALIBRATION_STATUS_TO_SEVERITY[CalibrationStatus.OK.value],\n", + " calibration_status=CalibrationStatus.OK.value,\n", + ")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "nisystemlink-examples-py3.13", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/poetry.lock b/poetry.lock index f635011..a22f87e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. [[package]] name = "aenum" @@ -272,7 +272,7 @@ description = "Backport of PEP 654 (exception groups)" optional = false python-versions = ">=3.7" groups = ["main", "dev"] -markers = "python_version == \"3.10\"" +markers = "python_version < \"3.11\"" files = [ {file = "exceptiongroup-1.3.1-py3-none-any.whl", hash = "sha256:a7a39a3bd276781e98394987d3a5701d0c4edffb633bb7a5144577f82c773598"}, {file = "exceptiongroup-1.3.1.tar.gz", hash = "sha256:8b412432c6055b0b7d14c310000ae93352ed6754f70fa8f7c34141f91c4e3219"}, @@ -593,14 +593,14 @@ files = [ [[package]] name = "nisystemlink-clients" -version = "2.20.0" +version = "2.31.0" description = "NI-SystemLink Python API" optional = false -python-versions = "<4.0,>=3.9" +python-versions = "<4.0,>=3.10" groups = ["main"] files = [ - {file = "nisystemlink_clients-2.20.0-py3-none-any.whl", hash = "sha256:79f1f1532bb711a2f5f300f3b0a8d205e2fcef2f58594f68a35c703a8de68ee5"}, - {file = "nisystemlink_clients-2.20.0.tar.gz", hash = "sha256:fc2a9ced1a7f0b83d7dc907dca1db2fd96d91570975f8aeff2522a14417bc319"}, + {file = "nisystemlink_clients-2.31.0-py3-none-any.whl", hash = "sha256:58888793a23b7bd99b3093da183f490ca09ae1fd71f7ed87aeac386bd474145e"}, + {file = "nisystemlink_clients-2.31.0.tar.gz", hash = "sha256:c0a5193170c3d2d79c80c21749d7f5e39b0c573aacaf401ef6cedd0bc27c4ad5"}, ] [package.dependencies] @@ -611,7 +611,7 @@ pandas = ">=2.1.0,<3.0.0" pydantic = ">=2.11.3,<3.0.0" pyyaml = ">=6.0.1,<7.0.0" requests = ">=2.28.1,<3.0.0" -uplink = {version = ">=0.10.0,<0.11.0", extras = ["pydantic"], markers = "python_version >= \"3.10\""} +uplink = {version = ">=0.10.0,<0.11.0", extras = ["pydantic"]} [package.extras] pyarrow = ["pyarrow (>=21.0.0,<22.0.0)"] @@ -623,7 +623,7 @@ description = "Fundamental package for array computing in Python" optional = false python-versions = ">=3.10" groups = ["main"] -markers = "python_version == \"3.10\"" +markers = "python_version < \"3.11\"" files = [ {file = "numpy-2.2.6-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b412caa66f72040e6d268491a59f2c43bf03eb6c96dd8f0307829feb7fa2b6fb"}, {file = "numpy-2.2.6-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8e41fd67c52b86603a91c1a505ebaef50b3314de0213461c7a6e99c9a3beff90"}, @@ -1228,13 +1228,6 @@ optional = false python-versions = ">=3.8" groups = ["main", "dev"] files = [ - {file = "PyYAML-6.0.3-cp38-cp38-macosx_10_13_x86_64.whl", hash = "sha256:c2514fceb77bc5e7a2f7adfaa1feb2fb311607c9cb518dbc378688ec73d8292f"}, - {file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9c57bb8c96f6d1808c030b1687b9b5fb476abaa47f0db9c0101f5e9f394e97f4"}, - {file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:efd7b85f94a6f21e4932043973a7ba2613b059c4a000551892ac9f1d11f5baf3"}, - {file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:22ba7cfcad58ef3ecddc7ed1db3409af68d023b7f940da23c6c2a1890976eda6"}, - {file = "PyYAML-6.0.3-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:6344df0d5755a2c9a276d4473ae6b90647e216ab4757f8426893b5dd2ac3f369"}, - {file = "PyYAML-6.0.3-cp38-cp38-win32.whl", hash = "sha256:3ff07ec89bae51176c0549bc4c63aa6202991da2d9a6129d7aef7f1407d3f295"}, - {file = "PyYAML-6.0.3-cp38-cp38-win_amd64.whl", hash = "sha256:5cf4e27da7e3fbed4d6c3d8e797387aaad68102272f8f9752883bc32d61cb87b"}, {file = "pyyaml-6.0.3-cp310-cp310-macosx_10_13_x86_64.whl", hash = "sha256:214ed4befebe12df36bcc8bc2b64b396ca31be9304b8f59e25c11cf94a4c033b"}, {file = "pyyaml-6.0.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:02ea2dfa234451bbb8772601d7b8e426c2bfa197136796224e50e35a78777956"}, {file = "pyyaml-6.0.3-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b30236e45cf30d2b8e7b3e85881719e98507abed1011bf463a8fa23e9c3e98a8"}, @@ -1377,7 +1370,7 @@ description = "A lil' TOML parser" optional = false python-versions = ">=3.8" groups = ["dev"] -markers = "python_version == \"3.10\"" +markers = "python_version < \"3.11\"" files = [ {file = "tomli-2.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:88bd15eb972f3664f5ed4b57c1634a97153b4bac4479dcb6a495f41921eb7f45"}, {file = "tomli-2.3.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:883b1c0d6398a6a9d29b508c331fa56adbcdff647f6ace4dfca0f50e90dfd0ba"}, @@ -1534,4 +1527,4 @@ zstd = ["backports-zstd (>=1.0.0) ; python_version < \"3.14\""] [metadata] lock-version = "2.1" python-versions = "^3.10" -content-hash = "9054a301f6f6f2a24df4dda113dcd571334d13ea60757ee8c463af239fb60ffa" +content-hash = "246e9a8556f8c9c0a8ea209720fafb196c36fbbec8bfd8f0e5d9b8e1ef985207" diff --git a/pyproject.toml b/pyproject.toml index 57ca765..7326d97 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ packages = [ [tool.poetry.dependencies] python = "^3.10" -nisystemlink-clients = "^2.18.0" +nisystemlink-clients = "^2.31.0" requests = "^2.32.5" urllib3 = "^2.6.0" From 76eff98c9bde0a7ff51c8f0901af0f288df50986 Mon Sep 17 00:00:00 2001 From: ShriramS-Emerson Date: Tue, 3 Feb 2026 18:13:14 +0530 Subject: [PATCH 2/6] update calibration past due severity level --- ...set Calibration Alarms and Notification.ipynb | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb b/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb index 1937373..0b5c2dc 100644 --- a/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb +++ b/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb @@ -13,8 +13,8 @@ "\n", "The notebook maps asset calibration statuses to alarm severity levels:\n", "\n", - "- **PAST_RECOMMENDED_DUE_DATE** → Severity 3 (High) \n", - " Asset is past its recommended calibration due date - creates or updates alarm \"Calibration Overdue\" with high severity\n", + "- **PAST_RECOMMENDED_DUE_DATE** → Severity 2 (Moderate) \n", + " Asset is past its recommended calibration due date - creates or updates alarm \"Calibration Overdue\" with moderate severity\n", "\n", "- **APPROACHING_RECOMMENDED_DUE_DATE** → Severity 1 (Low) \n", " Asset is approaching its recommended calibration due date - creates or updates alarm \"Calibration Due\" with low severity\n", @@ -100,7 +100,7 @@ "- **`calibration_subscribers`**: List of email addresses to receive notifications about calibration status changes\n", " - Example: `[\"user@example.com\", \"admin@example.com\"]`\n", "- **`http_url`**: Base SystemLink URL for building asset links in email notifications\n", - "- **`CALIBRATION_STATUS_TO_SEVERITY`**: Mapping of calibration status to alarm severity levels (1=Low, 3=High, -1=Clear)\n", + "- **`CALIBRATION_STATUS_TO_SEVERITY`**: Mapping of calibration status to alarm severity levels (1=Low, 2=Moderate, -1=Clear)\n", "- **`CALIBRATION_STATUS_TO_LABEL`**: Mapping of calibration status to human-readable alarm labels" ] }, @@ -142,11 +142,11 @@ "calibration_subscribers: List[str] = []\n", "\n", "# Base SystemLink URL for building asset links in email notifications\n", - "http_url = \"https://test.lifecyclesolutions.ni.com/\"\n", + "http_url = \"https://abc.example.com/\"\n", "\n", "# Calibration status -> desired severity mapping\n", "CALIBRATION_STATUS_TO_SEVERITY = {\n", - " CalibrationStatus.PAST_RECOMMENDED_DUE_DATE.value: 3,\n", + " CalibrationStatus.PAST_RECOMMENDED_DUE_DATE.value: 2,\n", " CalibrationStatus.APPROACHING_RECOMMENDED_DUE_DATE.value: 1,\n", " CalibrationStatus.OK.value: -1,\n", "}\n", @@ -522,7 +522,7 @@ " Args:\n", " asset (Asset): Asset object containing asset details\n", " alarm_id (str): Unique identifier for the alarm\n", - " target_severity (int): Desired severity level (-1 for clear, 1 for low, 3 for high)\n", + " target_severity (int): Desired severity level (-1 for clear, 1 for low, 2 for moderate)\n", " calibration_status (str): Current calibration status string\n", "\n", " Returns:\n", @@ -626,7 +626,7 @@ "\n", " Args:\n", " assets (List[Asset]): List of Asset objects to process\n", - " target_severity (int): Desired severity level (-1 to clear, 1 for low, 3 for high)\n", + " target_severity (int): Desired severity level (-1 to clear, 1 for low, 2 for moderate)\n", " calibration_status (str): Calibration status string\n", " chunk_size (int): Number of assets to process per batch\n", "\n", @@ -803,7 +803,7 @@ "sb.glue(\"Assets with approaching due date\", len(assets_with_approaching_due_date))\n", "sb.glue(\"Calibrated assets\", len(calibrated_assets))\n", "\n", - "# PAST_RECOMMENDED_DUE_DATE -> severity 3\n", + "# PAST_RECOMMENDED_DUE_DATE -> severity 2\n", "new_assets_with_past_date = upsert_alarms_for_assets(\n", " assets=assets_with_past_date,\n", " target_severity=CALIBRATION_STATUS_TO_SEVERITY[\n", From fb61d3e0dfee33d87687e4a8c24e1e1c8cb8f094 Mon Sep 17 00:00:00 2001 From: ShriramS-Emerson Date: Tue, 3 Feb 2026 18:22:41 +0530 Subject: [PATCH 3/6] format notebook --- ... Calibration Alarms and Notification.ipynb | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb b/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb index 0b5c2dc..6c1a2c0 100644 --- a/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb +++ b/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb @@ -303,9 +303,7 @@ " return\n", "\n", " # Build SMTP address group with recipient addresses\n", - " address_group = SmtpAddressGroup(\n", - " fields=SmtpAddressFields(toAddresses=to_addresses)\n", - " )\n", + " address_group = SmtpAddressGroup(fields=SmtpAddressFields(toAddresses=to_addresses))\n", "\n", " # Build SMTP message template with subject and body\n", " message_template = SmtpMessageTemplate(\n", @@ -499,11 +497,13 @@ " pass # Leave date_phrase empty if formatting fails\n", "\n", " description = f\"The {alarm_display_name} {due_verb_phrase} the next recommended calibration date{date_phrase}.\"\n", - " \n", + "\n", " # Only add location sentence if we have a location\n", " if resolved_location:\n", - " description += f\" You can find this device in the following location: {resolved_location}.\"\n", - " \n", + " description += (\n", + " f\" You can find this device in the following location: {resolved_location}.\"\n", + " )\n", + "\n", " return description\n", "\n", "\n", @@ -573,7 +573,9 @@ " )\n", " else:\n", " # Fallback for OK status\n", - " description = f\"{display_name} {CALIBRATION_STATUS_TO_LABEL[calibration_status]}\"\n", + " description = (\n", + " f\"{display_name} {CALIBRATION_STATUS_TO_LABEL[calibration_status]}\"\n", + " )\n", "\n", " # Build dynamic transition properties for template resolution\n", " transition_properties: Dict[str, Any] = {\n", @@ -749,7 +751,7 @@ " lines.append(\"No new assets with past calibration due date.\")\n", "\n", " lines.append(\"\") # blank line between sections\n", - " \n", + "\n", " # Assets with approaching due date\n", " lines.append(\"Assets with approaching calibration due date: \")\n", " if assets_with_approaching_due_date:\n", @@ -829,7 +831,9 @@ "\n", "# Send email notification if there are assets to report\n", "if not new_assets_with_past_date and not new_assets_with_approaching_due_date:\n", - " print(\"No assets with calibration status changes to report - skipping email notification\")\n", + " print(\n", + " \"No assets with calibration status changes to report - skipping email notification\"\n", + " )\n", "else:\n", " send_email_notification(\n", " calibration_subscribers,\n", From 9b90798b7d8190f640fee0302a65681593984896 Mon Sep 17 00:00:00 2001 From: Rohith Raja Sakthivel Date: Thu, 5 Feb 2026 18:51:41 +0530 Subject: [PATCH 4/6] refactor: Resolved changes mentioned and included batch_query mechanism --- ... Calibration Alarms and Notification.ipynb | 436 ++++++++---------- 1 file changed, 190 insertions(+), 246 deletions(-) diff --git a/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb b/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb index 6c1a2c0..2d917f1 100644 --- a/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb +++ b/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb @@ -55,7 +55,7 @@ "from datetime import datetime, timezone\n", "from typing import Any, Dict, List\n", "from urllib.parse import urlsplit, urlunsplit\n", - "\n", + "from typing import Callable, Iterator\n", "import scrapbook as sb\n", "from nisystemlink.clients.alarm import AlarmClient\n", "from nisystemlink.clients.alarm.models import (\n", @@ -182,97 +182,141 @@ { "cell_type": "code", "execution_count": null, - "id": "e13a3263-8522-414c-8242-0e8734cb9a45", + "id": "57a0e942", "metadata": {}, "outputs": [], "source": [ - "def query_all_assets(filter: str = \"\", take: int = 100) -> List[Asset]:\n", + "def __batch_query(\n", + " query_request: QueryAssetsRequest | QuerySystemsRequest | QueryAlarmsWithFilterRequest,\n", + " query_func: Callable[\n", + " [QueryAssetsRequest | QuerySystemsRequest | QueryAlarmsWithFilterRequest], Asset | Dict[str, Any] | Alarm,\n", + " ],\n", + " paged_data_field: str,\n", + ") -> Iterator[List[Any]]:\n", " \"\"\"\n", - " Query all assets from SystemLink by paginating through all results.\n", + " Execute a paginated query and yield results in batches.\n", "\n", " Args:\n", - " filter (str): Filter string to apply when querying assets\n", - " take (int): Number of assets to fetch per page\n", - "\n", - " Returns:\n", - " List[Asset]: List of all assets matching the filter criteria\n", + " query_request:\n", + " Request object used for querying. May support pagination via`continuation_token`, `skip`.\n", + " query_func:\n", + " Callable that executes the query using `query_request` and\n", + " paged_data_field:\n", + " Name of the attribute on the response object that contains the list of returned items.\n", + "\n", + " Yields:\n", + " List[Any]:\n", + " A list of items returned for each page of results.\n", " \"\"\"\n", - " query_assets_request = QueryAssetsRequest(\n", - " take=take,\n", - " skip=0,\n", - " filter=filter,\n", - " )\n", - "\n", - " all_assets: List[Asset] = []\n", "\n", " while True:\n", - " query_assets_response = asset_client.query_assets(query=query_assets_request)\n", - " assets = query_assets_response.assets\n", - " if not assets:\n", + " response = query_func(query_request)\n", + " items = getattr(response, paged_data_field, None)\n", + "\n", + " if not items:\n", " break\n", "\n", - " all_assets.extend(assets)\n", - " query_assets_request.skip = (query_assets_request.skip or 0) + len(assets)\n", + " yield items\n", + "\n", + " # Continuation-token based pagination (Alarms)\n", + " continuation_token = getattr(response, \"continuation_token\", None)\n", + " if continuation_token:\n", + " query_request.continuation_token = continuation_token\n", + " continue\n", "\n", - " return all_assets\n", + " # Skip based pagination (Systems / Assets)\n", + " if hasattr(query_request, \"skip\"):\n", + " prev_skip = query_request.skip\n", + " query_request.skip = (query_request.skip or 0) + len(items)\n", "\n", + " if query_request.skip == prev_skip:\n", + " break\n", + " continue\n", "\n", - "def get_asset_path(asset: Asset) -> str:\n", + " # No pagination mechanism\n", + " break" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cb7fff9f", + "metadata": {}, + "outputs": [], + "source": [ + "def resolve_asset_location(asset: Asset) -> tuple[str | None, str | None, str | None]:\n", " \"\"\"\n", - " Generate a unique asset path for alarm identification.\n", + " Resolve the location information for an asset.\n", "\n", - " Constructs a hierarchical path using vendor, model, and serial number.\n", + " Prioritizes system location (via minion_id) over physical location.\n", "\n", " Args:\n", - " asset (Asset): Asset object containing vendor, model, and serial information\n", + " asset (Asset): Asset object containing location information\n", "\n", " Returns:\n", - " str: Formatted asset path string\n", + " tuple[str | None, str | None, str | None]: A tuple containing:\n", + " - resolved_location: Human-readable location (system alias or physical location)\n", + " - system_id: System ID if asset is in a system, None otherwise\n", + " - system_alias: System alias if asset is in a system, None otherwise\n", " \"\"\"\n", - " vendor = asset.vendor_name or str(asset.vendor_number or \"\")\n", - " model = asset.model_name or str(asset.model_number or \"\")\n", - " serial_number = str(asset.serial_number or \"\")\n", + " location = asset.location\n", + " if not location:\n", + " return None, None, None\n", "\n", - " return f\"Assets.{vendor}.{model}.{serial_number}.Calibration\"" + " query_system_request = QuerySystemsRequest(\n", + " skip=0,\n", + " take=100,\n", + " filter=f'id == \"{location.minion_id}\"',\n", + " projection=\"new(id,alias)\",\n", + " )\n", + "\n", + " systems_iter: Iterator[List[Dict[str, Any]]] = __batch_query(\n", + " query_request=query_system_request,\n", + " query_func=system_client.query_systems,\n", + " paged_data_field=\"data\",\n", + " )\n", + "\n", + " try:\n", + " systems = next(systems_iter)\n", + " system = systems[0]\n", + " except (StopIteration, IndexError):\n", + " return None, None, None\n", + "\n", + " system_id = system.get(\"id\")\n", + " system_alias = system.get(\"alias\")\n", + "\n", + " resolved_location = system_alias or getattr(location, \"physical_location\", None)\n", + " return resolved_location, system_id, system_alias" ] }, { "cell_type": "code", "execution_count": null, - "id": "28e91d5e-286e-46d0-9fd4-f78899c72e13", + "id": "e13a3263-8522-414c-8242-0e8734cb9a45", "metadata": {}, "outputs": [], "source": [ - "def query_systems(filter: str = \"\", take: int = 1000) -> List[Dict[str, Any]]:\n", + "def get_asset_path(asset: Asset) -> str:\n", " \"\"\"\n", - " Query all systems from SystemLink through batch query.\n", + " Generate a unique asset path for alarm identification.\n", + "\n", + " Constructs a hierarchical path using vendor, model, and serial number.\n", "\n", " Args:\n", - " filter (str): Filter string to apply when querying systems\n", - " take (int): Number of systems to fetch per batch\n", + " asset (Asset): Asset object containing vendor, model, and serial information\n", "\n", " Returns:\n", - " List[Dict[str, Any]]: List of dictionaries containing system id and alias\n", + " str: Formatted asset path string\n", " \"\"\"\n", - " request = QuerySystemsRequest(\n", - " skip=0,\n", - " take=take,\n", - " filter=filter,\n", - " projection=\"new(id,alias)\",\n", - " )\n", - "\n", - " all_systems: List[Dict[str, Any]] = []\n", - "\n", - " while True:\n", - " response = system_client.query_systems(request)\n", - " systems = response.data\n", - " if not systems:\n", - " break\n", - "\n", - " all_systems.extend(systems)\n", - " request.skip = (request.skip or 0) + len(systems)\n", + " resolved_asset_location = resolve_asset_location(asset=asset)\n", + " location = resolved_asset_location[0]\n", + " vendor = asset.vendor_name or str(asset.vendor_number or \"\")\n", + " model = asset.model_name or str(asset.model_number or \"\")\n", + " serial_number = str(asset.serial_number or \"\")\n", + " if location:\n", + " return f\"Assets.{location}.{vendor}.{model}.{serial_number}.Calibration\"\n", "\n", - " return all_systems" + " return f\"Assets.{vendor}.{model}.{serial_number}.Calibration\"" ] }, { @@ -326,57 +370,6 @@ " print(\"Email notification sent successfully\")" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "1a02a6f8-c170-4525-839e-c096306c2e95", - "metadata": {}, - "outputs": [], - "source": [ - "def query_alarms(\n", - " alarm_filter: str = \"\",\n", - " take: int = 1000,\n", - ") -> List[Alarm]:\n", - " \"\"\"\n", - " Query all alarms matching the filter by batching through all pages using continuation tokens.\n", - "\n", - " Args:\n", - " alarm_filter (str): Filter string to apply when querying alarms\n", - " take (int): Number of alarms to fetch per page\n", - "\n", - " Returns:\n", - " List[Alarm]: List of all matching alarms\n", - " \"\"\"\n", - " # First query\n", - " request = QueryAlarmsWithFilterRequest(\n", - " filter=alarm_filter,\n", - " continuation_token=None,\n", - " take=take,\n", - " return_count=False,\n", - " )\n", - " response = alarm_client.query_alarms(request)\n", - "\n", - " all_alarms = list(response.alarms or [])\n", - " continuation_token = response.continuation_token\n", - "\n", - " # Continue with remaining pages if continuation token exists\n", - " while continuation_token:\n", - " request = QueryAlarmsWithFilterRequest(\n", - " filter=alarm_filter,\n", - " continuation_token=continuation_token,\n", - " take=take,\n", - " return_count=False,\n", - " )\n", - " response = alarm_client.query_alarms(request)\n", - "\n", - " alarms = response.alarms or []\n", - " all_alarms.extend(alarms)\n", - "\n", - " continuation_token = response.continuation_token\n", - "\n", - " return all_alarms" - ] - }, { "cell_type": "markdown", "id": "39a3b118-b3bd-41b9-860c-c7bfbe78038a", @@ -392,83 +385,6 @@ "metadata": {}, "outputs": [], "source": [ - "def get_alarm_severity(alarm: Alarm) -> int | None:\n", - " \"\"\"\n", - " Extract severity level from an alarm.\n", - "\n", - " Args:\n", - " alarm (Alarm): Alarm object containing severity information\n", - "\n", - " Returns:\n", - " int | None: Severity level as an integer, or None if not found\n", - " \"\"\"\n", - " # Direct fields\n", - " for key in (\"current_severity_level\", \"severity_level\"):\n", - " value = getattr(alarm, key, None)\n", - " if value is not None:\n", - " if isinstance(value, int):\n", - " return value\n", - " if isinstance(value, str):\n", - " try:\n", - " return int(value)\n", - " except ValueError:\n", - " pass\n", - "\n", - " # Nested current state\n", - " for state_key in (\"current_state\", \"state\"):\n", - " state = getattr(alarm, state_key, None)\n", - " if state is not None:\n", - " severity_level = getattr(state, \"severity_level\", None)\n", - " if severity_level is not None:\n", - " if isinstance(severity_level, int):\n", - " return severity_level\n", - " if isinstance(severity_level, str):\n", - " try:\n", - " return int(severity_level)\n", - " except ValueError:\n", - " pass\n", - "\n", - " return None\n", - "\n", - "\n", - "def resolve_asset_location(asset: Asset) -> tuple[str | None, str | None, str | None]:\n", - " \"\"\"\n", - " Resolve the location information for an asset.\n", - "\n", - " Prioritizes system location (via minion_id) over physical location.\n", - "\n", - " Args:\n", - " asset (Asset): Asset object containing location information\n", - "\n", - " Returns:\n", - " tuple[str | None, str | None, str | None]: A tuple containing:\n", - " - resolved_location: Human-readable location (system alias or physical location)\n", - " - system_id: System ID if asset is in a system, None otherwise\n", - " - system_alias: System alias if asset is in a system, None otherwise\n", - " \"\"\"\n", - " resolved_location = None\n", - " system_id = None\n", - " system_alias = None\n", - "\n", - " location = asset.location\n", - " if location:\n", - " # First check for system location via minion_id (preferred if available)\n", - " if location.minion_id:\n", - " minion_id = location.minion_id\n", - " filter_str = f'id == \"{minion_id}\"'\n", - " systems = query_systems(filter=filter_str)\n", - "\n", - " if systems:\n", - " system_id = systems[0][\"id\"]\n", - " system_alias = systems[0][\"alias\"]\n", - " resolved_location = system_alias\n", - " # Fall back to physical location\n", - " elif location.physical_location:\n", - " resolved_location = location.physical_location\n", - "\n", - " return resolved_location, system_id, system_alias\n", - "\n", - "\n", "def build_calibration_description(\n", " alarm_display_name: str,\n", " due_verb_phrase: str,\n", @@ -557,22 +473,20 @@ " properties[\"system\"] = system_alias\n", "\n", " # Build description based on calibration status\n", - " if calibration_status == CalibrationStatus.PAST_RECOMMENDED_DUE_DATE.value:\n", - " description = build_calibration_description(\n", - " alarm_display_name=display_name,\n", - " due_verb_phrase=\"has exceeded\",\n", - " resolved_location=resolved_location,\n", - " asset=asset,\n", - " )\n", - " elif calibration_status == CalibrationStatus.APPROACHING_RECOMMENDED_DUE_DATE.value:\n", + " verb_phrase_by_status = {\n", + " CalibrationStatus.PAST_RECOMMENDED_DUE_DATE.value: \"has exceeded\",\n", + " CalibrationStatus.APPROACHING_RECOMMENDED_DUE_DATE.value: \"is approaching\",\n", + " }\n", + " verb_phrase = verb_phrase_by_status.get(calibration_status)\n", + "\n", + " if verb_phrase:\n", " description = build_calibration_description(\n", " alarm_display_name=display_name,\n", - " due_verb_phrase=\"is approaching\",\n", + " due_verb_phrase=verb_phrase,\n", " resolved_location=resolved_location,\n", " asset=asset,\n", " )\n", " else:\n", - " # Fallback for OK status\n", " description = (\n", " f\"{display_name} {CALIBRATION_STATUS_TO_LABEL[calibration_status]}\"\n", " )\n", @@ -617,6 +531,22 @@ " )\n", "\n", "\n", + "def should_update_alarm(existing_alarm: Alarm | None, target_severity: int) -> bool:\n", + " \"\"\"\n", + " Checks if an alarm should be updated or not based on severity.\n", + "\n", + " Args:\n", + " existing_alarm (Alarm): Alarm to be checked\n", + " target_severity (int): Desired severity level (-1 to clear, 1 for low, 2 for moderate)\n", + "\n", + " Returns:\n", + " bool: True if the alarm needs to be updated.\n", + " \"\"\"\n", + " if existing_alarm:\n", + " return existing_alarm.current_severity_level != target_severity\n", + " return target_severity != -1\n", + "\n", + "\n", "def upsert_alarms_for_assets(\n", " assets: List[Asset],\n", " target_severity: int,\n", @@ -643,64 +573,69 @@ " chunk = assets[i : i + chunk_size]\n", "\n", " # Map asset paths -> asset\n", - " asset_paths: List[str] = []\n", - " asset_path_to_asset: Dict[str, Asset] = {}\n", - " for asset in chunk:\n", - " alarm_id = get_asset_path(asset)\n", - " asset_paths.append(alarm_id)\n", - " asset_path_to_asset[alarm_id] = asset\n", - "\n", - " if not asset_paths:\n", + " asset_path_to_asset: Dict[str, Asset] = {\n", + " get_asset_path(asset): asset for asset in chunk\n", + " }\n", + "\n", + " if not asset_path_to_asset:\n", " continue\n", "\n", - " alarm_filter = \" or \".join(f'alarmId = \"{p}\"' for p in asset_paths)\n", + " alarm_filter = \" or \".join(\n", + " f'alarmId = \"{alarm_id}\"' for alarm_id in asset_path_to_asset\n", + " )\n", "\n", " # Query existing alarms (now automatically batches through all pages)\n", - " alarms = query_alarms(alarm_filter=alarm_filter)\n", + " query_alarm_request = QueryAlarmsWithFilterRequest(\n", + " filter=alarm_filter,\n", + " continuation_token=None,\n", + " take=100,\n", + " return_count=False,\n", + " )\n", "\n", - " alarm_map: Dict[str, Alarm] = {alarm.alarm_id: alarm for alarm in alarms}\n", + " fetch_alarms: Iterator[List[Alarm]] = __batch_query(\n", + " query_request=query_alarm_request,\n", + " query_func=alarm_client.query_alarms,\n", + " paged_data_field=\"alarms\",\n", + " )\n", + "\n", + " alarm_map: Dict[str, Alarm] = {\n", + " alarm.alarm_id: alarm for alarms in fetch_alarms for alarm in alarms\n", + " }\n", "\n", " for alarm_id, asset in asset_path_to_asset.items():\n", " existing_alarm = alarm_map.get(alarm_id)\n", - " should_upsert = False\n", "\n", + " if not should_update_alarm(existing_alarm, target_severity):\n", + " continue\n", + " \n", + " action = \"update\" if existing_alarm else \"create\"\n", " if existing_alarm:\n", - " current_severity = get_alarm_severity(existing_alarm)\n", - " # If current severity is the same as desired, skip\n", - " if current_severity == target_severity:\n", - " continue\n", - " # Update to new severity level\n", " print(\n", - " f\"Updating alarm for {alarm_id}: \"\n", - " f\"current_severity={current_severity}, target={target_severity}\"\n", + " f\"Updating alarm for {alarm_id} \"\n", + " f\"current_severity={existing_alarm.current_severity_level}, (target_severity={target_severity})\"\n", " )\n", - " should_upsert = True\n", " else:\n", - " # No existing alarm\n", - " if target_severity == -1:\n", - " # For OK (-1) we do NOT create a new \"clean\" alarm - just skip\n", - " continue\n", - " # Create alarm with severity\n", - " print(f\"Creating alarm for {alarm_id} with severity {target_severity}\")\n", - " should_upsert = True\n", - "\n", - " if should_upsert:\n", - " request = build_transition_payload(\n", - " asset=asset,\n", - " alarm_id=alarm_id,\n", - " target_severity=target_severity,\n", - " calibration_status=calibration_status,\n", + " print(\n", + " f\"Creating alarm for {alarm_id} \"\n", + " f\"(target_severity={target_severity})\"\n", + " )\n", + "\n", + " request = build_transition_payload(\n", + " asset=asset,\n", + " alarm_id=alarm_id,\n", + " target_severity=target_severity,\n", + " calibration_status=calibration_status,\n", + " )\n", + " try:\n", + " alarm_client.create_or_update_alarm(request)\n", + " print(\n", + " f\"Alarm {action} OK for {alarm_id}\"\n", + " )\n", + " assets_with_updated_state.append(asset)\n", + " except Exception as e:\n", + " print(\n", + " f\"Error {action} alarm for {alarm_id}: {e}\"\n", " )\n", - " try:\n", - " alarm_client.create_or_update_alarm(request)\n", - " print(\n", - " f\"Alarm {'update' if existing_alarm else 'create'} OK for {alarm_id}\"\n", - " )\n", - " assets_with_updated_state.append(asset)\n", - " except Exception as e:\n", - " print(\n", - " f\"Error {'updating' if existing_alarm else 'creating'} alarm for {alarm_id}: {e}\"\n", - " )\n", "\n", " return assets_with_updated_state" ] @@ -783,8 +718,17 @@ " conditions = \" OR \".join(f'workspace = \"{ws}\"' for ws in workspace_ids)\n", " filter_str = f\"({conditions})\"\n", "\n", - "print(\"Querying all assets...\")\n", - "all_assets = query_all_assets(filter=filter_str)\n", + "query_assets_request = QueryAssetsRequest(\n", + " take=100,\n", + " skip=0,\n", + " filter=filter_str,\n", + ")\n", + "fetch_assets: Iterator[List[Asset]] = __batch_query(\n", + " query_request=query_assets_request,\n", + " query_func=asset_client.query_assets,\n", + " paged_data_field=\"assets\",\n", + ")\n", + "all_assets = [asset for batch in fetch_assets for asset in batch]\n", "print(f\"Total assets: {len(all_assets)}\")\n", "\n", "# Divide assets into 3 categories based on calibrationStatus\n", @@ -823,18 +767,18 @@ " calibration_status=CalibrationStatus.APPROACHING_RECOMMENDED_DUE_DATE.value,\n", ")\n", "\n", - "mail_body = build_email_body(\n", - " http_url,\n", - " new_assets_with_past_date,\n", - " new_assets_with_approaching_due_date,\n", - ")\n", - "\n", "# Send email notification if there are assets to report\n", "if not new_assets_with_past_date and not new_assets_with_approaching_due_date:\n", " print(\n", " \"No assets with calibration status changes to report - skipping email notification\"\n", " )\n", "else:\n", + " mail_body = build_email_body(\n", + " http_url,\n", + " new_assets_with_past_date,\n", + " new_assets_with_approaching_due_date,\n", + " )\n", + "\n", " send_email_notification(\n", " calibration_subscribers,\n", " \"Asset due-date alerts\",\n", @@ -852,7 +796,7 @@ ], "metadata": { "kernelspec": { - "display_name": "nisystemlink-examples-py3.13", + "display_name": "nisystemlink-examples-py3.13 (3.13.5)", "language": "python", "name": "python3" }, From 5ce21adfca3a28e84ffd0c64fe9d48902cfd4208 Mon Sep 17 00:00:00 2001 From: Rohith Raja Sakthivel Date: Thu, 5 Feb 2026 19:43:33 +0530 Subject: [PATCH 5/6] refactor: Change in variabe name and reduced send email notification code --- ... Calibration Alarms and Notification.ipynb | 47 +++++++++---------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb b/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb index 2d917f1..8b04a30 100644 --- a/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb +++ b/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb @@ -270,14 +270,14 @@ " projection=\"new(id,alias)\",\n", " )\n", "\n", - " systems_iter: Iterator[List[Dict[str, Any]]] = __batch_query(\n", + " systems_iterator: Iterator[List[Dict[str, Any]]] = __batch_query(\n", " query_request=query_system_request,\n", " query_func=system_client.query_systems,\n", " paged_data_field=\"data\",\n", " )\n", "\n", " try:\n", - " systems = next(systems_iter)\n", + " systems = next(systems_iterator)\n", " system = systems[0]\n", " except (StopIteration, IndexError):\n", " return None, None, None\n", @@ -346,26 +346,23 @@ " print(\"No email recipients configured - skipping email notification\")\n", " return\n", "\n", - " # Build SMTP address group with recipient addresses\n", - " address_group = SmtpAddressGroup(fields=SmtpAddressFields(toAddresses=to_addresses))\n", - "\n", - " # Build SMTP message template with subject and body\n", - " message_template = SmtpMessageTemplate(\n", - " fields=SmtpMessageTemplateFields(subject_template=subject, body_template=body)\n", - " )\n", - "\n", - " # Create notification configuration\n", - " notification_config = DynamicNotificationConfiguration(\n", - " address_group=address_group, message_template=message_template\n", - " )\n", - "\n", - " # Create notification strategy\n", - " notification_strategy = DynamicNotificationStrategy(\n", - " notification_configurations=[notification_config]\n", - " )\n", - "\n", " # Create request and send notification\n", - " request = DynamicStrategyRequest(notification_strategy=notification_strategy)\n", + " request = DynamicStrategyRequest(\n", + " notification_strategy=DynamicNotificationStrategy(\n", + " notification_configurations=[\n", + " DynamicNotificationConfiguration(\n", + " address_group=SmtpAddressGroup(\n", + " fields=SmtpAddressFields(toAddresses=to_addresses)\n", + " ),\n", + " message_template=SmtpMessageTemplate(\n", + " fields=SmtpMessageTemplateFields(\n", + " subject_template=subject, body_template=body\n", + " )\n", + " ),\n", + " )\n", + " ]\n", + " )\n", + " )\n", " notification_client.apply_dynamic_notification_strategy(request)\n", " print(\"Email notification sent successfully\")" ] @@ -592,14 +589,14 @@ " return_count=False,\n", " )\n", "\n", - " fetch_alarms: Iterator[List[Alarm]] = __batch_query(\n", + " alarms_iterator: Iterator[List[Alarm]] = __batch_query(\n", " query_request=query_alarm_request,\n", " query_func=alarm_client.query_alarms,\n", " paged_data_field=\"alarms\",\n", " )\n", "\n", " alarm_map: Dict[str, Alarm] = {\n", - " alarm.alarm_id: alarm for alarms in fetch_alarms for alarm in alarms\n", + " alarm.alarm_id: alarm for alarms in alarms_iterator for alarm in alarms\n", " }\n", "\n", " for alarm_id, asset in asset_path_to_asset.items():\n", @@ -723,12 +720,12 @@ " skip=0,\n", " filter=filter_str,\n", ")\n", - "fetch_assets: Iterator[List[Asset]] = __batch_query(\n", + "assets_iterator: Iterator[List[Asset]] = __batch_query(\n", " query_request=query_assets_request,\n", " query_func=asset_client.query_assets,\n", " paged_data_field=\"assets\",\n", ")\n", - "all_assets = [asset for batch in fetch_assets for asset in batch]\n", + "all_assets = [asset for batch in assets_iterator for asset in batch]\n", "print(f\"Total assets: {len(all_assets)}\")\n", "\n", "# Divide assets into 3 categories based on calibrationStatus\n", From b69c153241cfd01bb35b35ce5ef9137c269fb895 Mon Sep 17 00:00:00 2001 From: Rohith Raja Sakthivel Date: Mon, 9 Feb 2026 11:02:52 +0530 Subject: [PATCH 6/6] refactor: Changed http_url to ui_url --- ...Asset Calibration Alarms and Notification.ipynb | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb b/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb index 8b04a30..94036eb 100644 --- a/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb +++ b/examples/Alarms and Notification Examples/Asset Calibration Alarms and Notification.ipynb @@ -99,7 +99,7 @@ " - Example: `[\"workspace-123\", \"workspace-456\"]` or `[]` for all workspaces\n", "- **`calibration_subscribers`**: List of email addresses to receive notifications about calibration status changes\n", " - Example: `[\"user@example.com\", \"admin@example.com\"]`\n", - "- **`http_url`**: Base SystemLink URL for building asset links in email notifications\n", + "- **`ui_url`**: Base SystemLink UI URL for building asset links in email notifications\n", "- **`CALIBRATION_STATUS_TO_SEVERITY`**: Mapping of calibration status to alarm severity levels (1=Low, 2=Moderate, -1=Clear)\n", "- **`CALIBRATION_STATUS_TO_LABEL`**: Mapping of calibration status to human-readable alarm labels" ] @@ -141,8 +141,8 @@ "workspace_ids: List[str] = []\n", "calibration_subscribers: List[str] = []\n", "\n", - "# Base SystemLink URL for building asset links in email notifications\n", - "http_url = \"https://abc.example.com/\"\n", + "# Base SystemLink UI URL for building asset links in email notifications\n", + "ui_url = \"https://abc.example.com/\"\n", "\n", "# Calibration status -> desired severity mapping\n", "CALIBRATION_STATUS_TO_SEVERITY = {\n", @@ -645,7 +645,7 @@ "outputs": [], "source": [ "def build_email_body(\n", - " http_url: str,\n", + " ui_url: str,\n", " assets_with_past_date: List[Asset],\n", " assets_with_approaching_due_date: List[Asset],\n", ") -> str:\n", @@ -657,14 +657,14 @@ " - Assets with approaching calibration due date\n", "\n", " Args:\n", - " http_url (str): Base SystemLink HTTP URL\n", + " ui_url (str): Base SystemLink UI URL\n", " assets_with_past_date (List[Asset]): List of assets past their calibration due date\n", " assets_with_approaching_due_date (List[Asset]): List of assets approaching their calibration due date\n", "\n", " Returns:\n", " str: Formatted email body as a string with asset links\n", " \"\"\"\n", - " parsed = urlsplit(http_url)\n", + " parsed = urlsplit(ui_url)\n", " host_without_api = parsed.netloc.replace(\"-api\", \"\")\n", " base_url = urlunsplit((parsed.scheme, host_without_api, \"\", \"\", \"\"))\n", "\n", @@ -771,7 +771,7 @@ " )\n", "else:\n", " mail_body = build_email_body(\n", - " http_url,\n", + " ui_url,\n", " new_assets_with_past_date,\n", " new_assets_with_approaching_due_date,\n", " )\n",