Skip to content

Commit c4231e2

Browse files
add maxConcurrentCanaries flag to limit concurrent progressing canaries
This adds a flag to limit concurrent progressing canaries, to avoid a burst of resource requests happening at once. The flag takes no effect if set to "0", which is the default. Closes #1069 Signed-off-by: Louis Halbritter <halbritter@posteo.de> chore: update Helm default values and README Signed-off-by: Louis Halbritter <halbritter@posteo.de>
1 parent 9000136 commit c4231e2

File tree

9 files changed

+145
-47
lines changed

9 files changed

+145
-47
lines changed

charts/flagger/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ The following tables lists the configurable parameters of the Flagger chart and
186186
| `podDisruptionBudget.minAvailable` | The minimal number of available replicas that will be set in the PodDisruptionBudget | `1` |
187187
| `noCrossNamespaceRefs` | If `true`, cross namespace references to custom resources will be disabled | `false` |
188188
| `namespace` | When specified, Flagger will restrict itself to watching Canary objects from that namespace | `""` |
189+
| `maxConcurrentCanaries` | Limits how many canaries can progress in parallel. No limit if set to `0` | `0` |
189190

190191
Specify each parameter using the `--set key=value[,key=value]` argument to `helm upgrade`. For example,
191192

charts/flagger/templates/deployment.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ spec:
148148
{{- if .Values.noCrossNamespaceRefs }}
149149
- -no-cross-namespace-refs={{ .Values.noCrossNamespaceRefs }}
150150
{{- end }}
151+
{{- if .Values.maxConcurrentCanaries }}
152+
- -max-concurrent-canaries={{ .Values.maxConcurrentCanaries }}
153+
{{- end }}
151154
livenessProbe:
152155
exec:
153156
command:

charts/flagger/values.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@ podLabels: {}
199199

200200
noCrossNamespaceRefs: false
201201

202+
maxConcurrentCanaries: 0
203+
202204
#Placeholder to supply additional volumes to the flagger pod
203205
additionalVolumes: {}
204206
# - name: tmpfs

cmd/flagger/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ var (
8686
kubeconfigServiceMesh string
8787
clusterName string
8888
noCrossNamespaceRefs bool
89+
maxConcurrentCanaries int
8990
)
9091

9192
func init() {
@@ -121,6 +122,7 @@ func init() {
121122
flag.StringVar(&kubeconfigServiceMesh, "kubeconfig-service-mesh", "", "Path to a kubeconfig for the service mesh control plane cluster.")
122123
flag.StringVar(&clusterName, "cluster-name", "", "Cluster name to be included in alert msgs.")
123124
flag.BoolVar(&noCrossNamespaceRefs, "no-cross-namespace-refs", false, "When set to true, Flagger can only refer to resources in the same namespace.")
125+
flag.IntVar(&maxConcurrentCanaries, "max-concurrent-canaries", 0, "Limit parallel processing canaries. Unlimited if set to 0, which is default")
124126
}
125127

126128
func main() {
@@ -253,6 +255,7 @@ func main() {
253255
fromEnv("EVENT_WEBHOOK_URL", eventWebhook),
254256
clusterName,
255257
noCrossNamespaceRefs,
258+
maxConcurrentCanaries,
256259
cfg,
257260
)
258261

pkg/controller/controller.go

Lines changed: 45 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -51,26 +51,28 @@ const controllerAgentName = "flagger"
5151

5252
// Controller is managing the canary objects and schedules canary deployments
5353
type Controller struct {
54-
kubeConfig *rest.Config
55-
kubeClient kubernetes.Interface
56-
flaggerClient clientset.Interface
57-
flaggerInformers Informers
58-
flaggerSynced cache.InformerSynced
59-
flaggerWindow time.Duration
60-
workqueue workqueue.RateLimitingInterface
61-
eventRecorder record.EventRecorder
62-
logger *zap.SugaredLogger
63-
canaries *sync.Map
64-
jobs map[string]CanaryJob
65-
recorder metrics.Recorder
66-
notifier notifier.Interface
67-
canaryFactory *canary.Factory
68-
routerFactory *router.Factory
69-
observerFactory *observers.Factory
70-
meshProvider string
71-
eventWebhook string
72-
clusterName string
73-
noCrossNamespaceRefs bool
54+
kubeConfig *rest.Config
55+
kubeClient kubernetes.Interface
56+
flaggerClient clientset.Interface
57+
flaggerInformers Informers
58+
flaggerSynced cache.InformerSynced
59+
flaggerWindow time.Duration
60+
workqueue workqueue.RateLimitingInterface
61+
eventRecorder record.EventRecorder
62+
logger *zap.SugaredLogger
63+
canaries *sync.Map
64+
jobs map[string]CanaryJob
65+
recorder metrics.Recorder
66+
notifier notifier.Interface
67+
canaryFactory *canary.Factory
68+
routerFactory *router.Factory
69+
observerFactory *observers.Factory
70+
meshProvider string
71+
eventWebhook string
72+
clusterName string
73+
noCrossNamespaceRefs bool
74+
pendingCanaries map[string]bool
75+
maxConcurrentCanaries int
7476
}
7577

7678
type Informers struct {
@@ -94,6 +96,7 @@ func NewController(
9496
eventWebhook string,
9597
clusterName string,
9698
noCrossNamespaceRefs bool,
99+
maxConcurrentCanaries int,
97100
kubeConfig *rest.Config,
98101
) *Controller {
99102
logger.Debug("Creating event broadcaster")
@@ -109,26 +112,28 @@ func NewController(
109112
recorder.SetInfo(version, meshProvider)
110113

111114
ctrl := &Controller{
112-
kubeConfig: kubeConfig,
113-
kubeClient: kubeClient,
114-
flaggerClient: flaggerClient,
115-
flaggerInformers: flaggerInformers,
116-
flaggerSynced: flaggerInformers.CanaryInformer.Informer().HasSynced,
117-
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
118-
eventRecorder: eventRecorder,
119-
logger: logger,
120-
canaries: new(sync.Map),
121-
jobs: map[string]CanaryJob{},
122-
flaggerWindow: flaggerWindow,
123-
observerFactory: observerFactory,
124-
recorder: recorder,
125-
notifier: notifier,
126-
canaryFactory: canaryFactory,
127-
routerFactory: routerFactory,
128-
meshProvider: meshProvider,
129-
eventWebhook: eventWebhook,
130-
clusterName: clusterName,
131-
noCrossNamespaceRefs: noCrossNamespaceRefs,
115+
kubeConfig: kubeConfig,
116+
kubeClient: kubeClient,
117+
flaggerClient: flaggerClient,
118+
flaggerInformers: flaggerInformers,
119+
flaggerSynced: flaggerInformers.CanaryInformer.Informer().HasSynced,
120+
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
121+
eventRecorder: eventRecorder,
122+
logger: logger,
123+
canaries: new(sync.Map),
124+
jobs: map[string]CanaryJob{},
125+
flaggerWindow: flaggerWindow,
126+
observerFactory: observerFactory,
127+
recorder: recorder,
128+
notifier: notifier,
129+
canaryFactory: canaryFactory,
130+
routerFactory: routerFactory,
131+
meshProvider: meshProvider,
132+
eventWebhook: eventWebhook,
133+
clusterName: clusterName,
134+
noCrossNamespaceRefs: noCrossNamespaceRefs,
135+
pendingCanaries: map[string]bool{},
136+
maxConcurrentCanaries: maxConcurrentCanaries,
132137
}
133138

134139
flaggerInformers.CanaryInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -237,7 +242,6 @@ func (c *Controller) processNextWorkItem() bool {
237242
c.workqueue.Forget(obj)
238243
return nil
239244
}(obj)
240-
241245
if err != nil {
242246
utilruntime.HandleError(err)
243247
return true
@@ -307,7 +311,6 @@ func (c *Controller) syncHandler(key string) error {
307311
if err := c.addFinalizer(cd); err != nil {
308312
return fmt.Errorf("unable to add finalizer to canary %s.%s: %w", cd.Name, cd.Namespace, err)
309313
}
310-
311314
}
312315
c.logger.Infof("Synced %s", key)
313316

pkg/controller/scheduler.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ func (c *Controller) scheduleCanaries() {
134134
for job := range c.jobs {
135135
if _, exists := current[job]; !exists {
136136
c.jobs[job].Stop()
137+
delete(c.pendingCanaries, job)
137138
delete(c.jobs, job)
138139
}
139140
}
@@ -283,11 +284,22 @@ func (c *Controller) advanceCanary(name string, namespace string) {
283284
return
284285
}
285286

287+
key := fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)
288+
286289
if !shouldAdvance {
290+
delete(c.pendingCanaries, key)
287291
c.recorder.SetStatus(cd, cd.Status.Phase)
288292
return
289293
}
290294

295+
if _, exists := c.pendingCanaries[key]; c.maxConcurrentCanaries > 0 && len(c.pendingCanaries) >= c.maxConcurrentCanaries && !exists {
296+
canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhaseWaiting)
297+
c.recordEventInfof(cd, "waiting with canary %v.%v %v to process, because maximum of concurrent canaries reached", cd.Name, cd.Namespace, cd.UID)
298+
return
299+
}
300+
301+
c.pendingCanaries[key] = true
302+
291303
maxWeight := c.maxWeight(cd)
292304

293305
// check primary status
@@ -485,7 +497,6 @@ func (c *Controller) advanceCanary(name string, namespace string) {
485497
}
486498
c.runCanary(cd, canaryController, meshRouter, mirrored, canaryWeight, primaryWeight, maxWeight)
487499
}
488-
489500
}
490501

491502
func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryController canary.Controller,
@@ -542,7 +553,6 @@ func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryCo
542553
}
543554

544555
return
545-
546556
}
547557

548558
func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller,
@@ -729,7 +739,6 @@ func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController can
729739
return
730740
}
731741
}
732-
733742
}
734743

735744
func (c *Controller) runAnalysis(canary *flaggerv1.Canary) bool {
@@ -853,7 +862,6 @@ func (c *Controller) shouldAdvance(canary *flaggerv1.Canary, canaryController ca
853862
}
854863

855864
return newCfg, nil
856-
857865
}
858866

859867
func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryController canary.Controller, scalerReconciler canary.ScalerReconciler, shouldAdvance bool) bool {
@@ -1010,7 +1018,6 @@ func (c *Controller) setPhaseInitializing(cd *flaggerv1.Canary) error {
10101018
firstTry = false
10111019
return
10121020
})
1013-
10141021
if err != nil {
10151022
return fmt.Errorf("failed after retries: %w", err)
10161023
}

pkg/controller/scheduler_daemonset_fixture_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture {
120120
recorder: metrics.NewRecorder(controllerAgentName, false),
121121
routerFactory: rf,
122122
notifier: &notifier.NopNotifier{},
123+
pendingCanaries: map[string]bool{},
123124
}
124125
ctrl.flaggerSynced = alwaysReady
125126
ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c)

pkg/controller/scheduler_deployment_fixture_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture {
149149
recorder: metrics.NewRecorder(controllerAgentName, false),
150150
routerFactory: rf,
151151
notifier: &notifier.NopNotifier{},
152+
pendingCanaries: map[string]bool{},
152153
}
153154
ctrl.flaggerSynced = alwaysReady
154155
ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c)

pkg/controller/scheduler_deployment_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,83 @@ func TestScheduler_DeploymentPromotion(t *testing.T) {
394394
assert.Equal(t, flaggerv1.CanaryPhaseSucceeded, c.Status.Phase)
395395
}
396396

397+
func TestScheduler_DeploymentMaxConcurrent(t *testing.T) {
398+
mocks := newDeploymentFixture(nil)
399+
400+
secondCanary := newDeploymentTestCanary()
401+
secondCanary.Name = "podinfo2"
402+
403+
mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Create(context.TODO(), secondCanary, metav1.CreateOptions{})
404+
mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(secondCanary)
405+
406+
// initializing
407+
mocks.ctrl.advanceCanary("podinfo", "default")
408+
mocks.ctrl.advanceCanary("podinfo2", "default")
409+
410+
// make primary ready
411+
mocks.makePrimaryReady(t)
412+
413+
// initialized
414+
mocks.ctrl.advanceCanary("podinfo", "default")
415+
mocks.ctrl.advanceCanary("podinfo2", "default")
416+
417+
// update
418+
dep2 := newDeploymentTestDeploymentV2()
419+
_, err := mocks.kubeClient.AppsV1().Deployments("default").Update(context.TODO(), dep2, metav1.UpdateOptions{})
420+
require.NoError(t, err)
421+
422+
// detect pod spec changes
423+
mocks.ctrl.advanceCanary("podinfo", "default")
424+
mocks.ctrl.advanceCanary("podinfo2", "default")
425+
426+
// if no maxConcurrentCanaries is set, all canaries should proceed
427+
c, err := mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
428+
require.NoError(t, err)
429+
assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)
430+
431+
c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
432+
require.NoError(t, err)
433+
assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)
434+
435+
// delete second canary and set maxConcurrency. Then add it again
436+
delete(mocks.ctrl.pendingCanaries, "podinfo2.default")
437+
mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Delete(secondCanary)
438+
mocks.ctrl.maxConcurrentCanaries = 1
439+
mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(secondCanary)
440+
441+
mocks.ctrl.advanceCanary("podinfo2", "default")
442+
mocks.ctrl.advanceCanary("podinfo2", "default")
443+
_, err = mocks.kubeClient.AppsV1().Deployments("default").Update(context.TODO(), dep2, metav1.UpdateOptions{})
444+
require.NoError(t, err)
445+
446+
// check if second canary is waiting now
447+
c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
448+
mocks.ctrl.advanceCanary("podinfo2", "default")
449+
require.NoError(t, err)
450+
assert.Equal(t, flaggerv1.CanaryPhaseWaiting, c.Status.Phase)
451+
452+
// make first deployment succeeded
453+
mocks.ctrl.advanceCanary("podinfo", "default")
454+
mocks.ctrl.advanceCanary("podinfo", "default")
455+
mocks.ctrl.advanceCanary("podinfo", "default")
456+
mocks.ctrl.advanceCanary("podinfo", "default")
457+
mocks.ctrl.advanceCanary("podinfo", "default")
458+
mocks.ctrl.advanceCanary("podinfo", "default")
459+
mocks.ctrl.advanceCanary("podinfo", "default")
460+
mocks.ctrl.advanceCanary("podinfo", "default")
461+
462+
// after succeeded it should get removed from pendingCanaries
463+
mocks.ctrl.advanceCanary("podinfo", "default")
464+
465+
// second canary should start with next call
466+
mocks.ctrl.advanceCanary("podinfo2", "default")
467+
468+
// check if second canary is starting
469+
c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
470+
require.NoError(t, err)
471+
assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)
472+
}
473+
397474
func TestScheduler_DeploymentMirroring(t *testing.T) {
398475
mocks := newDeploymentFixture(newDeploymentTestCanaryMirror())
399476

0 commit comments

Comments
 (0)