From 37d7576edde14e29b63ec786b80a104068e83854 Mon Sep 17 00:00:00 2001 From: "Wilczynski, Andrzej" Date: Tue, 28 Oct 2025 13:16:03 +0000 Subject: [PATCH 1/3] Implemented OOP approach for tested applications in the pytest framework Add parent class for common parameters and execution logic across RxTxApp, FFmpeg, and Gstreamer. Includes RxTxApp child class implementation. Signed-off-by: Wilczynski, Andrzej --- .../validation/mtl_engine/application_base.py | 421 +++++++++++ .../mtl_engine/config/app_mappings.py | 94 +++ .../mtl_engine/config/param_mappings.py | 143 ++++ .../mtl_engine/config/universal_params.py | 110 +++ tests/validation/mtl_engine/rxtxapp.py | 655 ++++++++++++++++++ 5 files changed, 1423 insertions(+) create mode 100644 tests/validation/mtl_engine/application_base.py create mode 100644 tests/validation/mtl_engine/config/app_mappings.py create mode 100644 tests/validation/mtl_engine/config/param_mappings.py create mode 100644 tests/validation/mtl_engine/config/universal_params.py create mode 100644 tests/validation/mtl_engine/rxtxapp.py diff --git a/tests/validation/mtl_engine/application_base.py b/tests/validation/mtl_engine/application_base.py new file mode 100644 index 000000000..c3023c717 --- /dev/null +++ b/tests/validation/mtl_engine/application_base.py @@ -0,0 +1,421 @@ +# Base Application Class for Media Transport Library +# Provides common interface for all media application frameworks + +import logging +import re +import time +from abc import ABC, abstractmethod + +from .config.app_mappings import DEFAULT_PAYLOAD_TYPE_CONFIG, DEFAULT_PORT_CONFIG +from .config.universal_params import UNIVERSAL_PARAMS + +# Import execution utilities with fallback +try: + from .execute import run + from .RxTxApp import prepare_tcpdump +except ImportError: + # Fallback for direct execution + from execute import run + from RxTxApp import prepare_tcpdump + +logger = logging.getLogger(__name__) + + +class Application(ABC): + """Abstract base class shared by all framework adapters (RxTxApp / FFmpeg / GStreamer). + + Unified model: + 1. create_command(...) MUST be called first. It populates: + - self.command: full shell command string ready to run + - self.config: optional dict (RxTxApp) written immediately if a config_file_path is supplied + 2. execute_test(...) ONLY executes already prepared command(s); it NEVER builds commands. + - Single-host: call execute_test with host=... + - Dual-host: create TWO application objects (tx_app, rx_app) each with its own create_command(); + then call tx_app.execute_test(tx_host=..., rx_host=..., rx_app=rx_app). + 3. validate_results() now has a uniform no-argument signature and consumes internal state + (self.universal_params, self.config, self.last_output, and any produced files). + """ + + def __init__(self, app_path, config_file_path=None): + self.app_path = app_path + self.config_file_path = config_file_path + self.universal_params = UNIVERSAL_PARAMS.copy() + self._user_provided_params = set() + self.command: str | None = None + self.config: dict | None = None + self.last_output: str | None = None + self.last_return_code: int | None = None + + @abstractmethod + def get_framework_name(self) -> str: + """Return the framework name (e.g., 'RxTxApp', 'FFmpeg', 'GStreamer').""" + pass + + @abstractmethod + def get_executable_name(self) -> str: + """Return the executable name for this framework.""" + pass + + @abstractmethod + def create_command(self, **kwargs): + """Populate self.command (+ self.config for frameworks that need it). + + Implementations MUST: + - call self.set_universal_params(**kwargs) + - set self.command (string) + - optionally set self.config + - write config file immediately if applicable + They MAY return (self.command, self.config) for backward compatibility with existing tests. + """ + raise NotImplementedError + + @abstractmethod + def validate_results(self) -> bool: # type: ignore[override] + """Framework-specific validation implemented by subclasses. + + Subclasses should read: self.universal_params, self.config, self.last_output, etc. + Must return True/False. + """ + raise NotImplementedError + + def set_universal_params(self, **kwargs): + """Set universal parameters and track which were provided by user.""" + self._user_provided_params = set(kwargs.keys()) + + for param, value in kwargs.items(): + if param in self.universal_params: + self.universal_params[param] = value + else: + raise ValueError(f"Unknown universal parameter: {param}") + + def get_executable_path(self) -> str: + """Get the full path to the executable based on framework type.""" + executable_name = self.get_executable_name() + + # For applications with specific paths, combine with directory + if self.app_path and not executable_name.startswith("/"): + if self.app_path.endswith("/"): + return f"{self.app_path}{executable_name}" + else: + return f"{self.app_path}/{executable_name}" + else: + # For system executables or full paths + return executable_name + + def was_user_provided(self, param_name: str) -> bool: + """Check if a parameter was explicitly provided by the user.""" + return param_name in self._user_provided_params + + def get_session_default_port(self, session_type: str) -> int: + """Get default port for a specific session type.""" + port_map = { + "st20p": DEFAULT_PORT_CONFIG["st20p_port"], + "st22p": DEFAULT_PORT_CONFIG["st22p_port"], + "st30p": DEFAULT_PORT_CONFIG["st30p_port"], + "video": DEFAULT_PORT_CONFIG["video_port"], + "audio": DEFAULT_PORT_CONFIG["audio_port"], + "ancillary": DEFAULT_PORT_CONFIG["ancillary_port"], + "fastmetadata": DEFAULT_PORT_CONFIG["fastmetadata_port"], + } + return port_map.get(session_type, DEFAULT_PORT_CONFIG["st20p_port"]) + + def get_session_default_payload_type(self, session_type: str) -> int: + """Get default payload type for a specific session type.""" + payload_map = { + "st20p": DEFAULT_PAYLOAD_TYPE_CONFIG["st20p_payload_type"], + "st22p": DEFAULT_PAYLOAD_TYPE_CONFIG["st22p_payload_type"], + "st30p": DEFAULT_PAYLOAD_TYPE_CONFIG["st30p_payload_type"], + "video": DEFAULT_PAYLOAD_TYPE_CONFIG["video_payload_type"], + "audio": DEFAULT_PAYLOAD_TYPE_CONFIG["audio_payload_type"], + "ancillary": DEFAULT_PAYLOAD_TYPE_CONFIG["ancillary_payload_type"], + "fastmetadata": DEFAULT_PAYLOAD_TYPE_CONFIG["fastmetadata_payload_type"], + } + return payload_map.get( + session_type, DEFAULT_PAYLOAD_TYPE_CONFIG["st20p_payload_type"] + ) + + def get_common_session_params(self, session_type: str) -> dict: + """Get common session parameters used across all session types.""" + default_port = self.get_session_default_port(session_type) + default_payload = self.get_session_default_payload_type(session_type) + + return { + "replicas": self.universal_params.get( + "replicas", UNIVERSAL_PARAMS["replicas"] + ), + "start_port": int( + self.universal_params.get("port") + if self.was_user_provided("port") + else default_port + ), + "payload_type": ( + self.universal_params.get("payload_type") + if self.was_user_provided("payload_type") + else default_payload + ), + } + + def get_common_video_params(self) -> dict: + """Get common video parameters used across video session types.""" + return { + "width": int(self.universal_params.get("width", UNIVERSAL_PARAMS["width"])), + "height": int( + self.universal_params.get("height", UNIVERSAL_PARAMS["height"]) + ), + "interlaced": self.universal_params.get( + "interlaced", UNIVERSAL_PARAMS["interlaced"] + ), + "device": self.universal_params.get("device", UNIVERSAL_PARAMS["device"]), + "enable_rtcp": self.universal_params.get( + "enable_rtcp", UNIVERSAL_PARAMS["enable_rtcp"] + ), + } + + def execute_test( + self, + build: str, + test_time: int = 30, + host=None, + tx_host=None, + rx_host=None, + rx_app=None, + sleep_interval: int = 4, + tx_first: bool = True, + capture_cfg=None, + ) -> bool: + """Execute a prepared command (or two for dual host). + + Usage patterns: + # Single host + app.create_command(...) + app.execute_test(build=..., host=my_host, test_time=10) + + # Dual host + tx_app.create_command(direction='tx', ...) + rx_app.create_command(direction='rx', ...) + tx_app.execute_test(build=..., tx_host=hostA, rx_host=hostB, rx_app=rx_app) + """ + is_dual = tx_host is not None and rx_host is not None + if is_dual and not rx_app: + raise ValueError("rx_app instance required for dual-host execution") + if not is_dual and not host: + raise ValueError("host required for single-host execution") + + if not self.command: + raise RuntimeError("create_command() must be called before execute_test()") + framework_name = self.get_framework_name() + + # Single-host execution + if not is_dual: + cmd = self.add_timeout(self.command, test_time) + logger.info(f"[single] Running {framework_name} command: {cmd}") + # Optional tcpdump capture hook retained for RxTxApp compatibility + if ( + capture_cfg + and capture_cfg.get("enable") + and "prepare_tcpdump" in globals() + ): + try: + prepare_tcpdump(capture_cfg, host) + except Exception as e: + logger.warning(f"capture setup failed: {e}") + proc = self.start_process(cmd, build, test_time, host) + try: + proc.wait( + timeout=(test_time or 0) + + self.universal_params.get("process_timeout_buffer", 90) + ) + except Exception: + logger.warning( + f"{framework_name} process wait timed out (continuing to capture output)" + ) + self.last_output = self.capture_stdout(proc, framework_name) + self.last_return_code = getattr(proc, "returncode", None) + return self.validate_results() + + # Dual-host execution (tx self, rx rx_app) + assert rx_app is not None + if not rx_app.command: + raise RuntimeError( + "rx_app has no prepared command (call create_command first)" + ) + tx_cmd = self.add_timeout(self.command, test_time) + rx_cmd = rx_app.add_timeout(rx_app.command, test_time) + primary_first = tx_first + first_cmd, first_host, first_label = ( + (tx_cmd, tx_host, f"{framework_name}-TX") + if primary_first + else (rx_cmd, rx_host, f"{rx_app.get_framework_name()}-RX") + ) + second_cmd, second_host, second_label = ( + (rx_cmd, rx_host, f"{rx_app.get_framework_name()}-RX") + if primary_first + else (tx_cmd, tx_host, f"{framework_name}-TX") + ) + logger.info(f"[dual] Starting first: {first_label} -> {first_cmd}") + first_proc = self.start_process(first_cmd, build, test_time, first_host) + time.sleep(sleep_interval) + logger.info(f"[dual] Starting second: {second_label} -> {second_cmd}") + second_proc = self.start_process(second_cmd, build, test_time, second_host) + # Wait processes + total_timeout = (test_time or 0) + self.universal_params.get( + "process_timeout_buffer", 90 + ) + for p, label in [(first_proc, first_label), (second_proc, second_label)]: + try: + p.wait(timeout=total_timeout) + except Exception: + logger.warning( + f"Process {label} wait timeout; capturing partial output" + ) + # Capture outputs + if primary_first: + self.last_output = self.capture_stdout(first_proc, first_label) + rx_app.last_output = rx_app.capture_stdout(second_proc, second_label) + else: + rx_app.last_output = rx_app.capture_stdout(first_proc, first_label) + self.last_output = self.capture_stdout(second_proc, second_label) + self.last_return_code = getattr(first_proc, "returncode", None) + rx_app.last_return_code = getattr(second_proc, "returncode", None) + tx_ok = self.validate_results() + rx_ok = rx_app.validate_results() + return tx_ok and rx_ok + + # ------------------------- + # Common helper utilities + # ------------------------- + def add_timeout(self, command: str, test_time: int, grace: int = None) -> str: + """Wrap command with timeout if test_time provided (adds a grace period).""" + if grace is None: + grace = self.universal_params.get("timeout_grace", 10) + # If the command already has an internal --test_time X argument, ensure the wrapper + # timeout is >= that internal value + grace to avoid premature SIGTERM (RC 124). + internal_test_time = None + m = re.search(r"--test_time\s+(\d+)", command) + if m: + try: + internal_test_time = int(m.group(1)) + except ValueError: + internal_test_time = None + effective_test_time = test_time or internal_test_time + if internal_test_time and test_time and internal_test_time != test_time: + logger.debug( + f"Mismatch between execute_test test_time={test_time} and command --test_time {internal_test_time}; " + f"using max" + ) + effective_test_time = max(internal_test_time, test_time) + elif internal_test_time and not test_time: + effective_test_time = internal_test_time + if effective_test_time and not command.strip().startswith("timeout "): + return f"timeout {effective_test_time + grace} {command}" + return command + + def start_and_capture( + self, command: str, build: str, test_time: int, host, process_name: str + ): + """Start a single process and capture its stdout safely.""" + process = self.start_process(command, build, test_time, host) + output = self.capture_stdout(process, process_name) + return process, output + + def start_dual_with_delay( + self, + tx_command: str, + rx_command: str, + build: str, + test_time: int, + tx_host, + rx_host, + tx_first: bool, + sleep_interval: int, + tx_name: str, + rx_name: str, + ): + """Start two processes with an optional delay ordering TX/RX based on tx_first flag.""" + if tx_first: + tx_process = self.start_process(tx_command, build, test_time, tx_host) + time.sleep(sleep_interval) + rx_process = self.start_process(rx_command, build, test_time, rx_host) + else: + rx_process = self.start_process(rx_command, build, test_time, rx_host) + time.sleep(sleep_interval) + tx_process = self.start_process(tx_command, build, test_time, tx_host) + tx_output = self.capture_stdout(tx_process, tx_name) + rx_output = self.capture_stdout(rx_process, rx_name) + return (tx_process, rx_process, tx_output, rx_output) + + def extract_framerate(self, framerate_str, default: int = None) -> int: + """Extract numeric framerate from various string or numeric forms (e.g. 'p25', '60').""" + if default is None: + default = self.universal_params.get("default_framerate_numeric", 60) + if isinstance(framerate_str, (int, float)): + try: + return int(framerate_str) + except Exception: + return default + if not isinstance(framerate_str, str) or not framerate_str: + return default + if framerate_str.startswith("p") and len(framerate_str) > 1: + num = framerate_str[1:] + else: + num = framerate_str + try: + return int(float(num)) + except ValueError: + logger.warning( + f"Could not parse framerate '{framerate_str}', defaulting to {default}" + ) + return default + + # Legacy execute_* abstract methods removed; unified execute_test used instead. + + def start_process(self, command: str, build: str, test_time: int, host): + """Start a process on the specified host.""" + logger.info(f"Starting {self.get_framework_name()} process...") + buffer_val = self.universal_params.get("process_timeout_buffer", 90) + timeout = (test_time or 0) + buffer_val + return run(command, host=host, cwd=build, timeout=timeout) + + def capture_stdout(self, process, process_name: str) -> str: + """Capture stdout from a process.""" + try: + # Remote process objects (from mfd_connect) expose stdout via 'stdout_text' + if hasattr(process, "stdout_text") and process.stdout_text: + output = process.stdout_text + logger.debug( + f"{process_name} output (captured stdout_text): {output[:200]}..." + ) + return output + # Local fallback (subprocess) may expose .stdout already consumed elsewhere + if hasattr(process, "stdout") and process.stdout: + try: + # Attempt to read if it's a file-like object + if hasattr(process.stdout, "read"): + output = process.stdout.read() + else: + output = str(process.stdout) + logger.debug( + f"{process_name} output (captured stdout): {output[:200]}..." + ) + return output + except Exception: + pass + logger.warning(f"No stdout available for {process_name}") + return "" + except Exception as e: + logger.error(f"Error capturing {process_name} output: {e}") + return "" + + def get_case_id(self) -> str: + """Generate a case ID for logging/debugging purposes.""" + try: + import inspect + + frame = inspect.currentframe() + while frame: + if "test_" in frame.f_code.co_name: + return frame.f_code.co_name + frame = frame.f_back + return "unknown_test" + except Exception: + return "unknown_test" diff --git a/tests/validation/mtl_engine/config/app_mappings.py b/tests/validation/mtl_engine/config/app_mappings.py new file mode 100644 index 000000000..459eb8898 --- /dev/null +++ b/tests/validation/mtl_engine/config/app_mappings.py @@ -0,0 +1,94 @@ +# Application name mappings and format conversion utilities + +# Map framework names to executable names +APP_NAME_MAP = {"rxtxapp": "RxTxApp", "ffmpeg": "ffmpeg", "gstreamer": "gst-launch-1.0"} + +# Format conversion mappings +FFMPEG_FORMAT_MAP = { + "YUV422PLANAR10LE": "yuv422p10le", + "YUV422PLANAR8": "yuv422p", + "YUV420PLANAR8": "yuv420p", + "YUV420PLANAR10LE": "yuv420p10le", + "RGB24": "rgb24", + "RGBA": "rgba", + "YUV422RFC4175PG2BE10": "yuv422p10le", # RFC4175 to planar 10-bit LE +} + +SESSION_TYPE_MAP = { + "ffmpeg": { + "st20p": "mtl_st20p", + "st22p": "mtl_st22p", + "st30p": "mtl_st30p", + "video": "rawvideo", + "audio": "pcm_s24le", + }, + "gstreamer": { + "st20p": "mtl_st20p", + "st22p": "mtl_st22p", + "st30p": "mtl_st30p", + "video": "mtl_video", + "audio": "mtl_audio", + }, +} + +FRAMERATE_TO_VIDEO_FORMAT_MAP = { + "p60": "i1080p60", + "p59": "i1080p59", + "p50": "i1080p50", + "p30": "i1080p30", + "p29": "i1080p29", + "p25": "i1080p25", + "p24": "i1080p24", + "p23": "i1080p23", +} + +# Default network configuration values +DEFAULT_NETWORK_CONFIG = { + "nic_port": "0000:31:01.0", + "unicast_tx_ip": "192.168.17.101", + "unicast_rx_ip": "192.168.17.102", + "multicast_tx_ip": "192.168.17.101", + "multicast_rx_ip": "192.168.17.102", + "multicast_destination_ip": "239.168.48.9", + "default_config_file": "config.json", +} + +# Default port configuration by session type +DEFAULT_PORT_CONFIG = { + "st20p_port": 20000, + "st22p_port": 20000, + "st30p_port": 30000, + "video_port": 20000, + "audio_port": 30000, + "ancillary_port": 40000, + "fastmetadata_port": 40000, +} + +# Default payload type configuration by session type +DEFAULT_PAYLOAD_TYPE_CONFIG = { + "st20p_payload_type": 112, + "st22p_payload_type": 114, + "st30p_payload_type": 111, + "video_payload_type": 112, + "audio_payload_type": 111, + "ancillary_payload_type": 113, + "fastmetadata_payload_type": 115, +} + +# Default ST22p-specific configuration +DEFAULT_ST22P_CONFIG = { + "framerate": "p25", + "pack_type": "codestream", + "codec": "JPEG-XS", + "quality": "speed", + "codec_threads": 2, +} + +# Default FFmpeg configuration +DEFAULT_FFMPEG_CONFIG = { + "default_pixel_format": "yuv422p10le", + "default_session_type": "mtl_st20p", +} + +# Default GStreamer configuration +DEFAULT_GSTREAMER_CONFIG = {"default_session_type": "mtl_st20p"} diff --git a/tests/validation/mtl_engine/config/param_mappings.py b/tests/validation/mtl_engine/config/param_mappings.py new file mode 100644 index 000000000..e730f2eb3 --- /dev/null +++ b/tests/validation/mtl_engine/config/param_mappings.py @@ -0,0 +1,143 @@ +# Parameter mappings for different applications +# Maps universal parameter names to application-specific names + +# RxTxApp parameter mapping +RXTXAPP_PARAM_MAP = { + # Network parameters + "source_ip": "ip", + "destination_ip": "dip", + "multicast_ip": "ip", + "port": "start_port", + "nic_port": "name", + # Video parameters + "width": "width", + "height": "height", + "framerate": "fps", + "interlaced": "interlaced", + "transport_format": "transport_format", + # Audio parameters + "audio_format": "audio_format", + "audio_channels": "audio_channel", + "audio_sampling": "audio_sampling", + "audio_ptime": "audio_ptime", + # Streaming parameters + "payload_type": "payload_type", + "replicas": "replicas", + "pacing": "pacing", + "packing": "packing", + "device": "device", + "codec": "codec", + "quality": "quality", + "codec_threads": "codec_thread_count", + # File I/O + "input_file": "st20p_url", # for input files + "output_file": "st20p_url", # for output files (RX) + "url": "video_url", # for video files + # Flags + "enable_rtcp": "enable_rtcp", + "measure_latency": "measure_latency", + "display": "display", + # RxTxApp specific command-line parameters + "config_file": "--config_file", + "enable_ptp": "--ptp", + "lcores": "--lcores", + "test_time": "--test_time", + "dma_dev": "--dma_dev", + "log_level": "--log_level", + "log_file": "--log_file", + "arp_timeout_s": "--arp_timeout_s", + "allow_across_numa_core": "--allow_across_numa_core", + "no_multicast": "--no_multicast", + "rx_separate_lcore": "--rx_separate_lcore", + "rx_mix_lcore": "--rx_mix_lcore", + "runtime_session": "--runtime_session", + "rx_timing_parser": "--rx_timing_parser", + "pcapng_dump": "--pcapng_dump", + "rx_video_file_frames": "--rx_video_file_frames", + "framebuffer_count": "--rx_video_fb_cnt", + "promiscuous": "--promiscuous", + "cni_thread": "--cni_thread", + "sch_session_quota": "--sch_session_quota", + "p_tx_dst_mac": "--p_tx_dst_mac", + "r_tx_dst_mac": "--r_tx_dst_mac", + "nb_tx_desc": "--nb_tx_desc", + "nb_rx_desc": "--nb_rx_desc", + "tasklet_time": "--tasklet_time", + "tsc": "--tsc", + "pacing_way": "--pacing_way", + "shaping": "--shaping", + "vrx": "--vrx", + "ts_first_pkt": "--ts_first_pkt", + "ts_delta_us": "--ts_delta_us", + "mono_pool": "--mono_pool", + "tasklet_thread": "--tasklet_thread", + "tasklet_sleep": "--tasklet_sleep", + "tasklet_sleep_us": "--tasklet_sleep_us", + "app_bind_lcore": "--app_bind_lcore", + "rxtx_simd_512": "--rxtx_simd_512", + "rss_mode": "--rss_mode", + "tx_no_chain": "--tx_no_chain", + "multi_src_port": "--multi_src_port", + "audio_fifo_size": "--audio_fifo_size", + "dhcp": "--dhcp", + "virtio_user": "--virtio_user", + "phc2sys": "--phc2sys", + "ptp_sync_sys": "--ptp_sync_sys", + "rss_sch_nb": "--rss_sch_nb", + "log_time_ms": "--log_time_ms", + "rx_audio_dump_time_s": "--rx_audio_dump_time_s", + "dedicated_sys_lcore": "--dedicated_sys_lcore", + "bind_numa": "--bind_numa", + "force_numa": "--force_numa", +} + +# FFmpeg parameter mapping +# Maps universal params to FFmpeg MTL plugin flags. +# Width & height both map to -video_size; command builders coalesce them into WxH format. +# Framerate maps to -fps (distinct from input rawvideo's -framerate). +FFMPEG_PARAM_MAP = { + # Network parameters + "source_ip": "-p_sip", + "destination_ip": "-p_tx_ip", # TX unicast destination + "multicast_ip": "-p_rx_ip", # RX multicast group + "port": "-udp_port", + "nic_port": "-p_port", + # Video parameters (width/height combined externally) + "width": "-video_size", + "height": "-video_size", + "framerate": "-fps", + "pixel_format": "-pix_fmt", + # Streaming parameters + "payload_type": "-payload_type", + "session_type": "-f", # Converted via SESSION_TYPE_MAP + # File I/O + "input_file": "-i", + "output_file": "", # Output appears last (no explicit flag) +} + +# GStreamer parameter mapping +# Maps universal params to MTL GStreamer element properties. +# Set as name=value pairs in the pipeline. +GSTREAMER_PARAM_MAP = { + # Network parameters + "source_ip": "dev-ip", # Interface IP + "destination_ip": "ip", # Destination (unicast) IP + "port": "udp-port", # UDP port + "nic_port": "dev-port", # NIC device/PCI identifier + # Video parameters / caps + "width": "width", + "height": "height", + "framerate": "framerate", + "pixel_format": "format", + # Audio parameters + "audio_format": "audio-format", + "audio_channels": "channel", + "audio_sampling": "sampling", + # Streaming parameters + "payload_type": "payload-type", + "queues": "queues", # Currently legacy / advanced usage + "framebuffer_count": "framebuff-cnt", + # File I/O (filesrc/filesink) + "input_file": "location", + "output_file": "location", +} diff --git a/tests/validation/mtl_engine/config/universal_params.py b/tests/validation/mtl_engine/config/universal_params.py new file mode 100644 index 000000000..d5a1ff46c --- /dev/null +++ b/tests/validation/mtl_engine/config/universal_params.py @@ -0,0 +1,110 @@ +# Universal parameter definitions for all media applications +# This serves as the common interface for RxTxApp, FFmpeg, and GStreamer + +UNIVERSAL_PARAMS = { + # Network parameters + "source_ip": None, # Source IP address (interface IP) + "destination_ip": None, # Destination IP address (session IP) + "multicast_ip": None, # Multicast group IP + "port": 20000, # UDP port number + "nic_port": None, # Network interface/port name + "nic_port_list": None, # List of network interfaces/ports + "tx_nic_port": None, # Override NIC port for TX direction + "rx_nic_port": None, # Override NIC port for RX direction + # Video parameters + "width": 1920, # Video width in pixels + "height": 1080, # Video height in pixels + "framerate": "p60", # Frame rate (p25, p30, p50, p60, etc.) + "interlaced": False, # Progressive (False) or Interlaced (True) + "pixel_format": "YUV422PLANAR10LE", # Pixel format for TX input and RX output + "transport_format": "YUV_422_10bit", # Transport format for streaming + # Audio parameters + "audio_format": "PCM24", # Audio format + "audio_channels": ["U02"], # Audio channel configuration + "audio_sampling": "96kHz", # Audio sampling rate + "audio_ptime": "1", # Audio packet time + # ST41 (Fast Metadata) parameters + "fastmetadata_data_item_type": 1234567, # Data Item Type for ST41 + "fastmetadata_k_bit": 0, # K-bit value for ST41 + "fastmetadata_fps": "p59", # Frame rate for ST41 + "type_mode": "frame", # Type mode for ST41: "rtp" or "frame" + # Streaming parameters + "payload_type": 112, # RTP payload type + "session_type": "st20p", # Session type (st20p, st22p, st30p, video, audio, etc.) + "direction": None, # Direction: tx, rx, or None (both for RxTxApp) + "replicas": 1, # Number of session replicas + "framebuffer_count": None, # Frame buffer count (RX video: rx_video_fb_cnt) + # Quality and encoding parameters + "pacing": "gap", # Pacing mode (gap, auto, etc.) + "packing": "BPM", # Packing mode + "device": "AUTO", # Device selection + "codec": "JPEG-XS", # Codec for compressed formats + "quality": "speed", # Quality setting + "codec_threads": 2, # Number of codec threads + # File I/O parameters + "input_file": None, # Input file path + "output_file": None, # Output file path + # Test configuration + "test_mode": "multicast", # Test mode (unicast, multicast, kernel) + "test_time": 30, # Test duration in seconds + "enable_rtcp": False, # Enable RTCP + "measure_latency": False, # Enable latency measurement + "display": False, # Enable display output + "enable_ptp": False, # Enable PTP synchronization + "virtio_user": False, # Enable virtio-user mode + # RxTxApp specific parameters + "config_file": None, # JSON config file path + "lcores": None, # DPDK lcore list (e.g., "28,29,30,31") + "dma_dev": None, # DMA device list (e.g., "DMA1,DMA2,DMA3") + "log_level": None, # Log level (debug, info, notice, warning, error) + "log_file": None, # Log file path + "arp_timeout_s": None, # ARP timeout in seconds (default: 60) + "allow_across_numa_core": False, # Allow cores across NUMA nodes + "no_multicast": False, # Disable multicast join message + "rx_separate_lcore": False, # RX video on dedicated lcores + "rx_mix_lcore": False, # Allow TX/RX video on same core + "runtime_session": False, # Start instance before creating sessions + "rx_timing_parser": False, # Enable timing check for video RX streams + "pcapng_dump": None, # Dump n packets to pcapng files + "rx_video_file_frames": None, # Dump received video frames to yuv file + "promiscuous": False, # Enable RX promiscuous mode + "cni_thread": False, # Use dedicated thread for CNI messages + "sch_session_quota": None, # Max sessions count per lcore + "p_tx_dst_mac": None, # Destination MAC for primary port + "r_tx_dst_mac": None, # Destination MAC for redundant port + "nb_tx_desc": None, # Number of TX descriptors per queue + "nb_rx_desc": None, # Number of RX descriptors per queue + "tasklet_time": False, # Enable tasklet running time stats + "tsc": False, # Force TSC pacing + "pacing_way": None, # Pacing way (auto, rl, tsc, tsc_narrow, ptp, tsn) + "shaping": None, # ST21 shaping type (narrow, wide) + "vrx": None, # ST21 vrx value + "ts_first_pkt": False, # Set RTP timestamp at first packet egress + "ts_delta_us": None, # RTP timestamp delta in microseconds + "mono_pool": False, # Use mono pool for all queues + "tasklet_thread": False, # Run tasklet under thread + "tasklet_sleep": False, # Enable sleep if tasklets report done + "tasklet_sleep_us": None, # Sleep microseconds value + "app_bind_lcore": False, # Run app thread on pinned lcore + "rxtx_simd_512": False, # Enable DPDK SIMD 512 path + "rss_mode": None, # RSS mode (l3_l4, l3, none) + "tx_no_chain": False, # Use memcopy instead of mbuf chain + "multi_src_port": False, # Use multiple source ports for ST20 TX + "audio_fifo_size": None, # Audio FIFO size + "dhcp": False, # Enable DHCP for all ports + "phc2sys": False, # Enable built-in phc2sys function + "ptp_sync_sys": False, # Enable PTP to system time sync + "rss_sch_nb": None, # Number of schedulers for RSS dispatch + "log_time_ms": False, # Enable ms accuracy log printer + "rx_audio_dump_time_s": None, # Dump audio frames for n seconds + "dedicated_sys_lcore": False, # Run MTL system tasks on dedicated lcore + "bind_numa": False, # Bind all MTL threads to NIC NUMA + "force_numa": None, # Force NIC port NUMA ID + # Execution control defaults + "sleep_interval": 4, # Delay between starting TX and RX + "tx_first": True, # Whether to start TX side before RX + "timeout_grace": 10, # Extra seconds for process timeout + "process_timeout_buffer": 90, # Buffer added to test_time for run() timeout + "pattern_duration": 30, # Duration for generated test patterns + "default_framerate_numeric": 60, # Fallback numeric framerate +} diff --git a/tests/validation/mtl_engine/rxtxapp.py b/tests/validation/mtl_engine/rxtxapp.py new file mode 100644 index 000000000..6b94fda80 --- /dev/null +++ b/tests/validation/mtl_engine/rxtxapp.py @@ -0,0 +1,655 @@ +# RxTxApp Implementation for Media Transport Library +# Handles RxTxApp-specific command generation and configuration + +import json +import logging +import os + +from .application_base import Application +from .config.app_mappings import APP_NAME_MAP, DEFAULT_NETWORK_CONFIG +from .config.param_mappings import RXTXAPP_PARAM_MAP +from .config.universal_params import UNIVERSAL_PARAMS + +# Import execution utilities with fallback +try: + import copy + + from . import rxtxapp_config as legacy_cfg + from .execute import log_fail + + # Import legacy helpers so we can emit a backward-compatible JSON config + from .RxTxApp import ( + add_interfaces, + check_rx_output, + check_tx_output, + create_empty_config, + kernel_ip_dict, + multicast_ip_dict, + unicast_ip_dict, + ) +except ImportError: + # Fallback for direct execution (when running this module standalone) + import copy + + import rxtxapp_config as legacy_cfg + from execute import log_fail + from RxTxApp import ( + add_interfaces, + check_rx_output, + check_tx_output, + create_empty_config, + kernel_ip_dict, + multicast_ip_dict, + unicast_ip_dict, + ) + +logger = logging.getLogger(__name__) + + +class RxTxApp(Application): + """RxTxApp framework implementation (unified model).""" + + def get_framework_name(self) -> str: + return "RxTxApp" + + def get_executable_name(self) -> str: + return APP_NAME_MAP["rxtxapp"] + + def create_command(self, **kwargs): # type: ignore[override] + self.set_universal_params(**kwargs) + cmd, cfg = self._create_rxtxapp_command_and_config() + self.command = cmd + self.config = cfg + # Write config immediately if path known + config_path = ( + self.config_file_path + or self.universal_params.get("config_file") + or "config.json" + ) + try: + with open(config_path, "w") as f: + json.dump(cfg, f, indent=2) + except Exception as e: + logger.warning(f"Failed to write RxTxApp config file {config_path}: {e}") + return self.command, self.config + + def _create_rxtxapp_command_and_config(self) -> tuple: + """ + Generate RxTxApp command line and JSON configuration from universal parameters. + Uses config file path from constructor if provided, otherwise defaults to value from DEFAULT_NETWORK_CONFIG. + + Returns: + Tuple of (command_string, config_dict) + """ + # Use config file path from constructor or default (absolute path) + if self.config_file_path: + config_file_path = self.config_file_path + else: + config_file_path = os.path.abspath( + DEFAULT_NETWORK_CONFIG["default_config_file"] + ) + + # Build command line with all command-line parameters + executable_path = self.get_executable_path() + cmd_parts = ["sudo", executable_path] + cmd_parts.extend(["--config_file", config_file_path]) + + # Add command-line parameters from RXTXAPP_PARAM_MAP + session_type = self.universal_params.get( + "session_type", UNIVERSAL_PARAMS["session_type"] + ) + for universal_param, rxtx_param in RXTXAPP_PARAM_MAP.items(): + # Skip file I/O generic mapping for st22p; we set st22p_url explicitly in JSON + if session_type == "st22p" and universal_param in ( + "input_file", + "output_file", + ): + continue + # Skip test_time unless explicitly provided (for VTune tests, duration is controlled by VTune) + if universal_param == "test_time" and not self.was_user_provided( + "test_time" + ): + continue + if rxtx_param.startswith("--"): # Command-line parameter only + if universal_param in self.universal_params: + value = self.universal_params[universal_param] + if value is not None and value is not False: + if isinstance(value, bool) and value: + cmd_parts.append(rxtx_param) + elif not isinstance(value, bool): + cmd_parts.extend([rxtx_param, str(value)]) + + # Create JSON configuration + config_dict = self._create_rxtxapp_config_dict() + + return " ".join(cmd_parts), config_dict + + def _create_rxtxapp_config_dict(self) -> dict: + """ + Build complete RxTxApp JSON config structure from universal parameters. + Creates interfaces, sessions, and all session-specific configurations. + This method intentionally recreates the original ("legacy") nested JSON + structure expected by the existing RxTxApp binary and validation helpers. + The previous refactored flat structure caused validation failures because + check_tx_output() and performance detection logic rely on nested lists + like config['tx_sessions'][0]['st20p'][0]. + + Returns: + Complete RxTxApp configuration dictionary + """ + # Currently only st20p/st22p/st30p/video/audio/ancillary/fastmetadata supported + # We rebuild the legacy shell for all session types but only populate the active one. + + session_type = self.universal_params.get( + "session_type", UNIVERSAL_PARAMS["session_type"] + ) + direction = self.universal_params.get("direction") # None means loopback + test_mode = self.universal_params.get( + "test_mode", UNIVERSAL_PARAMS["test_mode"] + ) + + # Determine NIC ports list (need at least 2 entries for legacy loopback template) + nic_port = self.universal_params.get( + "nic_port", DEFAULT_NETWORK_CONFIG["nic_port"] + ) + nic_port_list = self.universal_params.get("nic_port_list") + replicas = self.universal_params.get("replicas", 1) + + if not nic_port_list: + # For single-direction (tx-only or rx-only) with replicas on same port, + # only use one interface to avoid MTL duplicate port error + # For loopback (direction=None), need two interfaces + if direction in ("tx", "rx") and replicas >= 1: + nic_port_list = [nic_port] # Single interface for single-direction + else: + nic_port_list = [nic_port, nic_port] # Duplicate for loopback + elif len(nic_port_list) == 1: + # Same logic: single interface for single-direction, duplicate for loopback + if direction in ("tx", "rx") and replicas >= 1: + pass # Keep single element + else: + nic_port_list = nic_port_list * 2 + + # Base legacy structure + config = create_empty_config() + config["tx_no_chain"] = self.universal_params.get("tx_no_chain", False) + + # Fill interface names & addressing using legacy helper + try: + add_interfaces(config, nic_port_list, test_mode) + except Exception as e: + logger.warning( + f"Legacy add_interfaces failed ({e}); falling back to direct assignment" + ) + # Minimal fallback assignment - handle single or dual interface configs + config["interfaces"][0]["name"] = nic_port_list[0] + # Set IP addresses based on test mode + if test_mode == "unicast": + config["interfaces"][0]["ip"] = unicast_ip_dict["tx_interfaces"] + config["tx_sessions"][0]["dip"][0] = unicast_ip_dict["tx_sessions"] + config["rx_sessions"][0]["ip"][0] = unicast_ip_dict["rx_sessions"] + elif test_mode == "multicast": + config["interfaces"][0]["ip"] = multicast_ip_dict["tx_interfaces"] + config["tx_sessions"][0]["dip"][0] = multicast_ip_dict["tx_sessions"] + config["rx_sessions"][0]["ip"][0] = multicast_ip_dict["rx_sessions"] + elif test_mode == "kernel": + config["tx_sessions"][0]["dip"][0] = kernel_ip_dict["tx_sessions"] + config["rx_sessions"][0]["ip"][0] = kernel_ip_dict["rx_sessions"] + + if len(nic_port_list) > 1: + config["interfaces"][1]["name"] = nic_port_list[1] + if test_mode == "unicast": + config["interfaces"][1]["ip"] = unicast_ip_dict["rx_interfaces"] + elif test_mode == "multicast": + config["interfaces"][1]["ip"] = multicast_ip_dict["rx_interfaces"] + elif direction in ("tx", "rx"): + # For single-direction single-interface, remove second interface + if len(config["interfaces"]) > 1: + config["interfaces"] = [config["interfaces"][0]] + + # Fix session interface indices when using single interface + # Template has TX on interface[0] and RX on interface[1], but with single interface both should use [0] + if len(config["interfaces"]) == 1: + if config["tx_sessions"] and len(config["tx_sessions"]) > 0: + config["tx_sessions"][0]["interface"] = [0] + if config["rx_sessions"] and len(config["rx_sessions"]) > 0: + config["rx_sessions"][0]["interface"] = [0] + + # Override interface IPs and session IPs with user-provided source_ip/destination_ip if specified + # This allows tests to use custom IP addressing instead of hardcoded unicast_ip_dict values + if test_mode == "unicast": + user_source_ip = self.universal_params.get("source_ip") + user_dest_ip = self.universal_params.get("destination_ip") + + if direction == "tx" and len(config["interfaces"]) >= 1: + # TX: interface IP = source_ip (local), session dip = destination_ip (remote RX) + if user_source_ip: + config["interfaces"][0]["ip"] = user_source_ip + if ( + user_dest_ip + and config["tx_sessions"] + and len(config["tx_sessions"]) > 0 + ): + config["tx_sessions"][0]["dip"][0] = user_dest_ip + elif direction == "rx" and len(config["interfaces"]) >= 1: + # RX: interface IP = destination_ip (local bind), session ip = source_ip (filter for TX) + if user_dest_ip: + config["interfaces"][0]["ip"] = user_dest_ip + if ( + user_source_ip + and config["rx_sessions"] + and len(config["rx_sessions"]) > 0 + ): + config["rx_sessions"][0]["ip"][0] = user_source_ip + elif direction is None and len(config["interfaces"]) >= 2: + # Loopback: TX interface uses source_ip, RX interface uses destination_ip + if user_source_ip: + config["interfaces"][0]["ip"] = user_source_ip + if user_dest_ip: + config["interfaces"][1]["ip"] = user_dest_ip + if ( + user_dest_ip + and config["tx_sessions"] + and len(config["tx_sessions"]) > 0 + ): + config["tx_sessions"][0]["dip"][0] = user_dest_ip + if ( + user_source_ip + and config["rx_sessions"] + and len(config["rx_sessions"]) > 0 + ): + config["rx_sessions"][0]["ip"][0] = user_source_ip + + # Helper to populate a nested session list for a given type + def _populate_session(is_tx: bool): + if session_type == "st20p": + template = copy.deepcopy( + legacy_cfg.config_tx_st20p_session + if is_tx + else legacy_cfg.config_rx_st20p_session + ) + # Map universal params -> legacy field names + template["width"] = int( + self.universal_params.get("width", template["width"]) + ) + template["height"] = int( + self.universal_params.get("height", template["height"]) + ) + template["fps"] = self.universal_params.get( + "framerate", template["fps"] + ) + template["pacing"] = self.universal_params.get( + "pacing", template["pacing"] + ) + template["packing"] = self.universal_params.get( + "packing", template.get("packing", "BPM") + ) + # pixel_format becomes input_format or output_format + pixel_format = self.universal_params.get("pixel_format") + if is_tx: + template["input_format"] = pixel_format or template.get( + "input_format" + ) + else: + template["output_format"] = pixel_format or template.get( + "output_format" + ) + template["transport_format"] = self.universal_params.get( + "transport_format", template["transport_format"] + ) + if is_tx and self.universal_params.get("input_file"): + template["st20p_url"] = self.universal_params.get("input_file") + if (not is_tx) and self.universal_params.get("output_file"): + template["st20p_url"] = self.universal_params.get("output_file") + template["replicas"] = self.universal_params.get( + "replicas", template["replicas"] + ) + template["start_port"] = int( + self.universal_params.get("port", template["start_port"]) + ) + template["payload_type"] = int( + self.universal_params.get("payload_type", template["payload_type"]) + ) + template["display"] = self.universal_params.get( + "display", template.get("display", False) + ) + template["enable_rtcp"] = self.universal_params.get( + "enable_rtcp", template.get("enable_rtcp", False) + ) + return template + elif session_type == "st22p": + template = copy.deepcopy( + legacy_cfg.config_tx_st22p_session + if is_tx + else legacy_cfg.config_rx_st22p_session + ) + template["width"] = int( + self.universal_params.get("width", template["width"]) + ) + template["height"] = int( + self.universal_params.get("height", template["height"]) + ) + template["fps"] = self.universal_params.get( + "framerate", template["fps"] + ) + template["codec"] = self.universal_params.get( + "codec", template["codec"] + ) # JPEG-XS etc. + template["quality"] = self.universal_params.get( + "quality", template["quality"] + ) + template["codec_thread_count"] = self.universal_params.get( + "codec_threads", template["codec_thread_count"] + ) + pf = self.universal_params.get("pixel_format") + if is_tx: + template["input_format"] = pf or template.get("input_format") + else: + template["output_format"] = pf or template.get("output_format") + if is_tx and self.universal_params.get("input_file"): + template["st22p_url"] = self.universal_params.get("input_file") + if (not is_tx) and self.universal_params.get("output_file"): + template["st22p_url"] = self.universal_params.get("output_file") + template["replicas"] = self.universal_params.get( + "replicas", template["replicas"] + ) + template["start_port"] = int( + self.universal_params.get("port", template["start_port"]) + ) + template["payload_type"] = int( + self.universal_params.get("payload_type", template["payload_type"]) + ) + template["enable_rtcp"] = self.universal_params.get( + "enable_rtcp", template.get("enable_rtcp", False) + ) + return template + elif session_type == "st30p": + template = copy.deepcopy( + legacy_cfg.config_tx_st30p_session + if is_tx + else legacy_cfg.config_rx_st30p_session + ) + template["audio_format"] = self.universal_params.get( + "audio_format", template["audio_format"] + ) + template["audio_channel"] = self.universal_params.get( + "audio_channels", template["audio_channel"] + ) + template["audio_sampling"] = self.universal_params.get( + "audio_sampling", template["audio_sampling"] + ) + template["audio_ptime"] = self.universal_params.get( + "audio_ptime", template["audio_ptime"] + ) + if is_tx and self.universal_params.get("input_file"): + template["audio_url"] = self.universal_params.get("input_file") + if (not is_tx) and self.universal_params.get("output_file"): + template["audio_url"] = self.universal_params.get("output_file") + template["replicas"] = self.universal_params.get( + "replicas", template["replicas"] + ) + template["start_port"] = int( + self.universal_params.get("port", template["start_port"]) + ) + template["payload_type"] = int( + self.universal_params.get("payload_type", template["payload_type"]) + ) + return template + + elif session_type == "fastmetadata": + template = copy.deepcopy( + legacy_cfg.config_tx_st41_session + if is_tx + else legacy_cfg.config_rx_st41_session + ) + template["payload_type"] = int( + self.universal_params.get("payload_type", template["payload_type"]) + ) + template["fastmetadata_data_item_type"] = int( + self.universal_params.get( + "fastmetadata_data_item_type", + template["fastmetadata_data_item_type"], + ) + ) + template["fastmetadata_k_bit"] = int( + self.universal_params.get( + "fastmetadata_k_bit", template["fastmetadata_k_bit"] + ) + ) + if is_tx: + template["type"] = self.universal_params.get( + "type_mode", template["type"] + ) + template["fastmetadata_fps"] = self.universal_params.get( + "fastmetadata_fps", template["fastmetadata_fps"] + ) + template["fastmetadata_url"] = self.universal_params.get( + "input_file", template["fastmetadata_url"] + ) + else: + template["fastmetadata_url"] = self.universal_params.get( + "output_file", template.get("fastmetadata_url", "") + ) + template["replicas"] = self.universal_params.get( + "replicas", template["replicas"] + ) + template["start_port"] = int( + self.universal_params.get("port", template["start_port"]) + ) + return template + + else: + # Fallback: reuse st20p layout for unknown session types (minimal support) + template = {"replicas": 1} + return template + + # Populate TX sessions + if direction in (None, "tx"): + st_entry = _populate_session(True) + if st_entry: + config["tx_sessions"][0].setdefault(session_type, []) + config["tx_sessions"][0][session_type].append(st_entry) + # Ensure non-empty video list to force functional validation instead of FPS performance path + placeholder_video = { + "type": "placeholder", + "video_format": "", + "pg_format": "", + } + current_video_list = config["tx_sessions"][0].get("video") + if not current_video_list: + config["tx_sessions"][0]["video"] = [placeholder_video] + elif len(current_video_list) == 0: + current_video_list.append(placeholder_video) + + # Populate RX sessions + if direction in (None, "rx"): + st_entry = _populate_session(False) + if st_entry: + config["rx_sessions"][0].setdefault(session_type, []) + config["rx_sessions"][0][session_type].append(st_entry) + placeholder_video = { + "type": "placeholder", + "video_format": "", + "pg_format": "", + } + current_video_list = config["rx_sessions"][0].get("video") + if not current_video_list: + config["rx_sessions"][0]["video"] = [placeholder_video] + elif len(current_video_list) == 0: + current_video_list.append(placeholder_video) + + # If only TX or only RX requested, clear the other list + if direction == "tx": + config["rx_sessions"] = [] + elif direction == "rx": + config["tx_sessions"] = [] + + return config + + def validate_results(self) -> bool: # type: ignore[override] + """ + Validate execution results exactly like original RxTxApp.execute_test(). + + Matches the validation pattern from mtl_engine/RxTxApp.py: + - For st20p: Check RX output + TX/RX converter creation (NOT tx result lines) + - For st22p: Check RX output only + - For video/audio/etc: Check both TX and RX outputs + + Returns True if validation passes. Raises AssertionError on failure. + """ + + def _fail(msg: str): + try: + log_fail(msg) + except Exception: + logger.error(msg) + raise AssertionError(msg) + + try: + if not self.config: + _fail("RxTxApp validate_results called without config") + + session_type = self._get_session_type_from_config(self.config) + output_lines = self.last_output.split("\n") if self.last_output else [] + rc = getattr(self, "last_return_code", None) + + # 1. Check return code (must be 0 or None for dual-host secondary) + if rc not in (0, None): + _fail(f"Process return code {rc} indicates failure") + + # 2. Validate based on session type - match original RxTxApp.execute_test() logic + passed = True + + if session_type == "st20p": + # Original validation: check_rx_output + check_tx_converter_output + check_rx_converter_output + # Note: Original does NOT check check_tx_output for st20p! + passed = passed and check_rx_output( + config=self.config, + output=output_lines, + session_type="st20p", + fail_on_error=False, + host=None, + build=None, + ) + + # Import converter check functions if available + try: + from .RxTxApp import ( + check_rx_converter_output, + check_tx_converter_output, + ) + + passed = passed and check_tx_converter_output( + config=self.config, + output=output_lines, + session_type="st20p", + fail_on_error=False, + host=None, + build="", + ) + + passed = passed and check_rx_converter_output( + config=self.config, + output=output_lines, + session_type="st20p", + fail_on_error=False, + host=None, + build="", + ) + except ImportError: + # Fallback: if converter checks not available, just check RX + logger.warning( + "Converter check functions not available, using RX check only" + ) + + if not passed: + _fail("st20p validation failed (RX output or converter checks)") + + elif session_type in ("st22p", "st30p", "fastmetadata"): + # Original validation: check_rx_output only (no TX result line for st22p/st30p/fastmetadata) + passed = check_rx_output( + config=self.config, + output=output_lines, + session_type=session_type, + fail_on_error=False, + host=None, + build=None, + ) + + if not passed: + _fail(f"{session_type} validation failed (RX output check)") + + elif session_type in ("video", "audio", "ancillary"): + # Original validation: check both TX and RX outputs + _tx_ok = check_tx_output( + config=self.config, + output=output_lines, + session_type=session_type, + fail_on_error=False, + host=None, + build=None, + ) + _rx_ok = check_rx_output( + config=self.config, + output=output_lines, + session_type=session_type, + fail_on_error=False, + host=None, + build=None, + ) + + if not (_tx_ok and _rx_ok): + _fail(f"{session_type} validation failed (TX or RX output check)") + + else: + # Unknown session type - default to checking both + logger.warning( + f"Unknown session type {session_type}, using default validation" + ) + _tx_ok = check_tx_output( + config=self.config, + output=output_lines, + session_type=session_type, + fail_on_error=False, + host=None, + build=None, + ) + _rx_ok = check_rx_output( + config=self.config, + output=output_lines, + session_type=session_type, + fail_on_error=False, + host=None, + build=None, + ) + + if not (_tx_ok and _rx_ok): + _fail(f"{session_type} validation failed") + + logger.info(f"RxTxApp validation passed for {session_type}") + return True + + except AssertionError: + # Already handled/logged + raise + except Exception as e: + _fail(f"RxTxApp validation unexpected error: {e}") + + def _get_session_type_from_config(self, config: dict) -> str: + """Extract session type from RxTxApp config.""" + # Inspect nested lists to identify actual session type; legacy layout nests under tx_sessions[i][type] + if not config.get("tx_sessions"): + return "st20p" + for tx_entry in config["tx_sessions"]: + for possible in ( + "st22p", + "st20p", + "st30p", + "fastmetadata", + "video", + "audio", + "ancillary", + ): + if possible in tx_entry and tx_entry[possible]: + return possible + return "st20p" From 1db3b68240bfa0ff1bce85353c18a384e2144360 Mon Sep 17 00:00:00 2001 From: "Wilczynski, Andrzej" Date: Tue, 28 Oct 2025 14:15:59 +0000 Subject: [PATCH 2/3] Added refactored tests for single host, st20p category. Based on new OOP approach. Signed-off-by: Wilczynski, Andrzej --- .../validation/mtl_engine/application_base.py | 14 +- .../st20p/format/test_format_refactored.py | 234 ++++++++++++++++++ .../single/st20p/fps/test_fps_refactored.py | 76 ++++++ .../integrity/test_integrity_refactored.py | 91 +++++++ .../interlace/test_interlace_refactored.py | 53 ++++ .../st20p/pacing/test_pacing_refactored.py | 66 +++++ .../st20p/packing/test_packing_refactored.py | 67 +++++ .../test_resolutions_refactored.py | 70 ++++++ .../test_mode/test_multicast_refactored.py | 66 +++++ 9 files changed, 727 insertions(+), 10 deletions(-) create mode 100644 tests/validation/tests/single/st20p/format/test_format_refactored.py create mode 100644 tests/validation/tests/single/st20p/fps/test_fps_refactored.py create mode 100644 tests/validation/tests/single/st20p/integrity/test_integrity_refactored.py create mode 100644 tests/validation/tests/single/st20p/interlace/test_interlace_refactored.py create mode 100644 tests/validation/tests/single/st20p/pacing/test_pacing_refactored.py create mode 100644 tests/validation/tests/single/st20p/packing/test_packing_refactored.py create mode 100644 tests/validation/tests/single/st20p/resolutions/test_resolutions_refactored.py create mode 100644 tests/validation/tests/single/st20p/test_mode/test_multicast_refactored.py diff --git a/tests/validation/mtl_engine/application_base.py b/tests/validation/mtl_engine/application_base.py index c3023c717..81722f4ad 100644 --- a/tests/validation/mtl_engine/application_base.py +++ b/tests/validation/mtl_engine/application_base.py @@ -8,15 +8,7 @@ from .config.app_mappings import DEFAULT_PAYLOAD_TYPE_CONFIG, DEFAULT_PORT_CONFIG from .config.universal_params import UNIVERSAL_PARAMS - -# Import execution utilities with fallback -try: - from .execute import run - from .RxTxApp import prepare_tcpdump -except ImportError: - # Fallback for direct execution - from execute import run - from RxTxApp import prepare_tcpdump +from .execute import run logger = logging.getLogger(__name__) @@ -216,7 +208,9 @@ def execute_test( and "prepare_tcpdump" in globals() ): try: - prepare_tcpdump(capture_cfg, host) + # prepare_tcpdump not yet implemented; left to change in the future + # prepare_tcpdump(capture_cfg, host) + pass except Exception as e: logger.warning(f"capture setup failed: {e}") proc = self.start_process(cmd, build, test_time, host) diff --git a/tests/validation/tests/single/st20p/format/test_format_refactored.py b/tests/validation/tests/single/st20p/format/test_format_refactored.py new file mode 100644 index 000000000..4071e7e30 --- /dev/null +++ b/tests/validation/tests/single/st20p/format/test_format_refactored.py @@ -0,0 +1,234 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_422p10le, yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.smoke +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + list(yuv_files_422p10le.values()), + indirect=["media_file"], + ids=list(yuv_files_422p10le.keys()), +) +def test_422p10le_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + """Send files in YUV422PLANAR10LE format converting to transport format YUV_422_10bit""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = f"test_format_refactored_{media_file_info['filename']}" + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + app.create_command( + session_type="st20p", + nic_port_list=host.vfs, + test_mode="multicast", + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + pixel_format=media_file_info["file_format"], + transport_format=media_file_info["format"], + input_file=media_file_path, + test_time=test_time, + ) + + app.execute_test( + build=build, + test_time=test_time, + host=host, + capture_cfg=capture_cfg, + ) + + +# List of supported formats based on st_frame_fmt_from_transport() +pixel_formats = dict( + YUV_422_10bit=("ST20_FMT_YUV_422_10BIT", "YUV422RFC4175PG2BE10"), + YUV_422_8bit=("ST20_FMT_YUV_422_8BIT", "UYVY"), + YUV_422_12bit=("ST20_FMT_YUV_422_12BIT", "YUV422RFC4175PG2BE12"), + YUV_444_10bit=("ST20_FMT_YUV_444_10BIT", "YUV444RFC4175PG4BE10"), + YUV_444_12bit=("ST20_FMT_YUV_444_12BIT", "YUV444RFC4175PG2BE12"), + YUV_420_8bit=("ST20_FMT_YUV_420_8BIT", "YUV420CUSTOM8"), + RGB_8bit=("ST20_FMT_RGB_8BIT", "RGB8"), + RGB_10bit=("ST20_FMT_RGB_10BIT", "RGBRFC4175PG4BE10"), + RGB_12bit=("ST20_FMT_RGB_12BIT", "RGBRFC4175PG2BE12"), + YUV_422_PLANAR10LE=("ST20_FMT_YUV_422_PLANAR10LE", "YUV422PLANAR10LE"), + V210=("ST20_FMT_V210", "V210"), +) + + +# List of supported one-way convertions based on st_frame_get_converter() +convert1_formats = dict( + UYVY="UYVY", + YUV422PLANAR8="YUV422PLANAR8", + YUV420PLANAR8="YUV420PLANAR8", +) + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["Penguin_1080p"]], + indirect=["media_file"], + ids=["Penguin_1080p"], +) +@pytest.mark.parametrize("format", convert1_formats.keys()) +def test_convert_on_rx_refactored( + hosts, build, media, nic_port_list, test_time, format, media_file +): + """Send file in YUV_422_10bit pixel formats with supported conversion on RX side""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + app.create_command( + session_type="st20p", + nic_port_list=host.vfs, + test_mode="multicast", + packing="GPM", + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p30", + pixel_format="YUV422RFC4175PG2BE10", + transport_format="YUV_422_10bit", + input_file=media_file_path, + test_time=test_time, + ) + + app.execute_test( + build=build, + test_time=test_time, + host=host, + ) + + +# List of supported two-way convertions based on st_frame_get_converter() +convert2_formats = dict( + V210=("ST20_FMT_YUV_422_10BIT", "YUV_422_10bit", "YUV422RFC4175PG2BE10"), + Y210=("ST20_FMT_YUV_422_10BIT", "YUV_422_10bit", "YUV422RFC4175PG2BE10"), + YUV422PLANAR12LE=( + "ST20_FMT_YUV_422_12BIT", + "YUV_422_12bit", + "YUV422RFC4175PG2BE12", + ), + YUV444PLANAR10LE=( + "ST20_FMT_YUV_444_10BIT", + "YUV_444_10bit", + "YUV444RFC4175PG4BE10", + ), + YUV444PLANAR12LE=( + "ST20_FMT_YUV_444_12BIT", + "YUV_444_12bit", + "YUV444RFC4175PG2BE12", + ), + GBRPLANAR10LE=("ST20_FMT_RGB_10BIT", "RGB_10bit", "RGBRFC4175PG4BE10"), + GBRPLANAR12LE=("ST20_FMT_RGB_12BIT", "RGB_12bit", "RGBRFC4175PG2BE12"), +) + + +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["test_8K"]], + indirect=["media_file"], + ids=["test_8K"], +) +@pytest.mark.parametrize("format", convert2_formats.keys()) +def test_tx_rx_conversion_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + format, + media_file, +): + """Send file in different pixel formats with supported two-way conversion on TX and RX""" + media_file_info, media_file_path = media_file + text_format, transport_format, _ = convert2_formats[format] + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + app.create_command( + session_type="st20p", + nic_port_list=host.vfs, + test_mode="multicast", + packing="GPM", + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p30", + pixel_format=format, + transport_format=transport_format, + input_file=media_file_path, + test_time=test_time, + ) + + app.execute_test( + build=build, + test_time=test_time, + host=host, + ) + + +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["test_8K"]], + indirect=["media_file"], + ids=["test_8K"], +) +@pytest.mark.parametrize("format", pixel_formats.keys()) +def test_formats_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + format, + test_config, + prepare_ramdisk, + media_file, +): + """Send file in different supported pixel formats without conversion during transport""" + media_file_info, media_file_path = media_file + text_format, file_format = pixel_formats[format] + host = list(hosts.values())[0] + + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = f"test_format_refactored_formats_{format}" + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + app.create_command( + session_type="st20p", + nic_port_list=host.vfs, + test_mode="multicast", + packing="GPM", + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p30", + pixel_format=file_format, + transport_format=format, + input_file=media_file_path, + test_time=test_time, + ) + + app.execute_test( + build=build, + test_time=test_time, + host=host, + capture_cfg=capture_cfg, + ) diff --git a/tests/validation/tests/single/st20p/fps/test_fps_refactored.py b/tests/validation/tests/single/st20p/fps/test_fps_refactored.py new file mode 100644 index 000000000..072e60e9f --- /dev/null +++ b/tests/validation/tests/single/st20p/fps/test_fps_refactored.py @@ -0,0 +1,76 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["ParkJoy_1080p"]], + indirect=["media_file"], + ids=["ParkJoy_1080p"], +) +@pytest.mark.parametrize( + "fps", + [ + "p23", + "p24", + "p25", + pytest.param("p29", marks=pytest.mark.smoke), + "p30", + "p50", + "p59", + "p60", + "p100", + "p119", + "p120", + ], +) +def test_fps_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + fps, + prepare_ramdisk, + media_file, +): + """Test different frame rates""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + config_params = { + "session_type": "st20p", + "nic_port_list": host.vfs, + "test_mode": "multicast", + "destination_ip": "239.168.48.9", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": fps, + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "input_file": media_file_path, + "test_time": test_time, + } + + if fps in ["p30", "p50", "p59", "p60"]: + config_params.update({"pacing": "gap", "tx_no_chain": True}) + elif fps in ["p100", "p119", "p120"]: + config_params.update({"pacing": "linear", "tx_no_chain": True}) + + app.create_command(**config_params) + + actual_test_time = test_time + if fps in ["p30", "p50", "p59", "p60"]: + actual_test_time = max(test_time, 15) + elif fps in ["p100", "p119", "p120"]: + actual_test_time = max(test_time, 10) + + app.execute_test(build=build, test_time=actual_test_time, host=host) diff --git a/tests/validation/tests/single/st20p/integrity/test_integrity_refactored.py b/tests/validation/tests/single/st20p/integrity/test_integrity_refactored.py new file mode 100644 index 000000000..0df6bbb7f --- /dev/null +++ b/tests/validation/tests/single/st20p/integrity/test_integrity_refactored.py @@ -0,0 +1,91 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import logging +import os + +import pytest +from mfd_common_libs.log_levels import TEST_PASS +from mtl_engine.const import LOG_FOLDER +from mtl_engine.execute import log_fail +from mtl_engine.integrity import calculate_yuv_frame_size, check_st20p_integrity +from mtl_engine.media_files import yuv_files_422p10le, yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + +logger = logging.getLogger(__name__) + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + [ + yuv_files_422rfc10["Penguin_720p"], + yuv_files_422rfc10["Penguin_1080p"], + pytest.param(yuv_files_422p10le["Penguin_720p"], marks=pytest.mark.nightly), + yuv_files_422p10le["Penguin_1080p"], + ], + indirect=["media_file"], + ids=[ + "Penguin_720p_422rfc10", + "Penguin_1080p_422rfc10", + "Penguin_720p_422p10le", + "Penguin_1080p_422p10le", + ], +) +def test_integrity_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + prepare_ramdisk, + media_file, +): + """Test video integrity by comparing input and output files""" + media_file_info, media_file_path = media_file + + log_dir = os.path.join(os.getcwd(), LOG_FOLDER, "latest") + os.makedirs(log_dir, exist_ok=True) + out_file_url = os.path.join(log_dir, "out.yuv") + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + app.create_command( + session_type="st20p", + nic_port=host.vfs[0] if host.vfs else "0000:31:01.0", + nic_port_list=host.vfs, + source_ip="192.168.17.101", + destination_ip="192.168.17.102", + port=20000, + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p25", + pixel_format=media_file_info["file_format"], + transport_format=media_file_info["format"], + input_file=media_file_path, + output_file=out_file_url, + test_mode="unicast", + pacing="linear", + test_time=test_time, + ) + + actual_test_time = max(test_time, 8) + app.execute_test(build=build, test_time=actual_test_time, host=host) + + frame_size = calculate_yuv_frame_size( + media_file_info["width"], + media_file_info["height"], + media_file_info["file_format"], + ) + result = check_st20p_integrity( + src_url=media_file_path, out_url=out_file_url, frame_size=frame_size + ) + + if result: + logger.log(TEST_PASS, "INTEGRITY PASS") + else: + log_fail("INTEGRITY FAIL") + raise AssertionError( + "st20p integrity test failed content integrity comparison." + ) diff --git a/tests/validation/tests/single/st20p/interlace/test_interlace_refactored.py b/tests/validation/tests/single/st20p/interlace/test_interlace_refactored.py new file mode 100644 index 000000000..7ecbaaf95 --- /dev/null +++ b/tests/validation/tests/single/st20p/interlace/test_interlace_refactored.py @@ -0,0 +1,53 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_interlace +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + list(yuv_files_interlace.values()), + indirect=["media_file"], + ids=list(yuv_files_interlace.keys()), +) +def test_interlace_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + prepare_ramdisk, + media_file, +): + """Test interlaced video transmission""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "source_ip": "192.168.17.101", + "destination_ip": "192.168.17.102", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "input_file": media_file_path, + "test_mode": "unicast", + "interlaced": True, + "pacing": "linear", + "tx_no_chain": False, + "test_time": test_time, + } + + app.create_command(**config_params) + actual_test_time = max(test_time, 10) + app.execute_test(build=build, test_time=actual_test_time, host=host) diff --git a/tests/validation/tests/single/st20p/pacing/test_pacing_refactored.py b/tests/validation/tests/single/st20p/pacing/test_pacing_refactored.py new file mode 100644 index 000000000..c585c1696 --- /dev/null +++ b/tests/validation/tests/single/st20p/pacing/test_pacing_refactored.py @@ -0,0 +1,66 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.parametrize("pacing", ["narrow", "wide", "linear"]) +@pytest.mark.parametrize( + "media_file", + [ + yuv_files_422rfc10["Crosswalk_720p"], + yuv_files_422rfc10["ParkJoy_1080p"], + yuv_files_422rfc10["Pedestrian_4K"], + ], + indirect=["media_file"], + ids=["Crosswalk_720p", "ParkJoy_1080p", "Pedestrian_4K"], +) +def test_pacing_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + pacing, + prepare_ramdisk, + media_file, +): + """Test different pacing modes (narrow, wide, linear)""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "source_ip": "192.168.17.101", + "destination_ip": "192.168.17.102", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "input_file": media_file_path, + "test_mode": "unicast", + "pacing": pacing, + "test_time": test_time, + } + + height = media_file_info.get("height", 0) + if height >= 2160: + config_params["tx_no_chain"] = True if pacing == "linear" else False + actual_test_time = max(test_time, 12) + elif pacing == "narrow": + config_params["tx_no_chain"] = False + actual_test_time = max(test_time, 8) + else: + actual_test_time = max(test_time, 8) + + app.create_command(**config_params) + app.execute_test(build=build, test_time=actual_test_time, host=host) diff --git a/tests/validation/tests/single/st20p/packing/test_packing_refactored.py b/tests/validation/tests/single/st20p/packing/test_packing_refactored.py new file mode 100644 index 000000000..63a3ff5fe --- /dev/null +++ b/tests/validation/tests/single/st20p/packing/test_packing_refactored.py @@ -0,0 +1,67 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.parametrize("packing", ["GPM_SL", "GPM"]) +@pytest.mark.parametrize( + "media_file", + [ + yuv_files_422rfc10["Crosswalk_720p"], + yuv_files_422rfc10["ParkJoy_1080p"], + yuv_files_422rfc10["Pedestrian_4K"], + ], + indirect=["media_file"], + ids=["Crosswalk_720p", "ParkJoy_1080p", "Pedestrian_4K"], +) +def test_packing_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + packing, + prepare_ramdisk, + media_file, +): + """Test different packing modes (GPM_SL, GPM)""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "source_ip": "192.168.17.101", + "destination_ip": "192.168.17.102", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "input_file": media_file_path, + "test_mode": "unicast", + "packing": packing, + "test_time": test_time, + } + + height = media_file_info.get("height", 0) + if height >= 2160: + if packing == "GPM_SL": + config_params.update({"tx_no_chain": True, "pacing": "linear"}) + else: + config_params.update({"tx_no_chain": False, "pacing": "wide"}) + actual_test_time = max(test_time, 12) + else: + config_params["pacing"] = "linear" if packing == "GPM_SL" else "narrow" + actual_test_time = max(test_time, 8) + + app.create_command(**config_params) + app.execute_test(build=build, test_time=actual_test_time, host=host) diff --git a/tests/validation/tests/single/st20p/resolutions/test_resolutions_refactored.py b/tests/validation/tests/single/st20p/resolutions/test_resolutions_refactored.py new file mode 100644 index 000000000..6e7dcb7cd --- /dev/null +++ b/tests/validation/tests/single/st20p/resolutions/test_resolutions_refactored.py @@ -0,0 +1,70 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + list(yuv_files_422rfc10.values()), + indirect=["media_file"], + ids=list(yuv_files_422rfc10.keys()), +) +def test_resolutions_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + prepare_ramdisk, + media_file, +): + """Test different video resolutions""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "destination_ip": "239.168.48.9", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "input_file": media_file_path, + "test_mode": "multicast", + "test_time": test_time, + } + + height = media_file_info.get("height", 0) + + if height >= 2160: + config_params.update( + {"pacing": "linear", "packing": "GPM_SL", "tx_no_chain": True} + ) + elif height >= 1080: + config_params.update({"pacing": "wide", "packing": "GPM", "tx_no_chain": False}) + else: + config_params.update( + {"pacing": "narrow", "packing": "GPM", "tx_no_chain": False} + ) + + app.create_command(**config_params) + + actual_test_time = test_time + if height >= 2160: + actual_test_time = max(test_time, 15) + elif height >= 1080: + actual_test_time = max(test_time, 10) + else: + actual_test_time = max(test_time, 8) + + app.execute_test(build=build, test_time=actual_test_time, host=host) diff --git a/tests/validation/tests/single/st20p/test_mode/test_multicast_refactored.py b/tests/validation/tests/single/st20p/test_mode/test_multicast_refactored.py new file mode 100644 index 000000000..a84defd5f --- /dev/null +++ b/tests/validation/tests/single/st20p/test_mode/test_multicast_refactored.py @@ -0,0 +1,66 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + [ + yuv_files_422rfc10["Crosswalk_720p"], + yuv_files_422rfc10["ParkJoy_1080p"], + yuv_files_422rfc10["Pedestrian_4K"], + ], + indirect=["media_file"], + ids=["Crosswalk_720p", "ParkJoy_1080p", "Pedestrian_4K"], +) +def test_multicast_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + prepare_ramdisk, + media_file, +): + """Test multicast transmission mode""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + config_params = { + "session_type": "st20p", + "nic_port_list": host.vfs, + "destination_ip": "239.168.48.9", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "input_file": media_file_path, + "test_mode": "multicast", + "test_time": test_time, + } + + height = media_file_info.get("height", 0) + if height >= 2160: + config_params.update( + {"pacing": "linear", "packing": "GPM_SL", "tx_no_chain": True} + ) + actual_test_time = max(test_time, 15) + elif height >= 1080: + config_params.update({"pacing": "wide", "packing": "GPM", "tx_no_chain": False}) + actual_test_time = max(test_time, 10) + else: + config_params.update( + {"pacing": "narrow", "packing": "GPM", "tx_no_chain": False} + ) + actual_test_time = max(test_time, 8) + + app.create_command(**config_params) + app.execute_test(build=build, test_time=actual_test_time, host=host) From 57df293a66e4c1b85783ec5ebe39ad8cdce959f2 Mon Sep 17 00:00:00 2001 From: "Wilczynski, Andrzej" Date: Tue, 28 Oct 2025 20:07:44 +0000 Subject: [PATCH 3/3] Added refactored tests for single host, st22p category. Based on new OOP approach. Signed-off-by: Wilczynski, Andrzej --- .../st22p/codec/test_codec_refactored.py | 52 ++++++++++++++ .../st22p/format/test_format_refactored.py | 51 ++++++++++++++ .../single/st22p/fps/test_fps_refactored.py | 68 +++++++++++++++++++ .../interlace/test_interlace_refactored.py | 50 ++++++++++++++ .../st22p/quality/test_quality_refactored.py | 67 ++++++++++++++++++ .../test_mode/test_unicast_refactored.py | 49 +++++++++++++ 6 files changed, 337 insertions(+) create mode 100644 tests/validation/tests/single/st22p/codec/test_codec_refactored.py create mode 100644 tests/validation/tests/single/st22p/format/test_format_refactored.py create mode 100644 tests/validation/tests/single/st22p/fps/test_fps_refactored.py create mode 100644 tests/validation/tests/single/st22p/interlace/test_interlace_refactored.py create mode 100644 tests/validation/tests/single/st22p/quality/test_quality_refactored.py create mode 100644 tests/validation/tests/single/st22p/test_mode/test_unicast_refactored.py diff --git a/tests/validation/tests/single/st22p/codec/test_codec_refactored.py b/tests/validation/tests/single/st22p/codec/test_codec_refactored.py new file mode 100644 index 000000000..be49876db --- /dev/null +++ b/tests/validation/tests/single/st22p/codec/test_codec_refactored.py @@ -0,0 +1,52 @@ +# SPDX-License-Identifier: BSD-3-Clause + +import os + +import pytest +from mtl_engine.media_files import yuv_files_422p10le +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.refactored +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_1080p"]], + ids=["Penguin_1080p"], +) +@pytest.mark.parametrize("codec", ["JPEG-XS", "H264_CBR"]) +def test_codec_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + codec, + test_config, + prepare_ramdisk, + media_file, +): + media_file_info = media_file + host = list(hosts.values())[0] + + app = RxTxApp(app_path="./tests/tools/RxTxApp/build") + input_path = ( + os.path.join(media, media_file_info["filename"]) + if media + else media_file_info["filename"] + ) + app.create_command( + session_type="st22p", + test_mode="multicast", + nic_port_list=host.vfs, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + codec=codec, + quality="speed", + pixel_format=media_file_info["file_format"], + input_file=input_path, + codec_threads=2, + test_time=test_time, + ) + app.execute_test(build=build, test_time=test_time, host=host) diff --git a/tests/validation/tests/single/st22p/format/test_format_refactored.py b/tests/validation/tests/single/st22p/format/test_format_refactored.py new file mode 100644 index 000000000..cb236be29 --- /dev/null +++ b/tests/validation/tests/single/st22p/format/test_format_refactored.py @@ -0,0 +1,51 @@ +# SPDX-License-Identifier: BSD-3-Clause + +import os + +import pytest +from mtl_engine.media_files import yuv_files_422p10le +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.refactored +@pytest.mark.parametrize( + "media_file", + list(yuv_files_422p10le.values()), + ids=list(yuv_files_422p10le.keys()), +) +def test_format_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + media_file_info = media_file + host = list(hosts.values())[0] + + app = RxTxApp(app_path="./tests/tools/RxTxApp/build") + input_path = ( + os.path.join(media, media_file_info["filename"]) + if media + else media_file_info["filename"] + ) + app.create_command( + session_type="st22p", + test_mode="multicast", + nic_port_list=host.vfs, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + codec="JPEG-XS", + quality="speed", + pixel_format=media_file_info["file_format"], + input_file=input_path, + codec_threads=2, + test_time=test_time, + ) + + app.execute_test(build=build, test_time=test_time, host=host) diff --git a/tests/validation/tests/single/st22p/fps/test_fps_refactored.py b/tests/validation/tests/single/st22p/fps/test_fps_refactored.py new file mode 100644 index 000000000..3e39e63d9 --- /dev/null +++ b/tests/validation/tests/single/st22p/fps/test_fps_refactored.py @@ -0,0 +1,68 @@ +# SPDX-License-Identifier: BSD-3-Clause + +import os + +import pytest +from mtl_engine.media_files import yuv_files_422p10le +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.refactored +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_1080p"]], + ids=["Penguin_1080p"], +) +@pytest.mark.parametrize( + "fps", + [ + "p23", + "p24", + pytest.param("p25", marks=pytest.mark.nightly), + "p29", + "p30", + "p50", + "p59", + "p60", + "p100", + "p119", + "p120", + ], +) +@pytest.mark.parametrize("codec", ["JPEG-XS", "H264_CBR"]) +def test_fps_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + fps, + codec, + test_config, + prepare_ramdisk, + media_file, +): + media_file_info = media_file + host = list(hosts.values())[0] + + app = RxTxApp(app_path="./tests/tools/RxTxApp/build") + input_path = ( + os.path.join(media, media_file_info["filename"]) + if media + else media_file_info["filename"] + ) + app.create_command( + session_type="st22p", + test_mode="multicast", + nic_port_list=host.vfs, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=fps, + codec=codec, + quality="speed", + pixel_format=media_file_info["file_format"], + input_file=input_path, + codec_threads=16, + test_time=test_time, + ) + app.execute_test(build=build, test_time=test_time, host=host) diff --git a/tests/validation/tests/single/st22p/interlace/test_interlace_refactored.py b/tests/validation/tests/single/st22p/interlace/test_interlace_refactored.py new file mode 100644 index 000000000..7c5473a0e --- /dev/null +++ b/tests/validation/tests/single/st22p/interlace/test_interlace_refactored.py @@ -0,0 +1,50 @@ +# SPDX-License-Identifier: BSD-3-Clause + +import os + +import pytest +from mtl_engine.media_files import yuv_files_interlace +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.refactored +@pytest.mark.parametrize( + "media_file", + list(yuv_files_interlace.values()), + ids=list(yuv_files_interlace.keys()), +) +def test_interlace_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + media_file_info = media_file + host = list(hosts.values())[0] + + app = RxTxApp(app_path="./tests/tools/RxTxApp/build") + input_path = ( + os.path.join(media, media_file_info["filename"]) + if media + else media_file_info["filename"] + ) + app.create_command( + session_type="st22p", + test_mode="multicast", + nic_port_list=host.vfs, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + codec="JPEG-XS", + quality="speed", + pixel_format=media_file_info["file_format"], + input_file=input_path, + codec_threads=2, + interlaced=True, + test_time=test_time, + ) + app.execute_test(build=build, test_time=test_time, host=host) diff --git a/tests/validation/tests/single/st22p/quality/test_quality_refactored.py b/tests/validation/tests/single/st22p/quality/test_quality_refactored.py new file mode 100644 index 000000000..4ebfcdf0e --- /dev/null +++ b/tests/validation/tests/single/st22p/quality/test_quality_refactored.py @@ -0,0 +1,67 @@ +# SPDX-License-Identifier: BSD-3-Clause + +import os +import shutil + +import pytest +from mtl_engine.media_files import yuv_files_422p10le +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.refactored +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_1080p"]], + ids=["Penguin_1080p"], +) +@pytest.mark.parametrize("quality", ["quality", "speed"]) +@pytest.mark.nightly +def test_quality_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + quality, + test_config, + prepare_ramdisk, + media_file, +): + media_file_info = media_file + host = list(hosts.values())[0] + + app = RxTxApp(app_path="./tests/tools/RxTxApp/build") + input_path = ( + os.path.join(media, media_file_info["filename"]) + if media + else media_file_info["filename"] + ) + # Ensure kahawai.json (plugin configuration) is available in build cwd so st22 encoder can load plugins + kahawai_src = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../../../../..", "kahawai.json") + ) + kahawai_dst = os.path.join(build, "kahawai.json") + try: + if os.path.exists(kahawai_src) and not os.path.exists(kahawai_dst): + shutil.copy2(kahawai_src, kahawai_dst) + except Exception as e: + print(f"Warning: failed to stage kahawai.json into build dir: {e}") + app.create_command( + session_type="st22p", + test_mode="multicast", + nic_port_list=host.vfs, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + codec="JPEG-XS", + quality=quality, + pixel_format=media_file_info["file_format"], + input_file=input_path, + codec_threads=2, + test_time=test_time, + ) + result = app.execute_test(build=build, test_time=test_time, host=host) + # Enforce result to avoid silent pass when validation fails + assert ( + result + ), "Refactored st22p quality test failed validation (TX/RX outputs or return code)." diff --git a/tests/validation/tests/single/st22p/test_mode/test_unicast_refactored.py b/tests/validation/tests/single/st22p/test_mode/test_unicast_refactored.py new file mode 100644 index 000000000..e26d39609 --- /dev/null +++ b/tests/validation/tests/single/st22p/test_mode/test_unicast_refactored.py @@ -0,0 +1,49 @@ +# SPDX-License-Identifier: BSD-3-Clause + +import os + +import pytest +from mtl_engine.media_files import yuv_files_422p10le +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.refactored +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_1080p"]], + ids=["Penguin_1080p"], +) +def test_unicast_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + media_file_info = media_file + host = list(hosts.values())[0] + + app = RxTxApp(app_path="./tests/tools/RxTxApp/build") + input_path = ( + os.path.join(media, media_file_info["filename"]) + if media + else media_file_info["filename"] + ) + app.create_command( + session_type="st22p", + test_mode="unicast", + nic_port_list=host.vfs, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + codec="JPEG-XS", + quality="speed", + pixel_format=media_file_info["file_format"], + input_file=input_path, + codec_threads=2, + test_time=test_time, + ) + app.execute_test(build=build, test_time=test_time, host=host)