From b0298c7a30a7c6b1adb8b4084848e8b18cfc9de9 Mon Sep 17 00:00:00 2001 From: zou sheng Date: Thu, 25 May 2017 16:05:40 +0800 Subject: [PATCH 01/14] Fix the issue that invalid instance name can be created when clicking the 'CREATE' buttong without focusing on any field --- .../dashboard/controllers/instanceCreateDialogController.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ui/app/scripts/dashboard/controllers/instanceCreateDialogController.js b/ui/app/scripts/dashboard/controllers/instanceCreateDialogController.js index 9eb4220..8b430b1 100644 --- a/ui/app/scripts/dashboard/controllers/instanceCreateDialogController.js +++ b/ui/app/scripts/dashboard/controllers/instanceCreateDialogController.js @@ -44,6 +44,10 @@ $scope.processCreateInstanceForm = function () { $scope.creatingInstance = true; + if (!$scope.newInstance.name){ + $scope.creatingInstance = false; + angular.element('input.ng-invalid').first().focus(); + } else { $scope.checkDBName($scope.newInstance.name, function(){ dashboardServices.createInstance($scope.newInstance).then(function(response){ console.log('This is response from dashboardServices createInstance: '); @@ -76,4 +80,5 @@ //TODO Handle checkdb name API error failure }); } + } }]); From be169ed30a2f676c8ceb2d9ff0b40fc2571445dd Mon Sep 17 00:00:00 2001 From: zou sheng Date: Thu, 25 May 2017 19:49:18 +0800 Subject: [PATCH 02/14] Change sched/httplib/httplib.go to block invalid instance name null and spaces from front end and rest api request. --- sched/httplib/httplib.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sched/httplib/httplib.go b/sched/httplib/httplib.go index cc0a967..0dc3453 100644 --- a/sched/httplib/httplib.go +++ b/sched/httplib/httplib.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "strconv" + "strings" "github.com/astaxie/beego" @@ -40,6 +41,19 @@ func (this *MainController) CreateInstance() { slaves, _ = strconv.Atoi(this.Ctx.Input.Param(":SLAVES")) // Get the capacity of the instance in MB inData := this.Ctx.Input.CopyBody() + //Check if instance name is valid, e.g.: space and null is not allowed from both front end and rest api + name = strings.TrimSpace(name) + + if name == "null" { + this.Ctx.WriteString(fmt.Sprintf("Instance name is null, please provide a valid name")) + return + } + + if len(name) == 0 { + this.Ctx.WriteString(fmt.Sprintf("Instance name consists of spaces, please provide a valid name")) + return + } + if len(inData) > 0 { //Some Payload is being supplied for create err := json.Unmarshal(inData, &IData) From 644e68b059e90ae426b43aa9fa5d2f75b374ebaf Mon Sep 17 00:00:00 2001 From: zou sheng Date: Sun, 16 Jul 2017 10:57:02 +0800 Subject: [PATCH 03/14] Change redis_proxy.go to support loading redis instances from zk dynamically once redis is created or deleted, and generate local unique proxy listening port which stores in zk. --- proxy/redis_proxy.go | 405 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 389 insertions(+), 16 deletions(-) diff --git a/proxy/redis_proxy.go b/proxy/redis_proxy.go index d3e625d..ed3651f 100644 --- a/proxy/redis_proxy.go +++ b/proxy/redis_proxy.go @@ -4,15 +4,52 @@ import ( "encoding/json" "flag" "fmt" + "log" + "github.com/curator-go/curator" + "github.com/natefinch/lumberjack" + "github.com/samuel/go-zookeeper/zk" "io" "io/ioutil" - "log" + "math/rand" "net" "net/http" + "os" + "strconv" + "strings" + "time" ) -//ConfigMap A map of name of the proxy vs its actually backend endpoint -var ConfigMap map[string]Entry + +var ( + //ConfigMap A map of name of the proxy vs its actually backend endpoint + ConfigMap map[string]Entry + + //LocalPortsMap A map of local ports which fetch the data from zk once proxy daemon restarts + LocalPortsMap map[string]string + +) +const ( + + //RedisPortBaseNum Local redis listen port range from 6100 + RedisPortMinNum = 6100 + + RedisPortMaxNum = 6300 + + ProxyPort = 7979 + + CleanUpInterval = 20 + + CleanUpZKMaxReties = 3 + + CleanUpZKCheckIntervalSecs = 15 + + SyncZKIntervalSecs = 2 + + RedisPath = "/MrRedis/Instances" + + RedisLocalPortsPath = "/MrRedisLocalPorts" + +) //Config json config structure for the proxy type Config struct { @@ -38,6 +75,237 @@ type HTTPUpdate struct { Addr string } +func must(err error) { + if err != nil { + log.Println("panic") + panic(err) + } +} + +func connect() *zk.Conn { + zksStr := os.Getenv("ZOOKEEPER_SERVERS") + zks := strings.Split(zksStr, ",") + conn, _, err := zk.Connect(zks, time.Second) + must(err) + return conn +} + +func newTCPListener(addr string) (net.Listener, error) { + conn, err := net.Listen("tcp", addr) + if err != nil { + return conn, err + } + + return conn, nil +} + +/*func random(min, max int) int { + rand.Seed(time.Now().Unix()) + return rand.Intn(max-min) + min +} +*/ + +func RandInt64(min, max int) int { + if min >= max || min == 0 || max == 0 { + return max + } + return rand.Intn(max-min) + min +} + +func PrepareLocalPorts(conn *zk.Conn, path string) { + log.Printf("Begin to prepare redis_local_ports") + found, _, err := conn.Exists(path) + must(err) + if found { + log.Println(path + " already exist.") + } else { + log.Println(path + " doesn't exist, need to create it.") + flags := int32(0) + acl := zk.WorldACL(zk.PermAll) + + _, err := conn.Create(path, []byte("Mesos_local_ports_parent"), flags, acl) + if err != nil { + log.Println("Failed to create parent node " + path) + } + } + + redis_local_ports, _, err := conn.Children(path) + + must(err) + + for _, name := range redis_local_ports { + + local_port, _, _ := conn.Get(path + "/" + name) + + _, ok := LocalPortsMap[name] + + if ok { + log.Printf("%s local port %s already exist in LocalPortsMap.\n", name, local_port) + } else { + LocalPortsMap[name] = string(local_port) + } + + } + + log.Println("LocalPortsMap is") + log.Println(LocalPortsMap) + +} + +func DeleteZKPathRecursive(path string) { + zksStr := os.Getenv("ZOOKEEPER_SERVERS") + //zks := strings.Split(zksStr, ",") + + if zksStr != "" { + + retryPolicy := curator.NewExponentialBackoffRetry(time.Second, CleanUpZKMaxReties, CleanUpZKCheckIntervalSecs*time.Second) + client := curator.NewClient(zksStr, retryPolicy) + client.Start() + client.Delete().DeletingChildrenIfNeeded().ForPath(path) + log.Printf("deleteZKPathRecursive: remove zk znode %s recursively.", path) + + defer client.Close() + + } else { + + log.Printf("deleteZKPathRecursive: failed to get env variable ZOOKEEPER_SERVERS.") + + } +} + +func InitializeProxy(conn *zk.Conn, path string) { + + + redis_instance, _, err := conn.Children(path) + + must(err) + + //time.Sleep(time.Second * 15) + + log.Println("InitializeProxy: Begin to initialize ans sync proxy servers from zk.") + + for _, name := range redis_instance { + + redis_status, _, _ := conn.Get(RedisPath + "/" + name + "/Status") + + + if redis_status != nil && strings.EqualFold(string(redis_status), "RUNNING") { + + // var CurrentE Entry + + //var CurrentE Entry + + redis_id, _, err := conn.Get(RedisPath + "/" + name + "/Mname") + + must(err) + + redis_ip, _, err := conn.Get(RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/IP") + + must(err) + + redis_port, _, err := conn.Get(RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/Port") + + must(err) + + if CurrentE, ok := ConfigMap[name]; ok { + + log.Printf("InitializeProxy: Redis name %s is already in the configMap, only change its backend redis addr. \n", name) + + CurrentE.Pair.To = string(redis_ip) + ":" + string(redis_port) + + ConfigMap[name] = CurrentE + + } else { + + log.Printf("InitializeProxy: Redis name %s not found in the configMap \n", name) + + var redis_tcp_local_port string + + if found, _, _ := conn.Exists(RedisLocalPortsPath + "/" + name); found { + + redis_port_byte, _, _ := conn.Get(RedisLocalPortsPath + "/" + name) + + redis_tcp_local_port = string(redis_port_byte[:]) + + log.Printf("redis instance %s exist, redis tcp local port is %s \n", name, redis_tcp_local_port) + + } else { + + //redis_listen_port := RedisPortBaseNum + len(redis_instance) + + var redis_port_found bool = false + + for { + random_port := RandInt64(RedisPortMinNum, RedisPortMaxNum) + + redis_tcp_local_port = strconv.Itoa(random_port) + + log.Printf("redis %s generate random local_ ort number is %s \n", name, redis_tcp_local_port) + + local_port_num := len(LocalPortsMap) + + log.Printf("local port num is %d \n", local_port_num) + + if local_port_num > 0 { + for _, value := range LocalPortsMap { + if strings.EqualFold(redis_tcp_local_port, value) { + redis_port_found = true + log.Printf("Redis %sredis port %s is already assigned.\n", name, value) + break + } + } + + if redis_port_found { + log.Printf("Local tcp port %s is duplicated, will generate a new one.\n", redis_tcp_local_port) + continue + } else { + log.Printf("random_tcp_port not assigned in local, so it can be used, will skip this loop.") + break + } + } else { + log.Println("LocalPortsMap length is zero, so a random port can be choosen") + break + } + + log.Printf("loop redis %s to check local port over\n", name) + + } + + //redis_tcp_listen_port := strconv.Itoa(random_tcp_port) + flags := int32(0) + + acl := zk.WorldACL(zk.PermAll) + + conn.Create(RedisLocalPortsPath+"/"+name, []byte(redis_tcp_local_port), flags, acl) + + } + + local_addr := "127.0.0.1" + ":" + redis_tcp_local_port + + local_tcp_addr, _ := net.ResolveTCPAddr("tcp4", local_addr) + + to_addr := string(redis_ip) + ":" + string(redis_port) + + to_tcp_addr, _ := net.ResolveTCPAddr("tcp4", to_addr) + + log.Printf("InitializeProxy: Redis %s local_tcp_addr is %s, to_tcp_addr is %s \n", name, local_tcp_addr.String(), to_tcp_addr.String()) + + currentProxyPair := PorxyPair{From: local_tcp_addr.String(), To: to_tcp_addr.String()} + + CurrentEntry := Entry{Name: name, Pair: currentProxyPair} + + ConfigMap[name] = CurrentEntry + + go HandleConnection(CurrentEntry) + + log.Println("InitializeProxy: End of InitializeProxy") + } + } + } + + +} + //HandleConnection Actuall proxy implementation per client. Untimatly this performs a implments a duplex io.Copy func HandleConnection(E Entry) error { @@ -45,18 +313,21 @@ func HandleConnection(E Entry) error { var OK bool log.Printf("HandleConnection() %v", E) - src, err := net.Listen("tcp", E.Pair.From) + //src, err := net.Listen("tcp", E.Pair.From) + listener, err := newTCPListener(E.Pair.From) + if err != nil { log.Printf("Error binding to the IP %v", err) return err } - defer src.Close() + + defer listener.Close() //Add this in the global Map so that it can be updated dynamically by HTTP apis ConfigMap[E.Name] = E for { - conn, err := src.Accept() + conn, err := listener.Accept() if err != nil { log.Printf("Error accepting a new connection %v", err) continue @@ -90,15 +361,75 @@ func HandleConnection(E Entry) error { } } +func cleanProxy(conn *zk.Conn) { + + go func(){ + for { + + time.Sleep(time.Second * CleanUpInterval) + + log.Printf("cleanProxy: Sleep %d seconds", CleanUpInterval) + + redis_instances, _, err := conn.Children(RedisPath) + + log.Printf("cleanProxy: redis_instaces nodes are %v", redis_instances) + + for _,name := range redis_instances { + + redis_status,_, err := conn.Get(RedisPath + "/" + name + "/" + "Status") + + log.Printf("cleanProxy: redis %s status is %s", name, redis_status) + + if err != nil { + log.Printf("cleanProxy: err occured when getting redis %s path %v", name, err) + return + } + + + log.Printf("cleanProxy:redis %s status is %s.\n", name, redis_status) + + if strings.EqualFold(string(redis_status), "DELETED") || redis_status == nil { + + + //delete znode in zk + + DeleteZKPathRecursive(RedisPath + "/" + name) + + CurrentE, ok := ConfigMap[name] + + if ok { + log.Printf("cleanProxy: redis %s is in the ConfigMap", name) + + from_addr := CurrentE.Pair.From + + log.Printf("cleanProxy: redis %s from_addr is %s", name, from_addr) + + delete(ConfigMap, name) + + } else { + + log.Printf("cleanProxy: redis %s is not in the ConfigMap", name) + } + + } + } + if err != nil { + return + } + } + }() +} + + //HandleHTTPUpdate Call beack to handle /Update/ HTTP call back func HandleHTTPUpdate(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "Hi there, Going to Update %s! Method=%s\n", r.URL.Path[1:], r.Method) + //log.Printf(w, "Hi there, Going to Update %s! Method=%s\n", r.URL.Path[1:], r.Method) if r.Method == "PUT" { //This can be used for updating an existing variable content, err := ioutil.ReadAll(r.Body) r.Body.Close() if err != nil { - fmt.Fprintf(w, "Error understanding the Body %v", err) + //log.Printf(w, "Error understanding the Body %v", err) log.Printf("Error understanding the Body %v", err) return } @@ -108,13 +439,13 @@ func HandleHTTPUpdate(w http.ResponseWriter, r *http.Request) { var OK bool err = json.Unmarshal(content, &val) if err != nil { - fmt.Fprintf(w, "Wrong json format %v", err) + //log.Printf(w, "Wrong json format %v", err) log.Printf("Wrong json format %v", err) return } if CurrentE, OK = ConfigMap[val.Name]; !OK { log.Printf("Error Proxy entry is incorrect / empty for %s", val.Name) - fmt.Fprintf(w, "Error Proxy entry is incorrect / empty for %s", val.Name) + //log.Printf(w, "Error Proxy entry is incorrect / empty for %s", val.Name) return } log.Printf("Updating From porxy for %s From %s TO %s", val.Name, CurrentE.Pair.To, val.Addr) @@ -130,11 +461,11 @@ func HandleHTTPGet(w http.ResponseWriter, r *http.Request) { retBytes, err := json.MarshalIndent(ConfigMap, " ", " ") if err != nil { log.Printf("Error Marshalling HandleHTTPGet() %v", err) - fmt.Fprintf(w, "Error Marshalling HandleHTTPGet() %v", err) + //log.Printf(w, "Error Marshalling HandleHTTPGet() %v", err) return } - fmt.Fprintf(w, "Current Config: %s", string(retBytes)) + fmt.Fprintf(w, string(retBytes) ) return } @@ -145,6 +476,18 @@ func main() { //Initialize the global Config map ConfigMap = make(map[string]Entry) + //Initialize the global LocalPorts map + LocalPortsMap = make(map[string]string) + + //set log rotating policy + + log.SetOutput(&lumberjack.Logger{ + Filename: "/data/apps/log/MrRedis-local-proxy.log", + MaxSize: 50, // megabytes + MaxBackups: 10, + MaxAge: 3, //days +}) + //Read a config file that has json update the config files cfgFileName := flag.String("config", "./config.json", "Supply the location of MrRedis configuration file") flag.Parse() @@ -162,18 +505,48 @@ func main() { } log.Printf("Configuration file is = %v", Cfg) - //Hanlde each connection + conn := connect() - for _, E := range Cfg.Entries { - go HandleConnection(E) - } + defer conn.Close() + + //Initialize zk node /MrRedis-local-ports + + PrepareLocalPorts(conn, "/MrRedisLocalPorts") + + //Initialize existent proxy instance inside zk and added them into ConfigMap + + InitializeProxy(conn, RedisPath) + + //Clean up unused tcp ports, eg: when redis status is DELETED, the local proxy server of that redis will be shutdown. + + conn1 := connect() + + defer conn1.Close() + + cleanProxy(conn1) + + go func() { + + for { + + time.Sleep(time.Second * SyncZKIntervalSecs) + + log.Printf("Routine: Sync redis infomration from zk...") + + InitializeProxy(conn, RedisPath) + } + + }() http.HandleFunc("/Update/", HandleHTTPUpdate) + http.HandleFunc("/Get/", HandleHTTPGet) + log.Fatal(http.ListenAndServe(":"+Cfg.HTTPPort, nil)) //Wait indefinitely waitCh := make(chan bool) + <-waitCh } From 8701a346a3e354ba6d7b1da472048d1faa9d2141 Mon Sep 17 00:00:00 2001 From: zou sheng Date: Mon, 17 Jul 2017 11:38:44 +0800 Subject: [PATCH 04/14] Change CleanUpInterval and SyncZKIntervalSecs as 3 secs to keep the redis information consistent --- proxy/redis_proxy.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy/redis_proxy.go b/proxy/redis_proxy.go index ed3651f..f62a65a 100644 --- a/proxy/redis_proxy.go +++ b/proxy/redis_proxy.go @@ -37,13 +37,13 @@ const ( ProxyPort = 7979 - CleanUpInterval = 20 + CleanUpInterval = 3 CleanUpZKMaxReties = 3 CleanUpZKCheckIntervalSecs = 15 - SyncZKIntervalSecs = 2 + SyncZKIntervalSecs = 3 RedisPath = "/MrRedis/Instances" From a3b571ddaa45a8f25bd4d41cf1d76700cd0a57d6 Mon Sep 17 00:00:00 2001 From: zou sheng Date: Tue, 19 Sep 2017 12:21:39 +0800 Subject: [PATCH 05/14] Fix issue that local_proxy on different machine might have different local port mapping to real server --- proxy/redis_proxy.go | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/proxy/redis_proxy.go b/proxy/redis_proxy.go index f62a65a..7b390ad 100644 --- a/proxy/redis_proxy.go +++ b/proxy/redis_proxy.go @@ -57,7 +57,7 @@ type Config struct { Entries []Entry //Entries List of proxy entries } -//Entry Representation of each entry in the proxy config +//Entry Representation of each entry in the proxy configÄ type Entry struct { Name string Pair PorxyPair @@ -207,10 +207,26 @@ func InitializeProxy(conn *zk.Conn, path string) { must(err) + var redis_tcp_local_port string + if CurrentE, ok := ConfigMap[name]; ok { log.Printf("InitializeProxy: Redis name %s is already in the configMap, only change its backend redis addr. \n", name) + if found, _, _ := conn.Exists(RedisLocalPortsPath + "/" + name); found { + + redis_port_byte, _, _ := conn.Get(RedisLocalPortsPath + "/" + name) + + redis_tcp_local_port = string(redis_port_byte[:]) + + log.Printf("InitializeProxy: Redis local port %s is already in the MrRedisLocalPort, sync with zk to keep it consistent . \n", redis_tcp_local_port) + + CurrentE.Pair.From = "127.0.0.1" + ":" + redis_tcp_local_port + + log.Printf("InitializeProxy: set redis instance %s Pair.From properties to %s" , name, CurrentE.Pair.From) + + } + CurrentE.Pair.To = string(redis_ip) + ":" + string(redis_port) ConfigMap[name] = CurrentE @@ -219,8 +235,6 @@ func InitializeProxy(conn *zk.Conn, path string) { log.Printf("InitializeProxy: Redis name %s not found in the configMap \n", name) - var redis_tcp_local_port string - if found, _, _ := conn.Exists(RedisLocalPortsPath + "/" + name); found { redis_port_byte, _, _ := conn.Get(RedisLocalPortsPath + "/" + name) @@ -479,14 +493,14 @@ func main() { //Initialize the global LocalPorts map LocalPortsMap = make(map[string]string) - //set log rotating policy + //set log rotating policy - log.SetOutput(&lumberjack.Logger{ - Filename: "/data/apps/log/MrRedis-local-proxy.log", - MaxSize: 50, // megabytes - MaxBackups: 10, - MaxAge: 3, //days -}) + log.SetOutput(&lumberjack.Logger{ + Filename: "/data/apps/log/MrRedis-local-proxy.log", + MaxSize: 50, // megabytes + MaxBackups: 10, + MaxAge: 3, //days + }) //Read a config file that has json update the config files cfgFileName := flag.String("config", "./config.json", "Supply the location of MrRedis configuration file") @@ -550,3 +564,4 @@ func main() { <-waitCh } + From f7dfc6bd6082fc1a48b8297bea2c98f3de57d208 Mon Sep 17 00:00:00 2001 From: zou sheng Date: Tue, 19 Sep 2017 22:32:44 +0800 Subject: [PATCH 06/14] Change default log module to github.com/hhkbp2/go-logging for more specific logs, and remove clean_proxy_port func to avoid concurrent read and write map --- proxy/redis_proxy.go | 201 +++++++++++++++++++++++++------------------ 1 file changed, 118 insertions(+), 83 deletions(-) diff --git a/proxy/redis_proxy.go b/proxy/redis_proxy.go index 7b390ad..457ed9c 100644 --- a/proxy/redis_proxy.go +++ b/proxy/redis_proxy.go @@ -4,10 +4,9 @@ import ( "encoding/json" "flag" "fmt" - "log" "github.com/curator-go/curator" - "github.com/natefinch/lumberjack" "github.com/samuel/go-zookeeper/zk" + "github.com/hhkbp2/go-logging" "io" "io/ioutil" "math/rand" @@ -33,7 +32,7 @@ const ( //RedisPortBaseNum Local redis listen port range from 6100 RedisPortMinNum = 6100 - RedisPortMaxNum = 6300 + RedisPortMaxNum = 6400 ProxyPort = 7979 @@ -49,6 +48,14 @@ const ( RedisLocalPortsPath = "/MrRedisLocalPorts" + LogFilePath = "/data/apps/log/MrRedis-local-proxy.log" + + LogFileMaxSize = 100 * 1024 * 1024 // megabytes + + LogFileMaxBackups = 10 + + LogFileMaxAge = 7 //days + ) //Config json config structure for the proxy @@ -77,7 +84,7 @@ type HTTPUpdate struct { func must(err error) { if err != nil { - log.Println("panic") + logger.Error("panic") panic(err) } } @@ -105,6 +112,11 @@ func newTCPListener(addr string) (net.Listener, error) { } */ + +var logger = logging.GetLogger("redis_proxy") + + + func RandInt64(min, max int) int { if min >= max || min == 0 || max == 0 { return max @@ -113,19 +125,19 @@ func RandInt64(min, max int) int { } func PrepareLocalPorts(conn *zk.Conn, path string) { - log.Printf("Begin to prepare redis_local_ports") + logger.Info("Begin to prepare redis_local_ports") found, _, err := conn.Exists(path) must(err) if found { - log.Println(path + " already exist.") + logger.Infof(path + " already exist.") } else { - log.Println(path + " doesn't exist, need to create it.") + logger.Infof(path + " doesn't exist, need to create it.") flags := int32(0) acl := zk.WorldACL(zk.PermAll) _, err := conn.Create(path, []byte("Mesos_local_ports_parent"), flags, acl) if err != nil { - log.Println("Failed to create parent node " + path) + logger.Warnf("Failed to create parent node " + path) } } @@ -140,35 +152,31 @@ func PrepareLocalPorts(conn *zk.Conn, path string) { _, ok := LocalPortsMap[name] if ok { - log.Printf("%s local port %s already exist in LocalPortsMap.\n", name, local_port) + logger.Infof("%s local port %s already exist in LocalPortsMap.\n", name, local_port) } else { LocalPortsMap[name] = string(local_port) } } - log.Println("LocalPortsMap is") - log.Println(LocalPortsMap) - } func DeleteZKPathRecursive(path string) { + zksStr := os.Getenv("ZOOKEEPER_SERVERS") - //zks := strings.Split(zksStr, ",") if zksStr != "" { - retryPolicy := curator.NewExponentialBackoffRetry(time.Second, CleanUpZKMaxReties, CleanUpZKCheckIntervalSecs*time.Second) client := curator.NewClient(zksStr, retryPolicy) client.Start() client.Delete().DeletingChildrenIfNeeded().ForPath(path) - log.Printf("deleteZKPathRecursive: remove zk znode %s recursively.", path) + logger.Infof("deleteZKPathRecursive: remove zk znode %s recursively.", path) defer client.Close() } else { - log.Printf("deleteZKPathRecursive: failed to get env variable ZOOKEEPER_SERVERS.") + logger.Error("deleteZKPathRecursive: failed to get env variable ZOOKEEPER_SERVERS.") } } @@ -178,11 +186,10 @@ func InitializeProxy(conn *zk.Conn, path string) { redis_instance, _, err := conn.Children(path) - must(err) - - //time.Sleep(time.Second * 15) - - log.Println("InitializeProxy: Begin to initialize ans sync proxy servers from zk.") + if err != nil { + logger.Error("Failed to load all redis instances from zk mr-redis path /MrRedis/Instances .") + panic(err) + } for _, name := range redis_instance { @@ -191,9 +198,7 @@ func InitializeProxy(conn *zk.Conn, path string) { if redis_status != nil && strings.EqualFold(string(redis_status), "RUNNING") { - // var CurrentE Entry - - //var CurrentE Entry + logger.Infof("redis instance %s status is running.", name) redis_id, _, err := conn.Get(RedisPath + "/" + name + "/Mname") @@ -211,7 +216,7 @@ func InitializeProxy(conn *zk.Conn, path string) { if CurrentE, ok := ConfigMap[name]; ok { - log.Printf("InitializeProxy: Redis name %s is already in the configMap, only change its backend redis addr. \n", name) + logger.Infof("Redis instance %s is in the configMap. \n", name) if found, _, _ := conn.Exists(RedisLocalPortsPath + "/" + name); found { @@ -219,12 +224,12 @@ func InitializeProxy(conn *zk.Conn, path string) { redis_tcp_local_port = string(redis_port_byte[:]) - log.Printf("InitializeProxy: Redis local port %s is already in the MrRedisLocalPort, sync with zk to keep it consistent . \n", redis_tcp_local_port) + logger.Infof("InitializeProxy: Redis %s local port %s is already in the MrRedisLocalPort, sync with zk to keep it consistent . \n", name, redis_tcp_local_port) CurrentE.Pair.From = "127.0.0.1" + ":" + redis_tcp_local_port - log.Printf("InitializeProxy: set redis instance %s Pair.From properties to %s" , name, CurrentE.Pair.From) - + logger.Infof("Set redis instance %s Pair.From properties to %s" , name, CurrentE.Pair.From) + } CurrentE.Pair.To = string(redis_ip) + ":" + string(redis_port) @@ -233,7 +238,7 @@ func InitializeProxy(conn *zk.Conn, path string) { } else { - log.Printf("InitializeProxy: Redis name %s not found in the configMap \n", name) + logger.Infof("Redis name %s not found in the configMap \n", name) if found, _, _ := conn.Exists(RedisLocalPortsPath + "/" + name); found { @@ -241,7 +246,7 @@ func InitializeProxy(conn *zk.Conn, path string) { redis_tcp_local_port = string(redis_port_byte[:]) - log.Printf("redis instance %s exist, redis tcp local port is %s \n", name, redis_tcp_local_port) + logger.Infof("redis instance %s already exists, redis tcp local port is %s \n", name, redis_tcp_local_port) } else { @@ -254,35 +259,33 @@ func InitializeProxy(conn *zk.Conn, path string) { redis_tcp_local_port = strconv.Itoa(random_port) - log.Printf("redis %s generate random local_ ort number is %s \n", name, redis_tcp_local_port) + logger.Infof("redis %s generate random local_ ort number is %s \n", name, redis_tcp_local_port) local_port_num := len(LocalPortsMap) - log.Printf("local port num is %d \n", local_port_num) + logger.Infof("redis %s local port num is %d \n", name, local_port_num) if local_port_num > 0 { for _, value := range LocalPortsMap { if strings.EqualFold(redis_tcp_local_port, value) { redis_port_found = true - log.Printf("Redis %sredis port %s is already assigned.\n", name, value) + logger.Infof("Redis %sredis port %s is already assigned.\n", name, value) break } } if redis_port_found { - log.Printf("Local tcp port %s is duplicated, will generate a new one.\n", redis_tcp_local_port) + logger.Infof("Local tcp port %s is duplicated, will generate a new one.\n", redis_tcp_local_port) continue } else { - log.Printf("random_tcp_port not assigned in local, so it can be used, will skip this loop.") + logger.Info("random_tcp_port not assigned in local, so it can be used, will skip this loop.") break } } else { - log.Println("LocalPortsMap length is zero, so a random port can be choosen") + logger.Warn("LocalPortsMap length is zero, so a random port can be choosen") break } - log.Printf("loop redis %s to check local port over\n", name) - } //redis_tcp_listen_port := strconv.Itoa(random_tcp_port) @@ -302,7 +305,7 @@ func InitializeProxy(conn *zk.Conn, path string) { to_tcp_addr, _ := net.ResolveTCPAddr("tcp4", to_addr) - log.Printf("InitializeProxy: Redis %s local_tcp_addr is %s, to_tcp_addr is %s \n", name, local_tcp_addr.String(), to_tcp_addr.String()) + logger.Infof("Redis %s local_tcp_addr is %s, to_tcp_addr is %s \n", name, local_tcp_addr.String(), to_tcp_addr.String()) currentProxyPair := PorxyPair{From: local_tcp_addr.String(), To: to_tcp_addr.String()} @@ -312,7 +315,6 @@ func InitializeProxy(conn *zk.Conn, path string) { go HandleConnection(CurrentEntry) - log.Println("InitializeProxy: End of InitializeProxy") } } } @@ -326,12 +328,12 @@ func HandleConnection(E Entry) error { var CurrentE Entry //A Temp variable to get the latest Desination proxy value var OK bool - log.Printf("HandleConnection() %v", E) + logger.Info("HandleConnection() %v", E) //src, err := net.Listen("tcp", E.Pair.From) listener, err := newTCPListener(E.Pair.From) if err != nil { - log.Printf("Error binding to the IP %v", err) + logger.Errorf("Error binding to the IP %v", err) return err } @@ -343,13 +345,13 @@ func HandleConnection(E Entry) error { for { conn, err := listener.Accept() if err != nil { - log.Printf("Error accepting a new connection %v", err) + logger.Errorf("Error accepting a new connection %v", err) continue } //Get the latest Entry from the MAP because it migh thave been updated on the fly. if CurrentE, OK = ConfigMap[E.Name]; !OK { - log.Printf("Error Proxy entry is incorrect / empty for %s", E.Name) + logger.Errorf("Error Proxy entry is incorrect / empty for %s", E.Name) conn.Close() continue } @@ -362,7 +364,7 @@ func HandleConnection(E Entry) error { T, err := net.Dial("tcp", E.Pair.To) if err != nil { - log.Printf("Unable to connect to the Destination %s %v", E.Pair.To, err) + logger.Errorf("Unable to connect to the Destination %s %v", E.Pair.To, err) return } defer T.Close() @@ -382,25 +384,30 @@ func cleanProxy(conn *zk.Conn) { time.Sleep(time.Second * CleanUpInterval) - log.Printf("cleanProxy: Sleep %d seconds", CleanUpInterval) + logger.Infof("cleanProxy: Sleep %d seconds", CleanUpInterval) redis_instances, _, err := conn.Children(RedisPath) - log.Printf("cleanProxy: redis_instaces nodes are %v", redis_instances) + if err != nil { + logger.Errorf("Failed to get redis instances.") + return + } + + logger.Infof("cleanProxy: redis_instaces nodes are %v", redis_instances) for _,name := range redis_instances { redis_status,_, err := conn.Get(RedisPath + "/" + name + "/" + "Status") - log.Printf("cleanProxy: redis %s status is %s", name, redis_status) + logger.Infof("cleanProxy: redis %s status is %s", name, redis_status) if err != nil { - log.Printf("cleanProxy: err occured when getting redis %s path %v", name, err) + logger.Errorf("cleanProxy: err occured when getting redis %s path %v", name, err) return } - log.Printf("cleanProxy:redis %s status is %s.\n", name, redis_status) + logger.Infof("cleanProxy:redis %s status is %s.\n", name, redis_status) if strings.EqualFold(string(redis_status), "DELETED") || redis_status == nil { @@ -412,24 +419,22 @@ func cleanProxy(conn *zk.Conn) { CurrentE, ok := ConfigMap[name] if ok { - log.Printf("cleanProxy: redis %s is in the ConfigMap", name) + logger.Infof("cleanProxy: redis %s is in the ConfigMap", name) from_addr := CurrentE.Pair.From - log.Printf("cleanProxy: redis %s from_addr is %s", name, from_addr) + logger.Infof("cleanProxy: redis %s from_addr is %s", name, from_addr) delete(ConfigMap, name) } else { - log.Printf("cleanProxy: redis %s is not in the ConfigMap", name) + logger.Infof("cleanProxy: redis %s is not in the ConfigMap", name) } } } - if err != nil { - return - } + } }() } @@ -444,7 +449,7 @@ func HandleHTTPUpdate(w http.ResponseWriter, r *http.Request) { r.Body.Close() if err != nil { //log.Printf(w, "Error understanding the Body %v", err) - log.Printf("Error understanding the Body %v", err) + logger.Errorf("Error understanding the Body %v", err) return } @@ -454,15 +459,15 @@ func HandleHTTPUpdate(w http.ResponseWriter, r *http.Request) { err = json.Unmarshal(content, &val) if err != nil { //log.Printf(w, "Wrong json format %v", err) - log.Printf("Wrong json format %v", err) + logger.Errorf("Wrong json format %v", err) return } if CurrentE, OK = ConfigMap[val.Name]; !OK { - log.Printf("Error Proxy entry is incorrect / empty for %s", val.Name) + logger.Infof("Error Proxy entry is incorrect / empty for %s", val.Name) //log.Printf(w, "Error Proxy entry is incorrect / empty for %s", val.Name) return } - log.Printf("Updating From porxy for %s From %s TO %s", val.Name, CurrentE.Pair.To, val.Addr) + logger.Info("Updating From porxy for %s From %s TO %s", val.Name, CurrentE.Pair.To, val.Addr) CurrentE.Pair.To = val.Addr ConfigMap[val.Name] = CurrentE return @@ -474,7 +479,7 @@ func HandleHTTPUpdate(w http.ResponseWriter, r *http.Request) { func HandleHTTPGet(w http.ResponseWriter, r *http.Request) { retBytes, err := json.MarshalIndent(ConfigMap, " ", " ") if err != nil { - log.Printf("Error Marshalling HandleHTTPGet() %v", err) + logger.Errorf("Error Marshalling HandleHTTPGet() %v", err) //log.Printf(w, "Error Marshalling HandleHTTPGet() %v", err) return @@ -493,31 +498,66 @@ func main() { //Initialize the global LocalPorts map LocalPortsMap = make(map[string]string) - //set log rotating policy - - log.SetOutput(&lumberjack.Logger{ - Filename: "/data/apps/log/MrRedis-local-proxy.log", - MaxSize: 50, // megabytes - MaxBackups: 10, - MaxAge: 3, //days - }) - //Read a config file that has json update the config files cfgFileName := flag.String("config", "./config.json", "Supply the location of MrRedis configuration file") flag.Parse() - log.Printf("The config file name is %s ", *cfgFileName) + filePath := LogFilePath + + fileMode := os.O_APPEND + + bufferSize := 0 + + bufferFlushTime := 30 * time.Second + + inputChanSize := 1 + + backupCount := uint32(LogFileMaxBackups) + // set the maximum size of every file to 100 M bytes + fileMaxBytes := uint64(LogFileMaxSize) + + + + // create a handler(which represents a log message destination) + handler := logging.MustNewRotatingFileHandler( + filePath, fileMode, bufferSize, bufferFlushTime, inputChanSize, + fileMaxBytes, backupCount) + + + // the format for the whole log message + format := "%(asctime)s %(levelname)s (%(filename)s:%(lineno)d) " + + "%(name)s %(message)s" + + // the format for the time part + dateFormat := "%Y-%m-%d %H:%M:%S.%3n" + + // create a formatter(which controls how log messages are formatted) + formatter := logging.NewStandardFormatter(format, dateFormat) + + // set formatter for handler + handler.SetFormatter(formatter) + + + logger.SetLevel(logging.LevelInfo) + + logger.AddHandler(handler) + + + // ensure all log messages are flushed to disk before program exits. + defer logging.Shutdown() + + logger.Infof("The config file name is %s ", *cfgFileName) cfgFile, err := ioutil.ReadFile(*cfgFileName) if err != nil { - log.Printf("Error Reading the configration file. Resorting to default values") + logger.Error("Error Reading the configration file. Resorting to default values") } err = json.Unmarshal(cfgFile, &Cfg) if err != nil { - log.Fatalf("Error parsing the config file %v", err) + logger.Errorf("Error parsing the config file %v", err) return } - log.Printf("Configuration file is = %v", Cfg) + logger.Infof("Configuration file is = %v", Cfg) conn := connect() @@ -532,12 +572,8 @@ func main() { InitializeProxy(conn, RedisPath) //Clean up unused tcp ports, eg: when redis status is DELETED, the local proxy server of that redis will be shutdown. - - conn1 := connect() - - defer conn1.Close() - - cleanProxy(conn1) + //comment cleanProxy to avoid concurrent write Map error + //cleanProxy(conn) go func() { @@ -545,7 +581,7 @@ func main() { time.Sleep(time.Second * SyncZKIntervalSecs) - log.Printf("Routine: Sync redis infomration from zk...") + logger.Info("Routine: Sync redis infomration from zk...") InitializeProxy(conn, RedisPath) } @@ -556,7 +592,7 @@ func main() { http.HandleFunc("/Get/", HandleHTTPGet) - log.Fatal(http.ListenAndServe(":"+Cfg.HTTPPort, nil)) + logger.Fatal(http.ListenAndServe(":"+Cfg.HTTPPort, nil)) //Wait indefinitely waitCh := make(chan bool) @@ -564,4 +600,3 @@ func main() { <-waitCh } - From 9262cc4a86f40a4849248cdc62584db2e1203d87 Mon Sep 17 00:00:00 2001 From: zou sheng Date: Thu, 21 Sep 2017 19:22:39 +0800 Subject: [PATCH 07/14] Chagne broken pipe bug --- proxy/redis_proxy.go | 71 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 62 insertions(+), 9 deletions(-) diff --git a/proxy/redis_proxy.go b/proxy/redis_proxy.go index 457ed9c..d85f36b 100644 --- a/proxy/redis_proxy.go +++ b/proxy/redis_proxy.go @@ -202,15 +202,24 @@ func InitializeProxy(conn *zk.Conn, path string) { redis_id, _, err := conn.Get(RedisPath + "/" + name + "/Mname") - must(err) + if err != nil { + logger.Errorf("zk path /name/instance/Mname error: %v\n", RedisPath + "/" + name + "/Mname") + must(err) + } redis_ip, _, err := conn.Get(RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/IP") - must(err) + if err != nil { + logger.Errorf("zk path name/Pros/instance/IP error: %v\n", RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/IP") + must(err) + } redis_port, _, err := conn.Get(RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/Port") - must(err) + if err != nil { + logger.Errorf("zk path name/Pros/instance/Port error: %v\n", RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/Port") + must(err) + } var redis_tcp_local_port string @@ -360,23 +369,67 @@ func HandleConnection(E Entry) error { //F := From Connection //T := To Connection //This proxy will simply transfer everything from F to T net.Conn - go func(E Entry, F net.Conn) { + go func(E Entry, srcConn net.Conn) { - T, err := net.Dial("tcp", E.Pair.To) + destConn, err := net.Dial("tcp", E.Pair.To) if err != nil { logger.Errorf("Unable to connect to the Destination %s %v", E.Pair.To, err) return } - defer T.Close() - defer F.Close() + //defer destConn.Close() + //defer srcConn.Close() + + stop := make(chan bool) + + go relay(srcConn, destConn, stop) + go relay(destConn, srcConn, stop) + + select { + case <-stop: + logger.Errorf("Whether connection is stopped or not: %v", stop) + time.Sleep(time.Second * 1) + //return + } + + //go io.Copy(F, T) + //io.Copy(T, F) + /*ExitChan := make(chan bool, 1) - go io.Copy(F, T) - io.Copy(T, F) + go func(sconn net.Conn, dconn net.Conn, Exit chan bool) { + _, err := io.Copy(srcConn, destConn) + logger.Errorf("Failed to send data to %v, error is:%v\n", E.Pair.To, err) + ExitChan <- true + }(srcConn, destConn, ExitChan) + + go func(sconn net.Conn, dconn net.Conn, Exit chan bool) { + _, err := io.Copy(destConn, srcConn) + logger.Errorf("Failed to receive data from %v, error is:%v\n", E.Pair.To, err) + ExitChan <- true + }(srcConn, destConn, ExitChan) + + <-ExitChan + + destConn.Close() + srcConn.Close() + */ }(CurrentE, conn) } } + +func relay(src net.Conn, dst net.Conn, stop chan bool) { + _,err := io.Copy(dst, src) + if err != nil { + logger.Errorf("src addr is %v, dst addr is %v, error is:%v\n", src.LocalAddr(), dst.LocalAddr(),err) + dst.Close() + src.Close() + stop <- true + return + } + +} + func cleanProxy(conn *zk.Conn) { go func(){ From 8f55b71eed3feec93081f2d8b88d203808d03b0a Mon Sep 17 00:00:00 2001 From: zou sheng Date: Fri, 22 Sep 2017 12:24:27 +0800 Subject: [PATCH 08/14] Change tcp proxy to fix connection reset error --- proxy/redis_proxy.go | 53 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/proxy/redis_proxy.go b/proxy/redis_proxy.go index d85f36b..0eb39d8 100644 --- a/proxy/redis_proxy.go +++ b/proxy/redis_proxy.go @@ -16,6 +16,7 @@ import ( "strconv" "strings" "time" + "sync" ) @@ -115,7 +116,13 @@ func newTCPListener(addr string) (net.Listener, error) { var logger = logging.GetLogger("redis_proxy") - +var bufferPool = sync.Pool{ + New: func() interface{} { + // TODO maybe different buffer size? + // benchmark pls + return make([]byte, 1<<15) + }, +} func RandInt64(min, max int) int { if min >= max || min == 0 || max == 0 { @@ -376,9 +383,36 @@ func HandleConnection(E Entry) error { logger.Errorf("Unable to connect to the Destination %s %v", E.Pair.To, err) return } + + first := make(chan<- struct{}, 1) + var wg sync.WaitGroup + cp := func(dst net.Conn, src net.Conn) { + buf := bufferPool.Get().([]byte) + // TODO use splice on linux + // TODO needs some timeout to prevent torshammer ddos + _, err := io.CopyBuffer(dst, src, buf) + select { + case first <- struct{}{}: + if err != nil { + logger.Errorf("Copy error is %v:",err) + } + _ = dst.Close() + _ = src.Close() + default: + } + bufferPool.Put(buf) + wg.Done() + } + wg.Add(2) + go cp(destConn, srcConn) + go cp(srcConn, destConn) + wg.Wait() + //defer destConn.Close() //defer srcConn.Close() + /* + This part has connection reset error. stop := make(chan bool) go relay(srcConn, destConn, stop) @@ -391,6 +425,7 @@ func HandleConnection(E Entry) error { //return } + */ //go io.Copy(F, T) //io.Copy(T, F) /*ExitChan := make(chan bool, 1) @@ -417,6 +452,22 @@ func HandleConnection(E Entry) error { } } +func broker(dst, src net.Conn, srcClosed chan struct{}) { + // We can handle errors in a finer-grained manner by inlining io.Copy (it's + // simple, and we drop the ReaderFrom or WriterTo checks for + // net.Conn->net.Conn transfers, which aren't needed). This would also let + // us adjust buffersize. + _, err := io.Copy(dst, src) + + if err != nil { + logger.Errorf("Copy error: %s", err) + } + if err := src.Close(); err != nil { + logger.Errorf("Close error: %s", err) + } + srcClosed <- struct{}{} +} + func relay(src net.Conn, dst net.Conn, stop chan bool) { _,err := io.Copy(dst, src) From 7028faeba225bad6463e1e52ea968cf1f6c703fe Mon Sep 17 00:00:00 2001 From: zou sheng Date: Tue, 26 Sep 2017 07:48:30 +0800 Subject: [PATCH 09/14] remove configs --- proxy/redis_proxy.go | 639 ++++++++++++++++++++++++++----------------- 1 file changed, 381 insertions(+), 258 deletions(-) diff --git a/proxy/redis_proxy.go b/proxy/redis_proxy.go index 0eb39d8..1292226 100644 --- a/proxy/redis_proxy.go +++ b/proxy/redis_proxy.go @@ -2,9 +2,9 @@ package main import ( "encoding/json" - "flag" "fmt" "github.com/curator-go/curator" + "github.com/curator-go/curator/recipes/cache" "github.com/samuel/go-zookeeper/zk" "github.com/hhkbp2/go-logging" "io" @@ -19,7 +19,6 @@ import ( "sync" ) - var ( //ConfigMap A map of name of the proxy vs its actually backend endpoint ConfigMap map[string]Entry @@ -27,7 +26,26 @@ var ( //LocalPortsMap A map of local ports which fetch the data from zk once proxy daemon restarts LocalPortsMap map[string]string + //Define logger name of program as redis_proxy + //logger = logging.GetLogger("redis_proxy") + + logger logging.Logger + ) + +var startTime = time.Now() + +//Add WMutex to ConfigMap avoid concurrent read and write error +var lock = sync.RWMutex{} + +var bufferPool = sync.Pool{ + New: func() interface{} { + // TODO maybe different buffer size? + // benchmark pls + return make([]byte, 1<<15) + }, +} + const ( //RedisPortBaseNum Local redis listen port range from 6100 @@ -37,12 +55,6 @@ const ( ProxyPort = 7979 - CleanUpInterval = 3 - - CleanUpZKMaxReties = 3 - - CleanUpZKCheckIntervalSecs = 15 - SyncZKIntervalSecs = 3 RedisPath = "/MrRedis/Instances" @@ -55,8 +67,7 @@ const ( LogFileMaxBackups = 10 - LogFileMaxAge = 7 //days - + ProgrameStartTimeAtLeast = 60 ) //Config json config structure for the proxy @@ -107,23 +118,6 @@ func newTCPListener(addr string) (net.Listener, error) { return conn, nil } -/*func random(min, max int) int { - rand.Seed(time.Now().Unix()) - return rand.Intn(max-min) + min -} -*/ - - -var logger = logging.GetLogger("redis_proxy") - -var bufferPool = sync.Pool{ - New: func() interface{} { - // TODO maybe different buffer size? - // benchmark pls - return make([]byte, 1<<15) - }, -} - func RandInt64(min, max int) int { if min >= max || min == 0 || max == 0 { return max @@ -132,9 +126,13 @@ func RandInt64(min, max int) int { } func PrepareLocalPorts(conn *zk.Conn, path string) { + logger.Info("Begin to prepare redis_local_ports") + found, _, err := conn.Exists(path) + must(err) + if found { logger.Infof(path + " already exist.") } else { @@ -168,28 +166,58 @@ func PrepareLocalPorts(conn *zk.Conn, path string) { } -func DeleteZKPathRecursive(path string) { +func getRedisMnameInfo(name string, conn *zk.Conn) (string, string){ - zksStr := os.Getenv("ZOOKEEPER_SERVERS") + logger.Infof("Get redis %v Mname info redis_ip and redis_port.", name) - if zksStr != "" { - retryPolicy := curator.NewExponentialBackoffRetry(time.Second, CleanUpZKMaxReties, CleanUpZKCheckIntervalSecs*time.Second) - client := curator.NewClient(zksStr, retryPolicy) - client.Start() - client.Delete().DeletingChildrenIfNeeded().ForPath(path) - logger.Infof("deleteZKPathRecursive: remove zk znode %s recursively.", path) + redis_id_path := RedisPath + "/" + name + "/Mname" - defer client.Close() + redis_id, _, err := conn.Get(redis_id_path) - } else { + if err != nil { + logger.Errorf("zk path /name/instance/Mname error: %v\n", RedisPath + "/" + name + "/Mname") + must(err) + } + + redis_ip_path := RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/IP" - logger.Error("deleteZKPathRecursive: failed to get env variable ZOOKEEPER_SERVERS.") + var redis_ip []byte + redis_ip, _, err = conn.Get(redis_ip_path) + + if err != nil { + + logger.Errorf("Redis %v failed to get new redis ip, the ip is %v. Will wait 5 seconds and try to fetch it again.", name, redis_ip) + time.Sleep(5 * time.Second) + + redis_ip, _, err = conn.Get(redis_ip_path) + + if err != nil { + + logger.Errorf("Failed to fetch new redis ip second time, wrill pass this") + return "", "" + + } else { + logger.Infof("Second time to fetch new redis %v ip successfully, the ip is %v.",name, redis_ip) + } + //must(err) } + + redis_port_path := RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/Port" + redis_port, _, err := conn.Get(redis_port_path) + + if err != nil { + logger.Errorf("zk path name/Pros/instance/Port error: %v\n", RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/Port") + must(err) + } + + return string(redis_ip), string(redis_port) } + func InitializeProxy(conn *zk.Conn, path string) { + logger.Infof("Run InitializeProxy at boot time %v", time.Now()) redis_instance, _, err := conn.Children(path) @@ -207,26 +235,7 @@ func InitializeProxy(conn *zk.Conn, path string) { logger.Infof("redis instance %s status is running.", name) - redis_id, _, err := conn.Get(RedisPath + "/" + name + "/Mname") - - if err != nil { - logger.Errorf("zk path /name/instance/Mname error: %v\n", RedisPath + "/" + name + "/Mname") - must(err) - } - - redis_ip, _, err := conn.Get(RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/IP") - - if err != nil { - logger.Errorf("zk path name/Pros/instance/IP error: %v\n", RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/IP") - must(err) - } - - redis_port, _, err := conn.Get(RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/Port") - - if err != nil { - logger.Errorf("zk path name/Pros/instance/Port error: %v\n", RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/Port") - must(err) - } + redis_ip, redis_port := getRedisMnameInfo(name, conn) var redis_tcp_local_port string @@ -268,41 +277,7 @@ func InitializeProxy(conn *zk.Conn, path string) { //redis_listen_port := RedisPortBaseNum + len(redis_instance) - var redis_port_found bool = false - - for { - random_port := RandInt64(RedisPortMinNum, RedisPortMaxNum) - - redis_tcp_local_port = strconv.Itoa(random_port) - - logger.Infof("redis %s generate random local_ ort number is %s \n", name, redis_tcp_local_port) - - local_port_num := len(LocalPortsMap) - - logger.Infof("redis %s local port num is %d \n", name, local_port_num) - - if local_port_num > 0 { - for _, value := range LocalPortsMap { - if strings.EqualFold(redis_tcp_local_port, value) { - redis_port_found = true - logger.Infof("Redis %sredis port %s is already assigned.\n", name, value) - break - } - } - - if redis_port_found { - logger.Infof("Local tcp port %s is duplicated, will generate a new one.\n", redis_tcp_local_port) - continue - } else { - logger.Info("random_tcp_port not assigned in local, so it can be used, will skip this loop.") - break - } - } else { - logger.Warn("LocalPortsMap length is zero, so a random port can be choosen") - break - } - - } + redis_tcp_local_port = getLocalRedisPort() //redis_tcp_listen_port := strconv.Itoa(random_tcp_port) flags := int32(0) @@ -338,6 +313,49 @@ func InitializeProxy(conn *zk.Conn, path string) { } +func getLocalRedisPort() string { + + var redis_port_found bool = false + + var redis_tcp_local_port string + + for { + random_port := RandInt64(RedisPortMinNum, RedisPortMaxNum) + + redis_tcp_local_port = strconv.Itoa(random_port) + + logger.Infof("redis generate random local_ ort number is %s \n", redis_tcp_local_port) + + local_port_num := len(LocalPortsMap) + + logger.Infof("redis local port num is %d \n", local_port_num) + + if local_port_num > 0 { + for _, value := range LocalPortsMap { + if strings.EqualFold(redis_tcp_local_port, value) { + redis_port_found = true + logger.Infof("redis port %s is already assigned.\n", value) + break + } + } + + if redis_port_found { + logger.Infof("Local tcp port %s is duplicated, will generate a new one.\n", redis_tcp_local_port) + continue + } else { + logger.Info("random_tcp_port not assigned in local, so it can be used, will skip this loop.") + break + } + } else { + logger.Warn("LocalPortsMap length is zero, so a random port can be choosen") + break + } + + } + + return redis_tcp_local_port +} + //HandleConnection Actuall proxy implementation per client. Untimatly this performs a implments a duplex io.Copy func HandleConnection(E Entry) error { @@ -379,6 +397,7 @@ func HandleConnection(E Entry) error { go func(E Entry, srcConn net.Conn) { destConn, err := net.Dial("tcp", E.Pair.To) + if err != nil { logger.Errorf("Unable to connect to the Destination %s %v", E.Pair.To, err) return @@ -408,193 +427,327 @@ func HandleConnection(E Entry) error { go cp(srcConn, destConn) wg.Wait() - //defer destConn.Close() - //defer srcConn.Close() + }(CurrentE, conn) + } +} - /* - This part has connection reset error. - stop := make(chan bool) +//HandleHTTPUpdate Call beack to handle /Update/ HTTP call back +func HandleHTTPUpdate(w http.ResponseWriter, r *http.Request) { + //log.Printf(w, "Hi there, Going to Update %s! Method=%s\n", r.URL.Path[1:], r.Method) + if r.Method == "PUT" { + //This can be used for updating an existing variable + content, err := ioutil.ReadAll(r.Body) + r.Body.Close() + if err != nil { + //log.Printf(w, "Error understanding the Body %v", err) + logger.Errorf("Error understanding the Body %v", err) + return + } - go relay(srcConn, destConn, stop) - go relay(destConn, srcConn, stop) + var val HTTPUpdate + var CurrentE Entry + var OK bool + err = json.Unmarshal(content, &val) + if err != nil { + //log.Printf(w, "Wrong json format %v", err) + logger.Errorf("Wrong json format %v", err) + return + } + if CurrentE, OK = ConfigMap[val.Name]; !OK { + logger.Infof("Error Proxy entry is incorrect / empty for %s", val.Name) + //log.Printf(w, "Error Proxy entry is incorrect / empty for %s", val.Name) + return + } + logger.Info("Updating From porxy for %s From %s TO %s", val.Name, CurrentE.Pair.To, val.Addr) + CurrentE.Pair.To = val.Addr + ConfigMap[val.Name] = CurrentE + return + } + return +} - select { - case <-stop: - logger.Errorf("Whether connection is stopped or not: %v", stop) - time.Sleep(time.Second * 1) - //return - } +//HandleHTTPGet call back to handle /Get/ HTTP callback +func HandleHTTPGet(w http.ResponseWriter, r *http.Request) { + retBytes, err := json.MarshalIndent(ConfigMap, " ", " ") + if err != nil { + logger.Errorf("Error Marshalling HandleHTTPGet() %v", err) + //log.Printf(w, "Error Marshalling HandleHTTPGet() %v", err) + return - */ - //go io.Copy(F, T) - //io.Copy(T, F) - /*ExitChan := make(chan bool, 1) + } + fmt.Fprintf(w, string(retBytes) ) + return +} - go func(sconn net.Conn, dconn net.Conn, Exit chan bool) { - _, err := io.Copy(srcConn, destConn) - logger.Errorf("Failed to send data to %v, error is:%v\n", E.Pair.To, err) - ExitChan <- true - }(srcConn, destConn, ExitChan) +func addRedisProxy(name string, conn *zk.Conn) { - go func(sconn net.Conn, dconn net.Conn, Exit chan bool) { - _, err := io.Copy(destConn, srcConn) - logger.Errorf("Failed to receive data from %v, error is:%v\n", E.Pair.To, err) - ExitChan <- true - }(srcConn, destConn, ExitChan) - <-ExitChan - destConn.Close() - srcConn.Close() - */ + if CurrentE, ok := ConfigMap[name]; ok { + logger.Infof("Redis instance %v proxy already exist in configMap.", name) + return + } else { - }(CurrentE, conn) - } -} + logger.Infof("Redis instance not exsit in configMap.", name) -func broker(dst, src net.Conn, srcClosed chan struct{}) { - // We can handle errors in a finer-grained manner by inlining io.Copy (it's - // simple, and we drop the ReaderFrom or WriterTo checks for - // net.Conn->net.Conn transfers, which aren't needed). This would also let - // us adjust buffersize. - _, err := io.Copy(dst, src) + redis_ip, redis_port := getRedisMnameInfo(name, conn) + //Add lock to ConfigMap in case of concurrent read and write on configMap. eg: create redis and existant redis failover happens at the same time, this might occur + lock.Lock() + + defer lock.Unlock() + + CurrentE.Pair.To = redis_ip + ":" + redis_port + + ConfigMap[name] = CurrentE + + + redis_tcp_local_port := getLocalRedisPort() + + //redis_tcp_listen_port := strconv.Itoa(random_tcp_port) + flags := int32(0) + + acl := zk.WorldACL(zk.PermAll) + + conn.Create(RedisLocalPortsPath+"/"+name, []byte(redis_tcp_local_port), flags, acl) + + local_addr := "127.0.0.1" + ":" + redis_tcp_local_port + + logger.Infof("Redis %s local addr is %s", name, local_addr) + + local_tcp_addr, _ := net.ResolveTCPAddr("tcp4", local_addr) + + to_addr := string(redis_ip) + ":" + string(redis_port) + + to_tcp_addr, _ := net.ResolveTCPAddr("tcp4", to_addr) + + logger.Infof("Redis %s local_tcp_addr is %s, to_tcp_addr is %s \n", name, local_tcp_addr.String(), to_tcp_addr.String()) + + currentProxyPair := PorxyPair{From: local_tcp_addr.String(), To: to_tcp_addr.String()} + + CurrentEntry := Entry{Name: name, Pair: currentProxyPair} + + ConfigMap[name] = CurrentEntry + + go HandleConnection(CurrentEntry) - if err != nil { - logger.Errorf("Copy error: %s", err) - } - if err := src.Close(); err != nil { - logger.Errorf("Close error: %s", err) } - srcClosed <- struct{}{} } +func updateRedisProxy(name string, conn *zk.Conn) { -func relay(src net.Conn, dst net.Conn, stop chan bool) { - _,err := io.Copy(dst, src) - if err != nil { - logger.Errorf("src addr is %v, dst addr is %v, error is:%v\n", src.LocalAddr(), dst.LocalAddr(),err) - dst.Close() - src.Close() - stop <- true + var CurrentE Entry + var OK bool + + if CurrentE, OK = ConfigMap[name]; OK { + logger.Infof("Redis %s exist in ConfigMap, and it might have failoevr occurred, will master ip.", name) + + redis_ip, redis_port := getRedisMnameInfo(name, conn) + + if redis_ip == "" || redis_port == "" { + logger.Errorf("Failed to update redis, eigher redis_ip or redis_port values is empty. redis_ip is %v, redis_port is %v", redis_ip, redis_port) + return + } + //Add lock to ConfigMap in case of concurrent read and write on configMap. eg: create redis and existant redis failover happens at the same time, this might occur + lock.Lock() + + defer lock.Unlock() + + CurrentE.Pair.To = redis_ip + ":" + redis_port + ConfigMap[name] = CurrentE + logging.Warnf("Change Redis %v master address into %v", name, CurrentE.Pair.To) + + return + + } else { + logger.Warnf("Redis %s not exit in ConfigMap, will return", name) return } } -func cleanProxy(conn *zk.Conn) { +func watchRedisStatus(conn *zk.Conn) { - go func(){ - for { + zksStr := os.Getenv("ZOOKEEPER_SERVERS") - time.Sleep(time.Second * CleanUpInterval) + retryPolicy := curator.NewExponentialBackoffRetry(time.Second, 3, 15 * time.Second) - logger.Infof("cleanProxy: Sleep %d seconds", CleanUpInterval) + client := curator.NewClient(zksStr,retryPolicy) - redis_instances, _, err := conn.Children(RedisPath) + client.Start() - if err != nil { - logger.Errorf("Failed to get redis instances.") - return - } + defer client.Close() - logger.Infof("cleanProxy: redis_instaces nodes are %v", redis_instances) + treeCache := cache.NewTreeCache(client, RedisPath, cache.DefaultTreeCacheSelector) - for _,name := range redis_instances { + treeCache.Start() - redis_status,_, err := conn.Get(RedisPath + "/" + name + "/" + "Status") + defer treeCache.Stop() - logger.Infof("cleanProxy: redis %s status is %s", name, redis_status) + cacheListener := cache.NewTreeCacheListener(func(client curator.CuratorFramework, event cache.TreeCacheEvent) error { - if err != nil { - logger.Errorf("cleanProxy: err occured when getting redis %s path %v", name, err) - return - } + switch event.Type.String() { + case "NodeAdded": + //fmt.Printf( event_path) + event_path := event.Data.Path() + logger.Infof("TreeCache event is: NodeAdded, zk path is %s \n", event_path) - logger.Infof("cleanProxy:redis %s status is %s.\n", name, redis_status) + if strings.Contains(event_path, "Mname") { - if strings.EqualFold(string(redis_status), "DELETED") || redis_status == nil { + elapsed := time.Since(startTime).Seconds() + if elapsed < ProgrameStartTimeAtLeast { - //delete znode in zk + logger.Infof("Program is just started in, will skip the InitializePorxy function ") - DeleteZKPathRecursive(RedisPath + "/" + name) + } else { - CurrentE, ok := ConfigMap[name] + logger.Infof("New redis has been created, Will Sync the status") - if ok { - logger.Infof("cleanProxy: redis %s is in the ConfigMap", name) + time.Sleep(2 * SyncZKIntervalSecs * time.Second) - from_addr := CurrentE.Pair.From + redisName := strings.Split(event_path,"/")[3] - logger.Infof("cleanProxy: redis %s from_addr is %s", name, from_addr) + if _, ok := ConfigMap[redisName]; ok { - delete(ConfigMap, name) + logger.Infof("Redis %s has already been created!", redisName) } else { - logger.Infof("cleanProxy: redis %s is not in the ConfigMap", name) + logger.Infof("Redis %s has not been created, will create it by running addRedisProxy function", redisName) + addRedisProxy(redisName, conn) + } } } + case "NodeUpdated": + // fmt.Printf( event_path) + event_path := event.Data.Path() - } - }() -} + logger.Infof("TreeCache event is: NodeUpdated, zk path is %s \n", event_path) + if strings.Contains(event_path, "Mname") { -//HandleHTTPUpdate Call beack to handle /Update/ HTTP call back -func HandleHTTPUpdate(w http.ResponseWriter, r *http.Request) { - //log.Printf(w, "Hi there, Going to Update %s! Method=%s\n", r.URL.Path[1:], r.Method) - if r.Method == "PUT" { - //This can be used for updating an existing variable - content, err := ioutil.ReadAll(r.Body) - r.Body.Close() - if err != nil { - //log.Printf(w, "Error understanding the Body %v", err) - logger.Errorf("Error understanding the Body %v", err) - return - } + logger.Infof("Redis node instance has changed, possibly master and slave switched. Will Sync the status") + //time.Sleep(SyncZKIntervalSecs * time.Second) - var val HTTPUpdate - var CurrentE Entry - var OK bool - err = json.Unmarshal(content, &val) - if err != nil { - //log.Printf(w, "Wrong json format %v", err) - logger.Errorf("Wrong json format %v", err) - return - } - if CurrentE, OK = ConfigMap[val.Name]; !OK { - logger.Infof("Error Proxy entry is incorrect / empty for %s", val.Name) - //log.Printf(w, "Error Proxy entry is incorrect / empty for %s", val.Name) - return + elapsed := time.Since(startTime).Seconds() + + if elapsed < ProgrameStartTimeAtLeast { + + logger.Infof("Program is just started in, will skip the InitializePorxy function ") + + } else { + + logger.Infof("New redis has been created, Will Sync the status") + + time.Sleep( SyncZKIntervalSecs * time.Second) + + redisName := strings.Split(event_path,"/")[3] + + if redisName != "" { + + if _, ok := ConfigMap[redisName]; ok { + + logger.Infof("Redis %s in ConfigMap might have failover occurred, will try to update the master ip by running updateRedisProxy.!", redisName) + updateRedisProxy(redisName, conn) + + } else { + + logger.Infof("Redis %s has not been created, will create it by running addRedisProxy function", redisName) + + } + + } else { + logger.Errorf("Failed to extract redis name from event_path %v, and redis name %v is empty", event_path, redisName) + } + + } + + } + + case "NodeRemoved": + //fmt.Printf( event_path) + event_path := event.Data.Path() + logger.Infof("TreeCache event is: NodeRemoved \n, zk path is %s", event_path) + case "ConnSuspended": + //fmt.Printf( event_path) + logger.Infof("TreeCache event is: ConnSuspended \n") + case "ConnReconnected": + //fmt.Printf( event_path) + logger.Infof("TreeCache event is: ConnReconnected \n") + case "ConnLost": + //fmt.Printf( event_path) + logger.Infof("TreeCache event is: ConnLost \n") + case "Initialized": + //fmt.Printf( event_path) + logger.Infof("TreeCache event is: Initialized \n") + default: + logger.Infof("TreeCache event is: unknown. \n") } - logger.Info("Updating From porxy for %s From %s TO %s", val.Name, CurrentE.Pair.To, val.Addr) - CurrentE.Pair.To = val.Addr - ConfigMap[val.Name] = CurrentE - return - } - return -} + return nil + }) -//HandleHTTPGet call back to handle /Get/ HTTP callback -func HandleHTTPGet(w http.ResponseWriter, r *http.Request) { - retBytes, err := json.MarshalIndent(ConfigMap, " ", " ") - if err != nil { - logger.Errorf("Error Marshalling HandleHTTPGet() %v", err) - //log.Printf(w, "Error Marshalling HandleHTTPGet() %v", err) - return + treeCache.Listenable().AddListener(cacheListener) - } - fmt.Fprintf(w, string(retBytes) ) - return + logger.Infof("Adding listener for treeCache.") + + wait_ch := make(chan bool) + <-wait_ch } -func main() { +/* +func init() { - var Cfg Config + filePath := LogFilePath + + fileMode := os.O_APPEND + + bufferSize := 0 + + bufferFlushTime := 30 * time.Second + + inputChanSize := 1 + + backupCount := uint32(LogFileMaxBackups) + // set the maximum size of every file to 100 M bytes + fileMaxBytes := uint64(LogFileMaxSize) + + // create a handler(which represents a log message destination) + handler := logging.MustNewRotatingFileHandler( + filePath, fileMode, bufferSize, bufferFlushTime, inputChanSize, + fileMaxBytes, backupCount) + + + // the format for the whole log message + format := "%(asctime)s %(levelname)s (%(filename)s:%(lineno)d) " + + "%(name)s %(message)s" + + // the format for the time part + dateFormat := "%Y-%m-%d %H:%M:%S.%3n" + + // create a formatter(which controls how log messages are formatted) + formatter := logging.NewStandardFormatter(format, dateFormat) + + // set formatter for handler + handler.SetFormatter(formatter) + + //Define logger name of program as redis_proxy + logger = logging.GetLogger("redis_proxy") + + logger.SetLevel(logging.LevelInfo) + + logger.AddHandler(handler) + + // ensure all log messages are flushed to disk before program exits. + defer logging.Shutdown() + + fmt.Println("Finish init") + +} +*/ +func main() { //Initialize the global Config map ConfigMap = make(map[string]Entry) @@ -602,10 +755,7 @@ func main() { //Initialize the global LocalPorts map LocalPortsMap = make(map[string]string) - //Read a config file that has json update the config files - cfgFileName := flag.String("config", "./config.json", "Supply the location of MrRedis configuration file") - flag.Parse() - + //init() filePath := LogFilePath fileMode := os.O_APPEND @@ -620,8 +770,6 @@ func main() { // set the maximum size of every file to 100 M bytes fileMaxBytes := uint64(LogFileMaxSize) - - // create a handler(which represents a log message destination) handler := logging.MustNewRotatingFileHandler( filePath, fileMode, bufferSize, bufferFlushTime, inputChanSize, @@ -641,27 +789,16 @@ func main() { // set formatter for handler handler.SetFormatter(formatter) + //Define logger name of program as redis_proxy + logger = logging.GetLogger("redis_proxy") logger.SetLevel(logging.LevelInfo) logger.AddHandler(handler) - // ensure all log messages are flushed to disk before program exits. defer logging.Shutdown() - logger.Infof("The config file name is %s ", *cfgFileName) - cfgFile, err := ioutil.ReadFile(*cfgFileName) - - if err != nil { - logger.Error("Error Reading the configration file. Resorting to default values") - } - err = json.Unmarshal(cfgFile, &Cfg) - if err != nil { - logger.Errorf("Error parsing the config file %v", err) - return - } - logger.Infof("Configuration file is = %v", Cfg) conn := connect() @@ -675,28 +812,14 @@ func main() { InitializeProxy(conn, RedisPath) - //Clean up unused tcp ports, eg: when redis status is DELETED, the local proxy server of that redis will be shutdown. - //comment cleanProxy to avoid concurrent write Map error - //cleanProxy(conn) - - go func() { - - for { - - time.Sleep(time.Second * SyncZKIntervalSecs) - - logger.Info("Routine: Sync redis infomration from zk...") - - InitializeProxy(conn, RedisPath) - } - - }() + //Watch each redis status and take action if failover occurs or new redis created + go watchRedisStatus(conn) http.HandleFunc("/Update/", HandleHTTPUpdate) http.HandleFunc("/Get/", HandleHTTPGet) - logger.Fatal(http.ListenAndServe(":"+Cfg.HTTPPort, nil)) + logger.Fatal(http.ListenAndServe(":" + string(ProxyPort) , nil)) //Wait indefinitely waitCh := make(chan bool) From 03d53c5113b1b864ac7ab2f60b1ea44dde3214bd Mon Sep 17 00:00:00 2001 From: zou sheng Date: Tue, 26 Sep 2017 08:15:17 +0800 Subject: [PATCH 10/14] Use gofmt reformat the code --- proxy/redis_proxy.go | 47 ++++++++++++++++++-------------------------- 1 file changed, 19 insertions(+), 28 deletions(-) diff --git a/proxy/redis_proxy.go b/proxy/redis_proxy.go index 1292226..936b2a8 100644 --- a/proxy/redis_proxy.go +++ b/proxy/redis_proxy.go @@ -5,8 +5,8 @@ import ( "fmt" "github.com/curator-go/curator" "github.com/curator-go/curator/recipes/cache" - "github.com/samuel/go-zookeeper/zk" "github.com/hhkbp2/go-logging" + "github.com/samuel/go-zookeeper/zk" "io" "io/ioutil" "math/rand" @@ -15,8 +15,8 @@ import ( "os" "strconv" "strings" - "time" "sync" + "time" ) var ( @@ -30,7 +30,6 @@ var ( //logger = logging.GetLogger("redis_proxy") logger logging.Logger - ) var startTime = time.Now() @@ -63,11 +62,11 @@ const ( LogFilePath = "/data/apps/log/MrRedis-local-proxy.log" - LogFileMaxSize = 100 * 1024 * 1024 // megabytes + LogFileMaxSize = 100 * 1024 * 1024 // megabytes LogFileMaxBackups = 10 - ProgrameStartTimeAtLeast = 60 + ProgrameStartTimeAtLeast = 60 ) //Config json config structure for the proxy @@ -166,7 +165,7 @@ func PrepareLocalPorts(conn *zk.Conn, path string) { } -func getRedisMnameInfo(name string, conn *zk.Conn) (string, string){ +func getRedisMnameInfo(name string, conn *zk.Conn) (string, string) { logger.Infof("Get redis %v Mname info redis_ip and redis_port.", name) @@ -175,7 +174,7 @@ func getRedisMnameInfo(name string, conn *zk.Conn) (string, string){ redis_id, _, err := conn.Get(redis_id_path) if err != nil { - logger.Errorf("zk path /name/instance/Mname error: %v\n", RedisPath + "/" + name + "/Mname") + logger.Errorf("zk path /name/instance/Mname error: %v\n", RedisPath+"/"+name+"/Mname") must(err) } @@ -198,7 +197,7 @@ func getRedisMnameInfo(name string, conn *zk.Conn) (string, string){ return "", "" } else { - logger.Infof("Second time to fetch new redis %v ip successfully, the ip is %v.",name, redis_ip) + logger.Infof("Second time to fetch new redis %v ip successfully, the ip is %v.", name, redis_ip) } //must(err) } @@ -207,14 +206,13 @@ func getRedisMnameInfo(name string, conn *zk.Conn) (string, string){ redis_port, _, err := conn.Get(redis_port_path) if err != nil { - logger.Errorf("zk path name/Pros/instance/Port error: %v\n", RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/Port") + logger.Errorf("zk path name/Pros/instance/Port error: %v\n", RedisPath+"/"+name+"/Procs/"+string(redis_id)+"/Port") must(err) } return string(redis_ip), string(redis_port) } - func InitializeProxy(conn *zk.Conn, path string) { logger.Infof("Run InitializeProxy at boot time %v", time.Now()) @@ -230,7 +228,6 @@ func InitializeProxy(conn *zk.Conn, path string) { redis_status, _, _ := conn.Get(RedisPath + "/" + name + "/Status") - if redis_status != nil && strings.EqualFold(string(redis_status), "RUNNING") { logger.Infof("redis instance %s status is running.", name) @@ -253,7 +250,7 @@ func InitializeProxy(conn *zk.Conn, path string) { CurrentE.Pair.From = "127.0.0.1" + ":" + redis_tcp_local_port - logger.Infof("Set redis instance %s Pair.From properties to %s" , name, CurrentE.Pair.From) + logger.Infof("Set redis instance %s Pair.From properties to %s", name, CurrentE.Pair.From) } @@ -310,7 +307,6 @@ func InitializeProxy(conn *zk.Conn, path string) { } } - } func getLocalRedisPort() string { @@ -413,7 +409,7 @@ func HandleConnection(E Entry) error { select { case first <- struct{}{}: if err != nil { - logger.Errorf("Copy error is %v:",err) + logger.Errorf("Copy error is %v:", err) } _ = dst.Close() _ = src.Close() @@ -475,14 +471,12 @@ func HandleHTTPGet(w http.ResponseWriter, r *http.Request) { return } - fmt.Fprintf(w, string(retBytes) ) + fmt.Fprintf(w, string(retBytes)) return } func addRedisProxy(name string, conn *zk.Conn) { - - if CurrentE, ok := ConfigMap[name]; ok { logger.Infof("Redis instance %v proxy already exist in configMap.", name) return @@ -500,7 +494,6 @@ func addRedisProxy(name string, conn *zk.Conn) { ConfigMap[name] = CurrentE - redis_tcp_local_port := getLocalRedisPort() //redis_tcp_listen_port := strconv.Itoa(random_tcp_port) @@ -569,9 +562,9 @@ func watchRedisStatus(conn *zk.Conn) { zksStr := os.Getenv("ZOOKEEPER_SERVERS") - retryPolicy := curator.NewExponentialBackoffRetry(time.Second, 3, 15 * time.Second) + retryPolicy := curator.NewExponentialBackoffRetry(time.Second, 3, 15*time.Second) - client := curator.NewClient(zksStr,retryPolicy) + client := curator.NewClient(zksStr, retryPolicy) client.Start() @@ -606,9 +599,9 @@ func watchRedisStatus(conn *zk.Conn) { time.Sleep(2 * SyncZKIntervalSecs * time.Second) - redisName := strings.Split(event_path,"/")[3] + redisName := strings.Split(event_path, "/")[3] - if _, ok := ConfigMap[redisName]; ok { + if _, ok := ConfigMap[redisName]; ok { logger.Infof("Redis %s has already been created!", redisName) @@ -642,13 +635,13 @@ func watchRedisStatus(conn *zk.Conn) { logger.Infof("New redis has been created, Will Sync the status") - time.Sleep( SyncZKIntervalSecs * time.Second) + time.Sleep(SyncZKIntervalSecs * time.Second) - redisName := strings.Split(event_path,"/")[3] + redisName := strings.Split(event_path, "/")[3] if redisName != "" { - if _, ok := ConfigMap[redisName]; ok { + if _, ok := ConfigMap[redisName]; ok { logger.Infof("Redis %s in ConfigMap might have failover occurred, will try to update the master ip by running updateRedisProxy.!", redisName) updateRedisProxy(redisName, conn) @@ -775,7 +768,6 @@ func main() { filePath, fileMode, bufferSize, bufferFlushTime, inputChanSize, fileMaxBytes, backupCount) - // the format for the whole log message format := "%(asctime)s %(levelname)s (%(filename)s:%(lineno)d) " + "%(name)s %(message)s" @@ -799,7 +791,6 @@ func main() { // ensure all log messages are flushed to disk before program exits. defer logging.Shutdown() - conn := connect() defer conn.Close() @@ -819,7 +810,7 @@ func main() { http.HandleFunc("/Get/", HandleHTTPGet) - logger.Fatal(http.ListenAndServe(":" + string(ProxyPort) , nil)) + logger.Fatal(http.ListenAndServe(":"+string(ProxyPort), nil)) //Wait indefinitely waitCh := make(chan bool) From 73351b57eeda4964ccca0c29674fdd8a8a38d7f1 Mon Sep 17 00:00:00 2001 From: zou sheng Date: Tue, 26 Sep 2017 18:07:45 +0800 Subject: [PATCH 11/14] remove panic and add zk path updats watch when redis created or removed --- proxy/redis_proxy.go | 265 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 210 insertions(+), 55 deletions(-) diff --git a/proxy/redis_proxy.go b/proxy/redis_proxy.go index 936b2a8..bcdd168 100644 --- a/proxy/redis_proxy.go +++ b/proxy/redis_proxy.go @@ -52,7 +52,7 @@ const ( RedisPortMaxNum = 6400 - ProxyPort = 7979 + ProxyAddr = "127.0.0.1:7979" SyncZKIntervalSecs = 3 @@ -66,7 +66,9 @@ const ( LogFileMaxBackups = 10 - ProgrameStartTimeAtLeast = 60 + ProgrameStartTimeAtLeast = 30 + + FetchRedisIpTimeOutSecs = 60 ) //Config json config structure for the proxy @@ -146,8 +148,10 @@ func PrepareLocalPorts(conn *zk.Conn, path string) { } redis_local_ports, _, err := conn.Children(path) - - must(err) + if err != nil { + logger.Errorf("Error to get redis_local_ports, error is %s.", err) + return + } for _, name := range redis_local_ports { @@ -165,49 +169,107 @@ func PrepareLocalPorts(conn *zk.Conn, path string) { } + func getRedisMnameInfo(name string, conn *zk.Conn) (string, string) { logger.Infof("Get redis %v Mname info redis_ip and redis_port.", name) redis_id_path := RedisPath + "/" + name + "/Mname" - redis_id, _, err := conn.Get(redis_id_path) + var redis_id string + + idTimeCount := time.Now() + + for { + redisId, _, idErr := conn.Get(redis_id_path) + + if idErr != nil { + logger.Errorf("zk path /name/instance/Mname error: %v\n", RedisPath+"/"+name+"/Mname") + return "","" + } + + if redisId != nil && string(redisId) != "" { + + logger.Infof("Redis %s get the id from zk, the redis id is %s", name, string(redis_id)) + redis_id = string(redisId) + break + + } else { + elapsed := time.Since(idTimeCount).Seconds() + + logger.Infof("Fetch redis %s spends %d seconds already.", name, elapsed) + + if elapsed > FetchRedisIpTimeOutSecs { + logger.Errorf("Failed to fetch redis %s id, and it's over %d secoonds. will ignore this request!", name, FetchRedisIpTimeOutSecs) + break + } + + time.Sleep(1 * time.Second) + + logger.Errorf("Redis %s failed to get new redis id, the id is %s. Will get fetch it again.", name, string(redisId)) + } + - if err != nil { - logger.Errorf("zk path /name/instance/Mname error: %v\n", RedisPath+"/"+name+"/Mname") - must(err) } - redis_ip_path := RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/IP" + if redis_id == "" { + logger.Errorf("Get redis %s Mname id null, will return empty string!") + return "","" + } - var redis_ip []byte + redis_ip_path := RedisPath + "/" + name + "/Procs/" + redis_id + "/IP" - redis_ip, _, err = conn.Get(redis_ip_path) + logger.Infof("redis %s redis_ip_path is %s", name, redis_id_path) - if err != nil { + var redis_ip string - logger.Errorf("Redis %v failed to get new redis ip, the ip is %v. Will wait 5 seconds and try to fetch it again.", name, redis_ip) - time.Sleep(5 * time.Second) + timeCount := time.Now() - redis_ip, _, err = conn.Get(redis_ip_path) + for { + redisIp, _, err := conn.Get(redis_ip_path) - if err != nil { + if err == nil { - logger.Errorf("Failed to fetch new redis ip second time, wrill pass this") - return "", "" + if redisIp != nil && string(redisIp) != "" { + + logger.Infof("Redis %s get the ip from zk, the redis ip is %v", name, redis_ip) + redis_ip = string(redisIp) + break + + } else { + + logger.Errorf("Redis %v failed to get new redis ip, the ip is %v. Will get fetch it again.", name, redis_ip) + } + + elapsed := time.Since(timeCount).Seconds() + + logger.Infof("Fetch redis %s spends %d seconds already.", name, elapsed) + + if elapsed > FetchRedisIpTimeOutSecs { + logger.Errorf("Failed to fetch redis %s ip, and it's over %d secoonds. will ignore this request!", name, FetchRedisIpTimeOutSecs) + break + } + + time.Sleep(1 * time.Second) } else { - logger.Infof("Second time to fetch new redis %v ip successfully, the ip is %v.", name, redis_ip) + logger.Errorf("failed to get redis ip, error is %s", err) + logger.Error("Failed to get redis %s ip, redis ip is %v, zk conection might have problem, eth error is %v", name, redis_ip, err) + break } - //must(err) } - redis_port_path := RedisPath + "/" + name + "/Procs/" + string(redis_id) + "/Port" + if redis_ip == "" { + logger.Errorf("Get redis %s IP as null, will return empty string!") + return "","" + } + + redis_port_path := RedisPath + "/" + name + "/Procs/" + redis_id + "/Port" redis_port, _, err := conn.Get(redis_port_path) if err != nil { logger.Errorf("zk path name/Pros/instance/Port error: %v\n", RedisPath+"/"+name+"/Procs/"+string(redis_id)+"/Port") - must(err) + return "", "" } return string(redis_ip), string(redis_port) @@ -232,8 +294,20 @@ func InitializeProxy(conn *zk.Conn, path string) { logger.Infof("redis instance %s status is running.", name) + redis_mname, _, _ := conn.Get(RedisPath + "/" + name + "/Mname") + + if redis_mname == nil || string(redis_mname) == "" { + logger.Errorf("redis %s Mname is empty. Will skip this redis instance.", name) + continue + } + redis_ip, redis_port := getRedisMnameInfo(name, conn) + if redis_ip == "" || redis_port == "" { + logger.Errorf("redis %s Pairto ip %s or port %s is empty. Will skip this redis instance.", name, redis_ip, redis_port) + continue + } + var redis_tcp_local_port string if CurrentE, ok := ConfigMap[name]; ok { @@ -477,23 +551,37 @@ func HandleHTTPGet(w http.ResponseWriter, r *http.Request) { func addRedisProxy(name string, conn *zk.Conn) { - if CurrentE, ok := ConfigMap[name]; ok { + var CurrentE Entry + var OK bool + + if name == "" { + logger.Errorf("Redis name is empty, will ingore this request.") + return + } + if CurrentE, OK = ConfigMap[name]; OK { + logger.Infof("Redis instance %v proxy already exist in configMap.", name) return + } else { logger.Infof("Redis instance not exsit in configMap.", name) redis_ip, redis_port := getRedisMnameInfo(name, conn) + //Add lock to ConfigMap in case of concurrent read and write on configMap. eg: create redis and existant redis failover happens at the same time, this might occur + + if redis_ip == "" || redis_port == "" { + logger.Errorf("Failed to add redis instance %s, eigher redis_ip or redis_port values is empty. redis_ip is %v, redis_port is %v", name, redis_ip, redis_port) + return + } + lock.Lock() defer lock.Unlock() CurrentE.Pair.To = redis_ip + ":" + redis_port - ConfigMap[name] = CurrentE - redis_tcp_local_port := getLocalRedisPort() //redis_tcp_listen_port := strconv.Itoa(random_tcp_port) @@ -503,25 +591,11 @@ func addRedisProxy(name string, conn *zk.Conn) { conn.Create(RedisLocalPortsPath+"/"+name, []byte(redis_tcp_local_port), flags, acl) - local_addr := "127.0.0.1" + ":" + redis_tcp_local_port - - logger.Infof("Redis %s local addr is %s", name, local_addr) - - local_tcp_addr, _ := net.ResolveTCPAddr("tcp4", local_addr) - - to_addr := string(redis_ip) + ":" + string(redis_port) + CurrentE.Pair.From = "127.0.0.1" + ":" + redis_tcp_local_port - to_tcp_addr, _ := net.ResolveTCPAddr("tcp4", to_addr) - - logger.Infof("Redis %s local_tcp_addr is %s, to_tcp_addr is %s \n", name, local_tcp_addr.String(), to_tcp_addr.String()) - - currentProxyPair := PorxyPair{From: local_tcp_addr.String(), To: to_tcp_addr.String()} - - CurrentEntry := Entry{Name: name, Pair: currentProxyPair} - - ConfigMap[name] = CurrentEntry + ConfigMap[name] = CurrentE - go HandleConnection(CurrentEntry) + go HandleConnection(CurrentE) } } @@ -532,6 +606,7 @@ func updateRedisProxy(name string, conn *zk.Conn) { var OK bool if CurrentE, OK = ConfigMap[name]; OK { + logger.Infof("Redis %s exist in ConfigMap, and it might have failoevr occurred, will master ip.", name) redis_ip, redis_port := getRedisMnameInfo(name, conn) @@ -601,19 +676,25 @@ func watchRedisStatus(conn *zk.Conn) { redisName := strings.Split(event_path, "/")[3] - if _, ok := ConfigMap[redisName]; ok { + if redisName != "" { - logger.Infof("Redis %s has already been created!", redisName) + if _, ok := ConfigMap[redisName]; ok { - } else { + logger.Infof("Redis %s has already been created!", redisName) + + } else { - logger.Infof("Redis %s has not been created, will create it by running addRedisProxy function", redisName) - addRedisProxy(redisName, conn) + logger.Infof("Redis %s has not been created, will create it later.", redisName) + //addRedisProxy(redisName, conn) + } + } else { + logger.Errorf("Failed to get redis name ") } } } + case "NodeUpdated": // fmt.Printf( event_path) event_path := event.Data.Path() @@ -622,18 +703,17 @@ func watchRedisStatus(conn *zk.Conn) { if strings.Contains(event_path, "Mname") { - logger.Infof("Redis node instance has changed, possibly master and slave switched. Will Sync the status") + logger.Infof("Redis node instance has changed, will sync the updates to ConfigMap.") //time.Sleep(SyncZKIntervalSecs * time.Second) elapsed := time.Since(startTime).Seconds() if elapsed < ProgrameStartTimeAtLeast { - logger.Infof("Program is just started in, will skip the InitializePorxy function ") + logger.Infof("Program might be just started in very short time, will skip the InitializePorxy function ") } else { - logger.Infof("New redis has been created, Will Sync the status") time.Sleep(SyncZKIntervalSecs * time.Second) @@ -643,12 +723,31 @@ func watchRedisStatus(conn *zk.Conn) { if _, ok := ConfigMap[redisName]; ok { - logger.Infof("Redis %s in ConfigMap might have failover occurred, will try to update the master ip by running updateRedisProxy.!", redisName) - updateRedisProxy(redisName, conn) + redis_status_path := RedisPath + "/" + redisName + "/Status" + + redis_status, _, err := conn.Get(redis_status_path) + + if err != nil { + logger.Errorf("Failed to get redis %v status %v, error is %v", redisName, redis_status, err.Error()) + } else { + + logger.Infof("redis %s status is %v.", redisName, redis_status) + } + + switch string(redis_status) { + + case "RUNNING": + logger.Infof("Redis %s status is %v, failover might have occurred, will try to update the master ip by running updateRedisProxy.!", redisName, redis_status) + updateRedisProxy(redisName, conn) + default: + logger.Infof("Redis %s status is %s, failover might have occurred, or redis is deleted!", redisName, redis_status) + + } } else { - logger.Infof("Redis %s has not been created, will create it by running addRedisProxy function", redisName) + logger.Infof("Redis %s is not in ConfigMap, will create it by running addRedisProxy function", redisName) + addRedisProxy(redisName, conn) } @@ -660,6 +759,52 @@ func watchRedisStatus(conn *zk.Conn) { } + if strings.Contains(event_path, "/Status") { + + logger.Infof("Redis node instance status has changed, will sync the updates to ConfigMap.") + + redisName := strings.Split(event_path, "/")[3] + + if redisName != "" { + + if _, ok := ConfigMap[redisName]; ok { + + redis_status_path := RedisPath + "/" + redisName + "/Status" + + redis_status, _, err := conn.Get(redis_status_path) + + if err != nil { + logger.Errorf("Failed to get redis %v status %v, error is %v", redisName, redis_status, err.Error()) + } else { + + logger.Infof("redis %s status is %v.", redisName, redis_status) + } + + switch string(redis_status) { + + case "DELETED": + logger.Infof("Redis %v status is deleted, should remove it from configMap.") + lock.Lock() + defer lock.Unlock() + delete(ConfigMap, redisName) + default: + logger.Infof("redis %s status is %s, will do nothing about it.", redisName, redis_status ) + } + } + } + } + + logging.Infof("Last setp on UpdateNode, CLEAN empty key.") + for key,_ := range ConfigMap { + if key == "" { + lock.Lock() + defer lock.Unlock() + delete(ConfigMap,"") + } + } + + + case "NodeRemoved": //fmt.Printf( event_path) event_path := event.Data.Path() @@ -801,7 +946,7 @@ func main() { //Initialize existent proxy instance inside zk and added them into ConfigMap - InitializeProxy(conn, RedisPath) + go InitializeProxy(conn, RedisPath) //Watch each redis status and take action if failover occurs or new redis created go watchRedisStatus(conn) @@ -810,7 +955,17 @@ func main() { http.HandleFunc("/Get/", HandleHTTPGet) - logger.Fatal(http.ListenAndServe(":"+string(ProxyPort), nil)) + err := http.ListenAndServe(ProxyAddr, nil) + + if err != nil { + + logger.Errorf("Failed to start http server on port %v!, error is %v", ProxyAddr, err.Error()) + + } else { + + logger.Infof("Start http server on port %v successfuly!", ProxyAddr) + + } //Wait indefinitely waitCh := make(chan bool) From 1987ee05e1652fac094dedebc0c3508e1281a66a Mon Sep 17 00:00:00 2001 From: zou sheng Date: Wed, 27 Sep 2017 11:06:36 +0800 Subject: [PATCH 12/14] Fix creating redis instance name empty issue. --- proxy/redis_proxy.go | 401 ++++++++++++++++--------------------------- 1 file changed, 152 insertions(+), 249 deletions(-) diff --git a/proxy/redis_proxy.go b/proxy/redis_proxy.go index bcdd168..1097290 100644 --- a/proxy/redis_proxy.go +++ b/proxy/redis_proxy.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "errors" "fmt" "github.com/curator-go/curator" "github.com/curator-go/curator/recipes/cache" @@ -54,8 +55,6 @@ const ( ProxyAddr = "127.0.0.1:7979" - SyncZKIntervalSecs = 3 - RedisPath = "/MrRedis/Instances" RedisLocalPortsPath = "/MrRedisLocalPorts" @@ -68,7 +67,9 @@ const ( ProgrameStartTimeAtLeast = 30 - FetchRedisIpTimeOutSecs = 60 + FetchRedisIpTimeOutSecs = 30 + + RedisFailOverTimeSecs = 5 ) //Config json config structure for the proxy @@ -155,21 +156,20 @@ func PrepareLocalPorts(conn *zk.Conn, path string) { for _, name := range redis_local_ports { - local_port, _, _ := conn.Get(path + "/" + name) + local_port, _ := getValueFromZK(conn, path+"/"+name) _, ok := LocalPortsMap[name] if ok { logger.Infof("%s local port %s already exist in LocalPortsMap.\n", name, local_port) } else { - LocalPortsMap[name] = string(local_port) + LocalPortsMap[name] = local_port } } } - func getRedisMnameInfo(name string, conn *zk.Conn) (string, string) { logger.Infof("Get redis %v Mname info redis_ip and redis_port.", name) @@ -181,59 +181,56 @@ func getRedisMnameInfo(name string, conn *zk.Conn) (string, string) { idTimeCount := time.Now() for { - redisId, _, idErr := conn.Get(redis_id_path) + redisId, idErr := getValueFromZK(conn, redis_id_path) + logger.Infof("redis %s path is %s", name, redisId) if idErr != nil { logger.Errorf("zk path /name/instance/Mname error: %v\n", RedisPath+"/"+name+"/Mname") - return "","" + return "", "" } - if redisId != nil && string(redisId) != "" { + redis_id = redisId - logger.Infof("Redis %s get the id from zk, the redis id is %s", name, string(redis_id)) - redis_id = string(redisId) + if redis_id != "" { + + logger.Infof("Redis %s redis_id value is %s.", name, redis_id) break } else { elapsed := time.Since(idTimeCount).Seconds() - - logger.Infof("Fetch redis %s spends %d seconds already.", name, elapsed) - if elapsed > FetchRedisIpTimeOutSecs { - logger.Errorf("Failed to fetch redis %s id, and it's over %d secoonds. will ignore this request!", name, FetchRedisIpTimeOutSecs) + logger.Errorf("Failed to fetch redis %s id, and it's over %d secoonds. Skip this", name, FetchRedisIpTimeOutSecs) break } - time.Sleep(1 * time.Second) - - logger.Errorf("Redis %s failed to get new redis id, the id is %s. Will get fetch it again.", name, string(redisId)) + logger.Infof("Redis %s redis_id value %s is empty.", name, redis_id) + logger.Errorf("Redis %s failed to get new redis id, the id is %s. Will try to fetch it again.", name, redis_id) } - } - if redis_id == "" { - logger.Errorf("Get redis %s Mname id null, will return empty string!") - return "","" + if redis_id == "" { + logger.Errorf("Get redis %s Mname id null, will return empty string!") + return "", "" } redis_ip_path := RedisPath + "/" + name + "/Procs/" + redis_id + "/IP" - logger.Infof("redis %s redis_ip_path is %s", name, redis_id_path) + logger.Infof("redis %s redis_ip_path is %s", name, redis_ip_path) var redis_ip string timeCount := time.Now() for { - redisIp, _, err := conn.Get(redis_ip_path) + redisIp, err := getValueFromZK(conn, redis_ip_path) if err == nil { - if redisIp != nil && string(redisIp) != "" { + if redisIp != "" { logger.Infof("Redis %s get the ip from zk, the redis ip is %v", name, redis_ip) - redis_ip = string(redisIp) + redis_ip = redisIp break } else { @@ -261,18 +258,33 @@ func getRedisMnameInfo(name string, conn *zk.Conn) (string, string) { if redis_ip == "" { logger.Errorf("Get redis %s IP as null, will return empty string!") - return "","" + return "", "" } redis_port_path := RedisPath + "/" + name + "/Procs/" + redis_id + "/Port" - redis_port, _, err := conn.Get(redis_port_path) + redis_port, err := getValueFromZK(conn, redis_port_path) if err != nil { - logger.Errorf("zk path name/Pros/instance/Port error: %v\n", RedisPath+"/"+name+"/Procs/"+string(redis_id)+"/Port") + logger.Errorf("zk path name/Pros/instance/Port error: %v\n", RedisPath+"/"+name+"/Procs/"+redis_id+"/Port") return "", "" } - return string(redis_ip), string(redis_port) + return redis_ip, redis_port +} + +func getValueFromZK(conn *zk.Conn, path string) (string, error) { + + var result string + + val, _, err := conn.Get(path) + if err != nil { + return "", err + } + if val != nil { + result = string(val[:]) + } + + return result, nil } func InitializeProxy(conn *zk.Conn, path string) { @@ -288,20 +300,25 @@ func InitializeProxy(conn *zk.Conn, path string) { for _, name := range redis_instance { - redis_status, _, _ := conn.Get(RedisPath + "/" + name + "/Status") + redis_status_path := RedisPath + "/" + name + "/Status" + redis_mname_path := RedisPath + "/" + name + "/Mname" + redis_status, _ := getValueFromZK(conn, redis_status_path) - if redis_status != nil && strings.EqualFold(string(redis_status), "RUNNING") { + if redis_status == "RUNNING" { - logger.Infof("redis instance %s status is running.", name) + logger.Infof("Redis instance %s status is running.", name) - redis_mname, _, _ := conn.Get(RedisPath + "/" + name + "/Mname") + redis_mname, _ := getValueFromZK(conn, redis_mname_path) - if redis_mname == nil || string(redis_mname) == "" { + if redis_mname == "" { logger.Errorf("redis %s Mname is empty. Will skip this redis instance.", name) continue } - redis_ip, redis_port := getRedisMnameInfo(name, conn) + redis_ip_path := RedisPath + "/" + name + "/Procs/" + redis_mname + "/IP" + redis_ip, _ := getValueFromZK(conn, redis_ip_path) + redis_port_path := RedisPath + "/" + name + "/Procs/" + redis_mname + "/Port" + redis_port, _ := getValueFromZK(conn, redis_port_path) if redis_ip == "" || redis_port == "" { logger.Errorf("redis %s Pairto ip %s or port %s is empty. Will skip this redis instance.", name, redis_ip, redis_port) @@ -309,16 +326,15 @@ func InitializeProxy(conn *zk.Conn, path string) { } var redis_tcp_local_port string + redis_local_port_path := RedisLocalPortsPath + "/" + name if CurrentE, ok := ConfigMap[name]; ok { logger.Infof("Redis instance %s is in the configMap. \n", name) - if found, _, _ := conn.Exists(RedisLocalPortsPath + "/" + name); found { + if found, _, _ := conn.Exists(redis_local_port_path); found { - redis_port_byte, _, _ := conn.Get(RedisLocalPortsPath + "/" + name) - - redis_tcp_local_port = string(redis_port_byte[:]) + redis_tcp_local_port, _ = getValueFromZK(conn, redis_local_port_path) logger.Infof("InitializeProxy: Redis %s local port %s is already in the MrRedisLocalPort, sync with zk to keep it consistent . \n", name, redis_tcp_local_port) @@ -328,7 +344,7 @@ func InitializeProxy(conn *zk.Conn, path string) { } - CurrentE.Pair.To = string(redis_ip) + ":" + string(redis_port) + CurrentE.Pair.To = redis_ip + ":" + redis_port ConfigMap[name] = CurrentE @@ -336,18 +352,18 @@ func InitializeProxy(conn *zk.Conn, path string) { logger.Infof("Redis name %s not found in the configMap \n", name) - if found, _, _ := conn.Exists(RedisLocalPortsPath + "/" + name); found { - - redis_port_byte, _, _ := conn.Get(RedisLocalPortsPath + "/" + name) + if found, _, _ := conn.Exists(redis_local_port_path); found { - redis_tcp_local_port = string(redis_port_byte[:]) + redis_tcp_local_port, _ = getValueFromZK(conn, redis_local_port_path) - logger.Infof("redis instance %s already exists, redis tcp local port is %s \n", name, redis_tcp_local_port) + logger.Infof("redis %s local port already exists, redis tcp local port is %s \n", name, redis_tcp_local_port) } else { //redis_listen_port := RedisPortBaseNum + len(redis_instance) + logger.Info("InitializeProxy invokes getLocalRedisPort()") + redis_tcp_local_port = getLocalRedisPort() //redis_tcp_listen_port := strconv.Itoa(random_tcp_port) @@ -363,7 +379,7 @@ func InitializeProxy(conn *zk.Conn, path string) { local_tcp_addr, _ := net.ResolveTCPAddr("tcp4", local_addr) - to_addr := string(redis_ip) + ":" + string(redis_port) + to_addr := redis_ip + ":" + redis_port to_tcp_addr, _ := net.ResolveTCPAddr("tcp4", to_addr) @@ -389,6 +405,8 @@ func getLocalRedisPort() string { var redis_tcp_local_port string + logger.Infof("getLocalRedisPort function Run") + for { random_port := RandInt64(RedisPortMinNum, RedisPortMaxNum) @@ -401,7 +419,8 @@ func getLocalRedisPort() string { logger.Infof("redis local port num is %d \n", local_port_num) if local_port_num > 0 { - for _, value := range LocalPortsMap { + for redis_name, value := range LocalPortsMap { + logger.Infof("LocalPortsMap redis %s port is %s", redis_name, value) if strings.EqualFold(redis_tcp_local_port, value) { redis_port_found = true logger.Infof("redis port %s is already assigned.\n", value) @@ -411,6 +430,7 @@ func getLocalRedisPort() string { if redis_port_found { logger.Infof("Local tcp port %s is duplicated, will generate a new one.\n", redis_tcp_local_port) + time.Sleep(1 * time.Second) continue } else { logger.Info("random_tcp_port not assigned in local, so it can be used, will skip this loop.") @@ -443,6 +463,11 @@ func HandleConnection(E Entry) error { defer listener.Close() + if E.Name == "" { + nameErr := errors.New("HandleConnection Entry name is empty. will return") + return nameErr + } + //Add this in the global Map so that it can be updated dynamically by HTTP apis ConfigMap[E.Name] = E @@ -572,7 +597,7 @@ func addRedisProxy(name string, conn *zk.Conn) { //Add lock to ConfigMap in case of concurrent read and write on configMap. eg: create redis and existant redis failover happens at the same time, this might occur if redis_ip == "" || redis_port == "" { - logger.Errorf("Failed to add redis instance %s, eigher redis_ip or redis_port values is empty. redis_ip is %v, redis_port is %v", name, redis_ip, redis_port) + logger.Errorf("Failed to add redis instance %s, eigher redis_ip or redis_port values is empty. redis_ip is %s, redis_port is %s", name, redis_ip, redis_port) return } @@ -582,6 +607,7 @@ func addRedisProxy(name string, conn *zk.Conn) { CurrentE.Pair.To = redis_ip + ":" + redis_port + logger.Infof("addRedisProxy runs getLocalRedisPort for redis %s", name) redis_tcp_local_port := getLocalRedisPort() //redis_tcp_listen_port := strconv.Itoa(random_tcp_port) @@ -593,6 +619,8 @@ func addRedisProxy(name string, conn *zk.Conn) { CurrentE.Pair.From = "127.0.0.1" + ":" + redis_tcp_local_port + CurrentE.Name = name + ConfigMap[name] = CurrentE go HandleConnection(CurrentE) @@ -612,9 +640,12 @@ func updateRedisProxy(name string, conn *zk.Conn) { redis_ip, redis_port := getRedisMnameInfo(name, conn) if redis_ip == "" || redis_port == "" { - logger.Errorf("Failed to update redis, eigher redis_ip or redis_port values is empty. redis_ip is %v, redis_port is %v", redis_ip, redis_port) + logger.Errorf("Failed to update redis, eigher redis_ip or redis_port values is empty. redis_ip is %s, redis_port is %s", redis_ip, redis_port) return } + + time.Sleep(RedisFailOverTimeSecs * time.Second) + //Add lock to ConfigMap in case of concurrent read and write on configMap. eg: create redis and existant redis failover happens at the same time, this might occur lock.Lock() @@ -622,208 +653,130 @@ func updateRedisProxy(name string, conn *zk.Conn) { CurrentE.Pair.To = redis_ip + ":" + redis_port ConfigMap[name] = CurrentE - logging.Warnf("Change Redis %v master address into %v", name, CurrentE.Pair.To) + logging.Warnf("Change Redis %s master address into %s", name, CurrentE.Pair.To) return } else { - logger.Warnf("Redis %s not exit in ConfigMap, will return", name) + logger.Warnf("Redis %s not exists in ConfigMap, will return", name) return } } -func watchRedisStatus(conn *zk.Conn) { - - zksStr := os.Getenv("ZOOKEEPER_SERVERS") - - retryPolicy := curator.NewExponentialBackoffRetry(time.Second, 3, 15*time.Second) - - client := curator.NewClient(zksStr, retryPolicy) - - client.Start() - - defer client.Close() - - treeCache := cache.NewTreeCache(client, RedisPath, cache.DefaultTreeCacheSelector) +func checkRedisUpdate(event_path string, redisName string, conn *zk.Conn) { - treeCache.Start() + if strings.Contains(event_path, "Mname") { - defer treeCache.Stop() + logger.Infof("CheckRedisUpdate for event_path %s which contains Mname change for redis %s.", event_path, redisName) - cacheListener := cache.NewTreeCacheListener(func(client curator.CuratorFramework, event cache.TreeCacheEvent) error { + redis_status_path := RedisPath + "/" + redisName + "/Status" - switch event.Type.String() { + redis_status, err := getValueFromZK(conn, redis_status_path) - case "NodeAdded": - //fmt.Printf( event_path) - event_path := event.Data.Path() - logger.Infof("TreeCache event is: NodeAdded, zk path is %s \n", event_path) + if err != nil { - if strings.Contains(event_path, "Mname") { + logger.Errorf("Failed to get redis %s status %s, error is %s.", redisName, redis_status, err.Error()) - elapsed := time.Since(startTime).Seconds() + } else { - if elapsed < ProgrameStartTimeAtLeast { + logger.Infof("Redis %s status is %s.", redisName, redis_status) + } - logger.Infof("Program is just started in, will skip the InitializePorxy function ") + switch redis_status { - } else { + case "RUNNING": - logger.Infof("New redis has been created, Will Sync the status") + logger.Infof("Redis %s status is %s, failover might have occurred, will try to update the master ip by running updateRedisProxy.!", redisName, redis_status) + updateRedisProxy(redisName, conn) + cleanEmptyEntry() - time.Sleep(2 * SyncZKIntervalSecs * time.Second) + case "DELETED": - redisName := strings.Split(event_path, "/")[3] + logger.Infof("Redis %s status is deleted, should remove it from configMap.", redisName) + lock.Lock() + defer lock.Unlock() + delete(ConfigMap, redisName) - if redisName != "" { + default: + logger.Infof("Redis %s status is %s, failover might have occurred, or redis is deleted!", redisName, redis_status) + } + } +} - if _, ok := ConfigMap[redisName]; ok { +func cleanEmptyEntry() { + logging.Infof("Clean empty entry in ConfigMap.") + for key, _ := range ConfigMap { + if key == "" { + lock.Lock() + defer lock.Unlock() + delete(ConfigMap, "") + } + } +} - logger.Infof("Redis %s has already been created!", redisName) +func getElapsedTime() int { + elapsed := time.Since(startTime).Seconds() + return int(elapsed) +} - } else { +func watchRedisStatus(conn *zk.Conn) { - logger.Infof("Redis %s has not been created, will create it later.", redisName) - //addRedisProxy(redisName, conn) + logger.Info("Run watchRedisStatus method") - } - } else { - logger.Errorf("Failed to get redis name ") - } + zksStr := os.Getenv("ZOOKEEPER_SERVERS") - } - } + retryPolicy := curator.NewExponentialBackoffRetry(time.Second, 3, 15*time.Second) - case "NodeUpdated": - // fmt.Printf( event_path) - event_path := event.Data.Path() + client := curator.NewClient(zksStr, retryPolicy) - logger.Infof("TreeCache event is: NodeUpdated, zk path is %s \n", event_path) + client.Start() - if strings.Contains(event_path, "Mname") { + defer client.Close() - logger.Infof("Redis node instance has changed, will sync the updates to ConfigMap.") - //time.Sleep(SyncZKIntervalSecs * time.Second) + treeCache := cache.NewTreeCache(client, RedisPath, cache.DefaultTreeCacheSelector) - elapsed := time.Since(startTime).Seconds() + treeCache.Start() - if elapsed < ProgrameStartTimeAtLeast { + defer treeCache.Stop() - logger.Infof("Program might be just started in very short time, will skip the InitializePorxy function ") + cacheListener := cache.NewTreeCacheListener(func(client curator.CuratorFramework, event cache.TreeCacheEvent) error { - } else { + switch event.Type.String() { + case "NodeAdded": - time.Sleep(SyncZKIntervalSecs * time.Second) + event_path := event.Data.Path() + if getElapsedTime() > ProgrameStartTimeAtLeast { + if event_path != "" && len(strings.Split(event_path, "/")) > 3 { redisName := strings.Split(event_path, "/")[3] - if redisName != "" { - - if _, ok := ConfigMap[redisName]; ok { - - redis_status_path := RedisPath + "/" + redisName + "/Status" - - redis_status, _, err := conn.Get(redis_status_path) - - if err != nil { - logger.Errorf("Failed to get redis %v status %v, error is %v", redisName, redis_status, err.Error()) - } else { - - logger.Infof("redis %s status is %v.", redisName, redis_status) - } - - switch string(redis_status) { - - case "RUNNING": - logger.Infof("Redis %s status is %v, failover might have occurred, will try to update the master ip by running updateRedisProxy.!", redisName, redis_status) - updateRedisProxy(redisName, conn) - default: - logger.Infof("Redis %s status is %s, failover might have occurred, or redis is deleted!", redisName, redis_status) - - } - - } else { - - logger.Infof("Redis %s is not in ConfigMap, will create it by running addRedisProxy function", redisName) - addRedisProxy(redisName, conn) - - } - + var OK bool + if _, OK = ConfigMap[redisName]; OK { + logger.Infof("Case NodeAdded: redis %s already exist in ConfigMap, skip") } else { - logger.Errorf("Failed to extract redis name from event_path %v, and redis name %v is empty", event_path, redisName) + logger.Infof("Case NodeAdded: zk path is %s, will try to add redis %s into ConfigMap by running addRedisProxy func \n", event_path, redisName) + addRedisProxy(redisName, conn) } } - } - if strings.Contains(event_path, "/Status") { - - logger.Infof("Redis node instance status has changed, will sync the updates to ConfigMap.") - - redisName := strings.Split(event_path, "/")[3] - - if redisName != "" { - - if _, ok := ConfigMap[redisName]; ok { - - redis_status_path := RedisPath + "/" + redisName + "/Status" - - redis_status, _, err := conn.Get(redis_status_path) - - if err != nil { - logger.Errorf("Failed to get redis %v status %v, error is %v", redisName, redis_status, err.Error()) - } else { - - logger.Infof("redis %s status is %v.", redisName, redis_status) - } - - switch string(redis_status) { - - case "DELETED": - logger.Infof("Redis %v status is deleted, should remove it from configMap.") - lock.Lock() - defer lock.Unlock() - delete(ConfigMap, redisName) - default: - logger.Infof("redis %s status is %s, will do nothing about it.", redisName, redis_status ) - } - } - } - } + case "NodeUpdated": - logging.Infof("Last setp on UpdateNode, CLEAN empty key.") - for key,_ := range ConfigMap { - if key == "" { - lock.Lock() - defer lock.Unlock() - delete(ConfigMap,"") + event_path := event.Data.Path() + if getElapsedTime() > ProgrameStartTimeAtLeast { + if event_path != "" && len(strings.Split(event_path, "/")) > 3 { + redisName := strings.Split(event_path, "/")[3] + checkRedisUpdate(event_path, redisName, conn) } } - - - case "NodeRemoved": - //fmt.Printf( event_path) - event_path := event.Data.Path() - logger.Infof("TreeCache event is: NodeRemoved \n, zk path is %s", event_path) - case "ConnSuspended": - //fmt.Printf( event_path) - logger.Infof("TreeCache event is: ConnSuspended \n") - case "ConnReconnected": - //fmt.Printf( event_path) - logger.Infof("TreeCache event is: ConnReconnected \n") - case "ConnLost": - //fmt.Printf( event_path) - logger.Infof("TreeCache event is: ConnLost \n") - case "Initialized": - //fmt.Printf( event_path) - logger.Infof("TreeCache event is: Initialized \n") default: logger.Infof("TreeCache event is: unknown. \n") } + return nil }) @@ -835,56 +788,6 @@ func watchRedisStatus(conn *zk.Conn) { <-wait_ch } -/* -func init() { - - filePath := LogFilePath - - fileMode := os.O_APPEND - - bufferSize := 0 - - bufferFlushTime := 30 * time.Second - - inputChanSize := 1 - - backupCount := uint32(LogFileMaxBackups) - // set the maximum size of every file to 100 M bytes - fileMaxBytes := uint64(LogFileMaxSize) - - // create a handler(which represents a log message destination) - handler := logging.MustNewRotatingFileHandler( - filePath, fileMode, bufferSize, bufferFlushTime, inputChanSize, - fileMaxBytes, backupCount) - - - // the format for the whole log message - format := "%(asctime)s %(levelname)s (%(filename)s:%(lineno)d) " + - "%(name)s %(message)s" - - // the format for the time part - dateFormat := "%Y-%m-%d %H:%M:%S.%3n" - - // create a formatter(which controls how log messages are formatted) - formatter := logging.NewStandardFormatter(format, dateFormat) - - // set formatter for handler - handler.SetFormatter(formatter) - - //Define logger name of program as redis_proxy - logger = logging.GetLogger("redis_proxy") - - logger.SetLevel(logging.LevelInfo) - - logger.AddHandler(handler) - - // ensure all log messages are flushed to disk before program exits. - defer logging.Shutdown() - - fmt.Println("Finish init") - -} -*/ func main() { //Initialize the global Config map @@ -959,11 +862,11 @@ func main() { if err != nil { - logger.Errorf("Failed to start http server on port %v!, error is %v", ProxyAddr, err.Error()) + logger.Errorf("Failed to start http server on port %s!, error is %s", ProxyAddr, err.Error()) } else { - logger.Infof("Start http server on port %v successfuly!", ProxyAddr) + logger.Infof("Start http server on port %s successfuly!", ProxyAddr) } From 2f3f06d4e1e5f44e01ec24c239c5a1aa2e178aa9 Mon Sep 17 00:00:00 2001 From: zou sheng Date: Wed, 27 Sep 2017 12:32:38 +0800 Subject: [PATCH 13/14] Fix master failover not working issue --- proxy/redis_proxy.go | 26 +++++++------------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/proxy/redis_proxy.go b/proxy/redis_proxy.go index 1097290..976656f 100644 --- a/proxy/redis_proxy.go +++ b/proxy/redis_proxy.go @@ -229,7 +229,7 @@ func getRedisMnameInfo(name string, conn *zk.Conn) (string, string) { if redisIp != "" { - logger.Infof("Redis %s get the ip from zk, the redis ip is %v", name, redis_ip) + logger.Infof("Redis %s get the ip from zk, the redis ip is %v", name, redisIp) redis_ip = redisIp break @@ -268,7 +268,7 @@ func getRedisMnameInfo(name string, conn *zk.Conn) (string, string) { logger.Errorf("zk path name/Pros/instance/Port error: %v\n", RedisPath+"/"+name+"/Procs/"+redis_id+"/Port") return "", "" } - + logger.Infof("getRedisMnameInfo: successfully return redis ip and port %s:%s", name, redis_ip, redis_port) return redis_ip, redis_port } @@ -401,13 +401,14 @@ func InitializeProxy(conn *zk.Conn, path string) { func getLocalRedisPort() string { - var redis_port_found bool = false - var redis_tcp_local_port string logger.Infof("getLocalRedisPort function Run") for { + + redis_port_found := false + random_port := RandInt64(RedisPortMinNum, RedisPortMaxNum) redis_tcp_local_port = strconv.Itoa(random_port) @@ -502,8 +503,6 @@ func HandleConnection(E Entry) error { var wg sync.WaitGroup cp := func(dst net.Conn, src net.Conn) { buf := bufferPool.Get().([]byte) - // TODO use splice on linux - // TODO needs some timeout to prevent torshammer ddos _, err := io.CopyBuffer(dst, src, buf) select { case first <- struct{}{}: @@ -644,7 +643,8 @@ func updateRedisProxy(name string, conn *zk.Conn) { return } - time.Sleep(RedisFailOverTimeSecs * time.Second) + //time.Sleep(RedisFailOverTimeSecs * time.Second) + logging.Infof("updateRedisProxy: Ready to change redis %s ip and port to %s:%s", name, redis_ip, redis_port) //Add lock to ConfigMap in case of concurrent read and write on configMap. eg: create redis and existant redis failover happens at the same time, this might occur lock.Lock() @@ -689,7 +689,6 @@ func checkRedisUpdate(event_path string, redisName string, conn *zk.Conn) { logger.Infof("Redis %s status is %s, failover might have occurred, will try to update the master ip by running updateRedisProxy.!", redisName, redis_status) updateRedisProxy(redisName, conn) - cleanEmptyEntry() case "DELETED": @@ -704,17 +703,6 @@ func checkRedisUpdate(event_path string, redisName string, conn *zk.Conn) { } } -func cleanEmptyEntry() { - logging.Infof("Clean empty entry in ConfigMap.") - for key, _ := range ConfigMap { - if key == "" { - lock.Lock() - defer lock.Unlock() - delete(ConfigMap, "") - } - } -} - func getElapsedTime() int { elapsed := time.Since(startTime).Seconds() return int(elapsed) From 664b84cdf8a3ec8269d4f8b2bac8203954fb3397 Mon Sep 17 00:00:00 2001 From: zou sheng Date: Thu, 28 Sep 2017 17:20:09 +0800 Subject: [PATCH 14/14] Add rand.Seed() to ensure the random port choosen for local proxy. --- proxy/redis_proxy.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/proxy/redis_proxy.go b/proxy/redis_proxy.go index 976656f..a1cdbdc 100644 --- a/proxy/redis_proxy.go +++ b/proxy/redis_proxy.go @@ -53,7 +53,7 @@ const ( RedisPortMaxNum = 6400 - ProxyAddr = "127.0.0.1:7979" + ProxyAddr = ":7979" RedisPath = "/MrRedis/Instances" @@ -121,6 +121,9 @@ func newTCPListener(addr string) (net.Listener, error) { } func RandInt64(min, max int) int { + + rand.Seed(time.Now().UTC().UnixNano()) + if min >= max || min == 0 || max == 0 { return max }