diff --git a/examples/wsmock.py b/examples/wsmock.py new file mode 100644 index 0000000..52dae95 --- /dev/null +++ b/examples/wsmock.py @@ -0,0 +1,101 @@ +""" +Chargeamps Websocket API Simulator +""" + +import re +import asyncio +import aiohttp.web +from datetime import datetime, timezone + +RECEIVE_TIMEOUT = 1 + + +class ChargePoint: + def __init__(self): + self.pin = "123456" + self.serial = "1908000575A" + self.url = "ws://127.0.0.1:8080/" + self.connected = False + + async def send_status(self, ws) -> None: + """Send chargepoint status""" + timestamp = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + statuses = [ + "7,iot,-68,1", + f"6,{self.serial},{timestamp}", + f"14,0,{self.url}", + "8,0,-1,-1,-1,0,Suspended EV", + "108,5698,11853,12212,11936,2,Charging", + ] + for status in statuses: + await ws.send_str(status) + + async def send_settings(self, ws) -> str: + """Send chargepoint settings""" + settings = [ + "preamble = 101", + "connectorOneEnable", + "cableLockOne", + "rfidLockOne", + "currentOne * 10", + "4", + "connectorTwoEnable", + "cableLockTwo", + "rfidLockTwo", + "currentTwo * 10", + "4", + "dimmer", + ] + settings_str = ",".join(settings) + await ws.send_str(f"104,[{settings_str}]") + + +async def websocket_handler(request): + print("Websocket connection starting") + + ws = aiohttp.web.WebSocketResponse(receive_timeout=RECEIVE_TIMEOUT) + await ws.prepare(request) + + print("Websocket connection ready") + + chargepoint = ChargePoint() + + while True: + try: + print("Waiting for message") + async for msg in ws: + print(msg) + if msg.type == aiohttp.WSMsgType.TEXT: + print(msg.data) + if match := re.match(r"^5,(\d+)$", msg.data): + if match.group(1) == chargepoint.pin: + print("Connected") + chargepoint.connected = True + else: + print("Invalid PIN") + elif re.match(r"^\?,104$", msg.data): + print("Get settings") + await chargepoint.send_settings(ws) + elif msg.data == "close": + await ws.close() + else: + print(f"Unknown command: {msg.data}") + except asyncio.TimeoutError as exc: + if chargepoint.connected: + print("Sending status") + await chargepoint.send_status(ws) + except RuntimeError as exc: + break + + print("Websocket connection closed") + return ws + + +def main() -> None: + app = aiohttp.web.Application() + app.router.add_route("GET", "/", websocket_handler) + aiohttp.web.run_app(app, host="127.0.0.1", port=8080) + + +if __name__ == "__main__": + main()