Controller refactoring part two
- share components between loops
stefanprodan committed Oct 11, 2018
1 parent baeee62 commit 663dc82
Showing 5 changed files with 112 additions and 130 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -4,7 +4,7 @@ VERSION_MINOR:=$(shell grep 'VERSION' pkg/version/version.go | awk '{ print $$4
 PATCH:=$(shell grep 'VERSION' pkg/version/version.go | awk '{ print $$4 }' | tr -d '"' | awk -F. '{print $$NF}')
 SOURCE_DIRS = cmd pkg/apis pkg/controller pkg/server pkg/logging pkg/version
 run:
-	go run cmd/flagger/* -kubeconfig=$$HOME/.kube/config -log-level=info -metrics-server=https://prometheus.istio.weavedx.com
+	go run -race cmd/flagger/* -kubeconfig=$$HOME/.kube/config -log-level=info -metrics-server=https://prometheus.istio.weavedx.com
 
 build:
 	docker build -t stefanprodan/flagger:$(TAG) . -f Dockerfile
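The only functional change to the Makefile is the -race flag on the local run target. As a minimal, self-contained sketch (the names and the map key are illustrative, not taken from this commit), this is the kind of concurrent access go run -race reports, and why the controller keeps its canary cache in a sync.Map rather than a plain map:

package main

import "sync"

func main() {
	// A plain map written from two goroutines would be reported by `go run -race`;
	// sync.Map is safe for concurrent Store/Load without extra locking.
	canaries := new(sync.Map)

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			canaries.Store("podinfo.test", id) // key follows the "name.namespace" format used by the controller
		}(i)
	}
	wg.Wait()
}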
10 changes: 5 additions & 5 deletions cmd/flagger/main.go
@@ -64,13 +64,13 @@ func main() {
 		logger.Fatalf("Error building shared clientset: %v", err)
 	}
 
-	rolloutClient, err := clientset.NewForConfig(cfg)
+	flaggerClient, err := clientset.NewForConfig(cfg)
 	if err != nil {
 		logger.Fatalf("Error building example clientset: %s", err.Error())
 	}
 
-	canaryInformerFactory := informers.NewSharedInformerFactory(rolloutClient, time.Second*30)
-	canaryInformer := canaryInformerFactory.Flagger().V1alpha1().Canaries()
+	flaggerInformerFactory := informers.NewSharedInformerFactory(flaggerClient, time.Second*30)
+	canaryInformer := flaggerInformerFactory.Flagger().V1alpha1().Canaries()
 
 	logger.Infof("Starting flagger version %s revision %s", version.VERSION, version.REVISION)
 
@@ -94,14 +94,14 @@ func main() {
 	c := controller.NewController(
 		kubeClient,
 		sharedClient,
-		rolloutClient,
+		flaggerClient,
 		canaryInformer,
 		controlLoopInterval,
 		metricsServer,
 		logger,
 	)
 
-	canaryInformerFactory.Start(stopCh)
+	flaggerInformerFactory.Start(stopCh)
 
 	logger.Info("Waiting for informer caches to sync")
 	for _, synced := range []cache.InformerSynced{
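For context, the wiring main.go performs with the renamed client looks roughly like the sketch below. The factory call and the Flagger().V1alpha1().Canaries() accessor come from the diff above; the generated clientset and informer import paths, and the buildCanaryInformer helper, are assumptions for illustration and may not match this commit exactly.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"

	clientset "github.com/stefanprodan/flagger/pkg/client/clientset/versioned"
	informers "github.com/stefanprodan/flagger/pkg/client/informers/externalversions"
)

// buildCanaryInformer mirrors the flow in cmd/flagger/main.go: build the rest
// config, create the generated Flagger clientset, derive a shared informer
// factory from it, and wait for the Canary cache to sync.
func buildCanaryInformer(kubeconfig string, stopCh <-chan struct{}) (cache.SharedIndexInformer, error) {
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, err
	}

	flaggerClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}

	// 30s resync, matching the factory created in main.go above.
	flaggerInformerFactory := informers.NewSharedInformerFactory(flaggerClient, 30*time.Second)
	canaryInformer := flaggerInformerFactory.Flagger().V1alpha1().Canaries()

	flaggerInformerFactory.Start(stopCh)
	if !cache.WaitForCacheSync(stopCh, canaryInformer.Informer().HasSynced) {
		return nil, fmt.Errorf("failed to sync canary informer cache")
	}
	return canaryInformer.Informer(), nil
}

func main() {
	stopCh := make(chan struct{})
	defer close(stopCh)
	if _, err := buildCanaryInformer("/home/user/.kube/config", stopCh); err != nil {
		fmt.Println(err)
	}
}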
62 changes: 42 additions & 20 deletions pkg/controller/controller.go
@@ -30,23 +30,25 @@ const controllerAgentName = "flagger"
 type Controller struct {
 	kubeClient    kubernetes.Interface
 	istioClient   istioclientset.Interface
-	rolloutClient clientset.Interface
-	rolloutLister flaggerlisters.CanaryLister
-	rolloutSynced cache.InformerSynced
-	rolloutWindow time.Duration
+	flaggerClient clientset.Interface
+	flaggerLister flaggerlisters.CanaryLister
+	flaggerSynced cache.InformerSynced
+	flaggerWindow time.Duration
 	workqueue     workqueue.RateLimitingInterface
 	recorder      record.EventRecorder
 	logger        *zap.SugaredLogger
-	metricsServer string
-	rollouts      *sync.Map
+	canaries      *sync.Map
+	deployer      CanaryDeployer
+	router        CanaryRouter
+	observer      CanaryObserver
 }
 
 func NewController(
 	kubeClient kubernetes.Interface,
 	istioClient istioclientset.Interface,
-	rolloutClient clientset.Interface,
-	rolloutInformer flaggerinformers.CanaryInformer,
-	rolloutWindow time.Duration,
+	flaggerClient clientset.Interface,
+	flaggerInformer flaggerinformers.CanaryInformer,
+	flaggerWindow time.Duration,
 	metricServer string,
 	logger *zap.SugaredLogger,
 
@@ -61,21 +63,41 @@ func NewController(
 	recorder := eventBroadcaster.NewRecorder(
 		scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
 
+	deployer := CanaryDeployer{
+		logger:        logger,
+		kubeClient:    kubeClient,
+		istioClient:   istioClient,
+		flaggerClient: flaggerClient,
+	}
+
+	router := CanaryRouter{
+		logger:        logger,
+		kubeClient:    kubeClient,
+		istioClient:   istioClient,
+		flaggerClient: flaggerClient,
+	}
+
+	observer := CanaryObserver{
+		metricsServer: metricServer,
+	}
+
 	ctrl := &Controller{
 		kubeClient:    kubeClient,
 		istioClient:   istioClient,
-		rolloutClient: rolloutClient,
-		rolloutLister: rolloutInformer.Lister(),
-		rolloutSynced: rolloutInformer.Informer().HasSynced,
+		flaggerClient: flaggerClient,
+		flaggerLister: flaggerInformer.Lister(),
+		flaggerSynced: flaggerInformer.Informer().HasSynced,
 		workqueue:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
 		recorder:      recorder,
 		logger:        logger,
-		rollouts:      new(sync.Map),
-		metricsServer: metricServer,
-		rolloutWindow: rolloutWindow,
+		canaries:      new(sync.Map),
+		flaggerWindow: flaggerWindow,
+		deployer:      deployer,
+		router:        router,
+		observer:      observer,
 	}
 
-	rolloutInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+	flaggerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: ctrl.enqueue,
 		UpdateFunc: func(old, new interface{}) {
 			oldRoll, ok := checkCustomResourceType(old, logger)
@@ -96,7 +118,7 @@ func NewController(
 			r, ok := checkCustomResourceType(old, logger)
 			if ok {
 				ctrl.logger.Infof("Deleting %s.%s from cache", r.Name, r.Namespace)
-				ctrl.rollouts.Delete(fmt.Sprintf("%s.%s", r.Name, r.Namespace))
+				ctrl.canaries.Delete(fmt.Sprintf("%s.%s", r.Name, r.Namespace))
 			}
 		},
 	})
@@ -119,7 +141,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
 
 	c.logger.Info("Started operator workers")
 
-	tickChan := time.NewTicker(c.rolloutWindow).C
+	tickChan := time.NewTicker(c.flaggerWindow).C
 	for {
 		select {
 		case <-tickChan:
@@ -174,13 +196,13 @@ func (c *Controller) syncHandler(key string) error {
 		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
 		return nil
 	}
-	cd, err := c.rolloutLister.Canaries(namespace).Get(name)
+	cd, err := c.flaggerLister.Canaries(namespace).Get(name)
 	if errors.IsNotFound(err) {
 		utilruntime.HandleError(fmt.Errorf("'%s' in work queue no longer exists", key))
 		return nil
 	}
 
-	c.rollouts.Store(fmt.Sprintf("%s.%s", cd.Name, cd.Namespace), cd)
+	c.canaries.Store(fmt.Sprintf("%s.%s", cd.Name, cd.Namespace), cd)
 
 	//if cd.Spec.TargetRef.Kind == "Deployment" {
 	//	err = c.bootstrapDeployment(cd)
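The shape of the refactor: CanaryDeployer, CanaryRouter and CanaryObserver are built once in NewController, stored on the controller, and reused by both the informer event handlers and the ticker-driven scheduling loop, which walk the shared canaries sync.Map. Below is a simplified, self-contained sketch of that loop shape; scheduleCanaries, isReady and the stand-in deployer type are illustrative names, not the commit's actual methods.

package main

import (
	"fmt"
	"sync"
	"time"
)

// deployer stands in for a shared component such as CanaryDeployer:
// constructed once, then reused by every loop iteration.
type deployer struct{}

func (deployer) isReady(name string) bool { return true }

type controller struct {
	canaries      *sync.Map
	flaggerWindow time.Duration
	deployer      deployer
}

// scheduleCanaries walks the shared cache and advances each canary
// using the shared components.
func (c *controller) scheduleCanaries() {
	c.canaries.Range(func(key, _ interface{}) bool {
		if c.deployer.isReady(key.(string)) {
			fmt.Printf("advancing canary %s\n", key)
		}
		return true
	})
}

// run is the ticker-driven control loop: a tick fires every flaggerWindow,
// matching the time.NewTicker(c.flaggerWindow).C pattern in Run above.
func (c *controller) run(stopCh <-chan struct{}) {
	tickChan := time.NewTicker(c.flaggerWindow).C
	for {
		select {
		case <-tickChan:
			c.scheduleCanaries()
		case <-stopCh:
			return
		}
	}
}

func main() {
	c := &controller{canaries: new(sync.Map), flaggerWindow: 10 * time.Millisecond, deployer: deployer{}}
	c.canaries.Store("podinfo.test", struct{}{})
	stopCh := make(chan struct{})
	go c.run(stopCh)
	time.Sleep(30 * time.Millisecond)
	close(stopCh)
}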
19 changes: 11 additions & 8 deletions pkg/controller/deployer.go
@@ -57,9 +57,9 @@ func (c *CanaryDeployer) Promote(cd *flaggerv1.Canary) error {
 	return nil
 }
 
-// IsDeploymentHealthy checks the primary and canary deployment status and returns an error if
-// the deployment is in the middle of a rolling update or if the pods are unhealthy
-func (c *CanaryDeployer) IsDeploymentHealthy(cd *flaggerv1.Canary) error {
+// IsReady checks the primary and canary deployment status and returns an error if
+// the deployments are in the middle of a rolling update or if the pods are unhealthy
+func (c *CanaryDeployer) IsReady(cd *flaggerv1.Canary) error {
 	canary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(cd.Spec.TargetRef.Name, metav1.GetOptions{})
 	if err != nil {
 		if errors.IsNotFound(err) {
@@ -182,25 +182,28 @@ func (c *CanaryDeployer) Scale(cd *flaggerv1.Canary, replicas int32) error {
 	canary, err = c.kubeClient.AppsV1().Deployments(canary.Namespace).Update(canary)
 	if err != nil {
 		return fmt.Errorf("scaling %s.%s to %v failed: %v", canary.GetName(), canary.Namespace, replicas, err)
-
 	}
 	return nil
 }
 
 // Sync creates the primary deployment and hpa
 // and scales to zero the canary deployment
 func (c *CanaryDeployer) Sync(cd *flaggerv1.Canary) error {
+	primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
 	if err := c.createPrimaryDeployment(cd); err != nil {
-		return err
+		return fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err)
 	}
 
 	if cd.Status.State == "" {
-		c.Scale(cd, 0)
+		c.logger.Infof("Scaling down %s.%s", primaryName, cd.Namespace)
+		if err := c.Scale(cd, 0); err != nil {
+			return err
+		}
 	}
 
 	if cd.Spec.AutoscalerRef.Kind == "HorizontalPodAutoscaler" {
 		if err := c.createPrimaryHpa(cd); err != nil {
-			return err
+			return fmt.Errorf("creating hpa %s.%s failed: %v", primaryName, cd.Namespace, err)
 		}
 	}
 	return nil
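The deployer changes mostly add context to errors: Sync now derives primaryName once, checks the result of the initial scale-down, and wraps deployment and HPA creation failures with the resource name and namespace. A minimal sketch of that wrapping pattern, using a stand-in createPrimaryDeployment rather than the real method:

package main

import (
	"errors"
	"fmt"
)

// createPrimaryDeployment is a stand-in that always fails, so the wrapping is visible.
func createPrimaryDeployment(name, namespace string) error {
	return errors.New("already exists")
}

func sync(targetName, namespace string) error {
	primaryName := fmt.Sprintf("%s-primary", targetName)
	if err := createPrimaryDeployment(primaryName, namespace); err != nil {
		// Attach the resource coordinates so operator logs identify which primary failed.
		return fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, namespace, err)
	}
	return nil
}

func main() {
	if err := sync("podinfo", "test"); err != nil {
		fmt.Println(err)
	}
}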
