[Fix] : cluster scaling #704

Merged · 6 commits · Nov 20, 2023
Changes from 1 commit
temp
Signed-off-by: drivebyer <yang.wu@daocloud.io>
drivebyer committed Nov 20, 2023
commit 12e53459559628669312e153249bb0fb134c1ba1
48 changes: 25 additions & 23 deletions controllers/rediscluster_controller.go
@@ -75,6 +75,7 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request

// Check if the cluster is downscaled
if leaderReplicas < instance.Status.ReadyLeaderReplicas {
reqLogger.Info("Redis cluster is downscaling", "Ready.ReadyLeaderReplicas", instance.Status.ReadyLeaderReplicas, "Expected.ReadyLeaderReplicas", leaderReplicas)

// Important: if the pod at the last index of the leader sts is not a leader, make it one, then
// check whether the redis node is a leader or not
@@ -86,15 +87,17 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
// clusterFailover should also include clusterReplicate since we have to map the followers to the new leader
k8sutils.ClusterFailover(ctx, r.K8sClient, r.Log, instance)
}

// Step 1 Reshard the Cluster
k8sutils.ReshardRedisCluster(r.K8sClient, r.Log, instance)
// Step 2 Remove the Follower Node
// Step 1 Remove the Follower Node
k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, r.Log, instance)
// Step 3 Remove the Leader Node
k8sutils.RemoveRedisNodeFromCluster(ctx, r.K8sClient, r.Log, instance)
// Step 4 Rebalance the cluster
// Step 2 Reshard the Cluster
k8sutils.ReshardRedisCluster(r.K8sClient, r.Log, instance, true)
// Step 3 Rebalance the cluster
k8sutils.RebalanceRedisCluster(r.K8sClient, r.Log, instance)

err = k8sutils.UpdateRedisClusterStatus(instance, status.RedisClusterReady, status.ReadyClusterReason, instance.Status.ReadyLeaderReplicas-1, instance.Status.ReadyLeaderReplicas-1)
if err != nil {
return ctrl.Result{RequeueAfter: time.Second * 10}, err
}
return ctrl.Result{RequeueAfter: time.Second * 100}, nil
}
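
Note on the reordering above: followers are now removed before resharding. Per the new ReshardRedisCluster doc comment, a fully drained leader re-registers as a replica of the first master, so detaching followers first keeps that step unambiguous. A condensed sketch of the resulting flow (identifiers taken from the diff; error handling and logging elided):

```go
if leaderReplicas < instance.Status.ReadyLeaderReplicas {
	// Make sure the highest-index sts pod is a leader before any topology
	// change; failover also re-maps its followers to the new leader.
	k8sutils.ClusterFailover(ctx, r.K8sClient, r.Log, instance)

	// Step 1: detach the departing leader's follower(s) first.
	k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, r.Log, instance)
	// Step 2: drain all slots to leader-0; remove=true also del-nodes the
	// drained pod inside ReshardRedisCluster.
	k8sutils.ReshardRedisCluster(r.K8sClient, r.Log, instance, true)
	// Step 3: spread slots evenly across the remaining masters.
	k8sutils.RebalanceRedisCluster(r.K8sClient, r.Log, instance)
}
```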

@@ -175,17 +178,15 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
}

// Mark the cluster status as bootstrapping if all the leader and follower nodes are ready
if int32(redisLeaderInfo.Status.ReadyReplicas) == leaderReplicas && int32(redisFollowerInfo.Status.ReadyReplicas) == followerReplicas {
if instance.Status.ReadyLeaderReplicas == leaderReplicas && instance.Status.ReadyFollowerReplicas == 0 {
err = k8sutils.UpdateRedisClusterStatus(instance, status.RedisClusterBootstrap, status.BootstrapClusterReason, leaderReplicas, followerReplicas)
if err != nil {
return ctrl.Result{RequeueAfter: time.Second * 10}, err
}
if !(instance.Status.ReadyLeaderReplicas == leaderReplicas && instance.Status.ReadyFollowerReplicas == followerReplicas) {
err = k8sutils.UpdateRedisClusterStatus(instance, status.RedisClusterBootstrap, status.BootstrapClusterReason, leaderReplicas, followerReplicas)
if err != nil {
return ctrl.Result{RequeueAfter: time.Second * 10}, err
}
}

reqLogger.Info("Creating redis cluster by executing cluster creation commands", "Leaders.Ready", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)), "Followers.Ready", strconv.Itoa(int(redisFollowerInfo.Status.ReadyReplicas)))
if k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, r.Log, instance, "") != totalReplicas {
if nc := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, r.Log, instance, ""); nc != totalReplicas {
leaderCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, r.Log, instance, "leader")
if leaderCount != leaderReplicas {
reqLogger.Info("Not all leader are part of the cluster...", "Leaders.Count", leaderCount, "Instance.Size", leaderReplicas)
@@ -208,16 +209,17 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
reqLogger.Info("no follower/replicas configured, skipping replication configuration", "Leaders.Count", leaderCount, "Leader.Size", leaderReplicas, "Follower.Replicas", followerReplicas)
}
}
} else {
reqLogger.Info("Redis leader count is desired")
if int(totalReplicas) > 1 && k8sutils.CheckRedisClusterState(ctx, r.K8sClient, r.Log, instance) >= int(totalReplicas)-1 {
reqLogger.Info("Redis leader is not desired, executing failover operation")
err = k8sutils.ExecuteFailoverOperation(ctx, r.K8sClient, r.Log, instance)
if err != nil {
return ctrl.Result{RequeueAfter: time.Second * 10}, err
}
reqLogger.Info("Redis cluster count is not desired", "Current.Count", nc, "Desired.Count", totalReplicas)
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
}

reqLogger.Info("Redis cluster count is desired")
if int(totalReplicas) > 1 && k8sutils.CheckRedisClusterState(ctx, r.K8sClient, r.Log, instance) >= int(totalReplicas)-1 {
reqLogger.Info("Redis leader is not desired, executing failover operation")
err = k8sutils.ExecuteFailoverOperation(ctx, r.K8sClient, r.Log, instance)
if err != nil {
return ctrl.Result{RequeueAfter: time.Second * 10}, err
}
return ctrl.Result{RequeueAfter: time.Second * 120}, nil
}

// Check If there is No Empty Master Node
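
A note on the check that begins here: after a scale-up, a freshly added leader owns zero slots until a rebalance runs, and redis-cli skips empty masters unless told otherwise. A minimal sketch in the style of the k8sutils helpers (the address is a placeholder; --cluster-use-empty-masters is a real redis-cli option):

```go
// Rebalance including masters that currently own no slots.
cmd := []string{
	"redis-cli", "--cluster", "rebalance",
	"<leader-0-ip>:6379",          // any reachable cluster node as entry point
	"--cluster-use-empty-masters", // also assign slots to empty masters
}
executeCommand(client, logger, cr, cmd, cr.ObjectMeta.Name+"-leader-0")
```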
22 changes: 14 additions & 8 deletions k8sutils/cluster-scaling.go
@@ -11,8 +11,10 @@ import (
"k8s.io/client-go/kubernetes"
)

// Reshard the redis Cluster
func ReshardRedisCluster(client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster) {
// ReshardRedisCluster transfers the slots from the last node to the first node.
//
// NOTE: when all slots have been transferred, the node becomes a slave of the first master node.
func ReshardRedisCluster(client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster, remove bool) {
ctx := context.TODO()
var cmd []string
currentRedisCount := CheckRedisNodeCount(ctx, client, logger, cr, "leader")
@@ -72,6 +74,10 @@ func ReshardRedisCluster(client kubernetes.Interface, logger logr.Logger, cr *re
return
}
executeCommand(client, logger, cr, cmd, cr.ObjectMeta.Name+"-leader-0")

if remove {
RemoveRedisNodeFromCluster(ctx, client, logger, cr, removePOD)
}
}

func getRedisClusterSlots(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster, nodeID string) string {
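
For context, getRedisClusterSlots is what sizes the transfer: the slot count owned by the departing node becomes --cluster-slots, so the reshard drains it completely. A hedged sketch of the assembly (departingNodeID and leaderZeroNodeID are hypothetical placeholders for the 40-character IDs from CLUSTER NODES; the real helper builds these flags step by step):

```go
slots := getRedisClusterSlots(ctx, client, logger, cr, departingNodeID) // e.g. "5461"
cmd := []string{
	"redis-cli", "--cluster", "reshard", "<leader-0-ip>:6379",
	"--cluster-from", departingNodeID, // source: the leader being removed
	"--cluster-to", leaderZeroNodeID,  // destination: the first leader
	"--cluster-slots", slots,          // everything the source owns
	"--cluster-yes",                   // run non-interactively
}
executeCommand(client, logger, cr, cmd, cr.ObjectMeta.Name+"-leader-0")
```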
@@ -336,18 +342,18 @@ func RemoveRedisFollowerNodesFromCluster(ctx context.Context, client kubernetes.
}

// RemoveRedisNodeFromCluster removes the last node from the existing redis cluster using redis-cli
func RemoveRedisNodeFromCluster(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster) {
func RemoveRedisNodeFromCluster(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster, removePod RedisDetails) {
var cmd []string
currentRedisCount := CheckRedisNodeCount(ctx, client, logger, cr, "leader")
//currentRedisCount := CheckRedisNodeCount(ctx, client, logger, cr, "leader")

existingPod := RedisDetails{
PodName: cr.ObjectMeta.Name + "-leader-0",
Namespace: cr.Namespace,
}
removePod := RedisDetails{
PodName: cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(int(currentRedisCount)-1),
Namespace: cr.Namespace,
}
//removePod := RedisDetails{
// PodName: cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(int(currentRedisCount)-1),
// Namespace: cr.Namespace,
//}

cmd = []string{"redis-cli", "--cluster", "del-node"}
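
The assembled del-node command ends up with this shape (values are placeholders; the node ID is resolved from the pod's CLUSTER NODES entry):

```go
cmd := []string{
	"redis-cli", "--cluster", "del-node",
	"<leader-0-ip>:6379",       // entry point: any live cluster node
	"<node-id-of-removed-pod>", // 40-char ID of the node to drop
}
executeCommand(client, logger, cr, cmd, existingPod.PodName)
```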

6 changes: 3 additions & 3 deletions k8sutils/redis.go
@@ -406,20 +406,20 @@ func getContainerID(client kubernetes.Interface, logger logr.Logger, cr *redisv1
return -1, nil
}

logger.Info("Pod info retrieved successfully", "Pod Name", podName, "Namespace", cr.Namespace)
logger.V(1).Info("Pod info retrieved successfully", "Pod Name", podName, "Namespace", cr.Namespace)

targetContainer := -1
for containerID, tr := range pod.Spec.Containers {
logger.V(1).Info("Inspecting container", "Pod Name", podName, "Container ID", containerID, "Container Name", tr.Name)
if tr.Name == cr.ObjectMeta.Name+"-leader" {
targetContainer = containerID
logger.Info("Leader container found", "Container ID", containerID, "Container Name", tr.Name)
logger.V(1).Info("Leader container found", "Container ID", containerID, "Container Name", tr.Name)
break
}
}

if targetContainer == -1 {
logger.Info("Leader container not found in pod", "Pod Name", podName)
logger.V(1).Info("Leader container not found in pod", "Pod Name", podName)
return -1, nil
}