Skip to content

Commit

Permalink
fix(reconciler): recreate missing resources (#222)
Browse files Browse the repository at this point in the history
* fix(reconciler): recreate missing resources

Signed-off-by: Abhradeep Chakraborty <abhradeep@dragonflydb.io>

* fix

Signed-off-by: Abhradeep Chakraborty <abhradeep@dragonflydb.io>

* trigger controller

Signed-off-by: Abhradeep Chakraborty <abhradeep@dragonflydb.io>

* fix

Signed-off-by: Abhradeep Chakraborty <abhradeep@dragonflydb.io>

* debug

Signed-off-by: Abhradeep Chakraborty <abhradeep@dragonflydb.io>

* fix

Signed-off-by: Abhradeep Chakraborty <abhradeep@dragonflydb.io>

* test

Signed-off-by: Abhradeep Chakraborty <abhradeep@dragonflydb.io>

* remove pvc change

Signed-off-by: Abhradeep Chakraborty <abhradeep@dragonflydb.io>

---------

Signed-off-by: Abhradeep Chakraborty <abhradeep@dragonflydb.io>
  • Loading branch information
Abhra303 authored Dec 3, 2024
1 parent 8cc255a commit 4012589
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 43 deletions.
30 changes: 28 additions & 2 deletions e2e/dragonfly_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,11 +418,13 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
err = k8sClient.Update(ctx, &df)
Expect(err).To(BeNil())

GinkgoLogr.Info("start timestamp", "timestamp", time.Now().UTC())
// Wait until Dragonfly object is marked ready
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
Expect(err).To(BeNil())
err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
Expect(err).To(BeNil())
GinkgoLogr.Info("end timestamp", "timestamp", time.Now().UTC())

// Check for service and statefulset
var ss appsv1.StatefulSet
Expand Down Expand Up @@ -478,18 +480,26 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
// check for affinity
Expect(pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Equal(newAffinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution))
}
// Update df to the latest
err = k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &df)
Expect(err).To(BeNil())
GinkgoLogr.Info("df arg propagate phase", "phase", df.Status.Phase, "rolling-update", df.Status.IsRollingUpdate)

})

It("Check for data", func() {
stopChan := make(chan struct{}, 1)
defer close(stopChan)
rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password, 6395)
Expect(err).To(BeNil())

// Check for test data
data, err := rc.Get(ctx, "foo").Result()
Expect(err).To(BeNil())
Expect(data).To(Equal("bar"))
defer close(stopChan)
})

It("Change Service specification to LoadBalancer", func() {
Expand All @@ -512,11 +522,12 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
Labels: newLabels,
}

GinkgoLogr.Info("df phase", "phase", df.Status.Phase, "rolling-update", df.Status.IsRollingUpdate)
err = k8sClient.Update(ctx, &df)
Expect(err).To(BeNil())

// Wait until Dragonfly object is marked ready
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 1*time.Minute)
Expect(err).To(BeNil())
err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
Expect(err).To(BeNil())
Expand All @@ -533,6 +544,19 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
Expect(svc.Labels).To(Equal(newLabels))
})

It("Should recreate missing statefulset", func() {
var ss appsv1.StatefulSet
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &ss)
Expect(err).To(BeNil())

Expect(k8sClient.Delete(ctx, &ss)).To(BeNil())
err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute)
Expect(err).To(BeNil())
})

It("Cleanup", func() {
var df resourcesv1.Dragonfly
err := k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -919,6 +943,8 @@ func isDragonflyInphase(ctx context.Context, c client.Client, name, namespace, p
return false, nil
}

GinkgoLogr.Info("dragonfly phase", "phase", df.Status.Phase, "update", df.Status.IsRollingUpdate)

// Ready means we also want rolling update to be false
if phase == controller.PhaseReady {
// check for replicas
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/onsi/gomega v1.33.1
github.com/pkg/errors v0.9.1
github.com/redis/go-redis/v9 v9.5.3
github.com/samber/lo v1.47.0
k8s.io/api v0.30.2
k8s.io/apimachinery v0.30.2
k8s.io/client-go v0.30.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRci
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
144 changes: 103 additions & 41 deletions internal/controller/dragonfly_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/dragonflydb/dragonfly-operator/internal/resources"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -91,7 +92,49 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Created resources")
return ctrl.Result{}, nil
} else if df.Status.IsRollingUpdate {
}

// Ensure all resources exist before moving forward.
missingResources, err := r.getMissingResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}
for _, resource := range missingResources {
// recreate missing resources
if err := r.Create(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not create resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
}

var statefulSet appsv1.StatefulSet
if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil {
log.Error(err, "could not get statefulset")
return ctrl.Result{}, err
}

// Update all resources even if the df is in rollout state to ensure
// that newer updates don't get blocked by failed update attempts.
log.Info("updating existing resources")
newResources, err := resources.GetDragonflyResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}

// update all resources
for _, resource := range newResources {
if err := r.Update(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
}

log.Info("Updated resources for object")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Updated resources")

if df.Status.IsRollingUpdate {
// This is a Rollout
log.Info("Rolling out new version")
var updatedStatefulset appsv1.StatefulSet
Expand Down Expand Up @@ -127,6 +170,14 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
} else {
log.Info("found pod without label", "pod", pod.Name)
if isFailedToStart(&pod) {
// This is a new pod which is trying to be ready, but couldn't start due to misconfig.
// Delete the pod and create a new one.
if err := r.Delete(ctx, &pod); err != nil {
log.Error(err, "could not delete pod")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
}
// retry after they are ready
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
Expand Down Expand Up @@ -234,63 +285,74 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

return ctrl.Result{}, nil
} else {
} else if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas {
// perform a rollout only if the pod spec has changed
var statefulSet appsv1.StatefulSet
if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil {
log.Error(err, "could not get statefulset")
return ctrl.Result{}, err
}

// Check if the pod spec has changed
log.Info("Checking if pod spec has changed", "updatedReplicas", statefulSet.Status.UpdatedReplicas, "currentReplicas", statefulSet.Status.Replicas)
if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas {
log.Info("Pod spec has changed, performing a rollout")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Starting a rollout")

// Start rollout and update status
// update status so that we can track progress
df.Status.IsRollingUpdate = true
if err := r.Status().Update(ctx, &df); err != nil {
log.Error(err, "could not update the Dragonfly object")
return ctrl.Result{Requeue: true}, err
}

r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Performing a rollout")
log.Info("Pod spec has changed, performing a rollout")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Starting a rollout")

// requeue so that the rollout is processed
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
// Start rollout and update status
// update status so that we can track progress
df.Status.IsRollingUpdate = true
if err := r.Status().Update(ctx, &df); err != nil {
log.Error(err, "could not update the Dragonfly object")
return ctrl.Result{Requeue: true}, err
}

// Is this a Dragonfly object update?
log.Info("updating existing resources")
newResources, err := resources.GetDragonflyResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Performing a rollout")
}
return ctrl.Result{Requeue: true}, nil
}

// update all resources
for _, resource := range newResources {
if err := r.Update(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
func isFailedToStart(pod *corev1.Pod) bool {
for _, containerStatus := range pod.Status.ContainerStatuses {
if (containerStatus.State.Waiting != nil && isFailureReason(containerStatus.State.Waiting.Reason)) ||
(containerStatus.State.Terminated != nil && isFailureReason(containerStatus.State.Terminated.Reason)) {
return true
}
}
return false
}

// isFailureReason checks if the given reason indicates a failure.
func isFailureReason(reason string) bool {
return reason == "ErrImagePull" ||
reason == "ImagePullBackOff" ||
reason == "CrashLoopBackOff" ||
reason == "RunContainerError"
}

log.Info("Updated resources for object")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Updated resources")
return ctrl.Result{Requeue: true}, nil
func (r *DragonflyReconciler) getMissingResources(ctx context.Context, df *dfv1alpha1.Dragonfly) ([]client.Object, error) {
resources, err := resources.GetDragonflyResources(ctx, df)
if err != nil {
return nil, err
}
missingResources := make([]client.Object, 0)
for _, resource := range resources {
obj := resource.DeepCopyObject().(client.Object)

err := r.Get(ctx, client.ObjectKey{
Namespace: df.Namespace,
Name: resource.GetName(),
}, obj)

if errors.IsNotFound(err) {
missingResources = append(missingResources, resource)
} else if err != nil {
return nil, fmt.Errorf("failed to get resource %s/%s: %w", df.Namespace, resource.GetName(), err)
}
}
return missingResources, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *DragonflyReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
// Listen only to spec changes
For(&dfv1alpha1.Dragonfly{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Service{}).
Owns(&appsv1.StatefulSet{}, builder.MatchEveryOwner).
Owns(&corev1.Service{}, builder.MatchEveryOwner).
Named("Dragonfly").
Complete(r)
}

0 comments on commit 4012589

Please sign in to comment.