
Commit a28b8d4

Fix race condition when unpausing MCP at the end of a noop-for-pool maintenance (#120)
The previous version could get into an invalid state: the upgrade job was updated twice ([1][1], [2][2]) at the end of an update, and the k8s client cache does not guarantee read-after-write consistency [3][3]. This led to the MCPs being paused again just after they were force-unpaused.

[1]: https://github.com/appuio/openshift-upgrade-controller/blob/0c1b407fd99a17ada4242a6454303911938377cd/controllers/upgradejob_controller.go#L939
[2]: https://github.com/appuio/openshift-upgrade-controller/blob/0c1b407fd99a17ada4242a6454303911938377cd/controllers/upgradejob_controller.go#L334
[3]: https://pkg.go.dev/sigs.k8s.io/controller-runtime#hdr-Clients_and_Caches
1 parent 8ee8214 commit a28b8d4
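
The caching caveat in the message is worth illustrating. Below is a minimal, hypothetical sketch (assumed package and function names, a plain ConfigMap instead of the controller's own types; not code from this repository) of how a read served from controller-runtime's informer cache right after an Update can still show the old object:

```go
package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// staleReadAfterWrite updates a ConfigMap and immediately reads it back through
// the same client. With the default manager-provided client, reads are served
// from an informer cache that is only eventually consistent with writes, so the
// read-back may still show the pre-update object ("stale" is then true).
// A reconciler that issues a second write based on such a stale read can
// silently undo the first write; in this controller, that meant re-pausing
// pools that were just force-unpaused.
func staleReadAfterWrite(ctx context.Context, c client.Client, cm *corev1.ConfigMap) (stale bool, err error) {
	if cm.Data == nil {
		cm.Data = map[string]string{}
	}
	// First write: record that the work is done.
	cm.Data["state"] = "finished"
	if err := c.Update(ctx, cm); err != nil {
		return false, err
	}

	// Cached read-back: not guaranteed to reflect the Update above.
	var cached corev1.ConfigMap
	if err := c.Get(ctx, client.ObjectKeyFromObject(cm), &cached); err != nil {
		return false, err
	}
	return cached.Data["state"] != "finished", nil
}
```

The diff below sidesteps this by not re-deriving pause state from a possibly stale object at the end of the job: pools get the JobLockAnnotation when they are paused, and the new cleanupMachineConfigPools unpauses exactly the annotated pools before the lock is released.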

File tree: 2 files changed (+59, −27 lines)

controllers/upgradejob_controller.go

Lines changed: 51 additions & 19 deletions
@@ -47,7 +47,7 @@ type UpgradeJobReconciler struct {
 	ManagedUpstreamClusterVersionName string
 }
 
-var ClusterVersionLockAnnotation = managedupgradev1beta1.GroupVersion.Group + "/upgrade-job"
+var JobLockAnnotation = managedupgradev1beta1.GroupVersion.Group + "/upgrade-job"
 
 const (
 	UpgradeJobHookJobTrackerFinalizer = "upgradejobs.managedupgrade.appuio.io/hook-job-tracker"
@@ -96,15 +96,21 @@ func (r *UpgradeJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		// Don't execute hooks created after the job was finished.
 		_, eserr := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventSuccess, sc.Reason, sc.LastTransitionTime.Time)
 		_, eferr := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventFinish, sc.Reason, sc.LastTransitionTime.Time)
-		return ctrl.Result{}, multierr.Combine(eserr, eferr, r.cleanupLock(ctx, &uj))
+		return ctrl.Result{}, multierr.Combine(
+			eserr,
+			eferr,
+			// Prevent pools that did not have any changes/updates from being paused indefinitely.
+			r.cleanupMachineConfigPools(ctx, uj),
+			r.cleanupLock(ctx, uj),
+		)
 	}
 	fc := apimeta.FindStatusCondition(uj.Status.Conditions, managedupgradev1beta1.UpgradeJobConditionFailed)
 	if fc != nil && fc.Status == metav1.ConditionTrue {
 		// Ignore hooks status, they can't influence the upgrade anymore.
 		// Don't execute hooks created after the job was finished.
 		_, efaerr := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventFailure, fc.Reason, fc.LastTransitionTime.Time)
 		_, efierr := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventFinish, fc.Reason, fc.LastTransitionTime.Time)
-		return ctrl.Result{}, multierr.Combine(efaerr, efierr, r.cleanupLock(ctx, &uj))
+		return ctrl.Result{}, multierr.Combine(efaerr, efierr, r.cleanupLock(ctx, uj))
 	}
 
 	cont, err := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventCreate, "", time.Time{})
@@ -195,7 +201,7 @@ func (r *UpgradeJobReconciler) reconcileStartedJob(ctx context.Context, uj *mana
 		return ctrl.Result{}, fmt.Errorf("failed to lock cluster version: %w", err)
 	}
 
-	if err := r.pauseUnpauseMachineConfigPools(ctx, uj, false); err != nil {
+	if err := r.pauseUnpauseMachineConfigPools(ctx, uj); err != nil {
 		return ctrl.Result{}, fmt.Errorf("failed to pause machine config pools: %w", err)
 	}
 
@@ -319,11 +325,6 @@ func (r *UpgradeJobReconciler) reconcileStartedJob(ctx context.Context, uj *mana
 		return ctrl.Result{}, nil
 	}
 
-	// Ensure pools that were paused but did not need an upgrade are unpaused
-	if err := r.pauseUnpauseMachineConfigPools(ctx, uj, true); err != nil {
-		return ctrl.Result{}, fmt.Errorf("failed to ensure machine config pools are unpaused: %w", err)
-	}
-
 	// Set the upgrade as successful
 	r.setStatusCondition(&uj.Status.Conditions, metav1.Condition{
 		Type: managedupgradev1beta1.UpgradeJobConditionSucceeded,
@@ -373,7 +374,7 @@ func JobFromClusterVersionMapper(c client.Reader, cvName string) handler.MapFunc
 // upgradeJobNameFromLockedClusterVersion returns the upgrade job name from the locked cluster version.
 // If the cluster version is not locked, it returns false.
 func upgradeJobNameFromLockedClusterVersion(cv configv1.ClusterVersion) (ok bool, nn types.NamespacedName) {
-	job := cv.GetAnnotations()[ClusterVersionLockAnnotation]
+	job := cv.GetAnnotations()[JobLockAnnotation]
 	if job == "" {
 		return false, types.NamespacedName{}
 	}
@@ -441,17 +442,17 @@ func (r *UpgradeJobReconciler) runHealthCheck(
 	return true, r.Status().Update(ctx, uj)
 }
 
-func (r *UpgradeJobReconciler) cleanupLock(ctx context.Context, uj *managedupgradev1beta1.UpgradeJob) error {
+func (r *UpgradeJobReconciler) cleanupLock(ctx context.Context, uj managedupgradev1beta1.UpgradeJob) error {
 	var version configv1.ClusterVersion
 	if err := r.Get(ctx, types.NamespacedName{
 		Name: r.ManagedUpstreamClusterVersionName,
 	}, &version); err != nil {
 		return fmt.Errorf("failed to get cluster version: %w", err)
 	}
 
-	lockingJob, hasLockingJob := version.Annotations[ClusterVersionLockAnnotation]
+	lockingJob, hasLockingJob := version.Annotations[JobLockAnnotation]
 	if hasLockingJob && lockingJob == uj.Namespace+"/"+uj.Name {
-		delete(version.Annotations, ClusterVersionLockAnnotation)
+		delete(version.Annotations, JobLockAnnotation)
 		if err := r.Update(ctx, &version); err != nil {
 			return fmt.Errorf("failed to unlock cluster version: %w", err)
 		}
@@ -466,11 +467,11 @@ func (r *UpgradeJobReconciler) tryLockClusterVersion(ctx context.Context, versio
 		version.Annotations = map[string]string{}
 	}
 
-	lockingJob, hasLockingJob := version.Annotations[ClusterVersionLockAnnotation]
+	lockingJob, hasLockingJob := version.Annotations[JobLockAnnotation]
 	if hasLockingJob && lockingJob != lockVal {
 		return fmt.Errorf("cluster version is locked by %s", lockingJob)
 	} else if !hasLockingJob {
-		version.Annotations[ClusterVersionLockAnnotation] = lockVal
+		version.Annotations[JobLockAnnotation] = lockVal
 		// There is no race condition between the Get and Update calls because the server will reject the update with a Conflict error if the resource has been modified since the Get call.
 		if err := r.Client.Update(ctx, version); err != nil {
 			return fmt.Errorf("failed to lock cluster version: %w", err)
@@ -873,10 +874,9 @@ func findTrackedHookJob(ujhookName, event string, uj managedupgradev1beta1.Upgra
 
 // pauseUnpauseMachineConfigPools pauses or unpauses the machine config pools that match the given selectors in .Spec.MachineConfigPools and have a delay set.
 // The decision to pause or unpause is based on `pool.DelayUpgrade.DelayMin` relative to the startAfter time of the upgrade job.
-// If ensureUnpause is true, it will unpause the pools even if the delay has not expired.
 // It sets a timeout condition and returns an error if the delay is expired.
 // It also returns an error if the machine config pools cannot be listed or updated.
-func (r *UpgradeJobReconciler) pauseUnpauseMachineConfigPools(ctx context.Context, uj *managedupgradev1beta1.UpgradeJob, ensureUnpause bool) error {
+func (r *UpgradeJobReconciler) pauseUnpauseMachineConfigPools(ctx context.Context, uj *managedupgradev1beta1.UpgradeJob) error {
 	l := log.FromContext(ctx).WithName("UpgradeJobReconciler.pauseUnpauseMachineConfigPools")
 
 	var controllerManagesPools bool
@@ -887,8 +887,8 @@ func (r *UpgradeJobReconciler) pauseUnpauseMachineConfigPools(ctx context.Contex
 		}
 		timeSinceStart := r.timeSinceStartAfter(uj)
 		beforeMinDelay := timeSinceStart < pool.DelayUpgrade.DelayMin.Duration
-		shouldPause := !ensureUnpause && beforeMinDelay
-		l = l.WithValues("poolconfig_matchLabels", pool.MatchLabels, "shouldPause", shouldPause, "beforeMinDelay", beforeMinDelay, "ensureUnpause", ensureUnpause, "timeSinceStart", timeSinceStart)
+		shouldPause := beforeMinDelay
+		l = l.WithValues("poolconfig_matchLabels", pool.MatchLabels, "shouldPause", shouldPause, "beforeMinDelay", beforeMinDelay, "timeSinceStart", timeSinceStart)
 
 		sel, err := metav1.LabelSelectorAsSelector(pool.MatchLabels)
 		if err != nil {
@@ -919,6 +919,11 @@ func (r *UpgradeJobReconciler) pauseUnpauseMachineConfigPools(ctx context.Contex
 			}
 			if mcp.Spec.Paused != shouldPause {
 				l.Info("Updating machine config pools pause field", "from", mcp.Spec.Paused, "to", shouldPause)
+				if mcp.Annotations == nil {
+					mcp.Annotations = map[string]string{}
+				}
+				// Mark the MCP as managed by the upgrade job for later cleanup
+				mcp.Annotations[JobLockAnnotation] = uj.Namespace + "/" + uj.Name
 				mcp.Spec.Paused = shouldPause
 				if err := r.Update(ctx, &mcp); err != nil {
 					return fmt.Errorf("failed to pause/unpause machine config pool %q: %w", mcp.Name, err)
@@ -1040,3 +1045,30 @@ func (r *UpgradeJobReconciler) checkAndMarkSkipped(ctx context.Context, uj manag
 	}
 	return false, nil
 }
+
+// cleanupMachineConfigPools removes the JobLockAnnotation from all machine config pools that have it set to the upgrade job and unpauses annotated pools if they are paused.
+func (r *UpgradeJobReconciler) cleanupMachineConfigPools(ctx context.Context, uj managedupgradev1beta1.UpgradeJob) error {
+	l := log.FromContext(ctx).WithName("UpgradeJobReconciler.cleanupMachineConfigPools")
+
+	var mcpl machineconfigurationv1.MachineConfigPoolList
+	if err := r.List(ctx, &mcpl); err != nil {
+		return fmt.Errorf("failed to list machine config pools: %w", err)
+	}
+
+	errs := make([]error, 0, len(mcpl.Items))
+	for _, mcp := range mcpl.Items {
+		if mcp.Annotations[JobLockAnnotation] != uj.Namespace+"/"+uj.Name {
+			continue
+		}
+		delete(mcp.Annotations, JobLockAnnotation)
+		if mcp.Spec.Paused {
+			l.Info("unpausing machine config pool", "pool", mcp.Name)
+			mcp.Spec.Paused = false
+		}
+		if err := r.Update(ctx, &mcp); err != nil {
+			errs = append(errs, fmt.Errorf("failed to cleanup machine config pool %q: %w", mcp.Name, err))
+		}
+	}
+
+	return multierr.Combine(errs...)
+}

controllers/upgradejob_controller_test.go

Lines changed: 8 additions & 8 deletions
@@ -168,7 +168,7 @@ func Test_UpgradeJobReconciler_Reconcile_E2E_Upgrade(t *testing.T) {
 		_, err := subject.Reconcile(ctx, requestForObject(upgradeJob))
 		require.NoError(t, err)
 		require.NoError(t, c.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
-		lock, ok := ucv.Annotations[ClusterVersionLockAnnotation]
+		lock, ok := ucv.Annotations[JobLockAnnotation]
 		require.True(t, ok, "lock annotation must be set")
 		require.Equal(t, upgradeJob.Namespace+"/"+upgradeJob.Name, lock, "lock annotation must contain upgrade job reference")
 	})
@@ -295,7 +295,7 @@ func Test_UpgradeJobReconciler_Reconcile_E2E_Upgrade(t *testing.T) {
 		_, err = subject.Reconcile(ctx, requestForObject(upgradeJob))
 		require.NoError(t, err)
 		require.NoError(t, c.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
-		require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
+		require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
 		_, err = subject.Reconcile(ctx, requestForObject(upgradeJob))
 		require.NoError(t, err, "should ignore requests if cluster version is not locked")
 	})
@@ -425,7 +425,7 @@ func Test_UpgradeJobReconciler_Reconcile_Skipped_Job(t *testing.T) {
 
 		require.NoError(t, c.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
 		require.Empty(t, ucv.Spec.DesiredUpdate, "cluster version should not be updated")
-		require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "cluster version should not be locked")
+		require.Empty(t, ucv.Annotations[JobLockAnnotation], "cluster version should not be locked")
 	})
 
 	step(t, "`Success` and `Finish` hooks", func(t *testing.T) {
@@ -969,7 +969,7 @@ func Test_UpgradeJobReconciler_Reconcile_UpgradeWithdrawn(t *testing.T) {
 	require.NotNil(t, failedCond, "should set failed condition")
 	require.Equal(t, managedupgradev1beta1.UpgradeJobReasonUpgradeWithdrawn, failedCond.Reason)
 	require.NoError(t, client.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
-	require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
+	require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
 }
 
 func Test_UpgradeJobReconciler_Reconcile_Timeout(t *testing.T) {
@@ -1024,7 +1024,7 @@ func Test_UpgradeJobReconciler_Reconcile_Timeout(t *testing.T) {
 	require.NotNil(t, failedCond, "should set failed condition")
 	require.Equal(t, managedupgradev1beta1.UpgradeJobReasonTimedOut, failedCond.Reason)
 	require.NoError(t, client.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
-	require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
+	require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
 }
 
 func Test_UpgradeJobReconciler_Reconcile_PreHealthCheckTimeout(t *testing.T) {
@@ -1088,7 +1088,7 @@ func Test_UpgradeJobReconciler_Reconcile_PreHealthCheckTimeout(t *testing.T) {
 	require.NotNil(t, failedCond, "should set failed condition")
 	require.Equal(t, managedupgradev1beta1.UpgradeJobReasonPreHealthCheckFailed, failedCond.Reason)
 	require.NoError(t, client.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
-	require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
+	require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
 }
 
 func Test_UpgradeJobReconciler_Reconcile_PostHealthCheckTimeout(t *testing.T) {
@@ -1161,7 +1161,7 @@ func Test_UpgradeJobReconciler_Reconcile_PostHealthCheckTimeout(t *testing.T) {
 	require.NotNil(t, failedCond, "should set failed condition")
 	require.Equal(t, managedupgradev1beta1.UpgradeJobReasonPostHealthCheckFailed, failedCond.Reason)
 	require.NoError(t, client.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
-	require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
+	require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
 }
 
 func Test_UpgradeJobReconciler_Reconcile_PausedMachineConfigPools(t *testing.T) {
@@ -1544,7 +1544,7 @@ func Test_JobFromClusterVersionHandler(t *testing.T) {
 	require.Len(t, subject(context.Background(), nil), 0, "should not return a reconcile request if clusterversion is not locked")
 
 	ucv.Annotations = map[string]string{
-		ClusterVersionLockAnnotation: "ns/upgrade-1234-4-5-13",
+		JobLockAnnotation: "ns/upgrade-1234-4-5-13",
 	}
 	require.NoError(t, client.Update(context.Background(), ucv))
 