From 6d71dc2324620480ed81e35ac580122c667b6e1c Mon Sep 17 00:00:00 2001 From: Christian Date: Sat, 2 Nov 2024 14:33:12 +0100 Subject: [PATCH] Add (de)serialization for dynamic arrays, fixed and dynamic strings --- README.md | 7 - src/someipy/serialization.py | 533 ++++++++++++++++++++++++++++++++++- tests/test_serialization.py | 255 +++++++++++++++++ 3 files changed, 783 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index f4c7afa..e3fa76a 100644 --- a/README.md +++ b/README.md @@ -63,10 +63,3 @@ The library is still under development. The current major limitations and deviat - Configuration and load balancing options in SOME/IP SD messages are not supported. - TTL of Service Discovery entries is not checked yet. - The Initial Wait Phase and Repetition Phase of the Service Discovery specification are skipped. The Main Phase is directly entered, i.e. SD Offer Entries are immediately sent cyclically. - -### De-/Serialization - -- Only fixed size arrays are supported. Dynamically sized arrays are not supported. -- Optional length fields for SOME/IP arrays are not supported. -- Strings are not supported yet. -- Configuration of padding is not supported yet. diff --git a/src/someipy/serialization.py b/src/someipy/serialization.py index b244699..6b46649 100644 --- a/src/someipy/serialization.py +++ b/src/someipy/serialization.py @@ -13,6 +13,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import codecs import struct from dataclasses import dataclass @@ -76,6 +77,7 @@ def deserialize(self, payload): None """ (self.value,) = struct.unpack(">B", payload) + return self @dataclass @@ -509,8 +511,16 @@ def deserialize(self, payload: bytes): pos = 0 for key, value in ordered_items: - type_length = len(value) - self.__dict__[key].deserialize(payload[pos : (pos + type_length)]) + + if hasattr(value, "_has_dynamic_size") and value._has_dynamic_size == True: + # If the length is not known before deserialization, first deserialize using the + # remaining payload and then calculate the length + self.__dict__[key].deserialize(payload[pos:]) + type_length = len(self.__dict__[key]) + else: + # If the length is known beforehand, only deserialize the part of the payload needed + type_length = len(value) + self.__dict__[key].deserialize(payload[pos : (pos + type_length)]) pos += type_length return self @@ -523,8 +533,6 @@ class SomeIpFixedSizeArray(Generic[T]): A datatype for a SOME/IP fixed size array. This type shall be used with someipy datatypes that support serialization and deserialization. """ - data: List[T] - def __init__(self, class_reference: Type[T], size: int): """ Initializes a new instance of the SomeIpFixedSizeArray class. @@ -536,7 +544,7 @@ def __init__(self, class_reference: Type[T], size: int): Returns: None """ - self.data = [class_reference() for i in range(size)] + self.data: List[T] = [class_reference() for i in range(size)] def __eq__(self, other): """ @@ -615,3 +623,518 @@ def deserialize(self, payload: bytes): payload[(i * single_element_length) : ((i + 1) * single_element_length)] ) return self + + +class SomeIpDynamicSizeArray(Generic[T]): + """ + A datatype for a SOME/IP dynamically sized array. This type shall be used in someipy datatypes that support serialization and deserialization. + """ + + _has_dynamic_size = True + + def __init__(self, class_reference: Type[T]): + """ + Initializes a new instance of the SomeIpDynamicSizeArray class. + + Parameters: + class_reference (Type[T]): The type of elements to be stored in the array. + + Returns: + None + """ + self._data: List[T] = [] + self._length_field_length = 4 # The length of the length field in bytes. It can be either 0 (no length field), 1, 2 or 4 bytes. + self._single_element_length = len(class_reference()) + self._class_reference = class_reference + + @property + def data(self) -> List[T]: + return self._data + + @data.setter + def data(self, value: List[T]): + self._data = value + + @property + def length_field_length(self): + return self._length_field_length + + @length_field_length.setter + def length_field_length(self, value): + if value in [0, 1, 2, 4]: + self._length_field_length = value + else: + raise ValueError("Length field length must be 0, 1, 2 or 4 bytes") + + def __eq__(self, other) -> bool: + """ + Compare two SomeIpDynamicSizeArray objects for equality. + + This method compares the length (number of elements) of the current array and the other array to determine if they are equal. It also compares the length of the bytes representation of the arrays. Finally, it compares the content of all elements in the arrays to check if they are equal. + + Parameters: + other (SomeIpDynamicSizeArray): The object to compare with the current object. + + Returns: + bool: True if the arrays are equal, False otherwise. + """ + if isinstance(other, SomeIpDynamicSizeArray): + # Compare if the length (number of elements) of other array is the same + if len(self.data) != len(other.data): + return False + + # Compare if bytes length of other is the same + if len(self) != len(other): + return False + + if self.length_field_length != other.length_field_length: + return False + + # Compare if the content of all elements is the same + for i in range(len(self.data)): + if self.data[i] != other.data[i]: + return False + return True + + return False + + def __len__(self) -> int: + """ + Return the length of the object in bytes. + + This method calculates the length of the object based on the number of elements in the `data` list and the length of each element. If the `data` list is empty, it returns 0. Otherwise, it returns the product of the length of the `data` list and the length of the first element in the `data` list. + + Returns: + int: The length of the object. + """ + return self.length_field_length + len(self.data) * self._single_element_length + + def serialize(self) -> bytes: + """ + Serialize the object into bytes by iterating over its attributes, excluding those starting with double underscores or underscores. + For each attribute, it calls the `serialize` method of the attribute and appends the returned bytes to the output. + + Returns: + bytes: The serialized representation of the object as bytes. + """ + result = bytes() + length_data_in_bytes = len(self.data) * self._single_element_length + if self._length_field_length == 1: + result += struct.pack(">B", length_data_in_bytes) + elif self._length_field_length == 2: + result += struct.pack(">H", length_data_in_bytes) + elif self._length_field_length == 4: + result += struct.pack(">L", length_data_in_bytes) + + for element in self.data: + result += element.serialize() + return result + + def deserialize(self, payload: bytes): + """ + Deserialize the payload into the object. + + Args: + payload (bytes): The payload to be deserialized. + + Returns: + self: The deserialized object. + + This method deserializes the payload into the object. It iterates over the `data` list and for each element, it calls the `deserialize` method of that element, passing a slice of the payload corresponding to the element's length. The deserialized values are assigned back to the corresponding elements of the `data` list. If the `data` list is empty, the method returns immediately. Finally, the deserialized object is returned. + """ + + self.data = [] + length = 0 + + if self._length_field_length == 1: + (length,) = struct.unpack(">B", payload[:1]) + elif self._length_field_length == 2: + (length,) = struct.unpack(">H", payload[:2]) + elif self._length_field_length == 4: + (length,) = struct.unpack(">L", payload[:4]) + else: + return + + number_of_elements = length / self._single_element_length + for i in range(int(number_of_elements)): + start_idx = (i * self._single_element_length) + self._length_field_length + end_idx = start_idx + self._single_element_length + next_element = self._class_reference().deserialize( + payload[start_idx:end_idx] + ) + self.data.append(next_element) + + return self + + +class SomeIpFixedSizeString(Generic[T]): + """ + A datatype for a SOME/IP fixed size string. + """ + + def __init__(self, size: int, value: str = ""): + """ + Initializes a new instance of the SomeIpFixedSizeString class. + + Parameters: + size (int): The size of the string including the terminating '\0' character. + + Returns: + None + """ + self._size = size + self._data = value + self._encoding = "utf-8" + + @property + def size(self) -> int: + return self._size + + @property + def data(self) -> str: + return self._data + + @data.setter + def data(self, value: str): + if ( + len(value) + ) > self._size - 1: # -1 since the terminating '\0' character is included in the size + raise ValueError( + f"String length exceeds maximum size of {self._size} including the terminating character" + ) + self._data = value + + @property + def encoding(self) -> str: + return self._encoding + + @encoding.setter + def encoding(self, value: str): + if value not in ["utf-8", "utf-16le", "utf-16be"]: + raise ValueError( + f"Encoding {value} is not supported. Supported encodings are 'utf-8', 'utf-16le' and 'utf-16be'" + ) + self._encoding = value + + def __eq__(self, other): + """ + Compare two SomeIpFixedSizeString objects for equality. + + Parameters: + other (SomeIpFixedSizeArray): The object to compare with the current object. + + Returns: + bool: True if the strings are equal, False otherwise. + """ + if isinstance(other, SomeIpFixedSizeString): + # Compare if the length (number of elements) of other array is the same + if self._size != other._size: + return False + + return self._data == other._data + + return False + + def __len__(self) -> int: + """ + Return the length of the object on the wire in bytes. Includes the BOM and terminating '\0' character. + + Returns: + int: The length of the object on the wire in bytes. + """ + + if self.encoding == "utf-8": + return self.size + len(codecs.BOM_UTF8) + elif self.encoding == "utf-16le" or self.encoding == "utf-16be": + return (self.size * 2) + len(codecs.BOM_UTF16_LE) + raise ValueError("Unknown encoding") + + def serialize(self) -> bytes: + """ + Serialize the object into bytes by iterating over its attributes, excluding those starting with double underscores or underscores. + For each attribute, it calls the `serialize` method of the attribute and appends the returned bytes to the output. + + Returns: + bytes: The serialized representation of the object as bytes. + """ + + result = bytes() + if self._encoding == "utf-8": + result += codecs.BOM_UTF8 + result += self.data.encode("utf-8") + filler_chars = self.size - len(self._data) + result += "\0".encode("utf-8") * filler_chars + assert len(result) == self.size + len(codecs.BOM_UTF8) + elif self._encoding == "utf-16le": + result += codecs.BOM_UTF16_LE + result += self.data.encode("utf-16le") + filler_chars = self.size - len(self._data) + result += "\0".encode("utf-16le") * filler_chars + assert len(result) == (self.size * 2) + len(codecs.BOM_UTF16_LE) + elif self._encoding == "utf-16be": + result += codecs.BOM_UTF16_BE + result += self.data.encode("utf-16be") + filler_chars = self.size - len(self._data) + result += "\0".encode("utf-16be") * filler_chars + assert len(result) == (self.size * 2) + len(codecs.BOM_UTF16_BE) + + return result + + def deserialize(self, payload: bytes): + """ + Deserialize the payload into the object. + + Args: + payload (bytes): The payload to be deserialized. + + Returns: + self: The deserialized object. + + This method deserializes the payload into the string. It automatically detects the encoding from the BOM + at the beginning of the payload. + """ + + # Get the byte order mark, either 3 bytes for utf-8 or 2 bytes for utf-16 + bom = payload[:3] if payload.startswith(codecs.BOM_UTF8) else payload[:2] + if bom == codecs.BOM_UTF8: + self.encoding = "utf-8" + start_idx = 3 + end_idx = start_idx + self.size + decoded_string = payload[start_idx:end_idx].decode("utf-8") + self.data = decoded_string.rstrip("\0") + elif bom == codecs.BOM_UTF16_LE: + self.encoding = "utf-16le" + start_idx = 2 + end_idx = start_idx + self.size * 2 + decoded_string = payload[start_idx:end_idx].decode("utf-16le") + self.data = decoded_string.rstrip("\0") + elif bom == codecs.BOM_UTF16_BE: + self.encoding = "utf-16be" + start_idx = 2 + end_idx = start_idx + self.size * 2 + decoded_string = payload[start_idx:end_idx].decode("utf-16be") + self.data = decoded_string.rstrip("\0") + else: + raise ValueError("Unknown encoding") + + return self + + +class SomeIpDynamicSizeString(Generic[T]): + """ + A datatype for a SOME/IP dynamically sized string. + """ + + _has_dynamic_size = True + + def __init__(self, value: str = ""): + """ + Initializes a new instance of the SomeIpDynamicSizeString class. + + Parameters: + value (str): The initial string data of the object. + + Returns: + None + """ + self._data = value + self._length_field_length = 4 # The length of the length field in bytes. It can be either 1, 2 or 4 bytes. + # BOM + string + terminating '\0' character + self._length_field_value = 3 + len(value) + 1 + self._encoding = "utf-8" + self._length = self._length_field_length + self._length_field_value + + @property + def data(self) -> str: + return self._data + + @data.setter + def data(self, value: str): + self._data = value + if self.encoding == "utf-8": + self._length_field_value = 3 + len(value) + 1 + elif self.encoding == "utf-16le" or self.encoding == "utf-16be": + self._length_field_value = 2 + len(value) * 2 + 2 + self._length = self._length_field_length + self._length_field_value + + @property + def length_field_length(self): + return self._length_field_length + + @length_field_length.setter + def length_field_length(self, value): + if value in [1, 2, 4]: + self._length_field_length = value + self._length = self._length_field_length + self._length_field_value + else: + raise ValueError("Length field length must be 1, 2 or 4 bytes") + + @property + def encoding(self) -> str: + return self._encoding + + @encoding.setter + def encoding(self, value: str): + if value not in ["utf-8", "utf-16le", "utf-16be"]: + raise ValueError( + f"Encoding {value} is not supported. Supported encodings are 'utf-8', 'utf-16le' and 'utf-16be'" + ) + self._encoding = value + if self.encoding == "utf-8": + self._length_field_value = 3 + len(self.data) + 1 + elif self.encoding == "utf-16le" or self.encoding == "utf-16be": + self._length_field_value = 2 + len(self.data) * 2 + 2 + self._length = self._length_field_length + self._length_field_value + + def __eq__(self, other): + """ + Compare two SomeIpDynamicSizeString objects for equality. + + Parameters: + other (SomeIpDynamicSizeString): The object to compare with the current object. + + Returns: + bool: True if the strings are equal, False otherwise. + """ + if isinstance(other, SomeIpDynamicSizeString): + if len(self) != len(other): + return False + if self.encoding != other.encoding: + return False + if self.length_field_length != other.length_field_length: + return False + + return self._data == other._data + + return False + + def __len__(self) -> int: + """ + Return the length of the serialized string in bytes. + + Returns: + int: The length of the string. + """ + return self._length + + def serialize(self) -> bytes: + """ + Serialize the object into bytes by iterating over its attributes, excluding those starting with double underscores or underscores. + For each attribute, it calls the `serialize` method of the attribute and appends the returned bytes to the output. + + Returns: + bytes: The serialized representation of the object as bytes. + """ + + # The length field is placed first + # The length is measured in bytes and includes the BOM length. The length of the length field is not included + + result = bytes() + + bom = None + encoded_str = None + if self.encoding == "utf-8": + bom = codecs.BOM_UTF8 + encoded_str = self.data.encode("utf-8") + elif self.encoding == "utf-16le": + bom = codecs.BOM_UTF16_LE + encoded_str = self.data.encode("utf-16le") + elif self.encoding == "utf-16be": + bom = codecs.BOM_UTF16_BE + encoded_str = self.data.encode("utf-16be") + + length = ( + len(bom) + len(encoded_str) + 1 + ) # +1 for the terminating '\0' character + if self.encoding == "utf-16le" or self.encoding == "utf-16be": + length += 1 # The terminating '\0' character is 2 bytes in utf-16 + + if self.length_field_length == 1: + if length > 255: + raise ValueError( + "Length of the string exceeds maximum value of 255 for 1 byte length field." + ) + result += struct.pack(">B", length) + elif self.length_field_length == 2: + if length > 65535: + raise ValueError( + "Length of the string exceeds maximum value of 65535 for 2 byte length field." + ) + result += struct.pack(">H", length) + elif self.length_field_length == 4: + if length > 4294967295: + raise ValueError( + "Length of the string exceeds maximum value of 4294967295 for 4 byte length field." + ) + result += struct.pack(">L", length) + + result += bom + result += encoded_str + result += "\0".encode(self.encoding) + assert len(result) == self._length + return result + + def deserialize(self, payload: bytes): + """ + Deserialize the payload into the object. + + Args: + payload (bytes): The payload to be deserialized. + + Returns: + self: The deserialized object. + + This method deserializes the payload into the string. It automatically detects the encoding from the BOM + at the beginning of the payload. + """ + + if len(payload) < self.length_field_length: + raise ValueError( + f"Deserialization failed: Payload is too short. Payload length: {len(payload)}" + ) + + length_field = payload[: self.length_field_length] + if self.length_field_length == 1: + (length,) = struct.unpack(">B", length_field) + elif self.length_field_length == 2: + (length,) = struct.unpack(">H", length_field) + elif self.length_field_length == 4: + (length,) = struct.unpack(">L", length_field) + + if len(payload) < length: + raise ValueError( + f"Deserialization failed: Payload is too short. Payload length: {len(payload)}. Expected length: {length}" + ) + + bom_start = self.length_field_length + + # Get the byte order mark, either 3 bytes for utf-8 or 2 bytes for utf-16 + bom = ( + payload[bom_start : bom_start + 3] + if payload[bom_start:].startswith(codecs.BOM_UTF8) + else payload[bom_start : bom_start + 2] + ) + if bom == codecs.BOM_UTF8: + self.encoding = "utf-8" + start_idx = self.length_field_length + 3 + end_idx = start_idx + length - 3 + decoded_string = payload[start_idx:end_idx].decode("utf-8") + self.data = decoded_string.rstrip("\0") + elif bom == codecs.BOM_UTF16_LE: + self.encoding = "utf-16le" + start_idx = self.length_field_length + 2 + end_idx = start_idx + length - 2 + decoded_string = payload[start_idx:end_idx].decode("utf-16le") + self.data = decoded_string.rstrip("\0") + elif bom == codecs.BOM_UTF16_BE: + self.encoding = "utf-16be" + start_idx = self.length_field_length + 2 + end_idx = start_idx + length - 2 + decoded_string = payload[start_idx:end_idx].decode("utf-16be") + self.data = decoded_string.rstrip("\0") + else: + raise ValueError("Unknown encoding") + + self._length = self._length_field_length + length + + return self diff --git a/tests/test_serialization.py b/tests/test_serialization.py index 73f1eb3..9bd74cc 100644 --- a/tests/test_serialization.py +++ b/tests/test_serialization.py @@ -1,4 +1,6 @@ from dataclasses import dataclass + +import pytest from someipy.serialization import ( Uint8, Uint16, @@ -13,6 +15,9 @@ Float64, SomeIpPayload, SomeIpFixedSizeArray, + SomeIpDynamicSizeArray, + SomeIpFixedSizeString, + SomeIpDynamicSizeString, ) @@ -138,3 +143,253 @@ def test_fixed_size_array_serialization_and_deserialization(): a_again = SomeIpFixedSizeArray(Uint8, 4).deserialize(bytes.fromhex("01020304")) assert a_again == a + + +def test_dynamic_size_array_length(): + a = SomeIpDynamicSizeArray(Uint16) + + assert len(a) == a.length_field_length + e = Uint16(1) + a.data.append(e) + assert len(a) == a.length_field_length + len(Uint16()) + del a.data[0] + assert len(a) == a.length_field_length + + a.length_field_length = 2 + # Except only the length field with 2 bytes to be serialized and no data + assert bytes.fromhex("0000") == a.serialize() + + a.length_field_length = 4 + # Except only the length field with 4 bytes to be serialized and no data + assert bytes.fromhex("00000000") == a.serialize() + + a.data.append(Uint16(1)) + assert bytes.fromhex("000000020001") == a.serialize() + + a.data.append(Uint16(4)) + assert len(a) == 2 * len(Uint16()) + a.length_field_length + assert bytes.fromhex("0000000400010004") == a.serialize() + + b = SomeIpDynamicSizeArray(Uint16) + b.length_field_length = 4 + b = b.deserialize(bytes.fromhex("0000000400010004")) + assert b == a + assert b.data[0] == Uint16(1) + assert b.data[1] == Uint16(4) + assert len(b) == 2 * len(Uint16()) + a.length_field_length + + +def test_someip_fixed_size_string(): + a = SomeIpFixedSizeString(4) + assert a.size == 4 + assert a.data == "" + assert a.encoding == "utf-8" + assert len(a) == 4 + 3 + + with pytest.raises(ValueError): + a.data = "Hello World" + with pytest.raises(ValueError): + a.encoding = "Hello World" + + a.data = "He" + a.encoding = "utf-8" + assert len(a) == 4 + 3 + + b = SomeIpFixedSizeString(4) + b.data = "He" + assert a == b + + assert bytes.fromhex("EF BB BF 48 65 00 00") == a.serialize() + + a.encoding = "utf-16le" + assert bytes.fromhex("FF FE 48 00 65 00 00 00 00 00") == a.serialize() + + a.encoding = "utf-16be" + assert bytes.fromhex("FE FF 00 48 00 65 00 00 00 00") == a.serialize() + + b = SomeIpFixedSizeString(4) + b.encoding = "utf-16be" + b = b.deserialize(bytes.fromhex("EF BB BF 48 65 00 00 00 00")) + assert b.encoding == "utf-8" + assert b.data == "He" + assert len(b) == 7 + + b = b.deserialize(bytes.fromhex("FF FE 48 00 65 00 00 00 00 00")) + assert b.encoding == "utf-16le" + assert b.data == "He" + assert len(b) == 10 + + b = b.deserialize(bytes.fromhex("FE FF 00 48 00 65 00 00 00 00")) + assert b.encoding == "utf-16be" + assert b.data == "He" + assert len(b) == 10 + + +def test_someip_dynamic_size_string(): + a = SomeIpDynamicSizeString() + assert a.data == "" + assert a.encoding == "utf-8" + assert a.length_field_length == 4 + assert a._length_field_value == 4 + assert len(a) == 4 + 3 + 1 + + with pytest.raises(ValueError): + a.encoding = "Hello World" + with pytest.raises(ValueError): + a.length_field_length = 0 + + a.data = "He" + a.encoding = "utf-8" + assert len(a) == 4 + 3 + 2 + 1 + # 4 bytes for the length field, 3 bytes BOM and 3 bytes for the data including '\0' + assert bytes.fromhex("00 00 00 06 EF BB BF 48 65 00") == a.serialize() + + a.encoding = "utf-16le" + assert len(a) == 4 + 2 + 2 * 2 + 2 + assert bytes.fromhex("00 00 00 08 FF FE 48 00 65 00 00 00") == a.serialize() + + a.encoding = "utf-16be" + assert len(a) == 4 + 2 + 2 * 2 + 2 + assert bytes.fromhex("00 00 00 08 FE FF 00 48 00 65 00 00") == a.serialize() + + a.encoding = "utf-8" + a.length_field_length = 2 + assert len(a) == 2 + 3 + 2 + 1 + # 2 bytes for the length field, 3 bytes BOM and 3 bytes for the data including '\0' + assert bytes.fromhex("00 06 EF BB BF 48 65 00") == a.serialize() + + b = SomeIpDynamicSizeString() + b.encoding = "utf-16be" + b.length_field_length = 4 + b = b.deserialize(bytes.fromhex("00 00 00 09 EF BB BF 48 65 00 00 00 00")) + assert len(b) == 13 + assert b.encoding == "utf-8" + assert b.data == "He" + + b = b.deserialize(bytes.fromhex("00 00 00 0A FF FE 48 00 65 00 00 00 00 00")) + assert b.encoding == "utf-16le" + assert b.data == "He" + + b = b.deserialize(bytes.fromhex("00 00 00 0A FE FF 00 48 00 65 00 00 00 00")) + assert b.encoding == "utf-16be" + assert b.data == "He" + + b.length_field_length = 2 + b = b.deserialize(bytes.fromhex("00 0A FE FF 00 48 00 65 00 00 00 00")) + assert b.encoding == "utf-16be" + assert b.data == "He" + + +@dataclass +class MsgWithStrings(SomeIpPayload): + a: Uint16 + b: SomeIpDynamicSizeString + d: Uint32 + + def __init__(self): + self.a = Uint16(10) + self.b = SomeIpDynamicSizeString("123") + self.c = Uint32(5) + + +def test_struct_with_dynamic_sized_types(): + a = MsgWithStrings() + assert len(a) == len(Uint16()) + len(SomeIpDynamicSizeString("123")) + len(Uint32()) + assert len(a) == 2 + 4 + 3 + 3 + 1 + 4 + assert len(a.serialize()) == len(a) + + assert ( + bytes.fromhex("00 0A 00 00 00 07 EF BB BF 31 32 33 00 00 00 00 05") + == a.serialize() + ) + + a.b = SomeIpDynamicSizeString("1234") + assert len(a) == 2 + 4 + 3 + 3 + 1 + 4 + 1 + assert ( + bytes.fromhex("00 0A 00 00 00 08 EF BB BF 31 32 33 34 00 00 00 00 05") + == a.serialize() + ) + + b = MsgWithStrings().deserialize( + bytes.fromhex("00 0A 00 00 00 08 EF BB BF 31 32 33 34 00 00 00 00 05") + ) + assert b.a == Uint16(10) + assert b.b == SomeIpDynamicSizeString("1234") + assert b.c == Uint32(5) + + +@dataclass +class MsgWithTwoStrings(SomeIpPayload): + a: Uint16 + b: SomeIpDynamicSizeString + c: SomeIpDynamicSizeString + d: Uint32 + + def __init__(self): + self.a = Uint16(10) + self.b = SomeIpDynamicSizeString("123") + self.c = SomeIpDynamicSizeString("4321") + self.c.encoding = "utf-16be" + self.d = Uint32(5) + + +def test_struct_with_dynamic_sized_types(): + a = MsgWithTwoStrings() + assert len(a) == 2 + 4 + 3 + 3 + 1 + 4 + 2 + 4 * 2 + 2 + 4 + assert len(a.serialize()) == len(a) + + assert ( + bytes.fromhex( + "00 0A 00 00 00 07 EF BB BF 31 32 33 00 00 00 00 0C FE FF 00 34 00 33 00 32 00 31 00 00 00 00 00 05" + ) + == a.serialize() + ) + + b = MsgWithTwoStrings().deserialize( + bytes.fromhex( + "00 0A 00 00 00 07 EF BB BF 31 32 33 00 00 00 00 0C FE FF 00 34 00 33 00 32 00 31 00 00 00 00 00 05" + ) + ) + assert b.a == Uint16(10) + assert b.b == SomeIpDynamicSizeString("123") + + c = SomeIpDynamicSizeString("4321") + c.encoding = "utf-16be" + assert b.c == c + assert b.d == Uint32(5) + + +@dataclass +class MsgWithDynamicArrays(SomeIpPayload): + a: Uint16 + b: SomeIpDynamicSizeArray[Uint8] + c: Uint32 + d: SomeIpDynamicSizeArray[Uint16] + + def __init__(self): + self.a = Uint16(10) + self.b = SomeIpDynamicSizeArray(Uint8) + self.c = Uint32(5) + self.d = SomeIpDynamicSizeArray(Uint16) + + +def test_struct_with_dynamic_arrays(): + a = MsgWithDynamicArrays() + assert len(a) == 2 + 4 + 4 + 4 + assert len(a.serialize()) == len(a) + assert bytes.fromhex("00 0A 00 00 00 00 00 00 00 05 00 00 00 00") == a.serialize() + + a.b.data.append(Uint8(1)) + assert len(a.serialize()) == len(a) + assert ( + bytes.fromhex("00 0A 00 00 00 01 01 00 00 00 05 00 00 00 00") == a.serialize() + ) + + b = MsgWithDynamicArrays().deserialize( + bytes.fromhex("00 0A 00 00 00 01 01 00 00 00 05 00 00 00 00") + ) + + assert b.a == Uint16(10) + assert b.b.data[0] == Uint8(1) + assert b.c == Uint32(5) + assert len(b.d) == b.d.length_field_length