Skip to content

Commit

Permalink
Resolve TextParameter values in ComObjectRef and Channel
Browse files Browse the repository at this point in the history
  • Loading branch information
farmio committed Sep 15, 2024
1 parent e26a3ca commit fefc0b5
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 48 deletions.
86 changes: 82 additions & 4 deletions test/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import pytest

from xknxproject.util import get_dpt_type, parse_dpt_types, strip_module_instance
from xknxproject import util
from xknxproject.models import ParameterInstanceRef


@pytest.mark.parametrize(
Expand All @@ -25,7 +26,7 @@
)
def test_get_dpt_type(dpt_string, expected):
"""Test parsing single DPT from ETS project."""
assert get_dpt_type(dpt_string) == expected
assert util.get_dpt_type(dpt_string) == expected


@pytest.mark.parametrize(
Expand All @@ -48,7 +49,29 @@ def test_get_dpt_type(dpt_string, expected):
)
def test_parse_dpt_types(dpt_string, expected):
"""Test parsing list of DPT from ETS project."""
assert parse_dpt_types(dpt_string) == expected
assert util.parse_dpt_types(dpt_string) == expected


@pytest.mark.parametrize(
("text", "parameter", "expected"),
[
("{{0}}", ParameterInstanceRef("id", "test"), "test"),
("{{0:default}}", ParameterInstanceRef("id", None), "default"),
("{{0:default}}", ParameterInstanceRef("id", "test"), "test"),
("{{0}}", None, ""),
("{{0:default}}", None, "default"),
("Hello {{0}}", ParameterInstanceRef("id", "test"), "Hello test"),
("Hi {{0:def}} again", ParameterInstanceRef("id", None), "Hi def again"),
("Hi{{0:default}}again", ParameterInstanceRef("id", "test"), "Hitestagain"),
("{{1}}", ParameterInstanceRef("id", "test"), "{{1}}"),
("{{XY}}:{{0}}{{ZZ}}", ParameterInstanceRef("id", "test"), "{{XY}}:test{{ZZ}}"),
],
)
def test_text_parameter_template_replace(
text: str, parameter: ParameterInstanceRef | None, expected: str
) -> None:
"""Test strip_module_instance."""
assert util.text_parameter_template_replace(text, parameter) == expected


@pytest.mark.parametrize(
Expand All @@ -61,4 +84,59 @@ def test_parse_dpt_types(dpt_string, expected):
)
def test_strip_module_instance(text: str, search_id: str, expected: str) -> None:
"""Test strip_module_instance."""
assert strip_module_instance(text, search_id) == expected
assert util.strip_module_instance(text, search_id) == expected


@pytest.mark.parametrize(
("ref", "next_id", "expected"),
[
("M-0083_A-0098-12-489B_MD-1_M-1_MI-1_P-43_R-87", "P", "MD-1_M-1_MI-1"),
("MD-1_M-1_MI-1_CH-4", "CH", "MD-1_M-1_MI-1"),
(
"MD-4_M-15_MI-1_SM-1_M-1_MI-1-1-2_SM-1_O-3-1_R-2",
"O",
"MD-4_M-15_MI-1_SM-1_M-1_MI-1-1-2_SM-1",
),
("M-00FA_A-A228-0A-A6C3_O-2002002_R-200200202", "O", ""),
("MD-1_M-1_MI-1_CH-4", "CH", "MD-1_M-1_MI-1"),
("CH-SOM03", "CH", ""),
],
)
def test_get_module_instance_part(ref: str, next_id: str, expected: str) -> None:
"""Test strip_module_instance."""
assert util.get_module_instance_part(ref, next_id) == expected


@pytest.mark.parametrize(
("instance_ref", "instance_next_id", "text_parameter_ref_id", "expected"),
[
(
"MD-2_M-17_MI-1_O-3-0_R-159",
"O",
"M-0083_A-00B0-32-0DFC_MD-2_P-23_R-1",
"M-0083_A-00B0-32-0DFC_MD-2_M-17_MI-1_P-23_R-1",
),
(
"MD-2_M-6_MI-1_CH-1",
"CH",
"M-0083_A-013A-32-DCC1_MD-2_P-1_R-1",
"M-0083_A-013A-32-DCC1_MD-2_M-6_MI-1_P-1_R-1",
),
(
"O-595_R-688",
"O",
"M-0004_A-20D3-11-EC49-O000A_P-875_R-2697", # no module - return same string,
"M-0004_A-20D3-11-EC49-O000A_P-875_R-2697",
),
],
)
def test_text_parameter_insert_module_instance(
instance_ref: str, instance_next_id: str, text_parameter_ref_id: str, expected: str
) -> None:
"""Test strip_module_instance."""
assert (
util.text_parameter_insert_module_instance(
instance_ref, instance_next_id, text_parameter_ref_id
)
== expected
)
14 changes: 7 additions & 7 deletions xknxproject/loader/project_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,15 @@ def _create_device(
)
)

parameter_instances = [
ParameterInstanceRef(
ref_id=param_instance_node.get("RefId"), # type: ignore[arg-type]
parameter_instances = {}
for param_instance_node in device_element.findall(
"{*}ParameterInstanceRefs/{*}ParameterInstanceRef"
):
pr_ref_id: str = param_instance_node.get("RefId") # type: ignore[assignment]
parameter_instances[pr_ref_id] = ParameterInstanceRef(
ref_id=pr_ref_id,
value=param_instance_node.get("Value"),
)
for param_instance_node in device_element.findall(
"{*}ParameterInstanceRefs/{*}ParameterInstancRef"
)
]

return DeviceInstance(
identifier=device_element.get("Id", ""),
Expand Down
159 changes: 123 additions & 36 deletions xknxproject/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __init__(
channels: list[ChannelNode],
com_object_instance_refs: list[ComObjectInstanceRef],
module_instances: list[ModuleInstance],
parameter_instance_refs: list[ParameterInstanceRef],
parameter_instance_refs: dict[str, ParameterInstanceRef],
com_objects: list[ComObject] | None = None,
):
"""Initialize a Device Instance."""
Expand Down Expand Up @@ -194,42 +194,17 @@ def merge_application_program_info(self, application: ApplicationProgram) -> Non
].allocates

for com_instance in self.com_object_instance_refs:
com_instance.merge_application_program_info(application)
com_instance.merge_application_program_info(
application, self.parameter_instance_refs
)
com_instance.apply_module_base_number_argument(
module_instances=self.module_instances,
application=application,
)

for channel in self.channels:
if not channel.name:
application_channel_id = util.strip_module_instance(
channel.ref_id, search_id="CH"
)
application_channel = application.channels[
f"{self.application_program_ref}_{application_channel_id}"
]
channel.name = application_channel.text or application_channel.name
self._complete_channel_placeholders()

def _complete_channel_placeholders(self) -> None:
"""Replace placeholders in channel names with module instance arguments."""
for channel in self.channels:
if not (
channel.ref_id.startswith("MD-") # only applicable if modules used
and "{{" in channel.name # placeholders are denoted "{{name}}"
):
continue

module_instance_ref = channel.ref_id.split("_CH")[0]
module_instance = next(
mi
for mi in self.module_instances
if mi.identifier == module_instance_ref
)
for argument in module_instance.arguments:
channel.name = channel.name.replace(
f"{{{{{argument.name}}}}}", argument.value
)
channel.resolve_channel_name(device_instance=self, application=application)
channel.resolve_channel_module_placeholders(device_instance=self)


@dataclass
Expand All @@ -242,6 +217,73 @@ class ChannelNode:
str
] # name="GroupObjectInstances" type="knx:RELIDREFS" use="optional"

def resolve_channel_name(
self,
device_instance: DeviceInstance,
application: ApplicationProgram,
) -> None:
"""
Resolve the channel name from device instance infos.
Replace TextParameter values in channel names with the
actual values of the parameter instances.
"""
if not self.name:
application_channel_id = util.strip_module_instance(
self.ref_id, search_id="CH"
)
application_channel = application.channels[
f"{device_instance.application_program_ref}_{application_channel_id}"
]
if application_channel.text and application_channel.text_parameter_ref_id:
parameter_instance_ref = util.text_parameter_insert_module_instance(
instance_ref=self.ref_id,
instance_next_id="CH",
text_parameter_ref_id=application_channel.text_parameter_ref_id,
)
try:
parameter = device_instance.parameter_instance_refs[
parameter_instance_ref
]
except KeyError:
_LOGGER.debug(
"ParameterInstanceRef %s not found for Channel %s in device %s (%s)",
parameter_instance_ref,
self.ref_id,
device_instance.identifier,
device_instance.individual_address,
)
parameter = None

self.name = (
util.text_parameter_template_replace(
application_channel.text, parameter
)
or application_channel.name
)
else:
self.name = application_channel.text or application_channel.name

def resolve_channel_module_placeholders(
self,
device_instance: DeviceInstance,
) -> None:
"""Replace module placeholders in channel names with module instance argument values."""
if not (
self.ref_id.startswith("MD-") # only applicable if modules used
and "{{" in self.name # placeholders are denoted "{{name}}"
):
return

module_instance_ref = self.ref_id.split("_CH")[0]
module_instance = next(
mi
for mi in device_instance.module_instances
if mi.identifier == module_instance_ref
)
for argument in module_instance.arguments:
self.name = self.name.replace(f"{{{{{argument.name}}}}}", argument.value)


@dataclass
class ModuleInstance:
Expand Down Expand Up @@ -343,7 +385,11 @@ def resolve_com_object_ref_id(
self.application_program_id_prefix = f"{application_program_ref}_"
self.com_object_ref_id = f"{application_program_ref}_{ref_id}"

def merge_application_program_info(self, application: ApplicationProgram) -> None:
def merge_application_program_info(
self,
application: ApplicationProgram,
parameters: dict[str, ParameterInstanceRef],
) -> None:
"""Fill missing information with information parsed from the application program."""
if self.com_object_ref_id is None:
_LOGGER.warning(
Expand All @@ -352,16 +398,30 @@ def merge_application_program_info(self, application: ApplicationProgram) -> Non
)
return
com_object_ref = application.com_object_refs[self.com_object_ref_id]
self._merge_from_parent_object(com_object_ref, parameters=parameters)

com_object = application.com_objects[com_object_ref.ref_id]
self._merge_from_parent_object(com_object_ref)
self._merge_from_parent_object(com_object)
self._merge_from_parent_object(com_object, parameters=parameters)

def _merge_from_parent_object(self, com_object: ComObject | ComObjectRef) -> None:
def _merge_from_parent_object(
self,
com_object: ComObject | ComObjectRef,
parameters: dict[str, ParameterInstanceRef],
) -> None:
"""Fill missing information with information parsed from the application program."""
if self.name is None:
self.name = com_object.name
if self.text is None:
self.text = com_object.text
if isinstance(com_object, ComObjectRef):
self.text = (
com_object.com_object_ref_text_with_paramter(
com_object_instance_ref_id=self.ref_id,
instance_parameters=parameters,
)
or com_object.text
)
else:
self.text = com_object.text
if self.function_text is None:
self.function_text = com_object.function_text
if self.object_size is None:
Expand Down Expand Up @@ -631,6 +691,33 @@ class ComObjectRef:
datapoint_types: list[DPTType] # "DataPointType" - knx:IDREFS
text_parameter_ref_id: str | None # type="knx:IDREF" use="optional"

def com_object_ref_text_with_paramter(
self,
com_object_instance_ref_id: str,
instance_parameters: dict[str, ParameterInstanceRef],
) -> str | None:
"""Return the text with parameter if available."""
if self.text and self.text_parameter_ref_id:
parameter_instance_ref = util.text_parameter_insert_module_instance(
instance_ref=com_object_instance_ref_id,
instance_next_id="O",
text_parameter_ref_id=self.text_parameter_ref_id,
)
try:
parameter = instance_parameters[parameter_instance_ref]
except KeyError:
_LOGGER.debug(
"ParameterInstanceRef %s for ComObjectRef %s not found.",
parameter_instance_ref,
self.identifier,
)
parameter = None
return util.text_parameter_template_replace(
self.text or "",
parameter=parameter,
)
return None


@dataclass
class KNXMasterData:
Expand Down
Loading

0 comments on commit fefc0b5

Please sign in to comment.