diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2b383cc9..85da3c60 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,13 +2,24 @@ minimum_pre_commit_version: "3.5.0" default_install_hook_types: - pre-commit - pre-push + repos: + - repo: https://github.com/pre-commit/pre-commit-hooks rev: v5.0.0 hooks: - id: check-merge-conflict - id: end-of-file-fixer - id: trailing-whitespace + + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.3.0 + hooks: + - id: ruff + args: [ --fix ] + - id: ruff-format + + - repo: https://github.com/pre-commit/mirrors-clang-format rev: v17.0.6 hooks: diff --git a/Core/Inc/Code_generation/Generator.py b/Core/Inc/Code_generation/Generator.py index 774d9aaf..86b999f8 100644 --- a/Core/Inc/Code_generation/Generator.py +++ b/Core/Inc/Code_generation/Generator.py @@ -7,8 +7,12 @@ def parse_args(): - parser = argparse.ArgumentParser(description="Generate packet headers from JSON_ADE") - parser.add_argument("board", help="Board key from Core/Inc/Code_generation/JSON_ADE/boards.json") + parser = argparse.ArgumentParser( + description="Generate packet headers from JSON_ADE" + ) + parser.add_argument( + "board", help="Board key from Core/Inc/Code_generation/JSON_ADE/boards.json" + ) return parser.parse_args() diff --git a/Core/Inc/Code_generation/Packet_generation/Packet_descriptions.py b/Core/Inc/Code_generation/Packet_generation/Packet_descriptions.py index 2b429415..dcfc807a 100644 --- a/Core/Inc/Code_generation/Packet_generation/Packet_descriptions.py +++ b/Core/Inc/Code_generation/Packet_generation/Packet_descriptions.py @@ -1,8 +1,9 @@ import re import json + class BoardDescription: - def __init__(self,name:str,board:dict,JSONpath:str): + def __init__(self, name: str, board: dict, JSONpath: str): self.name = name self.id = board["board_id"] self.ip = board["board_ip"] @@ -12,19 +13,22 @@ def __init__(self,name:str,board:dict,JSONpath:str): try: with open(JSONpath + "/general_info.json") as f: general_info = json.load(f) - if "addresses" in general_info and "backend" in general_info["addresses"]: + if ( + "addresses" in general_info + and "backend" in general_info["addresses"] + ): backend_ip = general_info["addresses"]["backend"] except Exception as e: print(f"Warning: Could not load backend IP from general_info.json: {e}") - #Sockets: + # Sockets: try: - with open(JSONpath+"/boards/"+name+"/sockets.json") as s: + with open(JSONpath + "/boards/" + name + "/sockets.json") as s: socks = json.load(s) self.sockets = self.SocketsDescription(socks, self.ip, backend_ip) except Exception as e: raise Exception(f"Error in file {JSONpath}/boards/{name}/sockets.json: {e}") - #Packets: + # Packets: self.sending_packets = [] self.data_size = 0 self.order_size = 0 @@ -32,23 +36,29 @@ def __init__(self,name:str,board:dict,JSONpath:str): self.packets = {} for measurement in board["measurements"]: try: - with open(JSONpath+"/boards/" + name + "/" + measurement) as f: + with open(JSONpath + "/boards/" + name + "/" + measurement) as f: m = json.load(f) self.measurement_lists.append(m) except Exception as e: - raise Exception(f"Error in file {JSONpath}/boards/{name}/{measurement}: {e}") + raise Exception( + f"Error in file {JSONpath}/boards/{name}/{measurement}: {e}" + ) for packets in board["packets"]: - packets_name = re.split(r'_|\.', packets)[0] + packets_name = re.split(r"_|\.", packets)[0] self.packets[packets_name] = [] try: - with open(JSONpath+"/boards/" + name+"/" + packets) as f: - p= json.load(f) + with open(JSONpath + "/boards/" + name + "/" + packets) as f: + p = json.load(f) except Exception as e: - raise Exception(f"Error in file {JSONpath}/boards/{name}/{packets}: {e}") - i=0 + raise Exception( + f"Error in file {JSONpath}/boards/{name}/{packets}: {e}" + ) + i = 0 for packet in p: - self.packets[packets_name].append(PacketDescription(packet,self.measurement_lists, packets)) - aux_sending= PacketDescription.check_for_sending(packet) + self.packets[packets_name].append( + PacketDescription(packet, self.measurement_lists, packets) + ) + aux_sending = PacketDescription.check_for_sending(packet) if aux_sending is not None: self.sending_packets.append(aux_sending) @@ -61,7 +71,7 @@ def __init__(self,name:str,board:dict,JSONpath:str): self.sending_packets = self.fix_sendind_packets(self.sending_packets) @staticmethod - def fix_sendind_packets(sending_packets:list): + def fix_sendind_packets(sending_packets: list): fixed_packets = [] lookup = {} for item in sending_packets: @@ -84,8 +94,6 @@ def fix_sendind_packets(sending_packets:list): return fixed_packets - - class SocketsDescription: def __init__(self, sockets: list, board_ip: str, backend_ip: str): self.allSockets = [] @@ -100,23 +108,46 @@ def __init__(self, sockets: list, board_ip: str, backend_ip: str): self.allSockets.append({"name": name, "type": sock_type}) if sock_type == "ServerSocket": - self.ServerSockets.append({"name": name, "type": sock_type, "board_ip": self.board_ip, "port": sock["port"]}) + self.ServerSockets.append( + { + "name": name, + "type": sock_type, + "board_ip": self.board_ip, + "port": sock["port"], + } + ) elif sock_type == "Socket": remote_ip = sock["remote_ip"] if remote_ip == "backend": remote_ip = self.backend_ip - self.Sockets.append({"name": name, "type": sock_type, "board_ip": self.board_ip, "local_port": sock["local_port"], "remote_ip": remote_ip, "remote_port": sock["remote_port"]}) + self.Sockets.append( + { + "name": name, + "type": sock_type, + "board_ip": self.board_ip, + "local_port": sock["local_port"], + "remote_ip": remote_ip, + "remote_port": sock["remote_port"], + } + ) elif sock_type == "DatagramSocket": remote_ip = sock["remote_ip"] if remote_ip == "backend": remote_ip = self.backend_ip - self.DatagramSockets.append({"name": name, "type": sock_type, "board_ip": self.board_ip, "port": sock["port"], "remote_ip": remote_ip}) - + self.DatagramSockets.append( + { + "name": name, + "type": sock_type, + "board_ip": self.board_ip, + "port": sock["port"], + "remote_ip": remote_ip, + } + ) class PacketDescription: - def __init__(self, packet:dict,measurements:list, filename:str="Unknown"): - self.id =packet["id"] + def __init__(self, packet: dict, measurements: list, filename: str = "Unknown"): + self.id = packet["id"] self.name = packet["name"].replace(" ", "_").replace("-", "_") self.type = packet["type"] self.variables = [] @@ -125,45 +156,64 @@ def __init__(self, packet:dict,measurements:list, filename:str="Unknown"): return for variable in packet["variables"]: self.variables.append(variable) - self.measurements.append(MeasurmentsDescription(measurements,variable, filename)) + self.measurements.append( + MeasurmentsDescription(measurements, variable, filename) + ) @staticmethod def check_for_sending(packet: dict): if "period" in packet and "period_type" in packet and "socket" in packet: name = packet["name"].replace(" ", "_").replace("-", "_") - return {"name": name, "period": packet["period"], "period_type": packet["period_type"], "socket": packet["socket"]} + return { + "name": name, + "period": packet["period"], + "period_type": packet["period_type"], + "socket": packet["socket"], + } else: return None + + class MeasurmentsDescription: - def __init__(self,measurements:list, variable:str, filename:str="Unknown"): + def __init__(self, measurements: list, variable: str, filename: str = "Unknown"): self.id = variable - if not hasattr(self.__class__, 'viewed_measurements'): + if not hasattr(self.__class__, "viewed_measurements"): self.__class__.viewed_measurements = {} - measurement = self._MeasurementSearch(measurements,variable) + measurement = self._MeasurementSearch(measurements, variable) if measurement is None: - print(f"Measurement not found for variable: {variable} in file: {filename}\n") - raise Exception(f"Measurement not found for variable: {variable} in file: {filename}") + print( + f"Measurement not found for variable: {variable} in file: {filename}\n" + ) + raise Exception( + f"Measurement not found for variable: {variable} in file: {filename}" + ) self.name = measurement["name"] - self.type = (self._unsigned_int_correction(measurement["type"]).replace(" ", "_").replace("-", "_")) + self.type = ( + self._unsigned_int_correction(measurement["type"]) + .replace(" ", "_") + .replace("-", "_") + ) if self.type == "enum": values = [] for value in measurement["enumValues"]: values.append(str(value)) - self.enum ={"name": (measurement["id"].replace(" ", "_").replace("-", "_")), "values": self._Enum_values_correction(values)} - self.type = (measurement["id"].replace(" ", "_").replace("-", "_")) + self.enum = { + "name": (measurement["id"].replace(" ", "_").replace("-", "_")), + "values": self._Enum_values_correction(values), + } + self.type = measurement["id"].replace(" ", "_").replace("-", "_") @staticmethod - def _Enum_values_correction(values:list): + def _Enum_values_correction(values: list): for i in range(len(values)): values[i] = values[i].replace(" ", "_").replace("-", "_") return values - @staticmethod - def _MeasurementSearch(measurements:list, variable:str): + def _MeasurementSearch(measurements: list, variable: str): if variable in MeasurmentsDescription.viewed_measurements: return MeasurmentsDescription.viewed_measurements[variable] for measurement_list in measurements: @@ -173,9 +223,8 @@ def _MeasurementSearch(measurements:list, variable:str): return measurment return None - @staticmethod - def _unsigned_int_correction(type:str): + def _unsigned_int_correction(type: str): aux_type = type[:4] if aux_type == "uint": type += "_t" diff --git a/Core/Inc/Code_generation/Packet_generation/Packet_generation.py b/Core/Inc/Code_generation/Packet_generation/Packet_generation.py index b25aec48..68a38a56 100644 --- a/Core/Inc/Code_generation/Packet_generation/Packet_generation.py +++ b/Core/Inc/Code_generation/Packet_generation/Packet_generation.py @@ -1,4 +1,4 @@ -from Packet_generation.Packet_descriptions import * +from Packet_generation.Packet_descriptions import BoardDescription import json import os import jinja2 @@ -6,16 +6,17 @@ templates_path = "Core/Inc/Code_generation/Packet_generation" -def Generate_PacketDescription(JSONpath:str,board:str): - with open(JSONpath+"/boards.json") as f: + +def Generate_PacketDescription(JSONpath: str, board: str): + with open(JSONpath + "/boards.json") as f: boards = json.load(f) boards_name = [] for b in boards: boards_name.append(b) if board in boards_name: - with open(JSONpath+"/" + (boards[board])) as f: + with open(JSONpath + "/" + (boards[board])) as f: b = json.load(f) - board_instance = BoardDescription(board, b,JSONpath) + board_instance = BoardDescription(board, b, JSONpath) globals()[board] = board_instance else: print(f"Board {board} not found, exiting...") @@ -24,20 +25,23 @@ def Generate_PacketDescription(JSONpath:str,board:str): return boards_name -#--------------DataPackets.hpp generation---------------# +# --------------DataPackets.hpp generation---------------# + -def Get_data_context(board:BoardDescription): - def GenerateDataEnum(board:BoardDescription): +def Get_data_context(board: BoardDescription): + def GenerateDataEnum(board: BoardDescription): Enums = [] for packet in board.packets: for packet_instance in board.packets[packet]: if packet_instance.type != "order": for measurement in packet_instance.measurements: - if hasattr(measurement, "enum")and measurement.enum not in Enums: + if ( + hasattr(measurement, "enum") + and measurement.enum not in Enums + ): Enums.append(measurement.enum) return Enums - def GenerateDataPackets(board: BoardDescription): Packets = [] totaldata = [] @@ -47,8 +51,8 @@ def GenerateDataPackets(board: BoardDescription): tempdata = "" tempdata_but_pointer = "" for variable in packet_instance.variables: - tempdata += (str(variable) + ",") - tempdata_but_pointer += ("&" + str(variable) + ",") + tempdata += str(variable) + "," + tempdata_but_pointer += "&" + str(variable) + "," if tempdata.endswith(","): tempdata = tempdata[:-1] if tempdata_but_pointer.endswith(","): @@ -56,21 +60,35 @@ def GenerateDataPackets(board: BoardDescription): packet_variables = [] for measurement in packet_instance.measurements: - packet_variables.append({ - "name": measurement.id.replace(" ", "_").replace("-", "_"), - "type": measurement.type - }) - - aux_packet = {"name": packet_instance.name, "data": tempdata_but_pointer.replace(" ", "_").replace("-", "_"), "id": packet_instance.id, "variables": packet_variables} + packet_variables.append( + { + "name": measurement.id.replace(" ", "_").replace( + "-", "_" + ), + "type": measurement.type, + } + ) + + aux_packet = { + "name": packet_instance.name, + "data": tempdata_but_pointer.replace(" ", "_").replace( + "-", "_" + ), + "id": packet_instance.id, + "variables": packet_variables, + } Packets.append(aux_packet) for measurement in packet_instance.measurements: - aux_data = {"type": measurement.type, "name": measurement.id.replace(" ", "_").replace("-", "_")} + aux_data = { + "type": measurement.type, + "name": measurement.id.replace(" ", "_").replace("-", "_"), + } if not any(x["name"] == aux_data["name"] for x in totaldata): totaldata.append(aux_data) - return Packets,totaldata + return Packets, totaldata - packets,data = GenerateDataPackets(board) + packets, data = GenerateDataPackets(board) def GenerateGroupedSendingPackets(board: BoardDescription): datagram_sockets = [s["name"] for s in board.sockets.DatagramSockets] @@ -97,43 +115,46 @@ def GenerateGroupedSendingPackets(board: BoardDescription): grouped_list = [] for (period, period_type), items in grouped_lookup.items(): - grouped_list.append({ - "period": period, - "period_type": period_type, - "packets": items - }) + grouped_list.append( + {"period": period, "period_type": period_type, "packets": items} + ) return grouped_list context = { "board": board.name, "enums": GenerateDataEnum(board), - "packets" : packets, + "packets": packets, "data": data, "size": board.order_size, - "Sockets":board.sockets.Sockets, - "DatagramSockets":board.sockets.DatagramSockets, + "Sockets": board.sockets.Sockets, + "DatagramSockets": board.sockets.DatagramSockets, "DatagramSocketNames": [s["name"] for s in board.sockets.DatagramSockets], "sending_packets": GenerateGroupedSendingPackets(board), } return context -def Generate_DataPackets_hpp(board_input:str): + +def Generate_DataPackets_hpp(board_input: str): data_packets_path = "Core/Inc/Communications/Packets/DataPackets.hpp" board_instance = globals()[board_input] - if board_instance.data_size == 0 and len(board_instance.sockets.DatagramSockets) == 0: + if ( + board_instance.data_size == 0 + and len(board_instance.sockets.DatagramSockets) == 0 + ): if os.path.exists(data_packets_path): os.remove(data_packets_path) return - env= jinja2.Environment(loader=jinja2.FileSystemLoader(templates_path)) + env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_path)) template = env.get_template("DataTemplate.hpp") context = Get_data_context(board_instance) - - with open(data_packets_path,"w") as Output: + with open(data_packets_path, "w") as Output: Output.write(template.render(context)) -#--------------OrderPackets.hpp generation---------------# + +# --------------OrderPackets.hpp generation---------------# + def Get_order_context(board: BoardDescription): def GenerateOrderEnum(board: BoardDescription): @@ -142,7 +163,10 @@ def GenerateOrderEnum(board: BoardDescription): for packet_instance in board.packets[packet]: if packet_instance.type == "order": for measurement in packet_instance.measurements: - if hasattr(measurement, "enum") and measurement.enum not in Enums: + if ( + hasattr(measurement, "enum") + and measurement.enum not in Enums + ): Enums.append(measurement.enum) return Enums @@ -155,20 +179,29 @@ def GenerateOrderPackets(board: BoardDescription): tempdata = "" tempdata_but_pointer = "" for variable in packet_instance.variables: - tempdata += (str(variable) + ",") - tempdata_but_pointer += ("&" + str(variable) + ",") + tempdata += str(variable) + "," + tempdata_but_pointer += "&" + str(variable) + "," if tempdata.endswith(","): tempdata = tempdata[:-1] tempdata_but_pointer = tempdata_but_pointer[:-1] packet_variables = [] for measurement in packet_instance.measurements: - packet_variables.append({ - "name": measurement.id.replace(" ", "_").replace("-", "_"), - "type": measurement.type - }) - - aux_packet = {"name": packet_instance.name, "data": tempdata_but_pointer, "id": packet_instance.id, "variables": packet_variables} + packet_variables.append( + { + "name": measurement.id.replace(" ", "_").replace( + "-", "_" + ), + "type": measurement.type, + } + ) + + aux_packet = { + "name": packet_instance.name, + "data": tempdata_but_pointer, + "id": packet_instance.id, + "variables": packet_variables, + } Packets.append(aux_packet) for measurement in packet_instance.measurements: aux_data = {"type": measurement.type, "name": measurement.name} @@ -181,23 +214,28 @@ def GenerateOrderPackets(board: BoardDescription): context = { "board": board.name, "enums": GenerateOrderEnum(board), - "packets" : packets, + "packets": packets, "data": data, "size": board.order_size, - "ServerSockets":board.sockets.ServerSockets, - "Sockets":board.sockets.Sockets, + "ServerSockets": board.sockets.ServerSockets, + "Sockets": board.sockets.Sockets, } return context -def Generate_OrderPackets_hpp(board_input:str): + +def Generate_OrderPackets_hpp(board_input: str): order_packets_path = "Core/Inc/Communications/Packets/OrderPackets.hpp" board_instance = globals()[board_input] - if (board_instance.order_size == 0 and len(board_instance.sockets.ServerSockets) == 0 and len(board_instance.sockets.Sockets) == 0): + if ( + board_instance.order_size == 0 + and len(board_instance.sockets.ServerSockets) == 0 + and len(board_instance.sockets.Sockets) == 0 + ): if os.path.exists(order_packets_path): os.remove(order_packets_path) return - env= jinja2.Environment(loader=jinja2.FileSystemLoader(templates_path)) + env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_path)) template = env.get_template("OrderTemplate.hpp") context = Get_order_context(board_instance) diff --git a/Core/Src/stm32h7xx_it.c b/Core/Src/stm32h7xx_it.c index 2436566e..7222b4fc 100644 --- a/Core/Src/stm32h7xx_it.c +++ b/Core/Src/stm32h7xx_it.c @@ -260,9 +260,7 @@ __attribute__((noreturn, optimize("O0"))) void my_fault_handler_c(sContextStateF const uint16_t INVSTATE = usage_fault & 0x0002; // Invalid processor state const uint16_t UNDEFINSTR = usage_fault & 0x0001; // Undefined instruction. } - if (usage_fault | bus_fault) { - scan_call_stack(frame, &log_hard_fault); - } + scan_call_stack(frame, &log_hard_fault); volatile uint8_t metadata_buffer[0x100]; memcpy(metadata_buffer, (void*)METADATA_FLASH_ADDR, 0x100); // write log hard fault diff --git a/hard_faullt_analysis.py b/hard_faullt_analysis.py deleted file mode 100644 index 233eb2c0..00000000 --- a/hard_faullt_analysis.py +++ /dev/null @@ -1,243 +0,0 @@ -import subprocess -import struct -import re -import os -HF_FLASH_ADDR = 0x080C0000 -HF_FLASH_ADDR_STRING = "0x080C000" -ELF_FILE = "out/build/latest.elf" - -CALL_TRACE_MAX_DEPTH = 16 -def read_flash(): - try: - cmd = [ - "STM32_Programmer_CLI", - "-c", "port=SWD", - "-r32", hex(HF_FLASH_ADDR), "112" - ] - out = subprocess.check_output(cmd, text=True) - return out - except subprocess.CalledProcessError as e: - print("Stop debugging to check fault analysis!!!") - print(f"Error: {e}") - return None - except FileNotFoundError: - print("STM32_Programmer_CLI not found. Make sure it is installed and in PATH.") - return None -def decode_cfsr_memory(cfsr, fault_addr): - memory_fault = cfsr & 0xFF - if memory_fault == 0: - return 0 - print("\nMemory Fault (MMFSR):") - if memory_fault & 0b10000000: - print(f" MMARVALID: Memory fault address valid -> 0x{fault_addr:08X}") - if fault_addr in (0xFFFFFFFF, 0x00000000): - print(" Fault address is invalid / unmapped memory") - else: - mem_info = addr2line(fault_addr) - print_code_context(mem_info) - if memory_fault & 0b00100000: - print(" MLSPERR : Floating Point Unit lazy state preservation error") - if memory_fault & 0b00010000: - print(" MSTKERR : Stack error on entry to exception") - if memory_fault & 0b00001000: - print(" MUNSTKERR : Stack error on return from exception") - if memory_fault & 0b00000010: - print(" DACCVIOL : Data access violation (NULL pointer or invalid access)") - if memory_fault & 0b00000001: - print(" IACCVIOL : Instruction access violation") - return 1 - -# -------------------------- -# Decode Bus Fault (BFSR) -# -------------------------- -def decode_cfsr_bus(cfsr, fault_addr): - bus_fault = (cfsr & 0x0000FF00) >> 8 - if bus_fault == 0: - return 0 - print("\nBus Fault (BFSR):") - if bus_fault & 0b10000000: - if(bus_fault & 0b00000001): - print(f" BFARVALID : Bus fault address valid -> 0x{fault_addr:08X}") - if bus_fault & 0b00000100: - print(f"\033[91m Bus fault address imprecise\033[0m (DON'T LOOK CALL STACK)") - - if bus_fault & 0b00100000: - print(" LSPERR : Floating Point Unit lazy state preservation error") - if bus_fault & 0b00010000: - print(" STKERR : Stack error on entry to exception") - if bus_fault & 0b00001000: - print(" UNSTKERR : Stack error on return from exception") - return 2 - -# -------------------------- -# Decode Usage Fault (UFSR) -# -------------------------- -def decode_cfsr_usage(cfsr): - usage_fault = (cfsr & 0xFFFF0000) >> 16 - if usage_fault == 0: - return 0 - print("\nUsage Fault (UFSR):") - if usage_fault & 0x0200: - print(" DIVBYZERO : Division by zero") - if usage_fault & 0x0100: - print(" UNALIGNED : Unaligned memory access") - if usage_fault & 0x0008: - print(" NOCP : Accessed FPU when not present") - if usage_fault & 0x0004: - print(" INVPC : Invalid Program Counter(PC) load") - if usage_fault & 0x0002: - print(" INVSTATE : Invalid processor state") - if usage_fault & 0x0001: - print(" UNDEFINSTR : Undefined instruction") - return 4 - -def decode_cfsr(cfsr, fault_addr): - error = 0 - error = decode_cfsr_memory(cfsr, fault_addr) + error - error = decode_cfsr_bus(cfsr, fault_addr) + error - error = decode_cfsr_usage(cfsr) + error - return error - - -def addr2line(addr): - cmd = ["arm-none-eabi-addr2line", "-e", ELF_FILE, "-f", "-C", hex(addr)] - try: - output = subprocess.check_output(cmd, text=True).strip() - return output - except Exception as e: - return f"addr2line failed: {e}" - -def analyze_call_stack(calltrace_depth, calltrace_pcs, context=2): - """ - Muestra el call stack, omitiendo frames sin fuente y mostrando snippet de código. - """ - print("\n==== Call Stack Trace ====") - if calltrace_depth == 0: - print("No call trace available.") - return - -def analyze_call_stack(calltrace_depth, calltrace_pcs, context=0): - """ - Muestra el call stack, mostrando snippet de código de la línea exacta - sin intentar sumar líneas arriba/abajo (context=0 por defecto). - Omite frames sin fuente. - """ - print("\n==== Call Stack Trace ====") - if calltrace_depth == 0: - print("No call trace available.") - return - - for pc in calltrace_pcs[:calltrace_depth]: - pc_base = pc & ~1 - snippet = addr2line(pc_base- 4).strip() - if not snippet or snippet.startswith("??:?"): - continue # no hay fuente, saltar - print_code_context(snippet,1) - - print("======================================================") - - - - -def print_code_context(lines, context=2): - """ - lines: exit of addr2line (función + file:line) - context: how many lines up/down show - """ - line_list = lines.splitlines() - if len(line_list) < 2: - print("Invalid addr2line output") - return - - file_line = line_list[1].strip() - split = file_line.rfind(':') - file_path = file_line[:split] - try: - line_no = int(file_line[split+1:]) - 1 # índice base 0 - except ValueError: - print("\33[91m Couldn't find exact line\33[0m") - return - if not os.path.exists(file_path): - print("Source file not found") - return - - with open(file_path, "r") as f: - file_lines = f.readlines() - - start = max(0, line_no - context) - end = min(len(file_lines), line_no + context + 1) - - print(f"\nSource snippet from {file_path}:") - for i in range(start, end): - code = file_lines[i].rstrip() - # Si es la línea del error, la ponemos en rojo - if i == line_no: - print(f"\033[91m{i+1:>4}: {code}\033[0m") # rojo - else: - print(f"{i+1:>4}: {code}") - -def hard_fault_analysis(memory_string): - raw = bytes.fromhex(memory_string) - raw = struct.unpack(">28I",raw) - hf = { - "HF_Flag": raw[0], - "r0": raw[1], - "r1": raw[2], - "r2": raw[3], - "r3": raw[4], - "r12": raw[5], - "lr": raw[6], - "pc": raw[7], - "psr": raw[8], - "cfsr": raw[9], - "fault_addr": raw[10], - "calltrace_depth": raw[11], - "calltrace_pcs": raw[12:28] - } - if(hf["HF_Flag"] != 0xFF00FF00): - print("There was no hardfault in your Microcontroller, Kudos for you, I hope...") - return - print("================HARDFAULT DETECTED ===========") - print("Registers:") - - for r in ['r0','r1','r2','r3','r12','lr','pc','psr']: - print(f" {r.upper():<4}: 0x{hf[r]:08X}") - - print(f" CFSR: 0x{hf['cfsr']:08X}") - error = decode_cfsr(hf["cfsr"], hf["fault_addr"]) - print("\nSource Location:") - pc_loc = addr2line(hf["pc"]) - lr_loc = addr2line(hf["lr"]) - print(f" Linker Register : 0x{hf['lr']:08X} -> {lr_loc}") - - print(f" Program Counter : 0x{hf['pc']:08X} -> {pc_loc}") - print_code_context(pc_loc) - - analyze_call_stack(hf["calltrace_depth"],hf["calltrace_pcs"]) - - print("======================================================") - - - print("Note: In Release builds (-O2/-O3) the PC may not point exactly to the failing instruction.") - print(" During interrupts, bus faults, or stack corruption, the PC can be imprecise.") - print("\nIn case of Imprecise error is dificult to find due to is asynchronous fault") - print("The error has to be before PC. But not possible to know exactly when.") - print("Check this link to know more : https://interrupt.memfault.com/blog/cortex-m-hardfault-debug#fn:8") - - -if __name__ == '__main__': - out = read_flash() - if(out == None): - exit() - pos_memory_flash = out.rfind(HF_FLASH_ADDR_STRING) - print(out[0:pos_memory_flash]) - flash = out[pos_memory_flash:] - print(flash) - memory_string = "" - for line in flash.splitlines(): - if(line.find(':') == -1): - break - _,mem = line.split(":") - memory_string += mem - memory_string = memory_string.replace(" ","") - hard_fault_analysis(memory_string) diff --git a/hard_fault_analysis.py b/hard_fault_analysis.py new file mode 100644 index 00000000..02e069af --- /dev/null +++ b/hard_fault_analysis.py @@ -0,0 +1,359 @@ +import subprocess +import struct +import os +from dataclasses import dataclass +from typing import List, Optional + +HF_FLASH_ADDR = 0x080C0000 +HF_FLASH_ADDR_STRING = "0x080C000" +ELF_FILE = "out/build/latest.elf" + +CALL_TRACE_MAX_DEPTH = 16 + + +@dataclass +class HardFaultFrame: + flag: int + r0: int + r1: int + r2: int + r3: int + r12: int + lr: int + pc: int + psr: int + cfsr: int + fault_addr: int + calltrace_depth: int + calltrace_pcs: List[int] + + +def read_flash(): + try: + cmd = [ + "STM32_Programmer_CLI", + "-c", + "port=SWD", + "-r32", + hex(HF_FLASH_ADDR), + "112", + ] + out = subprocess.check_output(cmd, text=True) + return out + except subprocess.CalledProcessError as e: + print("Stop debugging to check fault analysis!!!") + print(f"Error: {e}") + return None + except FileNotFoundError: + print("STM32_Programmer_CLI not found. Make sure it is installed and in PATH.") + return None + + +def decode_cfsr_memory(cfsr, fault_addr): + memory_fault = cfsr & 0xFF + if memory_fault == 0: + return 0 + print("\nMemory Fault (MMFSR):") + if memory_fault & 0b10000000: + print(f" MMARVALID: Memory fault address valid -> 0x{fault_addr:08X}") + if fault_addr in (0xFFFFFFFF, 0x00000000): + print(" Fault address is invalid / unmapped memory") + else: + mem_info = addr2line(fault_addr) + print_code_context(mem_info) + if memory_fault & 0b00100000: + print(" MLSPERR : Floating Point Unit lazy state preservation error") + if memory_fault & 0b00010000: + print(" MSTKERR : Stack error on entry to exception") + if memory_fault & 0b00001000: + print(" MUNSTKERR : Stack error on return from exception") + if memory_fault & 0b00000010: + print(" DACCVIOL : Data access violation (NULL pointer or invalid access)") + if memory_fault & 0b00000001: + print(" IACCVIOL : Instruction access violation") + return 1 + + +# -------------------------- +# Decode Bus Fault (BFSR) +# -------------------------- +def decode_cfsr_bus(cfsr, fault_addr): + bus_fault = (cfsr & 0x0000FF00) >> 8 + if bus_fault == 0: + return 0 + print("\nBus Fault (BFSR):") + if bus_fault & 0b10000000: + if bus_fault & 0b00000001: + print(f" BFARVALID : Bus fault address valid -> 0x{fault_addr:08X}") + if bus_fault & 0b00000100: + print("\033[91m Bus fault address imprecise\033[0m (DON'T LOOK CALL STACK)") + + if bus_fault & 0b00100000: + print(" LSPERR : Floating Point Unit lazy state preservation error") + if bus_fault & 0b00010000: + print(" STKERR : Stack error on entry to exception") + if bus_fault & 0b00001000: + print(" UNSTKERR : Stack error on return from exception") + return 2 + + +# -------------------------- +# Decode Usage Fault (UFSR) +# -------------------------- +def decode_cfsr_usage(cfsr): + usage_fault = (cfsr & 0xFFFF0000) >> 16 + if usage_fault == 0: + return 0 + print("\nUsage Fault (UFSR):") + if usage_fault & 0x0200: + print(" DIVBYZERO : Division by zero") + if usage_fault & 0x0100: + print(" UNALIGNED : Unaligned memory access") + if usage_fault & 0x0008: + print(" NOCP : Accessed FPU when not present") + if usage_fault & 0x0004: + print(" INVPC : Invalid Program Counter(PC) load") + if usage_fault & 0x0002: + print(" INVSTATE : Invalid processor state") + if usage_fault & 0x0001: + print(" UNDEFINSTR : Undefined instruction") + return 4 + + +def decode_cfsr(cfsr, fault_addr): + error = 0 + error = decode_cfsr_memory(cfsr, fault_addr) + error + error = decode_cfsr_bus(cfsr, fault_addr) + error + error = decode_cfsr_usage(cfsr) + error + return error + + +def addr2line(addr): + cmd = ["arm-none-eabi-addr2line", "-e", ELF_FILE, "-f", "-C", hex(addr)] + try: + output = subprocess.check_output(cmd, text=True).strip() + return output + except Exception as e: + return f"addr2line failed: {e}" + + +def print_code_context(lines, context=2): + # addr2line + # line 0: function name + # line 1 : file : line + line_list = lines.splitlines() + if len(line_list) < 2: + print("Invalid addr2line output") + return + + file_line = line_list[1].strip() + split = file_line.rfind(":") + + if split == -1: + print("Invalid file:line format") + return + + file_path = file_line[:split] + + try: + line_no = int(file_line[split + 1 :]) - 1 + except ValueError: + print("Couldn't parse line number") + return + + if not os.path.exists(file_path): + print(f"Source file not found: {file_path}") + return + + with open(file_path, "r") as f: + file_lines = f.readlines() + + start = max(0, line_no - context) + end = min(len(file_lines), line_no + context + 1) + + print(f"\nSource snippet from {file_path}:") + + for i in range(start, end): + code = file_lines[i].rstrip() + if i == line_no: + print(f"\033[91m{i+1:>4}: {code}\033[0m") + else: + print(f"{i+1:>4}: {code}") + + +def parse_hardfault(memory_string: str) -> Optional[HardFaultFrame]: + try: + raw_bytes = bytes.fromhex(memory_string) + + expected_size = 28 * 4 + if len(raw_bytes) != expected_size: + print(f"Invalid dump size. Expected {expected_size}, got {len(raw_bytes)}") + return None + + raw = struct.unpack(">28I", raw_bytes) + + return HardFaultFrame( + flag=raw[0], + r0=raw[1], + r1=raw[2], + r2=raw[3], + r3=raw[4], + r12=raw[5], + lr=raw[6], + pc=raw[7], + psr=raw[8], + cfsr=raw[9], + fault_addr=raw[10], + calltrace_depth=raw[11], + calltrace_pcs=list(raw[12:28]), + ) + + except Exception as e: + print(f"Error parsing hardfault frame: {e}") + return None + + +def analyze_call_stack( + calltrace_depth: int, calltrace_pcs: List[int], context: int = 1 +): + print("\n==== Call Stack Trace ====") + + try: + if not isinstance(calltrace_depth, int): + print("Invalid calltrace_depth type") + return + + if calltrace_depth <= 0: + print("No call trace available.") + return + + if not isinstance(calltrace_pcs, list): + print("Invalid calltrace_pcs structure") + return + + depth = min(calltrace_depth, len(calltrace_pcs), CALL_TRACE_MAX_DEPTH) + + for i in range(depth): + pc = calltrace_pcs[i] + + # Validación básica de PC + if not isinstance(pc, int) or pc == 0: + continue + + # Limpiar bit Thumb + pc_base = pc & ~1 + + # Protección contra underflow + if pc_base < 4: + continue + + try: + snippet = addr2line(pc_base - 4) + except Exception as e: + print(f"addr2line failed for PC 0x{pc:08X}: {e}") + continue + + if not snippet or "??" in snippet: + continue + + try: + print_code_context(snippet, context) + except Exception as e: + print(f"Failed printing context for PC 0x{pc:08X}: {e}") + + except Exception as e: + print(f"Unexpected error in analyze_call_stack: {e}") + + print("===============================================") + + +def hard_fault_analysis(hf, context): + if hf.flag != 0xFF00FF00: + print( + "There was no hardfault in your Microcontroller, Kudos for you, I hope..." + ) + return + + print("================HARDFAULT DETECTED ===========") + print("Registers:") + + for r in ["r0", "r1", "r2", "r3", "r12", "lr", "pc", "psr"]: + value = getattr(hf, r) + print(f" {r.upper():<4}: 0x{value:08X}") + print("\n") + print("Register that contains the info about the HardFault") + print(f" CFSR: 0x{hf.cfsr:08X}") + # get the cause of the error + print("------HardFault Fail------") + decode_cfsr(hf.cfsr, hf.fault_addr) + print("---------------------------") + + pc_loc = addr2line(hf.pc) + lr_loc = addr2line(hf.lr) + + print("\n=======Source Location: ===========\n") + print(f" --> Linker Register : 0x{hf.lr:08X}\n -> {lr_loc}") + print_code_context(lr_loc, context) + print("\n") + print(f" -->Program Counter : 0x{hf.pc:08X}\n -> {pc_loc}") + print_code_context(pc_loc, context) + print("=============================") + analyze_call_stack(hf.calltrace_depth, hf.calltrace_pcs, context) + + print("======================================================") + + print( + "Note: In Release builds (-O2/-O3) the PC may not point exactly to the failing instruction." + ) + print( + " During interrupts, bus faults, or stack corruption, the PC can be imprecise." + ) + print( + "\nIn case of Imprecise error is dificult to find due to is asynchronous fault" + ) + print("The error has to be before PC. But not possible to know exactly when.") + print( + "Check this link to know more : https://interrupt.memfault.com/blog/cortex-m-hardfault-debug#fn:8" + ) + + +# get the memory with the address of HardFault +def extract_memory_dump(cli_output: str) -> Optional[str]: + pos = cli_output.rfind(HF_FLASH_ADDR_STRING) + if pos == -1: + print("The address was not found in CLI output") + return None + + flash = cli_output[pos:] + + memory_string = "" + for line in flash.splitlines(): + if ":" not in line: + break + _, mem = line.split(":", 1) + memory_string += mem.strip() + + return memory_string.replace(" ", "") + + +if __name__ == "__main__": + # First get all the information from the flash Memory + out = read_flash() + if out is None: + exit(1) + + memory_string = extract_memory_dump(out) + if not memory_string: + exit(1) + hf = parse_hardfault(memory_string) + if not hf: + exit(1) + print("Lines of context to watch (max 10):", end="") + try: + context = int(input()) + except ValueError: + context = 2 + if context < 0 or context > 10: + context = 10 + print("\n") + hard_fault_analysis(hf, context) diff --git a/tools/preflash_check.py b/tools/preflash_check.py index 314125aa..6da7cc15 100644 --- a/tools/preflash_check.py +++ b/tools/preflash_check.py @@ -8,27 +8,32 @@ import sys from pathlib import Path + def get_script_dir() -> Path: return Path(__file__).parent.resolve() + def get_workspace_dir() -> Path: return get_script_dir().parent + def check_board_symbol(build_dir: Path) -> bool: """Check if the binary was compiled with BOARD symbol.""" marker_file = build_dir / "board_build_marker" return marker_file.exists() + def has_uncommitted_changes(workspace_dir: Path) -> bool: """Check if there are uncommitted changes in the repository.""" result = subprocess.run( ["git", "status", "--porcelain"], cwd=workspace_dir, capture_output=True, - text=True + text=True, ) return bool(result.stdout.strip()) + def main(): workspace_dir = get_workspace_dir() @@ -43,19 +48,22 @@ def main(): # Check if this is a BOARD build if not check_board_symbol(build_dir): - print("Binary was not compiled with BOARD symbol (or marker not found). Skipping pre-flash check.") + print( + "Binary was not compiled with BOARD symbol (or marker not found). Skipping pre-flash check." + ) sys.exit(0) print("Binary was compiled with BOARD symbol. Checking for uncommitted changes...") if has_uncommitted_changes(workspace_dir): print("Uncommitted changes detected. Aborting Flash...") - print("Please before flashing a board make sure all changes are commited"); + print("Please before flashing a board make sure all changes are commited") sys.exit(1) else: print("No uncommitted changes. Proceeding with flash.") sys.exit(0) + if __name__ == "__main__": main()