diff --git a/internal/config/config.go b/internal/config/config.go index 03ce4c9..ad7666f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -132,6 +132,8 @@ type Controller struct { ConfigMapNamespaces []string `mapstructure:"config_map_namespaces"` RemoveAnnotationsPrefixes []string `mapstructure:"remove_annotations_prefixes"` AnnotationsMaxLength string `mapstructure:"annotations_max_length"` + ForcePagination bool `mapstructure:"force_pagination"` + PageSize int64 `mapstructure:"page_size"` // DisabledInformers contains a list of informers to disable, // for example: @@ -171,6 +173,8 @@ func Get() Config { viper.SetDefault("controller.initial_sleep_duration", 30*time.Second) viper.SetDefault("controller.healthy_snapshot_interval_limit", 12*time.Minute) viper.SetDefault("controller.initialization_timeout_extension", 5*time.Minute) + viper.SetDefault("controller.force_pagination", false) + viper.SetDefault("controller.page_size", 500) viper.SetDefault("healthz_port", 9876) viper.SetDefault("leader_election.enabled", true) diff --git a/internal/services/controller/controller.go b/internal/services/controller/controller.go index 65e0b8e..dff84cb 100644 --- a/internal/services/controller/controller.go +++ b/internal/services/controller/controller.go @@ -120,8 +120,15 @@ func CollectSingleSnapshot(ctx context.Context, cfg *config.Controller, v version.Interface, ) (*castai.Delta, error) { - f := informers.NewSharedInformerFactory(clientset, 0) - df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) + tweakListOptions := func(options *metav1.ListOptions) { + if cfg.ForcePagination && options.ResourceVersion == "" { + log.Info("Forcing pagination for the list request", "limit", cfg.PageSize) + options.ResourceVersion = "" + options.Limit = cfg.PageSize + } + } + f := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithTweakListOptions(tweakListOptions)) + df := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, 0, metav1.NamespaceAll, tweakListOptions) defaultInformers := getDefaultInformers(f) conditionalInformers := getConditionalInformers(clientset, cfg, f, df, metricsClient, log) @@ -204,8 +211,15 @@ func New( queue := workqueue.NewNamed("castai-agent") defaultResync := 0 * time.Second - f := informers.NewSharedInformerFactory(clientset, defaultResync) - df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, defaultResync) + tweakListOptions := func(options *metav1.ListOptions) { + if cfg.ForcePagination && options.ResourceVersion == "0" { + log.Info("Forcing pagination for the list request", "limit", cfg.PageSize) + options.ResourceVersion = "" + options.Limit = cfg.PageSize + } + } + f := informers.NewSharedInformerFactoryWithOptions(clientset, defaultResync, informers.WithTweakListOptions(tweakListOptions)) + df := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, defaultResync, metav1.NamespaceAll, tweakListOptions) discovery := clientset.Discovery() defaultInformers := getDefaultInformers(f) @@ -940,7 +954,13 @@ func getConditionalInformers(clientset kubernetes.Interface, cfg *config.Control apiType: reflect.TypeOf(&corev1.ConfigMap{}), permissionVerbs: []string{"get", "list", "watch"}, informerFactory: func() cache.SharedIndexInformer { - namespaceScopedInformer := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(cmNamespace)) + namespaceScopedInformer := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(cmNamespace), informers.WithTweakListOptions(func(options *metav1.ListOptions) { + if cfg.ForcePagination && options.ResourceVersion == "" { + logger.Info("Forcing pagination for the list request", "limit", cfg.PageSize) + options.ResourceVersion = "" + options.Limit = cfg.PageSize + } + })) return namespaceScopedInformer.Core().V1().ConfigMaps().Informer() }, }) diff --git a/internal/services/controller/controller_test.go b/internal/services/controller/controller_test.go index f8a8ffa..238491a 100644 --- a/internal/services/controller/controller_test.go +++ b/internal/services/controller/controller_test.go @@ -77,11 +77,18 @@ func TestMain(m *testing.M) { func TestController_ShouldReceiveDeltasBasedOnAvailableResources(t *testing.T) { tests := map[string]struct { expectedReceivedObjectsCount int + paginationEnabled bool + pageSize int64 apiResourceError error }{ "All supported objects are found and received in delta": { expectedReceivedObjectsCount: 27, }, + "All supported objects are found and received in delta with pagination": { + expectedReceivedObjectsCount: 27, + paginationEnabled: true, + pageSize: 5, + }, "when fetching api resources produces multiple errors should exclude those resources": { apiResourceError: fmt.Errorf("unable to retrieve the complete list of server APIs: %v:"+ "stale GroupVersion discovery: some error,%v: another error", @@ -200,6 +207,18 @@ func TestController_ShouldReceiveDeltasBasedOnAvailableResources(t *testing.T) { } provider.EXPECT().FilterSpot(gomock.Any(), []*v1.Node{node}).Return([]*v1.Node{node}, nil) + cfg := &config.Controller{ + Interval: 15 * time.Second, + PrepTimeout: 2 * time.Second, + InitialSleepDuration: 10 * time.Millisecond, + ConfigMapNamespaces: []string{v1.NamespaceDefault}, + } + + if tt.paginationEnabled { + cfg.ForcePagination = tt.paginationEnabled + cfg.PageSize = tt.pageSize + } + ctrl := New( log, clientset, @@ -208,12 +227,7 @@ func TestController_ShouldReceiveDeltasBasedOnAvailableResources(t *testing.T) { metricsClient, provider, clusterID.String(), - &config.Controller{ - Interval: 15 * time.Second, - PrepTimeout: 2 * time.Second, - InitialSleepDuration: 10 * time.Millisecond, - ConfigMapNamespaces: []string{v1.NamespaceDefault}, - }, + cfg, version, agentVersion, NewHealthzProvider(defaultHealthzCfg, log),