diff --git a/tests/common/mellanox_data.py b/tests/common/mellanox_data.py
index 679e053110..8f7f237485 100644
--- a/tests/common/mellanox_data.py
+++ b/tests/common/mellanox_data.py
@@ -1,5 +1,7 @@
 import functools
-
+import pytest
+import logging
+logger = logging.getLogger(__name__)
 
 SPC1_HWSKUS = ["ACS-MSN2700", "Mellanox-SN2700", "Mellanox-SN2700-D48C8", "ACS-MSN2740", "ACS-MSN2100", "ACS-MSN2410",
                "ACS-MSN2010", "ACS-SN2201"]
@@ -1249,3 +1251,82 @@ def get_hardware_version(duthost, platform):
 def get_hw_management_version(duthost):
     full_version = duthost.shell('dpkg-query --showformat=\'${Version}\' --show hw-management')['stdout']
     return full_version[len('1.mlnx.'):]
+
+
+def is_pinewave_module(port_info):
+    """ Check if the given port info indicates a PINEWAVE module (matched on the 'manufacturer' field) """
+    return 'PINEWAVE' in port_info.get('manufacturer', '')
+
+
+def is_unsupported_module(port_info, port_number):
+    """ Return True (and log the port number) when the module described by port_info is a PINEWAVE module """
+    if is_pinewave_module(port_info):
+        logger.info(f"Port {port_number} has an unsupported module, skipping it and continue to check other ports")
+        return True
+    return False
+
+
+def skip_on_unsupported_module():
+    """ Skip the calling test with the 'all ports have unsupported modules' reason """
+    pytest.skip("All ports are with unsupported modules, skipping the test due to Github issue #21878")
+
+
+def is_cmis_version_supported(cmis_version, min_required_version=5.0, failed_api_ports=None, port_name=None):
+    """
+    Check if a CMIS version supports a specific feature by comparing it to a minimum required version
+    @param: cmis_version: CMIS version string (e.g., "5.0", "4.0", etc.)
+    @param: min_required_version: Minimum required CMIS version (default: 5.0)
+    @param: failed_api_ports: List to append failed ports to (optional)
+    @param: port_name: Port name appended to failed_api_ports when cmis_version cannot be parsed (optional)
+    @return: bool: True if CMIS version is supported, False otherwise
+    """
+    try:
+        cmis_version_float = float(cmis_version)
+        return cmis_version_float >= min_required_version
+    except (ValueError, TypeError):
+        if failed_api_ports is not None and port_name is not None:
+            failed_api_ports.append(port_name)
+        return False
+
+
+def get_supported_available_optical_interfaces(eeprom_infos, parsed_presence,
+                                               min_cmis_version=5.0, return_failed_api_ports=False):
+    """
+    Filter available optical interfaces based on presence, EEPROM detection, media type, and CMIS version support
+    @param: eeprom_infos: Dictionary containing EEPROM information for each port
+    @param: parsed_presence: Dictionary containing presence status for each port
+    @param: min_cmis_version: Minimum required CMIS version (default: 5.0)
+    @param: return_failed_api_ports: If True, return both available_optical_interfaces and failed_api_ports.
+                                     If False, return only available_optical_interfaces (default: False)
+    @return: list or tuple: If return_failed_api_ports=False, returns list of available optical interface names.
+                            If return_failed_api_ports=True, returns (available_optical_interfaces, failed_api_ports)
+    """
+    available_optical_interfaces = []
+    failed_api_ports = []
+
+    for port_name, eeprom_info in eeprom_infos.items():
+        if parsed_presence.get(port_name) != "Present":
+            continue
+        # A port whose entry lacks the "SFP EEPROM detected" status (or the key itself) is skipped, not an error
+        if "SFP EEPROM detected" not in eeprom_info.get(port_name, ""):
+            continue
+        media_technology = eeprom_info.get("Media Interface Technology", "N/A").upper()
+        if "COPPER" in media_technology:
+            continue
+        if "N/A" in media_technology:
+            failed_api_ports.append(port_name)
+            continue
+        cmis_version = eeprom_info.get("CMIS Revision", "N/A")
+        if "N/A" in cmis_version:
+            failed_api_ports.append(port_name)
+            continue
+        elif not is_cmis_version_supported(cmis_version, min_cmis_version, failed_api_ports, port_name):
+            logger.info(f"Port {port_name} skipped: CMIS not supported on this port.")
+            continue
+
+        available_optical_interfaces.append(port_name)
+
+    if return_failed_api_ports:
+        return available_optical_interfaces, failed_api_ports
+    else:
+        return available_optical_interfaces
diff --git a/tests/conftest.py b/tests/conftest.py
index 3eee081a62..58ee8555a9 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -238,6 +238,12 @@ def pytest_addoption(parser):
     #################################
     parser.addoption("--run-stress-tests", action="store_true", default=False, help="Run only tests stress tests")
 
+    #################################
+    #    Port error test options    #
+    #################################
+    parser.addoption("--collected-ports-num", action="store", default=5, type=int,
+                     help="Number of ports to collect for testing (default: 5)")
+
 
 def pytest_configure(config):
     if config.getoption("enable_macsec"):
diff --git a/tests/layer1/test_port_error.py b/tests/layer1/test_port_error.py
new file mode 100644
index 0000000000..deb92bb3cb
--- /dev/null
+++ b/tests/layer1/test_port_error.py
@@ -0,0 +1,232 @@
+import logging
+import pytest
+import random
+import time
+import os
+import re
+
+from tests.common.helpers.assertions import pytest_assert
+from tests.common.utilities import skip_release
+from tests.common.platform.transceiver_utils import parse_sfp_eeprom_infos
+from tests.common.mellanox_data import get_supported_available_optical_interfaces
+from tests.common.utilities import wait_until
+
+pytestmark = [
+    pytest.mark.disable_loganalyzer,  # disable automatic loganalyzer
+    pytest.mark.topology('any')
+]
+
+SUPPORTED_PLATFORMS = ["arista_7060x6", "nvidia_sn5640", "nvidia_sn5600"]
+cmd_sfp_presence = "sudo sfpshow presence"
+
+
+@pytest.fixture(scope="session")
+def collected_ports_num(request):
+    """
+    Fixture to get the number of ports to collect from command line argument
+    """
+    return request.config.getoption("--collected-ports-num")
+
+
+class TestMACFault(object):
+    @pytest.fixture(scope="class", autouse=True)
+    def is_supported_nvidia_platform_with_sw_control_disabled(self, duthost):
+        """True when the DUT platform name contains 'nvidia' and the SW control feature is not enabled."""
+        return 'nvidia' in duthost.facts['platform'].lower() and not self.is_sw_control_feature_enabled(duthost)
+
+    @pytest.fixture(scope="class", autouse=True)
+    def is_supported_nvidia_platform_with_sw_control_enabled(self, duthost):
+        """True when the DUT platform name contains 'nvidia' and the SW control feature is enabled."""
+        return 'nvidia' in duthost.facts['platform'].lower() and self.is_sw_control_feature_enabled(duthost)
+
+    @pytest.fixture(scope="class", autouse=True)
+    def is_supported_platform(self, duthost, tbinfo, is_supported_nvidia_platform_with_sw_control_disabled):
+        """Skip the class on unsupported topologies, platforms, releases, or Nvidia without SW control."""
+        if 'ptp' not in tbinfo['topo']['name']:
+            pytest.skip("Skipping test: Not applicable for non-PTP topology")
+
+        if any(platform in duthost.facts['platform'] for platform in SUPPORTED_PLATFORMS):
+            skip_release(duthost, ["201811", "201911", "202012", "202205", "202211", "202305", "202405"])
+        else:
+            pytest.skip("DUT has platform {}, test is not supported".format(duthost.facts['platform']))
+
+        if is_supported_nvidia_platform_with_sw_control_disabled:
+            pytest.skip("SW control feature is not enabled on Nvidia platform")
+
+    @staticmethod
+    def get_mac_fault_count(dut, interface, fault_type):
+        """Return the 'count' reported by 'show int errors' for fault_type on interface (0 if not listed)."""
+        output = dut.show_and_parse("show int errors {}".format(interface))
+        logging.info("Raw output for show int errors on {}: {}".format(interface, output))
+
+        fault_count = 0
+        for error_info in output:
+            if error_info['port errors'] == fault_type:
+                fault_count = int(error_info['count'])
+                break
+
+        logging.info("{} count on {}: {}".format(fault_type, interface, fault_count))
+        return fault_count
+
+    @staticmethod
+    def get_interface_status(dut, interface):
+        """Return the 'oper' column of 'show interfaces status' for interface ('unknown' if absent)."""
+        return dut.show_and_parse("show interfaces status {}".format(interface))[0].get("oper", "unknown")
+
+    @pytest.fixture(scope="class", autouse=True)
+    def reboot_dut(self, duthosts, localhost, enum_rand_one_per_hwsku_frontend_hostname):
+        """Reboot the selected DUT once for the class before its tests run."""
+        from tests.common.reboot import reboot
+        reboot(duthosts[enum_rand_one_per_hwsku_frontend_hostname],
+               localhost, safe_reboot=True, check_intf_up_ports=True)
+
+    @pytest.fixture(scope="class")
+    def get_dut_and_supported_available_optical_interfaces(self, duthosts, enum_rand_one_per_hwsku_frontend_hostname,
+                                                           is_supported_nvidia_platform_with_sw_control_enabled):
+        """
+        Collect the DUT and the interfaces the tests may run on.
+        @return: tuple: (dut, supported_available_optical_interfaces, failed_api_ports)
+        """
+        dut = duthosts[enum_rand_one_per_hwsku_frontend_hostname]
+
+        sfp_presence = dut.command(cmd_sfp_presence)
+        # First two output lines are skipped (presumably the table header); blank/short lines are ignored
+        parsed_presence = {fields[0]: fields[1]
+                           for fields in (line.split() for line in sfp_presence["stdout_lines"][2:])
+                           if len(fields) >= 2}
+        supported_available_optical_interfaces = []
+        failed_api_ports = []
+
+        if is_supported_nvidia_platform_with_sw_control_enabled:
+
+            eeprom_infos = dut.shell("sudo sfputil show eeprom -d")['stdout']
+            eeprom_infos = parse_sfp_eeprom_infos(eeprom_infos)
+
+            supported_available_optical_interfaces, failed_api_ports = (
+                get_supported_available_optical_interfaces(
+                    eeprom_infos, parsed_presence, return_failed_api_ports=True
+                )
+            )
+            pytest_assert(supported_available_optical_interfaces,
+                          "No interfaces with SFP detected. Cannot proceed with tests.")
+            logging.info("Available Optical interfaces for tests: {}".format(supported_available_optical_interfaces))
+        else:
+            interfaces = list(dut.show_and_parse("show interfaces status"))
+            supported_available_optical_interfaces = [
+                intf["interface"] for intf in interfaces
+                if parsed_presence.get(intf["interface"]) == "Present"
+            ]
+
+            pytest_assert(supported_available_optical_interfaces,
+                          "No interfaces with SFP detected. Cannot proceed with tests.")
+
+        return dut, supported_available_optical_interfaces, failed_api_ports
+
+    def is_sw_control_feature_enabled(self, duthost):
+        """
+        Check if SW control feature is enabled.
+        Enabled means the hwsku's sai.profile contains SAI_INDEPENDENT_MODULE_MODE=1.
+        Any exception while reading the profile is logged and reported as not enabled.
+        """
+        try:
+            platform_name = duthost.facts['platform']
+            hwsku = duthost.facts.get('hwsku', '')
+            sai_profile_path = os.path.join('/usr/share/sonic/device', platform_name, hwsku, 'sai.profile')
+            cmd = duthost.shell('cat {}'.format(sai_profile_path), module_ignore_errors=True)
+            if cmd['rc'] == 0 and 'SAI_INDEPENDENT_MODULE_MODE' in cmd['stdout']:
+                sc_enabled = re.search(r"SAI_INDEPENDENT_MODULE_MODE=(\d?)", cmd['stdout'])
+                if sc_enabled and sc_enabled.group(1) == '1':
+                    return True
+        except Exception as e:
+            logging.error("Error checking SW control feature on Nvidia platform: {}".format(e))
+        return False
+
+    def shutdown_and_startup_interfaces(self, dut, interface):
+        """Shut the interface down and start it up again, asserting the expected oper state after each step."""
+        dut.command("sudo config interface shutdown {}".format(interface))
+        pytest_assert(wait_until(30, 2, 0, lambda: self.get_interface_status(dut, interface) == "down"),
+                      "Interface {} did not go down after shutdown".format(interface))
+
+        dut.command("sudo config interface startup {}".format(interface))
+        pytest_assert(wait_until(30, 2, 0, lambda: self.get_interface_status(dut, interface) == "up"),
+                      "Interface {} did not come up after startup".format(interface))
+
+    def test_mac_local_fault_increment(self, get_dut_and_supported_available_optical_interfaces,
+                                       collected_ports_num):
+        """Disable/enable rx-output via sfputil on sampled ports and check the MAC local fault counter grows."""
+        dut, supported_available_optical_interfaces, failed_api_ports = (
+            get_dut_and_supported_available_optical_interfaces
+        )
+        selected_interfaces = random.sample(supported_available_optical_interfaces,
+                                            min(collected_ports_num, len(supported_available_optical_interfaces)))
+        logging.info("Selected interfaces for tests: {}".format(selected_interfaces))
+
+        for interface in selected_interfaces:
+            self.shutdown_and_startup_interfaces(dut, interface)
+
+            pytest_assert(self.get_interface_status(dut, interface) == "up",
+                          "Interface {} was not up before disabling/enabling rx-output using sfputil".format(interface))
+
+            local_fault_before = self.get_mac_fault_count(dut, interface, "mac local fault")
+            logging.info("Initial MAC local fault count on {}: {}".format(interface, local_fault_before))
+
+            dut.shell("sudo sfputil debug rx-output {} disable".format(interface))
+            time.sleep(5)
+            pytest_assert(self.get_interface_status(dut, interface) == "down",
+                          "Interface {iface} did not go down after 'sudo sfputil debug rx-output {iface} disable'"
+                          .format(iface=interface))
+
+            dut.shell("sudo sfputil debug rx-output {} enable".format(interface))
+            time.sleep(20)
+            pytest_assert(self.get_interface_status(dut, interface) == "up",
+                          "Interface {iface} did not come up after 'sudo sfputil debug rx-output {iface} enable'"
+                          .format(iface=interface))
+
+            local_fault_after = self.get_mac_fault_count(dut, interface, "mac local fault")
+            logging.info("MAC local fault count after disabling/enabling rx-output using sfputil {}: {}".format(
+                interface, local_fault_after))
+
+            pytest_assert(local_fault_after > local_fault_before,
+                          "MAC local fault count did not increment after disabling/enabling rx-output on the device")
+
+        pytest_assert(not failed_api_ports, "Interfaces with failed API ports: {}".format(failed_api_ports))
+
+    def test_mac_remote_fault_increment(self, get_dut_and_supported_available_optical_interfaces, collected_ports_num):
+        """Disable/enable tx-output via sfputil on sampled ports and check the MAC remote fault counter grows."""
+        dut, supported_available_optical_interfaces, failed_api_ports = (
+            get_dut_and_supported_available_optical_interfaces
+        )
+        selected_interfaces = random.sample(supported_available_optical_interfaces,
+                                            min(collected_ports_num, len(supported_available_optical_interfaces)))
+        logging.info("Selected interfaces for tests: {}".format(selected_interfaces))
+
+        for interface in selected_interfaces:
+            self.shutdown_and_startup_interfaces(dut, interface)
+
+            pytest_assert(self.get_interface_status(dut, interface) == "up",
+                          "Interface {} was not up before disabling/enabling tx-output using sfputil".format(interface))
+
+            remote_fault_before = self.get_mac_fault_count(dut, interface, "mac remote fault")
+            logging.info("Initial MAC remote fault count on {}: {}".format(interface, remote_fault_before))
+
+            dut.shell("sudo sfputil debug tx-output {} disable".format(interface))
+            time.sleep(5)
+            pytest_assert(self.get_interface_status(dut, interface) == "down",
+                          "Interface {iface} did not go down after 'sudo sfputil debug tx-output {iface} disable'"
+                          .format(iface=interface))
+
+            dut.shell("sudo sfputil debug tx-output {} enable".format(interface))
+            time.sleep(20)
+
+            pytest_assert(self.get_interface_status(dut, interface) == "up",
+                          "Interface {iface} did not come up after 'sudo sfputil debug tx-output {iface} enable'"
+                          .format(iface=interface))
+
+            remote_fault_after = self.get_mac_fault_count(dut, interface, "mac remote fault")
+            logging.info("MAC remote fault count after disabling/enabling tx-output using sfputil {}: {}".format(
+                interface, remote_fault_after))
+
+            pytest_assert(remote_fault_after > remote_fault_before,
+                          "MAC remote fault count did not increment after disabling/enabling tx-output on the device")
+
+        pytest_assert(not failed_api_ports, "Interfaces with failed API ports: {}".format(failed_api_ports))