From 189d527ee50201eccdbbd5f4d5fd79b41bcea850 Mon Sep 17 00:00:00 2001 From: Andrey Kiskyak Date: Fri, 8 Oct 2021 16:55:50 +0300 Subject: [PATCH] Add proxy to replicas feature --- cmd/proxy/cmd/proxy.go | 97 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 79 insertions(+), 18 deletions(-) diff --git a/cmd/proxy/cmd/proxy.go b/cmd/proxy/cmd/proxy.go index 39f1f3b99..10ac83d87 100644 --- a/cmd/proxy/cmd/proxy.go +++ b/cmd/proxy/cmd/proxy.go @@ -48,10 +48,14 @@ var CmdProxy = &cobra.Command{ type config struct { cmd.CommonConfig - listenAddress string - port string - stopListening bool - debug bool + listenAddress string + port string + stopListening bool + replicaMode bool + replicaModeFallBack bool + loadBalancingType string + debug bool + logPollon bool keepAliveIdle int keepAliveCount int @@ -66,10 +70,14 @@ func init() { CmdProxy.PersistentFlags().StringVar(&cfg.listenAddress, "listen-address", "127.0.0.1", "proxy listening address") CmdProxy.PersistentFlags().StringVar(&cfg.port, "port", "5432", "proxy listening port") CmdProxy.PersistentFlags().BoolVar(&cfg.stopListening, "stop-listening", true, "stop listening on store error") + CmdProxy.PersistentFlags().BoolVar(&cfg.replicaMode, "replica-mode", false, "proxy to replicas") + CmdProxy.PersistentFlags().BoolVar(&cfg.replicaModeFallBack, "replica-mode-fallback", false, "Fallback to master when no lives replicas") CmdProxy.PersistentFlags().BoolVar(&cfg.debug, "debug", false, "enable debug logging") + CmdProxy.PersistentFlags().BoolVar(&cfg.logPollon, "log-pollon", false, "enable pollon logging") CmdProxy.PersistentFlags().IntVar(&cfg.keepAliveIdle, "tcp-keepalive-idle", 0, "set tcp keepalive idle (seconds)") CmdProxy.PersistentFlags().IntVar(&cfg.keepAliveCount, "tcp-keepalive-count", 0, "set tcp keepalive probe count number") CmdProxy.PersistentFlags().IntVar(&cfg.keepAliveInterval, "tcp-keepalive-interval", 0, "set tcp keepalive interval (seconds)") + CmdProxy.PersistentFlags().StringVar(&cfg.loadBalancingType, "load-balancing-type", "random", "proxy to replicas LB Type") if err := CmdProxy.PersistentFlags().MarkDeprecated("debug", "use --log-level=debug instead"); err != nil { log.Fatal(err) @@ -140,6 +148,9 @@ func (c *ClusterChecker) startPollonProxy() error { pp.SetKeepAliveIdle(time.Duration(cfg.keepAliveIdle) * time.Second) pp.SetKeepAliveCount(cfg.keepAliveCount) pp.SetKeepAliveInterval(time.Duration(cfg.keepAliveInterval) * time.Second) + if cfg.loadBalancingType == "leastqueue" { + pp.SetLBType(pollon.LeastQueue) + } c.pp = pp c.listener = listener @@ -186,6 +197,63 @@ func (c *ClusterChecker) SetProxyInfo(e store.Store, generation int64, proxyTime return nil } +func tcpAddrToStr(addr []*net.TCPAddr) string { + var result string + for i, a := range addr { + if i > 0 { + result += " " + } + result += a.String() + } + return result +} +func GetProxyDBs(cd *cluster.ClusterData) []*net.TCPAddr { + var result []*net.TCPAddr + //Proxy to master + if !cfg.replicaMode { + proxy := cd.Proxy + db, ok := cd.DBs[proxy.Spec.MasterDBUID] + if !ok { + return nil + } + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port)) + if err != nil { + return nil + } + result = append(result, addr) + return result + } + //Proxy to replicas + var master, replicas []*net.TCPAddr + for _, db := range cd.DBs { + if !db.Status.Healthy { + continue + } + if len(db.Status.ListenAddress) < 1 || len(db.Status.ListenAddress) < 1 { + continue + } + if db.Spec.Role == common.RoleStandby { + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port)) + if err != nil { + continue + } + replicas = append(replicas, addr) + } + if db.Spec.Role == common.RoleMaster { + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port)) + if err != nil { + continue + } + master = append(master, addr) + } + } + if len(replicas) < 1 && cfg.replicaModeFallBack { + log.Errorw("No alive replicas FallBack to master") + return master + } + return replicas +} + // Check reads the cluster data and applies the right pollon configuration. func (c *ClusterChecker) Check() error { cd, _, err := c.e.GetClusterData(context.TODO()) @@ -242,9 +310,9 @@ func (c *ClusterChecker) Check() error { return nil } - db, ok := cd.DBs[proxy.Spec.MasterDBUID] - if !ok { - log.Infow("no db object available, closing connections to master", "db", proxy.Spec.MasterDBUID) + addr := GetProxyDBs(cd) + if len(addr) < 1 { + log.Infow("no db object available, closing connections") c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) // ignore errors on setting proxy info if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil { @@ -259,13 +327,7 @@ func (c *ClusterChecker) Check() error { return nil } - addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(db.Status.ListenAddress, db.Status.Port)) - if err != nil { - log.Errorw("cannot resolve db address", zap.Error(err)) - c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) - return nil - } - log.Infow("master address", "address", addr) + log.Infow("Proxy address", "address", tcpAddrToStr(addr)) if err = c.SetProxyInfo(c.e, proxy.Generation, proxyTimeout); err != nil { // if we failed to update our proxy info when a master is defined we // cannot ignore this error since the sentinel won't know that we exist @@ -283,13 +345,12 @@ func (c *ClusterChecker) Check() error { // start proxing only if we are inside enabledProxies, this ensures that the // sentinel has read our proxyinfo and knows we are alive if util.StringInSlice(proxy.Spec.EnabledProxies, c.uid) { - log.Infow("proxying to master address", "address", addr) + log.Infow("proxying to address", "address", tcpAddrToStr(addr)) c.sendPollonConfData(pollon.ConfData{DestAddr: addr}) } else { - log.Infow("not proxying to master address since we aren't in the enabled proxies list", "address", addr) + log.Infow("not proxying to address since we aren't in the enabled proxies list", "address", addr) c.sendPollonConfData(pollon.ConfData{DestAddr: nil}) } - return nil } @@ -389,7 +450,7 @@ func proxy(c *cobra.Command, args []string) { if cmd.IsColorLoggerEnable(c, &cfg.CommonConfig) { log = slog.SColor() } - if slog.IsDebug() { + if slog.IsDebug() || cfg.logPollon { if cmd.IsColorLoggerEnable(c, &cfg.CommonConfig) { stdlog := slog.StdLogColor() pollon.SetLogger(stdlog)