Skip to content

Commit

Permalink
added sdk for acs events
Browse files Browse the repository at this point in the history
  • Loading branch information
pergolafabio committed Sep 7, 2023
1 parent a036807 commit a211da4
Show file tree
Hide file tree
Showing 4 changed files with 648 additions and 5 deletions.
30 changes: 29 additions & 1 deletion hikvision-doorbell/src/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from loguru import logger
from doorbell import Doorbell, Registry

from sdk.hcnetsdk import ALARMINFO_V30_ALARMTYPE_MOTION_DETECTION, BOOL, COMM_ALARM_V30, COMM_ALARM_VIDEO_INTERCOM, COMM_UPLOAD_VIDEO_INTERCOM_EVENT, DWORD, LONG, NET_DVR_ALARMER, NET_DVR_ALARMINFO_V30, NET_DVR_VIDEO_INTERCOM_ALARM, NET_DVR_VIDEO_INTERCOM_EVENT, NET_DVR_ALARM_ISAPI_INFO, COMM_ISAPI_ALARM, MessageCallbackAlarmInfoUnion
from sdk.hcnetsdk import ALARMINFO_V30_ALARMTYPE_MOTION_DETECTION, BOOL, COMM_ALARM_V30, COMM_ALARM_VIDEO_INTERCOM, COMM_UPLOAD_VIDEO_INTERCOM_EVENT, DWORD, LONG, NET_DVR_ALARMER, NET_DVR_ALARMINFO_V30, NET_DVR_VIDEO_INTERCOM_ALARM, NET_DVR_VIDEO_INTERCOM_EVENT, NET_DVR_ALARM_ISAPI_INFO, NET_DVR_ACS_ALARM_INFO, COMM_ISAPI_ALARM, COMM_ALARM_ACS, MessageCallbackAlarmInfoUnion
from sdk.utils import SDKError


Expand Down Expand Up @@ -43,6 +43,7 @@ async def video_intercom_alarm(
buffer_length,
user_pointer: c_void_p):
raise NotImplementedError

async def isapi_alarm(
self,
doorbell: Doorbell,
Expand All @@ -53,6 +54,16 @@ async def isapi_alarm(
user_pointer: c_void_p):
raise NotImplementedError

async def acs_alarm(
self,
doorbell: Doorbell,
command: int,
device: NET_DVR_ALARMER,
alarm_info: NET_DVR_ACS_ALARM_INFO,
buffer_length,
user_pointer: c_void_p):
raise NotImplementedError

async def unhandled_event(
self,
doorbell: Doorbell,
Expand Down Expand Up @@ -108,6 +119,7 @@ async def video_intercom_alarm(
user_pointer: c_void_p):
logger.info("Video intercom alarm from {}", doorbell._config.name)

@override
async def isapi_alarm(
self,
doorbell: Doorbell,
Expand All @@ -117,6 +129,18 @@ async def isapi_alarm(
buffer_length,
user_pointer: c_void_p):
logger.info("Isapi alarm from {}", doorbell._config.name)

@override
async def acs_alarm(
self,
doorbell: Doorbell,
command: int,
device: NET_DVR_ALARMER,
alarm_info: NET_DVR_ACS_ALARM_INFO,
buffer_length,
user_pointer: c_void_p):
logger.info("ACS alarm from {}", doorbell._config.name)

@override
async def unhandled_event(
self,
Expand Down Expand Up @@ -156,6 +180,8 @@ def _cast_alarm_info(self, command: int, callback_alarm_info_p):
return cast(callback_alarm_info_p, POINTER(NET_DVR_VIDEO_INTERCOM_EVENT)).contents
elif (command == COMM_ISAPI_ALARM):
return cast(callback_alarm_info_p, POINTER(NET_DVR_ALARM_ISAPI_INFO)).contents
elif (command == COMM_ALARM_ACS):
return cast(callback_alarm_info_p, POINTER(NET_DVR_ACS_ALARM_INFO)).contents
else:
logger.warning("Received unhandled command: {}", command)
return callback_alarm_info_p
Expand All @@ -176,6 +202,8 @@ async def _invoke_handlers(self, command, device: NET_DVR_ALARMER, alarm_info, b
handler_func = handler.video_intercom_event
case NET_DVR_ALARM_ISAPI_INFO():
handler_func = handler.isapi_alarm
case NET_DVR_ACS_ALARM_INFO():
handler_func = handler.acs_alarm
case _:
handler_func = handler.unhandled_event

Expand Down
37 changes: 34 additions & 3 deletions hikvision-doorbell/src/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
NET_DVR_VIDEO_INTERCOM_ALARM,
NET_DVR_VIDEO_INTERCOM_EVENT,
NET_DVR_ALARM_ISAPI_INFO,
NET_DVR_ACS_ALARM_INFO,
VIDEO_INTERCOM_ALARM_ALARMTYPE_DOOR_NOT_OPEN,
VIDEO_INTERCOM_EVENT_EVENTTYPE_UNLOCK_LOG,
VideoInterComAlarmType)
from sdk.acsalarminfo import (AcsAlarmInfoMajor, AcsAlarmInfoMajorAlarm, AcsAlarmInfoMajorException, AcsAlarmInfoMajorOperation, AcsAlarmInfoMajorEvent)
from typing_extensions import override
import xml.etree.ElementTree as ET
import json
Expand Down Expand Up @@ -52,7 +54,6 @@ def extract_device_info(doorbell: Doorbell) -> DeviceInfo:
hw_version=parsed_device_info["hardware"]
)


class DeviceTriggerMetadata(TypedDict):
"""
Helper dict class defining the information of a device trigger.
Expand All @@ -67,7 +68,6 @@ class DeviceTriggerMetadata(TypedDict):
payload: dict[str, str]
"""Optional payload sent in the trigger"""


DEVICE_TRIGGERS_DEFINITIONS: dict[VideoInterComAlarmType, DeviceTriggerMetadata] = {
VideoInterComAlarmType.TAMPERING_ALARM: DeviceTriggerMetadata(name='tampering_alarm', type='alarm', subtype='tampering'),
VideoInterComAlarmType.HIJACKING_ALARM: DeviceTriggerMetadata(name='hijacking_alarm', type='alarm', subtype='hijacking'),
Expand All @@ -85,7 +85,6 @@ class DeviceTriggerMetadata(TypedDict):
}
"""Define the attributes of each DeviceTrigger entity, indexing them by the enum VideoInterComAlarmType"""


class MQTTHandler(EventHandler):
name = 'MQTT'
_sensors: dict[Doorbell, dict[str, Discoverable[Any]]] = {}
Expand Down Expand Up @@ -201,6 +200,38 @@ async def motion_detection(
metadata = DeviceTriggerMetadata(name="motion_detection", type="Motion detected", subtype="")
self.handle_device_trigger(doorbell, metadata)

@override
async def acs_alarm(
self,
doorbell: Doorbell,
command: int,
device: NET_DVR_ALARMER,
alarm_info: NET_DVR_ACS_ALARM_INFO,
buffer_length,
user_pointer: c_void_p):

# Extract the type of alarm as a Python enum
try:
major = alarm_info.dwMajor
minor = alarm_info.dwMinor
logger.debug("Access control event occured, trying to find the event for Major: {} : Minor: {}", major, minor)
major_alarm = AcsAlarmInfoMajor(major)
match major:
case AcsAlarmInfoMajor.MAJOR_ALARM.value:
minor_alarm = AcsAlarmInfoMajorAlarm(minor)
case AcsAlarmInfoMajor.MAJOR_EXCEPTION.value:
minor_alarm = AcsAlarmInfoMajorException(minor)
case AcsAlarmInfoMajor.MAJOR_OPERATION.value:
minor_alarm = AcsAlarmInfoMajorOperation(minor)
case AcsAlarmInfoMajor.MAJOR_EVENT.value:
minor_alarm = AcsAlarmInfoMajorEvent(minor)
logger.info("Access control event: {} found with event: {}", major_alarm.name, minor_alarm.name)
trigger = DeviceTriggerMetadata(name=f"{major_alarm.name} {minor_alarm.name}", type=f"", subtype=f"{major_alarm.name} {minor_alarm.name}")
self.handle_device_trigger(doorbell, trigger)
except:
logger.warning("Received unknown Access control event with Major: {} Minor: {}", major, minor)
return

@override
async def isapi_alarm(
self,
Expand Down
Loading

0 comments on commit a211da4

Please sign in to comment.