Skip to content

Commit

Permalink
fix: redis getrole from sentinel (#8982)
Browse files Browse the repository at this point in the history
  • Loading branch information
kizuna-lek authored Feb 28, 2025
1 parent ba489ce commit b03c7af
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 15 deletions.
34 changes: 21 additions & 13 deletions pkg/lorry/engines/redis/get_replica_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,15 @@ func (mgr *Manager) GetReplicaRole(ctx context.Context, _ *dcs.Cluster) (string,
}

// We use the role obtained from Sentinel as the sole source of truth.
masterAddr, err := mgr.sentinelClient.GetMasterAddrByName(ctx, mgr.ClusterCompName).Result()
masterAddr, err := mgr.sentinelClient.GetMasterAddrByName(ctx, mgr.masterName).Result()
if err != nil {
mgr.Logger.Info("failed to get master address from Sentinel, try to get from Redis", "error", err.Error())
return getRoleFromRedisClient()
}

masterIP := masterAddr[0]
// if current member is not master from sentinel, just return secondary to avoid double master
if masterIP != mgr.CurrentMemberIP {
return models.SECONDARY, nil
}
return models.PRIMARY, nil
masterPort := masterAddr[1]
return mgr.checkPrimary(masterIP, masterPort), nil
}

func (mgr *Manager) SubscribeRoleChange(ctx context.Context) {
Expand All @@ -86,14 +84,24 @@ func (mgr *Manager) SubscribeRoleChange(ctx context.Context) {
ch := pubSub.Channel()
for msg := range ch {
// +switch-master <master name> <old ip> <old port> <new ip> <new port>
masterAddr := strings.Split(msg.Payload, " ")
masterName := strings.Split(masterAddr[3], ".")[0]

if masterName == mgr.CurrentMemberName {
mgr.role = models.PRIMARY
} else {
mgr.role = models.SECONDARY
msgInfo := strings.Split(msg.Payload, " ")
if len(msgInfo) != 5 {
mgr.Logger.Info("failed to get switch master info from subscribe", "msg", msg.Payload)
}

masterIP := msgInfo[3]
masterPort := msgInfo[4]
mgr.role = mgr.checkPrimary(masterIP, masterPort)
mgr.roleSubscribeUpdateTime = time.Now().Unix()
}
}

// If current member is not master from sentinel, just return secondary to avoid double master
// When currentRedisHost is a domain name, it does not include dnsDomain by default,
// and prefix matching can override the matching of domain names or IPs.
func (mgr *Manager) checkPrimary(masterIP, masterPort string) string {
if !strings.HasPrefix(masterIP, mgr.currentRedisHost) || masterPort != mgr.currentRedisPort {
return models.SECONDARY
}
return models.PRIMARY
}
61 changes: 61 additions & 0 deletions pkg/lorry/engines/redis/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package redis
import (
"context"
"fmt"
"net"
"os/exec"
"strconv"
"strings"
Expand Down Expand Up @@ -49,6 +50,9 @@ type Manager struct {
role string
roleSubscribeUpdateTime int64
roleProbePeriod int64
masterName string
currentRedisHost string
currentRedisPort string
}

var _ engines.DBManager = &Manager{}
Expand Down Expand Up @@ -77,6 +81,31 @@ func NewManager(properties engines.Properties) (engines.DBManager, error) {
roleProbePeriod: int64(viper.GetInt(constant.KBEnvRoleProbePeriod)),
}

mgr.masterName = mgr.ClusterCompName
if viper.IsSet("CUSTOM_SENTINEL_MASTER_NAME") {
mgr.masterName = viper.GetString("CUSTOM_SENTINEL_MASTER_NAME")
}
mgr.currentRedisHost = viper.GetString("KB_POD_FQDN")
mgr.currentRedisPort = viper.GetString(constant.KBEnvServicePort)

switch {
case viper.IsSet("FIXED_POD_IP_ENABLED"):
fixPodIP, err := getFixedPodIP(viper.GetString("KB_POD_FQDN"))
if err != nil {
return nil, err
}
mgr.currentRedisHost = fixPodIP
case viper.IsSet("HOST_NETWORK_ENABLED") || viper.IsSet("REDIS_ADVERTISED_PORT"):
mgr.currentRedisHost = viper.GetString("KB_HOST_IP")
if viper.IsSet("REDIS_ADVERTISED_PORT") {
port, err := mgr.getAdvertisedPort(viper.GetString("REDIS_ADVERTISED_PORT"))
if err != nil {
return nil, err
}
mgr.currentRedisPort = port
}
}

majorVersion, err := getRedisMajorVersion()
if err != nil {
return nil, err
Expand Down Expand Up @@ -156,3 +185,35 @@ func getRedisMajorVersion() (int, error) {
}
return majorVersion, nil
}

func getFixedPodIP(podFQDN string) (string, error) {
addrs, err := net.LookupHost(podFQDN)
if err != nil {
return "", err
}
if len(addrs) > 0 {
return addrs[0], nil
}

return "", fmt.Errorf("failed to get IP address for %s", podFQDN)
}

func (mgr *Manager) getAdvertisedPort(redisAdvertisedPort string) (string, error) {
// redisAdvertisedPort: pod1Svc:advertisedPort1,pod2Svc:advertisedPort2,...
addrList := strings.Split(redisAdvertisedPort, ",")

getIndex := func(name string) string {
items := strings.Split(name, "-")
return items[len(items)-1]
}

for _, addr := range addrList {
host := strings.Split(addr, ":")[0]
port := strings.Split(addr, ":")[1]
if getIndex(host) == getIndex(mgr.CurrentMemberName) {
return port, nil
}
}

return "", fmt.Errorf("failed to get advertised port for %s", mgr.CurrentMemberName)
}
4 changes: 2 additions & 2 deletions pkg/lorry/engines/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ func newSentinelClient(s *Settings, clusterCompName string) *redis.SentinelClien
sentinelHost = viper.GetString("SENTINEL_HEADLESS_SERVICE_NAME")
}
sentinelPort := "26379"
if viper.IsSet("REDIS_SENTINEL_HOST_NETWORK_PORT") {
sentinelPort = viper.GetString("REDIS_SENTINEL_HOST_NETWORK_PORT")
if viper.IsSet("SENTINEL_SERVICE_PORT") {
sentinelPort = viper.GetString("SENTINEL_SERVICE_PORT")
}

sentinelUser := s.Username
Expand Down

0 comments on commit b03c7af

Please sign in to comment.