Skip to content

Commit

Permalink
added basic tests for pytmtc app
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed May 9, 2024
1 parent 783388a commit d1476eb
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 192 deletions.
199 changes: 8 additions & 191 deletions satrs-example/pytmtc/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,17 @@
import logging
import sys
import time
from typing import Any, Optional
from prompt_toolkit.history import History
from prompt_toolkit.history import FileHistory

from spacepackets.ccsds import PacketId, PacketType
import tmtccmd
from spacepackets.ecss import PusTelemetry, PusVerificator
from spacepackets.ecss.pus_17_test import Service17Tm
from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm
from spacepackets.ccsds.time import CdsShortTimestamp
from spacepackets.ecss import PusVerificator

from tmtccmd import TcHandlerBase, ProcedureParamsWrapper
from tmtccmd import ProcedureParamsWrapper
from tmtccmd.core.base import BackendRequest
from tmtccmd.pus import VerificationWrapper
from tmtccmd.tmtc import CcsdsTmHandler, GenericApidHandlerBase
from tmtccmd.com import ComInterface
from tmtccmd.tmtc import CcsdsTmHandler
from tmtccmd.config import (
CmdTreeNode,
default_json_path,
SetupParams,
HookBase,
params_to_procedure_conversion,
)
from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper
Expand All @@ -33,193 +23,20 @@
RawTmtcTimedLogWrapper,
TimedLogWhen,
)
from tmtccmd.tmtc import (
TcQueueEntryType,
ProcedureWrapper,
TcProcedureType,
FeedWrapper,
SendCbParams,
DefaultPusQueueHelper,
QueueWrapper,
)
from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.obj_id import ObjectIdDictT
from spacepackets.seqcount import PusFileSeqCountProvider


from pytmtc.pus_tc import pack_pus_telecommands, create_cmd_definition_tree
from pytmtc.common import Apid, EventU32
from pytmtc.config import SatrsConfigHook
from pytmtc.pus_tc import TcHandler
from pytmtc.pus_tm import PusHandler

_LOGGER = logging.getLogger()


class SatRsConfigHook(HookBase):
def __init__(self, json_cfg_path: str):
super().__init__(json_cfg_path=json_cfg_path)

def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
from tmtccmd.config.com import (
create_com_interface_default,
create_com_interface_cfg_default,
)

assert self.cfg_path is not None
packet_id_list = []
for apid in Apid:
packet_id_list.append(PacketId(PacketType.TM, True, apid))
cfg = create_com_interface_cfg_default(
com_if_key=com_if_key,
json_cfg_path=self.cfg_path,
space_packet_ids=packet_id_list,
)
assert cfg is not None
return create_com_interface_default(cfg)

def get_command_definitions(self) -> CmdTreeNode:
"""This function should return the root node of the command definition tree."""
return create_cmd_definition_tree()

def get_cmd_history(self) -> Optional[History]:
"""Optionlly return a history class for the past command paths which will be used
when prompting a command path from the user in CLI mode."""
return FileHistory(".tmtc-history.txt")

def get_object_ids(self) -> ObjectIdDictT:
from tmtccmd.config.objects import get_core_object_ids

return get_core_object_ids()


class PusHandler(GenericApidHandlerBase):
def __init__(
self,
file_logger: logging.Logger,
verif_wrapper: VerificationWrapper,
raw_logger: RawTmtcTimedLogWrapper,
):
super().__init__(None)
self.file_logger = file_logger
self.raw_logger = raw_logger
self.verif_wrapper = verif_wrapper

def handle_tm(self, apid: int, packet: bytes, _user_args: Any):
try:
pus_tm = PusTelemetry.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
except ValueError as e:
_LOGGER.warning("Could not generate PUS TM object from raw data")
_LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
raise e
service = pus_tm.service
if service == 1:
tm_packet = Service1Tm.unpack(
data=packet, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2)
)
res = self.verif_wrapper.add_tm(tm_packet)
if res is None:
_LOGGER.info(
f"Received Verification TM[{tm_packet.service}, {tm_packet.subservice}] "
f"with Request ID {tm_packet.tc_req_id.as_u32():#08x}"
)
_LOGGER.warning(
f"No matching telecommand found for {tm_packet.tc_req_id}"
)
else:
self.verif_wrapper.log_to_console(tm_packet, res)
self.verif_wrapper.log_to_file(tm_packet, res)
elif service == 3:
_LOGGER.info("No handling for HK packets implemented")
_LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]")
pus_tm = PusTelemetry.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
if pus_tm.subservice == 25:
if len(pus_tm.source_data) < 8:
raise ValueError("No addressable ID in HK packet")
json_str = pus_tm.source_data[8:]
_LOGGER.info(json_str)
elif service == 5:
tm_packet = PusTelemetry.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
src_data = tm_packet.source_data
event_u32 = EventU32.unpack(src_data)
_LOGGER.info(
f"Received event packet. Source APID: {Apid(tm_packet.apid)!r}, Event: {event_u32}"
)
if event_u32.group_id == 0 and event_u32.unique_id == 0:
_LOGGER.info("Received test event")
elif service == 17:
tm_packet = Service17Tm.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
if tm_packet.subservice == 2:
self.file_logger.info("Received Ping Reply TM[17,2]")
_LOGGER.info("Received Ping Reply TM[17,2]")
else:
self.file_logger.info(
f"Received Test Packet with unknown subservice {tm_packet.subservice}"
)
_LOGGER.info(
f"Received Test Packet with unknown subservice {tm_packet.subservice}"
)
else:
_LOGGER.info(
f"The service {service} is not implemented in Telemetry Factory"
)
tm_packet = PusTelemetry.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
self.raw_logger.log_tm(pus_tm)


class TcHandler(TcHandlerBase):
def __init__(
self,
seq_count_provider: FileSeqCountProvider,
verif_wrapper: VerificationWrapper,
):
super(TcHandler, self).__init__()
self.seq_count_provider = seq_count_provider
self.verif_wrapper = verif_wrapper
self.queue_helper = DefaultPusQueueHelper(
queue_wrapper=QueueWrapper.empty(),
tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE,
seq_cnt_provider=seq_count_provider,
pus_verificator=self.verif_wrapper.pus_verificator,
default_pus_apid=None,
)

def send_cb(self, send_params: SendCbParams):
entry_helper = send_params.entry
if entry_helper.is_tc:
if entry_helper.entry_type == TcQueueEntryType.PUS_TC:
pus_tc_wrapper = entry_helper.to_pus_tc_entry()
raw_tc = pus_tc_wrapper.pus_tc.pack()
_LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}")
send_params.com_if.send(raw_tc)
elif entry_helper.entry_type == TcQueueEntryType.LOG:
log_entry = entry_helper.to_log_entry()
_LOGGER.info(log_entry.log_str)

def queue_finished_cb(self, info: ProcedureWrapper):
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
_LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}")

def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
q = self.queue_helper
q.queue_wrapper = wrapper.queue_wrapper
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
assert def_proc.cmd_path is not None
pack_pus_telecommands(q, def_proc.cmd_path)


def main():
add_colorlog_console_logger(_LOGGER)
tmtccmd.init_printout(False)
hook_obj = SatRsConfigHook(json_cfg_path=default_json_path())
hook_obj = SatrsConfigHook(json_cfg_path=default_json_path())
parser_wrapper = PreArgsParsingWrapper()
parser_wrapper.create_default_parent_parser()
parser_wrapper.create_default_parser()
Expand Down
47 changes: 47 additions & 0 deletions satrs-example/pytmtc/pytmtc/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Optional
from prompt_toolkit.history import FileHistory, History
from spacepackets.ccsds import PacketId, PacketType
from tmtccmd import HookBase
from tmtccmd.com import ComInterface
from tmtccmd.config import CmdTreeNode
from tmtccmd.util.obj_id import ObjectIdDictT

from pytmtc.common import Apid
from pytmtc.pus_tc import create_cmd_definition_tree


class SatrsConfigHook(HookBase):
def __init__(self, json_cfg_path: str):
super().__init__(json_cfg_path=json_cfg_path)

def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
from tmtccmd.config.com import (
create_com_interface_default,
create_com_interface_cfg_default,
)

assert self.cfg_path is not None
packet_id_list = []
for apid in Apid:
packet_id_list.append(PacketId(PacketType.TM, True, apid))
cfg = create_com_interface_cfg_default(
com_if_key=com_if_key,
json_cfg_path=self.cfg_path,
space_packet_ids=packet_id_list,
)
assert cfg is not None
return create_com_interface_default(cfg)

def get_command_definitions(self) -> CmdTreeNode:
"""This function should return the root node of the command definition tree."""
return create_cmd_definition_tree()

def get_cmd_history(self) -> Optional[History]:
"""Optionlly return a history class for the past command paths which will be used
when prompting a command path from the user in CLI mode."""
return FileHistory(".tmtc-history.txt")

def get_object_ids(self) -> ObjectIdDictT:
from tmtccmd.config.objects import get_core_object_ids

return get_core_object_ids()
55 changes: 54 additions & 1 deletion satrs-example/pytmtc/pytmtc/pus_tc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@

from spacepackets.ccsds import CdsShortTimestamp
from spacepackets.ecss import PusTelecommand
from spacepackets.seqcount import FileSeqCountProvider
from tmtccmd import ProcedureWrapper, TcHandlerBase
from tmtccmd.config import CmdTreeNode
from tmtccmd.pus import VerificationWrapper
from tmtccmd.pus.tc.s200_fsfw_mode import Mode
from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.tmtc import (
DefaultPusQueueHelper,
FeedWrapper,
QueueWrapper,
SendCbParams,
TcProcedureType,
TcQueueEntryType,
)
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice

Expand All @@ -15,6 +25,49 @@
_LOGGER = logging.getLogger(__name__)


class TcHandler(TcHandlerBase):
def __init__(
self,
seq_count_provider: FileSeqCountProvider,
verif_wrapper: VerificationWrapper,
):
super(TcHandler, self).__init__()
self.seq_count_provider = seq_count_provider
self.verif_wrapper = verif_wrapper
self.queue_helper = DefaultPusQueueHelper(
queue_wrapper=QueueWrapper.empty(),
tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE,
seq_cnt_provider=seq_count_provider,
pus_verificator=self.verif_wrapper.pus_verificator,
default_pus_apid=None,
)

def send_cb(self, send_params: SendCbParams):
entry_helper = send_params.entry
if entry_helper.is_tc:
if entry_helper.entry_type == TcQueueEntryType.PUS_TC:
pus_tc_wrapper = entry_helper.to_pus_tc_entry()
raw_tc = pus_tc_wrapper.pus_tc.pack()
_LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}")
send_params.com_if.send(raw_tc)
elif entry_helper.entry_type == TcQueueEntryType.LOG:
log_entry = entry_helper.to_log_entry()
_LOGGER.info(log_entry.log_str)

def queue_finished_cb(self, info: ProcedureWrapper):
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
_LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}")

def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
q = self.queue_helper
q.queue_wrapper = wrapper.queue_wrapper
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
assert def_proc.cmd_path is not None
pack_pus_telecommands(q, def_proc.cmd_path)


def create_cmd_definition_tree() -> CmdTreeNode:

root_node = CmdTreeNode.root_node()
Expand Down
Loading

0 comments on commit d1476eb

Please sign in to comment.