Skip to content

Commit

Permalink
dynamicly add door relays for indoor
Browse files Browse the repository at this point in the history
  • Loading branch information
pergolafabio committed Sep 12, 2023
1 parent a5c6f86 commit 7b30669
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 15 deletions.
42 changes: 28 additions & 14 deletions hikvision-doorbell/src/doorbell.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ def unlock_com(self, com_id: int):
"switch": "open"
}
}
self._call_isapi("PUT", url, json.dumps(requestBody))
try:
self._call_isapi("PUT", url, json.dumps(requestBody))
except SDKError as err:
# If error code is 10 (NET_DVR_NETWORK_RECV_TIMEOUT) suppress it,
raise err
logger.info(" Com {} unlocked by ISAPI", com_id +1)

def unlock_door(self, lock_id: int):
Expand Down Expand Up @@ -185,20 +189,34 @@ def _call_isapi(self, http_method: str, url: str, requestBody: str = "") -> str:

def get_num_outputs_indoor(self) -> int:
"""
Get the number of output relays configured for the indoor station, manually only.
Get the number of output relays configured for the indoor station
"""

# Define various functions, each using a different method to gather this information
def user_config() -> int:
if self._config.output_relays is not None:
logger.debug("Using the configured number of switches: {}", self._config.output_relays)
return self._config.output_relays
else:
return 0
logger.debug("No manual config found to define output relays for indoor")
raise RuntimeError("No user configuration specified")


def isapi_door_capabilities() -> int:
io_doors_xml = self._call_isapi("GET", "/ISAPI/AccessControl/RemoteControl/door/capabilities")
try:
root = ET.fromstring(io_doors_xml)
door_number_element = root.find('{*}channelNo')
except ET.ParseError:
logger.debug("Error parsing: {}", io_doors_xml)
return 0
# Error out if we don't find attribute `max` inside the `doorNo` element
if door_number_element is None :
# Print a string representation of the response XML
logger.debug("No door relays found for the indoor device")
return 0
#logger.debug("We have found {} door relays for the indoor device", door_number_element.text)
return door_number_element.text

# Define the list of available endpoints to try
available_endpoints: list[Callable] = [user_config]
available_endpoints: list[Callable] = [user_config, isapi_door_capabilities]
for endpoint in available_endpoints:
# Invoke the endpoint, if it errors out try another one
try:
Expand Down Expand Up @@ -256,7 +274,6 @@ def sdk_device_ability() -> int:

def isapi_io_outputs() -> int:
io_outputs_xml = self._call_isapi("GET", "/ISAPI/System/IO/outputs")
logger.debug("Response url for /ISAPI/System/IO/outputs: {}", io_outputs_xml)
root = ET.fromstring(io_outputs_xml)
if 'IOOutputPortList' not in root.tag:
# XML does not contain the required tag
Expand All @@ -266,7 +283,6 @@ def isapi_io_outputs() -> int:
def isapi_remote_control() -> int:
# Device does not support previous ISAPI endpoint, try another
door_capabilities_xml = self._call_isapi("GET", "/ISAPI/AccessControl/RemoteControl/door/capabilities")
logger.debug("Response url for /ISAPI/AccessControl/RemoteControl/door/capabilities: {}", door_capabilities_xml)
root = ET.fromstring(door_capabilities_xml)

door_number_element = root.find('{*}doorNo')
Expand Down Expand Up @@ -294,21 +310,20 @@ def isapi_remote_control() -> int:
def get_num_coms_indoor(self) -> int:
"""
Get the number of com relays configured for this doorbell.
We can also use this method: POST /ISAPI/SecurityCP/status/outputStatus?format=json {"OutputCond":{"maxResults":2,"outputModuleNo":0,"searchID":"1","searchResultPosition":0}}
"""

# Define various functions, each using a different method to gather this information
def isapi_device_info() -> int:
io_coms_xml = self._call_isapi("GET", "/ISAPI/System/deviceInfo")
logger.debug("Response url for /ISAPI/System/IO/outputs: {}", io_coms_xml)
root = ET.fromstring(io_coms_xml)
com_number_element = root.find('{*}alarmOutNum')
# Error out if we don't find attribute `max` inside the `doorNo` element
if com_number_element is None :
# Print a string representation of the response XML
logger.debug("No com ports found for the indoor device")
return 0
logger.debug("We have found {} com ports found for the indoor device", com_number_element.text)
logger.debug("We have found {} com ports for the indoor device", com_number_element.text)
return int(com_number_element.text)

# Define the list of available endpoints to try
Expand All @@ -322,8 +337,7 @@ def isapi_device_info() -> int:
pass

# We have run out of available endpoints to call
#raise RuntimeError("Unable to get the number of doors, please configure the relays manually with this option in the config: output_relays")

raise RuntimeError("Unable to get the number of coms")

def get_device_info(self):
"""Retrieve device information (model, sw version, etc) using the ISAPI endpoint.
Expand Down
2 changes: 1 addition & 1 deletion hikvision-doorbell/src/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def __init__(self, config: AppConfig.MQTT, doorbells: Registry) -> None:
if not doorbell._type is DeviceType.INDOOR:
num_doors = doorbell.get_num_outputs()
else:
num_doors = doorbell.get_num_outputs_indoor()
num_doors = int(doorbell.get_num_outputs_indoor())
logger.debug("Configuring {} door switches", num_doors)
for door_id in range(num_doors):
door_switch_info = SwitchInfo(
Expand Down

0 comments on commit 7b30669

Please sign in to comment.