From 720b403022e3e8db23eae30dad7fbc034e495a3b Mon Sep 17 00:00:00 2001 From: Jakob Schlyter Date: Fri, 28 Jun 2024 12:08:01 +0200 Subject: [PATCH] start processing message --- chargeamps/chargeamps2mqtt.py | 40 ++++++++++++++++++++++++----------- chargeamps/local.py | 34 +++++++---------------------- tests/test_local.py | 8 ++----- 3 files changed, 38 insertions(+), 44 deletions(-) diff --git a/chargeamps/chargeamps2mqtt.py b/chargeamps/chargeamps2mqtt.py index cd06602..de8e917 100644 --- a/chargeamps/chargeamps2mqtt.py +++ b/chargeamps/chargeamps2mqtt.py @@ -5,10 +5,14 @@ import aiohttp import aiomqtt +from chargeamps.local import ChargeAmpsLocalClient, LocalChargePoint +from chargeamps.models import ChargePointType + mqtt_reconnect_interval = 30 mqtt_broker = "127.0.0.1" mqtt_commands_topic = "chargeamps/commands" mqtt_results_topic = "chargeamps/results" +mqtt_status_topic = "chargeamps/status" mqtt_commands_queue = Queue(0) mqtt_results_queue = Queue(0) @@ -18,35 +22,44 @@ ws_receive_timeout = 1 -async def mqtt_results(client: aiomqtt.Client): +async def mqtt_results( + mqtt_client: aiomqtt.Client, chargeamps_client: ChargeAmpsLocalClient +): logger = logging.getLogger("mqtt_results") while True: logger.debug("Waiting for results") result = await mqtt_results_queue.get() logger.debug("Got result: %s", result) - await client.publish(topic=mqtt_results_topic, payload=result.encode()) + await mqtt_client.publish(topic=mqtt_results_topic, payload=result.encode()) + chargeamps_client.process_message(result) + await mqtt_client.publish( + topic=mqtt_status_topic, + payload=chargeamps_client.state.model_dump_json().encode(), + ) -async def mqtt_commands(client: aiomqtt.Client): +async def mqtt_commands( + mqtt_client: aiomqtt.Client, chargeamps_client: ChargeAmpsLocalClient +): logger = logging.getLogger("mqtt_commands") - await client.subscribe(mqtt_commands_topic) + await mqtt_client.subscribe(mqtt_commands_topic) logger.info("Subscribed to %s", mqtt_commands_topic) logger.debug("Waiting for messages") - async for message in client.messages: + async for message in mqtt_client.messages: logger.debug("Received command on %s", message.topic) await mqtt_commands_queue.put(message.payload.decode()) -async def mqtt_handler(): +async def mqtt_handler(chargeamps_client: ChargeAmpsLocalClient): logger = logging.getLogger("mqtt_handler") logger.debug("Starting") while True: try: - async with aiomqtt.Client(mqtt_broker) as client: + async with aiomqtt.Client(mqtt_broker) as mqtt_client: logger.info("Connected to %s", mqtt_broker) async with asyncio.TaskGroup() as tg: - tg.create_task(mqtt_commands(client)) - tg.create_task(mqtt_results(client)) + tg.create_task(mqtt_commands(mqtt_client, chargeamps_client)) + tg.create_task(mqtt_results(mqtt_client, chargeamps_client)) logger.debug("Tasks done") except aiomqtt.MqttError: logger.error( @@ -56,7 +69,7 @@ async def mqtt_handler(): await asyncio.sleep(mqtt_reconnect_interval) -async def ws_handler(): +async def ws_handler(ws_server: str): logger = logging.getLogger("ws_handler") logger.debug("Starting") while True: @@ -94,9 +107,12 @@ async def ws_handler(): async def server(): + chargepoint = LocalChargePoint(url=ws_server, type=ChargePointType.HALO) + chargeamps_client = ChargeAmpsLocalClient(chargepoint) + async with asyncio.TaskGroup() as tg: - tg.create_task(mqtt_handler()) - tg.create_task(ws_handler()) + tg.create_task(mqtt_handler(chargeamps_client)) + tg.create_task(ws_handler(ws_server)) def main(): diff --git a/chargeamps/local.py b/chargeamps/local.py index 47c0f4c..7c40974 100644 --- a/chargeamps/local.py +++ b/chargeamps/local.py @@ -3,8 +3,6 @@ import logging import uuid from datetime import datetime -from typing import Self -from urllib.parse import urlparse from pydantic import BaseModel, ConfigDict, Field @@ -19,29 +17,12 @@ class LocalChargePoint(BaseModel): - name: str - host: str - port: int - pin: str + name: str = Field(default_factory=lambda: str(uuid.uuid4)) + url: str type: ChargePointType model_config = ConfigDict(frozen=True) - @classmethod - def from_url( - cls, url: str, pin: str, type: ChargePointType, name: str | None = None - ) -> Self: - res = urlparse(url) - if res.scheme != "ws": - raise ValueError("Only ws URLs supported") - return cls( - name=name or str(uuid.uuid4()), - host=res.hostname or DEFAULT_HOST, - port=res.port or DEFAULT_PORT, - pin=pin, - type=type, - ) - class LocalChargePointConnectorStatus(BaseModel): power: int | None = None @@ -49,7 +30,7 @@ class LocalChargePointConnectorStatus(BaseModel): status_text: str | None = None -class LocalChargePointBase(BaseModel): +class LocalChargePointStateBase(BaseModel): connector_settings: list[ChargePointConnectorSettings] connector_status: list[LocalChargePointConnectorStatus] @@ -81,7 +62,7 @@ def process_message(self, message: str): logging.warning("Unknown preamble %d", preamble) -class LocalChargePointStatusHalo(LocalChargePointBase): +class LocalChargePointStateHalo(LocalChargePointStateBase): connector_settings: list[ChargePointConnectorSettings] = Field( default=[ ChargePointConnectorSettings( @@ -133,7 +114,7 @@ def process_message(self, message: str) -> None: return super().process_message(message) -class LocalChargePointStatusAura(LocalChargePointBase): +class LocalChargePointStateAura(LocalChargePointStateBase): connector_settings: list[ChargePointConnectorSettings] = Field( default=[ ChargePointConnectorSettings( @@ -195,10 +176,11 @@ def __init__(self, chargepoint: LocalChargePoint) -> None: match self.chargepoint.type: case ChargePointType.HALO: - self.state = LocalChargePointStatusHalo() + self.state = LocalChargePointStateHalo() case ChargePointType.AURA: - self.state = LocalChargePointStatusAura() + self.state = LocalChargePointStateAura() def process_message(self, message: str) -> None: + logging.debug("Process message: %s", message) self.state.process_message(message) diff --git a/tests/test_local.py b/tests/test_local.py index 6de06ab..816340c 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -14,9 +14,7 @@ def test_local_halo(): - halo = LocalChargePoint.from_url( - "ws://127.0.0.1:8002", pin="12345678", type=ChargePointType.HALO - ) + halo = LocalChargePoint(url="ws://127.0.0.1:8002", type=ChargePointType.HALO) client = ChargeAmpsLocalClient(chargepoint=halo) for message in MESSAGES_AURA: @@ -26,9 +24,7 @@ def test_local_halo(): def test_local_aura(): - aura = LocalChargePoint.from_url( - "ws://127.0.0.1:8001", pin="12345678", type=ChargePointType.AURA - ) + aura = LocalChargePoint(url="ws://127.0.0.1:8002", type=ChargePointType.AURA) client = ChargeAmpsLocalClient(chargepoint=aura) for message in MESSAGES_AURA: