Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,24 @@ minimum_pre_commit_version: "3.5.0"
default_install_hook_types:
- pre-commit
- pre-push

repos:

- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
hooks:
- id: check-merge-conflict
- id: end-of-file-fixer
- id: trailing-whitespace

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.3.0
hooks:
- id: ruff
args: [ --fix ]
- id: ruff-format


- repo: https://github.com/pre-commit/mirrors-clang-format
rev: v17.0.6
hooks:
Expand Down
8 changes: 6 additions & 2 deletions Core/Inc/Code_generation/Generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@


def parse_args():
parser = argparse.ArgumentParser(description="Generate packet headers from JSON_ADE")
parser.add_argument("board", help="Board key from Core/Inc/Code_generation/JSON_ADE/boards.json")
parser = argparse.ArgumentParser(
description="Generate packet headers from JSON_ADE"
)
parser.add_argument(
"board", help="Board key from Core/Inc/Code_generation/JSON_ADE/boards.json"
)
return parser.parse_args()


Expand Down
125 changes: 87 additions & 38 deletions Core/Inc/Code_generation/Packet_generation/Packet_descriptions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import re
import json


class BoardDescription:
def __init__(self,name:str,board:dict,JSONpath:str):
def __init__(self, name: str, board: dict, JSONpath: str):
self.name = name
self.id = board["board_id"]
self.ip = board["board_ip"]
Expand All @@ -12,43 +13,52 @@ def __init__(self,name:str,board:dict,JSONpath:str):
try:
with open(JSONpath + "/general_info.json") as f:
general_info = json.load(f)
if "addresses" in general_info and "backend" in general_info["addresses"]:
if (
"addresses" in general_info
and "backend" in general_info["addresses"]
):
backend_ip = general_info["addresses"]["backend"]
except Exception as e:
print(f"Warning: Could not load backend IP from general_info.json: {e}")

#Sockets:
# Sockets:
try:
with open(JSONpath+"/boards/"+name+"/sockets.json") as s:
with open(JSONpath + "/boards/" + name + "/sockets.json") as s:
socks = json.load(s)
self.sockets = self.SocketsDescription(socks, self.ip, backend_ip)
except Exception as e:
raise Exception(f"Error in file {JSONpath}/boards/{name}/sockets.json: {e}")
#Packets:
# Packets:
self.sending_packets = []
self.data_size = 0
self.order_size = 0
self.measurement_lists = []
self.packets = {}
for measurement in board["measurements"]:
try:
with open(JSONpath+"/boards/" + name + "/" + measurement) as f:
with open(JSONpath + "/boards/" + name + "/" + measurement) as f:
m = json.load(f)
self.measurement_lists.append(m)
except Exception as e:
raise Exception(f"Error in file {JSONpath}/boards/{name}/{measurement}: {e}")
raise Exception(
f"Error in file {JSONpath}/boards/{name}/{measurement}: {e}"
)
for packets in board["packets"]:
packets_name = re.split(r'_|\.', packets)[0]
packets_name = re.split(r"_|\.", packets)[0]
self.packets[packets_name] = []
try:
with open(JSONpath+"/boards/" + name+"/" + packets) as f:
p= json.load(f)
with open(JSONpath + "/boards/" + name + "/" + packets) as f:
p = json.load(f)
except Exception as e:
raise Exception(f"Error in file {JSONpath}/boards/{name}/{packets}: {e}")
i=0
raise Exception(
f"Error in file {JSONpath}/boards/{name}/{packets}: {e}"
)
i = 0
for packet in p:
self.packets[packets_name].append(PacketDescription(packet,self.measurement_lists, packets))
aux_sending= PacketDescription.check_for_sending(packet)
self.packets[packets_name].append(
PacketDescription(packet, self.measurement_lists, packets)
)
aux_sending = PacketDescription.check_for_sending(packet)
if aux_sending is not None:
self.sending_packets.append(aux_sending)

Expand All @@ -61,7 +71,7 @@ def __init__(self,name:str,board:dict,JSONpath:str):
self.sending_packets = self.fix_sendind_packets(self.sending_packets)

@staticmethod
def fix_sendind_packets(sending_packets:list):
def fix_sendind_packets(sending_packets: list):
fixed_packets = []
lookup = {}
for item in sending_packets:
Expand All @@ -84,8 +94,6 @@ def fix_sendind_packets(sending_packets:list):

return fixed_packets



class SocketsDescription:
def __init__(self, sockets: list, board_ip: str, backend_ip: str):
self.allSockets = []
Expand All @@ -100,23 +108,46 @@ def __init__(self, sockets: list, board_ip: str, backend_ip: str):
self.allSockets.append({"name": name, "type": sock_type})

if sock_type == "ServerSocket":
self.ServerSockets.append({"name": name, "type": sock_type, "board_ip": self.board_ip, "port": sock["port"]})
self.ServerSockets.append(
{
"name": name,
"type": sock_type,
"board_ip": self.board_ip,
"port": sock["port"],
}
)
elif sock_type == "Socket":
remote_ip = sock["remote_ip"]
if remote_ip == "backend":
remote_ip = self.backend_ip
self.Sockets.append({"name": name, "type": sock_type, "board_ip": self.board_ip, "local_port": sock["local_port"], "remote_ip": remote_ip, "remote_port": sock["remote_port"]})
self.Sockets.append(
{
"name": name,
"type": sock_type,
"board_ip": self.board_ip,
"local_port": sock["local_port"],
"remote_ip": remote_ip,
"remote_port": sock["remote_port"],
}
)
elif sock_type == "DatagramSocket":
remote_ip = sock["remote_ip"]
if remote_ip == "backend":
remote_ip = self.backend_ip
self.DatagramSockets.append({"name": name, "type": sock_type, "board_ip": self.board_ip, "port": sock["port"], "remote_ip": remote_ip})

self.DatagramSockets.append(
{
"name": name,
"type": sock_type,
"board_ip": self.board_ip,
"port": sock["port"],
"remote_ip": remote_ip,
}
)


class PacketDescription:
def __init__(self, packet:dict,measurements:list, filename:str="Unknown"):
self.id =packet["id"]
def __init__(self, packet: dict, measurements: list, filename: str = "Unknown"):
self.id = packet["id"]
self.name = packet["name"].replace(" ", "_").replace("-", "_")
self.type = packet["type"]
self.variables = []
Expand All @@ -125,45 +156,64 @@ def __init__(self, packet:dict,measurements:list, filename:str="Unknown"):
return
for variable in packet["variables"]:
self.variables.append(variable)
self.measurements.append(MeasurmentsDescription(measurements,variable, filename))
self.measurements.append(
MeasurmentsDescription(measurements, variable, filename)
)

@staticmethod
def check_for_sending(packet: dict):
if "period" in packet and "period_type" in packet and "socket" in packet:
name = packet["name"].replace(" ", "_").replace("-", "_")
return {"name": name, "period": packet["period"], "period_type": packet["period_type"], "socket": packet["socket"]}
return {
"name": name,
"period": packet["period"],
"period_type": packet["period_type"],
"socket": packet["socket"],
}

else:
return None


class MeasurmentsDescription:
def __init__(self,measurements:list, variable:str, filename:str="Unknown"):
def __init__(self, measurements: list, variable: str, filename: str = "Unknown"):
self.id = variable
if not hasattr(self.__class__, 'viewed_measurements'):
if not hasattr(self.__class__, "viewed_measurements"):
self.__class__.viewed_measurements = {}
measurement = self._MeasurementSearch(measurements,variable)
measurement = self._MeasurementSearch(measurements, variable)

if measurement is None:
print(f"Measurement not found for variable: {variable} in file: {filename}\n")
raise Exception(f"Measurement not found for variable: {variable} in file: {filename}")
print(
f"Measurement not found for variable: {variable} in file: {filename}\n"
)
raise Exception(
f"Measurement not found for variable: {variable} in file: {filename}"
)

self.name = measurement["name"]
self.type = (self._unsigned_int_correction(measurement["type"]).replace(" ", "_").replace("-", "_"))
self.type = (
self._unsigned_int_correction(measurement["type"])
.replace(" ", "_")
.replace("-", "_")
)
if self.type == "enum":
values = []
for value in measurement["enumValues"]:
values.append(str(value))
self.enum ={"name": (measurement["id"].replace(" ", "_").replace("-", "_")), "values": self._Enum_values_correction(values)}
self.type = (measurement["id"].replace(" ", "_").replace("-", "_"))
self.enum = {
"name": (measurement["id"].replace(" ", "_").replace("-", "_")),
"values": self._Enum_values_correction(values),
}
self.type = measurement["id"].replace(" ", "_").replace("-", "_")

@staticmethod
def _Enum_values_correction(values:list):
def _Enum_values_correction(values: list):
for i in range(len(values)):
values[i] = values[i].replace(" ", "_").replace("-", "_")
return values


@staticmethod
def _MeasurementSearch(measurements:list, variable:str):
def _MeasurementSearch(measurements: list, variable: str):
if variable in MeasurmentsDescription.viewed_measurements:
return MeasurmentsDescription.viewed_measurements[variable]
for measurement_list in measurements:
Expand All @@ -173,9 +223,8 @@ def _MeasurementSearch(measurements:list, variable:str):
return measurment
return None


@staticmethod
def _unsigned_int_correction(type:str):
def _unsigned_int_correction(type: str):
aux_type = type[:4]
if aux_type == "uint":
type += "_t"
Expand Down
Loading