diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index b95e08120f2..e3d04b3de9f 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -21,7 +21,6 @@ import ( "fmt" "io" "math/rand" - "slices" "sort" "strings" "sync" @@ -136,7 +135,8 @@ type TabletPicker struct { inOrder bool cellPref TabletPickerCellPreference localCellInfo localCellInfo - ignoreTablets []string + // This map is keyed on the results of TabletAlias.String(). + ignoreTablets map[string]struct{} } // NewTabletPicker returns a TabletPicker. @@ -146,7 +146,7 @@ func NewTabletPicker( cells []string, localCell, keyspace, shard, tabletTypesStr string, options TabletPickerOptions, - ignoreTablets ...string, + ignoreTablets ...*topodatapb.TabletAlias, ) (*TabletPicker, error) { // Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted. if tabletTypesStr == "" { @@ -222,7 +222,7 @@ func NewTabletPicker( } } - return &TabletPicker{ + tp := &TabletPicker{ ts: ts, cells: dedupeCells(cells), localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap}, @@ -231,8 +231,15 @@ func NewTabletPicker( tabletTypes: tabletTypes, inOrder: inOrder, cellPref: cellPref, - ignoreTablets: ignoreTablets, - }, nil + ignoreTablets: make(map[string]struct{}, len(ignoreTablets)), + } + + for _, ignoreTablet := range ignoreTablets { + tp.ignoreTablets[ignoreTablet.String()] = struct{}{} + } + + return tp, nil + } // dedupeCells is used to remove duplicates in the cell list in case it is passed in @@ -420,8 +427,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn tablets := make([]*topo.TabletInfo, 0, len(aliases)) for _, tabletAlias := range aliases { - tabletAliasString := topoproto.TabletAliasString(tabletAlias) - tabletInfo, ok := tabletMap[tabletAliasString] + tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)] if !ok { // Either tablet disappeared on us, or we got a partial result // (GetTabletMap ignores topo.ErrNoNode); just log a warning. @@ -438,8 +444,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") }); err == nil || err == io.EOF { - // If this tablet is not in the ignore list, then add it as a candidate. - if !slices.Contains(tp.ignoreTablets, tabletAliasString) { + if _, ignore := tp.ignoreTablets[tabletAlias.String()]; !ignore { tablets = append(tablets, tabletInfo) } } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 9191d161626..ac822124d58 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -26,7 +26,6 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" - "vitess.io/vitess/go/vt/topo/topoproto" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -404,14 +403,14 @@ func TestPickWithIgnoreList(t *testing.T) { defer deleteTablet(t, te, dontWant) // Specify the alias as the cell. - tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, topoproto.TabletAliasString(dontWant.GetAlias())) + tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, dontWant.GetAlias()) require.NoError(t, err) - // Try it many times to be sure we don't ever pick from the ignore list + // Try it many times to be sure we don't ever pick from the ignore list. for i := 0; i < 100; i++ { tablet, err := tp.PickForStreaming(ctx) require.NoError(t, err) - assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want) + require.False(t, proto.Equal(dontWant, tablet), "Picked the tablet we shouldn't have: %v", dontWant) } } diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 734dfaf6f2e..b31d5e7f408 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -27,18 +27,17 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" - querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/srvtopo" - "vitess.io/vitess/go/vt/vterrors" ) // vstreamManager manages vstream requests. @@ -475,7 +474,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // journalDone is assigned a channel when a journal event is encountered. // It will be closed when all journal events converge. var journalDone chan struct{} - ignoreTablets := []string{} + ignoreTablets := make([]*topodatapb.TabletAlias, 0) errCount := 0 for { @@ -494,15 +493,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var err error cells := vs.getCells() - // Create a child context with a stricter timeout. - tpCtx, tpCancel := context.WithTimeout(context.Background(), 60*time.Second) - defer tpCancel() - - tp, err := discovery.NewTabletPicker(tpCtx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...) + tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...) if err != nil { log.Errorf(err.Error()) return err } + + // Create a child context with a stricter timeout when picking a tablet. + // This will prevent hanging in the case no tablets are found + tpCtx, tpCancel := context.WithTimeout(context.Background(), 60*time.Second) + defer tpCancel() + tablet, err := tp.PickForStreaming(tpCtx) if err != nil { log.Errorf(err.Error()) @@ -685,7 +686,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha return err } if ignoreTablet { - ignoreTablets = append(ignoreTablets, topoproto.TabletAliasString(tablet.GetAlias())) + ignoreTablets = append(ignoreTablets, tablet.GetAlias()) } errCount++ @@ -699,12 +700,13 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } // shouldRetry determines whether we should exit immediately or retry the vstream. -// The first return value determines if the error can be retried, the second indicates whether -// the tablet on which the error occurred should be ommitted from the candidate list of tablets -// to choose from on the retry. +// The first return value determines if the error can be retried, while the second +// indicates whether the tablet with which the error occurred should be ommitted +// from the candidate list of tablets to choose from on the retry. // // An error should be retried if it is expected to be transient. -// A tablet should be ignored upon retry if it's likely another tablet will succeed without the same error. +// A tablet should be ignored upon retry if it's likely another tablet will not +// produce the same error. func (vs *vstream) shouldRetry(err error) (bool, bool) { errCode := vterrors.Code(err) @@ -712,8 +714,8 @@ func (vs *vstream) shouldRetry(err error) (bool, bool) { return true, false } - // If there is a GTIDSet Mismatch on the tablet, - // omit it from the candidate list in the TabletPicker on retry. + // If there is a GTIDSet Mismatch on the tablet, omit it from the candidate + // list in the TabletPicker on retry. if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") { return true, true }