Skip to content

Commit

Permalink
Add multipleRequests and do update from component_nego list
Browse files Browse the repository at this point in the history
  • Loading branch information
sdb9696 committed Dec 19, 2023
1 parent 20ea670 commit 1e25d82
Show file tree
Hide file tree
Showing 10 changed files with 948 additions and 121 deletions.
109 changes: 88 additions & 21 deletions devtools/dump_devinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@
import re
from collections import defaultdict, namedtuple
from pprint import pprint
from typing import Optional

import asyncclick as click

from kasa import Credentials, Discover, SmartDevice
from kasa.discover import DiscoveryResult
from kasa.smartprotocol import (
COMPONENT_INFO_MAP,
GetTriggerLogsParams,
SmartComponentInfo,
SmartRequest,
)
from kasa.tapo.tapodevice import TapoDevice

Call = namedtuple("Call", "module method")
SmartCall = namedtuple("SmartCall", "module request should_succeed")


def scrub(res):
Expand Down Expand Up @@ -202,43 +210,102 @@ async def get_legacy_fixture(device):

async def get_smart_fixture(device: SmartDevice):
"""Get fixture for new TAPO style protocol."""
items = [
Call(module="component_nego", method="component_nego"),
Call(module="device_info", method="get_device_info"),
Call(module="device_usage", method="get_device_usage"),
Call(module="device_time", method="get_device_time"),
Call(module="energy_usage", method="get_energy_usage"),
Call(module="current_power", method="get_current_power"),
Call(module="temp_humidity_records", method="get_temp_humidity_records"),
Call(module="child_device_list", method="get_child_device_list"),
Call(
extra_test_calls = [
SmartCall(
module="device_usage",
request=SmartRequest("get_device_usage"),
should_succeed=True,
),
SmartCall(
module="temp_humidity_records",
request=SmartRequest("get_temp_humidity_records"),
should_succeed=False,
),
SmartCall(
module="child_device_list",
request=SmartRequest("get_child_device_list"),
should_succeed=False,
),
SmartCall(
module="trigger_logs",
method={"get_trigger_logs": {"page_size": 5, "start_id": 0}},
request=SmartRequest("get_trigger_logs", GetTriggerLogsParams(5, 0)),
should_succeed=False,
),
Call(
SmartCall(
module="child_device_component_list",
method="get_child_device_component_list",
request=SmartRequest("get_child_device_component_list"),
should_succeed=False,
),
]

successes = []

for test_call in items:
try:
click.echo("Testing component_nego call ..", nl=False)
component_info_response = await device.protocol.query(
SmartRequest("component_nego").to_dict()
)
click.echo(click.style("OK", fg="green"))
successes.append(
SmartCall(
module="component_nego",
request=SmartRequest("component_nego"),
should_succeed=True,
)
)
except Exception:
click.echo(
click.style("CRITICAL FAIL on component_nego call, exiting", fg="red")
)
return

test_calls = []
should_succeed = []

component_info: Optional[SmartComponentInfo]
for item in component_info_response["component_nego"]["component_list"]:
component_id = item["id"]
if component_info := COMPONENT_INFO_MAP.get(component_id):
params = (
component_info.param_class() if component_info.param_class else None
)
request = SmartRequest(component_info.get_method_name, params)
test_call = SmartCall(
module=component_id, request=request, should_succeed=True
)
test_calls.append(test_call)
should_succeed.append(test_call)
else:
click.echo(f"Skipping {component_id}..", nl=False)
click.echo(click.style("UNSUPPORTED", fg="yellow"))

test_calls.extend(extra_test_calls)

for test_call in test_calls:
click.echo(f"Testing {test_call.module}..", nl=False)
try:
click.echo(f"Testing {test_call}..", nl=False)
response = await device.protocol.query(test_call.method)
response = await device.protocol.query(test_call.request.to_dict())

except Exception as ex:
click.echo(click.style(f"FAIL {ex}", fg="red"))
if not test_call.should_succeed and "UNKNOWN_METHOD_ERROR" in ex.args[0]:
click.echo(click.style("FAIL - EXPECTED", fg="green"))
else:
click.echo(click.style(f"FAIL {ex}", fg="red"))
else:
if not response:
click.echo(click.style("FAIL not suported", fg="red"))
click.echo(click.style("FAIL no response", fg="red"))
else:
click.echo(click.style("OK", fg="green"))
if not test_call.should_succeed:
click.echo(click.style("OK - EXPECTED FAIL", fg="red"))
else:
click.echo(click.style("OK", fg="green"))
successes.append(test_call)

requests = []
for succ in successes:
requests.append({"method": succ.method})
requests.append(
{"method": succ.request.method_name, "params": succ.request.params}
)

final_query = {"multipleRequest": {"requests": requests}}

Expand All @@ -251,7 +318,7 @@ async def get_smart_fixture(device: SmartDevice):
)
)
final = {}
for response in responses["responses"]:
for response in responses["multipleRequest"]["responses"]:
final[response["method"]] = response["result"]

# Need to recreate a DiscoverResult here because we don't want the aliases
Expand Down
6 changes: 6 additions & 0 deletions kasa/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def _strip_rich_formatting(echo_func):
@wraps(echo_func)
def wrapper(message=None, *args, **kwargs):
if message is not None:
if not isinstance(message, str):
message = repr(message)
message = re.sub(r"\[/?.+?]", "", message)
echo_func(message, *args, **kwargs)

Expand Down Expand Up @@ -479,6 +481,10 @@ async def state(dev: SmartDevice):
else:
echo(f"\t[red]- {module}[/red]")

echo("\n\t[bold]== Features ==[/bold]")
for feature in dev.features:
echo(f"\t[green]+ {feature}[/green]")

return dev.internal_state


Expand Down
Loading

0 comments on commit 1e25d82

Please sign in to comment.