Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions internal/controller/dragonfly_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, fmt.Errorf("failed to get statefulset: %w", err)
}

if result, err := dfi.allPodsHealthy(ctx, statefulSet.Status.UpdateRevision); !result.IsZero() || err != nil {
if result, err := dfi.allPodsHealthyAndHaveRole(ctx, statefulSet.Status.UpdateRevision); !result.IsZero() || err != nil {
return result, err
}

Expand All @@ -95,6 +95,11 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, fmt.Errorf("failed to get replicas: %w", err)
}

if len(replicas.Items) != int(dfi.df.Spec.Replicas)-1 {
dfi.log.Info("waiting for all replicas to be configured", "expected", *statefulSet.Spec.Replicas-1, "current", len(replicas.Items))
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

// We want to update the replicas first then the master
// We want to have at most one updated replica in full sync phase at a time
// if not, requeue
Expand All @@ -104,7 +109,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// if we are here it means that all latest replicas are in stable sync
// delete older version replicas
if result, err := dfi.updatedReplicas(ctx, replicas, statefulSet.Status.UpdateRevision); !result.IsZero() || err != nil {
if result, err := dfi.updateReplicas(ctx, replicas, statefulSet.Status.UpdateRevision); !result.IsZero() || err != nil {
return result, err
}

Expand Down
17 changes: 13 additions & 4 deletions internal/controller/dragonfly_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,8 @@ func (dfi *DragonflyInstance) deleteRoleLabel(ctx context.Context, pod *corev1.P
return nil
}

// allPodsHealthy checks whether all pods are healthy, and deletes pods that are outdated and failed to start
func (dfi *DragonflyInstance) allPodsHealthy(ctx context.Context, updateRevision string) (ctrl.Result, error) {
// allPodsHealthyAndHaveRole checks whether all pods are healthy, and deletes pods that are outdated and failed to start
func (dfi *DragonflyInstance) allPodsHealthyAndHaveRole(ctx context.Context, updateRevision string) (ctrl.Result, error) {
pods, err := dfi.getPods(ctx)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get dragonfly pods: %w", err)
Expand All @@ -735,6 +735,11 @@ func (dfi *DragonflyInstance) allPodsHealthy(ctx context.Context, updateRevision
dfi.log.Info("waiting for pod to finish startup", "pod", pod.Name)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

if !roleExists(&pod) {
dfi.log.Info("waiting for pod to be assigned a role", "pod", pod.Name)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
}

return ctrl.Result{}, nil
Expand Down Expand Up @@ -763,10 +768,14 @@ func (dfi *DragonflyInstance) verifyUpdatedReplicas(ctx context.Context, replica
return ctrl.Result{}, nil
}

// updatedReplicas updates the replicas to the latest version
func (dfi *DragonflyInstance) updatedReplicas(ctx context.Context, replicas *corev1.PodList, updateRevision string) (ctrl.Result, error) {
// updateReplicas updates the replicas to the latest version
func (dfi *DragonflyInstance) updateReplicas(ctx context.Context, replicas *corev1.PodList, updateRevision string) (ctrl.Result, error) {
for _, replica := range replicas.Items {
if !isPodOnLatestVersion(&replica, updateRevision) {
_, err := dfi.getMaster(ctx)
if err != nil {
return ctrl.Result{}, fmt.Errorf("skipping deleting replica: failed to get master: %w", err)
}
dfi.log.Info("deleting replica", "pod", replica.Name)
dfi.eventRecorder.Event(dfi.df, corev1.EventTypeNormal, "Rollout", "Deleting replica")
if err := dfi.client.Delete(ctx, &replica); err != nil {
Expand Down