diff --git a/setup.py b/setup.py index 00e60aa..bc85f49 100644 --- a/setup.py +++ b/setup.py @@ -49,6 +49,7 @@ 'sonic_platform_base.sonic_xcvr.api.public', 'sonic_platform_base.sonic_xcvr.codes', 'sonic_platform_base.sonic_xcvr.codes.public', + 'sonic_platform_base.sonic_xcvr.utils', 'sonic_platform_base.sonic_xcvr.api.credo', 'sonic_platform_base.sonic_xcvr.mem_maps.credo', 'sonic_platform_base.sonic_xcvr.codes.credo', diff --git a/sonic_platform_base/sfp_base.py b/sonic_platform_base/sfp_base.py index bd0ae40..561d7e7 100644 --- a/sonic_platform_base/sfp_base.py +++ b/sonic_platform_base/sfp_base.py @@ -479,3 +479,9 @@ def get_xcvr_api(self): if self._xcvr_api is None: self.refresh_xcvr_api() return self._xcvr_api + + def remove_xcvr_api(self): + """ + Removes the cached XcvrApi so that the next get_xcvr_api() call will refresh it. + """ + self._xcvr_api = None diff --git a/sonic_platform_base/sonic_xcvr/api/public/cmis.py b/sonic_platform_base/sonic_xcvr/api/public/cmis.py index 8fd4c03..9579deb 100644 --- a/sonic_platform_base/sonic_xcvr/api/public/cmis.py +++ b/sonic_platform_base/sonic_xcvr/api/public/cmis.py @@ -18,6 +18,7 @@ from .cmisVDM import CmisVdmApi import time from collections import defaultdict +from ...utils.cache import read_only_cached_api_return logger = logging.getLogger(__name__) logger.addHandler(logging.NullHandler()) @@ -82,6 +83,16 @@ class CmisApi(XcvrApi): LowPwrRequestSW = 4 LowPwrAllowRequestHW = 6 + # Default caching enabled; control via classmethod + cache_enabled = True + + @classmethod + def set_cache_enabled(cls, enabled: bool): + """ + Set the cache_enabled flag to control read_only_cached_api_return behavior. + """ + cls.cache_enabled = bool(enabled) + def __init__(self, xcvr_eeprom): super(CmisApi, self).__init__(xcvr_eeprom) self.vdm = CmisVdmApi(xcvr_eeprom) if not self.is_flat_memory() else None @@ -152,12 +163,14 @@ def get_vdm_unfreeze_status(self): ''' return self.xcvr_eeprom.read(consts.VDM_UNFREEZE_DONE) + @read_only_cached_api_return def get_manufacturer(self): ''' This function returns the manufacturer of the module ''' return self.xcvr_eeprom.read(consts.VENDOR_NAME_FIELD) + @read_only_cached_api_return def get_model(self): ''' This function returns the part number of the module @@ -170,42 +183,49 @@ def get_cable_length_type(self): ''' return "Length Cable Assembly(m)" + @read_only_cached_api_return def get_cable_length(self): ''' This function returns the cable length of the module ''' return self.xcvr_eeprom.read(consts.LENGTH_ASSEMBLY_FIELD) + @read_only_cached_api_return def get_vendor_rev(self): ''' This function returns the revision level for part number provided by vendor ''' return self.xcvr_eeprom.read(consts.VENDOR_REV_FIELD) + @read_only_cached_api_return def get_serial(self): ''' This function returns the serial number of the module ''' return self.xcvr_eeprom.read(consts.VENDOR_SERIAL_NO_FIELD) + @read_only_cached_api_return def get_module_type(self): ''' This function returns the SFF8024Identifier (module type / form-factor). Table 4-1 in SFF-8024 Rev4.6 ''' return self.xcvr_eeprom.read(consts.ID_FIELD) + @read_only_cached_api_return def get_module_type_abbreviation(self): ''' This function returns the SFF8024Identifier (module type / form-factor). Table 4-1 in SFF-8024 Rev4.6 ''' return self.xcvr_eeprom.read(consts.ID_ABBRV_FIELD) + @read_only_cached_api_return def get_connector_type(self): ''' This function returns module connector. Table 4-3 in SFF-8024 Rev4.6 ''' return self.xcvr_eeprom.read(consts.CONNECTOR_FIELD) + @read_only_cached_api_return def get_module_hardware_revision(self): ''' This function returns the module hardware revision @@ -217,6 +237,7 @@ def get_module_hardware_revision(self): hw_rev = [str(num) for num in [hw_major_rev, hw_minor_rev]] return '.'.join(hw_rev) + @read_only_cached_api_return def get_cmis_rev(self): ''' This function returns the CMIS version the module complies to @@ -531,6 +552,7 @@ def is_copper(self): media_intf = self.get_module_media_type() return media_intf == "passive_copper_media_interface" if media_intf else None + @read_only_cached_api_return def is_flat_memory(self): return self.xcvr_eeprom.read(consts.FLAT_MEM_FIELD) is not False @@ -802,6 +824,7 @@ def get_tx_los(self): tx_los_final.append(bool(tx_los[key])) return tx_los_final + @read_only_cached_api_return def get_tx_disable_support(self): return not self.is_flat_memory() and self.xcvr_eeprom.read(consts.TX_DISABLE_SUPPORT_FIELD) @@ -873,6 +896,7 @@ def set_power_override(self, power_override, power_set): def get_transceiver_thresholds_support(self): return not self.is_flat_memory() + @read_only_cached_api_return def get_lpmode_support(self): power_class = self.xcvr_eeprom.read(consts.POWER_CLASS_FIELD) if power_class is None: @@ -914,6 +938,7 @@ def get_module_media_interface(self): else: return 'Unknown media interface' + @read_only_cached_api_return def is_coherent_module(self): ''' Returns True if the module follow C-CMIS spec, False otherwise @@ -921,6 +946,7 @@ def is_coherent_module(self): mintf = self.get_module_media_interface() return False if 'ZR' not in mintf else True + @read_only_cached_api_return def get_datapath_init_duration(self): ''' This function returns the duration of datapath init @@ -933,6 +959,7 @@ def get_datapath_init_duration(self): value = float(duration) return value * DATAPATH_INIT_DURATION_MULTIPLIER if value <= DATAPATH_INIT_DURATION_OVERRIDE_THRESHOLD else value + @read_only_cached_api_return def get_datapath_deinit_duration(self): ''' This function returns the duration of datapath deinit @@ -942,6 +969,7 @@ def get_datapath_deinit_duration(self): duration = self.xcvr_eeprom.read(consts.DP_PATH_DEINIT_DURATION) return float(duration) if duration is not None else 0 + @read_only_cached_api_return def get_datapath_tx_turnon_duration(self): ''' This function returns the duration of datapath tx turnon @@ -951,6 +979,7 @@ def get_datapath_tx_turnon_duration(self): duration = self.xcvr_eeprom.read(consts.DP_TX_TURNON_DURATION) return float(duration) if duration is not None else 0 + @read_only_cached_api_return def get_datapath_tx_turnoff_duration(self): ''' This function returns the duration of datapath tx turnoff @@ -960,6 +989,7 @@ def get_datapath_tx_turnoff_duration(self): duration = self.xcvr_eeprom.read(consts.DP_TX_TURNOFF_DURATION) return float(duration) if duration is not None else 0 + @read_only_cached_api_return def get_module_pwr_up_duration(self): ''' This function returns the duration of module power up @@ -969,6 +999,7 @@ def get_module_pwr_up_duration(self): duration = self.xcvr_eeprom.read(consts.MODULE_PWRUP_DURATION) return float(duration) if duration is not None else 0 + @read_only_cached_api_return def get_module_pwr_down_duration(self): ''' This function returns the duration of module power down @@ -1587,7 +1618,6 @@ def get_module_level_flag(self): 'aux2_low_alarm_flag': aux2_low_alarm_flag, 'aux2_high_warn_flag': aux2_high_warn_flag, 'aux2_low_warn_flag': aux2_low_warn_flag} - aux1_high_alarm_flag = bool((module_flag_byte2 >> 0) & 0x1) aux1_low_alarm_flag = bool((module_flag_byte2 >> 1) & 0x1) aux1_high_warn_flag = bool((module_flag_byte2 >> 2) & 0x1) @@ -2670,6 +2700,7 @@ def get_datapath_deinit(self): return None return [bool(datapath_deinit & (1 << lane)) for lane in range(self.NUM_CHANNELS)] + @read_only_cached_api_return def get_application_advertisement(self): """ Get the application advertisement of the CMIS transceiver @@ -2699,6 +2730,8 @@ def get_application_advertisement(self): logger.error('Failed to read APPLS_ADVT_FIELD_PAGE01: ' + str(e)) return ret + media_type = self.xcvr_eeprom.read(consts.MEDIA_TYPE_FIELD) + prefix = map.get(media_type) for app in range(1, 16): buf = {} @@ -2708,7 +2741,6 @@ def get_application_advertisement(self): continue buf['host_electrical_interface_id'] = val - prefix = map.get(self.xcvr_eeprom.read(consts.MEDIA_TYPE_FIELD)) if prefix is None: continue key = "{}_{}".format(prefix, app) @@ -3175,3 +3207,4 @@ def get_error_description(self): return 'OK' # TODO: other XcvrApi methods + diff --git a/sonic_platform_base/sonic_xcvr/mem_maps/public/cmis.py b/sonic_platform_base/sonic_xcvr/mem_maps/public/cmis.py old mode 100644 new mode 100755 diff --git a/sonic_platform_base/sonic_xcvr/utils/__init__.py b/sonic_platform_base/sonic_xcvr/utils/__init__.py new file mode 100644 index 0000000..78fddf1 --- /dev/null +++ b/sonic_platform_base/sonic_xcvr/utils/__init__.py @@ -0,0 +1 @@ +# utils package for transceiver common utilities \ No newline at end of file diff --git a/sonic_platform_base/sonic_xcvr/utils/cache.py b/sonic_platform_base/sonic_xcvr/utils/cache.py new file mode 100644 index 0000000..21c8bbf --- /dev/null +++ b/sonic_platform_base/sonic_xcvr/utils/cache.py @@ -0,0 +1,19 @@ +from collections import abc +import os + +def read_only_cached_api_return(func): + """Cache until func() returns a non-None, non-empty collections cache_value.""" + cache_name = f'_{func.__name__}_cache' + def wrapper(self): + if not self.cache_enabled: + return func(self) + if not hasattr(self, cache_name): + cache_value = func(self) + setattr(self, cache_name, cache_value) + else: + cache_value = getattr(self, cache_name) + if cache_value is None or (isinstance(cache_value, abc.Iterable) and not cache_value): + cache_value = func(self) + setattr(self, cache_name, cache_value) + return cache_value + return wrapper diff --git a/tests/sonic_xcvr/test_cmis.py b/tests/sonic_xcvr/test_cmis.py old mode 100644 new mode 100755 index ee13f8b..8664068 --- a/tests/sonic_xcvr/test_cmis.py +++ b/tests/sonic_xcvr/test_cmis.py @@ -19,6 +19,24 @@ class TestCmis(object): old_read_func = eeprom.read api = CmisApi(eeprom) + def clear_cache(self, method_name=None): + """ + Clear cached API return values for methods decorated with read_only_cached_api_return. + If method_name is provided, clear only that cache; otherwise clear all caches. + """ + if method_name: + cache_name = f'_{method_name}_cache' + if hasattr(self.api, cache_name): + delattr(self.api, cache_name) + else: + for attr in list(self.api.__dict__.keys()): + if attr.startswith('_') and attr.endswith('_cache'): + delattr(self.api, attr) + + def setup_method(self, method): + """Clear cached values before each test case.""" + self.clear_cache() + @pytest.mark.parametrize("mock_response, expected", [ ("1234567890", "1234567890"), ("ABCD", "ABCD") @@ -1446,113 +1464,84 @@ def test_module_fw_upgrade(self, input_param, mock_response, expected): result = self.api.module_fw_upgrade(input_param) assert result == expected - @pytest.mark.parametrize("mock_response, expected",[ - ([None, None, None, None, None, None, None, None, None, None, None, None, None, None, None], None), - ( - [ - { - 'Extended Identifier': {'Power Class': 'Power Class 8', 'MaxPower': 20.0}, - 'Identifier': 'QSFP-DD Double Density 8X Pluggable Transceiver', - 'Identifier Abbreviation': 'QSFP-DD', - 'ModuleHardwareMajorRevision': 0, - 'ModuleHardwareMinorRevision': 0, - 'VendorSN': '00000000', - 'VendorName': 'VENDOR_NAME', - 'VendorPN': 'ABCD', - 'Connector': 'LC', - 'Length Cable Assembly': 0.0, - 'ModuleMediaType': 'sm_media_interface', - 'VendorDate': '21010100', - 'VendorOUI': 'xx-xx-xx' - }, - '400GAUI-8 C2M (Annex 120E)', - '400ZR, DWDM, amplified', - 8, 1, 1, 1, - {'ActiveAppSelLane1': 1, 'ActiveAppSelLane2': 1, 'ActiveAppSelLane3': 1, 'ActiveAppSelLane4': 1, - 'ActiveAppSelLane5': 1, 'ActiveAppSelLane6': 1, 'ActiveAppSelLane7': 1, 'ActiveAppSelLane8': 1}, - '1550 nm DFB', - '0.0', - '5.0', - '0.1', - '0.0', - 'sm_media_interface', - {'status': True, 'result': ("0.3.0", 1, 1, 0, "0.2.0", 0, 0, 0, "0.3.0", "0.2.0")} - ], - { 'type': 'QSFP-DD Double Density 8X Pluggable Transceiver', - 'type_abbrv_name': 'QSFP-DD', - 'model': 'ABCD', - 'encoding': 'N/A', - 'ext_identifier': 'Power Class 8 (20.0W Max)', - 'ext_rateselect_compliance': 'N/A', - 'cable_type': 'Length Cable Assembly(m)', - 'cable_length': 0.0, - 'nominal_bit_rate': 0, - 'specification_compliance': 'sm_media_interface', - 'application_advertisement': 'N/A', - 'media_lane_count': 1, - 'vendor_rev': '0.0', - 'host_electrical_interface': '400GAUI-8 C2M (Annex 120E)', - 'vendor_oui': 'xx-xx-xx', - 'manufacturer': 'VENDOR_NAME', - 'media_interface_technology': '1550 nm DFB', - 'media_interface_code': '400ZR, DWDM, amplified', - 'serial': '00000000', - 'host_lane_count': 8, - 'active_apsel_hostlane1': 1, - 'active_apsel_hostlane3': 1, - 'active_apsel_hostlane2': 1, - 'active_apsel_hostlane5': 1, - 'active_apsel_hostlane4': 1, - 'active_apsel_hostlane7': 1, - 'active_apsel_hostlane6': 1, - 'active_apsel_hostlane8': 1, - 'hardware_rev': '0.0', - 'cmis_rev': '5.0', - 'media_lane_assignment_option': 1, - 'connector': 'LC', - 'host_lane_assignment_option': 1, - 'vendor_date': '21010100' - } - ) + @pytest.mark.parametrize("mock_response, expected", [ + ([0, 0, 0], + { + 'type': 'QSFP-DD Double Density 8X Pluggable Transceiver', + 'type_abbrv_name': 'QSFP-DD', + 'model': 'ABCD', + 'encoding': 'N/A', + 'ext_identifier': 'Power Class 8 (20.0W Max)', + 'ext_rateselect_compliance': 'N/A', + 'cable_type': 'Length Cable Assembly(m)', + 'cable_length': 0.0, + 'nominal_bit_rate': 'N/A', + 'specification_compliance': 'sm_media_interface', + 'application_advertisement': 'N/A', + 'media_lane_count': 1, + 'vendor_rev': '0.0', + 'host_electrical_interface': '400GAUI-8 C2M (Annex 120E)', + 'vendor_oui': 'xx-xx-xx', + 'manufacturer': 'VENDOR_NAME', + 'media_interface_technology': '1550 nm DFB', + 'media_interface_code': '400ZR, DWDM, amplified', + 'serial': '00000000', + 'host_lane_count': 8, + 'active_apsel_hostlane1': 1, + 'active_apsel_hostlane2': 1, + 'active_apsel_hostlane3': 1, + 'active_apsel_hostlane4': 1, + 'active_apsel_hostlane5': 1, + 'active_apsel_hostlane6': 1, + 'active_apsel_hostlane7': 1, + 'active_apsel_hostlane8': 1, + 'hardware_rev': '0.0', + 'cmis_rev': '5.0', + 'media_lane_assignment_option': 1, + 'connector': 'LC', + 'host_lane_assignment_option': 1, + 'vendor_date': '21010100', + 'vdm_supported': True, + }) ]) def test_get_transceiver_info(self, mock_response, expected): self.api.xcvr_eeprom.read = MagicMock() - self.api.xcvr_eeprom.read.return_value = mock_response[0] - self.api.get_host_electrical_interface = MagicMock() - self.api.get_host_electrical_interface.return_value = mock_response[1] - self.api.get_module_media_interface = MagicMock() - self.api.get_module_media_interface.return_value = mock_response[2] - self.api.get_host_lane_count = MagicMock() - self.api.get_host_lane_count.return_value = mock_response[3] - self.api.get_media_lane_count = MagicMock() - self.api.get_media_lane_count.return_value = mock_response[4] - self.api.get_host_lane_assignment_option = MagicMock() - self.api.get_host_lane_assignment_option.return_value = mock_response[5] - self.api.get_media_lane_assignment_option = MagicMock() - self.api.get_media_lane_assignment_option.return_value = mock_response[6] - self.api.get_active_apsel_hostlane = MagicMock() - self.api.get_active_apsel_hostlane.return_value = mock_response[7] - self.api.get_media_interface_technology = MagicMock() - self.api.get_media_interface_technology.return_value = mock_response[8] - self.api.get_vendor_rev = MagicMock() - self.api.get_vendor_rev.return_value = mock_response[9] - self.api.get_cmis_rev = MagicMock() - self.api.get_cmis_rev.return_value = mock_response[10] - self.api.get_module_fw_info = MagicMock() - self.api.get_module_media_type = MagicMock() - self.api.get_module_media_type.return_value = mock_response[13] - self.api.get_module_hardware_revision = MagicMock() - self.api.get_module_hardware_revision.return_value = '0.0' - self.api.get_module_fw_info.return_value = mock_response[14] - self.api.is_flat_memory = MagicMock() - self.api.is_flat_memory.return_value = False - result = self.api.get_transceiver_info() - assert result == expected - # Test negative path - self.api.get_cmis_rev.return_value = None - result = self.api.get_transceiver_info() - assert result == None - + def mock_read(field): + if field == consts.APPLS_ADVT_FIELD: + return { + 1: { + 'host_electrical_interface_id': '400GAUI-8 C2M (Annex 120E)', + 'module_media_interface_id': '400GBASE-DR4 (Cl 124)', + 'media_lane_count': 1, + 'host_lane_count': 8, + 'host_lane_assignment_options': 1, + 'media_lane_assignment_options': 1 + } + } + elif field == consts.MEDIA_TYPE_FIELD: + return 'sm_media_interface' + elif field == consts.EXT_IDENTIFIER_FIELD: + return {'Power Class': 'Power Class 8', 'MaxPower': 20.0} + elif field == consts.IDENTIFIER_FIELD: + return 'QSFP-DD Double Density 8X Pluggable Transceiver' + elif field == consts.IDENTIFIER_ABBRV_FIELD: + return 'QSFP-DD' + elif field == consts.VENDOR_SN_FIELD: + return '00000000' + elif field == consts.VENDOR_NAME_FIELD: + return 'VENDOR_NAME' + elif field == consts.VENDOR_PN_FIELD: + return 'ABCD' + elif field == consts.CONNECTOR_FIELD: + return 'LC' + elif field == consts.CABLE_LENGTH_FIELD: + return 0.0 + elif field == consts.VENDOR_DATE_FIELD: + return '21010100' + elif field == consts.VENDOR_OUI_FIELD: + return 'xx-xx-xx' + return None + self.api.xcvr_eeprom.read.side_effect = mock_read @pytest.mark.parametrize("mock_response, expected",[ ( @@ -2867,7 +2856,7 @@ def test_get_application_advertisement_apps_with_missing_data(self): ] result = self.api.get_application_advertisement() - assert len(result) == 2 + assert len(result) == 3 assert result[1]['host_electrical_interface_id'] == '400GAUI-8 C2M (Annex 120E)' assert result[1]['module_media_interface_id'] == '400GBASE-DR4 (Cl 124)' diff --git a/tests/sonic_xcvr/test_cmis_cache.py b/tests/sonic_xcvr/test_cmis_cache.py new file mode 100755 index 0000000..4312fca --- /dev/null +++ b/tests/sonic_xcvr/test_cmis_cache.py @@ -0,0 +1,218 @@ +import pytest +from unittest.mock import MagicMock +from sonic_platform_base.sonic_xcvr.api.public.cmis import CmisApi +from sonic_platform_base.sonic_xcvr.codes.public.sff8024 import Sff8024 +from sonic_platform_base.sonic_xcvr.fields import consts + +class TestReadOnlyCacheDecorator: + def setup_method(self): + # Initialize CmisApi with a mock EEPROM and clear initial reads + eeprom = MagicMock() + self.api = CmisApi(eeprom) + self.api.set_cache_enabled(True) + self.api.xcvr_eeprom.read.reset_mock() + + def clear_cache(self, method_name=None): + """ + Clear cached API return values for methods decorated with read_only_cached_api_return. + If method_name is provided, clear only that cache; otherwise clear all caches. + """ + if method_name: + cache_name = f'_{method_name}_cache' + if hasattr(self.api, cache_name): + delattr(self.api, cache_name) + else: + for attr in list(self.api.__dict__.keys()): + if attr.startswith('_') and attr.endswith('_cache'): + delattr(self.api, attr) + + def test_get_model_caching(self): + # Ensure get_model value is cached and read() called only once + self.api.xcvr_eeprom.read.return_value = 'model_val' + first = self.api.get_model() + second = self.api.get_model() + assert first == 'model_val' + assert second == 'model_val' + assert self.api.xcvr_eeprom.read.call_count == 1 + + def test_get_cmis_rev_caching(self): + # get_cmis_rev reads major and minor once, then caches result + # side_effect: first call returns major, second minor + self.api.xcvr_eeprom.read.side_effect = [5, 3, 7, 9] + v1 = self.api.get_cmis_rev() + v2 = self.api.get_cmis_rev() + assert v1 == '5.3' + assert v2 == '5.3' + # Only the first two reads (major and minor) should occur + assert self.api.xcvr_eeprom.read.call_count == 2 + + def test_clear_cache_for_get_model(self): + # Ensure clear_cache('get_model') clears the cache so read() is re-called + self.api.xcvr_eeprom.read.return_value = 'val1' + _ = self.api.get_model() + _ = self.api.get_model() + assert self.api.xcvr_eeprom.read.call_count == 1 + # Clear only get_model cache + self.clear_cache('get_model') + self.api.xcvr_eeprom.read.return_value = 'val2' + _ = self.api.get_model() + assert self.api.xcvr_eeprom.read.call_count == 2 + assert self.api.get_model() == 'val2' + + def test_clear_all_caches(self): + # Ensure clear_cache() clears all cached methods + # Setup: get_model and get_cmis_rev use side effects + self.api.xcvr_eeprom.read.side_effect = ['m1', 1, 2] + _ = self.api.get_model() + _ = self.api.get_cmis_rev() + assert self.api.xcvr_eeprom.read.call_count == 3 + # Clear all caches + self.clear_cache() + # Reset read mock and provide new side effects + self.api.xcvr_eeprom.read.reset_mock() + self.api.xcvr_eeprom.read.side_effect = ['m2', 3, 4] + m2 = self.api.get_model() + rev = self.api.get_cmis_rev() + assert m2 == 'm2' + assert rev == '3.4' + # Both methods should re-read their values + assert self.api.xcvr_eeprom.read.call_count == 3 + +class TestReadOnlyCacheDictAndListDecorator: + def setup_method(self): + # Initialize CmisApi with a mock EEPROM and clear initial reads + eeprom = MagicMock() + self.api = CmisApi(eeprom) + self.api.set_cache_enabled(True) + self.api.xcvr_eeprom.read.reset_mock() + + def clear_cache(self, method_name=None): + """ + Clear cached API return values for methods decorated with read_only_cached_api_return. + If method_name is provided, clear only that cache; otherwise clear all caches. + """ + if method_name: + cache_name = f'_{method_name}_cache' + if hasattr(self.api, cache_name): + delattr(self.api, cache_name) + else: + for attr in list(self.api.__dict__.keys()): + if attr.startswith('_') and attr.endswith('_cache'): + delattr(self.api, attr) + + def test_get_application_advertisement_no_cache_if_empty(self): + # Empty dict should not be cached and read() should be called each time + self.api.xcvr_eeprom.read.return_value = {} + first = self.api.get_application_advertisement() + second = self.api.get_application_advertisement() + assert first == {} + assert second == {} + assert self.api.xcvr_eeprom.read.call_count == 2 + + def test_get_application_advertisement_caching_if_non_empty(self): + # Non-empty dict should be cached and read() should be called only once + media_type = Sff8024.MODULE_MEDIA_TYPE[1] # e.g. "nm_850_media_interface" + prefix = consts.MODULE_MEDIA_INTERFACE_850NM + raw = { + f"{consts.HOST_ELECTRICAL_INTERFACE}_1": "iface1", + f"{prefix}_1": "mod_iface1", + f"{consts.MEDIA_LANE_COUNT}_1": 2, + f"{consts.HOST_LANE_COUNT}_1": 1, + f"{consts.HOST_LANE_ASSIGNMENT_OPTION}_1": 3, + f"{consts.MEDIA_LANE_ASSIGNMENT_OPTION}_1": 4, + } + # Make read() return raw for APPLS_ADVT_FIELD and valid media type string for MEDIA_TYPE_FIELD + def read_side_effect(field_name): + if field_name == consts.APPLS_ADVT_FIELD: + return raw + if field_name == consts.MEDIA_TYPE_FIELD: + # Return the module media type string that matches our prefix + return Sff8024.MODULE_MEDIA_TYPE[1] + return None + self.api.xcvr_eeprom.read.side_effect = read_side_effect + first = self.api.get_application_advertisement() + second = self.api.get_application_advertisement() + # The returned dict should be processed into the correct keys + expected = { + 1: { + 'host_electrical_interface_id': 'iface1', + 'module_media_interface_id': 'mod_iface1', + 'media_lane_count': 2, + 'host_lane_count': 1, + 'host_lane_assignment_options': 3, + 'media_lane_assignment_options': 4 + } + } + assert first == expected + assert second == expected + # Should have read APPLS_ADVT_FIELD and MEDIA_TYPE_FIELD once each + assert self.api.xcvr_eeprom.read.call_count == 2 + + def test_clear_cache_for_get_application_advertisement(self): + # Non-empty dict is cached, clear_cache should force re-read + media_type = Sff8024.MODULE_MEDIA_TYPE[1] + prefix = consts.MODULE_MEDIA_INTERFACE_850NM + raw = { + f"{consts.HOST_ELECTRICAL_INTERFACE}_1": 'iface1', + f"{prefix}_1": 'mod_iface1', + f"{consts.MEDIA_LANE_COUNT}_1": 2, + f"{consts.HOST_LANE_COUNT}_1": 1, + f"{consts.HOST_LANE_ASSIGNMENT_OPTION}_1": 3, + f"{consts.MEDIA_LANE_ASSIGNMENT_OPTION}_1": 4, + } + def read_side_effect(field_name): + if field_name == consts.APPLS_ADVT_FIELD: + return raw + if field_name == consts.MEDIA_TYPE_FIELD: + return media_type + return None + self.api.xcvr_eeprom.read.side_effect = read_side_effect + first = self.api.get_application_advertisement() + second = self.api.get_application_advertisement() + assert self.api.xcvr_eeprom.read.call_count == 2 + # Clear the specific cache and read again + self.clear_cache('get_application_advertisement') + third = self.api.get_application_advertisement() + assert third == first + assert self.api.xcvr_eeprom.read.call_count == 4 + +class TestCacheDisabled: + def setup_method(self): + # Initialize CmisApi with caching disabled + eeprom = MagicMock() + self.api = CmisApi(eeprom) + self.api.set_cache_enabled(False) + # Clear initial EEPROM reads from __init__ (is_flat_memory calls) + self.api.xcvr_eeprom.read.reset_mock() + + def clear_cache(self, method_name=None): + """ + Clear cached API return values for methods decorated with read_only_cached_api_return. + If method_name is provided, clear only that cache; otherwise clear all caches. + """ + if method_name: + cache_name = f'_{method_name}_cache' + if hasattr(self.api, cache_name): + delattr(self.api, cache_name) + else: + for attr in list(self.api.__dict__.keys()): + if attr.startswith('_') and attr.endswith('_cache'): + delattr(self.api, attr) + + def test_get_model_not_cached(self): + # get_model should not cache; read() called each time + self.api.xcvr_eeprom.read.return_value = 'model_val' + first = self.api.get_model() + second = self.api.get_model() + assert first == 'model_val' + assert second == 'model_val' + assert self.api.xcvr_eeprom.read.call_count == 2 + + def test_get_application_advertisement_not_cached(self): + # get_application_advertisement should not cache when disabled + self.api.xcvr_eeprom.read.return_value = {} + first = self.api.get_application_advertisement() + second = self.api.get_application_advertisement() + assert first == {} + assert second == {} + assert self.api.xcvr_eeprom.read.call_count == 2 diff --git a/tests/sonic_xcvr/test_sfp_base.py b/tests/sonic_xcvr/test_sfp_base.py new file mode 100755 index 0000000..2c1b24d --- /dev/null +++ b/tests/sonic_xcvr/test_sfp_base.py @@ -0,0 +1,65 @@ +import pytest +from sonic_platform_base.sfp_base import SfpBase +from unittest.mock import MagicMock + +def test_remove_xcvr_api_and_refresh(): + """ + Test that remove_xcvr_api clears the cached API and that get_xcvr_api + fetches a new API object on subsequent call. + """ + # Create SfpBase instance and mock factory + sfp = SfpBase() + factory = MagicMock() + api1 = object() + api2 = object() + factory.create_xcvr_api.side_effect = [api1, api2] + sfp._xcvr_api_factory = factory + + # Initially cache should be empty + assert sfp._xcvr_api is None + + # First call should build and cache api1 + result1 = sfp.get_xcvr_api() + assert result1 is api1 + assert factory.create_xcvr_api.call_count == 1 + + # Second call should return cached api1 without new factory call + result2 = sfp.get_xcvr_api() + assert result2 is api1 + assert factory.create_xcvr_api.call_count == 1 + + # Removing the cached API should clear the cache + sfp.remove_xcvr_api() + assert sfp._xcvr_api is None + + # Next get_xcvr_api should fetch a fresh API (api2) + result3 = sfp.get_xcvr_api() + assert result3 is api2 + assert factory.create_xcvr_api.call_count == 2 + +def test_old_api_handle_survives_remove_xcvr_api(): + """ + Test that an old API handle continues to work after remove_xcvr_api and a new handle is created. + """ + sfp = SfpBase() + api1 = MagicMock() + api2 = MagicMock() + api1.get_transceiver_bulk_status.return_value = "bulk1" + api2.get_transceiver_bulk_status.return_value = "bulk2" + factory = MagicMock() + factory.create_xcvr_api.side_effect = [api1, api2] + sfp._xcvr_api_factory = factory + + # Thread 1 gets first API handle + t1_api = sfp.get_xcvr_api() + assert t1_api is api1 + + # Thread 2 removes and recreates API + sfp.remove_xcvr_api() + t2_api = sfp.get_xcvr_api() + assert t2_api is api2 + + # Thread 1 still uses old handle without crashing + assert t1_api.get_transceiver_bulk_status() == "bulk1" + # New handle returns its own result + assert t2_api.get_transceiver_bulk_status() == "bulk2" \ No newline at end of file