diff --git a/tests/unit/connectors/opcua/data/constants/opcua_config_constants.json b/tests/unit/connectors/opcua/data/constants/opcua_config_constants.json new file mode 100644 index 000000000..74d1cf509 --- /dev/null +++ b/tests/unit/connectors/opcua/data/constants/opcua_config_constants.json @@ -0,0 +1,22 @@ +{ + "attributes": [ + { "key": "Customer", "type": "constant", "value": "ACME Corp" }, + { "key": "Frequency", "type": "path", "value": "${Frequency}" } + ], + "attributes_updates": [], + "deviceInfo": { + "deviceNameExpression": "OPCUA New Advanced Device", + "deviceNameExpressionSource": "path", + "deviceProfileExpression": "default", + "deviceProfileExpressionSource": "constant" + }, + "deviceNodePattern": "Root\\.Objects\\.MyObject", + "deviceNodeSource": "path", + "device_name": "OPCUA New Advanced Device", + "device_type": "default", + "rpc_methods": [], + "timeseries": [ + { "key": "InitialBatchSize", "type": "constant", "value": "12" }, + { "key": "Temperature", "type": "path", "value": "${Temperature}" } + ] +} \ No newline at end of file diff --git a/tests/unit/connectors/opcua/test_constants.py b/tests/unit/connectors/opcua/test_constants.py new file mode 100644 index 000000000..f5cdb3c30 --- /dev/null +++ b/tests/unit/connectors/opcua/test_constants.py @@ -0,0 +1,84 @@ +# Copyright 2025. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from tests.unit.connectors.opcua.opcua_base_test import OpcUABaseTest + + +class OpcUAConstantsTest(OpcUABaseTest): + + async def asyncSetUp(self): + await super().asyncSetUp() + # Prepare connector fields used by __send_constants_for_device + self.connector._OpcUaConnector__constant_telemetry_sent_devices = set() + # Avoid touching Thread.name setter; patch accessors instead + self.connector.get_name = lambda: "test-connector" + self.connector.get_id = lambda: "test-id" + + # Build device from constants config + self.fake_device = self.create_fake_device('constants/opcua_config_constants.json') + self.connector._OpcUaConnector__device_nodes[:] = [self.fake_device] + + async def asyncTearDown(self): + await super().asyncTearDown() + + async def test_parse_constants_buckets(self): + # Device should have parsed constants into dedicated buckets + self.assertTrue(hasattr(self.fake_device, "constant_attributes")) + self.assertTrue(hasattr(self.fake_device, "constant_timeseries")) + + self.assertEqual(len(self.fake_device.constant_attributes), 1, "Expected one constant attribute") + self.assertEqual(self.fake_device.constant_attributes[0]["key"], "Customer") + self.assertEqual(self.fake_device.constant_attributes[0]["value"], "ACME Corp") + + self.assertEqual(len(self.fake_device.constant_timeseries), 1, "Expected one constant timeseries") + self.assertEqual(self.fake_device.constant_timeseries[0]["key"], "InitialBatchSize") + self.assertEqual(self.fake_device.constant_timeseries[0]["value"], "12") + + async def test_send_constants_once_and_idempotent(self): + # First send: attributes + telemetry (one-time) + self.connector._OpcUaConnector__send_constants_for_device(self.fake_device) + + calls = self.connector._OpcUaConnector__gateway.send_to_storage.call_args_list + self.assertEqual(len(calls), 1, "Expected 1 send_to_storage call on first dispatch") + + first_data = calls[0][0][2] # args: (connector_name, connector_id, converted_data) + first_payload = first_data.to_dict() + + # Attributes include the constant "Customer" + self.assertIn("attributes", first_payload) + self.assertEqual(first_payload["attributes"].get("Customer"), "ACME Corp") + + # Telemetry includes one-time constant "InitialBatchSize" cast to integer + self.assertIn("telemetry", first_payload) + self.assertEqual(len(first_payload["telemetry"]), 1, "Expected one telemetry entry") + first_telemetry_entry = first_payload["telemetry"][0] + self.assertIn("values", first_telemetry_entry) + self.assertIsInstance(first_telemetry_entry.get("ts"), int, "Telemetry ts must be an integer timestamp") + self.assertEqual(first_telemetry_entry["values"].get("InitialBatchSize"), 12, "Expected int-casted value 12") + + # Second send: attributes again, telemetry should NOT be re-sent + self.connector._OpcUaConnector__send_constants_for_device(self.fake_device) + + calls = self.connector._OpcUaConnector__gateway.send_to_storage.call_args_list + self.assertEqual(len(calls), 2, "Expected 2 send_to_storage calls after second dispatch") + + second_data = calls[1][0][2] + second_payload = second_data.to_dict() + + # Attributes should still contain the constant "Customer" + self.assertEqual(second_payload["attributes"].get("Customer"), "ACME Corp") + + # Telemetry should be empty (one-time semantics) + self.assertIsInstance(second_payload.get("telemetry"), list) + self.assertEqual(len(second_payload["telemetry"]), 0, "Constant telemetry must be sent only once") \ No newline at end of file diff --git a/thingsboard_gateway/connectors/opcua/device.py b/thingsboard_gateway/connectors/opcua/device.py index c77f7f2ce..b60b41924 100644 --- a/thingsboard_gateway/connectors/opcua/device.py +++ b/thingsboard_gateway/connectors/opcua/device.py @@ -40,6 +40,9 @@ def __init__(self, path, name, device_profile, config, converter, converter_for_ 'timeseries': [], 'attributes': [] } + # Collections for constant datapoints (do not require OPC-UA reads) + self.constant_attributes = [] + self.constant_timeseries = [] self.shared_attributes_keys = self.__get_shared_attributes_keys() self.shared_attributes_keys_value_pairs = self.__match_key_value_for_attribute_updates() self.nodes = [] @@ -80,6 +83,20 @@ def load_values(self): try: value_str = node_config['value'] + # Handle constant datapoints explicitly + if node_config.get('type') == 'constant': + constant_entry = { + 'key': node_config['key'], + 'value': node_config.get('value') + } + if node_config.get(REPORT_STRATEGY_PARAMETER) is not None: + constant_entry[REPORT_STRATEGY_PARAMETER] = node_config.get(REPORT_STRATEGY_PARAMETER) + if section == 'attributes': + self.constant_attributes.append(constant_entry) + else: + self.constant_timeseries.append(constant_entry) + continue + # Match NodeId value (e.g. ns=2;s=SomeNode) node_id_match = re.search(Device.NODE_ID_PATTERN, value_str) if node_id_match: @@ -118,7 +135,12 @@ def load_values(self): except KeyError as e: self._log.error('Invalid config for %s (key %s not found)', node_config, e) - self.__configured_values_count += len(self.values[section]) + section_count = len(self.values[section]) + if section == 'attributes': + section_count += len(self.constant_attributes) + else: + section_count += len(self.constant_timeseries) + self.__configured_values_count += section_count self._log.debug('Loaded %r values for %s', len(self.values), self.name) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index a60315842..3fa56f178 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -30,6 +30,7 @@ from thingsboard_gateway.gateway.constants import CONNECTOR_PARAMETER, RECEIVED_TS_PARAMETER, CONVERTED_TS_PARAMETER, \ DATA_RETRIEVING_STARTED, REPORT_STRATEGY_PARAMETER, ON_ATTRIBUTE_UPDATE_DEFAULT_TIMEOUT from thingsboard_gateway.gateway.entities.converted_data import ConvertedData +from thingsboard_gateway.gateway.entities.telemetry_entry import TelemetryEntry from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService @@ -176,6 +177,7 @@ def __init__(self, gateway: 'TBGatewayService', config, connector_type): self.__device_nodes: List[Device] = [] self.__nodes_config_cache: Dict[NodeId, List[Device]] = {} + self.__constant_telemetry_sent_devices = set() self.__next_poll = 0 self.__next_scan = 0 self.__client_recreation_required = True @@ -823,16 +825,16 @@ async def _create_new_devices(self): device_config = {**device_config, 'device_name': device_name, 'device_type': device_profile} device_path = [node_path_node_object['path'] for node_path_node_object in node] - self.__device_nodes.append( - Device(path=device_path, name=device_name, device_profile=device_profile, - config=device_config, - converter=converter( - device_config, self.__converter_log), - converter_for_sub=converter(device_config, - self.__converter_log) if self.__enable_subscriptions else None, - device_node=node[-1]['node'], - logger=self.__log)) + new_device = Device(path=device_path, name=device_name, device_profile=device_profile, + config=device_config, + converter=converter(device_config, self.__converter_log), + converter_for_sub=converter(device_config, self.__converter_log) if self.__enable_subscriptions else None, + device_node=node[-1]['node'], + logger=self.__log) + self.__device_nodes.append(new_device) self.__log.info('Added device node: %s', device_name) + # Send constant attributes always on creation; constant telemetry only once per process + self.__send_constants_for_device(new_device) self.__log.debug('Device nodes: %s', self.__device_nodes) async def _load_devices_nodes(self): @@ -1098,6 +1100,49 @@ def __send_data_to_gateway_storage(self, data): self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1 self.__log.debug('Count data msg to storage: %s', self.statistics['MessagesSent']) + def __send_constants_for_device(self, device: Device): + """ + Send constant attributes on every device creation and reconnection (because devices are recreated), + and constant telemetry only once per process lifetime for a given device. + """ + try: + has_attributes = False + has_telemetry = False + converted_data = ConvertedData(device_name=device.name, device_type=device.device_profile) + + # Constant attributes + for entry in getattr(device, 'constant_attributes', []): + key = entry.get('key') + value = entry.get('value') + casted_value = self.__guess_type_and_cast(value) + dp_key = TBUtility.convert_key_to_datapoint_key(key, device.report_strategy or None, entry, self.__log) + converted_data.add_to_attributes({dp_key: casted_value}) + has_attributes = True + + # Constant telemetry (send once per device name per process) + if device.name not in self.__constant_telemetry_sent_devices: + telemetry_values = {} + for entry in getattr(device, 'constant_timeseries', []): + key = entry.get('key') + value = entry.get('value') + casted_value = self.__guess_type_and_cast(value) + dp_key = TBUtility.convert_key_to_datapoint_key(key, device.report_strategy or None, entry, self.__log) + telemetry_values[dp_key] = casted_value + + if telemetry_values: + ts = int(time() * 1000) + telemetry_entry = TelemetryEntry(telemetry_values, ts) + converted_data.add_to_telemetry(telemetry_entry) + has_telemetry = True + self.__constant_telemetry_sent_devices.add(device.name) + + if has_attributes or has_telemetry: + self.__gateway.send_to_storage(self.get_name(), self.get_id(), converted_data) + self.__log.info("Sent constant data for device %s: attributes=%s, telemetry=%s", + device.name, has_attributes, has_telemetry) + except Exception as e: + self.__log.exception("Failed to send constant data for device %s: %s", device.name, e) + @staticmethod def get_rpc_node_pattern_and_base_path(params, device, logger): try: