@@ -23,12 +23,15 @@ import (
23
23
"io"
24
24
"math/rand"
25
25
"os"
26
+ "sort"
27
+ "sync"
26
28
"time"
27
29
28
30
"google.golang.org/grpc/resolver"
29
31
30
32
"vitess.io/vitess/go/stats"
31
33
"vitess.io/vitess/go/vt/log"
34
+ "vitess.io/vitess/go/vt/servenv"
32
35
)
33
36
34
37
// File based discovery for vtgate grpc endpoints
@@ -54,14 +57,29 @@ import (
54
57
// type: Only select from hosts of this type (required)
55
58
//
56
59
60
+ // Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
61
+ type JSONGateResolver struct {
62
+ target resolver.Target
63
+ clientConn resolver.ClientConn
64
+ poolType string
65
+ }
66
+
67
+ func (r * JSONGateResolver ) ResolveNow (o resolver.ResolveNowOptions ) {}
68
+
69
+ func (r * JSONGateResolver ) Close () {
70
+ log .Infof ("Closing resolver for target %s" , r .target .URL .String ())
71
+ }
72
+
57
73
type JSONGateResolverBuilder struct {
58
74
jsonPath string
59
75
addressField string
60
76
portField string
61
77
poolTypeField string
62
78
affinityField string
79
+ affinityValue string
63
80
64
- targets []targetHost
81
+ mu sync.RWMutex
82
+ targets map [string ][]targetHost
65
83
resolvers []* JSONGateResolver
66
84
67
85
rand * rand.Rand
@@ -70,24 +88,14 @@ type JSONGateResolverBuilder struct {
70
88
}
71
89
72
90
type targetHost struct {
73
- addr string
74
- poolType string
75
- affinity string
76
- }
77
-
78
- // Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
79
- type JSONGateResolver struct {
80
- target resolver.Target
81
- clientConn resolver.ClientConn
82
- poolType string
83
- affinity string
91
+ Addr string
92
+ PoolType string
93
+ Affinity string
84
94
}
85
95
86
96
var (
87
- buildCount = stats .NewCounter ("JsonDiscoveryBuild" , "JSON host discovery rebuilt the host list" )
88
- unchangedCount = stats .NewCounter ("JsonDiscoveryUnchanged" , "JSON host discovery parsed and determined no change to the file" )
89
- affinityCount = stats .NewCountersWithSingleLabel ("JsonDiscoveryHostAffinity" , "Count of hosts returned from discovery by AZ affinity" , "affinity" )
90
- poolTypeCount = stats .NewCountersWithSingleLabel ("JsonDiscoveryHostPoolType" , "Count of hosts returned from discovery by pool type" , "type" )
97
+ parseCount = stats .NewCountersWithSingleLabel ("JsonDiscoveryParseCount" , "Count of results of JSON host file parsing (changed, unchanged, error)" , "result" )
98
+ targetCount = stats .NewGaugesWithSingleLabel ("JsonDiscoveryTargetCount" , "Count of hosts returned from discovery by pool type" , "pool" )
91
99
)
92
100
93
101
func RegisterJSONGateResolver (
@@ -96,13 +104,16 @@ func RegisterJSONGateResolver(
96
104
portField string ,
97
105
poolTypeField string ,
98
106
affinityField string ,
107
+ affinityValue string ,
99
108
) (* JSONGateResolverBuilder , error ) {
100
109
jsonDiscovery := & JSONGateResolverBuilder {
110
+ targets : map [string ][]targetHost {},
101
111
jsonPath : jsonPath ,
102
112
addressField : addressField ,
103
113
portField : portField ,
104
114
poolTypeField : poolTypeField ,
105
115
affinityField : affinityField ,
116
+ affinityValue : affinityValue ,
106
117
}
107
118
108
119
resolver .Register (jsonDiscovery )
@@ -113,6 +124,8 @@ func RegisterJSONGateResolver(
113
124
return nil , err
114
125
}
115
126
127
+ servenv .AddStatusPart ("JSON Discovery" , targetsTemplate , jsonDiscovery .debugTargets )
128
+
116
129
return jsonDiscovery , nil
117
130
}
118
131
@@ -138,17 +151,19 @@ func (b *JSONGateResolverBuilder) start() error {
138
151
poolTypes := map [string ]int {}
139
152
affinityTypes := map [string ]int {}
140
153
141
- for _ , t := range b .targets {
142
- count := poolTypes [t .poolType ]
143
- poolTypes [t .poolType ] = count + 1
154
+ for _ , ts := range b .targets {
155
+ for _ , t := range ts {
156
+ count := poolTypes [t .PoolType ]
157
+ poolTypes [t .PoolType ] = count + 1
144
158
145
- count = affinityTypes [t .affinity ]
146
- affinityTypes [t .affinity ] = count + 1
159
+ count = affinityTypes [t .Affinity ]
160
+ affinityTypes [t .Affinity ] = count + 1
161
+ }
147
162
}
148
163
149
- buildCount .Add (1 )
164
+ parseCount .Add ("changed" , 1 )
150
165
151
- log .Infof ("loaded %d targets, pool types %v, affinity groups %v" , len ( b . targets ), poolTypes , affinityTypes )
166
+ log .Infof ("loaded targets, pool types %v, affinity %s, groups %v" , poolTypes , * affinityValue , affinityTypes )
152
167
153
168
// Start a config watcher
154
169
b .ticker = time .NewTicker (1 * time .Second )
@@ -158,10 +173,12 @@ func (b *JSONGateResolverBuilder) start() error {
158
173
}
159
174
160
175
go func () {
176
+ var parseErr error
161
177
for range b .ticker .C {
162
178
checkFileStat , err := os .Stat (b .jsonPath )
163
179
if err != nil {
164
180
log .Errorf ("Error stat'ing config %v\n " , err )
181
+ parseCount .Add ("error" , 1 )
165
182
continue
166
183
}
167
184
isUnchanged := checkFileStat .Size () == fileStat .Size () && checkFileStat .ModTime () == fileStat .ModTime ()
@@ -173,12 +190,20 @@ func (b *JSONGateResolverBuilder) start() error {
173
190
fileStat = checkFileStat
174
191
175
192
contentsChanged , err := b .parse ()
176
- if err != nil || ! contentsChanged {
177
- unchangedCount .Add (1 )
193
+ if err != nil {
194
+ parseCount .Add ("error" , 1 )
195
+ if parseErr == nil || err .Error () != parseErr .Error () {
196
+ parseErr = err
197
+ log .Error (err )
198
+ }
178
199
continue
179
200
}
180
-
181
- buildCount .Add (1 )
201
+ parseErr = nil
202
+ if ! contentsChanged {
203
+ parseCount .Add ("unchanged" , 1 )
204
+ continue
205
+ }
206
+ parseCount .Add ("changed" , 1 )
182
207
183
208
// notify all the resolvers that the targets changed
184
209
for _ , r := range b .resolvers {
@@ -217,7 +242,7 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
217
242
return false , fmt .Errorf ("error parsing JSON discovery file %s: %v" , b .jsonPath , err )
218
243
}
219
244
220
- var targets [] targetHost
245
+ var targets = map [ string ][] targetHost {}
221
246
for _ , host := range hosts {
222
247
address , hasAddress := host [b .addressField ]
223
248
port , hasPort := host [b .portField ]
@@ -258,29 +283,46 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
258
283
return false , fmt .Errorf ("error parsing JSON discovery file %s: port field %s has invalid value %v" , b .jsonPath , b .portField , port )
259
284
}
260
285
261
- targets = append (targets , targetHost {fmt .Sprintf ("%s:%s" , address , port ), poolType .(string ), affinity .(string )})
286
+ target := targetHost {fmt .Sprintf ("%s:%s" , address , port ), poolType .(string ), affinity .(string )}
287
+ targets [target .PoolType ] = append (targets [target .PoolType ], target )
288
+ }
289
+
290
+ for poolType := range targets {
291
+ if b .affinityField != "" {
292
+ sort .Slice (targets [poolType ], func (i , j int ) bool {
293
+ return b .affinityValue == targets [poolType ][i ].Affinity
294
+ })
295
+ }
296
+ if len (targets [poolType ]) > * numConnections {
297
+ targets [poolType ] = targets [poolType ][:* numConnections ]
298
+ }
299
+ targetCount .Set (poolType , int64 (len (targets [poolType ])))
262
300
}
301
+
302
+ b .mu .Lock ()
263
303
b .targets = targets
304
+ b .mu .Unlock ()
264
305
265
306
return true , nil
266
307
}
267
308
268
- // Update the current list of hosts for the given resolver
269
- func (b * JSONGateResolverBuilder ) update (r * JSONGateResolver ) {
270
-
271
- log .V (100 ).Infof ("resolving target %s to %d connections\n " , r .target .URL .String (), * numConnections )
309
+ func (b * JSONGateResolverBuilder ) GetPools () []string {
310
+ b .mu .RLock ()
311
+ defer b .mu .RUnlock ()
312
+ var pools []string
313
+ for pool := range b .targets {
314
+ pools = append (pools , pool )
315
+ }
316
+ sort .Strings (pools )
317
+ return pools
318
+ }
272
319
273
- // filter to only targets that match the pool type. if unset, this will just be a copy
274
- // of the full target list.
320
+ func (b * JSONGateResolverBuilder ) GetTargets (poolType string ) []targetHost {
321
+ // Copy the target slice
322
+ b .mu .RLock ()
275
323
targets := []targetHost {}
276
- for _ , target := range b .targets {
277
- if r .poolType == target .poolType {
278
- targets = append (targets , target )
279
- log .V (1000 ).Infof ("matched target %v with type %s" , target , r .poolType )
280
- } else {
281
- log .V (1000 ).Infof ("skipping host %v with type %s" , target , r .poolType )
282
- }
283
- }
324
+ targets = append (targets , b .targets [poolType ]... )
325
+ b .mu .RUnlock ()
284
326
285
327
// Shuffle to ensure every host has a different order to iterate through, putting
286
328
// the affinity matching (e.g. same az) hosts at the front and the non-matching ones
@@ -293,7 +335,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {
293
335
for i := 0 ; i < n - 1 ; i ++ {
294
336
j := head + b .rand .Intn (tail - head + 1 )
295
337
296
- if r . affinity == "" || r . affinity == targets [j ].affinity {
338
+ if * affinityField != "" && * affinityValue == targets [j ].Affinity {
297
339
targets [head ], targets [j ] = targets [j ], targets [head ]
298
340
head ++
299
341
} else {
@@ -302,32 +344,22 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {
302
344
}
303
345
}
304
346
305
- // Grab the first N addresses, and voila!
306
- var addrs []resolver.Address
307
- targets = targets [:min (* numConnections , len (targets ))]
308
- for _ , target := range targets {
309
- addrs = append (addrs , resolver.Address {Addr : target .addr })
310
- }
347
+ return targets
348
+ }
349
+
350
+ // Update the current list of hosts for the given resolver
351
+ func (b * JSONGateResolverBuilder ) update (r * JSONGateResolver ) {
352
+
353
+ log .V (100 ).Infof ("resolving target %s to %d connections\n " , r .target .URL .String (), * numConnections )
311
354
312
- // Count some metrics
313
- var unknown , local , remote int64
355
+ targets := b .GetTargets (r .poolType )
356
+
357
+ var addrs []resolver.Address
314
358
for _ , target := range targets {
315
- if r .affinity == "" {
316
- unknown ++
317
- } else if r .affinity == target .affinity {
318
- local ++
319
- } else {
320
- remote ++
321
- }
359
+ addrs = append (addrs , resolver.Address {Addr : target .Addr })
322
360
}
323
- if unknown != 0 {
324
- affinityCount .Add ("unknown" , unknown )
325
- }
326
- affinityCount .Add ("local" , local )
327
- affinityCount .Add ("remote" , remote )
328
- poolTypeCount .Add (r .poolType , int64 (len (targets )))
329
361
330
- log .V (100 ).Infof ("updated targets for %s to %v (local %d / remote %d) " , r .target .URL .String (), targets , local , remote )
362
+ log .V (100 ).Infof ("updated targets for %s to %v" , r .target .URL .String (), targets )
331
363
332
364
r .clientConn .UpdateState (resolver.State {Addresses : addrs })
333
365
}
@@ -346,19 +378,12 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie
346
378
}
347
379
}
348
380
349
- // Affinity on the other hand is just an optimization
350
- affinity := ""
351
- if b .affinityField != "" {
352
- affinity = attrs .Get (b .affinityField )
353
- }
354
-
355
- log .V (100 ).Infof ("Start discovery for target %v poolType %s affinity %s\n " , target .URL .String (), poolType , affinity )
381
+ log .V (100 ).Infof ("Start discovery for target %v poolType %s affinity %s\n " , target .URL .String (), poolType , b .affinityValue )
356
382
357
383
r := & JSONGateResolver {
358
384
target : target ,
359
385
clientConn : cc ,
360
386
poolType : poolType ,
361
- affinity : affinity ,
362
387
}
363
388
364
389
b .update (r )
@@ -367,19 +392,44 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie
367
392
return r , nil
368
393
}
369
394
370
- func (r * JSONGateResolver ) ResolveNow (o resolver.ResolveNowOptions ) {}
371
-
372
- func (r * JSONGateResolver ) Close () {
373
- log .Infof ("Closing resolver for target %s" , r .target .URL .String ())
374
- }
375
-
376
- // Utilities
377
- func min (a , b int ) int {
378
- if a < b {
379
- return a
395
+ // debugTargets will return the builder's targets with a sorted slice of
396
+ // poolTypes for rendering debug output
397
+ func (b * JSONGateResolverBuilder ) debugTargets () any {
398
+ pools := b .GetPools ()
399
+ targets := map [string ][]targetHost {}
400
+ for pool := range b .targets {
401
+ targets [pool ] = b .GetTargets (pool )
402
+ }
403
+ return struct {
404
+ Pools []string
405
+ Targets map [string ][]targetHost
406
+ }{
407
+ Pools : pools ,
408
+ Targets : targets ,
380
409
}
381
- return b
382
410
}
383
411
384
- func init () {
385
- }
412
+ const (
413
+ // targetsTemplate is a HTML template to display the gate resolver's target hosts.
414
+ targetsTemplate = `
415
+ <style>
416
+ table {
417
+ border-collapse: collapse;
418
+ }
419
+ td, th {
420
+ border: 1px solid #999;
421
+ padding: 0.2rem;
422
+ }
423
+ </style>
424
+ <table>
425
+ {{range $i, $p := .Pools}} <tr>
426
+ <th colspan="2">{{$p}}</th>
427
+ </tr>
428
+ {{range index $.Targets $p}} <tr>
429
+ <td>{{.Addr}}</td>
430
+ <td>{{.Affinity}}</td>
431
+ </tr>{{end}}
432
+ {{end}}
433
+ </table>
434
+ `
435
+ )
0 commit comments