refactor: add common controller util (OT-CONTAINER-KIT#977)
* refactor: add common controller util

Signed-off-by: drivebyer <yang.wu@daocloud.io>

* fix lint

Signed-off-by: drivebyer <wuyangmuc@gmail.com>

---------

Signed-off-by: drivebyer <yang.wu@daocloud.io>
Signed-off-by: drivebyer <wuyangmuc@gmail.com>
Signed-off-by: Matt Robinson <mattrobinsonsre@gmail.com>
drivebyer authored and mattrobinsonsre committed Jul 11, 2024
1 parent ff882f7 commit b90307b
Showing 5 changed files with 101 additions and 88 deletions.
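The diff touches three controllers and routes every early return through a new shared helper package, pkg/controllerutil (imported as intctrlutil). The helper package itself is among the files that did not render in this capture (only three of the five changed files are shown), so the sketch below is inferred purely from the call sites in the diffs that follow; the signatures match those call sites, but the package body, function bodies, and log wording are assumptions rather than the committed code.

```go
// Hypothetical sketch of the new pkg/controllerutil helpers, reconstructed
// only from how they are called in the diffs below. The signatures match the
// call sites; the bodies and log wording are assumptions.
package controllerutil

import (
	"time"

	"github.com/go-logr/logr"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	ctrl "sigs.k8s.io/controller-runtime"
)

// Reconciled ends the reconcile successfully with nothing left to do.
func Reconciled() (ctrl.Result, error) {
	return ctrl.Result{}, nil
}

// RequeueAfter logs an informational message and schedules the next
// reconcile after the given duration.
func RequeueAfter(logger logr.Logger, duration time.Duration, msg string, keysAndValues ...interface{}) (ctrl.Result, error) {
	if msg == "" {
		msg = "requeue after delay"
	}
	keysAndValues = append(keysAndValues, "duration", duration.String())
	logger.Info(msg, keysAndValues...)
	return ctrl.Result{RequeueAfter: duration}, nil
}

// RequeueWithError logs the error and returns it so controller-runtime
// requeues the request with backoff.
func RequeueWithError(err error, logger logr.Logger, msg string, keysAndValues ...interface{}) (ctrl.Result, error) {
	logger.Error(err, msg, keysAndValues...)
	return ctrl.Result{}, err
}

// RequeueWithErrorChecking behaves like RequeueWithError, except that a
// NotFound error (the object was deleted) ends the reconcile cleanly,
// mirroring the errors.IsNotFound branches removed from the controllers.
func RequeueWithErrorChecking(err error, logger logr.Logger, msg string, keysAndValues ...interface{}) (ctrl.Result, error) {
	if apierrors.IsNotFound(err) {
		return Reconciled()
	}
	return RequeueWithError(err, logger, msg, keysAndValues...)
}
```

Folding the IsNotFound check into RequeueWithErrorChecking is what lets the standalone and replication controllers drop their k8s.io/apimachinery/pkg/api/errors imports, while the cluster controller keeps errors for the statefulset lookups it still guards itself.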
25 changes: 9 additions & 16 deletions controllers/redis_controller.go
@@ -22,8 +22,8 @@ import (

redisv1beta2 "github.com/OT-CONTAINER-KIT/redis-operator/api/v1beta2"
"github.com/OT-CONTAINER-KIT/redis-operator/k8sutils"
intctrlutil "github.com/OT-CONTAINER-KIT/redis-operator/pkg/controllerutil"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
@@ -47,36 +47,29 @@ func (r *RedisReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl

err := r.Client.Get(context.TODO(), req.NamespacedName, instance)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
return intctrlutil.RequeueWithErrorChecking(err, reqLogger, "failed to get redis instance")
}
if instance.ObjectMeta.GetDeletionTimestamp() != nil {
if err = k8sutils.HandleRedisFinalizer(r.Client, r.K8sClient, r.Log, instance); err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "failed to handle redis finalizer")
}
return ctrl.Result{}, nil
return intctrlutil.Reconciled()
}
if _, found := instance.ObjectMeta.GetAnnotations()["redis.opstreelabs.in/skip-reconcile"]; found {
reqLogger.Info("Found annotations redis.opstreelabs.in/skip-reconcile, so skipping reconcile")
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*10, "found skip reconcile annotation")
}
if err = k8sutils.AddFinalizer(instance, k8sutils.RedisFinalizer, r.Client); err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "failed to add finalizer")
}

err = k8sutils.CreateStandaloneRedis(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "failed to create redis")
}
err = k8sutils.CreateStandaloneService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "failed to create service")
}

reqLogger.Info("Will reconcile redis operator in again 10 seconds")
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*10, "requeue after 10 seconds")
}

// SetupWithManager sets up the controller with the Manager.
61 changes: 27 additions & 34 deletions controllers/rediscluster_controller.go
@@ -24,6 +24,7 @@ import (
"github.com/OT-CONTAINER-KIT/redis-operator/api/status"
redisv1beta2 "github.com/OT-CONTAINER-KIT/redis-operator/api/v1beta2"
"github.com/OT-CONTAINER-KIT/redis-operator/k8sutils"
intctrlutil "github.com/OT-CONTAINER-KIT/redis-operator/pkg/controllerutil"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
@@ -50,20 +51,16 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request

err := r.Client.Get(context.TODO(), req.NamespacedName, instance)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
return intctrlutil.RequeueWithErrorChecking(err, reqLogger, "failed to get redis cluster instance")
}
if instance.ObjectMeta.GetDeletionTimestamp() != nil {
if err = k8sutils.HandleRedisClusterFinalizer(r.Client, r.K8sClient, r.Log, instance); err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "failed to handle redis cluster finalizer")
}
return ctrl.Result{}, nil
return intctrlutil.Reconciled()
}
if _, found := instance.ObjectMeta.GetAnnotations()["rediscluster.opstreelabs.in/skip-reconcile"]; found {
reqLogger.Info("Found annotations rediscluster.opstreelabs.in/skip-reconcile, so skipping reconcile")
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*10, "found skip reconcile annotation")
}
instance.SetDefault()

@@ -72,7 +69,7 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
totalReplicas := leaderReplicas + followerReplicas

if err = k8sutils.AddFinalizer(instance, k8sutils.RedisClusterFinalizer, r.Client); err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "failed to add finalizer")
}

// Check if the cluster is downscaled
@@ -98,41 +95,41 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Step 3 Rebalance the cluster
k8sutils.RebalanceRedisCluster(r.K8sClient, r.Log, instance)
reqLogger.Info("Redis cluster is downscaled... Rebalancing the cluster is done")
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*10, "")
}

// Mark the cluster status as initializing if there are no leader or follower nodes
if (instance.Status.ReadyLeaderReplicas == 0 && instance.Status.ReadyFollowerReplicas == 0) ||
instance.Status.ReadyLeaderReplicas != leaderReplicas {
err = k8sutils.UpdateRedisClusterStatus(instance, status.RedisClusterInitializing, status.InitializingClusterLeaderReason, instance.Status.ReadyLeaderReplicas, instance.Status.ReadyFollowerReplicas, r.Dk8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
}

if leaderReplicas != 0 {
err = k8sutils.CreateRedisLeaderService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
}
err = k8sutils.CreateRedisLeader(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}

err = k8sutils.ReconcileRedisPodDisruptionBudget(instance, "leader", instance.Spec.RedisLeader.PodDisruptionBudget, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}

// todo: remove me after watch statefulset in controller
redisLeaderInfo, err := k8sutils.GetStatefulSet(r.K8sClient, r.Log, instance.GetNamespace(), instance.GetName()+"-leader")
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*60, "")
}
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}

if r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") {
@@ -141,54 +138,52 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
instance.Status.ReadyFollowerReplicas != followerReplicas {
err = k8sutils.UpdateRedisClusterStatus(instance, status.RedisClusterInitializing, status.InitializingClusterFollowerReason, leaderReplicas, instance.Status.ReadyFollowerReplicas, r.Dk8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
}
// if we have followers create their service.
if followerReplicas != 0 {
err = k8sutils.CreateRedisFollowerService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
}
err = k8sutils.CreateRedisFollower(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
err = k8sutils.ReconcileRedisPodDisruptionBudget(instance, "follower", instance.Spec.RedisFollower.PodDisruptionBudget, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
}
// todo: remove me after watch statefulset in controller
redisFollowerInfo, err := k8sutils.GetStatefulSet(r.K8sClient, r.Log, instance.GetNamespace(), instance.GetName()+"-follower")
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*60, "")
}
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}

if leaderReplicas == 0 {
reqLogger.Info("Redis leaders Cannot be 0", "Ready.Replicas", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)), "Expected.Replicas", leaderReplicas)
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*60, "Redis leaders Cannot be 0", "Ready.Replicas", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)), "Expected.Replicas", leaderReplicas)
}

if !(r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") && r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower")) {
reqLogger.Info("Redis leader and follower nodes are not ready yet")
return ctrl.Result{RequeueAfter: time.Second * 30}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*60, "Redis leader and follower nodes are not ready yet")
}

// Mark the cluster status as bootstrapping if all the leader and follower nodes are ready
if !(instance.Status.ReadyLeaderReplicas == leaderReplicas && instance.Status.ReadyFollowerReplicas == followerReplicas) {
err = k8sutils.UpdateRedisClusterStatus(instance, status.RedisClusterBootstrap, status.BootstrapClusterReason, leaderReplicas, followerReplicas, r.Dk8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
}

if nc := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, r.Log, instance, ""); nc != totalReplicas {
reqLogger.Info("Creating redis cluster by executing cluster creation commands", "Leaders.Ready", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)), "Followers.Ready", strconv.Itoa(int(redisFollowerInfo.Status.ReadyReplicas)))
reqLogger.Info("Creating redis cluster by executing cluster creation commands", "Leaders.Ready", redisLeaderInfo.Status.ReadyReplicas, "Followers.Ready", redisFollowerInfo.Status.ReadyReplicas)
leaderCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, r.Log, instance, "leader")
if leaderCount != leaderReplicas {
reqLogger.Info("Not all leader are part of the cluster...", "Leaders.Count", leaderCount, "Instance.Size", leaderReplicas)
@@ -211,16 +206,15 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
reqLogger.Info("no follower/replicas configured, skipping replication configuration", "Leaders.Count", leaderCount, "Leader.Size", leaderReplicas, "Follower.Replicas", followerReplicas)
}
}
reqLogger.Info("Redis cluster count is not desired", "Current.Count", nc, "Desired.Count", totalReplicas)
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*60, "Redis cluster count is not desired", "Current.Count", nc, "Desired.Count", totalReplicas)
}

reqLogger.Info("Redis cluster count is desired")
if int(totalReplicas) > 1 && k8sutils.CheckRedisClusterState(ctx, r.K8sClient, r.Log, instance) >= int(totalReplicas)-1 {
reqLogger.Info("Redis leader is not desired, executing failover operation")
err = k8sutils.ExecuteFailoverOperation(ctx, r.K8sClient, r.Log, instance)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
}

@@ -234,12 +228,11 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
if k8sutils.RedisClusterStatusHealth(ctx, r.K8sClient, r.Log, instance) {
err = k8sutils.UpdateRedisClusterStatus(instance, status.RedisClusterReady, status.ReadyClusterReason, leaderReplicas, followerReplicas, r.Dk8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
}
}
reqLogger.Info("Will reconcile redis cluster operator in again 10 seconds")
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*10, "")
}

// SetupWithManager sets up the controller with the Manager.
34 changes: 14 additions & 20 deletions controllers/redisreplication_controller.go
@@ -7,9 +7,9 @@ import (

redisv1beta2 "github.com/OT-CONTAINER-KIT/redis-operator/api/v1beta2"
"github.com/OT-CONTAINER-KIT/redis-operator/k8sutils"
intctrlutil "github.com/OT-CONTAINER-KIT/redis-operator/pkg/controllerutil"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
@@ -34,50 +34,45 @@ func (r *RedisReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Req

err := r.Client.Get(context.TODO(), req.NamespacedName, instance)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
return intctrlutil.RequeueWithErrorChecking(err, reqLogger, "")
}
if instance.ObjectMeta.GetDeletionTimestamp() != nil {
if err = k8sutils.HandleRedisReplicationFinalizer(r.Client, r.K8sClient, r.Log, instance); err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
return ctrl.Result{}, nil
return intctrlutil.Reconciled()
}
if _, found := instance.ObjectMeta.GetAnnotations()["redisreplication.opstreelabs.in/skip-reconcile"]; found {
reqLogger.Info("Found annotations redisreplication.opstreelabs.in/skip-reconcile, so skipping reconcile")
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*10, "found skip reconcile annotation")
}

leaderReplicas := int32(1)
followerReplicas := instance.Spec.GetReplicationCounts("replication") - leaderReplicas
totalReplicas := leaderReplicas + followerReplicas

if err = k8sutils.AddFinalizer(instance, k8sutils.RedisReplicationFinalizer, r.Client); err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}

err = k8sutils.CreateReplicationRedis(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
err = k8sutils.CreateReplicationService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}

// Set Pod distruptiuon Budget Later

redisReplicationInfo, err := k8sutils.GetStatefulSet(r.K8sClient, r.Log, instance.GetNamespace(), instance.GetName())
if err != nil {
return ctrl.Result{RequeueAfter: time.Second * 60}, err
return intctrlutil.RequeueAfter(reqLogger, time.Second*60, "")
}

// Check that the Leader and Follower are ready in redis replication
if redisReplicationInfo.Status.ReadyReplicas != totalReplicas {
reqLogger.Info("Redis replication nodes are not ready yet", "Ready.Replicas", strconv.Itoa(int(redisReplicationInfo.Status.ReadyReplicas)), "Expected.Replicas", totalReplicas)
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*60, "Redis replication nodes are not ready yet", "Ready.Replicas", redisReplicationInfo.Status.ReadyReplicas, "Expected.Replicas", totalReplicas)
}

var realMaster string
@@ -90,18 +85,17 @@ func (r *RedisReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Req
realMaster = masterNodes[0]
}
if err = k8sutils.CreateMasterSlaveReplication(ctx, r.K8sClient, r.Log, instance, masterNodes, realMaster); err != nil {
return ctrl.Result{RequeueAfter: time.Second * 60}, err
return intctrlutil.RequeueAfter(reqLogger, time.Second*60, "")
}
}
realMaster = k8sutils.GetRedisReplicationRealMaster(ctx, r.K8sClient, r.Log, instance, masterNodes)
if err = r.UpdateRedisReplicationMaster(ctx, instance, realMaster); err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
if err = r.UpdateRedisPodRoleLabel(ctx, instance, realMaster); err != nil {
return ctrl.Result{}, err
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
reqLogger.Info("Will reconcile redis operator in again 10 seconds")
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
return intctrlutil.RequeueAfter(reqLogger, time.Second*10, "")
}

func (r *RedisReplicationReconciler) UpdateRedisReplicationMaster(ctx context.Context, instance *redisv1beta2.RedisReplication, masterNode string) error {
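If the helpers behave as sketched above, a pair of quick unit tests could pin down the two behaviors the controllers now rely on: a NotFound error ending the reconcile cleanly, and RequeueAfter carrying the delay. This is likewise a hypothetical sketch against the assumed signatures, not a test file from the commit.

```go
// Hypothetical unit tests for the sketched helpers above; they assume the
// package name and behaviors described in the lead-in, and use the import
// path shown in the controller diffs.
package controllerutil_test

import (
	"testing"
	"time"

	intctrlutil "github.com/OT-CONTAINER-KIT/redis-operator/pkg/controllerutil"
	"github.com/go-logr/logr"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func TestRequeueWithErrorCheckingIgnoresNotFound(t *testing.T) {
	// Group and resource here are arbitrary test values, not the operator's real API group.
	notFound := apierrors.NewNotFound(schema.GroupResource{Group: "example.test", Resource: "redis"}, "example")
	res, err := intctrlutil.RequeueWithErrorChecking(notFound, logr.Discard(), "failed to get redis instance")
	if err != nil || res.Requeue || res.RequeueAfter != 0 {
		t.Fatalf("expected a clean, non-requeuing result for NotFound, got %+v, %v", res, err)
	}
}

func TestRequeueAfterSetsDelay(t *testing.T) {
	res, err := intctrlutil.RequeueAfter(logr.Discard(), 10*time.Second, "requeue after 10 seconds")
	if err != nil || res.RequeueAfter != 10*time.Second {
		t.Fatalf("expected a 10s requeue, got %+v, %v", res, err)
	}
}
```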
