Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 79 additions & 1 deletion tests/common/mellanox_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import functools

import pytest
import logging
logger = logging.getLogger(__name__)

SPC1_HWSKUS = ["ACS-MSN2700", "Mellanox-SN2700", "Mellanox-SN2700-D48C8", "ACS-MSN2740", "ACS-MSN2100", "ACS-MSN2410",
"ACS-MSN2010", "ACS-SN2201"]
Expand Down Expand Up @@ -1249,3 +1251,79 @@ def get_hardware_version(duthost, platform):
def get_hw_management_version(duthost):
full_version = duthost.shell('dpkg-query --showformat=\'${Version}\' --show hw-management')['stdout']
return full_version[len('1.mlnx.'):]


def is_pinewave_module(port_info):
""" Check if the given port info indicates an pinewave module and handle known issues """
return 'PINEWAVE' in port_info.get('manufacturer', '')


def is_unsupported_module(port_info, port_number):
if is_pinewave_module(port_info):
logger.info(f"Port {port_number} has an unsupported module, skipping it and continue to check other ports")
return True
return False


def skip_on_unsupported_module():
pytest.skip("All ports are with unsupported modules, skipping the test due to Github issue #21878")


def is_cmis_version_supported(cmis_version, min_required_version=5.0, failed_api_ports=None, port_name=None):
"""
Check if a CMIS version supports a specific feature by comparing it to a minimum required version
@param: cmis_version: CMIS version string (e.g., "5.0", "4.0", etc.)
@param: min_required_version: Minimum required CMIS version (default: 5.0)
@param: failed_api_ports: List to append failed ports to (optional)
@param: port_name: Port name to append to failed list if version check fails (optional)
@return: bool: True if CMIS version is supported, False otherwise
"""
try:
cmis_version_float = float(cmis_version)
return cmis_version_float >= min_required_version
except (ValueError, TypeError):
if failed_api_ports is not None and port_name is not None:
failed_api_ports.append(port_name)
return False


def get_supported_available_optical_interfaces(eeprom_infos, parsed_presence,
min_cmis_version=5.0, return_failed_api_ports=False):
"""
Filter available optical interfaces based on presence, EEPROM detection, media type, and CMIS version support
@param: eeprom_infos: Dictionary containing EEPROM information for each port
@param: parsed_presence: Dictionary containing presence status for each port
@param: min_cmis_version: Minimum required CMIS version (default: 5.0)
@param: return_failed_api_ports: If True, return both available_optical_interfaces and failed_api_ports.
If False, return only available_optical_interfaces (default: False)
@return: list or tuple: If return_failed_api_ports=False, returns list of available optical interface names.
If return_failed_api_ports=True, returns (available_optical_interfaces, failed_api_ports)
"""
available_optical_interfaces = []
failed_api_ports = []

for port_name, eeprom_info in eeprom_infos.items():
if parsed_presence.get(port_name) != "Present":
continue
if "SFP EEPROM detected" not in eeprom_info[port_name]:
continue
media_technology = eeprom_info.get("Media Interface Technology", "N/A").upper()
if "COPPER" in media_technology:
continue
if "N/A" in media_technology:
failed_api_ports.append(port_name)
continue
cmis_version = eeprom_info.get("CMIS Revision", "N/A")
if "N/A" in cmis_version:
failed_api_ports.append(port_name)
continue
elif not is_cmis_version_supported(cmis_version, min_cmis_version, failed_api_ports, port_name):
logging.info(f"Port {port_name} skipped: CMIS not supported on this port.")
continue

available_optical_interfaces.append(port_name)

if return_failed_api_ports:
return available_optical_interfaces, failed_api_ports
else:
return available_optical_interfaces
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ def pytest_addoption(parser):
#################################
parser.addoption("--run-stress-tests", action="store_true", default=False, help="Run only tests stress tests")

#################################
# Port error test options #
#################################
parser.addoption("--collected-ports-num", action="store", default=5, type=int,
help="Number of ports to collect for testing (default: 5)")


def pytest_configure(config):
if config.getoption("enable_macsec"):
Expand Down
214 changes: 214 additions & 0 deletions tests/layer1/test_port_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import logging
import pytest
import random
import time
import os
import re

from tests.common.helpers.assertions import pytest_assert
from tests.common.utilities import skip_release
from tests.common.platform.transceiver_utils import parse_sfp_eeprom_infos
from tests.common.mellanox_data import get_supported_available_optical_interfaces
from tests.common.utilities import wait_until

pytestmark = [
pytest.mark.disable_loganalyzer, # disable automatic loganalyzer
pytest.mark.topology('any')
]

SUPPORTED_PLATFORMS = ["arista_7060x6", "nvidia_sn5640", "nvidia_sn5600"]
cmd_sfp_presence = "sudo sfpshow presence"


@pytest.fixture(scope="session")
def collected_ports_num(request):
"""
Fixture to get the number of ports to collect from command line argument
"""
return request.config.getoption("--collected-ports-num")


class TestMACFault(object):
@pytest.fixture(scope="class", autouse=True)
def is_supported_nvidia_platform_with_sw_control_disabled(self, duthost):
return 'nvidia' in duthost.facts['platform'].lower() and not self.is_sw_control_feature_enabled(duthost)

@pytest.fixture(scope="class", autouse=True)
def is_supported_nvidia_platform_with_sw_control_enabled(self, duthost):
return 'nvidia' in duthost.facts['platform'].lower() and self.is_sw_control_feature_enabled(duthost)

@pytest.fixture(scope="class", autouse=True)
def is_supported_platform(self, duthost, tbinfo, is_supported_nvidia_platform_with_sw_control_disabled):
if 'ptp' not in tbinfo['topo']['name']:
pytest.skip("Skipping test: Not applicable for non-PTP topology")

if any(platform in duthost.facts['platform'] for platform in SUPPORTED_PLATFORMS):
skip_release(duthost, ["201811", "201911", "202012", "202205", "202211", "202305", "202405"])
else:
pytest.skip("DUT has platform {}, test is not supported".format(duthost.facts['platform']))

if is_supported_nvidia_platform_with_sw_control_disabled:
pytest.skip("SW control feature is not enabled on Nvidia platform")

@staticmethod
def get_mac_fault_count(dut, interface, fault_type):
output = dut.show_and_parse("show int errors {}".format(interface))
logging.info("Raw output for show int errors on {}: {}".format(interface, output))

fault_count = 0
for error_info in output:
if error_info['port errors'] == fault_type:
fault_count = int(error_info['count'])
break

logging.info("{} count on {}: {}".format(fault_type, interface, fault_count))
return fault_count

@staticmethod
def get_interface_status(dut, interface):
return dut.show_and_parse("show interfaces status {}".format(interface))[0].get("oper", "unknown")

@pytest.fixture(scope="class", autouse=True)
def reboot_dut(self, duthosts, localhost, enum_rand_one_per_hwsku_frontend_hostname):
from tests.common.reboot import reboot
reboot(duthosts[enum_rand_one_per_hwsku_frontend_hostname],
localhost, safe_reboot=True, check_intf_up_ports=True)

@pytest.fixture(scope="class")
def get_dut_and_supported_available_optical_interfaces(self, duthosts, enum_rand_one_per_hwsku_frontend_hostname,
is_supported_nvidia_platform_with_sw_control_enabled):
dut = duthosts[enum_rand_one_per_hwsku_frontend_hostname]

sfp_presence = dut.command(cmd_sfp_presence)
parsed_presence = {line.split()[0]: line.split()[1] for line in sfp_presence["stdout_lines"][2:]}
supported_available_optical_interfaces = []
failed_api_ports = []

if is_supported_nvidia_platform_with_sw_control_enabled:

eeprom_infos = dut.shell("sudo sfputil show eeprom -d")['stdout']
eeprom_infos = parse_sfp_eeprom_infos(eeprom_infos)

supported_available_optical_interfaces, failed_api_ports = (
get_supported_available_optical_interfaces(
eeprom_infos, parsed_presence, return_failed_api_ports=True
)
)
pytest_assert(supported_available_optical_interfaces,
"No interfaces with SFP detected. Cannot proceed with tests.")
logging.info("Available Optical interfaces for tests: {}".format(supported_available_optical_interfaces))
else:
interfaces = list(dut.show_and_parse("show interfaces status"))
supported_available_optical_interfaces = [
intf["interface"] for intf in interfaces
if parsed_presence.get(intf["interface"]) == "Present"
]

pytest_assert(supported_available_optical_interfaces,
"No interfaces with SFP detected. Cannot proceed with tests.")

return dut, supported_available_optical_interfaces, failed_api_ports

def is_sw_control_feature_enabled(self, duthost):
"""
Check if SW control feature is enabled.
"""
try:
platform_name = duthost.facts['platform']
hwsku = duthost.facts.get('hwsku', '')
sai_profile_path = os.path.join('/usr/share/sonic/device', platform_name, hwsku, 'sai.profile')
cmd = duthost.shell('cat {}'.format(sai_profile_path), module_ignore_errors=True)
if cmd['rc'] == 0 and 'SAI_INDEPENDENT_MODULE_MODE' in cmd['stdout']:
sc_enabled = re.search(r"SAI_INDEPENDENT_MODULE_MODE=(\d?)", cmd['stdout'])
if sc_enabled and sc_enabled.group(1) == '1':
return True
except Exception as e:
logging.error("Error checking SW control feature on Nvidia platform: {}".format(e))
return False

def shutdown_and_startup_interfaces(self, dut, interface):
dut.command("sudo config interface shutdown {}".format(interface))
pytest_assert(wait_until(30, 2, 0, lambda: self.get_interface_status(dut, interface) == "down"),
"Interface {} did not go down after shutdown".format(interface))

dut.command("sudo config interface startup {}".format(interface))
pytest_assert(wait_until(30, 2, 0, lambda: self.get_interface_status(dut, interface) == "up"),
"Interface {} did not come up after startup".format(interface))

def test_mac_local_fault_increment(self, get_dut_and_supported_available_optical_interfaces,
collected_ports_num):
dut, supported_available_optical_interfaces, failed_api_ports = (
get_dut_and_supported_available_optical_interfaces()
)
selected_interfaces = random.sample(supported_available_optical_interfaces,
min(collected_ports_num, len(supported_available_optical_interfaces)))
logging.info("Selected interfaces for tests: {}".format(selected_interfaces))

for interface in selected_interfaces:
self.shutdown_and_startup_interfaces(dut, interface)

pytest_assert(self.get_interface_status(dut, interface) == "up",
"Interface {} was not up before disabling/enabling rx-output using sfputil".format(interface))

local_fault_before = self.get_mac_fault_count(dut, interface, "mac local fault")
logging.info("Initial MAC local fault count on {}: {}".format(interface, local_fault_before))

dut.shell("sudo sfputil debug rx-output {} disable".format(interface))
time.sleep(5)
pytest_assert(self.get_interface_status(dut, interface) == "down",
"Interface {iface} did not go down after 'sudo sfputil debug rx-output {iface} disable'"
.format(iface=interface))

dut.shell("sudo sfputil debug rx-output {} enable".format(interface))
time.sleep(20)
pytest_assert(self.get_interface_status(dut, interface) == "up",
"Interface {iface} did not come up after 'sudo sfputil debug rx-output {iface} enable'"
.format(iface=interface))

local_fault_after = self.get_mac_fault_count(dut, interface, "mac local fault")
logging.info("MAC local fault count after disabling/enabling rx-output using sfputil {}: {}".format(
interface, local_fault_after))

pytest_assert(local_fault_after > local_fault_before,
"MAC local fault count did not increment after disabling/enabling rx-output on the device")

pytest_assert(len(failed_api_ports) == 0, "Interfaces with failed API ports: {}".format(failed_api_ports))

def test_mac_remote_fault_increment(self, get_dut_and_supported_available_optical_interfaces, collected_ports_num):
dut, supported_available_optical_interfaces, failed_api_ports = (
get_dut_and_supported_available_optical_interfaces()
)
selected_interfaces = random.sample(supported_available_optical_interfaces,
min(collected_ports_num, len(supported_available_optical_interfaces)))
logging.info("Selected interfaces for tests: {}".format(selected_interfaces))

for interface in selected_interfaces:
self.shutdown_and_startup_interfaces(dut, interface)

pytest_assert(self.get_interface_status(dut, interface) == "up",
"Interface {} was not up before disabling/enabling tx-output using sfputil".format(interface))

remote_fault_before = self.get_mac_fault_count(dut, interface, "mac remote fault")
logging.info("Initial MAC remote fault count on {}: {}".format(interface, remote_fault_before))

dut.shell("sudo sfputil debug tx-output {} disable".format(interface))
time.sleep(5)
pytest_assert(self.get_interface_status(dut, interface) == "down",
"Interface {iface} did not go down after 'sudo sfputil debug tx-output {iface} disable'"
.format(iface=interface))

dut.shell("sudo sfputil debug tx-output {} enable".format(interface))
time.sleep(20)

pytest_assert(self.get_interface_status(dut, interface) == "up",
"Interface {iface} did not come up after 'sudo sfputil debug tx-output {iface} enable'"
.format(iface=interface))

remote_fault_after = self.get_mac_fault_count(dut, interface, "mac remote fault")
logging.info("MAC remote fault count after disabling/enabling tx-output using sfputil {}: {}".format(
interface, remote_fault_after))

pytest_assert(remote_fault_after > remote_fault_before,
"MAC remote fault count did not increment after disabling/enabling tx-output on the device")

pytest_assert(len(failed_api_ports) == 0, "Interfaces with failed API ports: {}".format(failed_api_ports))
Loading