From 58bb283fc4cb7be0f80d95e72ebc25ab4176f252 Mon Sep 17 00:00:00 2001 From: xinyu Date: Wed, 11 Sep 2024 14:12:50 +0800 Subject: [PATCH] [CMIS] Add lane_mask parameter to set_loopback_mode() to enable setting loopback mode for individual lanes rather than the entire physical port Signed-off-by: xinyu --- .../sonic_xcvr/api/public/cmis.py | 225 +++++++++++++++--- tests/sonic_xcvr/test_cmis.py | 195 ++++++++++++--- 2 files changed, 358 insertions(+), 62 deletions(-) diff --git a/sonic_platform_base/sonic_xcvr/api/public/cmis.py b/sonic_platform_base/sonic_xcvr/api/public/cmis.py index fca1d0f8b..af43b8dff 100644 --- a/sonic_platform_base/sonic_xcvr/api/public/cmis.py +++ b/sonic_platform_base/sonic_xcvr/api/public/cmis.py @@ -1115,50 +1115,209 @@ def get_loopback_capability(self): loopback_capability['media_side_output_loopback_supported'] = bool((allowed_loopback_result >> 0) & 0x1) return loopback_capability - def set_loopback_mode(self, loopback_mode): + def set_host_input_loopback(self, lane_mask, enable): ''' - This function sets the module loopback mode. - Loopback mode has to be one of the five: - 1. "none" (default) - 2. "host-side-input" - 3. "host-side-output" - 4. "media-side-input" - 5. "media-side-output" - The function will look at 13h:128 to check advertized loopback capabilities. - Return True if the provision succeeds, False if it fails + Sets the host-side input loopback mode for specified lanes. + + Args: + lane_mask (int): A bitmask indicating which lanes to enable/disable loopback. + - 0xFF: Enable loopback on all lanes. + - Individual bits represent corresponding lanes. + enable (bool): True to enable loopback, False to disable. + + Returns: + bool: True if the operation succeeds, False otherwise. ''' loopback_capability = self.get_loopback_capability() if loopback_capability is None: + logger.info('Failed to get loopback capabilities') return False - if loopback_mode == 'none': - status_host_input = self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, 0) - status_host_output = self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, 0) - status_media_input = self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, 0) - status_media_output = self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, 0) - return all([status_host_input, status_host_output, status_media_input, status_media_output]) - elif loopback_mode == 'host-side-input': - if loopback_capability['host_side_input_loopback_supported']: - return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, 0xff) - else: - return False - elif loopback_mode == 'host-side-output': - if loopback_capability['host_side_output_loopback_supported']: - return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, 0xff) - else: + + if loopback_capability['host_side_input_loopback_supported'] is False: + logger.error('Host input loopback is not supported') + return False + + if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff: + logger.error('Per-lane host input loopback is not supported, lane_mask:%#x', lane_mask) + return False + + if loopback_capability['simultaneous_host_media_loopback_supported'] is False: + media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK) + media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK) + if media_input_val or media_output_val: + txt = 'Simultaneous host media loopback is not supported\n' + txt += f'media_input_val:{media_input_val:#x}, media_output_val:{media_output_val:#x}' + logger.error(txt) return False - elif loopback_mode == 'media-side-input': - if loopback_capability['media_side_input_loopback_supported']: - return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, 0xff) - else: + + host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK) + if enable: + return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val | lane_mask) + else: + return self.xcvr_eeprom.write(consts.HOST_INPUT_LOOPBACK, host_input_val & ~lane_mask) + + def set_host_output_loopback(self, lane_mask, enable): + ''' + Sets the host-side output loopback mode for specified lanes. + + Args: + lane_mask (int): A bitmask indicating which lanes to enable/disable loopback. + - 0xFF: Enable loopback on all lanes. + - Individual bits represent corresponding lanes. + enable (bool): True to enable loopback, False to disable. + + Returns: + bool: True if the operation succeeds, False otherwise. + ''' + loopback_capability = self.get_loopback_capability() + if loopback_capability is None: + logger.info('Failed to get loopback capabilities') + return False + + if loopback_capability['host_side_output_loopback_supported'] is False: + logger.error('Host output loopback is not supported') + return False + + if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff: + logger.error('Per-lane host output loopback is not supported, lane_mask:%#x', lane_mask) + return False + + if loopback_capability['simultaneous_host_media_loopback_supported'] is False: + media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK) + media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK) + if media_input_val or media_output_val: + txt = 'Simultaneous host media loopback is not supported\n' + txt += f'media_input_val:{media_input_val:x}, media_output_val:{media_output_val:#x}' + logger.error(txt) return False - elif loopback_mode == 'media-side-output': - if loopback_capability['media_side_output_loopback_supported']: - return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, 0xff) - else: + + host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK) + if enable: + return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val | lane_mask) + else: + return self.xcvr_eeprom.write(consts.HOST_OUTPUT_LOOPBACK, host_output_val & ~lane_mask) + + def set_media_input_loopback(self, lane_mask, enable): + ''' + Sets the media-side input loopback mode for specified lanes. + + Args: + lane_mask (int): A bitmask indicating which lanes to enable/disable loopback. + - 0xFF: Enable loopback on all lanes. + - Individual bits represent corresponding lanes. + enable (bool): True to enable loopback, False to disable. + + Returns: + bool: True if the operation succeeds, False otherwise. + ''' + loopback_capability = self.get_loopback_capability() + if loopback_capability is None: + logger.info('Failed to get loopback capabilities') + return False + + if loopback_capability['media_side_input_loopback_supported'] is False: + logger.error('Media input loopback is not supported') + return False + + if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff: + logger.error('Per-lane media input loopback is not supported, lane_mask:%#x', lane_mask) + return False + + if loopback_capability['simultaneous_host_media_loopback_supported'] is False: + host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK) + host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK) + if host_input_val or host_output_val: + txt = 'Simultaneous host media loopback is not supported\n' + txt += f'host_input_val:{host_input_val:#x}, host_output_val:{host_output_val:#x}' + logger.error(txt) return False + + media_input_val = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK) + if enable: + return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val | lane_mask) else: + return self.xcvr_eeprom.write(consts.MEDIA_INPUT_LOOPBACK, media_input_val & ~lane_mask) + + def set_media_output_loopback(self, lane_mask, enable): + ''' + Sets the media-side output loopback mode for specified lanes. + + Args: + lane_mask (int): A bitmask indicating which lanes to enable/disable loopback. + - 0xFF: Enable loopback on all lanes. + - Individual bits represent corresponding lanes. + enable (bool): True to enable loopback, False to disable. + + Returns: + bool: True if the operation succeeds, False otherwise. + ''' + loopback_capability = self.get_loopback_capability() + if loopback_capability is None: + logger.info('Failed to get loopback capabilities') + return False + + if loopback_capability['media_side_output_loopback_supported'] is False: + logger.error('Media output loopback is not supported') + return False + + if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff: + logger.error('Per-lane media output loopback is not supported, lane_mask:%#x', lane_mask) return False + if loopback_capability['simultaneous_host_media_loopback_supported'] is False: + host_input_val = self.xcvr_eeprom.read(consts.HOST_INPUT_LOOPBACK) + host_output_val = self.xcvr_eeprom.read(consts.HOST_OUTPUT_LOOPBACK) + if host_input_val or host_output_val: + txt = 'Simultaneous host media loopback is not supported\n' + txt += f'host_input_val:{host_input_val:#x}, host_output_val:{host_output_val:#x}' + logger.error(txt) + return False + + media_output_val = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK) + if enable: + return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val | lane_mask) + else: + return self.xcvr_eeprom.write(consts.MEDIA_OUTPUT_LOOPBACK, media_output_val & ~lane_mask) + + def set_loopback_mode(self, loopback_mode, lane_mask = 0xff, enable = False): + ''' + This function sets the module loopback mode. + + Args: + - loopback_mode (str): Specifies the loopback mode. It must be one of the following: + 1. "none" + 2. "host-side-input" + 3. "host-side-output" + 4. "media-side-input" + 5. "media-side-output" + - lane_mask (int): A bitmask representing the lanes to which the loopback mode should + be applied. Default 0xFF applies to all lanes. + - enable (bool): Whether to enable or disable the loopback mode. Default False. + Returns: + - bool: True if the operation succeeds, False otherwise. + ''' + loopback_functions = { + 'host-side-input': self.set_host_input_loopback, + 'host-side-output': self.set_host_output_loopback, + 'media-side-input': self.set_media_input_loopback, + 'media-side-output': self.set_media_output_loopback, + } + + if loopback_mode == 'none': + return all([ + self.set_host_input_loopback(0xff, False), + self.set_host_output_loopback(0xff, False), + self.set_media_input_loopback(0xff, False), + self.set_media_output_loopback(0xff, False) + ]) + + func = loopback_functions.get(loopback_mode) + if func: + return func(lane_mask, enable) + + logger.error('Invalid loopback mode:%s, lane_mask:%#x', loopback_mode, lane_mask) + return False + def get_vdm(self): ''' This function returns all the VDM items, including real time monitor value, threholds and flags diff --git a/tests/sonic_xcvr/test_cmis.py b/tests/sonic_xcvr/test_cmis.py index e1cc5c38c..7c81dc2e4 100644 --- a/tests/sonic_xcvr/test_cmis.py +++ b/tests/sonic_xcvr/test_cmis.py @@ -1052,45 +1052,182 @@ def test_get_loopback_capability(self, mock_response, expected): result = self.api.get_loopback_capability() assert result == expected - @pytest.mark.parametrize("input_param, mock_response",[ - ('none', { + @pytest.mark.parametrize("input_param, mock_response, expected",[ + ([0xf, True], None, False), + ([0xf, True], { + 'host_side_input_loopback_supported': False, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + }, False), + ([0xf, True], { 'host_side_input_loopback_supported': True, - 'host_side_output_loopback_supported': True, - 'media_side_input_loopback_supported': True, - 'media_side_output_loopback_supported': True - }), - ('host-side-input', { + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': False, + }, False), + ([0xf, True], { 'host_side_input_loopback_supported': True, - 'host_side_output_loopback_supported': True, - 'media_side_input_loopback_supported': True, - 'media_side_output_loopback_supported': True - }), - ('host-side-output', { + 'simultaneous_host_media_loopback_supported': False, + 'per_lane_host_loopback_supported': True, + }, False), + ([0xf, True], { 'host_side_input_loopback_supported': True, - 'host_side_output_loopback_supported': True, - 'media_side_input_loopback_supported': True, - 'media_side_output_loopback_supported': True - }), - ('media-side-input', { + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + }, True), + ([0xf, False], { 'host_side_input_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + }, True), + ]) + def test_set_host_input_loopback(self, input_param, mock_response, expected): + self.api.get_loopback_capability = MagicMock() + self.api.get_loopback_capability.return_value = mock_response + self.api.xcvr_eeprom.read = MagicMock() + self.api.xcvr_eeprom.read.side_effect = [0x0f,0x0f] + self.api.xcvr_eeprom.write = MagicMock() + self.api.xcvr_eeprom.write.return_value = True + result = self.api.set_host_input_loopback(input_param[0], input_param[1]) + assert result == expected + + @pytest.mark.parametrize("input_param, mock_response, expected",[ + ([0xf, True], None, False), + ([0xf, True], { + 'host_side_output_loopback_supported': False, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + }, False), + ([0xf, True], { 'host_side_output_loopback_supported': True, - 'media_side_input_loopback_supported': True, - 'media_side_output_loopback_supported': True - }), - ('media-side-output', { - 'host_side_input_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': False, + }, False), + ([0xf, True], { + 'host_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': False, + 'per_lane_host_loopback_supported': True, + }, False), + ([0xf, True], { 'host_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + }, True), + ([0xf, False], { + 'host_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': True, + }, True), + ]) + def test_set_host_output_loopback(self, input_param, mock_response, expected): + self.api.get_loopback_capability = MagicMock() + self.api.get_loopback_capability.return_value = mock_response + self.api.xcvr_eeprom.read = MagicMock() + self.api.xcvr_eeprom.read.side_effect = [0x0f,0x0f] + self.api.xcvr_eeprom.write = MagicMock() + self.api.xcvr_eeprom.write.return_value = True + result = self.api.set_host_output_loopback(input_param[0], input_param[1]) + assert result == expected + + @pytest.mark.parametrize("input_param, mock_response, expected",[ + ([0xf, True], None, False), + ([0xf, True], { + 'media_side_input_loopback_supported': False, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': True, + }, False), + ([0xf, True], { 'media_side_input_loopback_supported': True, - 'media_side_output_loopback_supported': True - }), - ( - 'none', None - ) + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': False, + }, False), + ([0xf, True], { + 'media_side_input_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': False, + 'per_lane_media_loopback_supported': True, + }, False), + ([0xf, True], { + 'media_side_input_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': True, + }, True), + ([0xf, False], { + 'media_side_input_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': True, + }, True), ]) - def test_set_loopback_mode(self, input_param, mock_response): + def test_set_media_input_loopback(self, input_param, mock_response, expected): self.api.get_loopback_capability = MagicMock() self.api.get_loopback_capability.return_value = mock_response - self.api.set_loopback_mode(input_param) + self.api.xcvr_eeprom.read = MagicMock() + self.api.xcvr_eeprom.read.side_effect = [0x0f,0x0f] + self.api.xcvr_eeprom.write = MagicMock() + self.api.xcvr_eeprom.write.return_value = True + result = self.api.set_media_input_loopback(input_param[0], input_param[1]) + assert result == expected + + @pytest.mark.parametrize("input_param, mock_response, expected",[ + ([0xf, True], None, False), + ([0xf, True], { + 'media_side_output_loopback_supported': False, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': True, + }, False), + ([0xf, True], { + 'media_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': False, + }, False), + ([0xf, True], { + 'media_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': False, + 'per_lane_media_loopback_supported': True, + }, False), + ([0xf, True], { + 'media_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': True, + }, True), + ([0xf, False], { + 'media_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': True, + }, True), + ]) + def test_set_media_output_loopback(self, input_param, mock_response, expected): + self.api.get_loopback_capability = MagicMock() + self.api.get_loopback_capability.return_value = mock_response + self.api.xcvr_eeprom.read = MagicMock() + self.api.xcvr_eeprom.read.side_effect = [0x0f,0x0f] + self.api.xcvr_eeprom.write = MagicMock() + self.api.xcvr_eeprom.write.return_value = True + result = self.api.set_media_output_loopback(input_param[0], input_param[1]) + assert result == expected + + @pytest.mark.parametrize("input_param, mock_response, expected",[ + (['none', 0], True, True), + (['host-side-input', 0x0F, True], True, True), + (['host-side-output', 0x0F, True], True, True), + (['media-side-input', 0x0F, True], True, True), + (['media-side-output', 0x0F, True], True, True), + (['host-side-input', 0xF0, False], True, True), + (['host-side-output', 0xF0, False], True, True), + (['media-side-input', 0xF0, False], True, True), + (['media-side-output', 0xF0, False], True, True), + (['', 0xF0, False], True, False), + + ]) + def test_set_loopback_mode(self, input_param, mock_response, expected): + self.api.set_host_input_loopback = MagicMock() + self.api.set_host_input_loopback.return_value = mock_response + self.api.set_host_output_loopback = MagicMock() + self.api.set_host_output_loopback.return_value = mock_response + self.api.set_media_input_loopback = MagicMock() + self.api.set_media_input_loopback.return_value = mock_response + self.api.set_media_output_loopback = MagicMock() + self.api.set_media_output_loopback.return_value = mock_response + result = self.api.set_loopback_mode(input_param[0], input_param[1]) + assert result == expected @pytest.mark.parametrize("mock_response, expected",[ (