diff --git a/doc/Command-Reference.md b/doc/Command-Reference.md index 8b7a9ba8a5..8ac3e2cb5c 100644 --- a/doc/Command-Reference.md +++ b/doc/Command-Reference.md @@ -3070,19 +3070,19 @@ This command is the standard CMIS diagnostic control used for troubleshooting li - Usage: ``` - sfputil debug loopback PORT_NAME LOOPBACK_MODE + sfputil debug loopback PORT_NAME LOOPBACK_MODE - Set the loopback mode + Valid values for loopback mode host-side-input: host side input loopback mode host-side-output: host side output loopback mode media-side-input: media side input loopback mode media-side-output: media side output loopback mode - none: disable loopback mode ``` - Example: ``` - admin@sonic:~$ sfputil debug loopback Ethernet88 host-side-input + admin@sonic:~$ sfputil debug loopback Ethernet88 host-side-input enable + admin@sonic:~$ sfputil debug loopback Ethernet88 media-side-output disable ``` ## DHCP Relay diff --git a/sfputil/main.py b/sfputil/main.py index 309d5c98dd..2f669bbea5 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -18,7 +18,7 @@ import sonic_platform import sonic_platform_base.sonic_sfp.sfputilhelper from sonic_platform_base.sfp_base import SfpBase -from swsscommon.swsscommon import SonicV2Connector +from swsscommon.swsscommon import SonicV2Connector, ConfigDBConnector from natsort import natsorted from sonic_py_common import device_info, logger, multi_asic from utilities_common.sfp_helper import covert_application_advertisement_to_output_string @@ -1911,11 +1911,12 @@ def debug(): # 'loopback' subcommand @debug.command() -@click.argument('port_name', required=True, default=None) -@click.argument('loopback_mode', required=True, default="none", - type=click.Choice(["none", "host-side-input", "host-side-output", +@click.argument('port_name', required=True) +@click.argument('loopback_mode', required=True, + type=click.Choice(["host-side-input", "host-side-output", "media-side-input", "media-side-output"])) -def loopback(port_name, loopback_mode): +@click.argument('enable', required=True, type=click.Choice(["enable", "disable"])) +def loopback(port_name, loopback_mode, enable): """Set module diagnostic loopback mode """ physical_port = logical_port_to_physical_port_index(port_name) @@ -1935,17 +1936,82 @@ def loopback(port_name, loopback_mode): click.echo("{}: This functionality is not implemented".format(port_name)) sys.exit(ERROR_NOT_IMPLEMENTED) + namespace = multi_asic.get_namespace_for_port(port_name) + config_db = ConfigDBConnector(use_unix_socket_path=True, namespace=namespace) + if config_db is not None: + config_db.connect() + try: + subport = int(config_db.get(config_db.CONFIG_DB, f'PORT|{port_name}', 'subport')) + except TypeError: + click.echo(f"{port_name}: subport is not present in CONFIG_DB") + sys.exit(EXIT_FAIL) + + # If subport is set to 0, assign a default value of 1 to ensure valid subport configuration + if subport == 0: + subport = 1 + else: + click.echo(f"{port_name}: Failed to connect to CONFIG_DB") + sys.exit(EXIT_FAIL) + + state_db = SonicV2Connector(use_unix_socket_path=False, namespace=namespace) + if state_db is not None: + state_db.connect(state_db.STATE_DB) + try: + host_lane_count = int(state_db.get(state_db.STATE_DB, + f'TRANSCEIVER_INFO|{port_name}', + 'host_lane_count')) + except TypeError: + click.echo(f"{port_name}: host_lane_count is not present in STATE_DB") + sys.exit(EXIT_FAIL) + + try: + media_lane_count = int(state_db.get(state_db.STATE_DB, + f'TRANSCEIVER_INFO|{port_name}', + 'media_lane_count')) + except TypeError: + click.echo(f"{port_name}: media_lane_count is not present in STATE_DB") + sys.exit(EXIT_FAIL) + else: + click.echo(f"{port_name}: Failed to connect to STATE_DB") + sys.exit(EXIT_FAIL) + + if 'host-side' in loopback_mode: + lane_mask = get_subport_lane_mask(subport, host_lane_count) + elif 'media-side' in loopback_mode: + lane_mask = get_subport_lane_mask(subport, media_lane_count) + else: + lane_mask = 0 + try: - status = api.set_loopback_mode(loopback_mode) + status = api.set_loopback_mode(loopback_mode, + lane_mask=lane_mask, + enable=enable == 'enable') except AttributeError: click.echo("{}: Set loopback mode is not applicable for this module".format(port_name)) sys.exit(ERROR_NOT_IMPLEMENTED) + except TypeError: + click.echo("{}: Set loopback mode failed. Parameter is not supported".format(port_name)) + sys.exit(EXIT_FAIL) if status: - click.echo("{}: Set {} loopback".format(port_name, loopback_mode)) + click.echo("{}: {} {} loopback".format(port_name, enable, loopback_mode)) else: - click.echo("{}: Set {} loopback failed".format(port_name, loopback_mode)) + click.echo("{}: {} {} loopback failed".format(port_name, enable, loopback_mode)) sys.exit(EXIT_FAIL) + +def get_subport_lane_mask(subport, lane_count): + """Get the lane mask for the given subport and lane count + + Args: + subport (int): Subport number + lane_count (int): Lane count for the subport + + Returns: + int: Lane mask for the given subport and lane count + """ + return ((1 << lane_count) - 1) << ((subport - 1) * lane_count) + + if __name__ == '__main__': cli() diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py index 5854bb201b..7e2d382dee 100644 --- a/tests/sfputil_test.py +++ b/tests/sfputil_test.py @@ -1586,11 +1586,16 @@ def test_load_port_config(self, mock_is_multi_asic): @patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=False)) @patch('sfputil.main.platform_chassis') + @patch('sfputil.main.ConfigDBConnector') + @patch('sfputil.main.SonicV2Connector') @patch('sfputil.main.platform_sfputil', MagicMock(is_logical_port=MagicMock(return_value=1))) @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) - def test_debug_loopback(self, mock_chassis): + @patch('sonic_py_common.multi_asic.get_front_end_namespaces', MagicMock(return_value=[''])) + def test_debug_loopback(self, mock_sonic_v2_connector, mock_config_db_connector, mock_chassis): mock_sfp = MagicMock() mock_api = MagicMock() + mock_config_db_connector.return_value = MagicMock() + mock_sonic_v2_connector.return_value = MagicMock() mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) mock_sfp.get_presence.return_value = True mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) @@ -1598,31 +1603,75 @@ def test_debug_loopback(self, mock_chassis): runner = CliRunner() mock_sfp.get_presence.return_value = False result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'], - ["Ethernet0", "host-side-input"]) + ["Ethernet0", "host-side-input", "enable"]) assert result.output == 'Ethernet0: SFP EEPROM not detected\n' mock_sfp.get_presence.return_value = True mock_sfp.get_xcvr_api = MagicMock(side_effect=NotImplementedError) result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'], - ["Ethernet0", "host-side-input"]) + ["Ethernet0", "host-side-input", "enable"]) assert result.output == 'Ethernet0: This functionality is not implemented\n' assert result.exit_code == ERROR_NOT_IMPLEMENTED mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'], - ["Ethernet0", "host-side-input"]) - assert result.output == 'Ethernet0: Set host-side-input loopback\n' + ["Ethernet0", "host-side-input", "enable"]) + assert result.output == 'Ethernet0: enable host-side-input loopback\n' + assert result.exit_code != ERROR_NOT_IMPLEMENTED + + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) + result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'], + ["Ethernet0", "media-side-input", "enable"]) + assert result.output == 'Ethernet0: enable media-side-input loopback\n' assert result.exit_code != ERROR_NOT_IMPLEMENTED mock_api.set_loopback_mode.return_value = False result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'], - ["Ethernet0", "none"]) - assert result.output == 'Ethernet0: Set none loopback failed\n' + ["Ethernet0", "media-side-output", "enable"]) + assert result.output == 'Ethernet0: enable media-side-output loopback failed\n' assert result.exit_code == EXIT_FAIL mock_api.set_loopback_mode.return_value = True mock_api.set_loopback_mode.side_effect = AttributeError result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'], - ["Ethernet0", "none"]) + ["Ethernet0", "host-side-input", "enable"]) assert result.output == 'Ethernet0: Set loopback mode is not applicable for this module\n' assert result.exit_code == ERROR_NOT_IMPLEMENTED + + mock_api.set_loopback_mode.side_effect = [TypeError, True] + result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'], + ["Ethernet0", "host-side-input", "enable"]) + assert result.output == 'Ethernet0: Set loopback mode failed. Parameter is not supported\n' + assert result.exit_code == EXIT_FAIL + + mock_config_db = MagicMock() + mock_config_db.get.side_effect = TypeError + mock_config_db_connector.return_value = mock_config_db + result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'], + ["Ethernet0", "media-side-input", "enable"]) + assert result.output == 'Ethernet0: subport is not present in CONFIG_DB\n' + assert result.exit_code == EXIT_FAIL + + mock_config_db_connector.return_value = None + result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'], + ["Ethernet0", "media-side-input", "enable"]) + assert result.output == 'Ethernet0: Failed to connect to CONFIG_DB\n' + assert result.exit_code == EXIT_FAIL + + mock_config_db_connector.return_value = MagicMock() + mock_sonic_v2_connector.return_value = None + result = runner.invoke(sfputil.cli.commands['debug'].commands['loopback'], + ["Ethernet0", "media-side-input", "enable"]) + assert result.output == 'Ethernet0: Failed to connect to STATE_DB\n' + assert result.exit_code == EXIT_FAIL + + @pytest.mark.parametrize("subport, lane_count, expected_mask", [ + (1, 1, 0x1), + (1, 4, 0xf), + (2, 1, 0x2), + (2, 4, 0xf0), + (3, 2, 0x30), + (4, 1, 0x8), + ]) + def test_get_subport_lane_mask(self, subport, lane_count, expected_mask): + assert sfputil.get_subport_lane_mask(subport, lane_count) == expected_mask