From fc7ea17f3a641ebe003725866ccb89bba8dd0bfd Mon Sep 17 00:00:00 2001 From: oganigl Date: Fri, 27 Feb 2026 15:59:32 +0100 Subject: [PATCH 1/4] improved hard fault analysis --- hard_faullt_analysis.py | 273 +++++++++++++++++++++++++++------------- 1 file changed, 186 insertions(+), 87 deletions(-) diff --git a/hard_faullt_analysis.py b/hard_faullt_analysis.py index 233eb2c0..930acd99 100644 --- a/hard_faullt_analysis.py +++ b/hard_faullt_analysis.py @@ -2,11 +2,28 @@ import struct import re import os +from dataclasses import dataclass +from typing import List, Optional HF_FLASH_ADDR = 0x080C0000 HF_FLASH_ADDR_STRING = "0x080C000" ELF_FILE = "out/build/latest.elf" CALL_TRACE_MAX_DEPTH = 16 +@dataclass +class HardFaultFrame: + flag: int + r0: int + r1: int + r2: int + r3: int + r12: int + lr: int + pc: int + psr: int + cfsr: int + fault_addr: int + calltrace_depth: int + calltrace_pcs: List[int] def read_flash(): try: cmd = [ @@ -23,6 +40,8 @@ def read_flash(): except FileNotFoundError: print("STM32_Programmer_CLI not found. Make sure it is installed and in PATH.") return None + + def decode_cfsr_memory(cfsr, fault_addr): memory_fault = cfsr & 0xFF if memory_fault == 0: @@ -107,58 +126,33 @@ def addr2line(addr): except Exception as e: return f"addr2line failed: {e}" -def analyze_call_stack(calltrace_depth, calltrace_pcs, context=2): - """ - Muestra el call stack, omitiendo frames sin fuente y mostrando snippet de código. - """ - print("\n==== Call Stack Trace ====") - if calltrace_depth == 0: - print("No call trace available.") - return - -def analyze_call_stack(calltrace_depth, calltrace_pcs, context=0): - """ - Muestra el call stack, mostrando snippet de código de la línea exacta - sin intentar sumar líneas arriba/abajo (context=0 por defecto). - Omite frames sin fuente. - """ - print("\n==== Call Stack Trace ====") - if calltrace_depth == 0: - print("No call trace available.") - return - - for pc in calltrace_pcs[:calltrace_depth]: - pc_base = pc & ~1 - snippet = addr2line(pc_base- 4).strip() - if not snippet or snippet.startswith("??:?"): - continue # no hay fuente, saltar - print_code_context(snippet,1) - - print("======================================================") - - - def print_code_context(lines, context=2): - """ - lines: exit of addr2line (función + file:line) - context: how many lines up/down show - """ + #addr2line + #line 0: function name + #line 1 : file : line line_list = lines.splitlines() if len(line_list) < 2: print("Invalid addr2line output") return - + file_line = line_list[1].strip() split = file_line.rfind(':') + + if split == -1: + print("Invalid file:line format") + return + file_path = file_line[:split] + try: - line_no = int(file_line[split+1:]) - 1 # índice base 0 + line_no = int(file_line[split+1:]) - 1 except ValueError: - print("\33[91m Couldn't find exact line\33[0m") + print("Couldn't parse line number") return - if not os.path.exists(file_path): - print("Source file not found") + + if not os.path.exists(file_path): + print(f"Source file not found: {file_path}") return with open(file_path, "r") as f: @@ -168,52 +162,133 @@ def print_code_context(lines, context=2): end = min(len(file_lines), line_no + context + 1) print(f"\nSource snippet from {file_path}:") + for i in range(start, end): code = file_lines[i].rstrip() - # Si es la línea del error, la ponemos en rojo if i == line_no: - print(f"\033[91m{i+1:>4}: {code}\033[0m") # rojo + print(f"\033[91m{i+1:>4}: {code}\033[0m") else: print(f"{i+1:>4}: {code}") -def hard_fault_analysis(memory_string): - raw = bytes.fromhex(memory_string) - raw = struct.unpack(">28I",raw) - hf = { - "HF_Flag": raw[0], - "r0": raw[1], - "r1": raw[2], - "r2": raw[3], - "r3": raw[4], - "r12": raw[5], - "lr": raw[6], - "pc": raw[7], - "psr": raw[8], - "cfsr": raw[9], - "fault_addr": raw[10], - "calltrace_depth": raw[11], - "calltrace_pcs": raw[12:28] - } - if(hf["HF_Flag"] != 0xFF00FF00): + + +def parse_hardfault(memory_string: str) -> Optional[HardFaultFrame]: + try: + raw_bytes = bytes.fromhex(memory_string) + + expected_size = 28 * 4 + if len(raw_bytes) != expected_size: + print(f"Invalid dump size. Expected {expected_size}, got {len(raw_bytes)}") + return None + + raw = struct.unpack(">28I", raw_bytes) + + return HardFaultFrame( + flag=raw[0], + r0=raw[1], + r1=raw[2], + r2=raw[3], + r3=raw[4], + r12=raw[5], + lr=raw[6], + pc=raw[7], + psr=raw[8], + cfsr=raw[9], + fault_addr=raw[10], + calltrace_depth=raw[11], + calltrace_pcs=list(raw[12:28]) + ) + + except Exception as e: + print(f"Error parsing hardfault frame: {e}") + return None + + +def analyze_call_stack(calltrace_depth: int, + calltrace_pcs: List[int], + context: int = 1): + + print("\n==== Call Stack Trace ====") + + try: + if not isinstance(calltrace_depth, int): + print("Invalid calltrace_depth type") + return + + if calltrace_depth <= 0: + print("No call trace available.") + return + + if not isinstance(calltrace_pcs, list): + print("Invalid calltrace_pcs structure") + return + + depth = min(calltrace_depth, len(calltrace_pcs), CALL_TRACE_MAX_DEPTH) + + for i in range(depth): + + pc = calltrace_pcs[i] + + # Validación básica de PC + if not isinstance(pc, int) or pc == 0: + continue + + # Limpiar bit Thumb + pc_base = pc & ~1 + + # Protección contra underflow + if pc_base < 4: + continue + + try: + snippet = addr2line(pc_base - 4) + except Exception as e: + print(f"addr2line failed for PC 0x{pc:08X}: {e}") + continue + + if not snippet or "??" in snippet: + continue + + try: + print_code_context(snippet, context) + except Exception as e: + print(f"Failed printing context for PC 0x{pc:08X}: {e}") + + except Exception as e: + print(f"Unexpected error in analyze_call_stack: {e}") + + print("===============================================") +def hard_fault_analysis(hf,context): + + if(hf.flag != 0xFF00FF00): print("There was no hardfault in your Microcontroller, Kudos for you, I hope...") return + print("================HARDFAULT DETECTED ===========") print("Registers:") for r in ['r0','r1','r2','r3','r12','lr','pc','psr']: - print(f" {r.upper():<4}: 0x{hf[r]:08X}") - - print(f" CFSR: 0x{hf['cfsr']:08X}") - error = decode_cfsr(hf["cfsr"], hf["fault_addr"]) - print("\nSource Location:") - pc_loc = addr2line(hf["pc"]) - lr_loc = addr2line(hf["lr"]) - print(f" Linker Register : 0x{hf['lr']:08X} -> {lr_loc}") - - print(f" Program Counter : 0x{hf['pc']:08X} -> {pc_loc}") - print_code_context(pc_loc) - - analyze_call_stack(hf["calltrace_depth"],hf["calltrace_pcs"]) + value = getattr(hf, r) + print(f" {r.upper():<4}: 0x{value:08X}") + print("\n") + print("Register that contains the info about the HardFault") + print(f" CFSR: 0x{hf.cfsr:08X}") + #get the cause of the error + print("------HardFault Fail------") + error = decode_cfsr(hf.cfsr, hf.fault_addr) + print("---------------------------") + + pc_loc = addr2line(hf.pc) + lr_loc = addr2line(hf.lr) + + print("\n=======Source Location: ===========\n") + print(f" --> Linker Register : 0x{hf.lr:08X}\n -> {lr_loc}") + print_code_context(lr_loc,context) + print("\n") + print(f" -->Program Counter : 0x{hf.pc:08X}\n -> {pc_loc}") + print_code_context(pc_loc,context) + print("=============================") + analyze_call_stack(hf.calltrace_depth,hf.calltrace_pcs,context) print("======================================================") @@ -224,20 +299,44 @@ def hard_fault_analysis(memory_string): print("The error has to be before PC. But not possible to know exactly when.") print("Check this link to know more : https://interrupt.memfault.com/blog/cortex-m-hardfault-debug#fn:8") +#get the memory with the address of HardFault +def extract_memory_dump(cli_output: str) -> Optional[str]: + pos = cli_output.rfind(HF_FLASH_ADDR_STRING) + if pos == -1: + print("The address was not found in CLI output") + return None + + flash = cli_output[pos:] -if __name__ == '__main__': - out = read_flash() - if(out == None): - exit() - pos_memory_flash = out.rfind(HF_FLASH_ADDR_STRING) - print(out[0:pos_memory_flash]) - flash = out[pos_memory_flash:] - print(flash) memory_string = "" for line in flash.splitlines(): - if(line.find(':') == -1): + if ":" not in line: break - _,mem = line.split(":") - memory_string += mem - memory_string = memory_string.replace(" ","") - hard_fault_analysis(memory_string) + _, mem = line.split(":", 1) + memory_string += mem.strip() + + return memory_string.replace(" ", "") + + + +if __name__ == '__main__': + #First get all the information from the flash Memory + out = read_flash() + if out is None: + exit(1) + + memory_string = extract_memory_dump(out) + if not memory_string: + exit(1) + hf = parse_hardfault(memory_string) + if not hf: + exit(1) + print("Lines of context to watch (max 10):",end="") + try: + context = int(input()) + except ValueError: + context = 2 + if context < 0 or context > 10: + context = 10 + print("\n") + hard_fault_analysis(hf,context) From 7a4d714df8ad252d05418fa6a0a43927cfd10589 Mon Sep 17 00:00:00 2001 From: oganigl Date: Fri, 27 Feb 2026 16:01:35 +0100 Subject: [PATCH 2/4] add call stack trace for memory too --- Core/Src/stm32h7xx_it.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Core/Src/stm32h7xx_it.c b/Core/Src/stm32h7xx_it.c index 2436566e..7222b4fc 100644 --- a/Core/Src/stm32h7xx_it.c +++ b/Core/Src/stm32h7xx_it.c @@ -260,9 +260,7 @@ __attribute__((noreturn, optimize("O0"))) void my_fault_handler_c(sContextStateF const uint16_t INVSTATE = usage_fault & 0x0002; // Invalid processor state const uint16_t UNDEFINSTR = usage_fault & 0x0001; // Undefined instruction. } - if (usage_fault | bus_fault) { - scan_call_stack(frame, &log_hard_fault); - } + scan_call_stack(frame, &log_hard_fault); volatile uint8_t metadata_buffer[0x100]; memcpy(metadata_buffer, (void*)METADATA_FLASH_ADDR, 0x100); // write log hard fault From ad4a490241b53e99155ae9a26068705fcad4380d Mon Sep 17 00:00:00 2001 From: oganigl Date: Fri, 27 Feb 2026 16:33:40 +0100 Subject: [PATCH 3/4] changed the pre-commit for python --- .pre-commit-config.yaml | 11 ++ ...ullt_analysis.py => hard_fault_analysis.py | 113 ++++++++++-------- 2 files changed, 76 insertions(+), 48 deletions(-) rename hard_faullt_analysis.py => hard_fault_analysis.py (83%) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2b383cc9..85da3c60 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,13 +2,24 @@ minimum_pre_commit_version: "3.5.0" default_install_hook_types: - pre-commit - pre-push + repos: + - repo: https://github.com/pre-commit/pre-commit-hooks rev: v5.0.0 hooks: - id: check-merge-conflict - id: end-of-file-fixer - id: trailing-whitespace + + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.3.0 + hooks: + - id: ruff + args: [ --fix ] + - id: ruff-format + + - repo: https://github.com/pre-commit/mirrors-clang-format rev: v17.0.6 hooks: diff --git a/hard_faullt_analysis.py b/hard_fault_analysis.py similarity index 83% rename from hard_faullt_analysis.py rename to hard_fault_analysis.py index 930acd99..02e069af 100644 --- a/hard_faullt_analysis.py +++ b/hard_fault_analysis.py @@ -1,14 +1,16 @@ import subprocess import struct -import re import os from dataclasses import dataclass from typing import List, Optional + HF_FLASH_ADDR = 0x080C0000 HF_FLASH_ADDR_STRING = "0x080C000" ELF_FILE = "out/build/latest.elf" CALL_TRACE_MAX_DEPTH = 16 + + @dataclass class HardFaultFrame: flag: int @@ -24,12 +26,17 @@ class HardFaultFrame: fault_addr: int calltrace_depth: int calltrace_pcs: List[int] + + def read_flash(): try: cmd = [ "STM32_Programmer_CLI", - "-c", "port=SWD", - "-r32", hex(HF_FLASH_ADDR), "112" + "-c", + "port=SWD", + "-r32", + hex(HF_FLASH_ADDR), + "112", ] out = subprocess.check_output(cmd, text=True) return out @@ -40,8 +47,8 @@ def read_flash(): except FileNotFoundError: print("STM32_Programmer_CLI not found. Make sure it is installed and in PATH.") return None - - + + def decode_cfsr_memory(cfsr, fault_addr): memory_fault = cfsr & 0xFF if memory_fault == 0: @@ -66,6 +73,7 @@ def decode_cfsr_memory(cfsr, fault_addr): print(" IACCVIOL : Instruction access violation") return 1 + # -------------------------- # Decode Bus Fault (BFSR) # -------------------------- @@ -75,10 +83,10 @@ def decode_cfsr_bus(cfsr, fault_addr): return 0 print("\nBus Fault (BFSR):") if bus_fault & 0b10000000: - if(bus_fault & 0b00000001): + if bus_fault & 0b00000001: print(f" BFARVALID : Bus fault address valid -> 0x{fault_addr:08X}") if bus_fault & 0b00000100: - print(f"\033[91m Bus fault address imprecise\033[0m (DON'T LOOK CALL STACK)") + print("\033[91m Bus fault address imprecise\033[0m (DON'T LOOK CALL STACK)") if bus_fault & 0b00100000: print(" LSPERR : Floating Point Unit lazy state preservation error") @@ -88,6 +96,7 @@ def decode_cfsr_bus(cfsr, fault_addr): print(" UNSTKERR : Stack error on return from exception") return 2 + # -------------------------- # Decode Usage Fault (UFSR) # -------------------------- @@ -110,6 +119,7 @@ def decode_cfsr_usage(cfsr): print(" UNDEFINSTR : Undefined instruction") return 4 + def decode_cfsr(cfsr, fault_addr): error = 0 error = decode_cfsr_memory(cfsr, fault_addr) + error @@ -128,25 +138,25 @@ def addr2line(addr): def print_code_context(lines, context=2): - #addr2line - #line 0: function name - #line 1 : file : line + # addr2line + # line 0: function name + # line 1 : file : line line_list = lines.splitlines() if len(line_list) < 2: print("Invalid addr2line output") return - + file_line = line_list[1].strip() - split = file_line.rfind(':') + split = file_line.rfind(":") if split == -1: print("Invalid file:line format") return - + file_path = file_line[:split] try: - line_no = int(file_line[split+1:]) - 1 + line_no = int(file_line[split + 1 :]) - 1 except ValueError: print("Couldn't parse line number") return @@ -171,7 +181,6 @@ def print_code_context(lines, context=2): print(f"{i+1:>4}: {code}") - def parse_hardfault(memory_string: str) -> Optional[HardFaultFrame]: try: raw_bytes = bytes.fromhex(memory_string) @@ -196,18 +205,17 @@ def parse_hardfault(memory_string: str) -> Optional[HardFaultFrame]: cfsr=raw[9], fault_addr=raw[10], calltrace_depth=raw[11], - calltrace_pcs=list(raw[12:28]) + calltrace_pcs=list(raw[12:28]), ) except Exception as e: print(f"Error parsing hardfault frame: {e}") return None - - -def analyze_call_stack(calltrace_depth: int, - calltrace_pcs: List[int], - context: int = 1): + +def analyze_call_stack( + calltrace_depth: int, calltrace_pcs: List[int], context: int = 1 +): print("\n==== Call Stack Trace ====") try: @@ -226,7 +234,6 @@ def analyze_call_stack(calltrace_depth: int, depth = min(calltrace_depth, len(calltrace_pcs), CALL_TRACE_MAX_DEPTH) for i in range(depth): - pc = calltrace_pcs[i] # Validación básica de PC @@ -258,54 +265,65 @@ def analyze_call_stack(calltrace_depth: int, print(f"Unexpected error in analyze_call_stack: {e}") print("===============================================") -def hard_fault_analysis(hf,context): - - if(hf.flag != 0xFF00FF00): - print("There was no hardfault in your Microcontroller, Kudos for you, I hope...") + + +def hard_fault_analysis(hf, context): + if hf.flag != 0xFF00FF00: + print( + "There was no hardfault in your Microcontroller, Kudos for you, I hope..." + ) return - + print("================HARDFAULT DETECTED ===========") print("Registers:") - for r in ['r0','r1','r2','r3','r12','lr','pc','psr']: + for r in ["r0", "r1", "r2", "r3", "r12", "lr", "pc", "psr"]: value = getattr(hf, r) - print(f" {r.upper():<4}: 0x{value:08X}") + print(f" {r.upper():<4}: 0x{value:08X}") print("\n") print("Register that contains the info about the HardFault") print(f" CFSR: 0x{hf.cfsr:08X}") - #get the cause of the error + # get the cause of the error print("------HardFault Fail------") - error = decode_cfsr(hf.cfsr, hf.fault_addr) + decode_cfsr(hf.cfsr, hf.fault_addr) print("---------------------------") - + pc_loc = addr2line(hf.pc) lr_loc = addr2line(hf.lr) - + print("\n=======Source Location: ===========\n") print(f" --> Linker Register : 0x{hf.lr:08X}\n -> {lr_loc}") - print_code_context(lr_loc,context) + print_code_context(lr_loc, context) print("\n") print(f" -->Program Counter : 0x{hf.pc:08X}\n -> {pc_loc}") - print_code_context(pc_loc,context) + print_code_context(pc_loc, context) print("=============================") - analyze_call_stack(hf.calltrace_depth,hf.calltrace_pcs,context) + analyze_call_stack(hf.calltrace_depth, hf.calltrace_pcs, context) print("======================================================") - - print("Note: In Release builds (-O2/-O3) the PC may not point exactly to the failing instruction.") - print(" During interrupts, bus faults, or stack corruption, the PC can be imprecise.") - print("\nIn case of Imprecise error is dificult to find due to is asynchronous fault") + print( + "Note: In Release builds (-O2/-O3) the PC may not point exactly to the failing instruction." + ) + print( + " During interrupts, bus faults, or stack corruption, the PC can be imprecise." + ) + print( + "\nIn case of Imprecise error is dificult to find due to is asynchronous fault" + ) print("The error has to be before PC. But not possible to know exactly when.") - print("Check this link to know more : https://interrupt.memfault.com/blog/cortex-m-hardfault-debug#fn:8") + print( + "Check this link to know more : https://interrupt.memfault.com/blog/cortex-m-hardfault-debug#fn:8" + ) -#get the memory with the address of HardFault + +# get the memory with the address of HardFault def extract_memory_dump(cli_output: str) -> Optional[str]: pos = cli_output.rfind(HF_FLASH_ADDR_STRING) if pos == -1: print("The address was not found in CLI output") return None - + flash = cli_output[pos:] memory_string = "" @@ -318,9 +336,8 @@ def extract_memory_dump(cli_output: str) -> Optional[str]: return memory_string.replace(" ", "") - -if __name__ == '__main__': - #First get all the information from the flash Memory +if __name__ == "__main__": + # First get all the information from the flash Memory out = read_flash() if out is None: exit(1) @@ -331,7 +348,7 @@ def extract_memory_dump(cli_output: str) -> Optional[str]: hf = parse_hardfault(memory_string) if not hf: exit(1) - print("Lines of context to watch (max 10):",end="") + print("Lines of context to watch (max 10):", end="") try: context = int(input()) except ValueError: @@ -339,4 +356,4 @@ def extract_memory_dump(cli_output: str) -> Optional[str]: if context < 0 or context > 10: context = 10 print("\n") - hard_fault_analysis(hf,context) + hard_fault_analysis(hf, context) From 3ddd83b022c6ceeef0d4d94bba190df938969e7a Mon Sep 17 00:00:00 2001 From: oganigl Date: Fri, 27 Feb 2026 16:44:40 +0100 Subject: [PATCH 4/4] format all the other python scripts --- Core/Inc/Code_generation/Generator.py | 8 +- .../Packet_generation/Packet_descriptions.py | 125 +++++++++++----- .../Packet_generation/Packet_generation.py | 138 +++++++++++------- tools/preflash_check.py | 14 +- 4 files changed, 192 insertions(+), 93 deletions(-) diff --git a/Core/Inc/Code_generation/Generator.py b/Core/Inc/Code_generation/Generator.py index 774d9aaf..86b999f8 100644 --- a/Core/Inc/Code_generation/Generator.py +++ b/Core/Inc/Code_generation/Generator.py @@ -7,8 +7,12 @@ def parse_args(): - parser = argparse.ArgumentParser(description="Generate packet headers from JSON_ADE") - parser.add_argument("board", help="Board key from Core/Inc/Code_generation/JSON_ADE/boards.json") + parser = argparse.ArgumentParser( + description="Generate packet headers from JSON_ADE" + ) + parser.add_argument( + "board", help="Board key from Core/Inc/Code_generation/JSON_ADE/boards.json" + ) return parser.parse_args() diff --git a/Core/Inc/Code_generation/Packet_generation/Packet_descriptions.py b/Core/Inc/Code_generation/Packet_generation/Packet_descriptions.py index 2b429415..dcfc807a 100644 --- a/Core/Inc/Code_generation/Packet_generation/Packet_descriptions.py +++ b/Core/Inc/Code_generation/Packet_generation/Packet_descriptions.py @@ -1,8 +1,9 @@ import re import json + class BoardDescription: - def __init__(self,name:str,board:dict,JSONpath:str): + def __init__(self, name: str, board: dict, JSONpath: str): self.name = name self.id = board["board_id"] self.ip = board["board_ip"] @@ -12,19 +13,22 @@ def __init__(self,name:str,board:dict,JSONpath:str): try: with open(JSONpath + "/general_info.json") as f: general_info = json.load(f) - if "addresses" in general_info and "backend" in general_info["addresses"]: + if ( + "addresses" in general_info + and "backend" in general_info["addresses"] + ): backend_ip = general_info["addresses"]["backend"] except Exception as e: print(f"Warning: Could not load backend IP from general_info.json: {e}") - #Sockets: + # Sockets: try: - with open(JSONpath+"/boards/"+name+"/sockets.json") as s: + with open(JSONpath + "/boards/" + name + "/sockets.json") as s: socks = json.load(s) self.sockets = self.SocketsDescription(socks, self.ip, backend_ip) except Exception as e: raise Exception(f"Error in file {JSONpath}/boards/{name}/sockets.json: {e}") - #Packets: + # Packets: self.sending_packets = [] self.data_size = 0 self.order_size = 0 @@ -32,23 +36,29 @@ def __init__(self,name:str,board:dict,JSONpath:str): self.packets = {} for measurement in board["measurements"]: try: - with open(JSONpath+"/boards/" + name + "/" + measurement) as f: + with open(JSONpath + "/boards/" + name + "/" + measurement) as f: m = json.load(f) self.measurement_lists.append(m) except Exception as e: - raise Exception(f"Error in file {JSONpath}/boards/{name}/{measurement}: {e}") + raise Exception( + f"Error in file {JSONpath}/boards/{name}/{measurement}: {e}" + ) for packets in board["packets"]: - packets_name = re.split(r'_|\.', packets)[0] + packets_name = re.split(r"_|\.", packets)[0] self.packets[packets_name] = [] try: - with open(JSONpath+"/boards/" + name+"/" + packets) as f: - p= json.load(f) + with open(JSONpath + "/boards/" + name + "/" + packets) as f: + p = json.load(f) except Exception as e: - raise Exception(f"Error in file {JSONpath}/boards/{name}/{packets}: {e}") - i=0 + raise Exception( + f"Error in file {JSONpath}/boards/{name}/{packets}: {e}" + ) + i = 0 for packet in p: - self.packets[packets_name].append(PacketDescription(packet,self.measurement_lists, packets)) - aux_sending= PacketDescription.check_for_sending(packet) + self.packets[packets_name].append( + PacketDescription(packet, self.measurement_lists, packets) + ) + aux_sending = PacketDescription.check_for_sending(packet) if aux_sending is not None: self.sending_packets.append(aux_sending) @@ -61,7 +71,7 @@ def __init__(self,name:str,board:dict,JSONpath:str): self.sending_packets = self.fix_sendind_packets(self.sending_packets) @staticmethod - def fix_sendind_packets(sending_packets:list): + def fix_sendind_packets(sending_packets: list): fixed_packets = [] lookup = {} for item in sending_packets: @@ -84,8 +94,6 @@ def fix_sendind_packets(sending_packets:list): return fixed_packets - - class SocketsDescription: def __init__(self, sockets: list, board_ip: str, backend_ip: str): self.allSockets = [] @@ -100,23 +108,46 @@ def __init__(self, sockets: list, board_ip: str, backend_ip: str): self.allSockets.append({"name": name, "type": sock_type}) if sock_type == "ServerSocket": - self.ServerSockets.append({"name": name, "type": sock_type, "board_ip": self.board_ip, "port": sock["port"]}) + self.ServerSockets.append( + { + "name": name, + "type": sock_type, + "board_ip": self.board_ip, + "port": sock["port"], + } + ) elif sock_type == "Socket": remote_ip = sock["remote_ip"] if remote_ip == "backend": remote_ip = self.backend_ip - self.Sockets.append({"name": name, "type": sock_type, "board_ip": self.board_ip, "local_port": sock["local_port"], "remote_ip": remote_ip, "remote_port": sock["remote_port"]}) + self.Sockets.append( + { + "name": name, + "type": sock_type, + "board_ip": self.board_ip, + "local_port": sock["local_port"], + "remote_ip": remote_ip, + "remote_port": sock["remote_port"], + } + ) elif sock_type == "DatagramSocket": remote_ip = sock["remote_ip"] if remote_ip == "backend": remote_ip = self.backend_ip - self.DatagramSockets.append({"name": name, "type": sock_type, "board_ip": self.board_ip, "port": sock["port"], "remote_ip": remote_ip}) - + self.DatagramSockets.append( + { + "name": name, + "type": sock_type, + "board_ip": self.board_ip, + "port": sock["port"], + "remote_ip": remote_ip, + } + ) class PacketDescription: - def __init__(self, packet:dict,measurements:list, filename:str="Unknown"): - self.id =packet["id"] + def __init__(self, packet: dict, measurements: list, filename: str = "Unknown"): + self.id = packet["id"] self.name = packet["name"].replace(" ", "_").replace("-", "_") self.type = packet["type"] self.variables = [] @@ -125,45 +156,64 @@ def __init__(self, packet:dict,measurements:list, filename:str="Unknown"): return for variable in packet["variables"]: self.variables.append(variable) - self.measurements.append(MeasurmentsDescription(measurements,variable, filename)) + self.measurements.append( + MeasurmentsDescription(measurements, variable, filename) + ) @staticmethod def check_for_sending(packet: dict): if "period" in packet and "period_type" in packet and "socket" in packet: name = packet["name"].replace(" ", "_").replace("-", "_") - return {"name": name, "period": packet["period"], "period_type": packet["period_type"], "socket": packet["socket"]} + return { + "name": name, + "period": packet["period"], + "period_type": packet["period_type"], + "socket": packet["socket"], + } else: return None + + class MeasurmentsDescription: - def __init__(self,measurements:list, variable:str, filename:str="Unknown"): + def __init__(self, measurements: list, variable: str, filename: str = "Unknown"): self.id = variable - if not hasattr(self.__class__, 'viewed_measurements'): + if not hasattr(self.__class__, "viewed_measurements"): self.__class__.viewed_measurements = {} - measurement = self._MeasurementSearch(measurements,variable) + measurement = self._MeasurementSearch(measurements, variable) if measurement is None: - print(f"Measurement not found for variable: {variable} in file: {filename}\n") - raise Exception(f"Measurement not found for variable: {variable} in file: {filename}") + print( + f"Measurement not found for variable: {variable} in file: {filename}\n" + ) + raise Exception( + f"Measurement not found for variable: {variable} in file: {filename}" + ) self.name = measurement["name"] - self.type = (self._unsigned_int_correction(measurement["type"]).replace(" ", "_").replace("-", "_")) + self.type = ( + self._unsigned_int_correction(measurement["type"]) + .replace(" ", "_") + .replace("-", "_") + ) if self.type == "enum": values = [] for value in measurement["enumValues"]: values.append(str(value)) - self.enum ={"name": (measurement["id"].replace(" ", "_").replace("-", "_")), "values": self._Enum_values_correction(values)} - self.type = (measurement["id"].replace(" ", "_").replace("-", "_")) + self.enum = { + "name": (measurement["id"].replace(" ", "_").replace("-", "_")), + "values": self._Enum_values_correction(values), + } + self.type = measurement["id"].replace(" ", "_").replace("-", "_") @staticmethod - def _Enum_values_correction(values:list): + def _Enum_values_correction(values: list): for i in range(len(values)): values[i] = values[i].replace(" ", "_").replace("-", "_") return values - @staticmethod - def _MeasurementSearch(measurements:list, variable:str): + def _MeasurementSearch(measurements: list, variable: str): if variable in MeasurmentsDescription.viewed_measurements: return MeasurmentsDescription.viewed_measurements[variable] for measurement_list in measurements: @@ -173,9 +223,8 @@ def _MeasurementSearch(measurements:list, variable:str): return measurment return None - @staticmethod - def _unsigned_int_correction(type:str): + def _unsigned_int_correction(type: str): aux_type = type[:4] if aux_type == "uint": type += "_t" diff --git a/Core/Inc/Code_generation/Packet_generation/Packet_generation.py b/Core/Inc/Code_generation/Packet_generation/Packet_generation.py index b25aec48..68a38a56 100644 --- a/Core/Inc/Code_generation/Packet_generation/Packet_generation.py +++ b/Core/Inc/Code_generation/Packet_generation/Packet_generation.py @@ -1,4 +1,4 @@ -from Packet_generation.Packet_descriptions import * +from Packet_generation.Packet_descriptions import BoardDescription import json import os import jinja2 @@ -6,16 +6,17 @@ templates_path = "Core/Inc/Code_generation/Packet_generation" -def Generate_PacketDescription(JSONpath:str,board:str): - with open(JSONpath+"/boards.json") as f: + +def Generate_PacketDescription(JSONpath: str, board: str): + with open(JSONpath + "/boards.json") as f: boards = json.load(f) boards_name = [] for b in boards: boards_name.append(b) if board in boards_name: - with open(JSONpath+"/" + (boards[board])) as f: + with open(JSONpath + "/" + (boards[board])) as f: b = json.load(f) - board_instance = BoardDescription(board, b,JSONpath) + board_instance = BoardDescription(board, b, JSONpath) globals()[board] = board_instance else: print(f"Board {board} not found, exiting...") @@ -24,20 +25,23 @@ def Generate_PacketDescription(JSONpath:str,board:str): return boards_name -#--------------DataPackets.hpp generation---------------# +# --------------DataPackets.hpp generation---------------# + -def Get_data_context(board:BoardDescription): - def GenerateDataEnum(board:BoardDescription): +def Get_data_context(board: BoardDescription): + def GenerateDataEnum(board: BoardDescription): Enums = [] for packet in board.packets: for packet_instance in board.packets[packet]: if packet_instance.type != "order": for measurement in packet_instance.measurements: - if hasattr(measurement, "enum")and measurement.enum not in Enums: + if ( + hasattr(measurement, "enum") + and measurement.enum not in Enums + ): Enums.append(measurement.enum) return Enums - def GenerateDataPackets(board: BoardDescription): Packets = [] totaldata = [] @@ -47,8 +51,8 @@ def GenerateDataPackets(board: BoardDescription): tempdata = "" tempdata_but_pointer = "" for variable in packet_instance.variables: - tempdata += (str(variable) + ",") - tempdata_but_pointer += ("&" + str(variable) + ",") + tempdata += str(variable) + "," + tempdata_but_pointer += "&" + str(variable) + "," if tempdata.endswith(","): tempdata = tempdata[:-1] if tempdata_but_pointer.endswith(","): @@ -56,21 +60,35 @@ def GenerateDataPackets(board: BoardDescription): packet_variables = [] for measurement in packet_instance.measurements: - packet_variables.append({ - "name": measurement.id.replace(" ", "_").replace("-", "_"), - "type": measurement.type - }) - - aux_packet = {"name": packet_instance.name, "data": tempdata_but_pointer.replace(" ", "_").replace("-", "_"), "id": packet_instance.id, "variables": packet_variables} + packet_variables.append( + { + "name": measurement.id.replace(" ", "_").replace( + "-", "_" + ), + "type": measurement.type, + } + ) + + aux_packet = { + "name": packet_instance.name, + "data": tempdata_but_pointer.replace(" ", "_").replace( + "-", "_" + ), + "id": packet_instance.id, + "variables": packet_variables, + } Packets.append(aux_packet) for measurement in packet_instance.measurements: - aux_data = {"type": measurement.type, "name": measurement.id.replace(" ", "_").replace("-", "_")} + aux_data = { + "type": measurement.type, + "name": measurement.id.replace(" ", "_").replace("-", "_"), + } if not any(x["name"] == aux_data["name"] for x in totaldata): totaldata.append(aux_data) - return Packets,totaldata + return Packets, totaldata - packets,data = GenerateDataPackets(board) + packets, data = GenerateDataPackets(board) def GenerateGroupedSendingPackets(board: BoardDescription): datagram_sockets = [s["name"] for s in board.sockets.DatagramSockets] @@ -97,43 +115,46 @@ def GenerateGroupedSendingPackets(board: BoardDescription): grouped_list = [] for (period, period_type), items in grouped_lookup.items(): - grouped_list.append({ - "period": period, - "period_type": period_type, - "packets": items - }) + grouped_list.append( + {"period": period, "period_type": period_type, "packets": items} + ) return grouped_list context = { "board": board.name, "enums": GenerateDataEnum(board), - "packets" : packets, + "packets": packets, "data": data, "size": board.order_size, - "Sockets":board.sockets.Sockets, - "DatagramSockets":board.sockets.DatagramSockets, + "Sockets": board.sockets.Sockets, + "DatagramSockets": board.sockets.DatagramSockets, "DatagramSocketNames": [s["name"] for s in board.sockets.DatagramSockets], "sending_packets": GenerateGroupedSendingPackets(board), } return context -def Generate_DataPackets_hpp(board_input:str): + +def Generate_DataPackets_hpp(board_input: str): data_packets_path = "Core/Inc/Communications/Packets/DataPackets.hpp" board_instance = globals()[board_input] - if board_instance.data_size == 0 and len(board_instance.sockets.DatagramSockets) == 0: + if ( + board_instance.data_size == 0 + and len(board_instance.sockets.DatagramSockets) == 0 + ): if os.path.exists(data_packets_path): os.remove(data_packets_path) return - env= jinja2.Environment(loader=jinja2.FileSystemLoader(templates_path)) + env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_path)) template = env.get_template("DataTemplate.hpp") context = Get_data_context(board_instance) - - with open(data_packets_path,"w") as Output: + with open(data_packets_path, "w") as Output: Output.write(template.render(context)) -#--------------OrderPackets.hpp generation---------------# + +# --------------OrderPackets.hpp generation---------------# + def Get_order_context(board: BoardDescription): def GenerateOrderEnum(board: BoardDescription): @@ -142,7 +163,10 @@ def GenerateOrderEnum(board: BoardDescription): for packet_instance in board.packets[packet]: if packet_instance.type == "order": for measurement in packet_instance.measurements: - if hasattr(measurement, "enum") and measurement.enum not in Enums: + if ( + hasattr(measurement, "enum") + and measurement.enum not in Enums + ): Enums.append(measurement.enum) return Enums @@ -155,20 +179,29 @@ def GenerateOrderPackets(board: BoardDescription): tempdata = "" tempdata_but_pointer = "" for variable in packet_instance.variables: - tempdata += (str(variable) + ",") - tempdata_but_pointer += ("&" + str(variable) + ",") + tempdata += str(variable) + "," + tempdata_but_pointer += "&" + str(variable) + "," if tempdata.endswith(","): tempdata = tempdata[:-1] tempdata_but_pointer = tempdata_but_pointer[:-1] packet_variables = [] for measurement in packet_instance.measurements: - packet_variables.append({ - "name": measurement.id.replace(" ", "_").replace("-", "_"), - "type": measurement.type - }) - - aux_packet = {"name": packet_instance.name, "data": tempdata_but_pointer, "id": packet_instance.id, "variables": packet_variables} + packet_variables.append( + { + "name": measurement.id.replace(" ", "_").replace( + "-", "_" + ), + "type": measurement.type, + } + ) + + aux_packet = { + "name": packet_instance.name, + "data": tempdata_but_pointer, + "id": packet_instance.id, + "variables": packet_variables, + } Packets.append(aux_packet) for measurement in packet_instance.measurements: aux_data = {"type": measurement.type, "name": measurement.name} @@ -181,23 +214,28 @@ def GenerateOrderPackets(board: BoardDescription): context = { "board": board.name, "enums": GenerateOrderEnum(board), - "packets" : packets, + "packets": packets, "data": data, "size": board.order_size, - "ServerSockets":board.sockets.ServerSockets, - "Sockets":board.sockets.Sockets, + "ServerSockets": board.sockets.ServerSockets, + "Sockets": board.sockets.Sockets, } return context -def Generate_OrderPackets_hpp(board_input:str): + +def Generate_OrderPackets_hpp(board_input: str): order_packets_path = "Core/Inc/Communications/Packets/OrderPackets.hpp" board_instance = globals()[board_input] - if (board_instance.order_size == 0 and len(board_instance.sockets.ServerSockets) == 0 and len(board_instance.sockets.Sockets) == 0): + if ( + board_instance.order_size == 0 + and len(board_instance.sockets.ServerSockets) == 0 + and len(board_instance.sockets.Sockets) == 0 + ): if os.path.exists(order_packets_path): os.remove(order_packets_path) return - env= jinja2.Environment(loader=jinja2.FileSystemLoader(templates_path)) + env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_path)) template = env.get_template("OrderTemplate.hpp") context = Get_order_context(board_instance) diff --git a/tools/preflash_check.py b/tools/preflash_check.py index 314125aa..6da7cc15 100644 --- a/tools/preflash_check.py +++ b/tools/preflash_check.py @@ -8,27 +8,32 @@ import sys from pathlib import Path + def get_script_dir() -> Path: return Path(__file__).parent.resolve() + def get_workspace_dir() -> Path: return get_script_dir().parent + def check_board_symbol(build_dir: Path) -> bool: """Check if the binary was compiled with BOARD symbol.""" marker_file = build_dir / "board_build_marker" return marker_file.exists() + def has_uncommitted_changes(workspace_dir: Path) -> bool: """Check if there are uncommitted changes in the repository.""" result = subprocess.run( ["git", "status", "--porcelain"], cwd=workspace_dir, capture_output=True, - text=True + text=True, ) return bool(result.stdout.strip()) + def main(): workspace_dir = get_workspace_dir() @@ -43,19 +48,22 @@ def main(): # Check if this is a BOARD build if not check_board_symbol(build_dir): - print("Binary was not compiled with BOARD symbol (or marker not found). Skipping pre-flash check.") + print( + "Binary was not compiled with BOARD symbol (or marker not found). Skipping pre-flash check." + ) sys.exit(0) print("Binary was compiled with BOARD symbol. Checking for uncommitted changes...") if has_uncommitted_changes(workspace_dir): print("Uncommitted changes detected. Aborting Flash...") - print("Please before flashing a board make sure all changes are commited"); + print("Please before flashing a board make sure all changes are commited") sys.exit(1) else: print("No uncommitted changes. Proceeding with flash.") sys.exit(0) + if __name__ == "__main__": main()