Skip to content

Commit df9cbd8

Browse files
dedelalademmer
andcommitted
restructure discovery to do more up front (#315)
* move organization of target hosts to parse time * rework metrics and logging of parse errors * add discovery bits to debug status page * reset parseErr in the right place * add sync and change debug page to do the shuffle * unrelated but just move some code around --------- Co-authored-by: Michael Demmer <mdemmer@slack-corp.com>
1 parent 3d7dee5 commit df9cbd8

File tree

2 files changed

+140
-88
lines changed

2 files changed

+140
-88
lines changed

go/vt/vtgateproxy/discovery.go

Lines changed: 137 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@ import (
2323
"io"
2424
"math/rand"
2525
"os"
26+
"sort"
27+
"sync"
2628
"time"
2729

2830
"google.golang.org/grpc/resolver"
2931

3032
"vitess.io/vitess/go/stats"
3133
"vitess.io/vitess/go/vt/log"
34+
"vitess.io/vitess/go/vt/servenv"
3235
)
3336

3437
// File based discovery for vtgate grpc endpoints
@@ -54,14 +57,29 @@ import (
5457
// type: Only select from hosts of this type (required)
5558
//
5659

60+
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
61+
type JSONGateResolver struct {
62+
target resolver.Target
63+
clientConn resolver.ClientConn
64+
poolType string
65+
}
66+
67+
func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {}
68+
69+
func (r *JSONGateResolver) Close() {
70+
log.Infof("Closing resolver for target %s", r.target.URL.String())
71+
}
72+
5773
type JSONGateResolverBuilder struct {
5874
jsonPath string
5975
addressField string
6076
portField string
6177
poolTypeField string
6278
affinityField string
79+
affinityValue string
6380

64-
targets []targetHost
81+
mu sync.RWMutex
82+
targets map[string][]targetHost
6583
resolvers []*JSONGateResolver
6684

6785
rand *rand.Rand
@@ -70,24 +88,14 @@ type JSONGateResolverBuilder struct {
7088
}
7189

7290
type targetHost struct {
73-
addr string
74-
poolType string
75-
affinity string
76-
}
77-
78-
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
79-
type JSONGateResolver struct {
80-
target resolver.Target
81-
clientConn resolver.ClientConn
82-
poolType string
83-
affinity string
91+
Addr string
92+
PoolType string
93+
Affinity string
8494
}
8595

8696
var (
87-
buildCount = stats.NewCounter("JsonDiscoveryBuild", "JSON host discovery rebuilt the host list")
88-
unchangedCount = stats.NewCounter("JsonDiscoveryUnchanged", "JSON host discovery parsed and determined no change to the file")
89-
affinityCount = stats.NewCountersWithSingleLabel("JsonDiscoveryHostAffinity", "Count of hosts returned from discovery by AZ affinity", "affinity")
90-
poolTypeCount = stats.NewCountersWithSingleLabel("JsonDiscoveryHostPoolType", "Count of hosts returned from discovery by pool type", "type")
97+
parseCount = stats.NewCountersWithSingleLabel("JsonDiscoveryParseCount", "Count of results of JSON host file parsing (changed, unchanged, error)", "result")
98+
targetCount = stats.NewGaugesWithSingleLabel("JsonDiscoveryTargetCount", "Count of hosts returned from discovery by pool type", "pool")
9199
)
92100

93101
func RegisterJSONGateResolver(
@@ -96,13 +104,16 @@ func RegisterJSONGateResolver(
96104
portField string,
97105
poolTypeField string,
98106
affinityField string,
107+
affinityValue string,
99108
) (*JSONGateResolverBuilder, error) {
100109
jsonDiscovery := &JSONGateResolverBuilder{
110+
targets: map[string][]targetHost{},
101111
jsonPath: jsonPath,
102112
addressField: addressField,
103113
portField: portField,
104114
poolTypeField: poolTypeField,
105115
affinityField: affinityField,
116+
affinityValue: affinityValue,
106117
}
107118

108119
resolver.Register(jsonDiscovery)
@@ -113,6 +124,8 @@ func RegisterJSONGateResolver(
113124
return nil, err
114125
}
115126

127+
servenv.AddStatusPart("JSON Discovery", targetsTemplate, jsonDiscovery.debugTargets)
128+
116129
return jsonDiscovery, nil
117130
}
118131

@@ -138,17 +151,19 @@ func (b *JSONGateResolverBuilder) start() error {
138151
poolTypes := map[string]int{}
139152
affinityTypes := map[string]int{}
140153

141-
for _, t := range b.targets {
142-
count := poolTypes[t.poolType]
143-
poolTypes[t.poolType] = count + 1
154+
for _, ts := range b.targets {
155+
for _, t := range ts {
156+
count := poolTypes[t.PoolType]
157+
poolTypes[t.PoolType] = count + 1
144158

145-
count = affinityTypes[t.affinity]
146-
affinityTypes[t.affinity] = count + 1
159+
count = affinityTypes[t.Affinity]
160+
affinityTypes[t.Affinity] = count + 1
161+
}
147162
}
148163

149-
buildCount.Add(1)
164+
parseCount.Add("changed", 1)
150165

151-
log.Infof("loaded %d targets, pool types %v, affinity groups %v", len(b.targets), poolTypes, affinityTypes)
166+
log.Infof("loaded targets, pool types %v, affinity %s, groups %v", poolTypes, *affinityValue, affinityTypes)
152167

153168
// Start a config watcher
154169
b.ticker = time.NewTicker(1 * time.Second)
@@ -158,10 +173,12 @@ func (b *JSONGateResolverBuilder) start() error {
158173
}
159174

160175
go func() {
176+
var parseErr error
161177
for range b.ticker.C {
162178
checkFileStat, err := os.Stat(b.jsonPath)
163179
if err != nil {
164180
log.Errorf("Error stat'ing config %v\n", err)
181+
parseCount.Add("error", 1)
165182
continue
166183
}
167184
isUnchanged := checkFileStat.Size() == fileStat.Size() && checkFileStat.ModTime() == fileStat.ModTime()
@@ -173,12 +190,20 @@ func (b *JSONGateResolverBuilder) start() error {
173190
fileStat = checkFileStat
174191

175192
contentsChanged, err := b.parse()
176-
if err != nil || !contentsChanged {
177-
unchangedCount.Add(1)
193+
if err != nil {
194+
parseCount.Add("error", 1)
195+
if parseErr == nil || err.Error() != parseErr.Error() {
196+
parseErr = err
197+
log.Error(err)
198+
}
178199
continue
179200
}
180-
181-
buildCount.Add(1)
201+
parseErr = nil
202+
if !contentsChanged {
203+
parseCount.Add("unchanged", 1)
204+
continue
205+
}
206+
parseCount.Add("changed", 1)
182207

183208
// notify all the resolvers that the targets changed
184209
for _, r := range b.resolvers {
@@ -217,7 +242,7 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
217242
return false, fmt.Errorf("error parsing JSON discovery file %s: %v", b.jsonPath, err)
218243
}
219244

220-
var targets []targetHost
245+
var targets = map[string][]targetHost{}
221246
for _, host := range hosts {
222247
address, hasAddress := host[b.addressField]
223248
port, hasPort := host[b.portField]
@@ -258,29 +283,46 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
258283
return false, fmt.Errorf("error parsing JSON discovery file %s: port field %s has invalid value %v", b.jsonPath, b.portField, port)
259284
}
260285

261-
targets = append(targets, targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)})
286+
target := targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)}
287+
targets[target.PoolType] = append(targets[target.PoolType], target)
288+
}
289+
290+
for poolType := range targets {
291+
if b.affinityField != "" {
292+
sort.Slice(targets[poolType], func(i, j int) bool {
293+
return b.affinityValue == targets[poolType][i].Affinity
294+
})
295+
}
296+
if len(targets[poolType]) > *numConnections {
297+
targets[poolType] = targets[poolType][:*numConnections]
298+
}
299+
targetCount.Set(poolType, int64(len(targets[poolType])))
262300
}
301+
302+
b.mu.Lock()
263303
b.targets = targets
304+
b.mu.Unlock()
264305

265306
return true, nil
266307
}
267308

268-
// Update the current list of hosts for the given resolver
269-
func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {
270-
271-
log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections)
309+
func (b *JSONGateResolverBuilder) GetPools() []string {
310+
b.mu.RLock()
311+
defer b.mu.RUnlock()
312+
var pools []string
313+
for pool := range b.targets {
314+
pools = append(pools, pool)
315+
}
316+
sort.Strings(pools)
317+
return pools
318+
}
272319

273-
// filter to only targets that match the pool type. if unset, this will just be a copy
274-
// of the full target list.
320+
func (b *JSONGateResolverBuilder) GetTargets(poolType string) []targetHost {
321+
// Copy the target slice
322+
b.mu.RLock()
275323
targets := []targetHost{}
276-
for _, target := range b.targets {
277-
if r.poolType == target.poolType {
278-
targets = append(targets, target)
279-
log.V(1000).Infof("matched target %v with type %s", target, r.poolType)
280-
} else {
281-
log.V(1000).Infof("skipping host %v with type %s", target, r.poolType)
282-
}
283-
}
324+
targets = append(targets, b.targets[poolType]...)
325+
b.mu.RUnlock()
284326

285327
// Shuffle to ensure every host has a different order to iterate through, putting
286328
// the affinity matching (e.g. same az) hosts at the front and the non-matching ones
@@ -293,7 +335,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {
293335
for i := 0; i < n-1; i++ {
294336
j := head + b.rand.Intn(tail-head+1)
295337

296-
if r.affinity == "" || r.affinity == targets[j].affinity {
338+
if *affinityField != "" && *affinityValue == targets[j].Affinity {
297339
targets[head], targets[j] = targets[j], targets[head]
298340
head++
299341
} else {
@@ -302,32 +344,22 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {
302344
}
303345
}
304346

305-
// Grab the first N addresses, and voila!
306-
var addrs []resolver.Address
307-
targets = targets[:min(*numConnections, len(targets))]
308-
for _, target := range targets {
309-
addrs = append(addrs, resolver.Address{Addr: target.addr})
310-
}
347+
return targets
348+
}
349+
350+
// Update the current list of hosts for the given resolver
351+
func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {
352+
353+
log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections)
311354

312-
// Count some metrics
313-
var unknown, local, remote int64
355+
targets := b.GetTargets(r.poolType)
356+
357+
var addrs []resolver.Address
314358
for _, target := range targets {
315-
if r.affinity == "" {
316-
unknown++
317-
} else if r.affinity == target.affinity {
318-
local++
319-
} else {
320-
remote++
321-
}
359+
addrs = append(addrs, resolver.Address{Addr: target.Addr})
322360
}
323-
if unknown != 0 {
324-
affinityCount.Add("unknown", unknown)
325-
}
326-
affinityCount.Add("local", local)
327-
affinityCount.Add("remote", remote)
328-
poolTypeCount.Add(r.poolType, int64(len(targets)))
329361

330-
log.V(100).Infof("updated targets for %s to %v (local %d / remote %d)", r.target.URL.String(), targets, local, remote)
362+
log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets)
331363

332364
r.clientConn.UpdateState(resolver.State{Addresses: addrs})
333365
}
@@ -346,19 +378,12 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie
346378
}
347379
}
348380

349-
// Affinity on the other hand is just an optimization
350-
affinity := ""
351-
if b.affinityField != "" {
352-
affinity = attrs.Get(b.affinityField)
353-
}
354-
355-
log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity)
381+
log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, b.affinityValue)
356382

357383
r := &JSONGateResolver{
358384
target: target,
359385
clientConn: cc,
360386
poolType: poolType,
361-
affinity: affinity,
362387
}
363388

364389
b.update(r)
@@ -367,19 +392,44 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie
367392
return r, nil
368393
}
369394

370-
func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {}
371-
372-
func (r *JSONGateResolver) Close() {
373-
log.Infof("Closing resolver for target %s", r.target.URL.String())
374-
}
375-
376-
// Utilities
377-
func min(a, b int) int {
378-
if a < b {
379-
return a
395+
// debugTargets will return the builder's targets with a sorted slice of
396+
// poolTypes for rendering debug output
397+
func (b *JSONGateResolverBuilder) debugTargets() any {
398+
pools := b.GetPools()
399+
targets := map[string][]targetHost{}
400+
for pool := range b.targets {
401+
targets[pool] = b.GetTargets(pool)
402+
}
403+
return struct {
404+
Pools []string
405+
Targets map[string][]targetHost
406+
}{
407+
Pools: pools,
408+
Targets: targets,
380409
}
381-
return b
382410
}
383411

384-
func init() {
385-
}
412+
const (
413+
// targetsTemplate is a HTML template to display the gate resolver's target hosts.
414+
targetsTemplate = `
415+
<style>
416+
table {
417+
border-collapse: collapse;
418+
}
419+
td, th {
420+
border: 1px solid #999;
421+
padding: 0.2rem;
422+
}
423+
</style>
424+
<table>
425+
{{range $i, $p := .Pools}} <tr>
426+
<th colspan="2">{{$p}}</th>
427+
</tr>
428+
{{range index $.Targets $p}} <tr>
429+
<td>{{.Addr}}</td>
430+
<td>{{.Affinity}}</td>
431+
</tr>{{end}}
432+
{{end}}
433+
</table>
434+
`
435+
)

go/vt/vtgateproxy/vtgateproxy.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ var (
4343
vtgateHostsFile = flag.String("vtgate_hosts_file", "", "json file describing the host list to use for vtgate:// resolution")
4444
numConnections = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain")
4545
poolTypeField = flag.String("pool_type_field", "", "Field name used to specify the target vtgate type and filter the hosts")
46-
affinityField = flag.String("affinity_field", "", "Attribute (both mysql protocol connection and JSON file) used to specify the routing affinity , e.g. 'az_id'")
46+
affinityField = flag.String("affinity_field", "", "Attribute (JSON file) used to specify the routing affinity , e.g. 'az_id'")
47+
affinityValue = flag.String("affinity_value", "", "Value to match for routing affinity , e.g. 'use-az1'")
4748
addressField = flag.String("address_field", "address", "field name in the json file containing the address")
4849
portField = flag.String("port_field", "port", "field name in the json file containing the port")
4950

@@ -194,5 +195,6 @@ func Init() {
194195
*portField,
195196
*poolTypeField,
196197
*affinityField,
198+
*affinityValue,
197199
)
198200
}

0 commit comments

Comments
 (0)