|
| 1 | +package vtgateproxy |
| 2 | + |
| 3 | +import ( |
| 4 | + "encoding/json" |
| 5 | + "flag" |
| 6 | + "fmt" |
| 7 | + "math/rand" |
| 8 | + "os" |
| 9 | + "strconv" |
| 10 | + "time" |
| 11 | + |
| 12 | + "google.golang.org/grpc/resolver" |
| 13 | +) |
| 14 | + |
| 15 | +var ( |
| 16 | + jsonDiscoveryConfig = flag.String("json_config", "", "json file describing the host list to use fot vitess://vtgate resolution") |
| 17 | +) |
| 18 | + |
| 19 | +// File based discovery for vtgate grpc endpoints |
| 20 | +// This loads the list of hosts from json and watches for changes to the list of hosts. It will select N connection to maintain to backend vtgates. |
| 21 | +// Connections will rebalance every 5 minutes |
| 22 | +// |
| 23 | +// Example json config - based on the slack hosts format |
| 24 | +// |
| 25 | +// [ |
| 26 | +// { |
| 27 | +// "address": "10.4.56.194", |
| 28 | +// "az_id": "use1-az1", |
| 29 | +// "grpc": "15999", |
| 30 | +// "type": "aux" |
| 31 | +// }, |
| 32 | +// |
| 33 | +// Naming scheme: |
| 34 | +// vtgate://<type>?num_connections=<int>&az_id=<string> |
| 35 | +// |
| 36 | +// num_connections: Option number of hosts to open connections to for round-robin selection |
| 37 | +// az_id: Filter to just hosts in this az (optional) |
| 38 | +// type: Only select from hosts of this type (required) |
| 39 | +// |
| 40 | + |
| 41 | +type DiscoveryHost struct { |
| 42 | + Address string |
| 43 | + NebulaAddress string `json:"nebula_address"` |
| 44 | + Grpc string |
| 45 | + AZId string `json:"az_id"` |
| 46 | + Type string |
| 47 | +} |
| 48 | + |
| 49 | +type JSONGateConfigDiscovery struct { |
| 50 | + JsonPath string |
| 51 | +} |
| 52 | + |
| 53 | +func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { |
| 54 | + queryOpts := target.URL.Query() |
| 55 | + queryParamCount := queryOpts.Get("num_connections") |
| 56 | + queryAZID := queryOpts.Get("az_id") |
| 57 | + num_connections := 0 |
| 58 | + |
| 59 | + gateType := target.URL.Host |
| 60 | + |
| 61 | + if queryParamCount != "" { |
| 62 | + num_connections, _ = strconv.Atoi(queryParamCount) |
| 63 | + } |
| 64 | + |
| 65 | + filters := resolveFilters{ |
| 66 | + gate_type: gateType, |
| 67 | + } |
| 68 | + |
| 69 | + if queryAZID != "" { |
| 70 | + filters.az_id = queryAZID |
| 71 | + } |
| 72 | + |
| 73 | + r := &resolveJSONGateConfig{ |
| 74 | + target: target, |
| 75 | + cc: cc, |
| 76 | + jsonPath: b.JsonPath, |
| 77 | + num_connections: num_connections, |
| 78 | + filters: filters, |
| 79 | + } |
| 80 | + r.start() |
| 81 | + return r, nil |
| 82 | +} |
| 83 | +func (*JSONGateConfigDiscovery) Scheme() string { return "vtgate" } |
| 84 | + |
| 85 | +func RegisterJsonDiscovery() { |
| 86 | + fmt.Printf("Registering: %v\n", *jsonDiscoveryConfig) |
| 87 | + resolver.Register(&JSONGateConfigDiscovery{ |
| 88 | + JsonPath: *jsonDiscoveryConfig, |
| 89 | + }) |
| 90 | +} |
| 91 | + |
| 92 | +type resolveFilters struct { |
| 93 | + gate_type string |
| 94 | + az_id string |
| 95 | +} |
| 96 | + |
| 97 | +// exampleResolver is a |
| 98 | +// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). |
| 99 | +type resolveJSONGateConfig struct { |
| 100 | + target resolver.Target |
| 101 | + cc resolver.ClientConn |
| 102 | + jsonPath string |
| 103 | + ticker *time.Ticker |
| 104 | + rand *rand.Rand // safe for concurrent use. |
| 105 | + num_connections int |
| 106 | + filters resolveFilters |
| 107 | +} |
| 108 | + |
| 109 | +func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, error) { |
| 110 | + config := []DiscoveryHost{} |
| 111 | + |
| 112 | + data, err := os.ReadFile(r.jsonPath) |
| 113 | + if err != nil { |
| 114 | + return nil, err |
| 115 | + } |
| 116 | + |
| 117 | + err = json.Unmarshal(data, &config) |
| 118 | + if err != nil { |
| 119 | + fmt.Printf("parse err: %v\n", err) |
| 120 | + return nil, err |
| 121 | + } |
| 122 | + |
| 123 | + fmt.Printf("%v\n", config) |
| 124 | + |
| 125 | + addrs := []resolver.Address{} |
| 126 | + for _, s := range config { |
| 127 | + // Apply filters |
| 128 | + if r.filters.gate_type != "" { |
| 129 | + if r.filters.gate_type != s.Type { |
| 130 | + fmt.Printf("Dropped non matching type: %v\n", s.Type) |
| 131 | + continue |
| 132 | + } |
| 133 | + } |
| 134 | + |
| 135 | + if r.filters.az_id != "" { |
| 136 | + if r.filters.az_id != s.AZId { |
| 137 | + fmt.Printf("Dropped non matching az: %v\n", s.AZId) |
| 138 | + continue |
| 139 | + } |
| 140 | + } |
| 141 | + // Add matching hosts to registration list |
| 142 | + fmt.Printf("selected host for discovery: %v %v\n", fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc), s) |
| 143 | + addrs = append(addrs, resolver.Address{Addr: fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc)}) |
| 144 | + } |
| 145 | + |
| 146 | + // Shuffle to ensure every host has a different order to iterate through |
| 147 | + r.rand.Shuffle(len(addrs), func(i, j int) { |
| 148 | + addrs[i], addrs[j] = addrs[j], addrs[i] |
| 149 | + }) |
| 150 | + |
| 151 | + // Slice off the first N hosts, optionally |
| 152 | + if r.num_connections > 0 && r.num_connections <= len(addrs) { |
| 153 | + addrs = addrs[0:r.num_connections] |
| 154 | + } |
| 155 | + |
| 156 | + fmt.Printf("Returning discovery: %v\n", addrs) |
| 157 | + |
| 158 | + return &addrs, nil |
| 159 | +} |
| 160 | + |
| 161 | +func (r *resolveJSONGateConfig) start() { |
| 162 | + fmt.Print("Starting discovery checker\n") |
| 163 | + r.rand = rand.New(rand.NewSource(time.Now().UnixNano())) |
| 164 | + |
| 165 | + // Immediately load the initial config |
| 166 | + addrs, err := r.loadConfig() |
| 167 | + if err == nil { |
| 168 | + // if we parse ok, populate the local address store |
| 169 | + r.cc.UpdateState(resolver.State{Addresses: *addrs}) |
| 170 | + } |
| 171 | + |
| 172 | + // Start a config watcher |
| 173 | + r.ticker = time.NewTicker(100 * time.Millisecond) |
| 174 | + fileStat, err := os.Stat(r.jsonPath) |
| 175 | + if err != nil { |
| 176 | + return |
| 177 | + } |
| 178 | + lastLoaded := time.Now() |
| 179 | + |
| 180 | + go func() { |
| 181 | + for range r.ticker.C { |
| 182 | + checkFileStat, err := os.Stat(r.jsonPath) |
| 183 | + isUnchanged := checkFileStat.Size() == fileStat.Size() || checkFileStat.ModTime() == fileStat.ModTime() |
| 184 | + isNotExpired := time.Since(lastLoaded) < 1*time.Minute |
| 185 | + if isUnchanged && isNotExpired { |
| 186 | + // no change |
| 187 | + continue |
| 188 | + } |
| 189 | + lastLoaded = time.Now() |
| 190 | + |
| 191 | + fileStat = checkFileStat |
| 192 | + fmt.Printf("Detected config change\n") |
| 193 | + |
| 194 | + addrs, err := r.loadConfig() |
| 195 | + if err != nil { |
| 196 | + // better luck next loop |
| 197 | + // TODO: log this |
| 198 | + fmt.Print("oh no\n") |
| 199 | + continue |
| 200 | + } |
| 201 | + |
| 202 | + r.cc.UpdateState(resolver.State{Addresses: *addrs}) |
| 203 | + } |
| 204 | + }() |
| 205 | +} |
| 206 | +func (r *resolveJSONGateConfig) ResolveNow(o resolver.ResolveNowOptions) {} |
| 207 | +func (r *resolveJSONGateConfig) Close() { |
| 208 | + r.ticker.Stop() |
| 209 | +} |
| 210 | + |
| 211 | +func init() { |
| 212 | + // Register the example ResolverBuilder. This is usually done in a package's |
| 213 | + // init() function. |
| 214 | +} |
0 commit comments