diff --git a/back/requirements.txt b/back/requirements.txt index 0c083992..312007a3 100644 --- a/back/requirements.txt +++ b/back/requirements.txt @@ -4,4 +4,5 @@ python-dotenv==1.1.1 marshmallow_dataclass==8.7.1 psutil==7.0.0 netaddr==1.3.0 -ipmininet @ git+https://github.com/mimi-net/ipmininet.git@1.2.4 \ No newline at end of file +ipmininet @ git+https://github.com/mimi-net/ipmininet.git@1.2.4 +requests==2.32.3 diff --git a/back/src/emulator.py b/back/src/emulator.py index d6d29969..1e7b0320 100644 --- a/back/src/emulator.py +++ b/back/src/emulator.py @@ -1,7 +1,10 @@ import os import os.path import subprocess - +import time +import datetime +import logging +import logging_config from ipmininet.ipnet import IPNet from jobs import Jobs from network_schema import Job, Network @@ -10,6 +13,7 @@ from network_topology import MiminetTopology from network import MiminetNetwork +logger = logging.getLogger(__name__) def emulate( network: Network, @@ -23,6 +27,17 @@ def emulate( tuple: animation list and pcap files. 
""" + start_ts = time.time() + + logger.info( + "emulation_started", + extra={ + "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "level": "INFO", + "jobs_count": len(network.jobs) if getattr(network, "jobs", None) else 0, + } + ) + setLogLevel("info") # Validate job limit @@ -53,6 +68,16 @@ def emulate( net.stop() except Exception as e: + + logger.error( + "miminet_configuration_failed", + extra={ + "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "level": "ERROR", + "error": str(e), + } + ) + error(f"An error occurred during mininet configuration: {str(e)}") subprocess.call("mn -c", shell=True) @@ -61,6 +86,17 @@ def emulate( animation, pcaps = create_animation(topo.interfaces) animation = group_packets_by_time(animation) + duration_ms = int((time.time() - start_ts) * 1000) + logger.info( + "emulation_finished", + extra={ + "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "level": "INFO", + "pcaps_count": len(pcaps), + "duration_ms": duration_ms, + } + ) + return animation, pcaps diff --git a/back/src/jobs.py b/back/src/jobs.py index 8c1fccc0..854ac4e8 100755 --- a/back/src/jobs.py +++ b/back/src/jobs.py @@ -1,13 +1,21 @@ import re import shlex import ipaddress +import logging +import logging_config from netaddr import EUI, AddrFormatError from typing import Any, Callable, List, Dict from network_schema import Job from mininet.log import info from ipmininet.host.config.dnsmasq import Dnsmasq +logger = logging.getLogger(__name__) + + +def _log(level, event, extra): + logger.log(level, event, extra=extra) + def filter_arg_for_options( arg: str, flags_without_args: List[str], flags_with_args: Dict[str, str] @@ -76,6 +84,12 @@ def udp_tcp_args_checker(ip, size, port) -> bool: def net_dev_checker(dev) -> bool: """Checker for net interface""" if not re.match(r"^[a-z][a-z0-9:_\-\.]{,14}$", dev): + # Log invalid interface name + _log( + logging.WARNING, + "net_dev_invalid", + {"dev": dev, "reason": "device name not matched"}, + ) 
return False return True @@ -84,13 +98,31 @@ def ip_addr_add_checker(ip, mask, dev) -> bool: """Checker all args in ip addr add job""" if not valid_ip(ip): + # Log invalid/empty IP for ip addr add + _log( + logging.WARNING, + "ip_addr_add_invalid_ip", + {"ip": ip, "mask": mask, "dev": dev}, + ) return False try: _ = int(mask) except (ValueError, TypeError): + # Log invalid mask + _log( + logging.WARNING, + "ip_addr_add_invalid_mask", + {"ip": ip, "mask": mask, "dev": dev}, + ) return False if not net_dev_checker(dev): + # Log invalid device when adding IP + _log( + logging.WARNING, + "ip_addr_add_invalid_dev", + {"ip": ip, "mask": mask, "dev": dev}, + ) return False return True @@ -136,10 +168,40 @@ def subinterface_vlan_checker(intf, ip, mask, vlan, intf_name) -> bool: def ipip_interface_checker(ip_start, ip_end, ip_int, name_int) -> bool: """Checker args for ipip_interface""" - if not valid_ip(ip_start) or not valid_ip(ip_end) or not valid_ip(ip_int): + if not valid_ip(ip_start): + # Log missing/invalid start IP of IPIP tunnel + _log( + logging.WARNING, + "ipip_invalid_start_ip", + {"ip_start": ip_start, "ip_end": ip_end, "ip_int": ip_int, "name": name_int}, + ) + return False + + if not valid_ip(ip_end): + # Log missing/invalid end IP of IPIP tunnel + _log( + logging.WARNING, + "ipip_invalid_end_ip", + {"ip_start": ip_start, "ip_end": ip_end, "ip_int": ip_int, "name": name_int}, + ) + return False + + if not valid_ip(ip_int): + # Log missing/invalid IP of IPIP interface + _log( + logging.WARNING, + "ipip_invalid_iface_ip", + {"ip_start": ip_start, "ip_end": ip_end, "ip_int": ip_int, "name": name_int}, + ) return False if not valid_iface(name_int): + # Log invalid IPIP interface name + _log( + logging.WARNING, + "ipip_invalid_iface_name", + {"ip_start": ip_start, "ip_end": ip_end, "ip_int": ip_int, "name": name_int}, + ) return False return True @@ -204,16 +266,43 @@ def valid_iface(iface) -> bool: return False return True - +def run_command(job, host, cmd): + 
return_code, stdout, stderr = host.cmd_result(cmd) + if return_code != 0: + # Log failed command execution + _log( + logging.ERROR, + "job_execution_failed", + { + "job_id": getattr(job, "job_id", None), + "host_id": getattr(job, "host_id", None), + "command": cmd, + "return_code": return_code, + "stderr": stderr, + }, + ) + else: + # Log successful command execution + _log( + logging.DEBUG, + "job_command_executed", + { + "job_id": getattr(job, "job_id", None), + "host_id": getattr(job, "host_id", None), + "command": cmd, + "return_code": return_code, + "stdout": stdout, + }, + ) + def ping_handler(job: Job, job_host: Any) -> None: """Execute ping -c 1""" arg_ip = job.arg_1 if not valid_ip(arg_ip): return - - job_host.cmd(f"ping -c 1 {arg_ip}") - + cmd = f"ping -c 1 {arg_ip}" + run_command(job, job_host, cmd) def ping_with_options_handler(job: Job, job_host: Any) -> None: """Execute ping with options""" @@ -226,8 +315,8 @@ def ping_with_options_handler(job: Job, job_host: Any) -> None: if len(arg_opt) > 0: arg_opt = ping_options_filter(arg_opt) - - job_host.cmd(f"ping -c 1 {arg_opt} {arg_ip}") + cmd = f"ping -c 1 {arg_opt} {arg_ip}" + run_command(job, job_host, cmd) def get_sending_data_argument(job: Job) -> tuple[str | int, str | int, str | int]: @@ -248,10 +337,8 @@ def sending_udp_data_handler(job: Job, job_host: Any) -> None: if not udp_tcp_args_checker(arg_ip, arg_size, arg_port): return - job_host.cmd( - f"dd if=/dev/urandom bs={arg_size} count=1 | nc -uq1 {arg_ip} {arg_port}" - ) - + cmd = f"dd if=/dev/urandom bs={arg_size} count=1 | nc -uq1 {arg_ip} {arg_port}" + run_command(job, job_host, cmd) def sending_tcp_data_handler(job: Job, job_host: Any) -> None: """Method for sending TCP data sending""" @@ -261,10 +348,9 @@ def sending_tcp_data_handler(job: Job, job_host: Any) -> None: if not udp_tcp_args_checker(arg_ip, arg_size, arg_port): return - job_host.cmd( - f"dd if=/dev/urandom bs={arg_size} count=1 | nc -w 30 -q1 {arg_ip} {arg_port}" - ) - + cmd = f"dd 
if=/dev/urandom bs={arg_size} count=1 | nc -w 30 -q1 {arg_ip} {arg_port}" + run_command(job, job_host, cmd) + def traceroute_handler(job: Job, job_host: Any) -> None: """Method for executing traceroute""" @@ -278,7 +364,8 @@ def traceroute_handler(job: Job, job_host: Any) -> None: if len(arg_opt) > 0: arg_opt = traceroute_options_filter(arg_opt) - job_host.cmd(f"traceroute -n {arg_opt} {arg_ip}") + cmd = f"traceroute -n {arg_opt} {arg_ip}" + run_command(job, job_host, cmd) def ip_addr_add_handler(job: Job, job_host: Any) -> None: @@ -289,10 +376,16 @@ def ip_addr_add_handler(job: Job, job_host: Any) -> None: arg_dev = job.arg_1 if not ip_addr_add_checker(arg_ip, arg_mask, arg_dev): + # Log skipping ip addr add due to invalid args + _log( + logging.ERROR, + "ip_addr_add_skipped", + {"dev": arg_dev, "ip": arg_ip, "mask": arg_mask}, + ) return - job_host.cmd(f"ip addr add {arg_ip}/{arg_mask} dev {arg_dev}") - + cmd = f"ip addr add {arg_ip}/{arg_mask} dev {arg_dev}" + run_command(job, job_host, cmd) def iptables_handler(job: Job, job_host: Any) -> None: """Method for adding forwarding rule""" @@ -302,7 +395,8 @@ def iptables_handler(job: Job, job_host: Any) -> None: if not net_dev_checker(arg_dev): return - job_host.cmd(f"iptables -t nat -A POSTROUTING -o {arg_dev} -j MASQUERADE") + cmd = f"iptables -t nat -A POSTROUTING -o {arg_dev} -j MASQUERADE" + run_command(job, job_host, cmd) def port_forwarding_tcp_handler(job: Job, job_host: Any) -> None: @@ -346,7 +440,8 @@ def ip_route_add_handler(job: Job, job_host: Any) -> None: if not ip_route_add_checker(arg_ip, arg_mask, arg_router): return - job_host.cmd(f"ip route add {arg_ip}/{arg_mask} via {arg_router}") + cmd = f"ip route add {arg_ip}/{arg_mask} via {arg_router}" + run_command(job, job_host, cmd) def block_tcp_udp_port(job: Job, job_host: Any) -> None: @@ -355,10 +450,10 @@ def block_tcp_udp_port(job: Job, job_host: Any) -> None: if not valid_port(arg_port): return - - job_host.cmd(f"iptables -A INPUT -p tcp --dport 
{arg_port} -j DROP") - job_host.cmd(f"iptables -A INPUT -p udp --dport {arg_port} -j DROP") - + cmd = f"iptables -A INPUT -p tcp --dport {arg_port} -j DROP" + run_command(job, job_host, cmd) + cmd = f"iptables -A INPUT -p udp --dport {arg_port} -j DROP" + run_command(job, job_host, cmd) def open_tcp_server_handler(job: Job, job_host: Any) -> None: """ "Method for open tcp server""" @@ -368,10 +463,10 @@ def open_tcp_server_handler(job: Job, job_host: Any) -> None: if not valid_port(arg_port) or not valid_ip(arg_ip): return - job_host.cmd( - f"nohup nc -k -d {arg_ip} -l {arg_port} > /tmp/tcpserver 2>&1 < /dev/null &" - ) - + + cmd = f"nohup nc -k -d {arg_ip} -l {arg_port} > /tmp/tcpserver 2>&1 < /dev/null &" + run_command(job, job_host, cmd) + def open_udp_server_handler(job: Job, job_host: Any) -> None: """ "Method for open udp server""" @@ -381,9 +476,8 @@ def open_udp_server_handler(job: Job, job_host: Any) -> None: if not valid_ip(arg_ip) or not valid_port(arg_port): return - job_host.cmd( - f"nohup nc -d -u {arg_ip} -l {arg_port} > /tmp/udpserver 2>&1 < /dev/null &" - ) + cmd = f"nohup nc -d -u {arg_ip} -l {arg_port} > /tmp/udpserver 2>&1 < /dev/null &" + run_command(job, job_host, cmd) def arp_handler(job: Job, job_host: Any) -> None: @@ -394,7 +488,8 @@ def arp_handler(job: Job, job_host: Any) -> None: if not valid_ip(arg_ip) or not valid_mac(arg_mac): return - job_host.cmd(f"arp -s {arg_ip} {arg_mac}") + cmd = f"arp -s {arg_ip} {arg_mac}" + run_command(job, job_host, cmd) def subinterface_with_vlan(job: Job, job_host: Any) -> None: @@ -410,12 +505,13 @@ def subinterface_with_vlan(job: Job, job_host: Any) -> None: ): return - job_host.cmd( - f"ip link add link {arg_intf} name {arg_intf_name}.{arg_vlan} type vlan id {arg_vlan}" - ) - job_host.cmd(f"ip addr add {arg_ip}/{arg_mask} dev {arg_intf_name}.{arg_vlan}") - job_host.cmd(f"ip
link set dev {arg_intf_name}.{arg_vlan} up") + cmd = f"ip link add link {arg_intf} name {arg_intf_name}.{arg_vlan} type vlan id {arg_vlan}" + run_command(job, job_host, cmd) + cmd = f"ip addr add {arg_ip}/{arg_mask} dev {arg_intf_name}.{arg_vlan}" + run_command(job, job_host, cmd) + cmd = f"ip link set dev {arg_intf_name}.{arg_vlan} up" + run_command(job, job_host, cmd) def add_ipip_interface(job: Job, job_host: Any) -> None: """Method for adding ipip-interface""" @@ -425,13 +521,23 @@ arg_name_int = job.arg_4 if not ipip_interface_checker(arg_ip_start, arg_ip_end, arg_ip_int, arg_name_int): + # Log skipping IPIP creation due to invalid args + _log( + logging.ERROR, + "ipip_interface_invalid_params", + { + "ip_start": arg_ip_start, + "ip_end": arg_ip_end, + "ip_int": arg_ip_int, + "name": arg_name_int, + }, + ) return - job_host.cmd( - f"ip tunnel add {arg_name_int} mode ipip remote {arg_ip_end} local {arg_ip_start}" - ) - job_host.cmd(f"ifconfig {arg_name_int} {arg_ip_int}") - + cmd = f"ip tunnel add {arg_name_int} mode ipip remote {arg_ip_end} local {arg_ip_start}" + run_command(job, job_host, cmd) + cmd = f"ifconfig {arg_name_int} {arg_ip_int}" + run_command(job, job_host, cmd) def add_gre(job: Job, job_host: Any) -> None: arg_ip_start = job.arg_1 @@ -442,12 +548,12 @@ def add_gre(job: Job, job_host: Any) -> None: if not add_gre_checker(arg_ip_start, arg_ip_end, arg_ip_iface, arg_name_iface): return - job_host.cmd( - f"ip tunnel add {arg_name_iface} mode gre remote {arg_ip_end} local {arg_ip_start} ttl 255" - ) - job_host.cmd(f"ip addr add {arg_ip_iface}/24 dev {arg_name_iface}") - job_host.cmd(f"ip link set {arg_name_iface} up") - + cmd = f"ip tunnel add {arg_name_iface} mode gre remote {arg_ip_end} local {arg_ip_start} ttl 255" + run_command(job, job_host, cmd) + cmd = f"ip addr add {arg_ip_iface}/24 dev {arg_name_iface}" + run_command(job, job_host, cmd) + cmd = f"ip link set {arg_name_iface} up" + run_command(job, job_host, cmd) def arp_proxy_enable(job: Job, job_host: Any) -> None: """Enable ARP proxying on the interface""" @@ -456,7 +562,8 @@ def arp_proxy_enable(job: Job, job_host: Any) -> None: if 
not valid_iface(arg_iface): return - job_host.cmd(f"sysctl -w net.ipv4.conf.{arg_iface}.proxy_arp=1") + cmd = f"sysctl -w net.ipv4.conf.{arg_iface}.proxy_arp=1" + run_command(job, job_host, cmd) def dhcp_client(job: Job, job_host): @@ -545,4 +652,27 @@ def strategy(self, job_id: int) -> None: self._strategy = self._dct[job_id] def handler(self) -> None: - self._strategy(self._job, self._job_host) + job_id = getattr(self._job, "job_id", None) + host_id = getattr(self._job, "host_id", None) + # Log job start + _log( + logging.DEBUG, + "job_start", + {"job_id": job_id, "host_id": host_id}, + ) + try: + self._strategy(self._job, self._job_host) + # Log job success + _log( + logging.DEBUG, + "job_done", + {"job_id": job_id, "host_id": host_id}, + ) + except Exception as e: + # Log job failure + _log( + logging.ERROR, + "job_failed", + {"job_id": job_id, "host_id": host_id, "error": str(e)}, + ) + raise diff --git a/back/src/logging_config.py b/back/src/logging_config.py new file mode 100644 index 00000000..4d350e9f --- /dev/null +++ b/back/src/logging_config.py @@ -0,0 +1,99 @@ +import json +import logging +import os +from datetime import datetime, timezone + +import requests + + +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() +UNIFIED_AGENT_URL = os.getenv("UNIFIED_AGENT_URL", "http://158.160.179.91:22132/write") +HTTP_TIMEOUT = float(os.getenv("LOG_HTTP_TIMEOUT", "1.0")) + +_RESERVED = { + "name", + "msg", + "args", + "levelname", + "levelno", + "pathname", + "filename", + "module", + "exc_info", + "exc_text", + "stack_info", + "lineno", + "funcName", + "created", + "msecs", + "relativeCreated", + "thread", + "threadName", + "processName", + "process", + "message", +} + + +class JsonFormatter(logging.Formatter): + """Render log records as JSON with extras preserved.""" + + def format(self, record: logging.LogRecord) -> str: + extras = { + k: v + for k, v in record.__dict__.items() + if k not in _RESERVED and not k.startswith("_") + } + payload = { + "message": 
record.getMessage(), + "level": record.levelname, + "logger": record.name, + "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(), + } + if extras: + payload["extra"] = extras + if record.exc_info: + payload["exc_info"] = self.formatException(record.exc_info) + return json.dumps(payload, ensure_ascii=False) + + +class HttpPostHandler(logging.Handler): + """Send each log record to an HTTP endpoint.""" + + def __init__(self, url: str, timeout: float = 1.0): + super().__init__() + self.url = url + self.timeout = timeout + + def emit(self, record: logging.LogRecord) -> None: + try: + payload = self.format(record) + headers = {"Content-Type": "application/json"} + requests.post(self.url, data=payload, headers=headers, timeout=self.timeout) + print("log requested") + except Exception: + self.handleError(record) + + +def _configure_logging() -> None: + root = logging.getLogger() + root.setLevel(LOG_LEVEL) + + # Prevent duplicate handlers on reload. + if any(isinstance(h, HttpPostHandler) for h in root.handlers): + return + + formatter = JsonFormatter() + + console_handler = logging.StreamHandler() + console_handler.setLevel(LOG_LEVEL) + console_handler.setFormatter(formatter) + root.addHandler(console_handler) + + http_handler = HttpPostHandler(UNIFIED_AGENT_URL, timeout=HTTP_TIMEOUT) + http_handler.setLevel(LOG_LEVEL) + http_handler.setFormatter(formatter) + root.addHandler(http_handler) + + +_configure_logging() diff --git a/back/src/network.py b/back/src/network.py index e018aa01..b9a3a877 100644 --- a/back/src/network.py +++ b/back/src/network.py @@ -1,17 +1,19 @@ +import logging import os import time from psutil import Process from ipmininet.ipnet import IPNet from mininet.log import info import psutil - +import logging_config from network_topology import MiminetTopology from network_schema import Network - from net_utils.vlan import setup_vlans, clean_bridges from net_utils.vxlan import setup_vtep_interfaces, teardown_vtep_bridges +from 
datetime import date, datetime +logger = logging.getLogger(__name__) class MiminetNetwork(IPNet): def __init__(self, topo: MiminetTopology, network: Network): super().__init__(topo=topo, use_v6=False, autoSetMacs=True, allocate_IPs=False) @@ -49,10 +51,30 @@ def __check_files(self): if not os.path.exists(pcap_out_file1): self.__clear_files() + logger.error( + "pcap_out_file_not_found", + extra={ + "timestamp": datetime.utcnow().isoformat() + "Z", + "level": "ERROR", + "task_id": getattr(self, "task_id", None), + "interface": link1, + "expected_file": pcap_out_file1, + } + ) raise ValueError(f"No capture for interface '{link1}'.") if not os.path.exists(pcap_out_file2): self.__clear_files() + logger.error( + "pcap_file_not_found", + extra={ + "timestamp": datetime.utcnow().isoformat() + "Z", + "level": "ERROR", + "task_id": getattr(self, "task_id", None), + "interface": link2, + "expected_file": pcap_out_file2, + } + ) raise ValueError(f"No capture for interface '{link2}'.") def __clear_files(self): @@ -78,13 +100,28 @@ def __clean_services(self): current_process = Process() children = current_process.children(recursive=True) allowed = ("mimidump", "bash") + killed_count = 0 + zombie_count = 0 for child in children: if child.status() == psutil.STATUS_ZOMBIE: # in case we already have zombies child.wait() + zombie_count += 1 elif child.name() not in allowed: # finish other processes info(f"Killed: {child.name()} {child.pid}") child.kill() child.wait() + killed_count += 1 + if zombie_count > 0 or killed_count > 0: + logger.warning( + "cleanup_incomplete", + extra={ + "timestamp": datetime.utcnow().isoformat() + "Z", + "level": "WARNING", + "killed_processes": killed_count, + "zombies_left": zombie_count + } + ) + diff --git a/back/src/network_topology.py b/back/src/network_topology.py index b6d62a02..6b261149 100644 --- a/back/src/network_topology.py +++ b/back/src/network_topology.py @@ -6,6 +6,11 @@ from ipmininet.router.config import RouterConfig from 
network_schema import Network, Node, NodeConfig, NodeInterface from pkt_parser import is_ipv4_address +import logging +import datetime +import logging_config + +logger = logging.getLogger(__name__) class MiminetTopology(IPTopo): @@ -124,7 +129,18 @@ def __configure_link(self, link, iface: NodeInterface): def build(self, *args, **kwargs): links = [] interfaces = [] - + + logger.info( + "Topology build started", + extra={ + "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "level": "INFO", + "event": "topology_build_start", + "nodes_count": len(self.__network.nodes), + "edges_count": len(self.__network.edges), + } + ) + for node in self.__network.nodes: # Caches node by ID for quick lookup later self.__id_to_node[node.data.id] = node @@ -157,7 +173,6 @@ def _to_percent(val): raise ValueError( f"Edge '{edge_id}' references unknown target node '{target_id}'." ) - # Mininet host objects (https://mininet.org/api/classmininet_1_1node_1_1Host.html) src_host = self.__nodes[source_id] trg_host = self.__nodes[target_id] @@ -180,7 +195,7 @@ def _to_percent(val): duplicate_percentage, ) ) - + # Put virtual switch between nodes and return link between them link1, link2 = self.addLink( src_host, @@ -191,17 +206,45 @@ def _to_percent(val): loss_percentage=loss_percentage, duplicate_percentage=duplicate_percentage, ) - + + logger.debug( + "Link created", + extra={ + "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "level": "DEBUG", + "event": "link_created", + "edge_id": edge_id, + "link1": str(link1), + "link2": str(link2), + "src_iface": src_iface.name, + "trg_iface": trg_iface.name, + } + ) + self.__configure_link(link1[src_host], src_iface) self.__configure_link(link2[trg_host], trg_iface) - + links.append(link1[src_host]) links.append(link2[trg_host]) interfaces.append(src_iface.name) interfaces.append(trg_iface.name) - + if links: + logger.debug( + "Network capture setup", + extra={ + "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "level": "DEBUG", + "event": 
"capture_setup", + "interfaces": interfaces, + "links": [str(l) for l in links], + "options": { + "base_filename": "capture", + "extra_arguments": "not igmp", + } + } + ) # Set up packet capturing self.addNetworkCapture( nodes=[], @@ -209,6 +252,15 @@ def _to_percent(val): base_filename="capture", extra_arguments="not igmp", ) + logger.info( + "Topology build complete", + extra={ + "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "level": "INFO", + "event": "topology_build_end", + "interfaces": self.__iface_pairs.copy(), + } + ) super().build(*args, **kwargs) def addLink( diff --git a/back/src/tasks.py b/back/src/tasks.py index 76a057f8..fac859b8 100644 --- a/back/src/tasks.py +++ b/back/src/tasks.py @@ -1,7 +1,10 @@ import json import os import signal +import datetime +import logging +import logging_config import marshmallow_dataclass from celery_app import ( app, @@ -9,10 +12,11 @@ SEND_NETWORK_RESPONSE_ROUTING_KEY, ) from mininet.log import setLogLevel, error - from network_schema import Network from emulator import emulate +logger = logging.getLogger(__name__) + def run_miminet(network_json: str): """Load network from JSON and start emulation safely. @@ -35,7 +39,7 @@ def run_miminet(network_json: str): network_schema = marshmallow_dataclass.class_schema(Network)() network_json = network_schema.load(jnet, unknown="include") - for _ in range(4): + for attempt in range(4): try: animation, pcaps = emulate(network_json) @@ -43,7 +47,16 @@ def run_miminet(network_json: str): except Exception as e: # Sometimes mininet doesn't work correctly and simulation needs to be redone, # Example of mininet error: https://github.com/mininet/mininet/issues/737. 
- error(e) + logger.warning( + "emulating_retry", + extra={ + "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "level": "WARNING", + "task_id": None, + "attempt": attempt + 1, + "error": str(e), + }, + ) continue return "[]", [] diff --git a/front/src/celery_app.py b/front/src/celery_app.py index 7c09300f..05f9b27e 100644 --- a/front/src/celery_app.py +++ b/front/src/celery_app.py @@ -11,6 +11,7 @@ from celery import Celery from dotenv import load_dotenv from kombu import Exchange, Queue +import logging_config load_dotenv() diff --git a/front/src/logging_config.py b/front/src/logging_config.py new file mode 100644 index 00000000..649470a0 --- /dev/null +++ b/front/src/logging_config.py @@ -0,0 +1,100 @@ +import json +import logging +import os +from datetime import datetime, timezone + +import requests + + +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() +UNIFIED_AGENT_URL = os.getenv("UNIFIED_AGENT_URL", "http://158.160.179.91:22132/write") +HTTP_TIMEOUT = float(os.getenv("LOG_HTTP_TIMEOUT", "1.0")) + +# Keys present on every LogRecord; used to filter extras for JSON payload. 
+_RESERVED = { + "name", + "msg", + "args", + "levelname", + "levelno", + "pathname", + "filename", + "module", + "exc_info", + "exc_text", + "stack_info", + "lineno", + "funcName", + "created", + "msecs", + "relativeCreated", + "thread", + "threadName", + "processName", + "process", + "message", +} + + +class JsonFormatter(logging.Formatter): + """Render log records as JSON with extras preserved.""" + + def format(self, record: logging.LogRecord) -> str: + extras = { + k: v + for k, v in record.__dict__.items() + if k not in _RESERVED and not k.startswith("_") + } + payload = { + "message": record.getMessage(), + "level": record.levelname, + "logger": record.name, + "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(), + } + if extras: + payload["extra"] = extras + if record.exc_info: + payload["exc_info"] = self.formatException(record.exc_info) + return json.dumps(payload, ensure_ascii=False) + + +class HttpPostHandler(logging.Handler): + """Send each log record to an HTTP endpoint.""" + + def __init__(self, url: str, timeout: float = 1.0): + super().__init__() + self.url = url + self.timeout = timeout + + def emit(self, record: logging.LogRecord) -> None: + try: + payload = self.format(record) + headers = {"Content-Type": "application/json"} + requests.post(self.url, data=payload, headers=headers, timeout=self.timeout) + print("log requested") + except Exception: + self.handleError(record) + + +def _configure_logging() -> None: + root = logging.getLogger() + root.setLevel(LOG_LEVEL) + + # Prevent duplicate handlers on reload. 
+ if any(isinstance(h, HttpPostHandler) for h in root.handlers): + return + + formatter = JsonFormatter() + + console_handler = logging.StreamHandler() + console_handler.setLevel(LOG_LEVEL) + console_handler.setFormatter(formatter) + root.addHandler(console_handler) + + http_handler = HttpPostHandler(UNIFIED_AGENT_URL, timeout=HTTP_TIMEOUT) + http_handler.setLevel(LOG_LEVEL) + http_handler.setFormatter(formatter) + root.addHandler(http_handler) + + +_configure_logging() diff --git a/front/src/miminet_network.py b/front/src/miminet_network.py index c2d7f634..1abc9371 100644 --- a/front/src/miminet_network.py +++ b/front/src/miminet_network.py @@ -2,7 +2,9 @@ import os import uuid import shutil +import logging +import logging_config from flask import ( flash, jsonify, @@ -18,6 +20,8 @@ import datetime from sqlalchemy import not_ +logger = logging.getLogger(__name__) + @login_required def create_network(): @@ -557,6 +561,16 @@ def get_emulation_queue_size(): .count() ) + logger.info( + "emulation_queue_size", + extra={ + "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "level": "INFO", + "time_filter": time_filter_req, + "count": emulated_networks_count, + }, + ) + return make_response( jsonify({"size": emulated_networks_count}), 200, diff --git a/front/src/miminet_simulation.py b/front/src/miminet_simulation.py index eed4aa08..9b679a71 100644 --- a/front/src/miminet_simulation.py +++ b/front/src/miminet_simulation.py @@ -1,5 +1,8 @@ import os import uuid +import logging + +import logging_config from celery_app import ( SEND_NETWORK_EXCHANGE, @@ -11,6 +14,12 @@ from miminet_model import Network, Simulate, SimulateLog, db from werkzeug.wrappers import Response +logger = logging.getLogger(__name__) + + +def _log(level, event, extra): + logger.log(level, event, extra=extra) + @login_required def run_simulation() -> Response: @@ -18,6 +27,12 @@ def run_simulation() -> Response: user = current_user network_guid = request.args.get("guid", type=str) + _log( + 
logging.INFO, + "run_simulation_start", + {"user_id": getattr(user, "id", None), "guid": network_guid, "method": request.method}, + ) + if not network_guid: ret = { "simulation_id": -1, @@ -40,12 +55,19 @@ def run_simulation() -> Response: # Get saved emulations sims = Simulate.query.filter(Simulate.network_id == net.id).all() + removed = len(sims) # Remove all previous emulations for s in sims: db.session.delete(s) db.session.commit() + _log( + logging.INFO, + "run_simulation_cleanup", + {"network_id": net.id, "removed_sims": removed}, + ) + # Write log simlog = SimulateLog( author_id=net.author_id, network=net.network, network_guid=net.guid @@ -58,17 +80,48 @@ def run_simulation() -> Response: db.session.add(simlog) db.session.commit() - # Send emulation task to celery - app.send_task( - "tasks.mininet_worker", - (net.network,), - routing_key=str(uuid.uuid4()), - exchange=SEND_NETWORK_EXCHANGE, - exchange_type=EXCHANGE_TYPE, - task_id=str(task_guid), - headers={"network_task_name": "tasks.save_simulate_result"}, + _log( + logging.INFO, + "run_simulation_created", + {"network_id": net.id, "simulation_id": sim.id, "task_guid": str(task_guid)}, ) + # Send emulation task to celery + routing_key = str(uuid.uuid4()) + try: + app.send_task( + "tasks.mininet_worker", + (net.network,), + routing_key=routing_key, + exchange=SEND_NETWORK_EXCHANGE, + exchange_type=EXCHANGE_TYPE, + task_id=str(task_guid), + headers={"network_task_name": "tasks.save_simulate_result"}, + ) + _log( + logging.INFO, + "run_simulation_enqueued", + { + "simulation_id": sim.id, + "task_guid": str(task_guid), + "routing_key": routing_key, + "exchange": SEND_NETWORK_EXCHANGE.name, + }, + ) + except Exception as e: + _log( + logging.ERROR, + "run_simulation_enqueue_failed", + { + "simulation_id": sim.id, + "task_guid": str(task_guid), + "routing_key": routing_key, + "exchange": SEND_NETWORK_EXCHANGE.name, + "error": str(e), + }, + ) + raise + # Return network id to check emulation result ret = 
{"simulation_id": sim.id} return make_response(jsonify(ret), 201) diff --git a/front/src/quiz/service/check_practice_service.py b/front/src/quiz/service/check_practice_service.py index a7b910a6..06140500 100644 --- a/front/src/quiz/service/check_practice_service.py +++ b/front/src/quiz/service/check_practice_service.py @@ -1,10 +1,15 @@ import ipaddress import logging +import logging_config import quiz.service.check_host_service as chs - from quiz.service.check_network_service import check_network_configuration +logger = logging.getLogger(__name__) + + +def _log(level, event, extra): + logger.log(level, event, extra=extra) def check_in_one_network_with(requirement, answer, device): points = 0 @@ -12,6 +17,16 @@ def check_in_one_network_with(requirement, answer, device): target_id = requirement.get("target") points_awarded = requirement.get("points", 1) + # Log start of in_one_network_with check + _log( + logging.DEBUG, + "in_one_network_with_start", + { + "device": device, + "target": target_id, + "points_awarded": points_awarded, + }, + ) if not target_id: hints.append("Целевое устройство не указано.") @@ -50,9 +65,30 @@ def get_networks(node): for net2 in target_networks: if net1.overlaps(net2): points = points_awarded + _log( + logging.INFO, + "in_one_network_with_success", + { + "device": device, + "target": target_id, + "network": str(net1), + "awarded": points, + }, + ) return points, hints hints.append(f"{device} и {target_id} не находятся в одной сети.") + # Log failure to be in same network and awarded points (usually 0) + _log( + logging.INFO, + "in_one_network_with_fail", + { + "device": device, + "target": target_id, + "awarded": points, + "hints": hints, + }, + ) return points, hints @@ -63,6 +99,18 @@ def check_abstract_ip_equal(abstract_equal, answer, device): if not abstract_equal: return points, hints + # Log parameters before abstract IP equality check + _log( + logging.DEBUG, + "abstract_ip_equal_start", + { + "device": device, + "to": 
abstract_equal.get("to"), + "expected_equal_with": abstract_equal.get("expected_equal_with"), + "points_awarded": abstract_equal.get("points", 1), + }, + ) + nodes = answer["nodes"] edges = answer["edges"] @@ -112,6 +160,18 @@ def check_abstract_ip_equal(abstract_equal, answer, device): common_ips = device_ips_to_to_node.intersection(compare_ips) if common_ips: points = max(points_awarded, 0) + # Log successful abstract IP equality and awarded points + _log( + logging.INFO, + "abstract_ip_equal_success", + { + "device": device, + "to": to_node_id, + "compare": expected_equal_with, + "common_ips": list(common_ips), + "awarded": points, + }, + ) else: hints.append( f"IP-адреса интерфейсов {device}, направленных к {to_node_id}, не совпадают ни с одним IP-адресом интерфейсов {expected_equal_with}." @@ -119,6 +179,18 @@ def check_abstract_ip_equal(abstract_equal, answer, device): if points_awarded < 0: points = points_awarded + # Log final abstract IP equality result and hints + _log( + logging.INFO, + "abstract_ip_equal_result", + { + "device": device, + "to": to_node_id, + "compare": expected_equal_with, + "awarded": points, + "hints": hints, + }, + ) return points, hints @@ -135,20 +207,55 @@ def check_host(requirement, answer, device): hints.append(f"Устройство {device} не найдено в сети.") return points_for_host, hints + # Log start of host check and requirement keys + _log( + logging.DEBUG, + "practice_check_host_start", + {"device": device, "requirement_keys": list(requirement.keys())}, + ) + # Checking commands (ping in particular) cmd = requirement.get("cmd") if cmd: + # Log command check input + _log( + logging.DEBUG, + "practice_cmd_check", + {"device": device, "cmd": cmd}, + ) points_for_cmd, cmd_hints = chs.process_host_command(cmd, answer, device) points_for_host += points_for_cmd hints.extend(cmd_hints) + # Log command check result and awarded points + _log( + logging.INFO, + "practice_cmd_result", + { + "device": device, + "awarded": points_for_cmd, + 
"host_total": points_for_host, + "hints": cmd_hints, + }, + ) # Checking for identical VLANs all_vlan_conditions_passed = True + equal_vlan_hints_all = [] + equal_vlan_targets = [] equal_vlan_id = requirement.get("equal_vlan_id") if equal_vlan_id: targets = equal_vlan_id.get("targets", []) - points = equal_vlan_id.get("points", 1) + equal_vlan_targets = targets + equal_vlan_points = equal_vlan_id.get("points", 1) + # Log VLAN equality check input + _log( + logging.DEBUG, + "practice_equal_vlan_check", + {"device": device, "targets": targets, "points": equal_vlan_points}, + ) + + equal_vlan_hints_all = [] for target in targets: result, equal_vlan_hints = chs.check_vlan_id( @@ -157,18 +264,44 @@ def check_host(requirement, answer, device): if not result: all_vlan_conditions_passed = False hints.extend(equal_vlan_hints) + equal_vlan_hints_all.extend(equal_vlan_hints) + equal_vlan_awarded = ( + equal_vlan_points if all_vlan_conditions_passed and equal_vlan_id else 0 + ) if all_vlan_conditions_passed: - points_for_host += points - points = 0 + points_for_host += equal_vlan_points + # Log VLAN equality result and points + _log( + logging.INFO, + "practice_equal_vlan_result", + { + "device": device, + "targets": equal_vlan_targets, + "awarded": equal_vlan_awarded, + "host_total": points_for_host, + "hints": equal_vlan_hints_all if not all_vlan_conditions_passed else [], + }, + ) # Checking for different VLANs all_vlan_conditions_passed = True + no_equal_vlan_hints_all = [] + no_equal_vlan_targets = [] no_equal_vlan_id = requirement.get("no_equal_vlan_id") if no_equal_vlan_id: targets = no_equal_vlan_id.get("targets", []) - points = no_equal_vlan_id.get("points", 1) + no_equal_vlan_targets = targets + no_equal_vlan_points = no_equal_vlan_id.get("points", 1) + # Log VLAN inequality check input + _log( + logging.DEBUG, + "practice_no_equal_vlan_check", + {"device": device, "targets": targets, "points": no_equal_vlan_points}, + ) + + no_equal_vlan_hints_all = [] for target in 
targets: result, no_equal_vlan_hints = chs.check_vlan_id( @@ -177,15 +310,40 @@ def check_host(requirement, answer, device): if not result: all_vlan_conditions_passed = False hints.extend(no_equal_vlan_hints) + no_equal_vlan_hints_all.extend(no_equal_vlan_hints) + no_equal_vlan_awarded = ( + no_equal_vlan_points if all_vlan_conditions_passed and no_equal_vlan_id else 0 + ) if all_vlan_conditions_passed: - points_for_host += points + points_for_host += no_equal_vlan_points + # Log VLAN inequality result and points + _log( + logging.INFO, + "practice_no_equal_vlan_result", + { + "device": device, + "targets": no_equal_vlan_targets, + "awarded": no_equal_vlan_awarded, + "host_total": points_for_host, + "hints": no_equal_vlan_hints_all if not all_vlan_conditions_passed else [], + }, + ) # Checking for the privacy of an IP address ip_check = requirement.get("ip_check") if ip_check: points = ip_check.get("points", 1) target_node_id = ip_check.get("to") + # Log IP privacy check input + _log( + logging.DEBUG, + "practice_ip_check", + {"device": device, "target": target_node_id, "points": points}, + ) + + ip_check_hints = [] + before = points_for_host target_node = next( (node for node in nodes if node["data"]["id"] == target_node_id), None @@ -195,6 +353,9 @@ def check_host(requirement, answer, device): hints.append( f"Целевое устройство {target_node_id} для проверки приватности ip-адреса не найдено в сети." ) + ip_check_hints.append( + f"Целевое устройство {target_node_id} для проверки приватности ip-адреса не найдено в сети." + ) for interface in host_node.get("interface", []): edge_id = interface.get("connect") @@ -203,6 +364,9 @@ def check_host(requirement, answer, device): hints.append( f"Интерфейс на устройстве {device} не подключен к {target_node_id}." ) + ip_check_hints.append( + f"Интерфейс на устройстве {device} не подключен к {target_node_id}." 
+ ) continue connected_edge = next( @@ -214,6 +378,9 @@ def check_host(requirement, answer, device): hints.append( f"Соединение для интерфейса на устройстве {device} не найдено." ) + ip_check_hints.append( + f"Соединение для интерфейса на устройстве {device} не найдено." + ) continue connected_node_id = ( @@ -231,16 +398,49 @@ def check_host(requirement, answer, device): hints.append( f"IP-адрес {source_ip} на устройстве {device} не является приватным." ) + ip_check_hints.append( + f"IP-адрес {source_ip} на устройстве {device} не является приватным." + ) else: hints.append( f"Устройство {device} не подключено к {target_node_id}. Проверка приватности IP-адреса невозможна." ) + ip_check_hints.append( + f"Устройство {device} не подключено к {target_node_id}. Проверка приватности IP-адреса невозможна." + ) + awarded_ip_check = points_for_host - before + # Log IP privacy check result and points + _log( + logging.INFO, + "practice_ip_check_result", + { + "device": device, + "target": target_node_id, + "awarded": awarded_ip_check, + "host_total": points_for_host, + "hints": ip_check_hints, + }, + ) # Checking whether the default gateway is configured required_default_gw = requirement.get("default_gw") if required_default_gw: points = required_default_gw.get("points", 1) actual_default_gw = host_node["config"].get("default_gw") + # Log default gateway absence check input + _log( + logging.DEBUG, + "practice_default_gw_check", + { + "device": device, + "expected_absent": True, + "actual": actual_default_gw, + "points": points, + }, + ) + + default_gw_hints = [] + before = points_for_host if not actual_default_gw: points_for_host += points @@ -248,6 +448,21 @@ def check_host(requirement, answer, device): hints.append( f"Вы настроили маршрут по умолчанию ({actual_default_gw}) у {device}, но по условию задания это не требовалось." ) + default_gw_hints.append( + f"Вы настроили маршрут по умолчанию ({actual_default_gw}) у {device}, но по условию задания это не требовалось." 
+ ) + awarded_default_gw = points_for_host - before + # Log default gateway check result and points + _log( + logging.INFO, + "practice_default_gw_result", + { + "device": device, + "awarded": awarded_default_gw, + "host_total": points_for_host, + "hints": default_gw_hints, + }, + ) # Mask check mask_check = requirement.get("mask_check") @@ -255,17 +470,40 @@ def check_host(requirement, answer, device): points = mask_check.get("points", 1) target = mask_check.get("to") expected_mask = mask_check.get("subnet_mask") + # Log subnet mask check input + _log( + logging.DEBUG, + "practice_mask_check", + { + "device": device, + "target": target, + "expected_mask": expected_mask, + "points": points, + }, + ) result, mask_hints = chs.check_subnet_mask( answer, device, target, expected_mask ) + awarded_mask = 0 if result: points_for_host += points + awarded_mask = points else: hints.extend(mask_hints) - - ip_equal = requirement.get("ip_equal") + # Log subnet mask check result and points + _log( + logging.INFO, + "practice_mask_check_result", + { + "device": device, + "target": target, + "awarded": awarded_mask, + "host_total": points_for_host, + "hints": mask_hints if not result else [], + }, + ) # Check for a specific IP address ip_equal = requirement.get("ip_equal") @@ -273,8 +511,20 @@ def check_host(requirement, answer, device): points = ip_equal.get("points", 1) expected_ip = ip_equal.get("expected_ip") target_node_id = ip_equal.get("to") + # Log specific IP check input + _log( + logging.DEBUG, + "practice_ip_equal_check", + { + "device": device, + "target": target_node_id, + "expected_ip": expected_ip, + "points": points, + }, + ) found = False + ip_equal_hints = [] for interface in host_node.get("interface", []): edge_id = interface.get("connect") @@ -305,11 +555,31 @@ def check_host(requirement, answer, device): hints.append( f"IP-адрес интерфейса на устройстве {device}, подключенного к {target_node_id}, должен быть {expected_ip}, но найден {actual_ip}." 
) + ip_equal_hints.append( + f"IP-адрес интерфейса на устройстве {device}, подключенного к {target_node_id}, должен быть {expected_ip}, но найден {actual_ip}." + ) if not found: hints.append( f"Не найден интерфейс на устройстве {device}, подключённый к {target_node_id} с IP {expected_ip}." ) + ip_equal_hints.append( + f"Не найден интерфейс на устройстве {device}, подключённый к {target_node_id} с IP {expected_ip}." + ) + + # Log specific IP check result and points + _log( + logging.INFO, + "practice_ip_equal_result", + { + "device": device, + "target": target_node_id, + "expected_ip": expected_ip, + "awarded": points if found else 0, + "host_total": points_for_host, + "hints": ip_equal_hints, + }, + ) # abstract_ip_equal abstract_equal = requirement.get("abstract_ip_equal") @@ -317,6 +587,17 @@ def check_host(requirement, answer, device): points, abstract_hints = check_abstract_ip_equal(abstract_equal, answer, device) points_for_host += points hints.extend(abstract_hints) + # Log abstract IP equality aggregation result + _log( + logging.INFO, + "practice_abstract_ip_equal_result", + { + "device": device, + "awarded": points, + "host_total": points_for_host, + "hints": abstract_hints, + }, + ) # Check if two hosts are in the same network in_one_network_with = requirement.get("in_one_network_with") @@ -324,6 +605,24 @@ def check_host(requirement, answer, device): p, net_hints = check_in_one_network_with(in_one_network_with, answer, device) points_for_host += p hints.extend(net_hints) + # Log same-network check result and points + _log( + logging.INFO, + "practice_in_one_network_result", + { + "device": device, + "awarded": p, + "host_total": points_for_host, + "hints": net_hints, + }, + ) + + # Log host-level summary: points and hints count + _log( + logging.INFO, + "practice_check_host_done", + {"device": device, "host_total": points_for_host, "hints_count": len(hints)}, + ) return points_for_host, hints @@ -331,11 +630,25 @@ def check_host(requirement, answer, 
device): def check_task(requirements, answer): total_points = 0 hints = [] - - logging.info(f"requirements: {requirements}") + # Log task-level start: counts of requirements/nodes/edges + _log( + logging.INFO, + "practice_check_task_start", + { + "requirements_count": len(requirements), + "nodes_count": len(answer.get("nodes", [])), + "edges_count": len(answer.get("edges", [])), + }, + ) for requirement in requirements: for device, requirements in requirement.items(): + # Log device requirement keys before processing + _log( + logging.DEBUG, + "practice_check_task_device", + {"device": device, "requirements_keys": list(requirements.keys())}, + ) if ( device.startswith("host") or device.startswith("server") @@ -344,11 +657,39 @@ def check_task(requirements, answer): points, device_hints = check_host(requirements, answer, device) total_points += points hints.extend(device_hints) + # Log device result: points and hints + _log( + logging.INFO, + "practice_device_result", + { + "device": device, + "awarded": points, + "total_points": total_points, + "hints": device_hints, + }, + ) elif device.startswith("network"): points, network_hints = check_network_configuration( requirements, answer ) total_points += points hints.extend(network_hints) + # Log network result: points and hints + _log( + logging.INFO, + "practice_network_result", + { + "network": device, + "awarded": points, + "total_points": total_points, + "hints": network_hints, + }, + ) + # Log task-level summary: total points and hints count + _log( + logging.INFO, + "practice_check_task_done", + {"total_points": total_points, "hints_count": len(hints)}, + ) return total_points, hints diff --git a/front/src/tasks.py b/front/src/tasks.py index a1ef3d34..bcfe277c 100644 --- a/front/src/tasks.py +++ b/front/src/tasks.py @@ -4,6 +4,7 @@ import logging import json +import logging_config from sqlalchemy.orm.exc import StaleDataError from celery_app import ( @@ -21,6 +22,12 @@ answer_on_exam_without_session, ) +logger = 
logging.getLogger(__name__) + + +def _log(level, event, extra): + logger.log(level, event, extra=extra) + @app.task(bind=True, queue="common-results-queue") def save_simulate_result(self, animation, pcaps): @@ -101,7 +108,11 @@ def perform_task_check(session_question_id, data_list): ) except Exception as e: - logging.error(f"Ошибка при создании задачи: {e}.") + _log( + logging.ERROR, + "check_task_emulation_create_failed", + {"error": str(e), "guid": guid}, + ) answer_on_exam_without_session(networks_to_check, guid) @@ -128,7 +139,14 @@ def perform_task_check(session_question_id, data_list): ) except Exception as e: - logging.error(f"Ошибка при создании задачи: {e}.") + _log( + logging.ERROR, + "check_task_emulation_create_failed", + { + "error": str(e), + "session_question_id": session_question_id, + }, + ) with flask_app.app_context(): answer_on_exam_question(session_question_id, networks_to_check) @@ -141,13 +159,42 @@ def create_emulation_task(net_schema): net_schema = ( json.dumps(net_schema) if not isinstance(net_schema, str) else net_schema ) + routing_key = str(uuid.uuid4()) + + # Log start of sending task to RabbitMQ + _log( + logging.INFO, + "rabbitmq_send_task_start", + {"routing_key": routing_key, "exchange": SEND_NETWORK_EXCHANGE.name}, + ) - async_obj = app.send_task( - "tasks.mininet_worker", - [net_schema], - routing_key=str(uuid.uuid4()), - exchange=SEND_NETWORK_EXCHANGE, - exchange_type=EXCHANGE_TYPE, + try: + async_obj = app.send_task( + "tasks.mininet_worker", + [net_schema], + routing_key=routing_key, + exchange=SEND_NETWORK_EXCHANGE, + exchange_type=EXCHANGE_TYPE, + ) + except Exception as e: + # Log broker rejection (incl. 
disk_free_limit) + _log( + logging.ERROR, + "rabbitmq_send_task_failed", + { + "routing_key": routing_key, + "exchange": SEND_NETWORK_EXCHANGE.name, + "error": str(e), + "hint": "Check RabbitMQ disk_free_limit and broker availability", + }, + ) + raise + + # Log successful scheduling of task in queue + _log( + logging.INFO, + "rabbitmq_send_task_scheduled", + {"routing_key": routing_key, "task_id": async_obj.id}, ) async_res = AsyncResult(id=async_obj.id, app=app) @@ -156,7 +203,27 @@ def create_emulation_task(net_schema): with allow_join_result(): animation, _ = async_res.wait(timeout=120) + # Log successful result receipt from worker + _log( + logging.INFO, + "rabbitmq_receive_result_success", + {"routing_key": routing_key, "task_id": async_obj.id}, + ) return animation except TimeoutError: + # Log timeout while waiting for result + _log( + logging.ERROR, + "rabbitmq_receive_result_timeout", + {"routing_key": routing_key, "task_id": async_obj.id}, + ) # TODO improve error message (add user info) raise Exception(f"""Check task failed!\nNetwork Schema: {net_schema}.""") + except Exception as e: + # Log any other errors while waiting for result + _log( + logging.ERROR, + "rabbitmq_receive_result_failed", + {"routing_key": routing_key, "task_id": async_obj.id, "error": str(e)}, + ) + raise