From 7b306691c4c09a7a4f9a6bd7dd9f150253ee692d Mon Sep 17 00:00:00 2001 From: Pergola Fabio Date: Tue, 12 Sep 2023 11:22:58 +0200 Subject: [PATCH] dynamicly add door relays for indoor --- hikvision-doorbell/src/doorbell.py | 42 ++++++++++++++++++++---------- hikvision-doorbell/src/mqtt.py | 2 +- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/hikvision-doorbell/src/doorbell.py b/hikvision-doorbell/src/doorbell.py index e42d3f7..f1fc2a6 100644 --- a/hikvision-doorbell/src/doorbell.py +++ b/hikvision-doorbell/src/doorbell.py @@ -99,7 +99,11 @@ def unlock_com(self, com_id: int): "switch": "open" } } - self._call_isapi("PUT", url, json.dumps(requestBody)) + try: + self._call_isapi("PUT", url, json.dumps(requestBody)) + except SDKError as err: + # If error code is 10 (NET_DVR_NETWORK_RECV_TIMEOUT) suppress it, + raise err logger.info(" Com {} unlocked by ISAPI", com_id +1) def unlock_door(self, lock_id: int): @@ -185,20 +189,34 @@ def _call_isapi(self, http_method: str, url: str, requestBody: str = "") -> str: def get_num_outputs_indoor(self) -> int: """ - Get the number of output relays configured for the indoor station, manually only. + Get the number of output relays configured for the indoor station """ - # Define various functions, each using a different method to gather this information def user_config() -> int: if self._config.output_relays is not None: logger.debug("Using the configured number of switches: {}", self._config.output_relays) return self._config.output_relays - else: - return 0 + logger.debug("No manual config found to define output relays for indoor") raise RuntimeError("No user configuration specified") - + + def isapi_door_capabilities() -> int: + io_doors_xml = self._call_isapi("GET", "/ISAPI/AccessControl/RemoteControl/door/capabilities") + try: + root = ET.fromstring(io_doors_xml) + door_number_element = root.find('{*}channelNo') + except ET.ParseError: + logger.debug("Error parsing: {}", io_doors_xml) + return 0 + # Error out if we don't find attribute `max` inside the `doorNo` element + if door_number_element is None : + # Print a string representation of the response XML + logger.debug("No door relays found for the indoor device") + return 0 + #logger.debug("We have found {} door relays for the indoor device", door_number_element.text) + return door_number_element.text + # Define the list of available endpoints to try - available_endpoints: list[Callable] = [user_config] + available_endpoints: list[Callable] = [user_config, isapi_door_capabilities] for endpoint in available_endpoints: # Invoke the endpoint, if it errors out try another one try: @@ -256,7 +274,6 @@ def sdk_device_ability() -> int: def isapi_io_outputs() -> int: io_outputs_xml = self._call_isapi("GET", "/ISAPI/System/IO/outputs") - logger.debug("Response url for /ISAPI/System/IO/outputs: {}", io_outputs_xml) root = ET.fromstring(io_outputs_xml) if 'IOOutputPortList' not in root.tag: # XML does not contain the required tag @@ -266,7 +283,6 @@ def isapi_io_outputs() -> int: def isapi_remote_control() -> int: # Device does not support previous ISAPI endpoint, try another door_capabilities_xml = self._call_isapi("GET", "/ISAPI/AccessControl/RemoteControl/door/capabilities") - logger.debug("Response url for /ISAPI/AccessControl/RemoteControl/door/capabilities: {}", door_capabilities_xml) root = ET.fromstring(door_capabilities_xml) door_number_element = root.find('{*}doorNo') @@ -294,13 +310,12 @@ def isapi_remote_control() -> int: def get_num_coms_indoor(self) -> int: """ Get the number of com relays configured for this doorbell. + We can also use this method: POST /ISAPI/SecurityCP/status/outputStatus?format=json {"OutputCond":{"maxResults":2,"outputModuleNo":0,"searchID":"1","searchResultPosition":0}} """ - # Define various functions, each using a different method to gather this information def isapi_device_info() -> int: io_coms_xml = self._call_isapi("GET", "/ISAPI/System/deviceInfo") - logger.debug("Response url for /ISAPI/System/IO/outputs: {}", io_coms_xml) root = ET.fromstring(io_coms_xml) com_number_element = root.find('{*}alarmOutNum') # Error out if we don't find attribute `max` inside the `doorNo` element @@ -308,7 +323,7 @@ def isapi_device_info() -> int: # Print a string representation of the response XML logger.debug("No com ports found for the indoor device") return 0 - logger.debug("We have found {} com ports found for the indoor device", com_number_element.text) + logger.debug("We have found {} com ports for the indoor device", com_number_element.text) return int(com_number_element.text) # Define the list of available endpoints to try @@ -322,8 +337,7 @@ def isapi_device_info() -> int: pass # We have run out of available endpoints to call - #raise RuntimeError("Unable to get the number of doors, please configure the relays manually with this option in the config: output_relays") - + raise RuntimeError("Unable to get the number of coms") def get_device_info(self): """Retrieve device information (model, sw version, etc) using the ISAPI endpoint. diff --git a/hikvision-doorbell/src/mqtt.py b/hikvision-doorbell/src/mqtt.py index 930b6ba..26d25f4 100644 --- a/hikvision-doorbell/src/mqtt.py +++ b/hikvision-doorbell/src/mqtt.py @@ -140,7 +140,7 @@ def __init__(self, config: AppConfig.MQTT, doorbells: Registry) -> None: if not doorbell._type is DeviceType.INDOOR: num_doors = doorbell.get_num_outputs() else: - num_doors = doorbell.get_num_outputs_indoor() + num_doors = int(doorbell.get_num_outputs_indoor()) logger.debug("Configuring {} door switches", num_doors) for door_id in range(num_doors): door_switch_info = SwitchInfo(