From 69e01b44d4e692d7109c39df77052273686d1ce8 Mon Sep 17 00:00:00 2001 From: Jakob Schlyter Date: Fri, 28 Jun 2024 11:13:13 +0200 Subject: [PATCH] sync --- chargeamps/local.py | 161 +++++++++++++++++++++++++++++--------------- tests/test_local.py | 38 ++++++++--- 2 files changed, 135 insertions(+), 64 deletions(-) diff --git a/chargeamps/local.py b/chargeamps/local.py index 388edc9..bb36400 100644 --- a/chargeamps/local.py +++ b/chargeamps/local.py @@ -1,10 +1,12 @@ """Charge-Amps Local API Client""" +import logging import uuid +from datetime import datetime from typing import Self from urllib.parse import urlparse -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field from .models import ( ChargePointConnectorMode, @@ -41,18 +43,47 @@ def from_url( ) +class LocalChargePointConnectorStatus(BaseModel): + power: int | None = None + energy: int | None = None + status_text: str | None = None + + class LocalChargePointStatus(BaseModel): - chargepoint: LocalChargePoint + connector_settings: list[ChargePointConnectorSettings] + connector_status: list[LocalChargePointConnectorStatus] + charge_point_id: str | None = None - connector_settings: list[ChargePointConnectorSettings] | None = None dimmer: int = 0 light: bool = False + ssid: str | None = None + signal_strength: int | None = None + status_text: str | None = None + last_updated: datetime | None = None + + def process_message(self, message: str): + parameters = message.split(",") + + match preamble := int(parameters[0]): + case 6: + assert len(parameters) == 3 + self.charge_point_id = parameters[1] + self.last_updated = parameters[2] + case 7: + assert len(parameters) == 4 + self.ssid = parameters[1] + self.signal_strength = int(parameters[2]) + pass + case 8: + assert len(parameters) == 7 + self.status_text = parameters[6] + case _: + logging.warning("Unknown preamble %d", preamble) class LocalChargePointStatusHalo(LocalChargePointStatus): - @staticmethod - def get_default_connectors() -> list[ChargePointConnectorSettings]: - return [ + connector_settings: list[ChargePointConnectorSettings] = Field( + default=[ ChargePointConnectorSettings( charge_point_id="", connector_id=0, @@ -70,26 +101,41 @@ def get_default_connectors() -> list[ChargePointConnectorSettings]: max_current=None, ), ] + ) + connector_status: list[LocalChargePointConnectorStatus] = Field( + default=[ + LocalChargePointConnectorStatus(), + LocalChargePointConnectorStatus(), + ] + ) + + def process_message(self, message: str): + parameters = message.split(",") + + match int(parameters[0]): + case 1: + assert len(parameters) == 8 + values = [int(x) for x in parameters.split(",")] - def update_settings(self, message: str): - parameters = [int(x) for x in message.split(",")] + self.connector_settings[0].mode = "On" if values[1] else "Off" + self.connector_settings[1].mode = "On" if values[2] else "Off" + self.connector_settings[0].rfid_lock = bool(values[3]) + self.light = bool(values[4]) + self.connector_settings[0].max_current = values[5] // 10 + self.dimmer = bool(values[6]) + assert values[7] == 4 - assert parameters[0] == 1 - assert len(parameters) == 8 + case 108: + assert len(parameters) == 7 + # "108,5698,11853,12212,11936,2,Charging", - self.connector_settings[0].mode = "On" if parameters[1] else "Off" - self.connector_settings[1].mode = "On" if parameters[2] else "Off" - self.connector_settings[0].rfid_lock = bool(parameters[3]) - self.light = bool(parameters[4]) - self.connector_settings[0].max_current = parameters[5] // 10 - self.dimmer = bool(parameters[6]) - assert parameters[7] == 4 + case _: + return super().process_message(message) class LocalChargePointStatusAura(LocalChargePointStatus): - @staticmethod - def get_default_connectors() -> list[ChargePointConnectorSettings]: - return [ + connector_settings: list[ChargePointConnectorSettings] = Field( + default=[ ChargePointConnectorSettings( charge_point_id="", connector_id=0, @@ -107,44 +153,49 @@ def get_default_connectors() -> list[ChargePointConnectorSettings]: max_current=None, ), ] + ) + connector_status: list[LocalChargePointConnectorStatus] = Field( + default=[ + LocalChargePointConnectorStatus(), + LocalChargePointConnectorStatus(), + ] + ) + + def process_message(self, message: str): + parameters = message.split(",") + + match int(parameters[0]): + case 1: + assert len(parameters) == 12 + values = [int(x) for x in parameters.split(",")] - def update_settings(self, message: str): - parameters = [int(x) for x in message.split(",")] + self.connector_settings[0].mode = "On" if values[1] else "Off" + self.connector_settings[0].cable_lock = bool(values[2]) + self.connector_settings[0].rfid_lock = bool(values[3]) + self.connector_settings[0].max_current = values[4] // 10 + assert values[5] == 4 + self.connector_settings[1].mode = "On" if values[6] else "Off" + self.connector_settings[1].cable_lock = bool(values[7]) + self.connector_settings[1].rfid_lock = bool(values[8]) + self.connector_settings[1].max_current = values[9] // 10 + assert values[10] == 4 + self.dimmer = values[11] - assert parameters[0] == 101 - assert len(parameters) == 12 + case 108: + assert len(parameters) == 7 + # "108,5698,11853,12212,11936,2,Charging", - self.connector_settings[0].mode = "On" if parameters[1] else "Off" - self.connector_settings[0].cable_lock = bool(parameters[2]) - self.connector_settings[0].rfid_lock = bool(parameters[3]) - self.connector_settings[0].max_current = parameters[4] // 10 - assert parameters[5] == 4 - self.connector_settings[1].mode = "On" if parameters[6] else "Off" - self.connector_settings[1].cable_lock = bool(parameters[7]) - self.connector_settings[1].rfid_lock = bool(parameters[8]) - self.connector_settings[1].max_current = parameters[9] // 10 - assert parameters[10] == 4 - self.dimmer = parameters[11] + case _: + return super().process_message(message) class ChargeAmpsLocalClient: # (ChargeAmpsClient): - def __init__(self, chargepoints: list[LocalChargePoint]) -> None: - self.chargepoints: list[LocalChargePoint] = chargepoints - self.status: list[LocalChargePointStatus] = [] - - for cp in self.chargepoints: - match cp.type: - case ChargePointType.HALO: - self.status.append( - LocalChargePointStatusHalo( - chargepoint=cp, - connector_settings=LocalChargePointStatusHalo.get_default_connectors(), - ) - ) - case ChargePointType.AURA: - self.status.append( - LocalChargePointStatusAura( - chargepoint=cp, - connector_settings=LocalChargePointStatusAura.get_default_connectors(), - ) - ) + def __init__(self, chargepoint: LocalChargePoint) -> None: + self.chargepoint = chargepoint + + match self.chargepoint.type: + case ChargePointType.HALO: + self.status = LocalChargePointStatusHalo() + + case ChargePointType.AURA: + self.status = LocalChargePointStatusAura() diff --git a/tests/test_local.py b/tests/test_local.py index cab0423..4c5de5d 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -1,17 +1,37 @@ from chargeamps.local import ChargeAmpsLocalClient, ChargePointType, LocalChargePoint -SETTINGS_AURA = "101,1,1,0,160,4,1,1,0,160,4,100" -SETTINGS_HALO = "1,1,1,0,0,160,0,4" +MESSAGES_HALO = [ + "1,1,1,0,0,160,0,4", + "108,5698,11853,12212,11936,2,Charging", + "108,8269,12104,12218,12157,24,Charging", + "108,8330,12187,12275,12157,47,Charging", + "108,8308,12313,12287,12204,71,Charging", + "108,8334,12265,12271,12175,94,Charging", + "108,8290,12299,12087,12018,117,Charging", + "108,8308,12247,12126,12257,142,Charging", +] +MESSAGES_AURA = ["101,1,1,0,160,4,1,1,0,160,4,100"] -def test_local(): - aura = LocalChargePoint.from_url( - "ws://127.0.0.1:8001", pin="12345678", type=ChargePointType.AURA - ) +def test_local_halo(): halo = LocalChargePoint.from_url( "ws://127.0.0.1:8002", pin="12345678", type=ChargePointType.HALO ) - client = ChargeAmpsLocalClient(chargepoints=[aura, halo]) + client = ChargeAmpsLocalClient(chargepoint=halo) + + for message in MESSAGES_AURA: + client.status.process_message(message) + + print(client.status) + + +def test_local_aura(): + aura = LocalChargePoint.from_url( + "ws://127.0.0.1:8001", pin="12345678", type=ChargePointType.AURA + ) + client = ChargeAmpsLocalClient(chargepoint=aura) + + for message in MESSAGES_AURA: + client.status.process_message(message) - client.status[0].update_settings(SETTINGS_AURA) - client.status[1].update_settings(SETTINGS_HALO) + print(client.status)