Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 205 additions & 1 deletion custom_components/robovac/tuyalocalapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@

from cryptography.hazmat.backends.openssl import backend as openssl_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os
from cryptography.hazmat.primitives.hashes import Hash, MD5, SHA256
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.primitives import hmac as crypto_hmac
Expand All @@ -66,6 +68,14 @@
MAGIC_PREFIX = 0x000055AA
MAGIC_SUFFIX = 0x0000AA55
MAGIC_SUFFIX_BYTES = struct.pack(">I", MAGIC_SUFFIX)

# Protocol 3.5 constants
MAGIC_PREFIX_35 = 0x00006699
MAGIC_SUFFIX_35 = 0x00009966
MAGIC_SUFFIX_35_BYTES = struct.pack(">I", MAGIC_SUFFIX_35)
# Format: prefix(4) + version(1) + reserved(1) + seq(4) + cmd(4) + len(4) = 18 bytes
MESSAGE_PREFIX_FORMAT_35 = ">IBBIII"
MESSAGE_SUFFIX_FORMAT_35 = ">16sI" # 16-byte GCM tag + 4-byte suffix
CRC_32_TABLE = [
0x00000000,
0x77073096,
Expand Down Expand Up @@ -377,6 +387,77 @@ def __init__(self, key: str, version: tuple[int, int]):
self.cipher = Cipher(
algorithms.AES(self.key_bytes), modes.ECB(), backend=openssl_backend
)
# Initialize GCM cipher for Protocol 3.5
self._aesgcm = AESGCM(self.key_bytes)

@property
def is_gcm_mode(self) -> bool:
"""Check if this cipher uses GCM mode (Protocol 3.5+).

Returns:
True if Protocol 3.5 or higher (uses GCM), False otherwise (uses ECB).
"""
return self.version >= (3, 5)

def generate_iv(self) -> bytes:
"""Generate a random 12-byte IV/nonce for GCM mode.

Returns:
A 12-byte random IV suitable for AES-GCM.
"""
return os.urandom(12)

def encrypt_gcm(
self, plaintext: bytes, aad: bytes | None = None
) -> tuple[bytes, bytes, bytes]:
"""Encrypt data using AES-GCM for Protocol 3.5.

Args:
plaintext: The data to encrypt.
aad: Optional additional authenticated data (signed but not encrypted).

Returns:
A tuple of (iv, ciphertext, tag) where:
- iv: 12-byte initialization vector/nonce
- ciphertext: The encrypted data (same length as plaintext)
- tag: 16-byte GCM authentication tag
"""
iv = self.generate_iv()
# AESGCM.encrypt returns ciphertext + tag concatenated
if aad is None:
aad = b""
ct_with_tag = self._aesgcm.encrypt(iv, plaintext, aad)
# Split ciphertext and tag (tag is last 16 bytes)
ciphertext = ct_with_tag[:-16]
tag = ct_with_tag[-16:]
return (iv, ciphertext, tag)

def decrypt_gcm(
self,
iv: bytes,
ciphertext: bytes,
tag: bytes,
aad: bytes | None = None,
) -> bytes:
"""Decrypt data using AES-GCM for Protocol 3.5.

Args:
iv: 12-byte initialization vector/nonce.
ciphertext: The encrypted data.
tag: 16-byte GCM authentication tag.
aad: Optional additional authenticated data.

Returns:
The decrypted plaintext.

Raises:
InvalidTag: If the GCM tag verification fails.
"""
if aad is None:
aad = b""
# AESGCM.decrypt expects ciphertext + tag concatenated
ct_with_tag = ciphertext + tag
return self._aesgcm.decrypt(iv, ct_with_tag, aad)

def hmac_sha256(self, data: bytes) -> bytes:
"""Calculate HMAC-SHA256 for protocol 3.4.
Expand Down Expand Up @@ -617,11 +698,18 @@ def to_bytes(self) -> bytes:
if not isinstance(payload_data, bytes):
payload_data = payload_data.encode("utf8")

# Check protocol version
is_v35 = self.device is not None and self.device.version >= (3, 5)
is_v34 = self.device is not None and self.device.version >= (3, 4)

if is_v35 and self.device is not None:
Copy link

Copilot AI Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check is_v35 and self.device is not None on line 705 is redundant because is_v35 already includes the check self.device is not None from line 702. The condition can be simplified to just if is_v35: for better readability and maintainability.

Suggested change
if is_v35 and self.device is not None:
if is_v35:

Copilot uses AI. Check for mistakes.
# Protocol 3.5 uses AES-GCM encryption
return self._to_bytes_v35(payload_data)

if self.encrypt and self.device is not None:
payload_data = self.device.cipher.encrypt(self.command, payload_data)

# Determine suffix format based on protocol version
is_v34 = self.device is not None and self.device.version >= (3, 4)
if is_v34:
suffix_format = MESSAGE_SUFFIX_FORMAT_34
else:
Expand Down Expand Up @@ -649,6 +737,45 @@ def to_bytes(self) -> bytes:
footer = struct.pack(MESSAGE_SUFFIX_FORMAT, checksum, MAGIC_SUFFIX)
return header + payload_data + footer

def _to_bytes_v35(self, payload_data: bytes) -> bytes:
"""Return the message in Protocol 3.5 format.

Protocol 3.5 format:
00006699 VV RR SSSSSSSS MMMMMMMM LLLLLLLL (IV*12) (encrypted_data) (TAG*16) 00009966

Args:
payload_data: The payload data to encrypt.

Returns:
A bytes object containing the Protocol 3.5 message.
"""
if self.device is None:
raise InvalidMessage("Cannot create v3.5 message without a device")

cipher = self.device.cipher

# Generate IV and encrypt payload with GCM
iv, ciphertext, tag = cipher.encrypt_gcm(payload_data)

# Calculate payload size: IV(12) + ciphertext + tag(16)
payload_size = 12 + len(ciphertext) + 16

# Build header: prefix(4) + version(1) + reserved(1) + seq(4) + cmd(4) + len(4)
header = struct.pack(
MESSAGE_PREFIX_FORMAT_35,
MAGIC_PREFIX_35,
0x00, # version field (always 0x00)
0x00, # reserved field (always 0x00)
self.sequence,
self.command,
payload_size,
)

# Build footer: tag(16) + suffix(4)
footer = struct.pack(MESSAGE_SUFFIX_FORMAT_35, tag, MAGIC_SUFFIX_35)

return header + iv + ciphertext + footer

def __bytes__(self) -> bytes:
"""Convert the message to bytes.

Expand Down Expand Up @@ -687,6 +814,11 @@ def from_bytes(
Returns:
A Message object created from the bytes.
"""
# Check for Protocol 3.5 prefix first
prefix_check = struct.unpack_from(">I", data)[0]
if prefix_check == MAGIC_PREFIX_35:
return cls._from_bytes_v35(device, data, cipher)

try:
prefix, sequence, command, payload_size = struct.unpack_from(
MESSAGE_PREFIX_FORMAT, data
Expand Down Expand Up @@ -800,6 +932,78 @@ def from_bytes(

return cls(command, payload, sequence)

@classmethod
def _from_bytes_v35(
cls,
device: "TuyaDevice",
data: bytes,
cipher: Optional[TuyaCipher] = None
) -> "Message":
"""Create a message from Protocol 3.5 bytes.

Protocol 3.5 format:
00006699 VV RR SSSSSSSS MMMMMMMM LLLLLLLL (IV*12) (encrypted_data) (TAG*16) 00009966

Args:
device: The device the message is from.
data: The bytes received from the device.
cipher: The cipher to use for decryption.

Returns:
A Message object created from the bytes.
"""
header_size = struct.calcsize(MESSAGE_PREFIX_FORMAT_35) # 18 bytes

try:
prefix, version_byte, reserved, sequence, command, payload_size = (
struct.unpack_from(MESSAGE_PREFIX_FORMAT_35, data)
)
except struct.error as e:
raise InvalidMessage("Invalid v3.5 message header format.") from e

if prefix != MAGIC_PREFIX_35:
raise InvalidMessage("Magic prefix 0x6699 missing from v3.5 message.")

# Extract IV (12 bytes after header)
iv = data[header_size:header_size + 12]
if len(iv) != 12:
raise InvalidMessage("Invalid IV length in v3.5 message.")

# Extract ciphertext (between IV and tag)
# payload_size = IV(12) + ciphertext + tag(16)
Copy link

Copilot AI Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ciphertext_len calculation on line 974 could potentially be negative if payload_size is less than 28 (12 + 16). This would cause issues with data extraction. Add validation to ensure payload_size is at least 28 bytes before calculating ciphertext_len, and raise InvalidMessage with a descriptive error if it's too small.

Suggested change
# payload_size = IV(12) + ciphertext + tag(16)
# payload_size = IV(12) + ciphertext + tag(16)
if payload_size < 28:
raise InvalidMessage(
"Invalid v3.5 message payload size; must be at least 28 bytes "
"to contain IV and GCM tag."
)

Copilot uses AI. Check for mistakes.
ciphertext_len = payload_size - 12 - 16
ciphertext_start = header_size + 12
ciphertext = data[ciphertext_start:ciphertext_start + ciphertext_len]

# Extract tag (16 bytes before suffix)
tag_start = ciphertext_start + ciphertext_len
tag = data[tag_start:tag_start + 16]
if len(tag) != 16:
raise InvalidMessage("Invalid GCM tag length in v3.5 message.")

# Verify suffix
suffix_start = tag_start + 16
try:
(suffix,) = struct.unpack_from(">I", data, suffix_start)
except struct.error as e:
raise InvalidMessage("Invalid v3.5 message suffix format.") from e

if suffix != MAGIC_SUFFIX_35:
raise InvalidMessage("Magic suffix 0x9966 missing from v3.5 message.")

# Decrypt payload using GCM
payload = None
if cipher is not None and ciphertext:
try:
payload_data = cipher.decrypt_gcm(iv, ciphertext, tag)
payload_text = payload_data.decode("utf8")
payload = json.loads(payload_text)
except Exception as e:
device._LOGGER.debug(f"v3.5 decryption failed: {e}")
raise InvalidMessage("GCM decryption/verification failed") from e
Comment on lines +1001 to +1003
Copy link

Copilot AI Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a bare except Exception clause is too broad and could catch unexpected errors like KeyboardInterrupt or SystemExit (though these inherit from BaseException). Consider catching more specific exceptions like cryptography.exceptions.InvalidTag, UnicodeDecodeError, and json.JSONDecodeError to provide more precise error handling and allow other exceptions to propagate properly.

Copilot uses AI. Check for mistakes.

return cls(command, payload, sequence)


class TuyaDevice:
"""Represents a generic Tuya device."""
Expand Down
Loading