Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proxy to replicas feature #853

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 79 additions & 18 deletions cmd/proxy/cmd/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@ var CmdProxy = &cobra.Command{
type config struct {
cmd.CommonConfig

listenAddress string
port string
stopListening bool
debug bool
listenAddress string
port string
stopListening bool
replicaMode bool
replicaModeFallBack bool
loadBalancingType string
debug bool
logPollon bool

keepAliveIdle int
keepAliveCount int
Expand All @@ -66,10 +70,14 @@ func init() {
CmdProxy.PersistentFlags().StringVar(&cfg.listenAddress, "listen-address", "127.0.0.1", "proxy listening address")
CmdProxy.PersistentFlags().StringVar(&cfg.port, "port", "5432", "proxy listening port")
CmdProxy.PersistentFlags().BoolVar(&cfg.stopListening, "stop-listening", true, "stop listening on store error")
CmdProxy.PersistentFlags().BoolVar(&cfg.replicaMode, "replica-mode", false, "proxy to replicas")
CmdProxy.PersistentFlags().BoolVar(&cfg.replicaModeFallBack, "replica-mode-fallback", false, "Fallback to master when no lives replicas")
CmdProxy.PersistentFlags().BoolVar(&cfg.debug, "debug", false, "enable debug logging")
CmdProxy.PersistentFlags().BoolVar(&cfg.logPollon, "log-pollon", false, "enable pollon logging")
CmdProxy.PersistentFlags().IntVar(&cfg.keepAliveIdle, "tcp-keepalive-idle", 0, "set tcp keepalive idle (seconds)")
CmdProxy.PersistentFlags().IntVar(&cfg.keepAliveCount, "tcp-keepalive-count", 0, "set tcp keepalive probe count number")
CmdProxy.PersistentFlags().IntVar(&cfg.keepAliveInterval, "tcp-keepalive-interval", 0, "set tcp keepalive interval (seconds)")
CmdProxy.PersistentFlags().StringVar(&cfg.loadBalancingType, "load-balancing-type", "random", "proxy to replicas LB Type")

if err := CmdProxy.PersistentFlags().MarkDeprecated("debug", "use --log-level=debug instead"); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -140,6 +148,9 @@ func (c *ClusterChecker) startPollonProxy() error {
pp.SetKeepAliveIdle(time.Duration(cfg.keepAliveIdle) * time.Second)
pp.SetKeepAliveCount(cfg.keepAliveCount)
pp.SetKeepAliveInterval(time.Duration(cfg.keepAliveInterval) * time.Second)
if cfg.loadBalancingType == "leastqueue" {
pp.SetLBType(pollon.LeastQueue)
}

c.pp = pp
c.listener = listener
Expand Down Expand Up @@ -186,6 +197,63 @@ func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTime
return nil
}

func tcpAddrToStr(addr []*net.TCPAddr) string {
var result string
for i, a := range addr {
if i > 0 {
result += " "
}
result += a.String()
}
return result
}
func GetProxyDBs(cd *cluster.ClusterData) []*net.TCPAddr {
var result []*net.TCPAddr
//Proxy to master
if !cfg.replicaMode {
proxy := cd.Proxy
db, ok := cd.DBs[proxy.Spec.MasterDBUID]
if !ok {
return nil
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port))
if err != nil {
return nil
}
result = append(result, addr)
return result
}
//Proxy to replicas
var master, replicas []*net.TCPAddr
for _, db := range cd.DBs {
if !db.Status.Healthy {
continue
}
if len(db.Status.ListenAddress) < 1 || len(db.Status.ListenAddress) < 1 {
continue
}
if db.Spec.Role == common.RoleStandby {
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port))
if err != nil {
continue
}
replicas = append(replicas, addr)
}
if db.Spec.Role == common.RoleMaster {
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port))
if err != nil {
continue
}
master = append(master, addr)
}
}
if len(replicas) < 1 && cfg.replicaModeFallBack {
log.Errorw("No alive replicas FallBack to master")
return master
}
return replicas
}

// Check reads the cluster data and applies the right pollon configuration.
func (c *ClusterChecker) Check() error {
cd, _, err := c.e.GetClusterData(context.TODO())
Expand Down Expand Up @@ -242,9 +310,9 @@ func (c *ClusterChecker) Check() error {
return nil
}

db, ok := cd.DBs[proxy.Spec.MasterDBUID]
if !ok {
log.Infow("no db object available, closing connections to master", "db", proxy.Spec.MasterDBUID)
addr := GetProxyDBs(cd)
if len(addr) < 1 {
log.Infow("no db object available, closing connections")
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
// ignore errors on setting proxy info
if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {
Expand All @@ -259,13 +327,7 @@ func (c *ClusterChecker) Check() error {
return nil
}

addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port))
if err != nil {
log.Errorw("cannot resolve db address", zap.Error(err))
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
return nil
}
log.Infow("master address", "address", addr)
log.Infow("Proxy address", "address", tcpAddrToStr(addr))
if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil {
// if we failed to update our proxy info when a master is defined we
// cannot ignore this error since the sentinel won't know that we exist
Expand All @@ -283,13 +345,12 @@ func (c *ClusterChecker) Check() error {
// start proxing only if we are inside enabledProxies, this ensures that the
// sentinel has read our proxyinfo and knows we are alive
if util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) {
log.Infow("proxying to master address", "address", addr)
log.Infow("proxying to address", "address", tcpAddrToStr(addr))
c.sendPollonConfData(pollon.ConfData{DestAddr: addr})
} else {
log.Infow("not proxying to master address since we aren't in the enabled proxies list", "address", addr)
log.Infow("not proxying to address since we aren't in the enabled proxies list", "address", addr)
c.sendPollonConfData(pollon.ConfData{DestAddr: nil})
}

return nil
}

Expand Down Expand Up @@ -389,7 +450,7 @@ func proxy(c *cobra.Command, args []string) {
if cmd.IsColorLoggerEnable(c, &cfg.CommonConfig) {
log = slog.SColor()
}
if slog.IsDebug() {
if slog.IsDebug() || cfg.logPollon {
if cmd.IsColorLoggerEnable(c, &cfg.CommonConfig) {
stdlog := slog.StdLogColor()
pollon.SetLogger(stdlog)
Expand Down