Skip to content

Commit

Permalink
openlst: standardize on bytes type where possible
Browse files Browse the repository at this point in the history
  • Loading branch information
bcwaldon committed Sep 21, 2024
1 parent 15f8930 commit b458329
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 51 deletions.
16 changes: 8 additions & 8 deletions satcom/openlst/client_packet_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@ def err(self):
return None

def to_bytes(self):
"""Packs client packet header metadata into a bytearray"""
bs = bytearray(CLIENT_PACKET_HEADER_LENGTH) # Create empty bytearray of header length
"""Packs client packet header metadata into bytes"""
bs = bytearray(CLIENT_PACKET_HEADER_LENGTH)

bs[0] = self.length
bs[1:3] = utils.pack_ushort_little_endian(self.hardware_id)
bs[3:5] = utils.pack_ushort_little_endian(self.sequence_number)
bs[5] = self.destination
bs[6] = self.command_number

return bs
return bytes(bs)

@classmethod
def from_bytes(cls, bs: bytearray):
"""Hydrates the client packet header metadata from a bytearray"""
def from_bytes(cls, bs: bytes):
"""Hydrates the client packet header metadata from bytes"""
if len(bs) != CLIENT_PACKET_HEADER_LENGTH:
raise ValueError('unexpected header length')

Expand All @@ -56,7 +56,7 @@ def from_bytes(cls, bs: bytearray):
return obj

class ClientPacket():
def __init__(self, data: bytearray, header=None):
def __init__(self, data: bytes, header=None):
self.header = header or ClientPacketHeader()
self.header.length = CLIENT_PACKET_HEADER_LENGTH + len(data) - 1
self._data = data
Expand All @@ -80,10 +80,10 @@ def to_bytes(self):
buf[:CLIENT_PACKET_HEADER_LENGTH] = self.header.to_bytes()
buf[CLIENT_PACKET_HEADER_LENGTH:] = self.data

return buf
return bytes(buf)

@classmethod
def from_bytes(cls, bs: bytearray):
def from_bytes(cls, bs: bytes):
"""Hydrates the client packet object from provided byte array"""
if len(bs) < CLIENT_PACKET_HEADER_LENGTH:
raise ValueError('insufficient data')
Expand Down
38 changes: 17 additions & 21 deletions satcom/openlst/space_packet_lib.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel

from satcom.utils import utils

Expand Down Expand Up @@ -29,8 +29,8 @@ def err(self):
return ValueError('command_number must be 0-255')
return None

def to_bytes(self) -> bytearray:
"""Packs space packet header metadata into a bytearray"""
def to_bytes(self) -> bytes:
"""Packs space packet header metadata into bytes"""
bs = bytearray(SPACE_PACKET_HEADER_LENGTH)

bs[0] = self.length
Expand All @@ -39,10 +39,10 @@ def to_bytes(self) -> bytearray:
bs[4] = self.destination
bs[5] = self.command_number

return bs
return bytes(bs)

@classmethod
def from_bytes(cls, bs: bytearray):
def from_bytes(cls, bs: bytes):
"""Unpacks space packet header metadata from bytes"""
if len(bs) != SPACE_PACKET_HEADER_LENGTH:
raise ValueError('unexpected header length')
Expand All @@ -58,11 +58,8 @@ def from_bytes(cls, bs: bytearray):
return obj

class SpacePacketFooter(BaseModel):
# allow pydantic to use bytearray as a field type
model_config = ConfigDict(arbitrary_types_allowed=True)

hardware_id: int = 0
crc16_checksum: bytearray = []
crc16_checksum: bytes = []

def err(self):
"""Throws an error if any fields are out of bounds"""
Expand All @@ -72,7 +69,7 @@ def err(self):
return ValueError('crc16_checksum set incorrectly.')
return None

def to_bytes(self) -> bytearray:
def to_bytes(self) -> bytes:
"""Packs space packet footer metadata into bytes"""
bs = bytearray(SPACE_PACKET_FOOTER_LENGTH)

Expand All @@ -83,11 +80,11 @@ def to_bytes(self) -> bytearray:
bs[2] = self.crc16_checksum[1]
bs[3] = self.crc16_checksum[0]

return bs
return bytes(bs)

@classmethod
def from_bytes(cls, bs: bytearray):
"""Unpack space packet footer from a bytearray"""
def from_bytes(cls, bs: bytes):
"""Unpack space packet footer from bytes"""
if len(bs) != SPACE_PACKET_FOOTER_LENGTH:
return ValueError('unexpected footer length')

Expand All @@ -98,13 +95,13 @@ def from_bytes(cls, bs: bytearray):

obj = cls(
hardware_id = utils.unpack_ushort_little_endian(bs[0:2]),
crc16_checksum = crc16_checksum
crc16_checksum = bytes(crc16_checksum)
)

return obj

class SpacePacket():
def __init__(self, data: bytearray, header=None, footer=None):
def __init__(self, data: bytes, header=None, footer=None):
self.header = header or SpacePacketHeader()
self.header.length = SPACE_PACKET_HEADER_LENGTH + len(data) + SPACE_PACKET_FOOTER_LENGTH - 1
self._data = data
Expand All @@ -124,7 +121,7 @@ def _verify_crc16(self):
if got[0] != want[0] or got[1] != want[1]:
return ValueError(f'checksum mismatch: got={got} want={want}')

def _make_packet_checksum(self) -> bytearray:
def _make_packet_checksum(self) -> bytes:
"""Creates checksum of packet from candidate bytes"""
bs = self.to_bytes()
inp = bs[0:len(bs)-2]
Expand All @@ -140,8 +137,7 @@ def _make_packet_checksum(self) -> bytearray:
b = b << 1
ck = ck & 0xFFFF

ckb = bytearray(2)
ckb = utils.pack_ushort_big_endian(ck)
ckb = bytes(utils.pack_ushort_big_endian(ck))

return ckb

Expand All @@ -157,17 +153,17 @@ def err(self):
return self._verify_crc16()
return None

def to_bytes(self) -> bytearray:
def to_bytes(self) -> bytes:
"""Encodes space packet to byte slice, including header, data, and footer"""
buf = bytearray(self.header.length)

buf[:SPACE_PACKET_HEADER_LENGTH] = self.header.to_bytes()
buf[SPACE_PACKET_HEADER_LENGTH:] = self.data
buf[SPACE_PACKET_HEADER_LENGTH+len(self.data):] = self.footer.to_bytes()
return buf
return bytes(buf)

@classmethod
def from_bytes(cls, bs: bytearray):
def from_bytes(cls, bs: bytes):
"""Hydrates the space packet from provided byte array, returning non-nil if errors are present"""
if len(bs) < SPACE_PACKET_HEADER_LENGTH:
return ValueError('insufficient data')
Expand Down
18 changes: 9 additions & 9 deletions satcom/openlst/test_client_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ def test_client_packet_header_encode(self):
destination=212,
command_number=57
)
want = bytearray([0x0A, 0xF3, 0x02, 0x0C, 0x00, 0xD4, 0x39])
want = bytes([0x0A, 0xF3, 0x02, 0x0C, 0x00, 0xD4, 0x39])
got = ph.to_bytes()

self.assertIsNone(ph.err(), msg = ph.err())
self.assertEqual(got, want, f'unexpected result: want={want} got={got}')

def test_client_packet_header_decode(self):
"""Verifies ClientPacktHeader byte decode"""
hdr = bytearray([0x0D, 0xFF, 0x03, 0x04, 0x00, 0xFD, 0x38])
hdr = bytes([0x0D, 0xFF, 0x03, 0x04, 0x00, 0xFD, 0x38])

want = client_pkt_lib.ClientPacketHeader(
length=13,
Expand All @@ -36,7 +36,7 @@ def test_client_packet_header_decode(self):

def test_new_client_packet_to_bytes_small_frame(self):
"""Test new client packet creation with a small dataframe"""
dat = bytearray([0x0A, 0x0B, 0x0C, 0x0D])
dat = bytes([0x0A, 0x0B, 0x0C, 0x0D])
hdr = client_pkt_lib.ClientPacketHeader(
hardware_id=1023,
sequence_number=1,
Expand All @@ -45,15 +45,15 @@ def test_new_client_packet_to_bytes_small_frame(self):
)
pkt = client_pkt_lib.ClientPacket(dat, hdr)

want = bytearray([0x0A, 0xFF, 0x03, 0x01, 0x00, 0xFD, 0x38, 0x0A, 0x0B, 0x0C, 0x0D])
want = bytes([0x0A, 0xFF, 0x03, 0x01, 0x00, 0xFD, 0x38, 0x0A, 0x0B, 0x0C, 0x0D])
got = pkt.to_bytes()

self.assertIsNone(pkt.err(), msg=pkt.err())
self.assertEqual(got, want, f'unexpected result: want={want} got={got}')

def test_new_client_packet_from_bytes_too_much_data(self):
"""Tests if new client packet is rejected due to too much data"""
dat = bytearray(1024)
dat = bytes(1024)
hdr = client_pkt_lib.ClientPacketHeader(
hardware_id=1023,
sequence_number=1,
Expand All @@ -66,21 +66,21 @@ def test_new_client_packet_from_bytes_too_much_data(self):

def test_client_packet_from_bytes_small_frame(self):
"""Verifies that client packet structure is preserved when passing a small frame"""
val = bytearray([0x0A, 0xFF, 0x03, 0x04, 0x00, 0xFD, 0x38, 0x01, 0x02, 0x03])
val = bytes([0x0A, 0xFF, 0x03, 0x04, 0x00, 0xFD, 0x38, 0x01, 0x02, 0x03])
p = client_pkt_lib.ClientPacket(val)
pkt = p.from_bytes(val)

want = bytearray([0x01, 0x02, 0x03])
want = bytes([0x01, 0x02, 0x03])
got = pkt.data

self.assertIsNone(pkt.err(), msg=pkt.err())
self.assertEqual(got, want, f'unexpected result: want={want} got={got}')

def test_client_packet_from_bytes_empty_frame(self):
"""Verifies expected client packet behavior for no data"""
val = bytearray([0x0A, 0xFF, 0x03, 0x04, 0x00, 0xFD, 0x38])
val = bytes([0x0A, 0xFF, 0x03, 0x04, 0x00, 0xFD, 0x38])
p = client_pkt_lib.ClientPacket(val)
pkt = p.from_bytes(val)

self.assertIsNone(pkt.err(), msg=f'{pkt.header.length}')
self.assertEqual(len(pkt.data), 0, f'expected empty result, got {pkt.data}')
self.assertEqual(len(pkt.data), 0, f'expected empty result, got {pkt.data}')
26 changes: 13 additions & 13 deletions satcom/openlst/test_space_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ def test_space_packet_header_encode(self):
destination=23,
command_number=132
)
want = bytearray([0x1B, 0x00, 0x6E, 0x04, 0x17, 0x84])
want = bytes([0x1B, 0x00, 0x6E, 0x04, 0x17, 0x84])
got = ph.to_bytes()

self.assertIsNone(ph.err())
self.assertEqual(got, want, f'unexpected result: want={want} got={got}')

def test_space_packet_header_decode(self):
"""Verifies SpacePacketHeader byte decode"""
hdr = bytearray([0x0D, 0x01, 0x04, 0x00, 0xFD, 0x38])
hdr = bytes([0x0D, 0x01, 0x04, 0x00, 0xFD, 0x38])

want = space_pkt_lib.SpacePacketHeader(
length=13,
Expand All @@ -38,21 +38,21 @@ def test_space_packet_footer_encode(self):
"""Verifies SpacePacketFooter conversion to bytes"""
pf = space_pkt_lib.SpacePacketFooter(
hardware_id=2047,
crc16_checksum=bytearray([0x01, 0x02])
crc16_checksum=bytes([0x01, 0x02])
)
want = bytearray([0xFF, 0x07, 0x02, 0x01])
want = bytes([0xFF, 0x07, 0x02, 0x01])
got = pf.to_bytes()

self.assertIsNone(pf.err(), msg=pf.err())
self.assertEqual(got, want, f'unexpected result: want={want} got={got}')

def test_space_packet_footer_decode(self):
"""Verifies SpacePacketFooter byte decode"""
ftr = bytearray([0x0E, 0x01, 0x0B, 0x0A])
ftr = bytes([0x0E, 0x01, 0x0B, 0x0A])

want = space_pkt_lib.SpacePacketFooter(
hardware_id=270,
crc16_checksum=bytearray([0x0A,0x0B])
crc16_checksum=bytes([0x0A,0x0B])
)
got = space_pkt_lib.SpacePacketFooter.from_bytes(ftr)

Expand All @@ -61,7 +61,7 @@ def test_space_packet_footer_decode(self):

def test_new_space_packet_from_bytes_too_much_data(self):
"""Tests if new space packet is rejected due to too much data"""
dat = bytearray(1024)
dat = bytes(1024)
hdr = space_pkt_lib.SpacePacketHeader(
port=1,
sequence_number=4000,
Expand All @@ -76,7 +76,7 @@ def test_new_space_packet_from_bytes_too_much_data(self):

def test_new_space_packet_to_bytes_success(self):
"""Verifies successful space packet creation"""
dat = bytearray([0x11, 0x22, 0x33])
dat = bytes([0x11, 0x22, 0x33])
hdr = space_pkt_lib.SpacePacketHeader(
port=1,
sequence_number=4000,
Expand All @@ -85,8 +85,8 @@ def test_new_space_packet_to_bytes_success(self):
)
ftr = space_pkt_lib.SpacePacketFooter(hardware_id=12)
pkt = space_pkt_lib.SpacePacket(dat, hdr, ftr)
want = bytearray([

want = bytes([
0x0C, 0x01, 0xA0, 0x0F, 0xFD, 0x38, # packet header
0x11, 0x22, 0x33, # original message
0x0C, 0x00, 0xFA, 0x45 # packet footer
Expand All @@ -98,12 +98,12 @@ def test_new_space_packet_to_bytes_success(self):

def test_new_space_packet_from_bytes(self):
"""Validates succesful space frame encoding"""
val = bytearray([0x0D, 0xC0, 0x04, 0x00, 0xFD, 0x38, 0x01, 0x02, 0x03, 0xFF, 0x03, 0xBF, 0x04])
val = bytes([0x0D, 0xC0, 0x04, 0x00, 0xFD, 0x38, 0x01, 0x02, 0x03, 0xFF, 0x03, 0xBF, 0x04])
p = space_pkt_lib.SpacePacket(val)
pkt = p.from_bytes(val)

want = bytearray([0x01, 0x02, 0x03])
want = bytes([0x01, 0x02, 0x03])
got = pkt.data

self.assertIsNone(pkt.err(), msg=pkt.err())
self.assertEqual(got, want, f'unexpected result: want={want} got={got}')
self.assertEqual(got, want, f'unexpected result: want={want} got={got}')

0 comments on commit b458329

Please sign in to comment.