Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
jschlyter committed Jun 27, 2024
1 parent ec8214a commit 272c607
Showing 1 changed file with 61 additions and 21 deletions.
82 changes: 61 additions & 21 deletions examples/wsmock.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@
from datetime import datetime, timezone
import string
import random
from enum import Enum

HOST = "127.0.0.1"
PORT = 8080
DEFAULT_PIN = "12345678"
RECEIVE_TIMEOUT = 1


class ChargePointType(Enum):
HALO = 1
AURA = 2


class ChargePoint:
def __init__(self, serial: str | None = None, pin: str | None = None):
self.serial = (
Expand All @@ -23,6 +29,14 @@ def __init__(self, serial: str | None = None, pin: str | None = None):
self.pin = pin or DEFAULT_PIN
self.url = f"ws://{HOST}:{PORT}/"
self.connected = False
self.periodic_status = False
self.hardware_type = ChargePointType.AURA
self.max_current = [16, 16]
self.enable = [True, True]
self.cable_lock = [False, False]
self.rfid_lock = [False, False]
self.light = True
self.dimmer = 64 # 0-255

async def send_status(self, ws) -> None:
"""Send chargepoint status"""
Expand All @@ -39,21 +53,37 @@ async def send_status(self, ws) -> None:

async def send_settings(self, ws) -> str:
"""Send chargepoint settings"""
settings = [
"101", # preamble
"1", # connector one enable
"1", # connector one cable lock
"0", # connector one rfid lock
"120", # connector one max current (A*10)
"4", # delimiter?
"0", # connector two enable
"1", # connector two cable lock
"0", # connector two rfid lock
"160", # connector two max current (A*10)
"4", # delimiter?
"64", # dimmer (0-255)
]
settings_str = ",".join(settings)
match self.hardware_type:
case ChargePointType.AURA:
settings = [
101, # preamble
int(self.enable[0]), # connector one enable
int(self.cable_lock[0]), # connector one cable lock
int(self.rfid_lock[0]), # connector one rfid lock
self.max_current[0] * 10, # connector one max current (A*10)
4, # delimiter?
int(self.enable[1]), # connector two enable
int(self.cable_lock[1]), # connector two cable lock
int(self.rfid_lock[1]), # connector two rfid lock
self.max_current[1] * 10, # connector two max current (A*10)
4, # delimiter?
self.dimmer, # dimmer (0-255)
]
case ChargePointType.HALO:
settings = [
1, # preamble
int(self.enable[0]), # connector enable
int(self.enable[1]), # schuko enable
int(self.rfid_lock[0]), # rfid lock
int(self.light), # light on,
self.max_current[0] * 10, # max current (A*10)
self.dimmer, # dimmer (0-255)
4, # delimiter?
]
case _:
settings = []

settings_str = ",".join([str(x) for x in settings])
await ws.send_str(f"{settings_str}")


Expand All @@ -74,23 +104,33 @@ async def websocket_handler(request):
print(msg)
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg.data)
if match := re.match(r"^5,(\d+)$", msg.data):
if msg.data == "close":
await ws.close()
elif match := re.match(r"^5,(\d+)$", msg.data):
if match.group(1) == chargepoint.pin:
print("Connected")
chargepoint.connected = True
else:
print("Invalid PIN")
elif re.match(r"^\?,6$", msg.data):
print("Request to get periodic status updates")
chargepoint.periodic_status = True
elif re.match(r"^\?,1$", msg.data):
if chargepoint.hardware_type == ChargePointType.HALO:
print("Get settings (Halo)")
await chargepoint.send_settings(ws)
else:
print("Hardware mismatch")
elif re.match(r"^\?,101$", msg.data):
print("Get settings")
await chargepoint.send_settings(ws)
elif msg.data == "close":
await ws.close()
if chargepoint.hardware_type == ChargePointType.AURA:
print("Get settings (Aura)")
await chargepoint.send_settings(ws)
else:
print("Hardware mismatch")
else:
print(f"Unknown command: {msg.data}")
except asyncio.TimeoutError:
if chargepoint.connected:
if chargepoint.periodic_status:
print("Sending status")
await chargepoint.send_status(ws)
except RuntimeError:
Expand Down

0 comments on commit 272c607

Please sign in to comment.