From e0e19e5c7bbc38eec0dfae6594f7f356c2264a66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alby=20Hern=C3=A1ndez?= Date: Mon, 25 Nov 2024 15:13:16 +0000 Subject: [PATCH 1/8] feat: Add first iteration of sources controller --- cmd/main.go | 15 +++ internal/globals/types.go | 5 + internal/sources/controller.go | 196 +++++++++++++++++++++++++++++++++ internal/sources/types.go | 40 +++++++ internal/sources/utils.go | 148 +++++++++++++++++++++++++ 5 files changed, 404 insertions(+) create mode 100644 internal/sources/controller.go create mode 100644 internal/sources/types.go create mode 100644 internal/sources/utils.go diff --git a/cmd/main.go b/cmd/main.go index 3d38d3f..78772fd 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "strings" + "time" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -44,6 +45,7 @@ import ( "freepik.com/admitik/internal/certificates" "freepik.com/admitik/internal/controller" "freepik.com/admitik/internal/globals" + "freepik.com/admitik/internal/sources" "freepik.com/admitik/internal/xyz" // +kubebuilder:scaffold:imports ) @@ -318,6 +320,19 @@ func main() { os.Exit(1) } + // Init a controller in charge of launching watchers TODO + sourcesController := sources.SourcesController{ + Client: globals.Application.KubeRawClient, + Options: sources.SourcesControllerOptions{ + InformerDurationToResync: 60 * time.Second, + WatchersDurationBetweenReconcileLoops: 10 * time.Second, + WatcherDurationToAck: 2 * time.Second, + }, + } + + setupLog.Info("starting sources controller") + go sourcesController.Start(globals.Application.Context) + // Init primary controller // ATTENTION: This controller may be replaced by a custom one in the future doing the same tasks // to simplify this project's dependencies and maintainability diff --git a/internal/globals/types.go b/internal/globals/types.go index 8668e2e..b0ac49d 100644 --- 
a/internal/globals/types.go +++ b/internal/globals/types.go @@ -5,6 +5,7 @@ import ( "sync" // + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" @@ -33,4 +34,8 @@ type applicationT struct { // ClusterAdmissionPolicyPool ClusterAdmissionPolicyPoolT + + // WatcherPool TODO + //WatcherPool map[ResourceTypeName]ResourceTypeWatcherT + //WatcherPool *sources.WatcherPoolT } diff --git a/internal/sources/controller.go b/internal/sources/controller.go new file mode 100644 index 0000000..ad5612e --- /dev/null +++ b/internal/sources/controller.go @@ -0,0 +1,196 @@ +package sources + +import ( + "context" + "fmt" + "strings" + "time" + + // + + // + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// SourcesControllerOptions TODO +type SourcesControllerOptions struct { + + // Duration to wait until resync all the objects + InformerDurationToResync time.Duration + + // WatchersDurationBetweenReconcileLoops is the duration to wait between the moment + // of launching watchers, and repeating this process (avoid the spam, mate) + WatchersDurationBetweenReconcileLoops time.Duration + + // WatcherDurationToAck is the duration before checking whether a watcher + // is started or not during watchers' reconciling process + WatcherDurationToAck time.Duration +} + +// SourcesController represents the controller that triggers parallel watchers. +// These watchers are in charge of maintaining the pool of sources asked by the user in ClusterAdmissionPolicy objects. 
+// A source group is represented by GVRNN (Group + Version + Resource + Namespace + Name) +type SourcesController struct { + // Kubernetes clients + Client *dynamic.DynamicClient + + // options to modify the behavior of this SourcesController + Options SourcesControllerOptions + + // Carried stuff + WatcherPool WatcherPoolT +} + +// Start launches the SourcesController and keeps it alive +// It kills the controller on application context death, and rerun the process when failed +func (r *SourcesController) Start(ctx context.Context) { + logger := log.FromContext(ctx) + + for { + select { + case <-ctx.Done(): + logger.Info("SourcesController finished by context") + return + default: + logger.Info("Starting SourcesController") + r.reconcileWatchers(ctx) + } + + time.Sleep(2 * time.Second) + } +} + +// reconcileWatchers launches a parallel process that launches +// watchers for resource types defined into the WatcherPool +func (r *SourcesController) reconcileWatchers(ctx context.Context) { + logger := log.FromContext(ctx) + + for resourceType, resourceTypeWatcher := range r.WatcherPool.Pool { + + // TODO: Is this really needed or useful? + // Check the existence of the resourceType into the WatcherPool. + // Remember the controller.ClusterAdmissionPolicyController can remove watchers on garbage collection + if _, resourceTypeFound := r.WatcherPool.Pool[resourceType]; !resourceTypeFound { + continue + } + + // Prevent blocked watchers from being started. 
+ // Remember the controller.ClusterAdmissionPolicyController blocks them during garbage collection + if *resourceTypeWatcher.Blocked { + continue + } + + if !*resourceTypeWatcher.Started { + go r.watchType(ctx, resourceType) + + // Wait for the resourceType watcher to ACK itself into WatcherPool + time.Sleep(r.Options.WatcherDurationToAck) + if *(r.WatcherPool.Pool[resourceType].Started) == false { + logger.Info(fmt.Sprintf("Impossible to start watcher for resource type: %s", resourceType)) + } + } + + // Wait a bit to reduce the spam to machine resources + time.Sleep(r.Options.WatchersDurationBetweenReconcileLoops) + } +} + +// watchType launches a watcher for a certain resource type, and trigger processing for each entering resource event +func (r *SourcesController) watchType(ctx context.Context, watchedType ResourceTypeName) { + + logger := log.FromContext(ctx) + + logger.Info(fmt.Sprintf("Watcher for '%s' has been started", watchedType)) + + // Set ACK flag for watcher launching into the WatcherPool + *(r.WatcherPool.Pool[watchedType].Started) = true + defer func() { + *(r.WatcherPool.Pool[watchedType].Started) = false + }() + + // Extract GVR + Namespace + Name from watched type: + // {group}/{version}/{resource}/{namespace}/{name} + GVRNN := strings.Split(string(watchedType), "/") + if len(GVRNN) != 5 { + logger.Info("Failed to parse GVR from resourceType. 
Does it look like {group}/{version}/{resource}/{namespace}/{name}?") + return + } + resourceGVR := schema.GroupVersionResource{ + Group: GVRNN[0], + Version: GVRNN[1], + Resource: GVRNN[2], + } + + // Include the namespace when defined by the user (used as filter) + namespace := corev1.NamespaceAll + if GVRNN[3] != "" { + namespace = GVRNN[3] + } + + // Include the name when defined by the user (used as filter) + name := GVRNN[4] + + var listOptionsFunc dynamicinformer.TweakListOptionsFunc = func(options *metav1.ListOptions) {} + if name != "" { + listOptionsFunc = func(options *metav1.ListOptions) { + options.FieldSelector = "metadata.name=" + name + } + } + + // Listen to stop signal to kill this watcher just in case it's needed + stopCh := make(chan struct{}) + + go func() { + <-*(r.WatcherPool.Pool[watchedType].StopSignal) + close(stopCh) + logger.Info(fmt.Sprintf("Watcher for resource type '%s' killed by StopSignal", watchedType)) + }() + + // Define our informer + factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(r.Client, + r.Options.InformerDurationToResync, namespace, listOptionsFunc) + + // Create an informer. 
This is a special type of client-go watcher that includes + // mechanisms to hide disconnections, handle reconnections, and cache watched objects + informer := factory.ForResource(resourceGVR).Informer() + + // Register functions to handle different types of events + handlers := cache.ResourceEventHandlerFuncs{ + + AddFunc: func(eventObject interface{}) { + convertedObject := eventObject.(*unstructured.Unstructured) + r.CreateWatcherResource(watchedType, convertedObject) + }, + UpdateFunc: func(_, eventObject interface{}) { + convertedObject := eventObject.(*unstructured.Unstructured) + objectIndex := r.GetWatcherResourceIndex(watchedType, convertedObject) + + if objectIndex > -1 { + r.UpdateWatcherResourceByIndex(watchedType, objectIndex, convertedObject) + } + }, + DeleteFunc: func(eventObject interface{}) { + convertedObject := eventObject.(*unstructured.Unstructured) + objectIndex := r.GetWatcherResourceIndex(watchedType, convertedObject) + + if objectIndex > -1 { + r.DeleteWatcherResourceByIndex(watchedType, objectIndex) + } + }, + } + + _, err := informer.AddEventHandler(handlers) + if err != nil { + logger.Error(err, "Error adding handling functions for events to an informer") + return + } + + informer.Run(stopCh) +} diff --git a/internal/sources/types.go b/internal/sources/types.go new file mode 100644 index 0000000..b85c758 --- /dev/null +++ b/internal/sources/types.go @@ -0,0 +1,40 @@ +package sources + +import ( + "sync" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// TODO +type ResourceTypeName string + +// TODO +type ResourceTypeWatcherT struct { + // Enforce concurrency safety + Mutex *sync.Mutex + + // Started represents a flag to know if the watcher is running + Started *bool + + // Blocked represents a flag to prevent watcher from starting + Blocked *bool + + // StopSignal represents a flag to kill the watcher. 
+ // Watcher will be potentially re-launched by SourcesController + StopSignal *chan bool + + // Dependants represents the amount of policies + // depending on the resources cached by this watcher + //Dependants int + + // + ResourceList *[]*unstructured.Unstructured +} + +type WatcherPoolT struct { + // Enforce concurrency safety + Mutex *sync.Mutex + + Pool map[ResourceTypeName]ResourceTypeWatcherT +} diff --git a/internal/sources/utils.go b/internal/sources/utils.go new file mode 100644 index 0000000..069b1bf --- /dev/null +++ b/internal/sources/utils.go @@ -0,0 +1,148 @@ +package sources + +import ( + "errors" + "sync" + "time" + + // + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + // +) + +// TODO +func (r *SourcesController) StartWatcher(watcherType ResourceTypeName) { + + var initialStartedState bool = false + var initialBlockedState bool = false + var initialResourceListState []*unstructured.Unstructured + + initialStopSignalState := make(chan bool) + initialMutexState := sync.Mutex{} + + r.WatcherPool.Pool[watcherType] = ResourceTypeWatcherT{ + Mutex: &initialMutexState, + + Started: &initialStartedState, + Blocked: &initialBlockedState, + StopSignal: &initialStopSignalState, + + ResourceList: &initialResourceListState, + } +} + +// TODO +func (r *SourcesController) CreateWatcherResource(watcherType ResourceTypeName, resource *unstructured.Unstructured) { + + resourceList := r.WatcherPool.Pool[watcherType].ResourceList + + (r.WatcherPool.Pool[watcherType].Mutex).Lock() + defer (r.WatcherPool.Pool[watcherType].Mutex).Unlock() + + temporaryManifest := (*resource).DeepCopy() + *resourceList = append(*resourceList, temporaryManifest) +} + +// TODO +func (r *SourcesController) GetWatcherResourceIndex(watcherType ResourceTypeName, resource *unstructured.Unstructured) (result int) { + + resourceList := r.WatcherPool.Pool[watcherType].ResourceList + + for tmpResourceIndex, tmpResource := range *resourceList { + + if (tmpResource.GetName() == 
resource.GetName()) && + (tmpResource.GetNamespace() == resource.GetNamespace()) { + return tmpResourceIndex + } + } + + return -1 +} + +// TODO +func (r *SourcesController) UpdateWatcherResourceByIndex(watcherType ResourceTypeName, resourceIndex int, resource *unstructured.Unstructured) { + + resourceList := r.WatcherPool.Pool[watcherType].ResourceList + + (r.WatcherPool.Pool[watcherType].Mutex).Lock() + defer (r.WatcherPool.Pool[watcherType].Mutex).Unlock() + + (*resourceList)[resourceIndex] = resource +} + +// TODO +func (r *SourcesController) DeleteWatcherResourceByIndex(watcherType ResourceTypeName, resourceIndex int) { + + resourceList := r.WatcherPool.Pool[watcherType].ResourceList + + (r.WatcherPool.Pool[watcherType].Mutex).Lock() + defer (r.WatcherPool.Pool[watcherType].Mutex).Unlock() + + // Substitute the selected notification object with the last one from the list, + // then replace the whole list with it, minus the last. + *resourceList = append((*resourceList)[:resourceIndex], (*resourceList)[resourceIndex+1:]...) +} + +// DisableWatcherFromWatcherPool disable a watcher from the WatcherPool. +// It first blocks the watcher to prevent it from being started by any controller, +// then blocks the WatcherPool temporary while killing the watcher. +func (r *SourcesController) DisableWatcherFromWatcherPool(watcherType ResourceTypeName) (result bool, err error) { + + //Application.WatcherPool.Pool[watcherType].Mutex.Lock() + + // 1. Prevent watcher from being started again + *r.WatcherPool.Pool[watcherType].Blocked = true + + // 2. Stop the watcher + *r.WatcherPool.Pool[watcherType].StopSignal <- true + + //Application.WatcherPool.Pool[watcherType].Mutex.Unlock() + + // 3. Wait for the watcher to be stopped. 
Return false on failure + stoppedWatcher := false + for i := 0; i < 10; i++ { + if !*r.WatcherPool.Pool[watcherType].Started { + stoppedWatcher = true + break + } + time.Sleep(1 * time.Second) + } + + if !stoppedWatcher { + return false, errors.New("impossible to stop the watcher") + } + + // 4. Delete the watcher from the WatcherPool.Pool + //Application.WatcherPool.Mutex.Lock() + //delete(Application.WatcherPool.Pool, watcherType) + //Application.WatcherPool.Mutex.Unlock() + + //if _, keyFound := Application.WatcherPool.Pool[watcherType]; keyFound { + // return false, errors.New("impossible to delete the watcherType from WatcherPool") + //} + + return true, nil +} + +// CleanWatcherPool check the WatcherPool looking for empty watchers to trigger their deletion. +// This function is intended to be executed on its own, so returns nothing +// func (r *SourcesController) CleanWatcherPool(ctx context.Context) { +// logger := log.FromContext(ctx) + +// for watcherType, _ := range r.WatcherPool.Pool { + +// if len(*r.WatcherPool.Pool[watcherType].ResourceList) != 0 { +// continue +// } + +// watcherDeleted, err := r.DisableWatcherFromWatcherPool(watcherType) +// if !watcherDeleted { +// logger.WithValues("watcher", watcherType, "error", err). +// Info("watcher was not deleted from WatcherPool") +// continue +// } + +// logger.WithValues("watcher", watcherType). 
+// Info("watcher has been deleted from WatcherPool") +// } +// } From 2e09ce791e1be1fa410714a9bcd4e714c2d2b0dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alby=20Hern=C3=A1ndez?= Date: Mon, 25 Nov 2024 19:34:29 +0000 Subject: [PATCH 2/8] feat: Add second iteration of sources controller --- cmd/main.go | 18 ++++ internal/sources/controller.go | 24 +++-- internal/sources/utils.go | 161 ++++++++++++++++++--------------- 3 files changed, 125 insertions(+), 78 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 78772fd..e415497 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -19,6 +19,7 @@ package main import ( "crypto/tls" "flag" + coreLog "log" "os" "path/filepath" "strings" @@ -333,6 +334,23 @@ func main() { setupLog.Info("starting sources controller") go sourcesController.Start(globals.Application.Context) + go func() { + err = sourcesController.SyncWatchers([]sources.ResourceTypeName{ + "/v1/namespaces//", + "gateway.networking.k8s.io/v1/httproutes//"}) + if err != nil { + coreLog.Printf("error syncing watchers: %s", err.Error()) + } + + time.Sleep(5 * time.Second) + + err = sourcesController.SyncWatchers([]sources.ResourceTypeName{ + "gateway.networking.k8s.io/v1/httproutes//"}) + if err != nil { + coreLog.Printf("error syncing watchers: %s", err.Error()) + } + }() + // Init primary controller // ATTENTION: This controller may be replaced by a custom one in the future doing the same tasks // to simplify this project's dependencies and maintainability diff --git a/internal/sources/controller.go b/internal/sources/controller.go index ad5612e..3cea529 100644 --- a/internal/sources/controller.go +++ b/internal/sources/controller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync" "time" // @@ -48,18 +49,27 @@ type SourcesController struct { WatcherPool WatcherPoolT } +// TODO +func (r *SourcesController) init() { + r.WatcherPool = WatcherPoolT{ + Mutex: &sync.Mutex{}, + Pool: map[ResourceTypeName]ResourceTypeWatcherT{}, + } +} + // Start launches the 
SourcesController and keeps it alive // It kills the controller on application context death, and rerun the process when failed func (r *SourcesController) Start(ctx context.Context) { logger := log.FromContext(ctx) + r.init() + for { select { case <-ctx.Done(): logger.Info("SourcesController finished by context") return default: - logger.Info("Starting SourcesController") r.reconcileWatchers(ctx) } @@ -92,7 +102,7 @@ func (r *SourcesController) reconcileWatchers(ctx context.Context) { // Wait for the resourceType watcher to ACK itself into WatcherPool time.Sleep(r.Options.WatcherDurationToAck) - if *(r.WatcherPool.Pool[resourceType].Started) == false { + if !*(r.WatcherPool.Pool[resourceType].Started) { logger.Info(fmt.Sprintf("Impossible to start watcher for resource type: %s", resourceType)) } } @@ -166,22 +176,22 @@ func (r *SourcesController) watchType(ctx context.Context, watchedType ResourceT AddFunc: func(eventObject interface{}) { convertedObject := eventObject.(*unstructured.Unstructured) - r.CreateWatcherResource(watchedType, convertedObject) + r.createWatcherResource(watchedType, convertedObject) }, UpdateFunc: func(_, eventObject interface{}) { convertedObject := eventObject.(*unstructured.Unstructured) - objectIndex := r.GetWatcherResourceIndex(watchedType, convertedObject) + objectIndex := r.getWatcherResourceIndex(watchedType, convertedObject) if objectIndex > -1 { - r.UpdateWatcherResourceByIndex(watchedType, objectIndex, convertedObject) + r.updateWatcherResourceByIndex(watchedType, objectIndex, convertedObject) } }, DeleteFunc: func(eventObject interface{}) { convertedObject := eventObject.(*unstructured.Unstructured) - objectIndex := r.GetWatcherResourceIndex(watchedType, convertedObject) + objectIndex := r.getWatcherResourceIndex(watchedType, convertedObject) if objectIndex > -1 { - r.DeleteWatcherResourceByIndex(watchedType, objectIndex) + r.deleteWatcherResourceByIndex(watchedType, objectIndex) } }, } diff --git a/internal/sources/utils.go 
b/internal/sources/utils.go index 069b1bf..763d793 100644 --- a/internal/sources/utils.go +++ b/internal/sources/utils.go @@ -2,16 +2,20 @@ package sources import ( "errors" + "fmt" + "slices" "sync" "time" - // "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" // ) -// TODO -func (r *SourcesController) StartWatcher(watcherType ResourceTypeName) { +// prepareWatcher scaffolds a new watcher in the WatchedPool. +// This prepares the field for later watchers' reconciliation process. +// That process will create the real Kubernetes informer for this object +// This function is not responsible for blocking the pool before being executed +func (r *SourcesController) prepareWatcher(watcherType ResourceTypeName) { var initialStartedState bool = false var initialBlockedState bool = false @@ -31,8 +35,87 @@ func (r *SourcesController) StartWatcher(watcherType ResourceTypeName) { } } +// disableWatcher disables a watcher from the WatcherPool. +// It first blocks the watcher to prevent it from being started by any controller, +// then, the watcher is stopped and resources are deleted. +// This function is not responsible for blocking the pool before being executed +func (r *SourcesController) disableWatcher(watcherType ResourceTypeName) (result bool) { + + // 1. Prevent watcher from being started again + *r.WatcherPool.Pool[watcherType].Blocked = true + + // 2. Stop the watcher + *r.WatcherPool.Pool[watcherType].StopSignal <- true + + // 3. Wait for the watcher to be stopped. Return false on failure + stoppedWatcher := false + for i := 0; i < 10; i++ { + if !*r.WatcherPool.Pool[watcherType].Started { + stoppedWatcher = true + break + } + time.Sleep(1 * time.Second) + } + + if !stoppedWatcher { + return false + } + + *r.WatcherPool.Pool[watcherType].ResourceList = []*unstructured.Unstructured{} + return true +} + +// SyncWatchers ensures the WatcherPool matches the desired state. 
+// +// Given a list of desired watchers in GVRNN format (Group/Version/Resource/Namespace/Name), +// this function creates missing watchers, ensures active ones are unblocked, and removes +// any watchers that are no longer needed. +func (r *SourcesController) SyncWatchers(watcherTypeList []ResourceTypeName) (err error) { + r.WatcherPool.Mutex.Lock() + defer r.WatcherPool.Mutex.Unlock() + + // 1. Keep existing watchers (or create them) for desired resources types + for _, desiredPoolName := range watcherTypeList { + + // Scaffold new watchers when they does not exist + if _, exists := r.WatcherPool.Pool[desiredPoolName]; !exists { + r.prepareWatcher(desiredPoolName) + continue + } + + // Ensure already existing ones are NOT blocked + r.WatcherPool.Pool[desiredPoolName].Mutex.Lock() + if !*r.WatcherPool.Pool[desiredPoolName].Started { + falseVal := false + *r.WatcherPool.Pool[desiredPoolName].Blocked = falseVal + } + r.WatcherPool.Pool[desiredPoolName].Mutex.Unlock() + } + + // 2. Clean up unneeded watchers + for existingPoolName, existingPool := range r.WatcherPool.Pool { + + if !slices.Contains(watcherTypeList, existingPoolName) { + existingPool.Mutex.Lock() + + watcherDisabled := r.disableWatcher(existingPoolName) + if !watcherDisabled { + err = errors.Join(fmt.Errorf("impossible to disable watcher for: %s", existingPoolName)) + } + + existingPool.Mutex.Unlock() + + // Clean up the watcher from the pool + delete(r.WatcherPool.Pool, existingPoolName) + continue + } + } + + return err +} + // TODO -func (r *SourcesController) CreateWatcherResource(watcherType ResourceTypeName, resource *unstructured.Unstructured) { +func (r *SourcesController) createWatcherResource(watcherType ResourceTypeName, resource *unstructured.Unstructured) { resourceList := r.WatcherPool.Pool[watcherType].ResourceList @@ -44,7 +127,7 @@ func (r *SourcesController) CreateWatcherResource(watcherType ResourceTypeName, } // TODO -func (r *SourcesController) GetWatcherResourceIndex(watcherType 
ResourceTypeName, resource *unstructured.Unstructured) (result int) { +func (r *SourcesController) getWatcherResourceIndex(watcherType ResourceTypeName, resource *unstructured.Unstructured) (result int) { resourceList := r.WatcherPool.Pool[watcherType].ResourceList @@ -60,7 +143,7 @@ func (r *SourcesController) GetWatcherResourceIndex(watcherType ResourceTypeName } // TODO -func (r *SourcesController) UpdateWatcherResourceByIndex(watcherType ResourceTypeName, resourceIndex int, resource *unstructured.Unstructured) { +func (r *SourcesController) updateWatcherResourceByIndex(watcherType ResourceTypeName, resourceIndex int, resource *unstructured.Unstructured) { resourceList := r.WatcherPool.Pool[watcherType].ResourceList @@ -71,7 +154,7 @@ func (r *SourcesController) UpdateWatcherResourceByIndex(watcherType ResourceTyp } // TODO -func (r *SourcesController) DeleteWatcherResourceByIndex(watcherType ResourceTypeName, resourceIndex int) { +func (r *SourcesController) deleteWatcherResourceByIndex(watcherType ResourceTypeName, resourceIndex int) { resourceList := r.WatcherPool.Pool[watcherType].ResourceList @@ -82,67 +165,3 @@ func (r *SourcesController) DeleteWatcherResourceByIndex(watcherType ResourceTyp // then replace the whole list with it, minus the last. *resourceList = append((*resourceList)[:resourceIndex], (*resourceList)[resourceIndex+1:]...) } - -// DisableWatcherFromWatcherPool disable a watcher from the WatcherPool. -// It first blocks the watcher to prevent it from being started by any controller, -// then blocks the WatcherPool temporary while killing the watcher. -func (r *SourcesController) DisableWatcherFromWatcherPool(watcherType ResourceTypeName) (result bool, err error) { - - //Application.WatcherPool.Pool[watcherType].Mutex.Lock() - - // 1. Prevent watcher from being started again - *r.WatcherPool.Pool[watcherType].Blocked = true - - // 2. 
Stop the watcher - *r.WatcherPool.Pool[watcherType].StopSignal <- true - - //Application.WatcherPool.Pool[watcherType].Mutex.Unlock() - - // 3. Wait for the watcher to be stopped. Return false on failure - stoppedWatcher := false - for i := 0; i < 10; i++ { - if !*r.WatcherPool.Pool[watcherType].Started { - stoppedWatcher = true - break - } - time.Sleep(1 * time.Second) - } - - if !stoppedWatcher { - return false, errors.New("impossible to stop the watcher") - } - - // 4. Delete the watcher from the WatcherPool.Pool - //Application.WatcherPool.Mutex.Lock() - //delete(Application.WatcherPool.Pool, watcherType) - //Application.WatcherPool.Mutex.Unlock() - - //if _, keyFound := Application.WatcherPool.Pool[watcherType]; keyFound { - // return false, errors.New("impossible to delete the watcherType from WatcherPool") - //} - - return true, nil -} - -// CleanWatcherPool check the WatcherPool looking for empty watchers to trigger their deletion. -// This function is intended to be executed on its own, so returns nothing -// func (r *SourcesController) CleanWatcherPool(ctx context.Context) { -// logger := log.FromContext(ctx) - -// for watcherType, _ := range r.WatcherPool.Pool { - -// if len(*r.WatcherPool.Pool[watcherType].ResourceList) != 0 { -// continue -// } - -// watcherDeleted, err := r.DisableWatcherFromWatcherPool(watcherType) -// if !watcherDeleted { -// logger.WithValues("watcher", watcherType, "error", err). -// Info("watcher was not deleted from WatcherPool") -// continue -// } - -// logger.WithValues("watcher", watcherType). 
-// Info("watcher has been deleted from WatcherPool") -// } -// } From c04f54effd4e47828b10fecb309190e785397dff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alby=20Hern=C3=A1ndez?= Date: Mon, 25 Nov 2024 22:55:11 +0000 Subject: [PATCH 3/8] feat: Add third iteration of sources controller --- internal/sources/controller.go | 35 +++++- internal/sources/types.go | 12 +- internal/sources/utils.go | 196 ++++++++++++++++++++++----------- 3 files changed, 163 insertions(+), 80 deletions(-) diff --git a/internal/sources/controller.go b/internal/sources/controller.go index 3cea529..aa0006b 100644 --- a/internal/sources/controller.go +++ b/internal/sources/controller.go @@ -52,8 +52,8 @@ type SourcesController struct { // TODO func (r *SourcesController) init() { r.WatcherPool = WatcherPoolT{ - Mutex: &sync.Mutex{}, - Pool: map[ResourceTypeName]ResourceTypeWatcherT{}, + Mutex: &sync.RWMutex{}, + Pool: map[ResourceTypeName]*ResourceTypeWatcherT{}, } } @@ -176,14 +176,30 @@ func (r *SourcesController) watchType(ctx context.Context, watchedType ResourceT AddFunc: func(eventObject interface{}) { convertedObject := eventObject.(*unstructured.Unstructured) - r.createWatcherResource(watchedType, convertedObject) + + err := r.createWatcherResource(watchedType, convertedObject) + if err != nil { + logger.WithValues( + "watcher", watchedType, + "object", convertedObject.GetNamespace()+"/"+convertedObject.GetName(), + ).Error(err, "Error creating resource in resource list") + return + } }, UpdateFunc: func(_, eventObject interface{}) { convertedObject := eventObject.(*unstructured.Unstructured) - objectIndex := r.getWatcherResourceIndex(watchedType, convertedObject) + objectIndex := r.getWatcherResourceIndex(watchedType, convertedObject) if objectIndex > -1 { - r.updateWatcherResourceByIndex(watchedType, objectIndex, convertedObject) + + err := r.updateWatcherResourceByIndex(watchedType, objectIndex, convertedObject) + if err != nil { + logger.WithValues( + "watcher", watchedType, + 
"object", convertedObject.GetNamespace()+"/"+convertedObject.GetName(), + ).Error(err, "Error updating resource in resource list") + return + } } }, DeleteFunc: func(eventObject interface{}) { @@ -191,7 +207,14 @@ func (r *SourcesController) watchType(ctx context.Context, watchedType ResourceT objectIndex := r.getWatcherResourceIndex(watchedType, convertedObject) if objectIndex > -1 { - r.deleteWatcherResourceByIndex(watchedType, objectIndex) + err := r.deleteWatcherResourceByIndex(watchedType, objectIndex) + if err != nil { + logger.WithValues( + "watcher", watchedType, + "object", convertedObject.GetNamespace()+"/"+convertedObject.GetName(), + ).Error(err, "Error deleting resource from resource list") + return + } } }, } diff --git a/internal/sources/types.go b/internal/sources/types.go index b85c758..8febb3d 100644 --- a/internal/sources/types.go +++ b/internal/sources/types.go @@ -12,7 +12,7 @@ type ResourceTypeName string // TODO type ResourceTypeWatcherT struct { // Enforce concurrency safety - Mutex *sync.Mutex + Mutex *sync.RWMutex // Started represents a flag to know if the watcher is running Started *bool @@ -24,17 +24,13 @@ type ResourceTypeWatcherT struct { // Watcher will be potentially re-launched by SourcesController StopSignal *chan bool - // Dependants represents the amount of policies - // depending on the resources cached by this watcher - //Dependants int - // - ResourceList *[]*unstructured.Unstructured + ResourceList []*unstructured.Unstructured } type WatcherPoolT struct { // Enforce concurrency safety - Mutex *sync.Mutex + Mutex *sync.RWMutex - Pool map[ResourceTypeName]ResourceTypeWatcherT + Pool map[ResourceTypeName]*ResourceTypeWatcherT } diff --git a/internal/sources/utils.go b/internal/sources/utils.go index 763d793..97e1cc3 100644 --- a/internal/sources/utils.go +++ b/internal/sources/utils.go @@ -3,7 +3,6 @@ package sources import ( "errors" "fmt" - "slices" "sync" "time" @@ -16,22 +15,17 @@ import ( // That process will create the 
real Kubernetes informer for this object // This function is not responsible for blocking the pool before being executed func (r *SourcesController) prepareWatcher(watcherType ResourceTypeName) { - - var initialStartedState bool = false - var initialBlockedState bool = false - var initialResourceListState []*unstructured.Unstructured - - initialStopSignalState := make(chan bool) - initialMutexState := sync.Mutex{} - - r.WatcherPool.Pool[watcherType] = ResourceTypeWatcherT{ - Mutex: &initialMutexState, - - Started: &initialStartedState, - Blocked: &initialBlockedState, - StopSignal: &initialStopSignalState, - - ResourceList: &initialResourceListState, + started := false + blocked := false + stopSignal := make(chan bool) + mutex := &sync.RWMutex{} + + r.WatcherPool.Pool[watcherType] = &ResourceTypeWatcherT{ + Mutex: mutex, + Started: &started, + Blocked: &blocked, + StopSignal: &stopSignal, + ResourceList: make([]*unstructured.Unstructured, 0), } } @@ -61,7 +55,7 @@ func (r *SourcesController) disableWatcher(watcherType ResourceTypeName) (result return false } - *r.WatcherPool.Pool[watcherType].ResourceList = []*unstructured.Unstructured{} + r.WatcherPool.Pool[watcherType].ResourceList = []*unstructured.Unstructured{} return true } @@ -71,71 +65,115 @@ func (r *SourcesController) disableWatcher(watcherType ResourceTypeName) (result // this function creates missing watchers, ensures active ones are unblocked, and removes // any watchers that are no longer needed. func (r *SourcesController) SyncWatchers(watcherTypeList []ResourceTypeName) (err error) { - r.WatcherPool.Mutex.Lock() - defer r.WatcherPool.Mutex.Unlock() - // 1. Keep existing watchers (or create them) for desired resources types - for _, desiredPoolName := range watcherTypeList { + // 0. 
Check if WatcherPool is ready to work + if r.WatcherPool.Mutex == nil { + return fmt.Errorf("watcher pool is not ready") + } - // Scaffold new watchers when they does not exist - if _, exists := r.WatcherPool.Pool[desiredPoolName]; !exists { - r.prepareWatcher(desiredPoolName) + // 1. Small conversions to gain performance on huge watchers lists + desiredWatchers := make(map[ResourceTypeName]struct{}, len(watcherTypeList)) + for _, watcherType := range watcherTypeList { + desiredWatchers[watcherType] = struct{}{} + } + + // 2. Keep or create desired watchers + for watcherType := range desiredWatchers { + + // Lock the WatcherPool mutex for reading + r.WatcherPool.Mutex.RLock() + watcher, exists := r.WatcherPool.Pool[watcherType] + r.WatcherPool.Mutex.RUnlock() + + if !exists { + // Lock the watcher's mutex for writing + r.WatcherPool.Mutex.Lock() + r.prepareWatcher(watcherType) + r.WatcherPool.Mutex.Unlock() continue } - // Ensure already existing ones are NOT blocked - r.WatcherPool.Pool[desiredPoolName].Mutex.Lock() - if !*r.WatcherPool.Pool[desiredPoolName].Started { - falseVal := false - *r.WatcherPool.Pool[desiredPoolName].Blocked = falseVal + // Ensure the watcher is NOT blocked + watcher.Mutex.Lock() + if !*watcher.Started { + *watcher.Blocked = false } - r.WatcherPool.Pool[desiredPoolName].Mutex.Unlock() + watcher.Mutex.Unlock() } - // 2. Clean up unneeded watchers - for existingPoolName, existingPool := range r.WatcherPool.Pool { + // 3. 
Clean undesired watchers + r.WatcherPool.Mutex.RLock() + existingWatchers := make([]ResourceTypeName, 0, len(r.WatcherPool.Pool)) + for watcherType := range r.WatcherPool.Pool { + existingWatchers = append(existingWatchers, watcherType) + } + r.WatcherPool.Mutex.RUnlock() + + for _, watcherType := range existingWatchers { + if _, needed := desiredWatchers[watcherType]; !needed { + // Lock WatcherPool to access the watcher + r.WatcherPool.Mutex.RLock() + watcher := r.WatcherPool.Pool[watcherType] + r.WatcherPool.Mutex.RUnlock() - if !slices.Contains(watcherTypeList, existingPoolName) { - existingPool.Mutex.Lock() + watcher.Mutex.Lock() + watcherDisabled := r.disableWatcher(watcherType) + watcher.Mutex.Unlock() - watcherDisabled := r.disableWatcher(existingPoolName) if !watcherDisabled { - err = errors.Join(fmt.Errorf("impossible to disable watcher for: %s", existingPoolName)) + err = errors.Join(err, fmt.Errorf("imposible to disable watcher for: %s", watcherType)) } - existingPool.Mutex.Unlock() - - // Clean up the watcher from the pool - delete(r.WatcherPool.Pool, existingPoolName) - continue + // Delete the watcher from the WatcherPool + r.WatcherPool.Mutex.Lock() + delete(r.WatcherPool.Pool, watcherType) + r.WatcherPool.Mutex.Unlock() } } return err } -// TODO -func (r *SourcesController) createWatcherResource(watcherType ResourceTypeName, resource *unstructured.Unstructured) { +// createWatcherResource TODO +func (r *SourcesController) createWatcherResource(watcherType ResourceTypeName, resource *unstructured.Unstructured) error { + // Lock the WatcherPool mutex for reading + r.WatcherPool.Mutex.RLock() + watcher, exists := r.WatcherPool.Pool[watcherType] + r.WatcherPool.Mutex.RUnlock() + + if !exists { + return fmt.Errorf("watcher type '%s' not found. 
Is the watcher created?", watcherType) + } - resourceList := r.WatcherPool.Pool[watcherType].ResourceList + // Lock the watcher's mutex for writing + watcher.Mutex.Lock() + defer watcher.Mutex.Unlock() - (r.WatcherPool.Pool[watcherType].Mutex).Lock() - defer (r.WatcherPool.Pool[watcherType].Mutex).Unlock() + temporaryManifest := resource.DeepCopy() + watcher.ResourceList = append(watcher.ResourceList, temporaryManifest) - temporaryManifest := (*resource).DeepCopy() - *resourceList = append(*resourceList, temporaryManifest) + return nil } // TODO -func (r *SourcesController) getWatcherResourceIndex(watcherType ResourceTypeName, resource *unstructured.Unstructured) (result int) { - - resourceList := r.WatcherPool.Pool[watcherType].ResourceList +func (r *SourcesController) getWatcherResourceIndex(watcherType ResourceTypeName, resource *unstructured.Unstructured) int { + // Lock the WatcherPool mutex for reading + r.WatcherPool.Mutex.RLock() + watcher, exists := r.WatcherPool.Pool[watcherType] + r.WatcherPool.Mutex.RUnlock() + + if !exists { + return -1 + } - for tmpResourceIndex, tmpResource := range *resourceList { + // Lock the watcher's mutex for reading + watcher.Mutex.RLock() + defer watcher.Mutex.RUnlock() - if (tmpResource.GetName() == resource.GetName()) && - (tmpResource.GetNamespace() == resource.GetNamespace()) { - return tmpResourceIndex + for index, tmpResource := range watcher.ResourceList { + if tmpResource.GetName() == resource.GetName() && + tmpResource.GetNamespace() == resource.GetNamespace() { + return index } } @@ -143,25 +181,51 @@ func (r *SourcesController) getWatcherResourceIndex(watcherType ResourceTypeName } // TODO -func (r *SourcesController) updateWatcherResourceByIndex(watcherType ResourceTypeName, resourceIndex int, resource *unstructured.Unstructured) { +func (r *SourcesController) updateWatcherResourceByIndex(watcherType ResourceTypeName, resourceIndex int, resource *unstructured.Unstructured) error { + // Lock the WatcherPool mutex 
for reading + r.WatcherPool.Mutex.RLock() + watcher, exists := r.WatcherPool.Pool[watcherType] + r.WatcherPool.Mutex.RUnlock() + + if !exists { + return fmt.Errorf("watcher type '%s' not found", watcherType) + } + + // Lock the watcher's mutex for writing + watcher.Mutex.Lock() + defer watcher.Mutex.Unlock() - resourceList := r.WatcherPool.Pool[watcherType].ResourceList + if resourceIndex < 0 || resourceIndex >= len((*watcher).ResourceList) { + return fmt.Errorf("resource index out of bounds") + } - (r.WatcherPool.Pool[watcherType].Mutex).Lock() - defer (r.WatcherPool.Pool[watcherType].Mutex).Unlock() + ((*watcher).ResourceList)[resourceIndex] = resource - (*resourceList)[resourceIndex] = resource + return nil } // TODO -func (r *SourcesController) deleteWatcherResourceByIndex(watcherType ResourceTypeName, resourceIndex int) { +func (r *SourcesController) deleteWatcherResourceByIndex(watcherType ResourceTypeName, resourceIndex int) error { + // Lock the WatcherPool mutex for reading + r.WatcherPool.Mutex.RLock() + watcher, exists := r.WatcherPool.Pool[watcherType] + r.WatcherPool.Mutex.RUnlock() + + if !exists { + return fmt.Errorf("watcher type '%s' not found", watcherType) + } - resourceList := r.WatcherPool.Pool[watcherType].ResourceList + // Lock the watcher's mutex for writing + watcher.Mutex.Lock() + defer watcher.Mutex.Unlock() - (r.WatcherPool.Pool[watcherType].Mutex).Lock() - defer (r.WatcherPool.Pool[watcherType].Mutex).Unlock() + if resourceIndex < 0 || resourceIndex >= len((*watcher).ResourceList) { + return fmt.Errorf("resource index out of bounds") + } // Substitute the selected notification object with the last one from the list, // then replace the whole list with it, minus the last. - *resourceList = append((*resourceList)[:resourceIndex], (*resourceList)[resourceIndex+1:]...) + (*watcher).ResourceList = append(((*watcher).ResourceList)[:resourceIndex], ((*watcher).ResourceList)[resourceIndex+1:]...) 
+ + return nil } From 8614d759ece746bc2df1e04388e342cfa0d8cc56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alby=20Hern=C3=A1ndez?= Date: Tue, 26 Nov 2024 00:26:17 +0000 Subject: [PATCH 4/8] refactor: Rename XYZ to admission --- cmd/main.go | 29 +++-------- .../controller.go} | 22 ++++----- internal/{xyz => admission}/server.go | 2 +- internal/{xyz => admission}/utils.go | 2 +- internal/globals/types.go | 14 +++--- internal/sources/controller.go | 4 +- internal/sources/types.go | 4 +- internal/sources/utils.go | 48 ++++++++++++++----- 8 files changed, 66 insertions(+), 59 deletions(-) rename internal/{xyz/workload_controller.go => admission/controller.go} (78%) rename internal/{xyz => admission}/server.go (99%) rename internal/{xyz => admission}/utils.go (97%) diff --git a/cmd/main.go b/cmd/main.go index e415497..38cb9d0 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -19,7 +19,6 @@ package main import ( "crypto/tls" "flag" - coreLog "log" "os" "path/filepath" "strings" @@ -43,11 +42,11 @@ import ( // "freepik.com/admitik/api/v1alpha1" + "freepik.com/admitik/internal/admission" "freepik.com/admitik/internal/certificates" "freepik.com/admitik/internal/controller" "freepik.com/admitik/internal/globals" "freepik.com/admitik/internal/sources" - "freepik.com/admitik/internal/xyz" // +kubebuilder:scaffold:imports ) @@ -332,25 +331,9 @@ func main() { } setupLog.Info("starting sources controller") + globals.Application.SourceController = &sourcesController go sourcesController.Start(globals.Application.Context) - go func() { - err = sourcesController.SyncWatchers([]sources.ResourceTypeName{ - "/v1/namespaces//", - "gateway.networking.k8s.io/v1/httproutes//"}) - if err != nil { - coreLog.Printf("error syncing watchers: %s", err.Error()) - } - - time.Sleep(5 * time.Second) - - err = sourcesController.SyncWatchers([]sources.ResourceTypeName{ - "gateway.networking.k8s.io/v1/httproutes//"}) - if err != nil { - coreLog.Printf("error syncing watchers: %s", err.Error()) - } - }() - // 
Init primary controller // ATTENTION: This controller may be replaced by a custom one in the future doing the same tasks // to simplify this project's dependencies and maintainability @@ -377,9 +360,9 @@ func main() { } // Init secondary controller to process coming events - workloadController := xyz.WorkloadController{ + admissionController := admission.AdmissionController{ Client: mgr.GetClient(), - Options: xyz.WorkloadControllerOptions{ + Options: admission.AdmissionControllerOptions{ // ServerAddr: "0.0.0.0", @@ -392,8 +375,8 @@ func main() { }, } - setupLog.Info("starting workload controller") - go workloadController.Start(globals.Application.Context) + setupLog.Info("starting admission controller") + go admissionController.Start(globals.Application.Context) // setupLog.Info("starting manager") diff --git a/internal/xyz/workload_controller.go b/internal/admission/controller.go similarity index 78% rename from internal/xyz/workload_controller.go rename to internal/admission/controller.go index ebe6c30..78a0067 100644 --- a/internal/xyz/workload_controller.go +++ b/internal/admission/controller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package xyz +package admission import ( "context" @@ -32,12 +32,12 @@ import ( const ( // - controllerContextFinishedMessage = "xyz.WorkloadController finished by context" + controllerContextFinishedMessage = "admission.AdmissionController finished by context" ) -// WorkloadControllerOptions represents available options that can be passed -// to WorkloadController on start -type WorkloadControllerOptions struct { +// AdmissionControllerOptions represents available options that can be passed +// to AdmissionController on start +type AdmissionControllerOptions struct { // ServerAddr string ServerPort int @@ -48,19 +48,19 @@ type WorkloadControllerOptions struct { TLSPrivateKey string } -// WorkloadController represents the controller that triggers parallel threads. +// AdmissionController represents the controller that triggers parallel threads. // These threads process coming events against the conditions defined in Notification CRs // Each thread is a watcher in charge of a group of resources GVRNN (Group + Version + Resource + Namespace + Name) -type WorkloadController struct { +type AdmissionController struct { Client client.Client // - Options WorkloadControllerOptions + Options AdmissionControllerOptions } -// Start launches the XYZ.WorkloadController and keeps it alive +// Start launches the XYZ.AdmissionController and keeps it alive // It kills the controller on application context death, and rerun the process when failed -func (r *WorkloadController) Start(ctx context.Context) { +func (r *AdmissionController) Start(ctx context.Context) { logger := log.FromContext(ctx) for { @@ -79,7 +79,7 @@ func (r *WorkloadController) Start(ctx context.Context) { } // runWebserver prepares and runs the HTTP server -func (r *WorkloadController) runWebserver() (err error) { +func (r *AdmissionController) runWebserver() (err error) { customServer := NewHttpServer() diff --git a/internal/xyz/server.go b/internal/admission/server.go similarity index 99% rename from 
internal/xyz/server.go rename to internal/admission/server.go index 5ca49ae..1ef42b4 100644 --- a/internal/xyz/server.go +++ b/internal/admission/server.go @@ -1,4 +1,4 @@ -package xyz +package admission import ( "context" diff --git a/internal/xyz/utils.go b/internal/admission/utils.go similarity index 97% rename from internal/xyz/utils.go rename to internal/admission/utils.go index f1eb4dc..0c932a2 100644 --- a/internal/xyz/utils.go +++ b/internal/admission/utils.go @@ -1,4 +1,4 @@ -package xyz +package admission import ( "errors" diff --git a/internal/globals/types.go b/internal/globals/types.go index b0ac49d..5907019 100644 --- a/internal/globals/types.go +++ b/internal/globals/types.go @@ -11,6 +11,7 @@ import ( // "freepik.com/admitik/api/v1alpha1" + "freepik.com/admitik/internal/sources" ) // ClusterAdmissionPolicyPoolT represents TODO @@ -26,16 +27,13 @@ type applicationT struct { // Context TODO Context context.Context - // KubeRawClient TODO - KubeRawClient *dynamic.DynamicClient - - // KubeRawCoreClient TODO + // Kubernetes clients + KubeRawClient *dynamic.DynamicClient KubeRawCoreClient *kubernetes.Clientset // - ClusterAdmissionPolicyPool ClusterAdmissionPolicyPoolT + SourceController *sources.SourcesController - // WatcherPool TODO - //WatcherPool map[ResourceTypeName]ResourceTypeWatcherT - //WatcherPool *sources.WatcherPoolT + // + ClusterAdmissionPolicyPool ClusterAdmissionPolicyPoolT } diff --git a/internal/sources/controller.go b/internal/sources/controller.go index aa0006b..c517979 100644 --- a/internal/sources/controller.go +++ b/internal/sources/controller.go @@ -53,7 +53,7 @@ type SourcesController struct { func (r *SourcesController) init() { r.WatcherPool = WatcherPoolT{ Mutex: &sync.RWMutex{}, - Pool: map[ResourceTypeName]*ResourceTypeWatcherT{}, + Pool: map[resourceTypeName]*ResourceTypeWatcherT{}, } } @@ -113,7 +113,7 @@ func (r *SourcesController) reconcileWatchers(ctx context.Context) { } // watchType launches a watcher for a certain 
resource type, and trigger processing for each entering resource event -func (r *SourcesController) watchType(ctx context.Context, watchedType ResourceTypeName) { +func (r *SourcesController) watchType(ctx context.Context, watchedType resourceTypeName) { logger := log.FromContext(ctx) diff --git a/internal/sources/types.go b/internal/sources/types.go index 8febb3d..6534068 100644 --- a/internal/sources/types.go +++ b/internal/sources/types.go @@ -7,7 +7,7 @@ import ( ) // TODO -type ResourceTypeName string +type resourceTypeName string // TODO type ResourceTypeWatcherT struct { @@ -32,5 +32,5 @@ type WatcherPoolT struct { // Enforce concurrency safety Mutex *sync.RWMutex - Pool map[ResourceTypeName]*ResourceTypeWatcherT + Pool map[resourceTypeName]*ResourceTypeWatcherT } diff --git a/internal/sources/utils.go b/internal/sources/utils.go index 97e1cc3..cb2c3b7 100644 --- a/internal/sources/utils.go +++ b/internal/sources/utils.go @@ -6,15 +6,15 @@ import ( "sync" "time" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" // + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) // prepareWatcher scaffolds a new watcher in the WatchedPool. // This prepares the field for later watchers' reconciliation process. // That process will create the real Kubernetes informer for this object // This function is not responsible for blocking the pool before being executed -func (r *SourcesController) prepareWatcher(watcherType ResourceTypeName) { +func (r *SourcesController) prepareWatcher(watcherType resourceTypeName) { started := false blocked := false stopSignal := make(chan bool) @@ -33,7 +33,7 @@ func (r *SourcesController) prepareWatcher(watcherType ResourceTypeName) { // It first blocks the watcher to prevent it from being started by any controller, // then, the watcher is stopped and resources are deleted. 
// This function is not responsible for blocking the pool before being executed -func (r *SourcesController) disableWatcher(watcherType ResourceTypeName) (result bool) { +func (r *SourcesController) disableWatcher(watcherType resourceTypeName) (result bool) { // 1. Prevent watcher from being started again *r.WatcherPool.Pool[watcherType].Blocked = true @@ -64,7 +64,7 @@ func (r *SourcesController) disableWatcher(watcherType ResourceTypeName) (result // Given a list of desired watchers in GVRNN format (Group/Version/Resource/Namespace/Name), // this function creates missing watchers, ensures active ones are unblocked, and removes // any watchers that are no longer needed. -func (r *SourcesController) SyncWatchers(watcherTypeList []ResourceTypeName) (err error) { +func (r *SourcesController) SyncWatchers(watcherTypeList []string) (err error) { // 0. Check if WatcherPool is ready to work if r.WatcherPool.Mutex == nil { @@ -72,9 +72,9 @@ func (r *SourcesController) SyncWatchers(watcherTypeList []ResourceTypeName) (er } // 1. Small conversions to gain performance on huge watchers lists - desiredWatchers := make(map[ResourceTypeName]struct{}, len(watcherTypeList)) + desiredWatchers := make(map[resourceTypeName]struct{}, len(watcherTypeList)) for _, watcherType := range watcherTypeList { - desiredWatchers[watcherType] = struct{}{} + desiredWatchers[resourceTypeName(watcherType)] = struct{}{} } // 2. Keep or create desired watchers @@ -103,7 +103,7 @@ func (r *SourcesController) SyncWatchers(watcherTypeList []ResourceTypeName) (er // 3. 
Clean undesired watchers r.WatcherPool.Mutex.RLock() - existingWatchers := make([]ResourceTypeName, 0, len(r.WatcherPool.Pool)) + existingWatchers := make([]resourceTypeName, 0, len(r.WatcherPool.Pool)) for watcherType := range r.WatcherPool.Pool { existingWatchers = append(existingWatchers, watcherType) } @@ -134,8 +134,34 @@ func (r *SourcesController) SyncWatchers(watcherTypeList []ResourceTypeName) (er return err } +// GetWatcherResources accept a desired watcher in the GVRNN format (Group/Version/Resource/Namespace/Name) +// and returns a list of resources matching it +func (r *SourcesController) GetWatcherResources(watcherType string) (resources []*unstructured.Unstructured, err error) { + + // 0. Check if WatcherPool is ready to work + if r.WatcherPool.Mutex == nil { + return resources, fmt.Errorf("watcher pool is not ready") + } + + // Lock the WatcherPool mutex for reading + r.WatcherPool.Mutex.RLock() + watcher, watcherTypeFound := r.WatcherPool.Pool[resourceTypeName(watcherType)] + r.WatcherPool.Mutex.RUnlock() + + if !watcherTypeFound { + return nil, fmt.Errorf("watcher type '%s' not found. 
Is the watcher created?", watcherType) + } + + // Lock the watcher's mutex for reading + watcher.Mutex.RLock() + defer watcher.Mutex.RUnlock() + + // Return the pointer to the ResourceList + return watcher.ResourceList, nil +} + // createWatcherResource TODO -func (r *SourcesController) createWatcherResource(watcherType ResourceTypeName, resource *unstructured.Unstructured) error { +func (r *SourcesController) createWatcherResource(watcherType resourceTypeName, resource *unstructured.Unstructured) error { // Lock the WatcherPool mutex for reading r.WatcherPool.Mutex.RLock() watcher, exists := r.WatcherPool.Pool[watcherType] @@ -156,7 +182,7 @@ func (r *SourcesController) createWatcherResource(watcherType ResourceTypeName, } // TODO -func (r *SourcesController) getWatcherResourceIndex(watcherType ResourceTypeName, resource *unstructured.Unstructured) int { +func (r *SourcesController) getWatcherResourceIndex(watcherType resourceTypeName, resource *unstructured.Unstructured) int { // Lock the WatcherPool mutex for reading r.WatcherPool.Mutex.RLock() watcher, exists := r.WatcherPool.Pool[watcherType] @@ -181,7 +207,7 @@ func (r *SourcesController) getWatcherResourceIndex(watcherType ResourceTypeName } // TODO -func (r *SourcesController) updateWatcherResourceByIndex(watcherType ResourceTypeName, resourceIndex int, resource *unstructured.Unstructured) error { +func (r *SourcesController) updateWatcherResourceByIndex(watcherType resourceTypeName, resourceIndex int, resource *unstructured.Unstructured) error { // Lock the WatcherPool mutex for reading r.WatcherPool.Mutex.RLock() watcher, exists := r.WatcherPool.Pool[watcherType] @@ -205,7 +231,7 @@ func (r *SourcesController) updateWatcherResourceByIndex(watcherType ResourceTyp } // TODO -func (r *SourcesController) deleteWatcherResourceByIndex(watcherType ResourceTypeName, resourceIndex int) error { +func (r *SourcesController) deleteWatcherResourceByIndex(watcherType resourceTypeName, resourceIndex int) error { // Lock 
the WatcherPool mutex for reading r.WatcherPool.Mutex.RLock() watcher, exists := r.WatcherPool.Pool[watcherType] From be7ef0e41527925b505a2da77fcf8a8f051e39bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alby=20Hern=C3=A1ndez?= Date: Tue, 26 Nov 2024 01:47:29 +0000 Subject: [PATCH 5/8] feat: Get sources from SourcesController instead of K8s --- cmd/main.go | 4 +- internal/admission/server.go | 37 +++------- .../controller/clusteradmissionpolicy_sync.go | 46 +++++++++++- internal/sources/controller.go | 18 ++--- internal/sources/types.go | 4 +- internal/sources/utils.go | 74 +++++++++---------- 6 files changed, 104 insertions(+), 79 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 38cb9d0..b704bc4 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -320,7 +320,9 @@ func main() { os.Exit(1) } - // Init a controller in charge of launching watchers TODO + // Init SourcesController. + // This controller is in charge of launching watchers to cache sources expressed in some CRs in background. + // This way we avoid retrieving them from Kubernetes on each request to the Admission/Mutation controllers. 
sourcesController := sources.SourcesController{ Client: globals.Application.KubeRawClient, Options: sources.SourcesControllerOptions{ diff --git a/internal/admission/server.go b/internal/admission/server.go index 1ef42b4..09289f7 100644 --- a/internal/admission/server.go +++ b/internal/admission/server.go @@ -18,7 +18,6 @@ import ( eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -149,7 +148,7 @@ func (s *HttpServer) handleRequest(response http.ResponseWriter, request *http.R // Retrieve the sources declared per policy for sourceIndex, sourceItem := range caPolicyObj.Spec.Sources { - unstructuredSourceObjList, err := getKubeResourceList(request.Context(), + unstructuredSourceObjList, err := getSourcesFromPool( sourceItem.Group, sourceItem.Version, sourceItem.Resource, @@ -167,8 +166,8 @@ func (s *HttpServer) handleRequest(response http.ResponseWriter, request *http.R tmpSources = make(map[int][]map[string]interface{}) } - for _, unstructuredItem := range unstructuredSourceObjList.Items { - tmpSources[sourceIndex] = append(tmpSources[sourceIndex], unstructuredItem.Object) + for _, unstructuredItem := range unstructuredSourceObjList { + tmpSources[sourceIndex] = append(tmpSources[sourceIndex], (*unstructuredItem).Object) } specificTemplateInjectedObject["sources"] = tmpSources @@ -221,34 +220,16 @@ func (s *HttpServer) handleRequest(response http.ResponseWriter, request *http.R reviewResponse.Response.Result = &metav1.Status{} } -// getKubeResourceList returns an unstructuredList of resources selected by params -func getKubeResourceList(ctx context.Context, group, version, resource, namespace, name string) ( - resourceList *unstructured.UnstructuredList, err error) { +// getSourcesFromPool returns a list of unstructured resources selected by params from the sources cache +func 
getSourcesFromPool(group, version, resource, namespace, name string) ( + resourceList []*unstructured.Unstructured, err error) { - unstructuredSourceObj := globals.Application.KubeRawClient.Resource(schema.GroupVersionResource{ - Group: group, - Version: version, - Resource: resource, - }) + sourceString := fmt.Sprintf("%s/%s/%s/%s/%s", group, version, resource, namespace, name) - sourceListOptions := metav1.ListOptions{} - - if namespace != "" { - sourceListOptions.FieldSelector = fmt.Sprintf("metadata.namespace=%s", namespace) - } - - if name != "" { - if sourceListOptions.FieldSelector != "" { - sourceListOptions.FieldSelector += "," - } - sourceListOptions.FieldSelector = fmt.Sprintf("metadata.name=%s", name) - } - - resourceList, err = unstructuredSourceObj.List(ctx, sourceListOptions) - return resourceList, err + return globals.Application.SourceController.GetWatcherResources(sourceString) } -// createKubeEvent TODO +// createKubeEvent creates a modern event in Kuvernetes with data given by params func createKubeEvent(ctx context.Context, namespace string, object map[string]interface{}, policy v1alpha1.ClusterAdmissionPolicy, action, message string) (err error) { diff --git a/internal/controller/clusteradmissionpolicy_sync.go b/internal/controller/clusteradmissionpolicy_sync.go index 938567b..6c6b18c 100644 --- a/internal/controller/clusteradmissionpolicy_sync.go +++ b/internal/controller/clusteradmissionpolicy_sync.go @@ -6,6 +6,7 @@ import ( "slices" "strings" + // "freepik.com/admitik/api/v1alpha1" "freepik.com/admitik/internal/globals" @@ -22,7 +23,8 @@ const ( ) var ( - AdmissionOperations = []admissionregv1.OperationType{admissionregv1.Create, admissionregv1.Update, admissionregv1.Delete, admissionregv1.Connect} + AdmissionOperations = []admissionregv1.OperationType{ + admissionregv1.Create, admissionregv1.Update, admissionregv1.Delete, admissionregv1.Connect} // ValidatingWebhookConfigurationRuleScopeAll = admissionregv1.ScopeType("*") @@ -154,7 +156,6 
@@ func (r *ClusterAdmissionPolicyReconciler) SyncAdmissionPool(ctx context.Context tmpWebhookObj.ClientConfig = r.Options.WebhookClientConfig tmpWebhookObj.Rules = currentVwcRules tmpWebhookObj.TimeoutSeconds = &timeoutSecondsConverted - //tmpWebhookObj.MatchConditions = object.Spec.WatchedResources.MatchConditions sideEffectsClass := admissionregv1.SideEffectClass(admissionregv1.SideEffectClassNone) tmpWebhookObj.SideEffects = &sideEffectsClass @@ -179,5 +180,46 @@ func (r *ClusterAdmissionPolicyReconciler) SyncAdmissionPool(ctx context.Context } } + // Ask SourcesController to watch all the sources + watchersList := r.getAllSourcesReferences() + + err = globals.Application.SourceController.SyncWatchers(watchersList) + if err != nil { + err = fmt.Errorf("error syncing watchers: %s", err.Error()) + return + } + return nil } + +// getAllSourcesReferences TODO +func (r *ClusterAdmissionPolicyReconciler) getAllSourcesReferences() (references []string) { + + globals.Application.ClusterAdmissionPolicyPool.Mutex.Lock() + defer globals.Application.ClusterAdmissionPolicyPool.Mutex.Unlock() + + // + for _, capList := range globals.Application.ClusterAdmissionPolicyPool.Pool { + for _, capObject := range capList { + for _, capObjSource := range capObject.Spec.Sources { + + sourceString := fmt.Sprintf("%s/%s/%s/%s/%s", + capObjSource.Group, + capObjSource.Version, + capObjSource.Resource, + capObjSource.Namespace, + capObjSource.Namespace, + ) + + if slices.Contains(references, sourceString) { + continue + } + + references = append(references, sourceString) + + } + } + } + + return references +} diff --git a/internal/sources/controller.go b/internal/sources/controller.go index c517979..b42a578 100644 --- a/internal/sources/controller.go +++ b/internal/sources/controller.go @@ -46,14 +46,14 @@ type SourcesController struct { Options SourcesControllerOptions // Carried stuff - WatcherPool WatcherPoolT + watcherPool WatcherPoolT } // TODO func (r *SourcesController) init() { - 
r.WatcherPool = WatcherPoolT{ + r.watcherPool = WatcherPoolT{ Mutex: &sync.RWMutex{}, - Pool: map[resourceTypeName]*ResourceTypeWatcherT{}, + Pool: map[resourceTypeName]*resourceTypeWatcherT{}, } } @@ -82,12 +82,12 @@ func (r *SourcesController) Start(ctx context.Context) { func (r *SourcesController) reconcileWatchers(ctx context.Context) { logger := log.FromContext(ctx) - for resourceType, resourceTypeWatcher := range r.WatcherPool.Pool { + for resourceType, resourceTypeWatcher := range r.watcherPool.Pool { // TODO: Is this really needed or useful? // Check the existence of the resourceType into the WatcherPool. // Remember the controller.ClusterAdmissionPolicyController can remove watchers on garbage collection - if _, resourceTypeFound := r.WatcherPool.Pool[resourceType]; !resourceTypeFound { + if _, resourceTypeFound := r.watcherPool.Pool[resourceType]; !resourceTypeFound { continue } @@ -102,7 +102,7 @@ func (r *SourcesController) reconcileWatchers(ctx context.Context) { // Wait for the resourceType watcher to ACK itself into WatcherPool time.Sleep(r.Options.WatcherDurationToAck) - if !*(r.WatcherPool.Pool[resourceType].Started) { + if !*(r.watcherPool.Pool[resourceType].Started) { logger.Info(fmt.Sprintf("Impossible to start watcher for resource type: %s", resourceType)) } } @@ -120,9 +120,9 @@ func (r *SourcesController) watchType(ctx context.Context, watchedType resourceT logger.Info(fmt.Sprintf("Watcher for '%s' has been started", watchedType)) // Set ACK flag for watcher launching into the WatcherPool - *(r.WatcherPool.Pool[watchedType].Started) = true + *(r.watcherPool.Pool[watchedType].Started) = true defer func() { - *(r.WatcherPool.Pool[watchedType].Started) = false + *(r.watcherPool.Pool[watchedType].Started) = false }() // Extract GVR + Namespace + Name from watched type: @@ -158,7 +158,7 @@ func (r *SourcesController) watchType(ctx context.Context, watchedType resourceT stopCh := make(chan struct{}) go func() { - 
<-*(r.WatcherPool.Pool[watchedType].StopSignal) + <-*(r.watcherPool.Pool[watchedType].StopSignal) close(stopCh) logger.Info(fmt.Sprintf("Watcher for resource type '%s' killed by StopSignal", watchedType)) }() diff --git a/internal/sources/types.go b/internal/sources/types.go index 6534068..4593fd7 100644 --- a/internal/sources/types.go +++ b/internal/sources/types.go @@ -10,7 +10,7 @@ import ( type resourceTypeName string // TODO -type ResourceTypeWatcherT struct { +type resourceTypeWatcherT struct { // Enforce concurrency safety Mutex *sync.RWMutex @@ -32,5 +32,5 @@ type WatcherPoolT struct { // Enforce concurrency safety Mutex *sync.RWMutex - Pool map[resourceTypeName]*ResourceTypeWatcherT + Pool map[resourceTypeName]*resourceTypeWatcherT } diff --git a/internal/sources/utils.go b/internal/sources/utils.go index cb2c3b7..1963e0d 100644 --- a/internal/sources/utils.go +++ b/internal/sources/utils.go @@ -20,7 +20,7 @@ func (r *SourcesController) prepareWatcher(watcherType resourceTypeName) { stopSignal := make(chan bool) mutex := &sync.RWMutex{} - r.WatcherPool.Pool[watcherType] = &ResourceTypeWatcherT{ + r.watcherPool.Pool[watcherType] = &resourceTypeWatcherT{ Mutex: mutex, Started: &started, Blocked: &blocked, @@ -36,15 +36,15 @@ func (r *SourcesController) prepareWatcher(watcherType resourceTypeName) { func (r *SourcesController) disableWatcher(watcherType resourceTypeName) (result bool) { // 1. Prevent watcher from being started again - *r.WatcherPool.Pool[watcherType].Blocked = true + *r.watcherPool.Pool[watcherType].Blocked = true // 2. Stop the watcher - *r.WatcherPool.Pool[watcherType].StopSignal <- true + *r.watcherPool.Pool[watcherType].StopSignal <- true // 3. Wait for the watcher to be stopped. 
Return false on failure stoppedWatcher := false for i := 0; i < 10; i++ { - if !*r.WatcherPool.Pool[watcherType].Started { + if !*r.watcherPool.Pool[watcherType].Started { stoppedWatcher = true break } @@ -55,7 +55,7 @@ func (r *SourcesController) disableWatcher(watcherType resourceTypeName) (result return false } - r.WatcherPool.Pool[watcherType].ResourceList = []*unstructured.Unstructured{} + r.watcherPool.Pool[watcherType].ResourceList = []*unstructured.Unstructured{} return true } @@ -67,7 +67,7 @@ func (r *SourcesController) disableWatcher(watcherType resourceTypeName) (result func (r *SourcesController) SyncWatchers(watcherTypeList []string) (err error) { // 0. Check if WatcherPool is ready to work - if r.WatcherPool.Mutex == nil { + if r.watcherPool.Mutex == nil { return fmt.Errorf("watcher pool is not ready") } @@ -81,15 +81,15 @@ func (r *SourcesController) SyncWatchers(watcherTypeList []string) (err error) { for watcherType := range desiredWatchers { // Lock the WatcherPool mutex for reading - r.WatcherPool.Mutex.RLock() - watcher, exists := r.WatcherPool.Pool[watcherType] - r.WatcherPool.Mutex.RUnlock() + r.watcherPool.Mutex.RLock() + watcher, exists := r.watcherPool.Pool[watcherType] + r.watcherPool.Mutex.RUnlock() if !exists { // Lock the watcher's mutex for writing - r.WatcherPool.Mutex.Lock() + r.watcherPool.Mutex.Lock() r.prepareWatcher(watcherType) - r.WatcherPool.Mutex.Unlock() + r.watcherPool.Mutex.Unlock() continue } @@ -102,19 +102,19 @@ func (r *SourcesController) SyncWatchers(watcherTypeList []string) (err error) { } // 3. 
Clean undesired watchers - r.WatcherPool.Mutex.RLock() - existingWatchers := make([]resourceTypeName, 0, len(r.WatcherPool.Pool)) - for watcherType := range r.WatcherPool.Pool { + r.watcherPool.Mutex.RLock() + existingWatchers := make([]resourceTypeName, 0, len(r.watcherPool.Pool)) + for watcherType := range r.watcherPool.Pool { existingWatchers = append(existingWatchers, watcherType) } - r.WatcherPool.Mutex.RUnlock() + r.watcherPool.Mutex.RUnlock() for _, watcherType := range existingWatchers { if _, needed := desiredWatchers[watcherType]; !needed { // Lock WatcherPool to access the watcher - r.WatcherPool.Mutex.RLock() - watcher := r.WatcherPool.Pool[watcherType] - r.WatcherPool.Mutex.RUnlock() + r.watcherPool.Mutex.RLock() + watcher := r.watcherPool.Pool[watcherType] + r.watcherPool.Mutex.RUnlock() watcher.Mutex.Lock() watcherDisabled := r.disableWatcher(watcherType) @@ -125,9 +125,9 @@ func (r *SourcesController) SyncWatchers(watcherTypeList []string) (err error) { } // Delete the watcher from the WatcherPool - r.WatcherPool.Mutex.Lock() - delete(r.WatcherPool.Pool, watcherType) - r.WatcherPool.Mutex.Unlock() + r.watcherPool.Mutex.Lock() + delete(r.watcherPool.Pool, watcherType) + r.watcherPool.Mutex.Unlock() } } @@ -139,14 +139,14 @@ func (r *SourcesController) SyncWatchers(watcherTypeList []string) (err error) { func (r *SourcesController) GetWatcherResources(watcherType string) (resources []*unstructured.Unstructured, err error) { // 0. 
Check if WatcherPool is ready to work - if r.WatcherPool.Mutex == nil { + if r.watcherPool.Mutex == nil { return resources, fmt.Errorf("watcher pool is not ready") } // Lock the WatcherPool mutex for reading - r.WatcherPool.Mutex.RLock() - watcher, watcherTypeFound := r.WatcherPool.Pool[resourceTypeName(watcherType)] - r.WatcherPool.Mutex.RUnlock() + r.watcherPool.Mutex.RLock() + watcher, watcherTypeFound := r.watcherPool.Pool[resourceTypeName(watcherType)] + r.watcherPool.Mutex.RUnlock() if !watcherTypeFound { return nil, fmt.Errorf("watcher type '%s' not found. Is the watcher created?", watcherType) @@ -163,9 +163,9 @@ func (r *SourcesController) GetWatcherResources(watcherType string) (resources [ // createWatcherResource TODO func (r *SourcesController) createWatcherResource(watcherType resourceTypeName, resource *unstructured.Unstructured) error { // Lock the WatcherPool mutex for reading - r.WatcherPool.Mutex.RLock() - watcher, exists := r.WatcherPool.Pool[watcherType] - r.WatcherPool.Mutex.RUnlock() + r.watcherPool.Mutex.RLock() + watcher, exists := r.watcherPool.Pool[watcherType] + r.watcherPool.Mutex.RUnlock() if !exists { return fmt.Errorf("watcher type '%s' not found. 
Is the watcher created?", watcherType) @@ -184,9 +184,9 @@ func (r *SourcesController) createWatcherResource(watcherType resourceTypeName, // TODO func (r *SourcesController) getWatcherResourceIndex(watcherType resourceTypeName, resource *unstructured.Unstructured) int { // Lock the WatcherPool mutex for reading - r.WatcherPool.Mutex.RLock() - watcher, exists := r.WatcherPool.Pool[watcherType] - r.WatcherPool.Mutex.RUnlock() + r.watcherPool.Mutex.RLock() + watcher, exists := r.watcherPool.Pool[watcherType] + r.watcherPool.Mutex.RUnlock() if !exists { return -1 @@ -209,9 +209,9 @@ func (r *SourcesController) getWatcherResourceIndex(watcherType resourceTypeName // TODO func (r *SourcesController) updateWatcherResourceByIndex(watcherType resourceTypeName, resourceIndex int, resource *unstructured.Unstructured) error { // Lock the WatcherPool mutex for reading - r.WatcherPool.Mutex.RLock() - watcher, exists := r.WatcherPool.Pool[watcherType] - r.WatcherPool.Mutex.RUnlock() + r.watcherPool.Mutex.RLock() + watcher, exists := r.watcherPool.Pool[watcherType] + r.watcherPool.Mutex.RUnlock() if !exists { return fmt.Errorf("watcher type '%s' not found", watcherType) @@ -233,9 +233,9 @@ func (r *SourcesController) updateWatcherResourceByIndex(watcherType resourceTyp // TODO func (r *SourcesController) deleteWatcherResourceByIndex(watcherType resourceTypeName, resourceIndex int) error { // Lock the WatcherPool mutex for reading - r.WatcherPool.Mutex.RLock() - watcher, exists := r.WatcherPool.Pool[watcherType] - r.WatcherPool.Mutex.RUnlock() + r.watcherPool.Mutex.RLock() + watcher, exists := r.watcherPool.Pool[watcherType] + r.watcherPool.Mutex.RUnlock() if !exists { return fmt.Errorf("watcher type '%s' not found", watcherType) From 7e53c0961f56d06511ba73fb767f76887f635bc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alby=20Hern=C3=A1ndez?= Date: Tue, 26 Nov 2024 01:47:29 +0000 Subject: [PATCH 6/8] feat: Get sources from SourcesController instead of K8s --- 
internal/admission/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/admission/controller.go b/internal/admission/controller.go index 78a0067..6c6c782 100644 --- a/internal/admission/controller.go +++ b/internal/admission/controller.go @@ -58,7 +58,7 @@ type AdmissionController struct { Options AdmissionControllerOptions } -// Start launches the XYZ.AdmissionController and keeps it alive +// Start launches the AdmissionController and keeps it alive // It kills the controller on application context death, and rerun the process when failed func (r *AdmissionController) Start(ctx context.Context) { logger := log.FromContext(ctx) From 7bfbab4a418125880bc214db769c03307eb5f85c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alby=20Hern=C3=A1ndez?= Date: Tue, 26 Nov 2024 09:47:06 +0000 Subject: [PATCH 7/8] refactor: Decouple VWC creation from Syncing process in main controller --- internal/admission/server.go | 18 ++++ internal/admission/utils.go | 16 ++++ internal/certificates/certificates.go | 16 ++++ .../clusteradmissionpolicy_status.go | 16 ++++ .../controller/clusteradmissionpolicy_sync.go | 91 +++++++++++++------ internal/controller/commons.go | 16 ++++ internal/globals/conditions.go | 16 ++++ internal/globals/globals.go | 16 ++++ internal/globals/pools.go | 16 ++++ internal/globals/types.go | 17 +++- internal/globals/utils.go | 18 +++- internal/sources/controller.go | 16 ++++ internal/sources/types.go | 16 ++++ internal/sources/utils.go | 16 ++++ internal/template/functions.go | 16 ++++ 15 files changed, 290 insertions(+), 30 deletions(-) diff --git a/internal/admission/server.go b/internal/admission/server.go index 09289f7..6e9e7ae 100644 --- a/internal/admission/server.go +++ b/internal/admission/server.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package admission import ( @@ -9,10 +25,12 @@ import ( "slices" "time" + // "freepik.com/admitik/api/v1alpha1" "freepik.com/admitik/internal/globals" "freepik.com/admitik/internal/template" + // admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" eventsv1 "k8s.io/api/events/v1" diff --git a/internal/admission/utils.go b/internal/admission/utils.go index 0c932a2..4c9907e 100644 --- a/internal/admission/utils.go +++ b/internal/admission/utils.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package admission import ( diff --git a/internal/certificates/certificates.go b/internal/certificates/certificates.go index d53c55e..f70eda3 100644 --- a/internal/certificates/certificates.go +++ b/internal/certificates/certificates.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package certificates import ( diff --git a/internal/controller/clusteradmissionpolicy_status.go b/internal/controller/clusteradmissionpolicy_status.go index e62bd68..01e82dd 100644 --- a/internal/controller/clusteradmissionpolicy_status.go +++ b/internal/controller/clusteradmissionpolicy_status.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package controller import ( diff --git a/internal/controller/clusteradmissionpolicy_sync.go b/internal/controller/clusteradmissionpolicy_sync.go index 6c6b18c..a9b5fd5 100644 --- a/internal/controller/clusteradmissionpolicy_sync.go +++ b/internal/controller/clusteradmissionpolicy_sync.go @@ -1,3 +1,21 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// TODO: Decouple this controller from 'globals' package + package controller import ( @@ -110,6 +128,48 @@ func (r *ClusterAdmissionPolicyReconciler) SyncAdmissionPool(ctx context.Context } } + // Craft ValidatingWebhookConfiguration rules based on the previous existing one and current pool keys + metaWebhookObj, err := r.getMergedValidatingWebhookConfiguration(ctx) + if err != nil { + err = fmt.Errorf("error building ValidatingWebhookConfiguration '%s': %s", + ValidatingWebhookConfigurationName, err.Error()) + return + } + + // Sync changes to Kubernetes + if errors.IsNotFound(err) { + err = r.Create(ctx, &metaWebhookObj) + if err != nil { + err = fmt.Errorf("error creating ValidatingWebhookConfiguration in Kubernetes'%s': %s", + ValidatingWebhookConfigurationName, err.Error()) + return + } + } else { + err = r.Update(ctx, &metaWebhookObj) + if err != nil { + err = fmt.Errorf("error updating ValidatingWebhookConfiguration in Kubernetes '%s': %s", + ValidatingWebhookConfigurationName, err.Error()) + return + } + } + + // Ask SourcesController to watch all the sources + watchersList := r.getAllSourcesReferences() + + err = globals.Application.SourceController.SyncWatchers(watchersList) + if err != nil { + err = fmt.Errorf("error syncing watchers: %s", err.Error()) + return + } + + return nil +} + +// getValidatingWebhookConfiguration return a ValidatingWebhookConfiguration object that is built based on +// previous existing one in Kubernetes and the current pool keys extracted from ClusterAdmissionPolicy.spec.watchedResources +func (r 
*ClusterAdmissionPolicyReconciler) getMergedValidatingWebhookConfiguration(ctx context.Context) ( + vwConfig admissionregv1.ValidatingWebhookConfiguration, err error) { + // Craft ValidatingWebhookConfiguration rules based on the pool keys currentVwcRules := []admissionregv1.RuleWithOperations{} for resourcePattern, _ := range globals.Application.ClusterAdmissionPolicyPool.Pool { @@ -163,36 +223,11 @@ func (r *ClusterAdmissionPolicyReconciler) SyncAdmissionPool(ctx context.Context // Replace the webhooks section in the ValidatingWebhookConfiguration metaWebhookObj.Webhooks = []admissionregv1.ValidatingWebhook{tmpWebhookObj} - // Sync changes to Kubernetes - if errors.IsNotFound(err) { - err = r.Create(ctx, &metaWebhookObj) - if err != nil { - err = fmt.Errorf("error creating ValidatingWebhookConfiguration '%s': %s", - ValidatingWebhookConfigurationName, err.Error()) - return - } - } else { - err = r.Update(ctx, &metaWebhookObj) - if err != nil { - err = fmt.Errorf("error updating ValidatingWebhookConfiguration '%s': %s", - ValidatingWebhookConfigurationName, err.Error()) - return - } - } - - // Ask SourcesController to watch all the sources - watchersList := r.getAllSourcesReferences() - - err = globals.Application.SourceController.SyncWatchers(watchersList) - if err != nil { - err = fmt.Errorf("error syncing watchers: %s", err.Error()) - return - } - - return nil + return metaWebhookObj, nil } -// getAllSourcesReferences TODO +// getAllSourcesReferences iterates over all the ClusterAdmissionPolicy objects and +// create a list with all the sources in the format GVRNN (Group/Version/Resource/Namespace/Name) func (r *ClusterAdmissionPolicyReconciler) getAllSourcesReferences() (references []string) { globals.Application.ClusterAdmissionPolicyPool.Mutex.Lock() diff --git a/internal/controller/commons.go b/internal/controller/commons.go index b4a2cbc..4128d50 100644 --- a/internal/controller/commons.go +++ b/internal/controller/commons.go @@ -1,3 +1,19 @@ +/* 
+Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package controller import ( diff --git a/internal/globals/conditions.go b/internal/globals/conditions.go index acbcf8c..d95d724 100644 --- a/internal/globals/conditions.go +++ b/internal/globals/conditions.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package globals import ( diff --git a/internal/globals/globals.go b/internal/globals/globals.go index cf607af..96711d7 100644 --- a/internal/globals/globals.go +++ b/internal/globals/globals.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package globals import ( diff --git a/internal/globals/pools.go b/internal/globals/pools.go index 74d3009..741d8b3 100644 --- a/internal/globals/pools.go +++ b/internal/globals/pools.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package globals import "freepik.com/admitik/api/v1alpha1" diff --git a/internal/globals/types.go b/internal/globals/types.go index 5907019..c412ed7 100644 --- a/internal/globals/types.go +++ b/internal/globals/types.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package globals import ( @@ -5,7 +21,6 @@ import ( "sync" // - "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" diff --git a/internal/globals/utils.go b/internal/globals/utils.go index 35f6c2a..725f6fa 100644 --- a/internal/globals/utils.go +++ b/internal/globals/utils.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package globals import ( @@ -5,12 +21,12 @@ import ( // "os" + // "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ctrl "sigs.k8s.io/controller-runtime" - // ) // NewKubernetesClient return a new Kubernetes Dynamic client from client-go SDK diff --git a/internal/sources/controller.go b/internal/sources/controller.go index b42a578..d7c4f75 100644 --- a/internal/sources/controller.go +++ b/internal/sources/controller.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package sources import ( diff --git a/internal/sources/types.go b/internal/sources/types.go index 4593fd7..94511af 100644 --- a/internal/sources/types.go +++ b/internal/sources/types.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package sources import ( diff --git a/internal/sources/utils.go b/internal/sources/utils.go index 1963e0d..fa718bc 100644 --- a/internal/sources/utils.go +++ b/internal/sources/utils.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package sources import ( diff --git a/internal/template/functions.go b/internal/template/functions.go index badc151..20a0038 100644 --- a/internal/template/functions.go +++ b/internal/template/functions.go @@ -1,3 +1,19 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package template import ( From 569b02a5704da4afdc30d2b833b1e62d93b27a10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alby=20Hern=C3=A1ndez?= Date: Tue, 26 Nov 2024 10:15:27 +0000 Subject: [PATCH 8/8] feat: Add flags for sourcesController params; docs: Include new flags in README --- README.md | 33 ++++++++++++++++++--------------- cmd/main.go | 17 ++++++++++++++--- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index abc6cf5..71b81ef 100644 --- a/README.md +++ b/README.md @@ -62,21 +62,24 @@ resources: Some configuration parameters can be defined by flags that can be passed to the controller. They are described in the following table: -| Name | Description | Default | -|:-------------------------------|:-------------------------------------------------------------------------------|:----------------------:| -| `--metrics-bind-address` | The address the metric endpoint binds to.
0 disables the server | `0` | -| `--health-probe-bind-address` | he address the probe endpoint binds to | `:8081` | -| `--leader-elect` | Enable leader election for controller manager | `false` | -| `--metrics-secure` | If set the metrics endpoint is served securely | `false` | -| `--enable-http2` | If set, HTTP/2 will be enabled for the metrirs | `false` | -| `--webhook-client-hostname` | The hostname used by Kubernetes when calling the webhooks server | `webhooks.admitik.svc` | -| `--webhook-client-port` | The port used by Kubernetes when calling the webhooks server | `10250` | -| `--webhook-client-timeout` | The seconds until timout waited by Kubernetes when calling the webhooks server | `10` | -| `--webhook-server-port` | The port where the webhooks server listens | `10250` | -| `--webhook-server-path` | The path where the webhooks server listens | `/validate` | -| `--webhook-server-ca` | The CA bundle to use for the webhooks server | `-` | -| `--webhook-server-certificate` | The Certificate used by webhooks server | `-` | -| `--webhook-server-private-key` | The Private Key used by webhooks server | `-` | +| Name | Description | Default | +|:---------------------------------------|:-------------------------------------------------------------------------------|:----------------------:| +| `--metrics-bind-address` | The address the metric endpoint binds to.
0 disables the server | `0` | +| `--health-probe-bind-address` | The address the probe endpoint binds to | `:8081` | +| `--leader-elect` | Enable leader election for controller manager | `false` | +| `--metrics-secure` | If set the metrics endpoint is served securely | `false` | +| `--enable-http2` | If set, HTTP/2 will be enabled for the metrics | `false` | +| `--sources-time-to-resync-informers` | Interval to resynchronize all resources in the informers | `60s` | +| `--sources-time-to-reconcile-watchers` | Time between each reconciliation loop of the watchers | `10s` | +| `--sources-time-to-ack-watcher` | Wait time before marking a watcher as acknowledged (ACK) after it starts | `2s` | +| `--webhook-client-hostname` | The hostname used by Kubernetes when calling the webhooks server | `webhooks.admitik.svc` | +| `--webhook-client-port` | The port used by Kubernetes when calling the webhooks server | `10250` | +| `--webhook-client-timeout` | The seconds until timeout waited by Kubernetes when calling the webhooks server | `10` | +| `--webhook-server-port` | The port where the webhooks server listens | `10250` | +| `--webhook-server-path` | The path where the webhooks server listens | `/validate` | +| `--webhook-server-ca` | The CA bundle to use for the webhooks server | `-` | +| `--webhook-server-certificate` | The Certificate used by webhooks server | `-` | +| `--webhook-server-private-key` | The Private Key used by webhooks server | `-` | ## Examples diff --git a/cmd/main.go b/cmd/main.go index b704bc4..fae924a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -72,6 +72,10 @@ func main() { var enableHTTP2 bool // Custom flags from here + var sourcesTimeToResyncInformers time.Duration + var sourcesTimeToReconcileWatchers time.Duration + var sourcesTimeToAckWatcher time.Duration + var webhooksClientHostname string var webhooksClientPort int var webhooksClientTimeout int @@ -97,6 +101,13 @@ func main() { "If set, HTTP/2 will be enabled for the metrics and webhook
servers") // Custom flags from here + flag.DurationVar(&sourcesTimeToResyncInformers, "sources-time-to-resync-informers", 60*time.Second, + "Interval to resynchronize all resources in the informers") + flag.DurationVar(&sourcesTimeToReconcileWatchers, "sources-time-to-reconcile-watchers", 10*time.Second, + "Time between each reconciliation loop of the watchers") + flag.DurationVar(&sourcesTimeToAckWatcher, "sources-time-to-ack-watcher", 2*time.Second, + "Wait time before marking a watcher as acknowledged (ACK) after it starts") + flag.StringVar(&webhooksClientHostname, "webhook-client-hostname", "webhooks.admitik.svc", "The hostname used by Kubernetes when calling the webhooks server") flag.IntVar(&webhooksClientPort, "webhook-client-port", 10250, @@ -326,9 +337,9 @@ func main() { sourcesController := sources.SourcesController{ Client: globals.Application.KubeRawClient, Options: sources.SourcesControllerOptions{ - InformerDurationToResync: 60 * time.Second, - WatchersDurationBetweenReconcileLoops: 10 * time.Second, - WatcherDurationToAck: 2 * time.Second, + InformerDurationToResync: sourcesTimeToResyncInformers, + WatchersDurationBetweenReconcileLoops: sourcesTimeToReconcileWatchers, + WatcherDurationToAck: sourcesTimeToAckWatcher, }, }