Skip to content

Commit

Permalink
Fix check partition logic.
Browse files Browse the repository at this point in the history
Signed-off-by: d-kuro <kurosawa7620@gmail.com>
  • Loading branch information
d-kuro committed Jul 31, 2024
1 parent 69a62a0 commit f8b23dd
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 48 deletions.
81 changes: 34 additions & 47 deletions controllers/partition_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ func (r *StatefulSetPartitionReconciler) Reconcile(ctx context.Context, req reco
metrics.CurrentReplicasVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(float64(sts.Status.CurrentReplicas))
metrics.UpdatedReplicasVec.WithLabelValues(cluster.Name, cluster.Namespace).Set(float64(sts.Status.UpdatedReplicas))

// In this case, the reconciliation of MySQLClusterReconciler has not been completed.
// Wait until completion.
if cluster.Generation != cluster.Status.ReconcileInfo.Generation || sts.Generation != sts.Status.ObservedGeneration {
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
}

if !r.needPartitionUpdate(sts) {
return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -134,6 +140,12 @@ func (r *StatefulSetPartitionReconciler) SetupWithManager(mgr ctrl.Manager) erro
func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, cluster *mocov1beta2.MySQLCluster, sts *appsv1.StatefulSet) (bool, error) {
log := crlog.FromContext(ctx)

if sts.Spec.Replicas == &sts.Status.UpdatedReplicas {
// In this case, a rolling update has been completed.
// Update the partition to the initial value (`sts.spec.replicas`) and finish the reconciliation.
return true, nil
}

podList, err := r.getSortedPodList(ctx, sts)
if err != nil {
return false, fmt.Errorf("failed to get pod list: %w", err)
Expand All @@ -156,6 +168,10 @@ func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, clu
return false, nil
}

if podList.Items[nextRolloutTarget].Labels[appsv1.ControllerRevisionHashLabelKey] == sts.Status.UpdateRevision {
return true, nil
}

// If not all Pods are ready, the MySQLCluster becomes Unhealthy.
// Even if the MySQLCluster is not healthy, the rollout continues if the rollout target Pod is not ready.
// This is because there is an expectation that restarting the Not Ready Pod might improve its state.
Expand All @@ -164,10 +180,7 @@ func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, clu
return false, nil
}

ready, err := r.areAllChildPodsRolloutReady(ctx, sts, podList)
if err != nil {
return false, fmt.Errorf("failed to check if all child pods are ready: %w", err)
}
ready := r.areAllChildPodsRolloutReady(ctx, sts, podList)

return ready, nil
}
Expand All @@ -193,19 +206,9 @@ func (r *StatefulSetPartitionReconciler) getSortedPodList(ctx context.Context, s
return podList, nil
}

func (r *StatefulSetPartitionReconciler) areAllChildPodsRolloutReady(ctx context.Context, sts *appsv1.StatefulSet, sortedPodList *corev1.PodList) (bool, error) {
func (r *StatefulSetPartitionReconciler) areAllChildPodsRolloutReady(ctx context.Context, sts *appsv1.StatefulSet, sortedPodList *corev1.PodList) bool {
log := crlog.FromContext(ctx)

firstReivision := sortedPodList.Items[0].Labels[appsv1.ControllerRevisionHashLabelKey]
lastIndex := len(sortedPodList.Items) - 1
lastRevision := sortedPodList.Items[lastIndex].Labels[appsv1.ControllerRevisionHashLabelKey]
revisionCounts := make(map[string]int)

for _, pod := range sortedPodList.Items {
revision := pod.Labels[appsv1.ControllerRevisionHashLabelKey]
revisionCounts[revision]++
}

nextRolloutTarget := r.nextRolloutTargetIndex(sts)

// Proceed with the rollout for the next Pod to be rolled out, even if it is not Ready.
Expand All @@ -217,46 +220,27 @@ func (r *StatefulSetPartitionReconciler) areAllChildPodsRolloutReady(ctx context
for _, pod := range excludeNextRolloutTagetPodList {
if pod.DeletionTimestamp != nil {
log.Info("Pod is in the process of being terminated", "name", pod.Name, "namespace", pod.Namespace)
return false, nil
return false
}
if !podutils.IsPodAvailable(&pod, 5, metav1.Now()) {
log.Info("Pod is not ready", "name", pod.Name, "namespace", pod.Namespace)
return false, nil
return false
}
}

partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
expectedPodsCount := int(*sts.Spec.Replicas) - int(partition)

// If the number of partitions is equal to the number of replicas, the rollout has not started.
// Decrease the partition to start the rollout.
// If there is only one revision type in all other cases, the rollout is complete.
if int(*sts.Spec.Replicas) == int(partition) {
return true, nil
} else if len(revisionCounts) == 1 {
return true, nil
}

// If the first and last revisions are the same, it is a rollback.
// To consider rollbacks, search from the beginning of the Pod list,
// and if a Pod with the same revision as the last one is found, add it to the expected number of Pods.
if firstReivision == lastRevision {
for _, pod := range sortedPodList.Items {
revision := pod.Labels[appsv1.ControllerRevisionHashLabelKey]
if revision == lastRevision {
expectedPodsCount++
} else {
break
for _, c := range pod.Status.InitContainerStatuses {
log.Info("Container is not ready", "pod", pod.Name, "namespace", pod.Namespace, "container", c.Name)
if !c.Ready {
return false
}
}
for _, c := range pod.Status.ContainerStatuses {
log.Info("Container is not ready", "pod", pod.Name, "namespace", pod.Namespace, "container", c.Name)
if !c.Ready {
return false
}
}
}

if revisionCounts[lastRevision] != expectedPodsCount {
log.Info("Pod count is different from the expected number", "revision", lastRevision, "expected", expectedPodsCount, "actual", revisionCounts[lastRevision])
return false, nil
}

return true, nil
return true
}

// isMySQLClusterHealthy checks the health status of a given MySQLCluster.
Expand Down Expand Up @@ -307,6 +291,8 @@ func (r *StatefulSetPartitionReconciler) needPartitionUpdate(sts *appsv1.Statefu

// patchNewPartition patches the new partition of a StatefulSet.
func (r *StatefulSetPartitionReconciler) patchNewPartition(ctx context.Context, sts *appsv1.StatefulSet) error {
log := crlog.FromContext(ctx)

oldPartition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
newPartition := oldPartition - 1

Expand All @@ -321,6 +307,7 @@ func (r *StatefulSetPartitionReconciler) patchNewPartition(ctx context.Context,
return fmt.Errorf("failed to patch new partition to StatefulSet %s/%s: %w", sts.Namespace, sts.Name, err)
}

log.Info("Updated partition", "newPartition", newPartition, "oldPartition", oldPartition)
r.Recorder.Eventf(sts, corev1.EventTypeNormal, "PartitionUpdate", "Updated partition from %d to %d", oldPartition, newPartition)

return nil
Expand Down
2 changes: 1 addition & 1 deletion e2e/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ var _ = Context("partition_test", func() {
Expect(err).NotTo(HaveOccurred())
Expect(sts.Spec.UpdateStrategy.RollingUpdate).NotTo(BeNil())
Expect(sts.Spec.UpdateStrategy.RollingUpdate.Partition).NotTo(BeNil())
Expect(*sts.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(1)))
Expect(*sts.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(2)))
})

It("should rollback succeed", func() {
Expand Down

0 comments on commit f8b23dd

Please sign in to comment.