Skip to content

Commit

Permalink
add whitening + fec, port modules to openlst/ and utils/
Browse files Browse the repository at this point in the history
  • Loading branch information
Louise Tamondong authored and Louise Tamondong committed Sep 20, 2024
1 parent ada4338 commit 3132e06
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 63 deletions.
37 changes: 22 additions & 15 deletions satcom/client_packet_lib.py → satcom/openlst/client_packet_lib.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import utils
from pydantic import BaseModel
from satcom.utils import utils

CLIENT_PACKET_ASM = [0x22, 0x69]
CLIENT_PACKET_HEADER_LENGTH = 7
Expand All @@ -13,8 +13,8 @@ class ClientPacketHeader(BaseModel):

def err(self):
"""Throws an error if any params are out of bounds"""
if self.length < 7 or self.length > 251:
return ValueError('length must be 7-251')
if self.length < 6 or self.length > 251:
return ValueError('length must be 6-251')
if self.hardware_id < 0 or self.hardware_id > 65535:
return ValueError('hardware_id must be 0-65535')
if self.sequence_number < 0 or self.sequence_number > 65535:
Expand All @@ -30,8 +30,8 @@ def to_bytes(self):
bs = bytearray(CLIENT_PACKET_HEADER_LENGTH) # Create empty bytearray of header length

bs[0] = self.length
bs[1:3] = utils.pack_uint16_little_endian(self.hardware_id)
bs[3:5] = utils.pack_uint16_little_endian(self.sequence_number)
bs[1:3] = utils.pack_ushort_little_endian(self.hardware_id)
bs[3:5] = utils.pack_ushort_little_endian(self.sequence_number)
bs[5] = self.destination
bs[6] = self.command_number

Expand All @@ -45,8 +45,8 @@ def from_bytes(cls, bs: bytearray):

obj = cls(
length = bs[0],
hardware_id = utils.unpack_uint16_little_endian(bs[1:3]),
sequence_number = utils.unpack_uint16_little_endian(bs[3:5]),
hardware_id = utils.unpack_ushort_little_endian(bs[1:3]),
sequence_number = utils.unpack_ushort_little_endian(bs[3:5]),
destination = bs[5],
command_number = bs[6]
)
Expand All @@ -56,35 +56,42 @@ def from_bytes(cls, bs: bytearray):
class ClientPacket():
def __init__(self, data: bytearray, header=None):
self.header = header or ClientPacketHeader()
self.header.length = CLIENT_PACKET_HEADER_LENGTH + len(data)
self.header.length = CLIENT_PACKET_HEADER_LENGTH + len(data) - 1
self._data = data

@property
def data(self):
"""Protects data from being modified after instance call"""
"""Protects data from being modified after instantiation"""
return self._data

def err(self):
"""Throws an error if any params are out of bounds"""
if self.header.err() is not None:
return self.header.err()
if self.header.length != CLIENT_PACKET_HEADER_LENGTH + len(self.data):
if self.header.length != CLIENT_PACKET_HEADER_LENGTH + len(self.data) - 1:
return ValueError('packet length unequal to header length')
return None

def to_bytes(self):
"""Encodes client packet header and data into bytes"""
"""Encodes client packet and data into bytes"""
buf = bytearray(CLIENT_PACKET_HEADER_LENGTH)
buf = self.header.to_bytes()
buf[:CLIENT_PACKET_HEADER_LENGTH] = self.header.to_bytes()
buf[0] += 1
buf[CLIENT_PACKET_HEADER_LENGTH:] = self.data

return buf

def from_bytes(self, bs: bytearray):
@classmethod
def from_bytes(cls, bs: bytearray):
"""Hydrates the client packet object from provided byte array"""
if len(bs) < CLIENT_PACKET_HEADER_LENGTH:
raise ValueError('insufficient data')

hdr = self.header.from_bytes(bs[0:CLIENT_PACKET_HEADER_LENGTH])
hdr = ClientPacketHeader.from_bytes(bs[0:CLIENT_PACKET_HEADER_LENGTH])

obj = cls(
data = bs[CLIENT_PACKET_HEADER_LENGTH:],
header = hdr
)

return ClientPacket(data=bs[CLIENT_PACKET_HEADER_LENGTH:], header=hdr)
return obj
4 changes: 4 additions & 0 deletions satcom/openlst/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import pydantic

def sample_func():
return True
136 changes: 136 additions & 0 deletions satcom/openlst/fec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2023 Robert Zimmerman.
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

aTrellisSourceStateLut = (
(0, 4), (0, 4), (1, 5), (1, 5), (2, 6), (2, 6), (3, 7), (3, 7),
)
aTrellisTransitionOutput = (
(0, 3), (3, 0), (1, 2), (2, 1), (3, 0), (0, 3), (2, 1), (1, 2),
)
aTrellisTransitionInput = (0, 1, 0, 1, 0, 1, 0, 1,)


def hamming_weight(byte: int) -> int:
"""Return the number of one bits in a byte"""
return sum(int(b) for b in f"{byte:b}")


def interleave(chunk: bytes) -> bytes:
"""Interleave or deinterleave a 4 byte chunk"""
if len(chunk) != 4:
raise ValueError("interleaving only works on 4 byte chunks")
chunk_int = int.from_bytes(chunk, byteorder='little')
grid = []
for _ in range(4):
row = []
for _ in range(4):
bits = (chunk_int & 0xc0000000) >> 30
chunk_int = chunk_int << 2
row.append(bits)
grid.append(row)

flipped = 0
for x in range(4):
for y in range(4):
flipped = flipped << 2
flipped |= grid[y][x]
return flipped.to_bytes(4, byteorder='little')

def decode_fec_chunk():
"""decode_fec_chunk returns a generator for FEC decode/correction
This generator decodes FEC + interleaved data per CC1110 DN504 (A).
This involves deinterleaving and decoding a 2:1 Viterbit sequence.
The caller passes in 4 byte chunks using the `send` function. The
generator yields decoded chunks.
"""
path_bits = 0
cost = [[100] * 8, [0] * 8]
path = [[0] * 8, [0] * 8]
last_buf = 0
cur_buf = 1
out = []

while True:
chunk = yield bytes(out)
chunk = interleave(chunk)

symbols = []
for b in chunk:
for _ in range(4):
symbols.append((b & 0xc0) >> 6)
b <<= 2
out = []
for symbol in symbols:
# check each state in the trellis
min_cost = 0xff
for dest_state in range(8):
input_bit = aTrellisTransitionInput[dest_state]
src_state0 = aTrellisSourceStateLut[dest_state][0]
cost0 = cost[last_buf][src_state0]
cost0 += hamming_weight(symbol ^ aTrellisTransitionOutput[dest_state][0])
src_state1 = aTrellisSourceStateLut[dest_state][1]
cost1 = cost[last_buf][src_state1]
cost1 += hamming_weight(symbol ^ aTrellisTransitionOutput[dest_state][1])

if cost0 < cost1:
cost[cur_buf][dest_state] = cost0
min_cost = min(min_cost, cost0)
path[cur_buf][dest_state] = (path[last_buf][src_state0] << 1) | input_bit
else:
cost[cur_buf][dest_state] = cost1
min_cost = min(min_cost, cost1)
path[cur_buf][dest_state] = (path[last_buf][src_state1] << 1) | input_bit
path_bits += 1


if path_bits >= 32:
out.append((path[cur_buf][0] >> 24) & 0xff)
path_bits -= 8
last_buf = (last_buf + 1) % 2
cur_buf = (cur_buf + 1) % 2
for i in range(8):
cost[last_buf][i] -= min_cost


# From CC1110 DN504 (A)
FEC_ENCODE_TABLE = [
0, 3, 1, 2,
3, 0, 2, 1,
3, 0, 2, 1,
0, 3, 1, 2
]


def encode_fec(raw: bytes):
"""Encode bytes with the CC1110 FEC + interleaving mechanism
Poorly copied and half-heartedly translated to Python from CC1110 DN504 (A)
"""
rv = b""
terminated = raw + b"\x0b\x0b"
fec_reg = 0
chunk = b""
for c in terminated:
fec_reg = (fec_reg & 0x700) | c
fec_output = 0
for j in range(8):
fec_output = (fec_output << 2) | FEC_ENCODE_TABLE[fec_reg >> 7]
fec_reg = (fec_reg << 1) & 0x07ff
chunk += fec_output.to_bytes(2, byteorder="big")
if len(chunk) == 4:
rv += interleave(chunk)
chunk = b""
if len(chunk) == 0:
pass
elif len(chunk) == 2:
rv += interleave(chunk + b"\0\0")
else:
raise Exception(f"unexpected chunk length {len(chunk)}")
return rv
36 changes: 20 additions & 16 deletions satcom/space_packet_lib.py → satcom/openlst/space_packet_lib.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import utils
from satcom.utils import utils
from pydantic import BaseModel, ConfigDict

SPACE_PACKET_PREAMBLE = [0xAA, 0xAA, 0xAA, 0xAA]
Expand All @@ -16,8 +16,8 @@ class SpacePacketHeader(BaseModel):

def err(self):
"""Throws an error if any params are out of bounds"""
if self.length < 10 or self.length > 251:
return ValueError('length must be 10-251')
if self.length < 9 or self.length > 251:
return ValueError('length must be 9-251')
if self.sequence_number < 0 or self.sequence_number > 65535:
return ValueError('sequence_number must be 0-65535')
if self.destination < 0 or self.destination > 255:
Expand All @@ -32,7 +32,7 @@ def to_bytes(self) -> bytearray:

bs[0] = self.length
bs[1] = self.port
bs[2:4] = utils.pack_uint16_little_endian(self.sequence_number)
bs[2:4] = utils.pack_ushort_little_endian(self.sequence_number)
bs[4] = self.destination
bs[5] = self.command_number

Expand All @@ -47,7 +47,7 @@ def from_bytes(cls, bs: bytearray):
obj = cls(
length = bs[0],
port = bs[1],
sequence_number = utils.unpack_uint16_little_endian(bs[2:4]),
sequence_number = utils.unpack_ushort_little_endian(bs[2:4]),
destination = bs[4],
command_number = bs[5]
)
Expand All @@ -73,7 +73,7 @@ def to_bytes(self) -> bytearray:
"""Packs space packet footer metadata into bytes"""
bs = bytearray(SPACE_PACKET_FOOTER_LENGTH)

bs[0:2] = utils.pack_uint16_little_endian(self.hardware_id)
bs[0:2] = utils.pack_ushort_little_endian(self.hardware_id)

if len(self.crc16_checksum) != 0:
# big to little endian mapping
Expand All @@ -94,7 +94,7 @@ def from_bytes(cls, bs: bytearray):
crc16_checksum[1] = bs[2]

obj = cls(
hardware_id = utils.unpack_uint16_little_endian(bs[0:2]),
hardware_id = utils.unpack_ushort_little_endian(bs[0:2]),
crc16_checksum = crc16_checksum
)

Expand All @@ -103,7 +103,7 @@ def from_bytes(cls, bs: bytearray):
class SpacePacket():
def __init__(self, data: bytearray, header=None, footer=None):
self.header = header or SpacePacketHeader()
self.header.length = SPACE_PACKET_HEADER_LENGTH + len(data) + SPACE_PACKET_FOOTER_LENGTH
self.header.length = SPACE_PACKET_HEADER_LENGTH + len(data) + SPACE_PACKET_FOOTER_LENGTH - 1
self._data = data
self.footer = footer or SpacePacketFooter()
self.footer.crc16_checksum = self._make_packet_checksum()
Expand Down Expand Up @@ -138,7 +138,7 @@ def _make_packet_checksum(self) -> bytearray:
ck = ck & 0xFFFF

ckb = bytearray(2)
ckb = utils.pack_uint16_big_endian(ck)
ckb = utils.pack_ushort_big_endian(ck)

return ckb

Expand All @@ -148,7 +148,7 @@ def err(self):
return self.header.err()
if self.footer.err() is not None:
return self.footer.err()
if self.header.length != SPACE_PACKET_HEADER_LENGTH + len(self.data) + SPACE_PACKET_FOOTER_LENGTH:
if self.header.length != SPACE_PACKET_HEADER_LENGTH + len(self.data) + SPACE_PACKET_FOOTER_LENGTH - 1:
return ValueError('packet length unequal to header length')
if self._verify_crc16() is not None:
return self._verify_crc16()
Expand All @@ -158,21 +158,25 @@ def to_bytes(self) -> bytearray:
"""Encodes space packet to byte slice, including header, data, and footer"""
buf = bytearray(self.header.length)

buf = self.header.to_bytes()
buf[:SPACE_PACKET_HEADER_LENGTH] = self.header.to_bytes()
buf[0] += 1
buf[SPACE_PACKET_HEADER_LENGTH:] = self.data
buf[SPACE_PACKET_HEADER_LENGTH+len(self.data):] = self.footer.to_bytes()
return buf

def from_bytes(self, bs: bytearray):
@classmethod
def from_bytes(cls, bs: bytearray):
"""Hydrates the space packet from provided byte array, returning non-nil if errors are present"""
if len(bs) < SPACE_PACKET_HEADER_LENGTH:
return ValueError('insufficient data')

hdr = self.header.from_bytes(bs[0:SPACE_PACKET_HEADER_LENGTH])
ftr = self.footer.from_bytes(bs[len(bs)-SPACE_PACKET_FOOTER_LENGTH:])
hdr = SpacePacketHeader.from_bytes(bs[0:SPACE_PACKET_HEADER_LENGTH])
ftr = SpacePacketFooter.from_bytes(bs[len(bs)-SPACE_PACKET_FOOTER_LENGTH:])

return SpacePacket(
obj = cls(
data=bs[SPACE_PACKET_HEADER_LENGTH : len(bs)-SPACE_PACKET_FOOTER_LENGTH],
header=hdr,
footer=ftr
)
)

return obj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import unittest
import satcom.client_packet_lib as client_pkt_lib
import satcom.openlst.client_packet_lib as client_pkt_lib

class TestClientPacket(unittest.TestCase):

Expand Down Expand Up @@ -82,5 +82,6 @@ def test_client_packet_from_bytes_empty_frame(self):
p = client_pkt_lib.ClientPacket(val)
pkt = p.from_bytes(val)

self.assertIsNone(pkt.err(), msg=pkt.err())
#self.assertIsNone(pkt.err(), msg=pkt.err())
self.assertIsNone(pkt.err(), msg=f'{pkt.header.length}')
self.assertEqual(len(pkt.data), 0, f'expected empty result, got {pkt.data}')
8 changes: 8 additions & 0 deletions satcom/openlst/test_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import unittest
import satcom.openlst.example


class TestExample(unittest.TestCase):

def test_sample_func(self):
self.assertTrue(satcom.openlst.example.sample_func())
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import unittest
import satcom.space_packet_lib as space_pkt_lib
import satcom.openlst.space_packet_lib as space_pkt_lib

class TestSpacePacket(unittest.TestCase):

Expand Down
Loading

0 comments on commit 3132e06

Please sign in to comment.