Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
jschlyter committed Jun 27, 2024
1 parent 0aa10c0 commit 4e7626f
Showing 1 changed file with 101 additions and 0 deletions.
101 changes: 101 additions & 0 deletions examples/wsmock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""
Chargeamps Websocket API Simulator
"""

import re
import asyncio
import aiohttp.web
from datetime import datetime, timezone

RECEIVE_TIMEOUT = 1


class ChargePoint:
def __init__(self):
self.pin = "123456"
self.serial = "1908000575A"
self.url = "ws://127.0.0.1:8080/"
self.connected = False

async def send_status(self, ws) -> None:
"""Send chargepoint status"""
timestamp = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
statuses = [
"7,iot,-68,1",
f"6,{self.serial},{timestamp}",
f"14,0,{self.url}",
"8,0,-1,-1,-1,0,Suspended EV",
"108,5698,11853,12212,11936,2,Charging",
]
for status in statuses:
await ws.send_str(status)

async def send_settings(self, ws) -> str:
"""Send chargepoint settings"""
settings = [
"preamble = 101",
"connectorOneEnable",
"cableLockOne",
"rfidLockOne",
"currentOne * 10",
"4",
"connectorTwoEnable",
"cableLockTwo",
"rfidLockTwo",
"currentTwo * 10",
"4",
"dimmer",
]
settings_str = ",".join(settings)
await ws.send_str(f"104,[{settings_str}]")


async def websocket_handler(request):
print("Websocket connection starting")

ws = aiohttp.web.WebSocketResponse(receive_timeout=RECEIVE_TIMEOUT)
await ws.prepare(request)

print("Websocket connection ready")

chargepoint = ChargePoint()

while True:
try:
print("Waiting for message")
async for msg in ws:
print(msg)
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg.data)
if match := re.match(r"^5,(\d+)$", msg.data):
if match.group(1) == chargepoint.pin:
print("Connected")
chargepoint.connected = True
else:
print("Invalid PIN")
elif re.match(r"^\?,104$", msg.data):
print("Get settings")
await chargepoint.send_settings(ws)
elif msg.data == "close":
await ws.close()
else:
print(f"Unknown command: {msg.data}")
except asyncio.TimeoutError as exc:
if chargepoint.connected:
print("Sending status")
await chargepoint.send_status(ws)
except RuntimeError as exc:
break

print("Websocket connection closed")
return ws


def main() -> None:
app = aiohttp.web.Application()
app.router.add_route("GET", "/", websocket_handler)
aiohttp.web.run_app(app, host="127.0.0.1", port=8080)


if __name__ == "__main__":
main()

0 comments on commit 4e7626f

Please sign in to comment.