diff --git a/src/rove_control_board/api/api.h b/src/rove_control_board/api/api.h index d9feee9..91b06d4 100644 --- a/src/rove_control_board/api/api.h +++ b/src/rove_control_board/api/api.h @@ -4,6 +4,8 @@ */ #include +// --- ENUMS --- +// --- STRUCTS --- struct Void { euint8_t pad0; @@ -19,16 +21,31 @@ static_assert(sizeof(Vector2D) == 8); struct State { - euint8_t state; + eboolean_t state; }; static_assert(sizeof(State) == 1); struct Status { - euint32_t statusCode; + euint8_t statusCode; }; -static_assert(sizeof(Status) == 4); +static_assert(sizeof(Status) == 1); +struct UInt8 +{ + euint8_t value; +}; +static_assert(sizeof(UInt8) == 1); + +struct RGB +{ + efloat_t r; + efloat_t g; + efloat_t b; +}; +static_assert(sizeof(RGB) == 12); + +// --- COMMANDS --- Status ledOn(Void); static_assert((sizeof(Void)+1) == 2); @@ -38,15 +55,19 @@ static_assert((sizeof(Void)+1) == 2); Status setLedState(State); static_assert((sizeof(State)+1) == 2); -State loopback(State); -static_assert((sizeof(State)+1) == 2); +UInt8 loopback(UInt8); +static_assert((sizeof(UInt8)+1) == 2); + +State patate(Status); +static_assert((sizeof(Status)+1) == 2); BaseFunction_ptr commands[] = { new Function(&ledOn), new Function(&ledOff), new Function(&setLedState), - new Function(&loopback), + new Function(&loopback), + new Function(&patate), }; -#define COMMANDS_COUNT 4 -#define MAX_DECODED_SIZE 9 -#define MAX_ENCODED_SIZE 13 +#define COMMANDS_COUNT 5 +#define MAX_DECODED_SIZE 13 +#define MAX_ENCODED_SIZE 21 diff --git a/src/rove_control_board/rove_control_board/api.py b/src/rove_control_board/rove_control_board/api.py index 9e5e1be..30293a6 100644 --- a/src/rove_control_board/rove_control_board/api.py +++ b/src/rove_control_board/rove_control_board/api.py @@ -1,29 +1,51 @@ +from enum import Enum from typing import Callable, Optional from rove_control_board.capra_micro_comm import BinaryData, SerialCommandManager, Void, CommandManager manager = SerialCommandManager() +@manager.struct('ff') class Vector2D(BinaryData): def __init__(self, x:float=0, y:float=0): - super().__init__('ff', x=x, y=y) + super().__init__(x=x, y=y) + self.x:float + self.y:float +@manager.struct('?') class State(BinaryData): def __init__(self, state:bool=False): - super().__init__('B', state=state) + super().__init__(state=state) + self.state:bool +@manager.struct('B') class Status(BinaryData): - def __init__(self, statusCode:int=0): - super().__init__('I', statusCode=statusCode) + def __init__(self, statusCode:int = 0): + super().__init__(statusCode=statusCode) + self.statusCode:int + +@manager.struct('B') +class UInt8(BinaryData): + def __init__(self, value:int = 0): + super().__init__(value=value) + self.value:int + +@manager.struct('fff') +class RGB(BinaryData): + def __init__(self, r:float=0,g:float=0,b:float=0): + super().__init__(r=r,g=g,b=b) + self.r:float + self.g:float + self.b:float -@manager.binaryFunction(Void, Status) +@manager.command(Void, Status) def ledOn(): print("on") -@manager.binaryFunction(Void, Status) +@manager.command(Void, Status) def ledOff(): print("off") -@manager.binaryFunction(State, Status) +@manager.command(State, Status) def setLedState(state:State): print('on' if state.state else 'off') @@ -34,14 +56,18 @@ def ledStatus(status:Status): print(f"code: {status.statusCode}") -@manager.binaryFunction(State, State) -def loopback(state:State) -> State: +@manager.command(UInt8, UInt8) +def loopback(state:UInt8) -> UInt8: pass @loopback.preCall -def preLoop(state:State): - print(f"Sending {state.state}") +def preLoop(state:UInt8): + print(f"Sending {state.value}") @loopback.postCall -def postLoop(state:State): - print(f"Recieving {state.state}") \ No newline at end of file +def postLoop(state:UInt8): + print(f"Recieving {state.value}") + +@manager.command(Status, State) +def patate(s:Status): + pass \ No newline at end of file diff --git a/src/rove_control_board/rove_control_board/capra_micro_comm/__init__.py b/src/rove_control_board/rove_control_board/capra_micro_comm/__init__.py index 7546bab..4621afd 100644 --- a/src/rove_control_board/rove_control_board/capra_micro_comm/__init__.py +++ b/src/rove_control_board/rove_control_board/capra_micro_comm/__init__.py @@ -1,10 +1,11 @@ from __future__ import annotations import base64 +from enum import Enum import functools from io import IOBase, RawIOBase import struct from textwrap import dedent -from typing import Callable, Generic, List, NoReturn, Type, TypeVar, Union +from typing import Callable, ForwardRef, Generic, List, NoReturn, Type, TypeVar, Union, ClassVar from serial import Serial @@ -27,16 +28,81 @@ 'f' : 'efloat_t', } +SIZE_MAP = { + 'x' : 1, + 'c' : 1, + 'b' : 1, + 'B' : 1, + '?' : 1, + 'h' : 2, + 'H' : 2, + 'i' : 4, + 'I' : 4, + 'l' : 4, + 'L' : 4, + 'q' : 8, + 'Q' : 8, + 'f' : 4, +} + class BinaryData: - def __init__(self, fmt:str, **values): - self._fmt = fmt - self._keys = list(values.keys()) + # _fmt = '' + # _keys:List[str] = [] + # _values:List[Union[BinaryData, BinaryData]] = [] + # _len = 0 + def __init__(self, **values): + # Set instance attributes + self._keys = values.keys() + self._values = values.values() self.__dict__.update(values) + + def __len__(self): + # return struct.calcsize('<'+self._fmt) + return self._len def unpack(self, buff:bytes): - for k, v in zip(self._keys, struct.unpack('<'+self._fmt, buff)): - self.__dict__[k] = v - + # Unpack data + b = struct.unpack('<'+self._fmt, buff) + # Buffer index + idx = 0 + # Format index + fidx = 0 + for k, v in zip(self.__class__._keys, self.__class__._values): + if isinstance(v, (int, float, bool)): + # Set value + self.__dict__[k] = b[fidx] + + # Find size in bytes and move buffer index accordingly + idx+= SIZE_MAP[self.__class__._fmt[fidx]] + + # Base type taking one format space + fidx+=1 + elif isinstance(v, BinaryData): + # Subclass of BinaryData + # Instanciate + inst = v.__class__() + + # Unpack + inst.unpack(buff[idx:idx+len(inst)]) + + # Assign value + self.__dict__[k] = inst + + # Move buffer index by the byte length + idx+=len(inst) + + # Move the format index by the values count + fidx+=len(v.__class__._values) + elif isinstance(v, Enum) and hasattr(v.__class__, '_fmt'): + # Enum that contains formatting inromation + self.__dict__[k] = v.__class__(b[fidx]) + + # Find size in bytes and move buffer index accordingly + idx+= SIZE_MAP[self.__class__._fmt[fidx]] + + # Base type taking one format space + fidx+=1 + @property def values(self): v = [] @@ -48,17 +114,18 @@ def values(self): def __len__(self): return struct.calcsize(self._fmt) - def parse(self): + def parseCls(self): + cls = self.__class__ i = 0 pad = 0 - s = f'struct {self.__class__.__name__}' + '\n{\n' - for f in self._fmt: + s = f'struct {cls.__name__}' + '\n{\n' + for f in cls._fmt: l = ' ' + TYPE_MAP[f] + ' ' if f == 'x': n = f'pad{pad}' pad += 1 else: - n = self._keys[i] + n = cls._keys[i] l += n + ';\n' @@ -66,11 +133,11 @@ def parse(self): i+=1 s += l s += '};\n' - s += f'static_assert(sizeof({self.__class__.__name__}) == {len(self)});' + s += f'static_assert(sizeof({cls.__name__}) == {self._len});' return s - - +B = TypeVar('B', bound=Type[BinaryData]) +E = TypeVar('E', bound=Type[Enum]) R = TypeVar('R', bound=BinaryData) P = TypeVar('P', bound=BinaryData) @@ -99,7 +166,7 @@ def __call__(self, p:P=None) -> R: # Encode data param = struct.pack(' R: # Read data if self.returnType != Void: res = stream.read(len(self.returnType())) - print(bin(int.from_bytes(res, 'little'))) + # print(bin(int.from_bytes(res, 'little'))) # Handle Void return r = None @@ -169,29 +236,36 @@ def __init__(self, stream:IOBase): def read(self, size: int = -1) -> bytes | None: l = self.stream.readline().rstrip(b'\n') - print(l) + # print(l) return base64.decodebytes(l) def write(self, b) -> int | None: line = base64.encodebytes(b) - print(line) + # print(line) return self.stream.write(line+b'\n') def flush(self) -> None: return self.stream.flush() + + + class CommandManager: + _basetypes:List[BinaryData] = [] def __init__(self): self._id = 0 self._stream = None self._commands:List[CommandHook] = [] + self._structs:List[Type[BinaryData]] = [] + self._enums:List[Type[Enum]] = [] + self._structs = list(CommandManager._basetypes) def _genID(self): res = self._id self._id += 1 return res - def binaryFunction(self, paramType:Type[P], returnType:Type[R]): + def command(self, paramType:Type[P], returnType:Type[R]): id = self._genID() def predicate(func:Callable[[P], R]) -> Union[Callable[[P], R], CommandHook[P,R]]: res = CommandHook(self, id, func, paramType, returnType) @@ -199,7 +273,24 @@ def predicate(func:Callable[[P], R]) -> Union[Callable[[P], R], CommandHook[P,R] return functools.update_wrapper(res, func) return predicate - + + def struct(self, fmt:str): + return _addtype(fmt, self._structs) + + def enum(self, fmt:str = 'I'): + def pred(cls:E): + def parse(): + s = f'enum class {cls.__name__} : {TYPE_MAP[fmt]}\n' + s += '{\n' + for k, v in cls.__members__.items(): + s += f' {k} = {v.value},\n' + s += '};' + return s + cls.parse = parse + cls._fmt = fmt + self._enums.append(cls) + return cls + return pred def __enter__(self) -> IOBase: ... def __exit__(self, exc_type, exc_val, exc_tb): ... @@ -212,13 +303,19 @@ def buildAPI(self) -> str: */ #include - """) + // --- ENUMS ---\n""") + for e in self._enums: + res += e.parse() + '\n\n' + + res += "// --- STRUCTS ---\n" maxSize = 0 - for cls in BinaryData.__subclasses__(): - inst = cls() - res += inst.parse() + '\n\n' - maxSize = max(maxSize, len(inst)) + for cls in self._structs: + v = cls() + res += v.parseCls() + '\n\n' + maxSize = max(maxSize, len(v)) + lst = "BaseFunction_ptr commands[] = {\n" + res += "// --- COMMANDS ---\n" for cmd in self._commands: res += cmd.parse() + '\n' res += cmd.assertParse() + '\n\n' @@ -229,8 +326,45 @@ def buildAPI(self) -> str: encoded = base64.encodebytes(bytes(range(maxSize+1))) res += f"#define MAX_ENCODED_SIZE {len(encoded)}\n" + + return res +def _addtype(fmt:str, lst:List[BinaryData]): + def pred(cls:B): + # Setup class attributes + cls._fmt = '' + inst = cls() + global fidx + fidx = 0 + def pad(): + global fidx + while fidx < len(fmt) and fmt[fidx] == 'x': + cls._fmt += 'x' + fidx+=1 + pad() + for v in inst._values: + if isinstance(v, (int, float, bool)): + cls._fmt += fmt[fidx] + elif isinstance(v, BinaryData): + # Subclass of BinaryData + cls._fmt += v._fmt + elif isinstance(v, Enum) and hasattr(v.__class__, '_fmt'): + # Enum that contains formatting information + cls._fmt += v.__class__._fmt + fidx+=1 + pad() + cls._keys = list(inst._keys) + cls._values = list(inst.values) + cls._len = len(inst) + lst.append(cls) + + return cls + return pred + +def _basetype(fmt:str): + return _addtype(fmt, CommandManager._basetypes) + class SerialCommandManager(CommandManager): def __init__(self, port:str = None, baud:int=9600): @@ -265,9 +399,9 @@ def __exit__(self, *args, **kwargs): self._stream.flush() - +@_basetype('x') class Void(BinaryData): def __init__(self): - BinaryData.__init__(self, 'x') + super().__init__() \ No newline at end of file diff --git a/src/rove_control_board/rove_control_board/control_board_bridge.py b/src/rove_control_board/rove_control_board/control_board_bridge.py index a152d90..efee430 100644 --- a/src/rove_control_board/rove_control_board/control_board_bridge.py +++ b/src/rove_control_board/rove_control_board/control_board_bridge.py @@ -34,17 +34,24 @@ def timerCB(self): self.offset -= 1 return # if self.state: - # api.ledOn() + # s = api.Status(api.StatusCode.ERROR) + # print(s.statusCode) + # print(api.patate(s)) + # # api.ledOn() # else: + # s = api.Status(api.StatusCode.IDLE) + # print(s.statusCode) + # print(api.patate(s)) # api.ledOff() # api.setLedState(api.State(self.state)) - api.loopback(api.State(self.value)) + api.loopback(api.UInt8(self.value)) self.state = (self.state + 1) %2 self.value = (self.value + 1) % 10 def main(args=None): + # print(api.manager.buildAPI()) api.manager.port = DEV api.manager.baud = 9600 api.manager._stream.rts = True