diff --git a/app/wyzebridge/bridge_utils.py b/app/wyzebridge/bridge_utils.py index 57369cff..705b58bc 100644 --- a/app/wyzebridge/bridge_utils.py +++ b/app/wyzebridge/bridge_utils.py @@ -1,8 +1,7 @@ -import contextlib import os import secrets import shutil -from typing import Any, Optional +from typing import Any from wyzecam.api_models import WyzeCamera @@ -68,18 +67,6 @@ def is_livestream(uri: str) -> bool: return any(env_bool(f"{service}_{uri}") for service in services) -def is_fw11(fw_ver: Optional[str]) -> bool: - """ - Check if newer firmware that needs to use K10050GetVideoParam - """ - with contextlib.suppress(IndexError, ValueError): - if fw_ver and fw_ver.startswith(("4.51", "4.52", "4.53", "4.50.4")): - return True - if fw_ver and int(fw_ver.split(".")[2]) > 10: - return True - return False - - def get_secret(name: str) -> str: if not name: return "" diff --git a/app/wyzebridge/wyze_control.py b/app/wyzebridge/wyze_control.py index 435ad182..5a7309a9 100644 --- a/app/wyzebridge/wyze_control.py +++ b/app/wyzebridge/wyze_control.py @@ -1,5 +1,4 @@ import socket -import time from datetime import datetime, timedelta from multiprocessing import Queue from queue import Empty @@ -7,14 +6,20 @@ from typing import Any, Optional import requests -from wyzebridge.bridge_utils import env_bool, is_fw11 +from wyzebridge.bridge_utils import env_bool from wyzebridge.config import BOA_COOLDOWN, BOA_INTERVAL, IMG_PATH, MQTT_TOPIC from wyzebridge.logging import logger from wyzebridge.mqtt import MQTT_ENABLED, publish_messages from wyzebridge.wyze_commands import CMD_VALUES, GET_CMDS, GET_PAYLOAD, PARAMS, SET_CMDS -from wyzecam import WyzeIOTCSession, WyzeIOTCSessionState, tutk_protocol +from wyzecam import WyzeIOTCSession, tutk_protocol from wyzecam.tutk.tutk import TutkError +REQ_K10050 = ["4.51", "4.52", "4.53", "4.50.4"] +"""Firmware versions that require K10050GetVideoParam to get bitrate.""" + +NO_BITRATE = ["4.36.12", "4.50.4.9222"] +"""Firmware versions that are broken and no longer return the actual bitrate.""" + def cam_http_alive(ip: str) -> bool: """Test if camera http server is up.""" @@ -161,7 +166,7 @@ def camera_control(sess: WyzeIOTCSession, camera_info: Queue, camera_cmd: Queue) resp = update_bit_fps(sess, topic, payload) else: # Use K10050GetVideoParam if newer firmware - if topic == "bitrate" and is_fw11(sess.camera.firmware_ver): + if topic == "bitrate" and fw_check(sess.camera.firmware_ver, REQ_K10050): cmd = "_bitrate" elif topic == "motion_detection" and payload: if sess.camera.product_model in ( @@ -186,13 +191,13 @@ def update_params(sess: WyzeIOTCSession): """ if not sess.should_stream(0): return - fw_11 = is_fw11(sess.camera.firmware_ver) + newer_firmware = fw_check(sess.camera.firmware_ver, REQ_K10050) - if MQTT_ENABLED or not fw_11: - remove = {"bitrate", "res"} if fw_11 else set() + if MQTT_ENABLED or not newer_firmware: + remove = {"bitrate", "res"} if newer_firmware else set() params = ",".join([v for k, v in PARAMS.items() if k not in remove]) send_tutk_msg(sess, ("param_info", params), "debug") - if fw_11: + if newer_firmware: send_tutk_msg(sess, "_bitrate", "debug") @@ -276,7 +281,7 @@ def send_tutk_msg(sess: WyzeIOTCSession, cmd: tuple | str, log: str = "info") -> elif res := iotc.result(timeout=5): if tutk_msg.code in {10020, 10050}: update_mqtt_values(sess.camera.name_uri, res) - if sess.camera.firmware_ver not in {"4.50.4.9222", "4.36.12.9751"}: + if not fw_check(sess.camera.firmware_ver, NO_BITRATE): res = bitrate_check(sess, res, resp["command"]) params = None if isinstance(res, bytes): @@ -389,3 +394,23 @@ def motion_alarm(cam: dict): resp.raise_for_status() except requests.exceptions.HTTPError as ex: logger.error(ex) + + +def parse_fw(fw_ver: str) -> tuple[str, tuple[int, ...]]: + parts = fw_ver.split(".") + if len(parts) < 4: + parts.extend(["0"] * (4 - len(parts))) + return ".".join(parts[:2]), tuple(map(int, parts[2:])) + + +def fw_check(fw_ver: Optional[str], min_fw_ver: list) -> bool: + """Check firmware compatibility.""" + if not fw_ver: + return False + min_fw = {fw_type: ver_parts for fw_type, ver_parts in map(parse_fw, min_fw_ver)} + + fw_type, version = parse_fw(fw_ver) + if version and version >= min_fw.get(fw_type, (11, 0)): + return True + + return False