@@ -17,28 +17,26 @@ limitations under the License.
17
17
package discovery
18
18
19
19
import (
20
+ "context"
20
21
"fmt"
22
+ "io"
21
23
"math/rand"
22
24
"sort"
23
25
"strings"
24
26
"sync"
25
27
"time"
26
28
27
29
"vitess.io/vitess/go/stats"
28
-
30
+ "vitess.io/vitess/go/vt/grpcclient"
31
+ "vitess.io/vitess/go/vt/log"
32
+ "vitess.io/vitess/go/vt/topo"
29
33
"vitess.io/vitess/go/vt/topo/topoproto"
30
-
31
- vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
32
-
34
+ "vitess.io/vitess/go/vt/vterrors"
33
35
"vitess.io/vitess/go/vt/vttablet/tabletconn"
34
36
35
- "vitess.io/vitess/go/vt/log"
36
-
37
- "context"
38
-
37
+ querypb "vitess.io/vitess/go/vt/proto/query"
39
38
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
40
- "vitess.io/vitess/go/vt/topo"
41
- "vitess.io/vitess/go/vt/vterrors"
39
+ vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
42
40
)
43
41
44
42
type TabletPickerCellPreference int
@@ -291,13 +289,12 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo
291
289
return candidates
292
290
}
293
291
294
- // PickForStreaming picks an available tablet.
292
+ // PickForStreaming picks a tablet that is healthy and serving.
295
293
// Selection is based on CellPreference.
296
294
// See prioritizeTablets for prioritization logic.
297
295
func (tp * TabletPicker ) PickForStreaming (ctx context.Context ) (* topodatapb.Tablet , error ) {
298
- rand .Seed (time .Now ().UnixNano ())
299
- // keep trying at intervals (tabletPickerRetryDelay) until a tablet is found
300
- // or the context is canceled
296
+ // Keep trying at intervals (tabletPickerRetryDelay) until a healthy
297
+ // serving tablet is found or the context is cancelled.
301
298
for {
302
299
select {
303
300
case <- ctx .Done ():
@@ -330,15 +327,15 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
330
327
} else if tp .inOrder {
331
328
candidates = tp .orderByTabletType (candidates )
332
329
} else {
333
- // Randomize candidates
330
+ // Randomize candidates.
334
331
rand .Shuffle (len (candidates ), func (i , j int ) {
335
332
candidates [i ], candidates [j ] = candidates [j ], candidates [i ]
336
333
})
337
334
}
338
335
if len (candidates ) == 0 {
339
- // if no candidates were found, sleep and try again
336
+ // If no viable candidates were found, sleep and try again.
340
337
tp .incNoTabletFoundStat ()
341
- log .Infof ("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds" ,
338
+ log .Infof ("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds. " ,
342
339
tp .keyspace , tp .shard , tp .cells , tp .tabletTypes , float64 (GetTabletPickerRetryDelay ().Milliseconds ())/ 1000.0 )
343
340
timer := time .NewTimer (GetTabletPickerRetryDelay ())
344
341
select {
@@ -349,34 +346,24 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
349
346
}
350
347
continue
351
348
}
352
- for _ , ti := range candidates {
353
- // try to connect to tablet
354
- if conn , err := tabletconn .GetDialer ()(ti .Tablet , true ); err == nil {
355
- // OK to use ctx here because it is not actually used by the underlying Close implementation
356
- _ = conn .Close (ctx )
357
- log .Infof ("tablet picker found tablet %s" , ti .Tablet .String ())
358
- return ti .Tablet , nil
359
- }
360
- // err found
361
- log .Warningf ("unable to connect to tablet for alias %v" , ti .Alias )
362
- }
363
- // Got here? Means we iterated all tablets and did not find a healthy one
364
- tp .incNoTabletFoundStat ()
349
+ log .Infof ("Tablet picker found a healthy serving tablet for streaming: %s" , candidates [0 ].Tablet .String ())
350
+ return candidates [0 ].Tablet , nil
365
351
}
366
352
}
367
353
368
- // GetMatchingTablets returns a list of TabletInfo for tablets
369
- // that match the cells, keyspace, shard and tabletTypes for this TabletPicker
354
+ // GetMatchingTablets returns a list of TabletInfo for healthy
355
+ // serving tablets that match the cells, keyspace, shard and
356
+ // tabletTypes for this TabletPicker.
370
357
func (tp * TabletPicker ) GetMatchingTablets (ctx context.Context ) []* topo.TabletInfo {
371
- // Special handling for PRIMARY tablet type
372
- // Since there is only one primary, we ignore cell and find the primary
358
+ // Special handling for PRIMARY tablet type: since there is only
359
+ // one primary per shard , we ignore cell and find the primary.
373
360
aliases := make ([]* topodatapb.TabletAlias , 0 )
374
361
if len (tp .tabletTypes ) == 1 && tp .tabletTypes [0 ] == topodatapb .TabletType_PRIMARY {
375
362
shortCtx , cancel := context .WithTimeout (ctx , topo .RemoteOperationTimeout )
376
363
defer cancel ()
377
364
si , err := tp .ts .GetShard (shortCtx , tp .keyspace , tp .shard )
378
365
if err != nil {
379
- log .Errorf ("error getting shard %s/%s: %s " , tp .keyspace , tp .shard , err . Error () )
366
+ log .Errorf ("Error getting shard %s/%s: %v " , tp .keyspace , tp .shard , err )
380
367
return nil
381
368
}
382
369
if _ , ignore := tp .ignoreTablets [si .PrimaryAlias .String ()]; ! ignore {
@@ -385,37 +372,37 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
385
372
} else {
386
373
actualCells := make ([]string , 0 )
387
374
for _ , cell := range tp .cells {
388
- // check if cell is actually an alias
389
- // non-blocking read so that this is fast
375
+ // Check if cell is actually an alias; using a
376
+ // non-blocking read so that this is fast.
390
377
shortCtx , cancel := context .WithTimeout (ctx , topo .RemoteOperationTimeout )
391
378
defer cancel ()
392
379
_ , err := tp .ts .GetCellInfo (shortCtx , cell , false )
393
380
if err != nil {
394
- // not a valid cell, check whether it is a cell alias
381
+ // Not a valid cell, check whether it is a cell alias...
395
382
shortCtx , cancel := context .WithTimeout (ctx , topo .RemoteOperationTimeout )
396
383
defer cancel ()
397
384
alias , err := tp .ts .GetCellsAlias (shortCtx , cell , false )
398
- // if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue
385
+ // If we get an error, either cellAlias doesn't exist or
386
+ // it isn't a cell alias at all; ignore and continue.
399
387
if err == nil {
400
388
actualCells = append (actualCells , alias .Cells ... )
401
389
} else {
402
390
log .Infof ("Unable to resolve cell %s, ignoring" , cell )
403
391
}
404
392
} else {
405
- // valid cell, add it to our list
393
+ // Valid cell, add it to our list.
406
394
actualCells = append (actualCells , cell )
407
395
}
408
396
}
409
397
410
398
for _ , cell := range actualCells {
411
399
shortCtx , cancel := context .WithTimeout (ctx , topo .RemoteOperationTimeout )
412
400
defer cancel ()
413
- // match cell, keyspace and shard
401
+ // Match cell, keyspace, and shard.
414
402
sri , err := tp .ts .GetShardReplication (shortCtx , cell , tp .keyspace , tp .shard )
415
403
if err != nil {
416
404
continue
417
405
}
418
-
419
406
for _ , node := range sri .Nodes {
420
407
if _ , ignore := tp .ignoreTablets [node .TabletAlias .String ()]; ! ignore {
421
408
aliases = append (aliases , node .TabletAlias )
@@ -427,33 +414,47 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
427
414
if len (aliases ) == 0 {
428
415
return nil
429
416
}
417
+
430
418
shortCtx , cancel := context .WithTimeout (ctx , topo .RemoteOperationTimeout )
431
419
defer cancel ()
432
420
tabletMap , err := tp .ts .GetTabletMap (shortCtx , aliases , nil )
433
421
if err != nil {
434
- log .Warningf ("error fetching tablets from topo: %v" , err )
435
- // If we get a partial result we can still use it, otherwise return
422
+ log .Warningf ("Error fetching tablets from topo: %v" , err )
423
+ // If we get a partial result we can still use it, otherwise return.
436
424
if len (tabletMap ) == 0 {
437
425
return nil
438
426
}
439
427
}
428
+
440
429
tablets := make ([]* topo.TabletInfo , 0 , len (aliases ))
441
430
for _ , tabletAlias := range aliases {
442
431
tabletInfo , ok := tabletMap [topoproto .TabletAliasString (tabletAlias )]
443
432
if ! ok {
444
- // Either tablet disappeared on us, or we got a partial result (GetTabletMap ignores
445
- // topo.ErrNoNode). Just log a warning
446
- log .Warningf ("failed to load tablet %v" , tabletAlias )
433
+ // Either tablet disappeared on us, or we got a partial result
434
+ // (GetTabletMap ignores topo.ErrNoNode); just log a warning.
435
+ log .Warningf ("Tablet picker failed to load tablet %v" , tabletAlias )
447
436
} else if topoproto .IsTypeInList (tabletInfo .Type , tp .tabletTypes ) {
448
- tablets = append (tablets , tabletInfo )
437
+ // Try to connect to the tablet and confirm that it's usable.
438
+ if conn , err := tabletconn .GetDialer ()(tabletInfo .Tablet , grpcclient .FailFast (true )); err == nil {
439
+ // Ensure that the tablet is healthy and serving.
440
+ shortCtx , cancel := context .WithTimeout (ctx , topo .RemoteOperationTimeout )
441
+ defer cancel ()
442
+ if err := conn .StreamHealth (shortCtx , func (shr * querypb.StreamHealthResponse ) error {
443
+ if shr != nil && shr .Serving && shr .RealtimeStats != nil && shr .RealtimeStats .HealthError == "" {
444
+ return io .EOF // End the stream
445
+ }
446
+ return vterrors .New (vtrpcpb .Code_INTERNAL , "tablet is not healthy and serving" )
447
+ }); err == nil || err == io .EOF {
448
+ tablets = append (tablets , tabletInfo )
449
+ }
450
+ _ = conn .Close (ctx )
451
+ }
449
452
}
450
453
}
451
454
return tablets
452
455
}
453
456
454
457
// init assigns the package-level globalTPStats from newTabletPickerStats.
// NOTE(review): in this diff the rand.Seed call and its TODO are marked as
// removed ("-"), leaving the stats assignment as the only statement.
func init () {
455
- // TODO(sougou): consolidate this call to be once per process.
456
- rand .Seed (time .Now ().UnixNano ())
457
458
globalTPStats = newTabletPickerStats ()
458
459
}
459
460
0 commit comments