From 663dc82574ac8233f9b31b7a0323e0df90c092c5 Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Thu, 11 Oct 2018 20:51:12 +0300 Subject: [PATCH] Controller refactoring part two - share components between loops --- Makefile | 2 +- cmd/flagger/main.go | 10 +-- pkg/controller/controller.go | 62 ++++++++++----- pkg/controller/deployer.go | 19 +++-- pkg/controller/scheduler.go | 149 +++++++++++++---------------------- 5 files changed, 112 insertions(+), 130 deletions(-) diff --git a/Makefile b/Makefile index fbfbe2ab6..ddaba5180 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ VERSION_MINOR:=$(shell grep 'VERSION' pkg/version/version.go | awk '{ print $$4 PATCH:=$(shell grep 'VERSION' pkg/version/version.go | awk '{ print $$4 }' | tr -d '"' | awk -F. '{print $$NF}') SOURCE_DIRS = cmd pkg/apis pkg/controller pkg/server pkg/logging pkg/version run: - go run cmd/flagger/* -kubeconfig=$$HOME/.kube/config -log-level=info -metrics-server=https://prometheus.istio.weavedx.com + go run -race cmd/flagger/* -kubeconfig=$$HOME/.kube/config -log-level=info -metrics-server=https://prometheus.istio.weavedx.com build: docker build -t stefanprodan/flagger:$(TAG) . -f Dockerfile diff --git a/cmd/flagger/main.go b/cmd/flagger/main.go index 189e41731..fdfacc8c9 100644 --- a/cmd/flagger/main.go +++ b/cmd/flagger/main.go @@ -64,13 +64,13 @@ func main() { logger.Fatalf("Error building shared clientset: %v", err) } - rolloutClient, err := clientset.NewForConfig(cfg) + flaggerClient, err := clientset.NewForConfig(cfg) if err != nil { logger.Fatalf("Error building example clientset: %s", err.Error()) } - canaryInformerFactory := informers.NewSharedInformerFactory(rolloutClient, time.Second*30) - canaryInformer := canaryInformerFactory.Flagger().V1alpha1().Canaries() + flaggerInformerFactory := informers.NewSharedInformerFactory(flaggerClient, time.Second*30) + canaryInformer := flaggerInformerFactory.Flagger().V1alpha1().Canaries() logger.Infof("Starting flagger version %s revision %s", version.VERSION, version.REVISION) @@ -94,14 +94,14 @@ func main() { c := controller.NewController( kubeClient, sharedClient, - rolloutClient, + flaggerClient, canaryInformer, controlLoopInterval, metricsServer, logger, ) - canaryInformerFactory.Start(stopCh) + flaggerInformerFactory.Start(stopCh) logger.Info("Waiting for informer caches to sync") for _, synced := range []cache.InformerSynced{ diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 460b7f405..6a0c7c1aa 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -30,23 +30,25 @@ const controllerAgentName = "flagger" type Controller struct { kubeClient kubernetes.Interface istioClient istioclientset.Interface - rolloutClient clientset.Interface - rolloutLister flaggerlisters.CanaryLister - rolloutSynced cache.InformerSynced - rolloutWindow time.Duration + flaggerClient clientset.Interface + flaggerLister flaggerlisters.CanaryLister + flaggerSynced cache.InformerSynced + flaggerWindow time.Duration workqueue workqueue.RateLimitingInterface recorder record.EventRecorder logger *zap.SugaredLogger - metricsServer string - rollouts *sync.Map + canaries *sync.Map + deployer CanaryDeployer + router CanaryRouter + observer CanaryObserver } func NewController( kubeClient kubernetes.Interface, istioClient istioclientset.Interface, - rolloutClient clientset.Interface, - rolloutInformer flaggerinformers.CanaryInformer, - rolloutWindow time.Duration, + flaggerClient clientset.Interface, + flaggerInformer flaggerinformers.CanaryInformer, + flaggerWindow time.Duration, metricServer string, logger *zap.SugaredLogger, @@ -61,21 +63,41 @@ func NewController( recorder := eventBroadcaster.NewRecorder( scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + deployer := CanaryDeployer{ + logger: logger, + kubeClient: kubeClient, + istioClient: istioClient, + flaggerClient: flaggerClient, + } + + router := CanaryRouter{ + logger: logger, + kubeClient: kubeClient, + istioClient: istioClient, + flaggerClient: flaggerClient, + } + + observer := CanaryObserver{ + metricsServer: metricServer, + } + ctrl := &Controller{ kubeClient: kubeClient, istioClient: istioClient, - rolloutClient: rolloutClient, - rolloutLister: rolloutInformer.Lister(), - rolloutSynced: rolloutInformer.Informer().HasSynced, + flaggerClient: flaggerClient, + flaggerLister: flaggerInformer.Lister(), + flaggerSynced: flaggerInformer.Informer().HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName), recorder: recorder, logger: logger, - rollouts: new(sync.Map), - metricsServer: metricServer, - rolloutWindow: rolloutWindow, + canaries: new(sync.Map), + flaggerWindow: flaggerWindow, + deployer: deployer, + router: router, + observer: observer, } - rolloutInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + flaggerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ctrl.enqueue, UpdateFunc: func(old, new interface{}) { oldRoll, ok := checkCustomResourceType(old, logger) @@ -96,7 +118,7 @@ func NewController( r, ok := checkCustomResourceType(old, logger) if ok { ctrl.logger.Infof("Deleting %s.%s from cache", r.Name, r.Namespace) - ctrl.rollouts.Delete(fmt.Sprintf("%s.%s", r.Name, r.Namespace)) + ctrl.canaries.Delete(fmt.Sprintf("%s.%s", r.Name, r.Namespace)) } }, }) @@ -119,7 +141,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { c.logger.Info("Started operator workers") - tickChan := time.NewTicker(c.rolloutWindow).C + tickChan := time.NewTicker(c.flaggerWindow).C for { select { case <-tickChan: @@ -174,13 +196,13 @@ func (c *Controller) syncHandler(key string) error { utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) return nil } - cd, err := c.rolloutLister.Canaries(namespace).Get(name) + cd, err := c.flaggerLister.Canaries(namespace).Get(name) if errors.IsNotFound(err) { utilruntime.HandleError(fmt.Errorf("'%s' in work queue no longer exists", key)) return nil } - c.rollouts.Store(fmt.Sprintf("%s.%s", cd.Name, cd.Namespace), cd) + c.canaries.Store(fmt.Sprintf("%s.%s", cd.Name, cd.Namespace), cd) //if cd.Spec.TargetRef.Kind == "Deployment" { // err = c.bootstrapDeployment(cd) diff --git a/pkg/controller/deployer.go b/pkg/controller/deployer.go index 3adbd2c14..5b0188f8a 100644 --- a/pkg/controller/deployer.go +++ b/pkg/controller/deployer.go @@ -57,9 +57,9 @@ func (c *CanaryDeployer) Promote(cd *flaggerv1.Canary) error { return nil } -// IsDeploymentHealthy checks the primary and canary deployment status and returns an error if -// the deployment is in the middle of a rolling update or if the pods are unhealthy -func (c *CanaryDeployer) IsDeploymentHealthy(cd *flaggerv1.Canary) error { +// IsReady checks the primary and canary deployment status and returns an error if +// the deployments are in the middle of a rolling update or if the pods are unhealthy +func (c *CanaryDeployer) IsReady(cd *flaggerv1.Canary) error { canary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(cd.Spec.TargetRef.Name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { @@ -182,25 +182,28 @@ func (c *CanaryDeployer) Scale(cd *flaggerv1.Canary, replicas int32) error { canary, err = c.kubeClient.AppsV1().Deployments(canary.Namespace).Update(canary) if err != nil { return fmt.Errorf("scaling %s.%s to %v failed: %v", canary.GetName(), canary.Namespace, replicas, err) - } - return nil } // Sync creates the primary deployment and hpa +// and scales to zero the canary deployment func (c *CanaryDeployer) Sync(cd *flaggerv1.Canary) error { + primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) if err := c.createPrimaryDeployment(cd); err != nil { - return err + return fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err) } if cd.Status.State == "" { - c.Scale(cd, 0) + c.logger.Infof("Scaling down %s.%s", primaryName, cd.Namespace) + if err := c.Scale(cd, 0); err != nil { + return err + } } if cd.Spec.AutoscalerRef.Kind == "HorizontalPodAutoscaler" { if err := c.createPrimaryHpa(cd); err != nil { - return err + return fmt.Errorf("creating hpa %s.%s failed: %v", primaryName, cd.Namespace, err) } } return nil diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index afd128d1a..f811f94c9 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -9,7 +9,7 @@ import ( ) func (c *Controller) scheduleCanaries() { - c.rollouts.Range(func(key interface{}, value interface{}) bool { + c.canaries.Range(func(key interface{}, value interface{}) bool { r := value.(*flaggerv1.Canary) if r.Spec.TargetRef.Kind == "Deployment" { go c.advanceCanary(r.Name, r.Namespace) @@ -19,93 +19,74 @@ func (c *Controller) scheduleCanaries() { } func (c *Controller) advanceCanary(name string, namespace string) { - // check if the rollout exists - r, err := c.rolloutClient.FlaggerV1alpha1().Canaries(namespace).Get(name, v1.GetOptions{}) + // check if the canary exists + cd, err := c.flaggerClient.FlaggerV1alpha1().Canaries(namespace).Get(name, v1.GetOptions{}) if err != nil { c.logger.Errorf("Canary %s.%s not found", name, namespace) return } - deployer := CanaryDeployer{ - logger: c.logger, - kubeClient: c.kubeClient, - istioClient: c.istioClient, - flaggerClient: c.rolloutClient, - } - // create primary deployment and hpa if needed - err = deployer.Sync(r) - if err != nil { - c.recordEventWarningf(r, "%v", err) + if err := c.deployer.Sync(cd); err != nil { + c.recordEventWarningf(cd, "%v", err) return } - router := CanaryRouter{ - logger: c.logger, - kubeClient: c.kubeClient, - istioClient: c.istioClient, - flaggerClient: c.rolloutClient, - } - // create ClusterIP services and virtual service if needed - err = router.Sync(r) - if err != nil { - c.recordEventWarningf(r, "%v", err) + if err := c.router.Sync(cd); err != nil { + c.recordEventWarningf(cd, "%v", err) return } // set max weight default value to 100% maxWeight := 100 - if r.Spec.CanaryAnalysis.MaxWeight > 0 { - maxWeight = r.Spec.CanaryAnalysis.MaxWeight + if cd.Spec.CanaryAnalysis.MaxWeight > 0 { + maxWeight = cd.Spec.CanaryAnalysis.MaxWeight } // check primary and canary deployments status - err = deployer.IsDeploymentHealthy(r) - if err != nil { - c.recordEventWarningf(r, "%v", err) + if err := c.deployer.IsReady(cd); err != nil { + c.recordEventWarningf(cd, "%v", err) return } // check if virtual service exists // and if it contains weighted destination routes to the primary and canary services - primaryRoute, canaryRoute, err := router.GetRoutes(r) + primaryRoute, canaryRoute, err := c.router.GetRoutes(cd) if err != nil { - c.recordEventWarningf(r, "%v", err) + c.recordEventWarningf(cd, "%v", err) return } // check if canary analysis should start (canary revision has changes) or continue - if ok := c.checkCanaryStatus(r, deployer); !ok { + if ok := c.checkCanaryStatus(cd, c.deployer); !ok { return } // check if the number of failed checks reached the threshold - if r.Status.State == "running" && r.Status.FailedChecks >= r.Spec.CanaryAnalysis.Threshold { - c.recordEventWarningf(r, "Rolling back %s.%s failed checks threshold reached %v", - r.Name, r.Namespace, r.Status.FailedChecks) + if cd.Status.State == "running" && cd.Status.FailedChecks >= cd.Spec.CanaryAnalysis.Threshold { + c.recordEventWarningf(cd, "Rolling back %s.%s failed checks threshold reached %v", + cd.Name, cd.Namespace, cd.Status.FailedChecks) // route all traffic back to primary primaryRoute.Weight = 100 canaryRoute.Weight = 0 - if err := router.SetRoutes(r, primaryRoute, canaryRoute); err != nil { - c.recordEventWarningf(r, "%v", err) + if err := c.router.SetRoutes(cd, primaryRoute, canaryRoute); err != nil { + c.recordEventWarningf(cd, "%v", err) return } - c.recordEventWarningf(r, "Canary failed! Scaling down %s.%s", - r.Spec.TargetRef.Name, r.Namespace) + c.recordEventWarningf(cd, "Canary failed! Scaling down %s.%s", + cd.Spec.TargetRef.Name, cd.Namespace) // shutdown canary - err = deployer.Scale(r, 0) - if err != nil { - c.recordEventWarningf(r, "%v", err) + if err := c.deployer.Scale(cd, 0); err != nil { + c.recordEventWarningf(cd, "%v", err) return } // mark canary as failed - err := deployer.SetState(r, "failed") - if err != nil { + if err := c.deployer.SetState(cd, "failed"); err != nil { c.logger.Errorf("%v", err) return } @@ -115,11 +96,11 @@ func (c *Controller) advanceCanary(name string, namespace string) { // check if the canary success rate is above the threshold // skip check if no traffic is routed to canary if canaryRoute.Weight == 0 { - c.recordEventInfof(r, "Starting canary deployment for %s.%s", r.Name, r.Namespace) + c.recordEventInfof(cd, "Starting canary deployment for %s.%s", cd.Name, cd.Namespace) } else { - if ok := c.checkCanaryMetrics(r); !ok { - if err = deployer.SetFailedChecks(r, r.Status.FailedChecks+1); err != nil { - c.recordEventWarningf(r, "%v", err) + if ok := c.checkCanaryMetrics(cd); !ok { + if err := c.deployer.SetFailedChecks(cd, cd.Status.FailedChecks+1); err != nil { + c.recordEventWarningf(cd, "%v", err) return } return @@ -128,31 +109,29 @@ func (c *Controller) advanceCanary(name string, namespace string) { // increase canary traffic percentage if canaryRoute.Weight < maxWeight { - primaryRoute.Weight -= r.Spec.CanaryAnalysis.StepWeight + primaryRoute.Weight -= cd.Spec.CanaryAnalysis.StepWeight if primaryRoute.Weight < 0 { primaryRoute.Weight = 0 } - canaryRoute.Weight += r.Spec.CanaryAnalysis.StepWeight + canaryRoute.Weight += cd.Spec.CanaryAnalysis.StepWeight if primaryRoute.Weight > 100 { primaryRoute.Weight = 100 } - if err = router.SetRoutes(r, primaryRoute, canaryRoute); err != nil { - c.recordEventWarningf(r, "%v", err) + if err := c.router.SetRoutes(cd, primaryRoute, canaryRoute); err != nil { + c.recordEventWarningf(cd, "%v", err) return } - c.recordEventInfof(r, "Advance %s.%s canary weight %v", r.Name, r.Namespace, canaryRoute.Weight) + c.recordEventInfof(cd, "Advance %s.%s canary weight %v", cd.Name, cd.Namespace, canaryRoute.Weight) // promote canary - primaryName := fmt.Sprintf("%s-primary", r.Spec.TargetRef.Name) + primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) if canaryRoute.Weight == maxWeight { - c.recordEventInfof(r, "Copying %s.%s template spec to %s.%s", - r.Spec.TargetRef.Name, r.Namespace, primaryName, r.Namespace) - - err := deployer.Promote(r) - if err != nil { - c.recordEventWarningf(r, "%v", err) + c.recordEventInfof(cd, "Copying %s.%s template spec to %s.%s", + cd.Spec.TargetRef.Name, cd.Namespace, primaryName, cd.Namespace) + if err := c.deployer.Promote(cd); err != nil { + c.recordEventWarningf(cd, "%v", err) return } } @@ -160,24 +139,22 @@ func (c *Controller) advanceCanary(name string, namespace string) { // route all traffic back to primary primaryRoute.Weight = 100 canaryRoute.Weight = 0 - if err = router.SetRoutes(r, primaryRoute, canaryRoute); err != nil { - c.recordEventWarningf(r, "%v", err) + if err := c.router.SetRoutes(cd, primaryRoute, canaryRoute); err != nil { + c.recordEventWarningf(cd, "%v", err) return } - c.recordEventInfof(r, "Promotion completed! Scaling down %s.%s", r.Spec.TargetRef.Name, r.Namespace) + c.recordEventInfof(cd, "Promotion completed! Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace) // shutdown canary - err = deployer.Scale(r, 0) - if err != nil { - c.recordEventWarningf(r, "%v", err) + if err := c.deployer.Scale(cd, 0); err != nil { + c.recordEventWarningf(cd, "%v", err) return } // update status - err = deployer.SetState(r, "finished") - if err != nil { - c.recordEventWarningf(r, "%v", err) + if err := c.deployer.SetState(cd, "finished"); err != nil { + c.recordEventWarningf(cd, "%v", err) return } } @@ -189,55 +166,35 @@ func (c *Controller) checkCanaryStatus(r *flaggerv1.Canary, deployer CanaryDeplo } if r.Status.State == "" { - status := flaggerv1.CanaryStatus{ - State: "initialized", - FailedChecks: 0, - } - - err := deployer.SyncStatus(r, status) - if err != nil { + if err := deployer.SyncStatus(r, flaggerv1.CanaryStatus{State: "initialized"}); err != nil { c.logger.Errorf("%v", err) return false } - c.recordEventInfof(r, "Initialization done! %s.%s", r.Name, r.Namespace) return false } if diff, err := deployer.IsNewSpec(r); diff { - c.recordEventInfof(r, "New revision detected %s.%s", r.Spec.TargetRef.Name, r.Namespace) - err = deployer.Scale(r, 1) - if err != nil { + c.recordEventInfof(r, "New revision detected! Scaling up %s.%s", r.Spec.TargetRef.Name, r.Namespace) + if err = deployer.Scale(r, 1); err != nil { c.recordEventErrorf(r, "%v", err) return false } - - status := flaggerv1.CanaryStatus{ - State: "running", - FailedChecks: 0, - } - err := deployer.SyncStatus(r, status) - if err != nil { + if err := deployer.SyncStatus(r, flaggerv1.CanaryStatus{State: "running"}); err != nil { c.logger.Errorf("%v", err) return false } - c.recordEventInfof(r, "Scaling up %s.%s", r.Spec.TargetRef.Name, r.Namespace) - return false } - return false } func (c *Controller) checkCanaryMetrics(r *flaggerv1.Canary) bool { - observer := &CanaryObserver{ - metricsServer: c.metricsServer, - } for _, metric := range r.Spec.CanaryAnalysis.Metrics { if metric.Name == "istio_requests_total" { - val, err := observer.GetDeploymentCounter(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval) + val, err := c.observer.GetDeploymentCounter(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval) if err != nil { - c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.metricsServer, err) + c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.metricsServer, err) return false } if float64(metric.Threshold) > val { @@ -248,9 +205,9 @@ func (c *Controller) checkCanaryMetrics(r *flaggerv1.Canary) bool { } if metric.Name == "istio_request_duration_seconds_bucket" { - val, err := observer.GetDeploymentHistogram(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval) + val, err := c.observer.GetDeploymentHistogram(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval) if err != nil { - c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.metricsServer, err) + c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.metricsServer, err) return false } t := time.Duration(metric.Threshold) * time.Millisecond