diff --git a/custom_components/robovac/tuyalocalapi.py b/custom_components/robovac/tuyalocalapi.py index 4882d86e..b73344c0 100644 --- a/custom_components/robovac/tuyalocalapi.py +++ b/custom_components/robovac/tuyalocalapi.py @@ -52,6 +52,8 @@ from cryptography.hazmat.backends.openssl import backend as openssl_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.ciphers.aead import AESGCM +import os from cryptography.hazmat.primitives.hashes import Hash, MD5, SHA256 from cryptography.hazmat.primitives.padding import PKCS7 from cryptography.hazmat.primitives import hmac as crypto_hmac @@ -66,6 +68,14 @@ MAGIC_PREFIX = 0x000055AA MAGIC_SUFFIX = 0x0000AA55 MAGIC_SUFFIX_BYTES = struct.pack(">I", MAGIC_SUFFIX) + +# Protocol 3.5 constants +MAGIC_PREFIX_35 = 0x00006699 +MAGIC_SUFFIX_35 = 0x00009966 +MAGIC_SUFFIX_35_BYTES = struct.pack(">I", MAGIC_SUFFIX_35) +# Format: prefix(4) + version(1) + reserved(1) + seq(4) + cmd(4) + len(4) = 18 bytes +MESSAGE_PREFIX_FORMAT_35 = ">IBBIII" +MESSAGE_SUFFIX_FORMAT_35 = ">16sI" # 16-byte GCM tag + 4-byte suffix CRC_32_TABLE = [ 0x00000000, 0x77073096, @@ -377,6 +387,77 @@ def __init__(self, key: str, version: tuple[int, int]): self.cipher = Cipher( algorithms.AES(self.key_bytes), modes.ECB(), backend=openssl_backend ) + # Initialize GCM cipher for Protocol 3.5 + self._aesgcm = AESGCM(self.key_bytes) + + @property + def is_gcm_mode(self) -> bool: + """Check if this cipher uses GCM mode (Protocol 3.5+). + + Returns: + True if Protocol 3.5 or higher (uses GCM), False otherwise (uses ECB). + """ + return self.version >= (3, 5) + + def generate_iv(self) -> bytes: + """Generate a random 12-byte IV/nonce for GCM mode. + + Returns: + A 12-byte random IV suitable for AES-GCM. + """ + return os.urandom(12) + + def encrypt_gcm( + self, plaintext: bytes, aad: bytes | None = None + ) -> tuple[bytes, bytes, bytes]: + """Encrypt data using AES-GCM for Protocol 3.5. + + Args: + plaintext: The data to encrypt. + aad: Optional additional authenticated data (signed but not encrypted). + + Returns: + A tuple of (iv, ciphertext, tag) where: + - iv: 12-byte initialization vector/nonce + - ciphertext: The encrypted data (same length as plaintext) + - tag: 16-byte GCM authentication tag + """ + iv = self.generate_iv() + # AESGCM.encrypt returns ciphertext + tag concatenated + if aad is None: + aad = b"" + ct_with_tag = self._aesgcm.encrypt(iv, plaintext, aad) + # Split ciphertext and tag (tag is last 16 bytes) + ciphertext = ct_with_tag[:-16] + tag = ct_with_tag[-16:] + return (iv, ciphertext, tag) + + def decrypt_gcm( + self, + iv: bytes, + ciphertext: bytes, + tag: bytes, + aad: bytes | None = None, + ) -> bytes: + """Decrypt data using AES-GCM for Protocol 3.5. + + Args: + iv: 12-byte initialization vector/nonce. + ciphertext: The encrypted data. + tag: 16-byte GCM authentication tag. + aad: Optional additional authenticated data. + + Returns: + The decrypted plaintext. + + Raises: + InvalidTag: If the GCM tag verification fails. + """ + if aad is None: + aad = b"" + # AESGCM.decrypt expects ciphertext + tag concatenated + ct_with_tag = ciphertext + tag + return self._aesgcm.decrypt(iv, ct_with_tag, aad) def hmac_sha256(self, data: bytes) -> bytes: """Calculate HMAC-SHA256 for protocol 3.4. @@ -617,11 +698,18 @@ def to_bytes(self) -> bytes: if not isinstance(payload_data, bytes): payload_data = payload_data.encode("utf8") + # Check protocol version + is_v35 = self.device is not None and self.device.version >= (3, 5) + is_v34 = self.device is not None and self.device.version >= (3, 4) + + if is_v35 and self.device is not None: + # Protocol 3.5 uses AES-GCM encryption + return self._to_bytes_v35(payload_data) + if self.encrypt and self.device is not None: payload_data = self.device.cipher.encrypt(self.command, payload_data) # Determine suffix format based on protocol version - is_v34 = self.device is not None and self.device.version >= (3, 4) if is_v34: suffix_format = MESSAGE_SUFFIX_FORMAT_34 else: @@ -649,6 +737,45 @@ def to_bytes(self) -> bytes: footer = struct.pack(MESSAGE_SUFFIX_FORMAT, checksum, MAGIC_SUFFIX) return header + payload_data + footer + def _to_bytes_v35(self, payload_data: bytes) -> bytes: + """Return the message in Protocol 3.5 format. + + Protocol 3.5 format: + 00006699 VV RR SSSSSSSS MMMMMMMM LLLLLLLL (IV*12) (encrypted_data) (TAG*16) 00009966 + + Args: + payload_data: The payload data to encrypt. + + Returns: + A bytes object containing the Protocol 3.5 message. + """ + if self.device is None: + raise InvalidMessage("Cannot create v3.5 message without a device") + + cipher = self.device.cipher + + # Generate IV and encrypt payload with GCM + iv, ciphertext, tag = cipher.encrypt_gcm(payload_data) + + # Calculate payload size: IV(12) + ciphertext + tag(16) + payload_size = 12 + len(ciphertext) + 16 + + # Build header: prefix(4) + version(1) + reserved(1) + seq(4) + cmd(4) + len(4) + header = struct.pack( + MESSAGE_PREFIX_FORMAT_35, + MAGIC_PREFIX_35, + 0x00, # version field (always 0x00) + 0x00, # reserved field (always 0x00) + self.sequence, + self.command, + payload_size, + ) + + # Build footer: tag(16) + suffix(4) + footer = struct.pack(MESSAGE_SUFFIX_FORMAT_35, tag, MAGIC_SUFFIX_35) + + return header + iv + ciphertext + footer + def __bytes__(self) -> bytes: """Convert the message to bytes. @@ -687,6 +814,11 @@ def from_bytes( Returns: A Message object created from the bytes. """ + # Check for Protocol 3.5 prefix first + prefix_check = struct.unpack_from(">I", data)[0] + if prefix_check == MAGIC_PREFIX_35: + return cls._from_bytes_v35(device, data, cipher) + try: prefix, sequence, command, payload_size = struct.unpack_from( MESSAGE_PREFIX_FORMAT, data @@ -800,6 +932,78 @@ def from_bytes( return cls(command, payload, sequence) + @classmethod + def _from_bytes_v35( + cls, + device: "TuyaDevice", + data: bytes, + cipher: Optional[TuyaCipher] = None + ) -> "Message": + """Create a message from Protocol 3.5 bytes. + + Protocol 3.5 format: + 00006699 VV RR SSSSSSSS MMMMMMMM LLLLLLLL (IV*12) (encrypted_data) (TAG*16) 00009966 + + Args: + device: The device the message is from. + data: The bytes received from the device. + cipher: The cipher to use for decryption. + + Returns: + A Message object created from the bytes. + """ + header_size = struct.calcsize(MESSAGE_PREFIX_FORMAT_35) # 18 bytes + + try: + prefix, version_byte, reserved, sequence, command, payload_size = ( + struct.unpack_from(MESSAGE_PREFIX_FORMAT_35, data) + ) + except struct.error as e: + raise InvalidMessage("Invalid v3.5 message header format.") from e + + if prefix != MAGIC_PREFIX_35: + raise InvalidMessage("Magic prefix 0x6699 missing from v3.5 message.") + + # Extract IV (12 bytes after header) + iv = data[header_size:header_size + 12] + if len(iv) != 12: + raise InvalidMessage("Invalid IV length in v3.5 message.") + + # Extract ciphertext (between IV and tag) + # payload_size = IV(12) + ciphertext + tag(16) + ciphertext_len = payload_size - 12 - 16 + ciphertext_start = header_size + 12 + ciphertext = data[ciphertext_start:ciphertext_start + ciphertext_len] + + # Extract tag (16 bytes before suffix) + tag_start = ciphertext_start + ciphertext_len + tag = data[tag_start:tag_start + 16] + if len(tag) != 16: + raise InvalidMessage("Invalid GCM tag length in v3.5 message.") + + # Verify suffix + suffix_start = tag_start + 16 + try: + (suffix,) = struct.unpack_from(">I", data, suffix_start) + except struct.error as e: + raise InvalidMessage("Invalid v3.5 message suffix format.") from e + + if suffix != MAGIC_SUFFIX_35: + raise InvalidMessage("Magic suffix 0x9966 missing from v3.5 message.") + + # Decrypt payload using GCM + payload = None + if cipher is not None and ciphertext: + try: + payload_data = cipher.decrypt_gcm(iv, ciphertext, tag) + payload_text = payload_data.decode("utf8") + payload = json.loads(payload_text) + except Exception as e: + device._LOGGER.debug(f"v3.5 decryption failed: {e}") + raise InvalidMessage("GCM decryption/verification failed") from e + + return cls(command, payload, sequence) + class TuyaDevice: """Represents a generic Tuya device.""" diff --git a/site_docs/adding-new-vacuum.md b/site_docs/adding-new-vacuum.md new file mode 100644 index 00000000..b7c859e8 --- /dev/null +++ b/site_docs/adding-new-vacuum.md @@ -0,0 +1,326 @@ +# Adding a New Vacuum Model + +This guide explains how to add support for a new Eufy RoboVac model to the +integration. + +## Prerequisites + +Before adding a new model, you'll need: + +1. **Model number** - The Tuya model identifier (e.g., `T2250`, `T2080`) +2. **DPS codes** - Data Point Service codes your vacuum uses +3. **Command values** - The actual values sent/received for each command + +### Finding DPS Codes + +DPS codes can be discovered by: + +- Monitoring network traffic between the vacuum and the Eufy app +- Using Tuya debugging tools +- Checking existing similar models for reference + +## File Structure + +Each vacuum model is defined in its own file under +`custom_components/robovac/vacuums/`. The file naming convention is +`{MODEL_NUMBER}.py` (e.g., `T2250.py`). + +## Step-by-Step Guide + +### 1. Create the Model File + +Create a new file in `custom_components/robovac/vacuums/` named after your +model: + +```python +"""Model Name (MODEL_NUMBER)""" +from homeassistant.components.vacuum import VacuumEntityFeature +from .base import RoboVacEntityFeature, RobovacCommand, RobovacModelDetails + + +class T2XXX(RobovacModelDetails): + homeassistant_features = ( + VacuumEntityFeature.CLEAN_SPOT + | VacuumEntityFeature.FAN_SPEED + | VacuumEntityFeature.LOCATE + | VacuumEntityFeature.PAUSE + | VacuumEntityFeature.RETURN_HOME + | VacuumEntityFeature.SEND_COMMAND + | VacuumEntityFeature.START + | VacuumEntityFeature.STATE + | VacuumEntityFeature.STOP + ) + robovac_features = ( + RoboVacEntityFeature.CLEANING_TIME + | RoboVacEntityFeature.CLEANING_AREA + | RoboVacEntityFeature.DO_NOT_DISTURB + | RoboVacEntityFeature.AUTO_RETURN + ) + commands = { + # Define commands here + } +``` + +### 2. Define Home Assistant Features + +The `homeassistant_features` attribute specifies which standard vacuum features +your model supports. Available features from `VacuumEntityFeature`: + +| Feature | Description | +|----------------|--------------------------| +| `CLEAN_SPOT` | Spot cleaning mode | +| `FAN_SPEED` | Adjustable suction power | +| `LOCATE` | Find/locate the vacuum | +| `PAUSE` | Pause cleaning | +| `RETURN_HOME` | Return to charging dock | +| `SEND_COMMAND` | Send custom commands | +| `START` | Start cleaning | +| `STATE` | Report current state | +| `STOP` | Stop cleaning | +| `MAP` | Map support | + +### 3. Define RoboVac Features + +The `robovac_features` attribute specifies additional features specific to this +integration. Available features from `RoboVacEntityFeature`: + +| Feature | Description | +|------------------|------------------------------| +| `EDGE` | Edge cleaning mode | +| `SMALL_ROOM` | Small room cleaning mode | +| `CLEANING_TIME` | Report cleaning duration | +| `CLEANING_AREA` | Report cleaned area | +| `DO_NOT_DISTURB` | Do not disturb scheduling | +| `AUTO_RETURN` | Auto return when battery low | +| `CONSUMABLES` | Consumable status reporting | +| `ROOM` | Room-specific cleaning | +| `ZONE` | Zone cleaning | +| `MAP` | Map features | +| `BOOST_IQ` | BoostIQ auto-suction | + +### 4. Define Commands + +The `commands` dictionary maps `RobovacCommand` enums to their DPS codes and +values. Available commands: + +| Command | Description | +|------------------|--------------------------| +| `START_PAUSE` | Start or pause cleaning | +| `DIRECTION` | Manual direction control | +| `MODE` | Cleaning mode selection | +| `STATUS` | Current vacuum status | +| `RETURN_HOME` | Return to dock | +| `FAN_SPEED` | Suction power level | +| `MOP_LEVEL` | Mopping water level | +| `LOCATE` | Find the vacuum | +| `BATTERY` | Battery level | +| `ERROR` | Error codes | +| `CLEANING_AREA` | Cleaned area | +| `CLEANING_TIME` | Cleaning duration | +| `AUTO_RETURN` | Auto return setting | +| `DO_NOT_DISTURB` | DND setting | +| `BOOST_IQ` | BoostIQ setting | +| `CONSUMABLES` | Consumable status | + +#### Command Structure + +Each command entry has: + +- **`code`** (required): The DPS code number +- **`values`** (optional): A dictionary mapping human-readable keys to device + values + +```python +RobovacCommand.FAN_SPEED: { + "code": 102, + "values": { + "standard": "Standard", + "turbo": "Turbo", + "max": "Max", + "boost_iq": "Boost_IQ", + }, +}, +``` + +#### Naming Conventions + +- **Keys** (left side): Use lowercase snake_case (e.g., `standard`, `boost_iq`) +- **Values** (right side): Use the exact value the device expects (often + PascalCase) + +### 5. Add Activity Mapping (Optional) + +For models with complex status codes, add an `activity_mapping` to translate +status values to Home Assistant's `VacuumActivity` states: + +```python +from homeassistant.components.vacuum import VacuumActivity + +class T2XXX(RobovacModelDetails): + # ... features and commands ... + + activity_mapping = { + "Paused": VacuumActivity.PAUSED, + "Auto Cleaning": VacuumActivity.CLEANING, + "Charging": VacuumActivity.DOCKED, + "Heading Home": VacuumActivity.RETURNING, + "Standby": VacuumActivity.IDLE, + "Error": VacuumActivity.ERROR, + } +``` + +Available `VacuumActivity` states: + +- `CLEANING` - Actively cleaning +- `DOCKED` - On the charging dock +- `PAUSED` - Cleaning paused +- `RETURNING` - Returning to dock +- `IDLE` - Idle/standby +- `ERROR` - Error state + +### 6. Register the Model + +Add your model to `custom_components/robovac/vacuums/__init__.py`: + +1. Import your class: + + ```python + from .T2XXX import T2XXX + ``` + +2. Add to the `ROBOVAC_MODELS` dictionary: + + ```python + ROBOVAC_MODELS: Dict[str, Type[RobovacModelDetails]] = { + # ... existing models ... + "T2XXX": T2XXX, + } + ``` + +### 7. Write Tests + +Create a test file at `tests/test_vacuum/test_t2xxx_command_mappings.py`: + +```python +"""Tests for T2XXX command mappings.""" +import pytest +from custom_components.robovac.vacuums.T2XXX import T2XXX +from custom_components.robovac.vacuums.base import RobovacCommand + + +class TestT2XXXCommandMappings: + """Test T2XXX command mappings.""" + + def test_fan_speed_values(self): + """Test FAN_SPEED command has expected values.""" + fan_speed = T2XXX.commands[RobovacCommand.FAN_SPEED] + assert fan_speed["code"] == 102 + assert "standard" in fan_speed["values"] + assert fan_speed["values"]["standard"] == "Standard" +``` + +Run tests with: + +```bash +task test +``` + +## Complete Example + +Here's a complete example for a basic model: + +```python +"""G30 (T2250)""" +from homeassistant.components.vacuum import VacuumEntityFeature +from .base import RoboVacEntityFeature, RobovacCommand, RobovacModelDetails + + +class T2250(RobovacModelDetails): + homeassistant_features = ( + VacuumEntityFeature.CLEAN_SPOT + | VacuumEntityFeature.FAN_SPEED + | VacuumEntityFeature.LOCATE + | VacuumEntityFeature.PAUSE + | VacuumEntityFeature.RETURN_HOME + | VacuumEntityFeature.SEND_COMMAND + | VacuumEntityFeature.START + | VacuumEntityFeature.STATE + | VacuumEntityFeature.STOP + ) + robovac_features = ( + RoboVacEntityFeature.CLEANING_TIME + | RoboVacEntityFeature.CLEANING_AREA + | RoboVacEntityFeature.DO_NOT_DISTURB + | RoboVacEntityFeature.AUTO_RETURN + ) + commands = { + RobovacCommand.START_PAUSE: { + "code": 2, + }, + RobovacCommand.DIRECTION: { + "code": 3, + "values": { + "forward": "Forward", + "back": "Back", + "left": "Left", + "right": "Right", + }, + }, + RobovacCommand.MODE: { + "code": 5, + "values": { + "auto": "Auto", + "small_room": "SmallRoom", + "spot": "Spot", + "edge": "Edge", + "nosweep": "Nosweep", + }, + }, + RobovacCommand.STATUS: { + "code": 15, + }, + RobovacCommand.RETURN_HOME: { + "code": 101, + }, + RobovacCommand.FAN_SPEED: { + "code": 102, + "values": { + "standard": "Standard", + "turbo": "Turbo", + "max": "Max", + "boost_iq": "Boost_IQ", + }, + }, + RobovacCommand.LOCATE: { + "code": 103, + }, + RobovacCommand.BATTERY: { + "code": 104, + }, + RobovacCommand.ERROR: { + "code": 106, + "values": { + "0": "No error", + }, + }, + } +``` + +## Tips + +- **Start simple**: Begin with basic commands and add more as you verify them +- **Test incrementally**: Test each command as you add it +- **Document unknowns**: Use comments to note unverified features +- **Check similar models**: Look at existing models in the same series for + reference +- **Use debug logging**: Enable debug logging in Home Assistant to see raw DPS + values + +## Submitting Your Model + +1. Fork the repository +2. Create a feature branch: `git checkout -b feat/add-t2xxx-support` +3. Add your model file and tests +4. Run linting: `task lint` +5. Run tests: `task test` +6. Submit a pull request with details about your vacuum model diff --git a/site_docs/configuration.md b/site_docs/configuration.md index a63aded8..947ae952 100644 --- a/site_docs/configuration.md +++ b/site_docs/configuration.md @@ -17,44 +17,8 @@ You'll need the following information to configure your vacuum: ## Supported Models -The following Eufy RoboVac models are currently supported: - -- **T1250**: RoboVac G40 -- **T2080**: RoboVac G40 Hybrid -- **T2103**: RoboVac 11S -- **T2117**: RoboVac 35C -- **T2118**: RoboVac 30C -- **T2119**: RoboVac 11S Max -- **T2120**: RoboVac 15C -- **T2123**: RoboVac 25C -- **T2128**: RoboVac 15C Max -- **T2130**: RoboVac 30C Max -- **T2132**: RoboVac 25C Max -- **T2150**: RoboVac G10 Hybrid -- **T2181**: RoboVac G20 -- **T2190**: RoboVac G30 -- **T2192**: RoboVac G30 Edge -- **T2193**: RoboVac G30 Verge -- **T2194**: RoboVac L35 Hybrid -- **T2250**: RoboVac G30 -- **T2251**: RoboVac G30 Edge -- **T2252**: RoboVac G30 Verge -- **T2253**: RoboVac G30 Hybrid -- **T2254**: RoboVac G20 Hybrid -- **T2255**: RoboVac G35+ -- **T2259**: RoboVac G40+ -- **T2261**: RoboVac X8 -- **T2262**: RoboVac X8 Hybrid -- **T2267**: RoboVac LR30 Hybrid -- **T2268**: RoboVac LR30 Hybrid+ -- **T2270**: RoboVac G32 Pro -- **T2272**: RoboVac G20 -- **T2273**: RoboVac G30 -- **T2275**: RoboVac L35 Hybrid -- **T2276**: RoboVac L35 Hybrid+ -- **T2277**: RoboVac G40 Hybrid -- **T2278**: RoboVac G40+ -- **T2320**: RoboVac X8 Pro +See the full list of [Supported Models](supported-models.md) with protocol +versions and model series information. ## Finding Your Local Key diff --git a/site_docs/index.md b/site_docs/index.md index ec4b1f4c..de4239b6 100644 --- a/site_docs/index.md +++ b/site_docs/index.md @@ -33,6 +33,8 @@ See the full list in the [Configuration](configuration.md) section. - [Installation Guide](installation.md) - [Configuration](configuration.md) - [Troubleshooting](troubleshooting.md) +- [Supported Models](supported-models.md) +- [Adding a New Vacuum](adding-new-vacuum.md) - [GitHub Issues](https://github.com/damacus/robovac/issues) ## History & Credits diff --git a/site_docs/supported-models.md b/site_docs/supported-models.md new file mode 100644 index 00000000..64aed9ea --- /dev/null +++ b/site_docs/supported-models.md @@ -0,0 +1,92 @@ +# Supported Models + +This page lists all currently supported Eufy RoboVac models. + +## Model Compatibility Table + +| Model Code | Friendly Name | Protocol | +|------------|----------------------|----------| +| T1250 | RoboVac G40 | 3.3 | +| T2080 | RoboVac S1 Pro | 3.4 | +| T2103 | RoboVac 11S | 3.3 | +| T2117 | RoboVac 35C | 3.3 | +| T2118 | RoboVac 30C | 3.3 | +| T2119 | RoboVac 11S Max | 3.3 | +| T2120 | RoboVac 15C | 3.3 | +| T2123 | RoboVac 25C | 3.3 | +| T2128 | RoboVac 15C Max | 3.3 | +| T2130 | RoboVac 30C Max | 3.3 | +| T2132 | RoboVac 25C Max | 3.3 | +| T2150 | RoboVac G10 Hybrid | 3.3 | +| T2181 | RoboVac G20 | 3.3 | +| T2190 | RoboVac G30 | 3.3 | +| T2192 | RoboVac G30 Edge | 3.3 | +| T2193 | RoboVac G30 Verge | 3.3 | +| T2194 | RoboVac L35 Hybrid | 3.4 | +| T2250 | RoboVac G30 | 3.3 | +| T2251 | RoboVac G30 Edge | 3.3 | +| T2252 | RoboVac G30 Verge | 3.3 | +| T2253 | RoboVac G30 Hybrid | 3.3 | +| T2254 | RoboVac G20 Hybrid | 3.3 | +| T2255 | RoboVac G35+ | 3.3 | +| T2259 | RoboVac G40+ | 3.3 | +| T2261 | RoboVac X8 | 3.3 | +| T2262 | RoboVac X8 Hybrid | 3.3 | +| T2267 | RoboVac LR30 Hybrid | 3.4 | +| T2268 | RoboVac LR30 Hybrid+ | 3.4 | +| T2270 | RoboVac G32 Pro | 3.3 | +| T2272 | RoboVac G20 | 3.3 | +| T2273 | RoboVac G30 | 3.3 | +| T2275 | RoboVac L35 Hybrid | 3.4 | +| T2276 | RoboVac L35 Hybrid+ | 3.4 | +| T2277 | eufy Clean L60 SES | 3.4 | +| T2278 | RoboVac G40+ | 3.4 | +| T2280 | RoboVac G40 Hybrid+ | 3.4 | +| T2320 | RoboVac X8 Pro | 3.4 | + +## Model Series + +Models are grouped by series. If your model isn't listed but is from the same +series, it may work with an existing configuration. + +### G Series (Basic) + +- **G20**: T2181, T2254, T2272 +- **G30**: T2190, T2250, T2251, T2252, T2253, T2273 +- **G32**: T2270 +- **G35**: T2255 +- **G40**: T1250, T2259, T2277, T2278, T2280 + +### X Series (Premium) + +- **X8**: T2261, T2262, T2320 + +### L Series (Advanced) + +- **L35**: T2194, T2275, T2276 +- **LR30**: T2267, T2268 +- **L60**: T2277 + +### Classic Series + +- **11S**: T2103, T2119 +- **15C**: T2120, T2128 +- **25C**: T2123, T2132 +- **30C**: T2118, T2130 +- **35C**: T2117 + +### Hybrid Models + +- **G10 Hybrid**: T2150 +- **S1 Pro**: T2080 + +## Don't See Your Model? + +If your vacuum model isn't listed: + +1. **Check similar models** - Models in the same series often share the same + DPS codes +2. **Try an existing configuration** - Select a model from the same series and + test basic functions +3. **Contribute** - See [Adding a New Vacuum](adding-new-vacuum.md) to add + support for your model diff --git a/tests/test_vacuum/test_protocol_35_cipher.py b/tests/test_vacuum/test_protocol_35_cipher.py new file mode 100644 index 00000000..11485c9d --- /dev/null +++ b/tests/test_vacuum/test_protocol_35_cipher.py @@ -0,0 +1,131 @@ +"""Tests for Protocol 3.5 GCM cipher implementation.""" + +import pytest +from custom_components.robovac.tuyalocalapi import TuyaCipher + + +class TestProtocol35Cipher: + """Test Protocol 3.5 AES-GCM cipher functionality.""" + + @pytest.fixture + def cipher_v35(self) -> TuyaCipher: + """Create a Protocol 3.5 cipher instance.""" + return TuyaCipher("abcdefghijklmnop", (3, 5)) + + @pytest.fixture + def cipher_v34(self) -> TuyaCipher: + """Create a Protocol 3.4 cipher instance for comparison.""" + return TuyaCipher("abcdefghijklmnop", (3, 4)) + + def test_cipher_version_is_stored(self, cipher_v35: TuyaCipher) -> None: + """Cipher stores the protocol version correctly.""" + assert cipher_v35.version == (3, 5) + + def test_v35_uses_gcm_mode(self, cipher_v35: TuyaCipher) -> None: + """Protocol 3.5 should use GCM mode, not ECB.""" + assert hasattr(cipher_v35, 'is_gcm_mode') + assert cipher_v35.is_gcm_mode is True + + def test_v34_uses_ecb_mode(self, cipher_v34: TuyaCipher) -> None: + """Protocol 3.4 should use ECB mode.""" + assert hasattr(cipher_v34, 'is_gcm_mode') + assert cipher_v34.is_gcm_mode is False + + def test_v35_encrypt_returns_iv_ciphertext_tag( + self, cipher_v35: TuyaCipher + ) -> None: + """Protocol 3.5 encrypt should return (iv, ciphertext, tag) tuple.""" + plaintext = b'{"dps":{"1":true}}' + result = cipher_v35.encrypt_gcm(plaintext) + + assert isinstance(result, tuple) + assert len(result) == 3 + + iv, ciphertext, tag = result + assert len(iv) == 12 # 96-bit IV/nonce + assert len(tag) == 16 # 128-bit GCM tag + assert len(ciphertext) > 0 + + def test_v35_encrypt_no_padding_required( + self, cipher_v35: TuyaCipher + ) -> None: + """Protocol 3.5 GCM mode does not require padding.""" + # 17 bytes - not a multiple of 16 + plaintext = b'12345678901234567' + iv, ciphertext, tag = cipher_v35.encrypt_gcm(plaintext) + + # GCM ciphertext length equals plaintext length (no padding) + assert len(ciphertext) == len(plaintext) + + def test_v35_decrypt_with_valid_data(self, cipher_v35: TuyaCipher) -> None: + """Protocol 3.5 decrypt should work with valid iv, ciphertext, tag.""" + plaintext = b'{"dps":{"1":true}}' + iv, ciphertext, tag = cipher_v35.encrypt_gcm(plaintext) + + decrypted = cipher_v35.decrypt_gcm(iv, ciphertext, tag) + assert decrypted == plaintext + + def test_v35_decrypt_with_invalid_tag_raises( + self, cipher_v35: TuyaCipher + ) -> None: + """Protocol 3.5 decrypt should raise on invalid GCM tag.""" + plaintext = b'{"dps":{"1":true}}' + iv, ciphertext, tag = cipher_v35.encrypt_gcm(plaintext) + + # Corrupt the tag + bad_tag = bytes([tag[0] ^ 0xFF]) + tag[1:] + + with pytest.raises(Exception): # GCM authentication failure + cipher_v35.decrypt_gcm(iv, ciphertext, bad_tag) + + def test_v35_decrypt_with_tampered_ciphertext_raises( + self, cipher_v35: TuyaCipher + ) -> None: + """Protocol 3.5 decrypt should raise if ciphertext is tampered.""" + plaintext = b'{"dps":{"1":true}}' + iv, ciphertext, tag = cipher_v35.encrypt_gcm(plaintext) + + # Corrupt the ciphertext + bad_ciphertext = bytes([ciphertext[0] ^ 0xFF]) + ciphertext[1:] + + with pytest.raises(Exception): # GCM authentication failure + cipher_v35.decrypt_gcm(iv, bad_ciphertext, tag) + + def test_v35_encrypt_with_aad(self, cipher_v35: TuyaCipher) -> None: + """Protocol 3.5 encrypt should support additional authenticated data.""" + plaintext = b'{"dps":{"1":true}}' + aad = b'\x00\x00\x66\x99\x00\x00\x00\x00\x00\x01' # Header bytes + + iv, ciphertext, tag = cipher_v35.encrypt_gcm(plaintext, aad=aad) + + # Decrypt with same AAD should work + decrypted = cipher_v35.decrypt_gcm(iv, ciphertext, tag, aad=aad) + assert decrypted == plaintext + + def test_v35_decrypt_with_wrong_aad_raises( + self, cipher_v35: TuyaCipher + ) -> None: + """Protocol 3.5 decrypt should fail if AAD doesn't match.""" + plaintext = b'{"dps":{"1":true}}' + aad = b'\x00\x00\x66\x99\x00\x00\x00\x00\x00\x01' + + iv, ciphertext, tag = cipher_v35.encrypt_gcm(plaintext, aad=aad) + + # Decrypt with different AAD should fail + wrong_aad = b'\x00\x00\x66\x99\x00\x00\x00\x00\x00\x02' + with pytest.raises(Exception): + cipher_v35.decrypt_gcm(iv, ciphertext, tag, aad=wrong_aad) + + def test_v35_generate_iv_returns_12_bytes( + self, cipher_v35: TuyaCipher + ) -> None: + """Protocol 3.5 IV generator should return 12 random bytes.""" + iv = cipher_v35.generate_iv() + assert len(iv) == 12 + assert isinstance(iv, bytes) + + def test_v35_generate_iv_is_random(self, cipher_v35: TuyaCipher) -> None: + """Protocol 3.5 IV generator should produce unique values.""" + ivs = [cipher_v35.generate_iv() for _ in range(100)] + # All IVs should be unique + assert len(set(ivs)) == 100 diff --git a/tests/test_vacuum/test_protocol_35_message.py b/tests/test_vacuum/test_protocol_35_message.py new file mode 100644 index 00000000..5ce189c3 --- /dev/null +++ b/tests/test_vacuum/test_protocol_35_message.py @@ -0,0 +1,216 @@ +"""Tests for Protocol 3.5 message format and parsing.""" + +import struct +import pytest +from unittest.mock import MagicMock, patch + +from custom_components.robovac.tuyalocalapi import ( + Message, + TuyaCipher, + TuyaDevice, + MAGIC_PREFIX, + MAGIC_SUFFIX, +) + + +# Protocol 3.5 magic constants +MAGIC_PREFIX_35 = 0x00006699 +MAGIC_SUFFIX_35 = 0x00009966 + + +class TestProtocol35MessageConstants: + """Test Protocol 3.5 message constants are defined.""" + + def test_magic_prefix_35_constant_exists(self) -> None: + """Protocol 3.5 should have a distinct magic prefix constant.""" + from custom_components.robovac import tuyalocalapi + assert hasattr(tuyalocalapi, 'MAGIC_PREFIX_35') + assert tuyalocalapi.MAGIC_PREFIX_35 == 0x00006699 + + def test_magic_suffix_35_constant_exists(self) -> None: + """Protocol 3.5 should have a distinct magic suffix constant.""" + from custom_components.robovac import tuyalocalapi + assert hasattr(tuyalocalapi, 'MAGIC_SUFFIX_35') + assert tuyalocalapi.MAGIC_SUFFIX_35 == 0x00009966 + + def test_message_prefix_format_35_exists(self) -> None: + """Protocol 3.5 should have a message prefix format constant.""" + from custom_components.robovac import tuyalocalapi + assert hasattr(tuyalocalapi, 'MESSAGE_PREFIX_FORMAT_35') + + +class TestProtocol35MessageFormat: + """Test Protocol 3.5 message format structure.""" + + @pytest.fixture + def cipher_v35(self) -> TuyaCipher: + """Create a Protocol 3.5 cipher instance.""" + return TuyaCipher("abcdefghijklmnop", (3, 5)) + + def test_v35_message_uses_6699_prefix(self, cipher_v35: TuyaCipher) -> None: + """Protocol 3.5 messages should use 0x00006699 prefix.""" + # Create a mock device with v3.5 + mock_device = MagicMock() + mock_device.version = (3, 5) + mock_device.cipher = cipher_v35 + + message = Message( + command=Message.GET_COMMAND, + payload=b'{"test": true}', + encrypt=True, + device=mock_device, + ) + + msg_bytes = message.to_bytes() + prefix = struct.unpack(">I", msg_bytes[:4])[0] + assert prefix == MAGIC_PREFIX_35 + + def test_v35_message_uses_9966_suffix(self, cipher_v35: TuyaCipher) -> None: + """Protocol 3.5 messages should use 0x00009966 suffix.""" + mock_device = MagicMock() + mock_device.version = (3, 5) + mock_device.cipher = cipher_v35 + + message = Message( + command=Message.GET_COMMAND, + payload=b'{"test": true}', + encrypt=True, + device=mock_device, + ) + + msg_bytes = message.to_bytes() + suffix = struct.unpack(">I", msg_bytes[-4:])[0] + assert suffix == MAGIC_SUFFIX_35 + + def test_v35_message_contains_12_byte_iv(self, cipher_v35: TuyaCipher) -> None: + """Protocol 3.5 messages should contain a 12-byte IV after header.""" + mock_device = MagicMock() + mock_device.version = (3, 5) + mock_device.cipher = cipher_v35 + + message = Message( + command=Message.GET_COMMAND, + payload=b'{"test": true}', + encrypt=True, + device=mock_device, + ) + + msg_bytes = message.to_bytes() + # Header is: prefix(4) + version(1) + reserved(1) + seq(4) + cmd(4) + len(4) = 18 bytes + # IV should be next 12 bytes + header_size = 18 + iv = msg_bytes[header_size:header_size + 12] + assert len(iv) == 12 + + def test_v35_message_contains_16_byte_tag(self, cipher_v35: TuyaCipher) -> None: + """Protocol 3.5 messages should contain a 16-byte GCM tag before suffix.""" + mock_device = MagicMock() + mock_device.version = (3, 5) + mock_device.cipher = cipher_v35 + + message = Message( + command=Message.GET_COMMAND, + payload=b'{"test": true}', + encrypt=True, + device=mock_device, + ) + + msg_bytes = message.to_bytes() + # Tag is 16 bytes before the 4-byte suffix + tag = msg_bytes[-20:-4] + assert len(tag) == 16 + + def test_v34_message_still_uses_55aa_prefix(self) -> None: + """Protocol 3.4 messages should still use 0x000055AA prefix.""" + cipher_v34 = TuyaCipher("abcdefghijklmnop", (3, 4)) + mock_device = MagicMock() + mock_device.version = (3, 4) + mock_device.cipher = cipher_v34 + + message = Message( + command=Message.GET_COMMAND, + payload=b'{"test": true}', + encrypt=True, + device=mock_device, + ) + + msg_bytes = message.to_bytes() + prefix = struct.unpack(">I", msg_bytes[:4])[0] + assert prefix == MAGIC_PREFIX # 0x000055AA + + +class TestProtocol35MessageParsing: + """Test Protocol 3.5 message parsing from bytes.""" + + @pytest.fixture + def cipher_v35(self) -> TuyaCipher: + """Create a Protocol 3.5 cipher instance.""" + return TuyaCipher("abcdefghijklmnop", (3, 5)) + + def test_v35_from_bytes_detects_6699_prefix( + self, cipher_v35: TuyaCipher + ) -> None: + """Message.from_bytes should detect Protocol 3.5 by 0x6699 prefix.""" + mock_device = MagicMock() + mock_device.version = (3, 5) + mock_device.cipher = cipher_v35 + mock_device._LOGGER = MagicMock() + + # Create a valid v3.5 message + message = Message( + command=Message.GET_COMMAND, + payload=b'{"dps":{"1":true}}', + encrypt=True, + device=mock_device, + ) + msg_bytes = message.to_bytes() + + # Parse it back + parsed = Message.from_bytes(mock_device, msg_bytes, cipher_v35) + assert parsed.command == Message.GET_COMMAND + + def test_v35_from_bytes_decrypts_payload( + self, cipher_v35: TuyaCipher + ) -> None: + """Message.from_bytes should correctly decrypt Protocol 3.5 payload.""" + mock_device = MagicMock() + mock_device.version = (3, 5) + mock_device.cipher = cipher_v35 + mock_device._LOGGER = MagicMock() + + original_payload = {"dps": {"1": True, "2": "test"}} + + message = Message( + command=Message.GET_COMMAND, + payload=original_payload, + encrypt=True, + device=mock_device, + ) + msg_bytes = message.to_bytes() + + parsed = Message.from_bytes(mock_device, msg_bytes, cipher_v35) + assert parsed.payload == original_payload + + def test_v35_from_bytes_validates_gcm_tag( + self, cipher_v35: TuyaCipher + ) -> None: + """Message.from_bytes should fail if GCM tag is invalid.""" + mock_device = MagicMock() + mock_device.version = (3, 5) + mock_device.cipher = cipher_v35 + mock_device._LOGGER = MagicMock() + + message = Message( + command=Message.GET_COMMAND, + payload=b'{"dps":{"1":true}}', + encrypt=True, + device=mock_device, + ) + msg_bytes = bytearray(message.to_bytes()) + + # Corrupt the GCM tag (16 bytes before suffix) + msg_bytes[-20] ^= 0xFF + + from custom_components.robovac.tuyalocalapi import InvalidMessage + with pytest.raises(InvalidMessage): + Message.from_bytes(mock_device, bytes(msg_bytes), cipher_v35)