Skip to content

Commit

Permalink
feat: add option to force pagination on informer list (#217)
Browse files Browse the repository at this point in the history
  • Loading branch information
mikenorgate authored Feb 6, 2025
1 parent ab28ff0 commit 605c78c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 11 deletions.
4 changes: 4 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ type Controller struct {
ConfigMapNamespaces []string `mapstructure:"config_map_namespaces"`
RemoveAnnotationsPrefixes []string `mapstructure:"remove_annotations_prefixes"`
AnnotationsMaxLength string `mapstructure:"annotations_max_length"`
ForcePagination bool `mapstructure:"force_pagination"`
PageSize int64 `mapstructure:"page_size"`

// DisabledInformers contains a list of informers to disable,
// for example:
Expand Down Expand Up @@ -171,6 +173,8 @@ func Get() Config {
viper.SetDefault("controller.initial_sleep_duration", 30*time.Second)
viper.SetDefault("controller.healthy_snapshot_interval_limit", 12*time.Minute)
viper.SetDefault("controller.initialization_timeout_extension", 5*time.Minute)
viper.SetDefault("controller.force_pagination", false)
viper.SetDefault("controller.page_size", 500)

viper.SetDefault("healthz_port", 9876)
viper.SetDefault("leader_election.enabled", true)
Expand Down
30 changes: 25 additions & 5 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,15 @@ func CollectSingleSnapshot(ctx context.Context,
cfg *config.Controller,
v version.Interface,
) (*castai.Delta, error) {
f := informers.NewSharedInformerFactory(clientset, 0)
df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)
tweakListOptions := func(options *metav1.ListOptions) {
if cfg.ForcePagination && options.ResourceVersion == "0" {
log.Info("Forcing pagination for the list request", "limit", cfg.PageSize)
options.ResourceVersion = ""
options.Limit = cfg.PageSize
}
}
f := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithTweakListOptions(tweakListOptions))
df := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, 0, metav1.NamespaceAll, tweakListOptions)

defaultInformers := getDefaultInformers(f)
conditionalInformers := getConditionalInformers(clientset, cfg, f, df, metricsClient, log)
Expand Down Expand Up @@ -204,8 +211,15 @@ func New(
queue := workqueue.NewNamed("castai-agent")

defaultResync := 0 * time.Second
f := informers.NewSharedInformerFactory(clientset, defaultResync)
df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, defaultResync)
tweakListOptions := func(options *metav1.ListOptions) {
if cfg.ForcePagination && options.ResourceVersion == "0" {
log.Info("Forcing pagination for the list request", "limit", cfg.PageSize)
options.ResourceVersion = ""
options.Limit = cfg.PageSize
}
}
f := informers.NewSharedInformerFactoryWithOptions(clientset, defaultResync, informers.WithTweakListOptions(tweakListOptions))
df := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, defaultResync, metav1.NamespaceAll, tweakListOptions)
discovery := clientset.Discovery()

defaultInformers := getDefaultInformers(f)
Expand Down Expand Up @@ -940,7 +954,13 @@ func getConditionalInformers(clientset kubernetes.Interface, cfg *config.Control
apiType: reflect.TypeOf(&corev1.ConfigMap{}),
permissionVerbs: []string{"get", "list", "watch"},
informerFactory: func() cache.SharedIndexInformer {
namespaceScopedInformer := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(cmNamespace))
namespaceScopedInformer := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(cmNamespace), informers.WithTweakListOptions(func(options *metav1.ListOptions) {
if cfg.ForcePagination && options.ResourceVersion == "0" {
logger.Info("Forcing pagination for the list request", "limit", cfg.PageSize)
options.ResourceVersion = ""
options.Limit = cfg.PageSize
}
}))
return namespaceScopedInformer.Core().V1().ConfigMaps().Informer()
},
})
Expand Down
26 changes: 20 additions & 6 deletions internal/services/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,18 @@ func TestMain(m *testing.M) {
func TestController_ShouldReceiveDeltasBasedOnAvailableResources(t *testing.T) {
tests := map[string]struct {
expectedReceivedObjectsCount int
paginationEnabled bool
pageSize int64
apiResourceError error
}{
"All supported objects are found and received in delta": {
expectedReceivedObjectsCount: 27,
},
"All supported objects are found and received in delta with pagination": {
expectedReceivedObjectsCount: 27,
paginationEnabled: true,
pageSize: 5,
},
"when fetching api resources produces multiple errors should exclude those resources": {
apiResourceError: fmt.Errorf("unable to retrieve the complete list of server APIs: %v:"+
"stale GroupVersion discovery: some error,%v: another error",
Expand Down Expand Up @@ -200,6 +207,18 @@ func TestController_ShouldReceiveDeltasBasedOnAvailableResources(t *testing.T) {
}
provider.EXPECT().FilterSpot(gomock.Any(), []*v1.Node{node}).Return([]*v1.Node{node}, nil)

cfg := &config.Controller{
Interval: 15 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
ConfigMapNamespaces: []string{v1.NamespaceDefault},
}

if tt.paginationEnabled {
cfg.ForcePagination = tt.paginationEnabled
cfg.PageSize = tt.pageSize
}

ctrl := New(
log,
clientset,
Expand All @@ -208,12 +227,7 @@ func TestController_ShouldReceiveDeltasBasedOnAvailableResources(t *testing.T) {
metricsClient,
provider,
clusterID.String(),
&config.Controller{
Interval: 15 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
ConfigMapNamespaces: []string{v1.NamespaceDefault},
},
cfg,
version,
agentVersion,
NewHealthzProvider(defaultHealthzCfg, log),
Expand Down

0 comments on commit 605c78c

Please sign in to comment.