Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multipleRequests and do update from component_nego list #17

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 88 additions & 21 deletions devtools/dump_devinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@
import re
from collections import defaultdict, namedtuple
from pprint import pprint
from typing import Optional

import asyncclick as click

from kasa import Credentials, Discover, SmartDevice
from kasa.discover import DiscoveryResult
from kasa.smartprotocol import (
COMPONENT_INFO_MAP,
GetTriggerLogsParams,
SmartComponentInfo,
SmartRequest,
)
from kasa.tapo.tapodevice import TapoDevice

Call = namedtuple("Call", "module method")
SmartCall = namedtuple("SmartCall", "module request should_succeed")


def scrub(res):
Expand Down Expand Up @@ -202,43 +210,102 @@ async def get_legacy_fixture(device):

async def get_smart_fixture(device: SmartDevice):
"""Get fixture for new TAPO style protocol."""
items = [
Call(module="component_nego", method="component_nego"),
Call(module="device_info", method="get_device_info"),
Call(module="device_usage", method="get_device_usage"),
Call(module="device_time", method="get_device_time"),
Call(module="energy_usage", method="get_energy_usage"),
Call(module="current_power", method="get_current_power"),
Call(module="temp_humidity_records", method="get_temp_humidity_records"),
Call(module="child_device_list", method="get_child_device_list"),
Call(
extra_test_calls = [
SmartCall(
module="device_usage",
request=SmartRequest("get_device_usage"),
should_succeed=True,
),
SmartCall(
module="temp_humidity_records",
request=SmartRequest("get_temp_humidity_records"),
should_succeed=False,
),
SmartCall(
module="child_device_list",
request=SmartRequest("get_child_device_list"),
should_succeed=False,
),
SmartCall(
module="trigger_logs",
method={"get_trigger_logs": {"page_size": 5, "start_id": 0}},
request=SmartRequest("get_trigger_logs", GetTriggerLogsParams(5, 0)),
should_succeed=False,
),
Call(
SmartCall(
module="child_device_component_list",
method="get_child_device_component_list",
request=SmartRequest("get_child_device_component_list"),
should_succeed=False,
),
]

successes = []

for test_call in items:
try:
click.echo("Testing component_nego call ..", nl=False)
component_info_response = await device.protocol.query(
SmartRequest("component_nego").to_dict()
)
click.echo(click.style("OK", fg="green"))
successes.append(
SmartCall(
module="component_nego",
request=SmartRequest("component_nego"),
should_succeed=True,
)
)
except Exception:
click.echo(
click.style("CRITICAL FAIL on component_nego call, exiting", fg="red")
)
return

test_calls = []
should_succeed = []

component_info: Optional[SmartComponentInfo]
for item in component_info_response["component_nego"]["component_list"]:
component_id = item["id"]
if component_info := COMPONENT_INFO_MAP.get(component_id):
params = (
component_info.param_class() if component_info.param_class else None
)
request = SmartRequest(component_info.get_method_name, params)
test_call = SmartCall(
module=component_id, request=request, should_succeed=True
)
test_calls.append(test_call)
should_succeed.append(test_call)
else:
click.echo(f"Skipping {component_id}..", nl=False)
click.echo(click.style("UNSUPPORTED", fg="yellow"))

test_calls.extend(extra_test_calls)

for test_call in test_calls:
click.echo(f"Testing {test_call.module}..", nl=False)
try:
click.echo(f"Testing {test_call}..", nl=False)
response = await device.protocol.query(test_call.method)
response = await device.protocol.query(test_call.request.to_dict())

except Exception as ex:
click.echo(click.style(f"FAIL {ex}", fg="red"))
if not test_call.should_succeed and "UNKNOWN_METHOD_ERROR" in ex.args[0]:
click.echo(click.style("FAIL - EXPECTED", fg="green"))
else:
click.echo(click.style(f"FAIL {ex}", fg="red"))
else:
if not response:
click.echo(click.style("FAIL not suported", fg="red"))
click.echo(click.style("FAIL no response", fg="red"))
else:
click.echo(click.style("OK", fg="green"))
if not test_call.should_succeed:
click.echo(click.style("OK - EXPECTED FAIL", fg="red"))
else:
click.echo(click.style("OK", fg="green"))
successes.append(test_call)

requests = []
for succ in successes:
requests.append({"method": succ.method})
requests.append(
{"method": succ.request.method_name, "params": succ.request.params}
)

final_query = {"multipleRequest": {"requests": requests}}

Expand All @@ -251,7 +318,7 @@ async def get_smart_fixture(device: SmartDevice):
)
)
final = {}
for response in responses["responses"]:
for response in responses["multipleRequest"]["responses"]:
final[response["method"]] = response["result"]

# Need to recreate a DiscoverResult here because we don't want the aliases
Expand Down
6 changes: 6 additions & 0 deletions kasa/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def _strip_rich_formatting(echo_func):
@wraps(echo_func)
def wrapper(message=None, *args, **kwargs):
if message is not None:
if not isinstance(message, str):
message = repr(message)
message = re.sub(r"\[/?.+?]", "", message)
echo_func(message, *args, **kwargs)

Expand Down Expand Up @@ -479,6 +481,10 @@ async def state(dev: SmartDevice):
else:
echo(f"\t[red]- {module}[/red]")

echo("\n\t[bold]== Features ==[/bold]")
for feature in dev.features:
echo(f"\t[green]+ {feature}[/green]")

return dev.internal_state


Expand Down
Loading
Loading