From aa7fb69ed91e05582131b69fbe29443a41bd7c35 Mon Sep 17 00:00:00 2001 From: Matthias Alphart Date: Sat, 21 Oct 2023 16:58:18 +0200 Subject: [PATCH] Restructure parser module (#311) * Restructure parser module * recursive convert spaces on all levels * recursive convert ranges on all levels * more dict comprehensions --- xknxproject/xml/parser.py | 395 +++++++++++++++++++------------------- 1 file changed, 199 insertions(+), 196 deletions(-) diff --git a/xknxproject/xml/parser.py b/xknxproject/xml/parser.py index e615038..a46fca1 100644 --- a/xknxproject/xml/parser.py +++ b/xknxproject/xml/parser.py @@ -45,188 +45,40 @@ _LOGGER = logging.getLogger("xknxproject.log") -class XMLParser: - """Class that parses XMLs and returns useful information.""" - - def __init__(self, knx_proj_contents: KNXProjContents) -> None: - """Initialize the parser.""" - self.knx_proj_contents = knx_proj_contents - self.spaces: list[XMLSpace] = [] - self.group_addresses: list[XMLGroupAddress] = [] - self.group_ranges: list[XMLGroupRange] = [] - self.areas: list[XMLArea] = [] - self.devices: list[DeviceInstance] = [] - self.language_code: str | None = None - - self.project_info: XMLProjectInformation - self.functions: list[XMLFunction] = [] - - def parse(self, language: str | None = None) -> KNXProject: - """Parse ETS files.""" - self.load(language=language) - self.sort() - _ga_id_to_address = {ga.identifier: ga.address for ga in self.group_addresses} - - communication_objects: dict[str, CommunicationObject] = {} - devices_dict: dict[str, Device] = {} - for device in self.devices: - device_com_objects: list[str] = [] - for com_object in device.com_object_instance_refs: - if not com_object.links: - continue - group_address_links = [ - valid_link - for link in com_object.links - if (valid_link := _ga_id_to_address.get(link)) - ] - if not group_address_links: - # skip orphaned ComObjectInstanceRef pointing only to non-existent GroupAddress - # see https://github.com/XKNX/knx-frontend/issues/71 - continue - com_object_key = f"{device.individual_address}/{com_object.ref_id}" - communication_objects[com_object_key] = CommunicationObject( - name=com_object.name, # type: ignore[typeddict-item] - number=com_object.number, # type: ignore[typeddict-item] - text=com_object.text, # type: ignore[typeddict-item] - function_text=com_object.function_text, # type: ignore[typeddict-item] - description=com_object.description or "", - device_address=device.individual_address, - dpts=com_object.datapoint_types, - object_size=com_object.object_size, # type: ignore[typeddict-item] - flags=Flags( - read=com_object.read_flag, # type: ignore[typeddict-item] - write=com_object.write_flag, # type: ignore[typeddict-item] - communication=com_object.communication_flag, # type: ignore[typeddict-item] - update=com_object.update_flag, # type: ignore[typeddict-item] - read_on_init=com_object.read_on_init_flag, # type: ignore[typeddict-item] - transmit=com_object.transmit_flag, # type: ignore[typeddict-item] - ), - group_address_links=group_address_links, - ) - device_com_objects.append(com_object_key) - - devices_dict[device.individual_address] = Device( - name=device.product_name, - hardware_name=device.hardware_name, - description=device.description, - manufacturer_name=device.manufacturer_name, - individual_address=device.individual_address, - project_uid=device.project_uid, - communication_object_ids=device_com_objects, - ) - - topology_dict: dict[str, Area] = {} - for area in self.areas: - lines_dict: dict[str, Line] = {} - for line in area.lines: - devices_topology: list[str] = [] - for device in line.devices: - devices_topology.append(device.individual_address) - lines_dict[str(line.address)] = Line( - name=line.name, - description=line.description, - devices=devices_topology, - medium_type=MEDIUM_TYPES.get(line.medium_type, "Unknown"), - ) - topology_dict[str(area.address)] = Area( - name=area.name, description=area.description, lines=lines_dict - ) - - group_address_dict: dict[str, GroupAddress] = {} - for group_address in self.group_addresses: - _com_object_ids = [ - com_object_id - for com_object_id, com_object in communication_objects.items() - if group_address.address in com_object["group_address_links"] - ] - group_address_dict[group_address.address] = GroupAddress( - name=group_address.name, - identifier=group_address.identifier, - raw_address=group_address.raw_address, - address=group_address.address, - project_uid=group_address.project_uid, - dpt=group_address.dpt, - data_secure=bool(group_address.data_secure_key), - communication_object_ids=_com_object_ids, - description=group_address.description, - comment=html.unescape(rtf_to_text(group_address.comment)), - ) - - group_range_dict: dict[str, GroupRange] = {} - for group_range in self.group_ranges: - group_range_dict[group_range.str_address()] = self.convert_group_range( - group_range, self.project_info.group_address_style - ) - - space_dict: dict[str, Space] = {} - for space in self.spaces: - space_dict[space.name] = self.recursive_convert_spaces(space) - - functions_dict: dict[str, Function] = {} - for function in self.functions: - functions_dict[function.identifier] = self.convert_functions(function) - - info = ProjectInfo( - project_id=self.project_info.project_id, - name=self.project_info.name, - last_modified=self.project_info.last_modified, - group_address_style=self.project_info.group_address_style.value, - guid=self.project_info.guid, - created_by=self.project_info.created_by, - schema_version=self.project_info.schema_version, - tool_version=self.project_info.tool_version, - xknxproject_version=__version__, - language_code=self.language_code, - ) - - return KNXProject( - info=info, - communication_objects=communication_objects, - topology=topology_dict, - devices=devices_dict, - group_addresses=group_address_dict, - group_ranges=group_range_dict, - locations=space_dict, - functions=functions_dict, - ) - - def convert_group_address_ref( - self, group_address_ref: XMLGroupAddressRef - ) -> GroupAddressRef: - """Convert group address ref to the final output format.""" - return GroupAddressRef( - address=group_address_ref.address, - name=group_address_ref.name, - project_uid=group_address_ref.project_uid, - role=group_address_ref.role, - ) - - def convert_functions(self, function: XMLFunction) -> Function: - """Convert function to the final output format.""" - - ga_dict = {} - for group_address in function.group_addresses: - ga_dict[group_address.address] = self.convert_group_address_ref( - group_address - ) - - return Function( - function_type=function.function_type, - group_addresses=ga_dict, - identifier=function.identifier, - name=function.name, - project_uid=function.project_uid, - space_id=function.space_id, - usage_text=function.usage_text, - ) - - def recursive_convert_spaces(self, space: XMLSpace) -> Space: - """Convert spaces to the final output format.""" - subspaces: dict[str, Space] = {} - for subspace in space.spaces: - subspaces[subspace.name] = self.recursive_convert_spaces(subspace) - - return Space( +def _convert_group_address_ref( + group_address_ref: XMLGroupAddressRef, +) -> GroupAddressRef: + """Convert group address ref to the final output format.""" + return GroupAddressRef( + address=group_address_ref.address, + name=group_address_ref.name, + project_uid=group_address_ref.project_uid, + role=group_address_ref.role, + ) + + +def _convert_functions(function: XMLFunction) -> Function: + """Convert function to the final output format.""" + + ga_dict = {} + for group_address in function.group_addresses: + ga_dict[group_address.address] = _convert_group_address_ref(group_address) + + return Function( + function_type=function.function_type, + group_addresses=ga_dict, + identifier=function.identifier, + name=function.name, + project_uid=function.project_uid, + space_id=function.space_id, + usage_text=function.usage_text, + ) + + +def _recursive_convert_spaces(spaces: list[XMLSpace]) -> dict[str, Space]: + """Convert spaces to the final output format.""" + return { + space.name: Space( type=space.space_type.value, identifier=space.identifier, name=space.name, @@ -236,21 +88,20 @@ def recursive_convert_spaces(self, space: XMLSpace) -> Space: description=space.description, project_uid=space.project_uid, devices=space.devices, - spaces=subspaces, + spaces=_recursive_convert_spaces(space.spaces), functions=space.functions, ) + for space in spaces + } - def convert_group_range( - self, group_range: XMLGroupRange, group_address_style: GroupAddressStyle - ) -> GroupRange: - """Convert XMLGroupRange into GroupRange.""" - group_ranges: dict[str, GroupRange] = {} - for child_gr in group_range.group_ranges: - group_ranges[child_gr.str_address()] = self.convert_group_range( - child_gr, group_address_style - ) - return GroupRange( +def _recursive_convert_group_range( + group_ranges: list[XMLGroupRange], + group_address_style: GroupAddressStyle, +) -> dict[str, GroupRange]: + """Convert XMLGroupRange into GroupRange.""" + return { + group_range.str_address(): GroupRange( name=group_range.name, address_start=group_range.range_start, address_end=group_range.range_end, @@ -259,10 +110,38 @@ def convert_group_range( for ga in group_range.group_addresses ], comment=html.unescape(rtf_to_text(group_range.comment)), - group_ranges=group_ranges, + group_ranges=_recursive_convert_group_range( + group_range.group_ranges, + group_address_style, + ), ) + for group_range in group_ranges + } + + +class XMLParser: + """Class that parses XMLs and returns useful information.""" + + def __init__(self, knx_proj_contents: KNXProjContents) -> None: + """Initialize the parser.""" + self.knx_proj_contents = knx_proj_contents + self.spaces: list[XMLSpace] = [] + self.group_addresses: list[XMLGroupAddress] = [] + self.group_ranges: list[XMLGroupRange] = [] + self.areas: list[XMLArea] = [] + self.devices: list[DeviceInstance] = [] + self.language_code: str | None = None + + self.project_info: XMLProjectInformation + self.functions: list[XMLFunction] = [] - def load(self, language: str | None) -> None: + def parse(self, language: str | None = None) -> KNXProject: + """Parse ETS project.""" + self._load(language=language) + self._sort() + return self._transform() + + def _load(self, language: str | None) -> None: """Load XML files.""" ( knx_master_data, @@ -347,7 +226,7 @@ def load(self, language: str | None) -> None: language_code=self.language_code, ) - def sort(self) -> None: + def _sort(self) -> None: """Sort loaded structures as XML content is sorted by creation time.""" def recursive_sort_spaces(spaces: list[XMLSpace]) -> None: @@ -375,3 +254,127 @@ def recursive_sort_group_ranges(group_ranges: list[XMLGroupRange]) -> None: self.areas.sort(key=attrgetter("address")) self.devices.sort(key=attrgetter("area_address", "line_address", "address")) + + def _transform(self) -> KNXProject: + """Convert XML Data to KNXProject structure.""" + _ga_id_to_address = {ga.identifier: ga.address for ga in self.group_addresses} + + communication_objects: dict[str, CommunicationObject] = {} + devices_dict: dict[str, Device] = {} + for device in self.devices: + device_com_objects: list[str] = [] + for com_object in device.com_object_instance_refs: + if not com_object.links: + continue + group_address_links = [ + valid_link + for link in com_object.links + if (valid_link := _ga_id_to_address.get(link)) + ] + if not group_address_links: + # skip orphaned ComObjectInstanceRef pointing only to non-existent GroupAddress + # see https://github.com/XKNX/knx-frontend/issues/71 + continue + com_object_key = f"{device.individual_address}/{com_object.ref_id}" + communication_objects[com_object_key] = CommunicationObject( + name=com_object.name, # type: ignore[typeddict-item] + number=com_object.number, # type: ignore[typeddict-item] + text=com_object.text, # type: ignore[typeddict-item] + function_text=com_object.function_text, # type: ignore[typeddict-item] + description=com_object.description or "", + device_address=device.individual_address, + dpts=com_object.datapoint_types, + object_size=com_object.object_size, # type: ignore[typeddict-item] + flags=Flags( + read=com_object.read_flag, # type: ignore[typeddict-item] + write=com_object.write_flag, # type: ignore[typeddict-item] + communication=com_object.communication_flag, # type: ignore[typeddict-item] + update=com_object.update_flag, # type: ignore[typeddict-item] + read_on_init=com_object.read_on_init_flag, # type: ignore[typeddict-item] + transmit=com_object.transmit_flag, # type: ignore[typeddict-item] + ), + group_address_links=group_address_links, + ) + device_com_objects.append(com_object_key) + + devices_dict[device.individual_address] = Device( + name=device.product_name, + hardware_name=device.hardware_name, + description=device.description, + manufacturer_name=device.manufacturer_name, + individual_address=device.individual_address, + project_uid=device.project_uid, + communication_object_ids=device_com_objects, + ) + + topology_dict: dict[str, Area] = {} + for area in self.areas: + lines_dict: dict[str, Line] = {} + for line in area.lines: + devices_topology: list[str] = [] + for device in line.devices: + devices_topology.append(device.individual_address) + lines_dict[str(line.address)] = Line( + name=line.name, + description=line.description, + devices=devices_topology, + medium_type=MEDIUM_TYPES.get(line.medium_type, "Unknown"), + ) + topology_dict[str(area.address)] = Area( + name=area.name, description=area.description, lines=lines_dict + ) + + group_address_dict: dict[str, GroupAddress] = { + group_address.address: GroupAddress( + name=group_address.name, + identifier=group_address.identifier, + raw_address=group_address.raw_address, + address=group_address.address, + project_uid=group_address.project_uid, + dpt=group_address.dpt, + data_secure=bool(group_address.data_secure_key), + communication_object_ids=[ + com_object_id + for com_object_id, com_object in communication_objects.items() + if group_address.address in com_object["group_address_links"] + ], + description=group_address.description, + comment=html.unescape(rtf_to_text(group_address.comment)), + ) + for group_address in self.group_addresses + } + + group_range_dict: dict[str, GroupRange] = _recursive_convert_group_range( + self.group_ranges, self.project_info.group_address_style + ) + + space_dict: dict[str, Space] = _recursive_convert_spaces(self.spaces) + + functions_dict: dict[str, Function] = { + function.identifier: _convert_functions(function) + for function in self.functions + } + + info = ProjectInfo( + project_id=self.project_info.project_id, + name=self.project_info.name, + last_modified=self.project_info.last_modified, + group_address_style=self.project_info.group_address_style.value, + guid=self.project_info.guid, + created_by=self.project_info.created_by, + schema_version=self.project_info.schema_version, + tool_version=self.project_info.tool_version, + xknxproject_version=__version__, + language_code=self.language_code, + ) + + return KNXProject( + info=info, + communication_objects=communication_objects, + topology=topology_dict, + devices=devices_dict, + group_addresses=group_address_dict, + group_ranges=group_range_dict, + locations=space_dict, + functions=functions_dict, + )