Skip to content

Commit

Permalink
Ignore wrong bitrate on newer firmware #1194
Browse files Browse the repository at this point in the history
  • Loading branch information
mrlt8 committed Jul 18, 2024
1 parent c04d8da commit 922b076
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 23 deletions.
15 changes: 1 addition & 14 deletions app/wyzebridge/bridge_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import contextlib
import os
import secrets
import shutil
from typing import Any, Optional
from typing import Any

from wyzecam.api_models import WyzeCamera

Expand Down Expand Up @@ -68,18 +67,6 @@ def is_livestream(uri: str) -> bool:
return any(env_bool(f"{service}_{uri}") for service in services)


def is_fw11(fw_ver: Optional[str]) -> bool:
"""
Check if newer firmware that needs to use K10050GetVideoParam
"""
with contextlib.suppress(IndexError, ValueError):
if fw_ver and fw_ver.startswith(("4.51", "4.52", "4.53", "4.50.4")):
return True
if fw_ver and int(fw_ver.split(".")[2]) > 10:
return True
return False


def get_secret(name: str) -> str:
if not name:
return ""
Expand Down
43 changes: 34 additions & 9 deletions app/wyzebridge/wyze_control.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
import socket
import time
from datetime import datetime, timedelta
from multiprocessing import Queue
from queue import Empty
from re import findall
from typing import Any, Optional

import requests
from wyzebridge.bridge_utils import env_bool, is_fw11
from wyzebridge.bridge_utils import env_bool
from wyzebridge.config import BOA_COOLDOWN, BOA_INTERVAL, IMG_PATH, MQTT_TOPIC
from wyzebridge.logging import logger
from wyzebridge.mqtt import MQTT_ENABLED, publish_messages
from wyzebridge.wyze_commands import CMD_VALUES, GET_CMDS, GET_PAYLOAD, PARAMS, SET_CMDS
from wyzecam import WyzeIOTCSession, WyzeIOTCSessionState, tutk_protocol
from wyzecam import WyzeIOTCSession, tutk_protocol
from wyzecam.tutk.tutk import TutkError

REQ_K10050 = ["4.51", "4.52", "4.53", "4.50.4"]
"""Firmware versions that require K10050GetVideoParam to get bitrate."""

NO_BITRATE = ["4.36.12", "4.50.4.9222"]
"""Firmware versions that are broken and no longer return the actual bitrate."""


def cam_http_alive(ip: str) -> bool:
"""Test if camera http server is up."""
Expand Down Expand Up @@ -161,7 +166,7 @@ def camera_control(sess: WyzeIOTCSession, camera_info: Queue, camera_cmd: Queue)
resp = update_bit_fps(sess, topic, payload)
else:
# Use K10050GetVideoParam if newer firmware
if topic == "bitrate" and is_fw11(sess.camera.firmware_ver):
if topic == "bitrate" and fw_check(sess.camera.firmware_ver, REQ_K10050):
cmd = "_bitrate"
elif topic == "motion_detection" and payload:
if sess.camera.product_model in (
Expand All @@ -186,13 +191,13 @@ def update_params(sess: WyzeIOTCSession):
"""
if not sess.should_stream(0):
return
fw_11 = is_fw11(sess.camera.firmware_ver)
newer_firmware = fw_check(sess.camera.firmware_ver, REQ_K10050)

if MQTT_ENABLED or not fw_11:
remove = {"bitrate", "res"} if fw_11 else set()
if MQTT_ENABLED or not newer_firmware:
remove = {"bitrate", "res"} if newer_firmware else set()
params = ",".join([v for k, v in PARAMS.items() if k not in remove])
send_tutk_msg(sess, ("param_info", params), "debug")
if fw_11:
if newer_firmware:
send_tutk_msg(sess, "_bitrate", "debug")


Expand Down Expand Up @@ -276,7 +281,7 @@ def send_tutk_msg(sess: WyzeIOTCSession, cmd: tuple | str, log: str = "info") ->
elif res := iotc.result(timeout=5):
if tutk_msg.code in {10020, 10050}:
update_mqtt_values(sess.camera.name_uri, res)
if sess.camera.firmware_ver not in {"4.50.4.9222", "4.36.12.9751"}:
if not fw_check(sess.camera.firmware_ver, NO_BITRATE):
res = bitrate_check(sess, res, resp["command"])
params = None
if isinstance(res, bytes):
Expand Down Expand Up @@ -389,3 +394,23 @@ def motion_alarm(cam: dict):
resp.raise_for_status()
except requests.exceptions.HTTPError as ex:
logger.error(ex)


def parse_fw(fw_ver: str) -> tuple[str, tuple[int, ...]]:
parts = fw_ver.split(".")
if len(parts) < 4:
parts.extend(["0"] * (4 - len(parts)))
return ".".join(parts[:2]), tuple(map(int, parts[2:]))


def fw_check(fw_ver: Optional[str], min_fw_ver: list) -> bool:
"""Check firmware compatibility."""
if not fw_ver:
return False
min_fw = {fw_type: ver_parts for fw_type, ver_parts in map(parse_fw, min_fw_ver)}

fw_type, version = parse_fw(fw_ver)
if version and version >= min_fw.get(fw_type, (11, 0)):
return True

return False

0 comments on commit 922b076

Please sign in to comment.