Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inherit channel name from application channel #462

Merged
merged 4 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion test/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest

from xknxproject.util import get_dpt_type, parse_dpt_types
from xknxproject.util import get_dpt_type, parse_dpt_types, strip_module_instance


@pytest.mark.parametrize(
Expand Down Expand Up @@ -49,3 +49,16 @@ def test_get_dpt_type(dpt_string, expected):
def test_parse_dpt_types(dpt_string, expected):
"""Test parsing list of DPT from ETS project."""
assert parse_dpt_types(dpt_string) == expected


@pytest.mark.parametrize(
("text", "search_id", "expected"),
[
("CH-4", "CH", "CH-4"),
("MD-1_M-1_MI-1_CH-4", "CH", "MD-1_CH-4"),
("MD-4_M-15_MI-1_SM-1_M-1_MI-1-1-2_SM-1_O-3-1_R-2", "O", "MD-4_SM-1_O-3-1_R-2"),
],
)
def test_strip_module_instance(text: str, search_id: str, expected: str) -> None:
"""Test strip_module_instance."""
assert strip_module_instance(text, search_id) == expected
48 changes: 35 additions & 13 deletions xknxproject/loader/application_program_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
from __future__ import annotations

from collections.abc import Iterator
import logging
from typing import Any
from xml.etree import ElementTree
from zipfile import Path

from xknxproject.models import (
Allocator,
ApplicationProgram,
ApplicationProgramChannel,
ComObject,
ComObjectRef,
DeviceInstance,
Expand All @@ -19,8 +19,6 @@
)
from xknxproject.util import parse_dpt_types, parse_xml_flag

_LOGGER = logging.getLogger("xknxproject.log")


class ApplicationProgramLoader:
"""Load the application program from KNX XML."""
Expand Down Expand Up @@ -51,6 +49,9 @@ def load(
for attribute in device.module_instance_arguments()
}
numeric_args: dict[str, ModuleDefinitionNumericArg] = {}
channels: dict[
str, ApplicationProgramChannel
] = {} # {Id: ApplicationProgramChannel}
allocators: dict[str, Allocator] = {}

with application_program_path.open(mode="rb") as application_xml:
Expand Down Expand Up @@ -95,6 +96,16 @@ def load(
value=int(value) if value is not None else None,
)
elem.clear()
elif elem.tag.endswith("Channel"):
_id = elem.attrib.get("Id")
channels[_id] = ApplicationProgramChannel(
identifier=_id,
name=elem.attrib.get("Name"),
number=elem.attrib.get("Number"),
text=elem.attrib.get("Text"),
text_parameter_ref_id=elem.attrib.get("TextParameterRefId"),
)
elem.clear()
elif elem.tag.endswith("Languages"):
elem.clear()
# hold iterator for optional translation parsing
Expand All @@ -107,6 +118,7 @@ def load(
com_objects=com_objects,
com_object_refs=com_object_refs,
used_com_object_ref_ids=used_com_object_ref_ids,
channels=channels,
language_code=language_code,
)

Expand All @@ -116,6 +128,7 @@ def load(
allocators=allocators,
module_def_arguments=used_module_arguments,
numeric_args=numeric_args,
channels=channels,
)

@staticmethod
Expand All @@ -124,21 +137,24 @@ def parse_translations(
com_objects: dict[str, ComObject],
com_object_refs: dict[str, ComObjectRef],
used_com_object_ref_ids: set[str],
channels: dict[str, ApplicationProgramChannel],
language_code: str,
) -> None:
"""Parse translations. Replace translated text in com_objects and com_object_refs."""
used_com_object_ids = {
_used_com_object_ids = {
com_object_ref.ref_id for com_object_ref in com_object_refs.values()
}
used_translation_ids = used_com_object_ids | used_com_object_ref_ids
used_translation_ids = (
_used_com_object_ids | used_com_object_ref_ids | channels.keys()
)
in_language = False
in_translation_ref: str | None = None # TranslationElement RefId
# translation_map: {TranslationElement RefId: {AttributeName: Text}}
translation_map: dict[str, dict[str, str]] = {}
for _, elem in tree_iterator:
if elem.tag.endswith("Language"):
if in_language:
# Already found the language we are looking for.
# Hitting the next language tag after the one we were looking for.
# We don't need anything after that tag (there isn't much anyway)
elem.clear()
break
Expand All @@ -158,6 +174,7 @@ def parse_translations(

ApplicationProgramLoader.apply_translations(com_object_refs, translation_map)
ApplicationProgramLoader.apply_translations(com_objects, translation_map)
ApplicationProgramLoader.apply_translations(channels, translation_map)

@staticmethod
def parse_com_object(
Expand Down Expand Up @@ -202,21 +219,26 @@ def parse_com_object_ref(
update_flag=parse_xml_flag(elem.get("UpdateFlag")),
read_on_init_flag=parse_xml_flag(elem.get("ReadOnInitFlag")),
datapoint_types=parse_dpt_types(elem.get("DatapointType")),
text_parameter_ref_id=elem.get("TextParameterRefId"),
)

@staticmethod
def apply_translations(
com_objects: dict[str, ComObject] | dict[str, ComObjectRef],
translatable_object_map: dict[str, ComObject]
| dict[str, ComObjectRef]
| dict[str, ApplicationProgramChannel],
translation_map: dict[str, dict[str, str]],
) -> None:
"""Apply translations to ComObjects and ComObjectRefs."""
for identifier in com_objects.keys() & translation_map.keys():
"""Apply translations to Objects."""
for identifier in translatable_object_map.keys() & translation_map.keys():
translation = translation_map[identifier]
com_object = com_objects[identifier]
obj = translatable_object_map[identifier]
if _text := translation.get("Text"):
com_object.text = _text
if _function_text := translation.get("FunctionText"):
com_object.function_text = _function_text
obj.text = _text
if hasattr(obj, "function_text") and (
_function_text := translation.get("FunctionText")
):
obj.function_text = _function_text

@staticmethod
def get_application_program_files_for_devices(
Expand Down
12 changes: 12 additions & 0 deletions xknxproject/loader/project_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
KNXMasterData,
ModuleInstance,
ModuleInstanceArgument,
ParameterInstanceRef,
SpaceType,
XMLArea,
XMLFunction,
Expand Down Expand Up @@ -277,6 +278,16 @@ def _create_device(
)
)

parameter_instances = [
ParameterInstanceRef(
ref_id=param_instance_node.get("RefId"), # type: ignore[arg-type]
value=param_instance_node.get("Value"),
)
for param_instance_node in device_element.findall(
"{*}ParameterInstanceRefs/{*}ParameterInstancRef"
)
]

return DeviceInstance(
identifier=device_element.get("Id", ""),
address=int(address),
Expand All @@ -292,6 +303,7 @@ def _create_device(
channels=channels,
com_object_instance_refs=com_obj_inst_refs,
module_instances=module_instances,
parameter_instance_refs=parameter_instances,
)

@staticmethod
Expand Down
2 changes: 2 additions & 0 deletions xknxproject/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
)
from .models import (
ApplicationProgram,
ApplicationProgramChannel,
ParameterInstanceRef,
Allocator,
ChannelNode,
ComObject,
Expand Down
61 changes: 47 additions & 14 deletions xknxproject/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import re

from xknxproject import util
from xknxproject.models.knxproject import DPTType, ModuleInstanceInfos
from xknxproject.models.static import GroupAddressStyle, SpaceType
from xknxproject.zip import KNXProjContents
Expand Down Expand Up @@ -137,6 +138,7 @@
channels: list[ChannelNode],
com_object_instance_refs: list[ComObjectInstanceRef],
module_instances: list[ModuleInstance],
parameter_instance_refs: list[ParameterInstanceRef],
com_objects: list[ComObject] | None = None,
):
"""Initialize a Device Instance."""
Expand All @@ -157,6 +159,7 @@
self.com_object_instance_refs = com_object_instance_refs
self.module_instances = module_instances
self.com_objects = com_objects or []
self.parameter_instance_refs = parameter_instance_refs
self.application_program_ref: str | None = None

self.individual_address = (
Expand Down Expand Up @@ -197,6 +200,15 @@
application=application,
)

for channel in self.channels:
if not channel.name:
application_channel_id = util.strip_module_instance(

Check warning on line 205 in xknxproject/models/models.py

View check run for this annotation

Codecov / codecov/patch

xknxproject/models/models.py#L205

Added line #L205 was not covered by tests
channel.ref_id, search_id="CH"
)
application_channel = application.channels[

Check warning on line 208 in xknxproject/models/models.py

View check run for this annotation

Codecov / codecov/patch

xknxproject/models/models.py#L208

Added line #L208 was not covered by tests
f"{self.application_program_ref}_{application_channel_id}"
]
channel.name = application_channel.text or application_channel.name

Check warning on line 211 in xknxproject/models/models.py

View check run for this annotation

Codecov / codecov/patch

xknxproject/models/models.py#L211

Added line #L211 was not covered by tests
self._complete_channel_placeholders()

def _complete_channel_placeholders(self) -> None:
Expand Down Expand Up @@ -327,20 +339,7 @@
self.com_object_ref_id = self.ref_id
return

if self.ref_id.startswith("O-"):
ref_id = self.ref_id
elif self.ref_id.startswith("MD-"):
# Remove module and ModuleInstance occurrence as they will not be in the application program directly
module_definition = self.ref_id.split("_")[0]
object_reference = self.ref_id[self.ref_id.index("_O-") :]
_submodule_match = re.search(r"(_SM-[^_]+)", self.ref_id)
submodule = _submodule_match.group() if _submodule_match is not None else ""
ref_id = f"{module_definition}{submodule}{object_reference}"
else:
raise ValueError(
f"Unknown ref_id format: {self.ref_id} in application: {application_program_ref}"
)

ref_id = util.strip_module_instance(self.ref_id, search_id="O")
self.application_program_id_prefix = f"{application_program_ref}_"
self.com_object_ref_id = f"{application_program_ref}_{ref_id}"

Expand Down Expand Up @@ -482,6 +481,14 @@
)


@dataclass
class ParameterInstanceRef:
"""ParameterInstanceRef."""

ref_id: str
value: str | None


@dataclass
class ApplicationProgram:
"""Class that represents an ApplicationProgram instance."""
Expand All @@ -491,6 +498,7 @@
allocators: dict[str, Allocator] # {Id: Allocator}
module_def_arguments: dict[str, ModuleDefinitionArgumentInfo] # {Id: ...}
numeric_args: dict[str, ModuleDefinitionNumericArg] # {RefId: ...}
channels: dict[str, ApplicationProgramChannel] # {Id: ApplicationProgramChannel}


@dataclass
Expand Down Expand Up @@ -523,6 +531,29 @@
base_value: str | None


@dataclass
class ApplicationProgramChannel:
"""ApplicationProgramChannel."""

__slots__ = (
"identifier",
"text",
"text_parameter_ref_id",
"name",
"number",
)

identifier: str # name="Id" type="xs:ID" use="required"
text: (
str | None
) # name="Text" type="knx:LanguageDependentString255_t" use="optional"
text_parameter_ref_id: (
str | None
) # name="TextParameterRefId" type="knx:RELIDREF" use="optional"
name: str # name="Name" type="knx:String255_t" use="required"
number: str # name="Number" type="knx:String50_t" use="required"


@dataclass
class ComObject:
"""Class that represents a ComObject instance."""
Expand Down Expand Up @@ -582,6 +613,7 @@
"update_flag",
"read_on_init_flag",
"datapoint_types",
"text_parameter_ref_id",
)

identifier: str # "Id" - xs:ID - required
Expand All @@ -597,6 +629,7 @@
update_flag: bool | None # "UpdateFlag" - knx:Enable_t
read_on_init_flag: bool | None # "ReadOnInitFlag" - knx:Enable_t
datapoint_types: list[DPTType] # "DataPointType" - knx:IDREFS
text_parameter_ref_id: str | None # type="knx:IDREF" use="optional"


@dataclass
Expand Down
24 changes: 24 additions & 0 deletions xknxproject/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import logging
import re
from typing import overload

from xknxproject.const import MAIN_AND_SUB_DPT, MAIN_DPT
Expand Down Expand Up @@ -65,3 +66,26 @@ def parse_xml_flag(flag: str | None, default: bool | None = None) -> bool | None
if flag is None:
return default
return flag == "Enabled"


def strip_module_instance(text: str, search_id: str) -> str:
"""
Remove module and module instance from text, keep module definition and rest.

text: full text to be processed
search_id: search term to be kept without "-" eg. "CH" for channel

Examples
--------
search_id="CH": "CH-4" -> "CH-4"
search_id="CH": "MD-1_M-1_MI-1_CH-4" -> "MD-1_CH-4"
search_id="O": "MD-4_M-15_MI-1_SM-1_M-1_MI-1-1-2_SM-1_O-3-1_R-2" -> "MD-4_SM-1_O-3-1_R-2"

"""
# For submodules SM- must be the last item before search_id
# because I couldn't create a regex that works otherwise :(
return re.sub(
r"(MD-\w+_)?.*?(SM-\w+_)?(" + re.escape(search_id) + r"-.*)",
lambda matchobj: "".join(part for part in matchobj.groups() if part),
text,
)