Skip to content

Commit

Permalink
start processing message
Browse files Browse the repository at this point in the history
  • Loading branch information
jschlyter committed Jun 28, 2024
1 parent 5e22894 commit 720b403
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 44 deletions.
40 changes: 28 additions & 12 deletions chargeamps/chargeamps2mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
import aiohttp
import aiomqtt

from chargeamps.local import ChargeAmpsLocalClient, LocalChargePoint
from chargeamps.models import ChargePointType

mqtt_reconnect_interval = 30
mqtt_broker = "127.0.0.1"
mqtt_commands_topic = "chargeamps/commands"
mqtt_results_topic = "chargeamps/results"
mqtt_status_topic = "chargeamps/status"

mqtt_commands_queue = Queue(0)
mqtt_results_queue = Queue(0)
Expand All @@ -18,35 +22,44 @@
ws_receive_timeout = 1


async def mqtt_results(client: aiomqtt.Client):
async def mqtt_results(
mqtt_client: aiomqtt.Client, chargeamps_client: ChargeAmpsLocalClient
):
logger = logging.getLogger("mqtt_results")
while True:
logger.debug("Waiting for results")
result = await mqtt_results_queue.get()
logger.debug("Got result: %s", result)
await client.publish(topic=mqtt_results_topic, payload=result.encode())
await mqtt_client.publish(topic=mqtt_results_topic, payload=result.encode())
chargeamps_client.process_message(result)
await mqtt_client.publish(
topic=mqtt_status_topic,
payload=chargeamps_client.state.model_dump_json().encode(),
)


async def mqtt_commands(client: aiomqtt.Client):
async def mqtt_commands(
mqtt_client: aiomqtt.Client, chargeamps_client: ChargeAmpsLocalClient
):
logger = logging.getLogger("mqtt_commands")
await client.subscribe(mqtt_commands_topic)
await mqtt_client.subscribe(mqtt_commands_topic)
logger.info("Subscribed to %s", mqtt_commands_topic)
logger.debug("Waiting for messages")
async for message in client.messages:
async for message in mqtt_client.messages:
logger.debug("Received command on %s", message.topic)
await mqtt_commands_queue.put(message.payload.decode())


async def mqtt_handler():
async def mqtt_handler(chargeamps_client: ChargeAmpsLocalClient):
logger = logging.getLogger("mqtt_handler")
logger.debug("Starting")
while True:
try:
async with aiomqtt.Client(mqtt_broker) as client:
async with aiomqtt.Client(mqtt_broker) as mqtt_client:
logger.info("Connected to %s", mqtt_broker)
async with asyncio.TaskGroup() as tg:
tg.create_task(mqtt_commands(client))
tg.create_task(mqtt_results(client))
tg.create_task(mqtt_commands(mqtt_client, chargeamps_client))
tg.create_task(mqtt_results(mqtt_client, chargeamps_client))
logger.debug("Tasks done")
except aiomqtt.MqttError:
logger.error(
Expand All @@ -56,7 +69,7 @@ async def mqtt_handler():
await asyncio.sleep(mqtt_reconnect_interval)


async def ws_handler():
async def ws_handler(ws_server: str):
logger = logging.getLogger("ws_handler")
logger.debug("Starting")
while True:
Expand Down Expand Up @@ -94,9 +107,12 @@ async def ws_handler():


async def server():
chargepoint = LocalChargePoint(url=ws_server, type=ChargePointType.HALO)
chargeamps_client = ChargeAmpsLocalClient(chargepoint)

async with asyncio.TaskGroup() as tg:
tg.create_task(mqtt_handler())
tg.create_task(ws_handler())
tg.create_task(mqtt_handler(chargeamps_client))
tg.create_task(ws_handler(ws_server))


def main():
Expand Down
34 changes: 8 additions & 26 deletions chargeamps/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import logging
import uuid
from datetime import datetime
from typing import Self
from urllib.parse import urlparse

from pydantic import BaseModel, ConfigDict, Field

Expand All @@ -19,37 +17,20 @@


class LocalChargePoint(BaseModel):
name: str
host: str
port: int
pin: str
name: str = Field(default_factory=lambda: str(uuid.uuid4))
url: str
type: ChargePointType

model_config = ConfigDict(frozen=True)

@classmethod
def from_url(
cls, url: str, pin: str, type: ChargePointType, name: str | None = None
) -> Self:
res = urlparse(url)
if res.scheme != "ws":
raise ValueError("Only ws URLs supported")
return cls(
name=name or str(uuid.uuid4()),
host=res.hostname or DEFAULT_HOST,
port=res.port or DEFAULT_PORT,
pin=pin,
type=type,
)


class LocalChargePointConnectorStatus(BaseModel):
power: int | None = None
energy: int | None = None
status_text: str | None = None


class LocalChargePointBase(BaseModel):
class LocalChargePointStateBase(BaseModel):
connector_settings: list[ChargePointConnectorSettings]
connector_status: list[LocalChargePointConnectorStatus]

Expand Down Expand Up @@ -81,7 +62,7 @@ def process_message(self, message: str):
logging.warning("Unknown preamble %d", preamble)


class LocalChargePointStatusHalo(LocalChargePointBase):
class LocalChargePointStateHalo(LocalChargePointStateBase):
connector_settings: list[ChargePointConnectorSettings] = Field(
default=[
ChargePointConnectorSettings(
Expand Down Expand Up @@ -133,7 +114,7 @@ def process_message(self, message: str) -> None:
return super().process_message(message)


class LocalChargePointStatusAura(LocalChargePointBase):
class LocalChargePointStateAura(LocalChargePointStateBase):
connector_settings: list[ChargePointConnectorSettings] = Field(
default=[
ChargePointConnectorSettings(
Expand Down Expand Up @@ -195,10 +176,11 @@ def __init__(self, chargepoint: LocalChargePoint) -> None:

match self.chargepoint.type:
case ChargePointType.HALO:
self.state = LocalChargePointStatusHalo()
self.state = LocalChargePointStateHalo()

case ChargePointType.AURA:
self.state = LocalChargePointStatusAura()
self.state = LocalChargePointStateAura()

def process_message(self, message: str) -> None:
logging.debug("Process message: %s", message)
self.state.process_message(message)
8 changes: 2 additions & 6 deletions tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@


def test_local_halo():
halo = LocalChargePoint.from_url(
"ws://127.0.0.1:8002", pin="12345678", type=ChargePointType.HALO
)
halo = LocalChargePoint(url="ws://127.0.0.1:8002", type=ChargePointType.HALO)
client = ChargeAmpsLocalClient(chargepoint=halo)

for message in MESSAGES_AURA:
Expand All @@ -26,9 +24,7 @@ def test_local_halo():


def test_local_aura():
aura = LocalChargePoint.from_url(
"ws://127.0.0.1:8001", pin="12345678", type=ChargePointType.AURA
)
aura = LocalChargePoint(url="ws://127.0.0.1:8002", type=ChargePointType.AURA)
client = ChargeAmpsLocalClient(chargepoint=aura)

for message in MESSAGES_AURA:
Expand Down

0 comments on commit 720b403

Please sign in to comment.