From 969330fb1c56183a3ae61ffcc8259346a8725a2e Mon Sep 17 00:00:00 2001 From: farmio Date: Thu, 12 Sep 2024 15:38:17 +0200 Subject: [PATCH 1/4] Grab ParameterInstanceRef and ApplicationProgramChannel from project and parse text_parameter_ref_id for ComObjectRef --- .../loader/application_program_loader.py | 48 ++++++++++++++----- xknxproject/loader/project_loader.py | 12 +++++ xknxproject/models/__init__.py | 2 + xknxproject/models/models.py | 36 ++++++++++++++ 4 files changed, 85 insertions(+), 13 deletions(-) diff --git a/xknxproject/loader/application_program_loader.py b/xknxproject/loader/application_program_loader.py index a53a8c0..242ce8d 100644 --- a/xknxproject/loader/application_program_loader.py +++ b/xknxproject/loader/application_program_loader.py @@ -3,7 +3,6 @@ from __future__ import annotations from collections.abc import Iterator -import logging from typing import Any from xml.etree import ElementTree from zipfile import Path @@ -11,6 +10,7 @@ from xknxproject.models import ( Allocator, ApplicationProgram, + ApplicationProgramChannel, ComObject, ComObjectRef, DeviceInstance, @@ -19,8 +19,6 @@ ) from xknxproject.util import parse_dpt_types, parse_xml_flag -_LOGGER = logging.getLogger("xknxproject.log") - class ApplicationProgramLoader: """Load the application program from KNX XML.""" @@ -51,6 +49,9 @@ def load( for attribute in device.module_instance_arguments() } numeric_args: dict[str, ModuleDefinitionNumericArg] = {} + channels: dict[ + str, ApplicationProgramChannel + ] = {} # {Id: ApplicationProgramChannel} allocators: dict[str, Allocator] = {} with application_program_path.open(mode="rb") as application_xml: @@ -95,6 +96,16 @@ def load( value=int(value) if value is not None else None, ) elem.clear() + elif elem.tag.endswith("Channel"): + _id = elem.attrib.get("Id") + channels[_id] = ApplicationProgramChannel( + identifier=_id, + name=elem.attrib.get("Name"), + number=elem.attrib.get("Number"), + text=elem.attrib.get("Text"), + text_parameter_ref_id=elem.attrib.get("TextParameterRefId"), + ) + elem.clear() elif elem.tag.endswith("Languages"): elem.clear() # hold iterator for optional translation parsing @@ -107,6 +118,7 @@ def load( com_objects=com_objects, com_object_refs=com_object_refs, used_com_object_ref_ids=used_com_object_ref_ids, + channels=channels, language_code=language_code, ) @@ -116,6 +128,7 @@ def load( allocators=allocators, module_def_arguments=used_module_arguments, numeric_args=numeric_args, + channels=channels, ) @staticmethod @@ -124,13 +137,16 @@ def parse_translations( com_objects: dict[str, ComObject], com_object_refs: dict[str, ComObjectRef], used_com_object_ref_ids: set[str], + channels: dict[str, ApplicationProgramChannel], language_code: str, ) -> None: """Parse translations. Replace translated text in com_objects and com_object_refs.""" - used_com_object_ids = { + _used_com_object_ids = { com_object_ref.ref_id for com_object_ref in com_object_refs.values() } - used_translation_ids = used_com_object_ids | used_com_object_ref_ids + used_translation_ids = ( + _used_com_object_ids | used_com_object_ref_ids | channels.keys() + ) in_language = False in_translation_ref: str | None = None # TranslationElement RefId # translation_map: {TranslationElement RefId: {AttributeName: Text}} @@ -138,7 +154,7 @@ def parse_translations( for _, elem in tree_iterator: if elem.tag.endswith("Language"): if in_language: - # Already found the language we are looking for. + # Hitting the next language tag after the one we were looking for. # We don't need anything after that tag (there isn't much anyway) elem.clear() break @@ -158,6 +174,7 @@ def parse_translations( ApplicationProgramLoader.apply_translations(com_object_refs, translation_map) ApplicationProgramLoader.apply_translations(com_objects, translation_map) + ApplicationProgramLoader.apply_translations(channels, translation_map) @staticmethod def parse_com_object( @@ -202,21 +219,26 @@ def parse_com_object_ref( update_flag=parse_xml_flag(elem.get("UpdateFlag")), read_on_init_flag=parse_xml_flag(elem.get("ReadOnInitFlag")), datapoint_types=parse_dpt_types(elem.get("DatapointType")), + text_parameter_ref_id=elem.get("TextParameterRefId"), ) @staticmethod def apply_translations( - com_objects: dict[str, ComObject] | dict[str, ComObjectRef], + translatable_object_map: dict[str, ComObject] + | dict[str, ComObjectRef] + | dict[str, ApplicationProgramChannel], translation_map: dict[str, dict[str, str]], ) -> None: - """Apply translations to ComObjects and ComObjectRefs.""" - for identifier in com_objects.keys() & translation_map.keys(): + """Apply translations to Objects.""" + for identifier in translatable_object_map.keys() & translation_map.keys(): translation = translation_map[identifier] - com_object = com_objects[identifier] + obj = translatable_object_map[identifier] if _text := translation.get("Text"): - com_object.text = _text - if _function_text := translation.get("FunctionText"): - com_object.function_text = _function_text + obj.text = _text + if hasattr(obj, "function_text") and ( + _function_text := translation.get("FunctionText") + ): + obj.function_text = _function_text @staticmethod def get_application_program_files_for_devices( diff --git a/xknxproject/loader/project_loader.py b/xknxproject/loader/project_loader.py index bb8c6c4..17f39cf 100644 --- a/xknxproject/loader/project_loader.py +++ b/xknxproject/loader/project_loader.py @@ -13,6 +13,7 @@ KNXMasterData, ModuleInstance, ModuleInstanceArgument, + ParameterInstanceRef, SpaceType, XMLArea, XMLFunction, @@ -277,6 +278,16 @@ def _create_device( ) ) + parameter_instances = [ + ParameterInstanceRef( + ref_id=param_instance_node.get("RefId"), # type: ignore[arg-type] + value=param_instance_node.get("Value"), + ) + for param_instance_node in device_element.findall( + "{*}ParameterInstanceRefs/{*}ParameterInstancRef" + ) + ] + return DeviceInstance( identifier=device_element.get("Id", ""), address=int(address), @@ -292,6 +303,7 @@ def _create_device( channels=channels, com_object_instance_refs=com_obj_inst_refs, module_instances=module_instances, + parameter_instance_refs=parameter_instances, ) @staticmethod diff --git a/xknxproject/models/__init__.py b/xknxproject/models/__init__.py index 53ad706..71e0c25 100644 --- a/xknxproject/models/__init__.py +++ b/xknxproject/models/__init__.py @@ -19,6 +19,8 @@ ) from .models import ( ApplicationProgram, + ApplicationProgramChannel, + ParameterInstanceRef, Allocator, ChannelNode, ComObject, diff --git a/xknxproject/models/models.py b/xknxproject/models/models.py index 5b3b2bb..10f598d 100644 --- a/xknxproject/models/models.py +++ b/xknxproject/models/models.py @@ -137,6 +137,7 @@ def __init__( channels: list[ChannelNode], com_object_instance_refs: list[ComObjectInstanceRef], module_instances: list[ModuleInstance], + parameter_instance_refs: list[ParameterInstanceRef], com_objects: list[ComObject] | None = None, ): """Initialize a Device Instance.""" @@ -157,6 +158,7 @@ def __init__( self.com_object_instance_refs = com_object_instance_refs self.module_instances = module_instances self.com_objects = com_objects or [] + self.parameter_instance_refs = parameter_instance_refs self.application_program_ref: str | None = None self.individual_address = ( @@ -482,6 +484,14 @@ def _base_number_from_allocator( ) +@dataclass +class ParameterInstanceRef: + """ParameterInstanceRef.""" + + ref_id: str + value: str | None + + @dataclass class ApplicationProgram: """Class that represents an ApplicationProgram instance.""" @@ -491,6 +501,7 @@ class ApplicationProgram: allocators: dict[str, Allocator] # {Id: Allocator} module_def_arguments: dict[str, ModuleDefinitionArgumentInfo] # {Id: ...} numeric_args: dict[str, ModuleDefinitionNumericArg] # {RefId: ...} + channels: dict[str, ApplicationProgramChannel] # {Id: ApplicationProgramChannel} @dataclass @@ -523,6 +534,29 @@ class ModuleDefinitionNumericArg: base_value: str | None +@dataclass +class ApplicationProgramChannel: + """ApplicationProgramChannel.""" + + __slots__ = ( + "identifier", + "text", + "text_parameter_ref_id", + "name", + "number", + ) + + identifier: str # name="Id" type="xs:ID" use="required" + text: ( + str | None + ) # name="Text" type="knx:LanguageDependentString255_t" use="optional" + text_parameter_ref_id: ( + str | None + ) # name="TextParameterRefId" type="knx:RELIDREF" use="optional" + name: str # name="Name" type="knx:String255_t" use="required" + number: str # name="Number" type="knx:String50_t" use="required" + + @dataclass class ComObject: """Class that represents a ComObject instance.""" @@ -582,6 +616,7 @@ class ComObjectRef: "update_flag", "read_on_init_flag", "datapoint_types", + "text_parameter_ref_id", ) identifier: str # "Id" - xs:ID - required @@ -597,6 +632,7 @@ class ComObjectRef: update_flag: bool | None # "UpdateFlag" - knx:Enable_t read_on_init_flag: bool | None # "ReadOnInitFlag" - knx:Enable_t datapoint_types: list[DPTType] # "DataPointType" - knx:IDREFS + text_parameter_ref_id: str | None # type="knx:IDREF" use="optional" @dataclass From 6e254ca02523222af3545fcab9866b5f00097a11 Mon Sep 17 00:00:00 2001 From: farmio Date: Fri, 13 Sep 2024 14:05:00 +0200 Subject: [PATCH 2/4] Inherit channel name from application channel --- xknxproject/models/models.py | 25 +++++++++++-------------- xknxproject/util.py | 24 ++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/xknxproject/models/models.py b/xknxproject/models/models.py index 10f598d..b2fd820 100644 --- a/xknxproject/models/models.py +++ b/xknxproject/models/models.py @@ -9,6 +9,7 @@ from xknxproject.models.knxproject import DPTType, ModuleInstanceInfos from xknxproject.models.static import GroupAddressStyle, SpaceType +import xknxproject.util as util from xknxproject.zip import KNXProjContents _LOGGER = logging.getLogger("xknxproject.log") @@ -199,6 +200,15 @@ def merge_application_program_info(self, application: ApplicationProgram) -> Non application=application, ) + for channel in self.channels: + if not channel.name: + application_channel_id = util.strip_module_instance( + channel.ref_id, search_id="CH" + ) + application_channel = application.channels[ + f"{self.application_program_ref}_{application_channel_id}" + ] + channel.name = application_channel.text or application_channel.name self._complete_channel_placeholders() def _complete_channel_placeholders(self) -> None: @@ -329,20 +339,7 @@ def resolve_com_object_ref_id( self.com_object_ref_id = self.ref_id return - if self.ref_id.startswith("O-"): - ref_id = self.ref_id - elif self.ref_id.startswith("MD-"): - # Remove module and ModuleInstance occurrence as they will not be in the application program directly - module_definition = self.ref_id.split("_")[0] - object_reference = self.ref_id[self.ref_id.index("_O-") :] - _submodule_match = re.search(r"(_SM-[^_]+)", self.ref_id) - submodule = _submodule_match.group() if _submodule_match is not None else "" - ref_id = f"{module_definition}{submodule}{object_reference}" - else: - raise ValueError( - f"Unknown ref_id format: {self.ref_id} in application: {application_program_ref}" - ) - + ref_id = util.strip_module_instance(self.ref_id, search_id="O") self.application_program_id_prefix = f"{application_program_ref}_" self.com_object_ref_id = f"{application_program_ref}_{ref_id}" diff --git a/xknxproject/util.py b/xknxproject/util.py index 1312803..a58ee03 100644 --- a/xknxproject/util.py +++ b/xknxproject/util.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import re from typing import overload from xknxproject.const import MAIN_AND_SUB_DPT, MAIN_DPT @@ -65,3 +66,26 @@ def parse_xml_flag(flag: str | None, default: bool | None = None) -> bool | None if flag is None: return default return flag == "Enabled" + + +def strip_module_instance(text: str, search_id: str) -> str: + """ + Remove module and module instance from text, keep module definition and rest. + + text: full text to be processed + search_id: search term to be kept without "-" eg. "CH" for channel + + Examples + -------- + search_id="CH": "CH-4" -> "CH-4" + search_id="CH": "MD-1_M-1_MI-1_CH-4" -> "MD-1_CH-4" + search_id="O": "MD-4_M-15_MI-1_SM-1_M-1_MI-1-1-2_SM-1_O-3-1_R-2" -> "MD-4_SM-1_O-3-1_R-2" + + """ + # For submodules SM- must be the last item before search_id + # because I couldn't create a regex that works otherwise :( + return re.sub( + r"(MD-\w+_)?.*?(SM-\w+_)?(" + re.escape(search_id) + r"-.*)", + lambda matchobj: "".join(part for part in matchobj.groups() if part), + text, + ) From 50b9740adb7da083994b560b61afa95aef5fa420 Mon Sep 17 00:00:00 2001 From: farmio Date: Fri, 13 Sep 2024 14:10:32 +0200 Subject: [PATCH 3/4] fix import --- xknxproject/models/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xknxproject/models/models.py b/xknxproject/models/models.py index b2fd820..5ece393 100644 --- a/xknxproject/models/models.py +++ b/xknxproject/models/models.py @@ -7,9 +7,9 @@ import logging import re +from xknxproject import util from xknxproject.models.knxproject import DPTType, ModuleInstanceInfos from xknxproject.models.static import GroupAddressStyle, SpaceType -import xknxproject.util as util from xknxproject.zip import KNXProjContents _LOGGER = logging.getLogger("xknxproject.log") From 9e997925386874c0d7aa06e1faa97ae79bad2b3e Mon Sep 17 00:00:00 2001 From: farmio Date: Fri, 13 Sep 2024 14:22:31 +0200 Subject: [PATCH 4/4] test regex --- test/test_util.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/test/test_util.py b/test/test_util.py index 8ce1be8..29dcd1d 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -2,7 +2,7 @@ import pytest -from xknxproject.util import get_dpt_type, parse_dpt_types +from xknxproject.util import get_dpt_type, parse_dpt_types, strip_module_instance @pytest.mark.parametrize( @@ -49,3 +49,16 @@ def test_get_dpt_type(dpt_string, expected): def test_parse_dpt_types(dpt_string, expected): """Test parsing list of DPT from ETS project.""" assert parse_dpt_types(dpt_string) == expected + + +@pytest.mark.parametrize( + ("text", "search_id", "expected"), + [ + ("CH-4", "CH", "CH-4"), + ("MD-1_M-1_MI-1_CH-4", "CH", "MD-1_CH-4"), + ("MD-4_M-15_MI-1_SM-1_M-1_MI-1-1-2_SM-1_O-3-1_R-2", "O", "MD-4_SM-1_O-3-1_R-2"), + ], +) +def test_strip_module_instance(text: str, search_id: str, expected: str) -> None: + """Test strip_module_instance.""" + assert strip_module_instance(text, search_id) == expected