From ef570780c775dc40f9af572f8ce83d7e1bcede77 Mon Sep 17 00:00:00 2001 From: youngLiuHY Date: Wed, 28 Jan 2026 17:42:55 +0800 Subject: [PATCH 1/3] fix: check updated ready && watch observed replicas only in last batch --- pkg/controllers/rolloutrun/executor/batch.go | 6 ++++-- pkg/controllers/rolloutrun/executor/canary.go | 2 +- pkg/controllers/rolloutrun/webhook/worker.go | 2 +- pkg/workload/alias.go | 9 +++++++++ pkg/workload/info.go | 17 +++++++++++++++-- 5 files changed, 30 insertions(+), 6 deletions(-) create mode 100644 pkg/workload/alias.go diff --git a/pkg/controllers/rolloutrun/executor/batch.go b/pkg/controllers/rolloutrun/executor/batch.go index 26a6e82..db0214d 100644 --- a/pkg/controllers/rolloutrun/executor/batch.go +++ b/pkg/controllers/rolloutrun/executor/batch.go @@ -194,6 +194,7 @@ func (e *batchExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Durat newStatus := ctx.NewStatus currentBatchIndex := newStatus.BatchStatus.CurrentBatchIndex currentBatch := rolloutRun.Spec.Batch.Batches[currentBatchIndex] + totalBatches := len(rolloutRun.Spec.Batch.Batches) logger := ctx.GetBatchLogger() @@ -214,13 +215,14 @@ func (e *batchExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Durat currentBatchExpectedReplicas, _ := workload.CalculateUpdatedReplicas(&status.Replicas, item.Replicas) - if info.CheckUpdatedReady(currentBatchExpectedReplicas) { + ready, reason := info.CheckUpdatedReady(currentBatchExpectedReplicas, int(currentBatchIndex+1) == totalBatches) + if ready { // if the target is ready, we will not change partition continue } allWorkloadReady = false - logger.V(3).Info("still waiting for target to be ready", "target", item.CrossClusterObjectNameReference) + logger.V(3).Info("still waiting for target to be ready", "target", item.CrossClusterObjectNameReference, "reason", reason) expectedReplicas, err := e.calculateExpectedReplicasBySlidingWindow(status, currentBatchExpectedReplicas, item.ReplicaSlidingWindow) if err != nil { diff --git a/pkg/controllers/rolloutrun/executor/canary.go b/pkg/controllers/rolloutrun/executor/canary.go index 4ec7849..05dc344 100644 --- a/pkg/controllers/rolloutrun/executor/canary.go +++ b/pkg/controllers/rolloutrun/executor/canary.go @@ -224,7 +224,7 @@ func (e *canaryExecutor) doCanary(ctx *ExecutorContext) (bool, time.Duration, er // 2.b. waiting canary workload ready for _, info := range canaryWorkloads { - if !info.CheckUpdatedReady(info.Status.DesiredReplicas) { + if !info.CheckCanaryUpdatedReady(info.Status.DesiredReplicas) { // ready logger.Info("still waiting for canary target ready", "cluster", info.ClusterName, diff --git a/pkg/controllers/rolloutrun/webhook/worker.go b/pkg/controllers/rolloutrun/webhook/worker.go index 251216d..4e9bbb2 100644 --- a/pkg/controllers/rolloutrun/webhook/worker.go +++ b/pkg/controllers/rolloutrun/webhook/worker.go @@ -235,7 +235,7 @@ func (w *worker) doProbe() (keepGoing bool) { func (w *worker) now() metav1.Time { now := metav1.NewTime(w.webhookManager.clock.Now()) data, _ := now.MarshalJSON() - now.UnmarshalJSON(data) + _ = now.UnmarshalJSON(data) return now } diff --git a/pkg/workload/alias.go b/pkg/workload/alias.go new file mode 100644 index 0000000..db5df99 --- /dev/null +++ b/pkg/workload/alias.go @@ -0,0 +1,9 @@ +package workload + +const ( + UpdatedUnreadyGenerationMismatched UpdatedUnreadyReason = "workload Generation and ObservedGeneration are mismatched" + UpdatedUnreadyAvailableReplicasNotSatisfied UpdatedUnreadyReason = "updated available replicas is not satisfied" + UpdatedUnreadyObservedReplicasNotSatisfied UpdatedUnreadyReason = "observed replicas is not satisfied" +) + +type UpdatedUnreadyReason string diff --git a/pkg/workload/info.go b/pkg/workload/info.go index 4240eec..2f04e51 100644 --- a/pkg/workload/info.go +++ b/pkg/workload/info.go @@ -102,11 +102,24 @@ func (o *Info) String() string { return rolloutv1alpha1.CrossClusterObjectNameReference{Cluster: o.ClusterName, Name: o.Name}.String() } -func (o *Info) CheckUpdatedReady(replicas int32) bool { +func (o *Info) CheckUpdatedReady(replicas int32, isLastBatch bool) (bool, string) { + if o.Generation != o.Status.ObservedGeneration { + return false, string(UpdatedUnreadyGenerationMismatched) + } + if o.Status.UpdatedAvailableReplicas < replicas { + return false, string(UpdatedUnreadyAvailableReplicasNotSatisfied) + } + if isLastBatch && o.Status.ObservedReplicas > o.Status.DesiredReplicas { + return false, string(UpdatedUnreadyObservedReplicasNotSatisfied) + } + return true, "" +} + +func (o *Info) CheckCanaryUpdatedReady(replicas int32) bool { if o.Generation != o.Status.ObservedGeneration { return false } - return o.Status.UpdatedAvailableReplicas >= replicas && o.Status.ObservedReplicas <= o.Status.DesiredReplicas + return o.Status.UpdatedAvailableReplicas >= replicas } func (o *Info) APIStatus() rolloutv1alpha1.RolloutWorkloadStatus { From 039114e61a8f4dda117455da600ef93826bd15cd Mon Sep 17 00:00:00 2001 From: youngLiuHY Date: Wed, 28 Jan 2026 18:11:11 +0800 Subject: [PATCH 2/3] fix: reformat message --- pkg/controllers/rolloutrun/executor/batch.go | 5 ++++- pkg/controllers/rolloutrun/executor/canary.go | 2 +- pkg/workload/alias.go | 9 --------- pkg/workload/info.go | 17 +++++------------ 4 files changed, 10 insertions(+), 23 deletions(-) delete mode 100644 pkg/workload/alias.go diff --git a/pkg/controllers/rolloutrun/executor/batch.go b/pkg/controllers/rolloutrun/executor/batch.go index db0214d..c270630 100644 --- a/pkg/controllers/rolloutrun/executor/batch.go +++ b/pkg/controllers/rolloutrun/executor/batch.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + corev1 "k8s.io/api/core/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/intstr" rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" @@ -195,6 +196,7 @@ func (e *batchExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Durat currentBatchIndex := newStatus.BatchStatus.CurrentBatchIndex currentBatch := rolloutRun.Spec.Batch.Batches[currentBatchIndex] totalBatches := len(rolloutRun.Spec.Batch.Batches) + isLastBatch := int(currentBatchIndex+1) == totalBatches logger := ctx.GetBatchLogger() @@ -215,11 +217,12 @@ func (e *batchExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Durat currentBatchExpectedReplicas, _ := workload.CalculateUpdatedReplicas(&status.Replicas, item.Replicas) - ready, reason := info.CheckUpdatedReady(currentBatchExpectedReplicas, int(currentBatchIndex+1) == totalBatches) + ready, reason := info.CheckUpdatedReady(currentBatchExpectedReplicas, isLastBatch) if ready { // if the target is ready, we will not change partition continue } + ctx.Recorder.Eventf(ctx.RolloutRun, corev1.EventTypeNormal, "WorkloadUpdatedUnready", "workload updated unready, reason %s", reason) allWorkloadReady = false logger.V(3).Info("still waiting for target to be ready", "target", item.CrossClusterObjectNameReference, "reason", reason) diff --git a/pkg/controllers/rolloutrun/executor/canary.go b/pkg/controllers/rolloutrun/executor/canary.go index 05dc344..f465c7d 100644 --- a/pkg/controllers/rolloutrun/executor/canary.go +++ b/pkg/controllers/rolloutrun/executor/canary.go @@ -224,7 +224,7 @@ func (e *canaryExecutor) doCanary(ctx *ExecutorContext) (bool, time.Duration, er // 2.b. waiting canary workload ready for _, info := range canaryWorkloads { - if !info.CheckCanaryUpdatedReady(info.Status.DesiredReplicas) { + if ready, _ := info.CheckUpdatedReady(info.Status.DesiredReplicas, false); !ready { // ready logger.Info("still waiting for canary target ready", "cluster", info.ClusterName, diff --git a/pkg/workload/alias.go b/pkg/workload/alias.go deleted file mode 100644 index db5df99..0000000 --- a/pkg/workload/alias.go +++ /dev/null @@ -1,9 +0,0 @@ -package workload - -const ( - UpdatedUnreadyGenerationMismatched UpdatedUnreadyReason = "workload Generation and ObservedGeneration are mismatched" - UpdatedUnreadyAvailableReplicasNotSatisfied UpdatedUnreadyReason = "updated available replicas is not satisfied" - UpdatedUnreadyObservedReplicasNotSatisfied UpdatedUnreadyReason = "observed replicas is not satisfied" -) - -type UpdatedUnreadyReason string diff --git a/pkg/workload/info.go b/pkg/workload/info.go index 2f04e51..2b9074b 100644 --- a/pkg/workload/info.go +++ b/pkg/workload/info.go @@ -102,26 +102,19 @@ func (o *Info) String() string { return rolloutv1alpha1.CrossClusterObjectNameReference{Cluster: o.ClusterName, Name: o.Name}.String() } -func (o *Info) CheckUpdatedReady(replicas int32, isLastBatch bool) (bool, string) { +func (o *Info) CheckUpdatedReady(replicas int32, strictCheck bool) (bool, string) { if o.Generation != o.Status.ObservedGeneration { - return false, string(UpdatedUnreadyGenerationMismatched) + return false, "workload Generation and ObservedGeneration are mismatched" } if o.Status.UpdatedAvailableReplicas < replicas { - return false, string(UpdatedUnreadyAvailableReplicasNotSatisfied) + return false, "workload updated available replicas is not satisfied" } - if isLastBatch && o.Status.ObservedReplicas > o.Status.DesiredReplicas { - return false, string(UpdatedUnreadyObservedReplicasNotSatisfied) + if strictCheck && o.Status.ObservedReplicas > o.Status.DesiredReplicas { + return false, "workload observed replicas is more than desiredReplicas" } return true, "" } -func (o *Info) CheckCanaryUpdatedReady(replicas int32) bool { - if o.Generation != o.Status.ObservedGeneration { - return false - } - return o.Status.UpdatedAvailableReplicas >= replicas -} - func (o *Info) APIStatus() rolloutv1alpha1.RolloutWorkloadStatus { return rolloutv1alpha1.RolloutWorkloadStatus{ RolloutReplicasSummary: rolloutv1alpha1.RolloutReplicasSummary{ From b382b4f644058e097711a94e010e0653e61ef572 Mon Sep 17 00:00:00 2001 From: youngLiuHY Date: Wed, 28 Jan 2026 18:18:16 +0800 Subject: [PATCH 3/3] fix: format event --- pkg/controllers/rolloutrun/executor/batch.go | 2 +- pkg/controllers/rolloutrun/webhook/worker.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controllers/rolloutrun/executor/batch.go b/pkg/controllers/rolloutrun/executor/batch.go index c270630..680e96c 100644 --- a/pkg/controllers/rolloutrun/executor/batch.go +++ b/pkg/controllers/rolloutrun/executor/batch.go @@ -222,7 +222,7 @@ func (e *batchExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Durat // if the target is ready, we will not change partition continue } - ctx.Recorder.Eventf(ctx.RolloutRun, corev1.EventTypeNormal, "WorkloadUpdatedUnready", "workload updated unready, reason %s", reason) + ctx.Recorder.Eventf(ctx.RolloutRun, corev1.EventTypeNormal, "WaitingWorkloadUpdatedReady", "still waiting for target to be ready, target: %v, reason: %s", item.CrossClusterObjectNameReference, reason) allWorkloadReady = false logger.V(3).Info("still waiting for target to be ready", "target", item.CrossClusterObjectNameReference, "reason", reason) diff --git a/pkg/controllers/rolloutrun/webhook/worker.go b/pkg/controllers/rolloutrun/webhook/worker.go index 4e9bbb2..f2abf1b 100644 --- a/pkg/controllers/rolloutrun/webhook/worker.go +++ b/pkg/controllers/rolloutrun/webhook/worker.go @@ -235,7 +235,7 @@ func (w *worker) doProbe() (keepGoing bool) { func (w *worker) now() metav1.Time { now := metav1.NewTime(w.webhookManager.clock.Now()) data, _ := now.MarshalJSON() - _ = now.UnmarshalJSON(data) + now.UnmarshalJSON(data) //nolint return now }