Skip to content

Commit

Permalink
allow tablet picker to exclude specified tablets from its candidate list
Browse files Browse the repository at this point in the history
Signed-off-by: Priya Bibra <pbibra@slack-corp.com>
  • Loading branch information
pbibra committed Oct 10, 2023
1 parent 315c48b commit c4e4311
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 18 deletions.
8 changes: 7 additions & 1 deletion go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type TabletPicker struct {
inOrder bool
cellPref TabletPickerCellPreference
localCellInfo localCellInfo
ignoreTablets map[string]topodatapb.TabletAlias
}

// NewTabletPicker returns a TabletPicker.
Expand All @@ -144,6 +145,7 @@ func NewTabletPicker(
cells []string,
localCell, keyspace, shard, tabletTypesStr string,
options TabletPickerOptions,
ignoreTablets map[string]topodatapb.TabletAlias,
) (*TabletPicker, error) {
// Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted.
if tabletTypesStr == "" {
Expand Down Expand Up @@ -228,6 +230,7 @@ func NewTabletPicker(
tabletTypes: tabletTypes,
inOrder: inOrder,
cellPref: cellPref,
ignoreTablets: ignoreTablets,
}, nil
}

Expand Down Expand Up @@ -433,7 +436,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
}); err == nil || err == io.EOF {
tablets = append(tablets, tabletInfo)
// if this tablet is not in the ignore list, then add it as a candidate
if _, ok := tp.ignoreTablets[tabletInfo.Alias.String()]; !ok {
tablets = append(tablets, tabletInfo)
}
}
_ = conn.Close(ctx)
}
Expand Down
23 changes: 12 additions & 11 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestPickPrimary(t *testing.T) {
})
require.NoError(t, err)

tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"otherCell"}, "cell", te.keyspace, te.shard, "primary", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias))
require.NoError(t, err)

ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond)
Expand Down Expand Up @@ -284,7 +284,7 @@ func TestPickLocalPreferences(t *testing.T) {
deleteTablet(t, te, tab)
}
}()
tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options)
tp, err := NewTabletPicker(ctx, te.topoServ, tcase.inCells, tcase.localCell, te.keyspace, te.shard, tcase.inTabletTypes, tcase.options, make(map[string]topodatapb.TabletAlias))
require.NoError(t, err)
require.Equal(t, tp.localCellInfo.localCell, tcase.localCell)
require.ElementsMatch(t, tp.cells, tcase.tpCells)
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestPickCellPreferenceLocalCell(t *testing.T) {
defer deleteTablet(t, te, want1)

// Local cell preference is default
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias))
require.NoError(t, err)

tablet, err := tp.PickForStreaming(ctx)
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestPickCellPreferenceLocalAlias(t *testing.T) {

// test env puts all cells into an alias called "cella"
te := newPickerTestEnv(t, ctx, []string{"cell", "otherCell"})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias))
require.NoError(t, err)

// create a tablet in the other cell, it should be picked
Expand All @@ -370,7 +370,7 @@ func TestPickUsingCellAsAlias(t *testing.T) {
// added to the alias.
te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2", "cell3"}, "xtracell")
// Specify the alias as the cell.
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias))
require.NoError(t, err)

// Create a tablet in one of the main cells, it should be
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestPickUsingCellAliasOnlySpecified(t *testing.T) {
want1 := addTablet(ctx, te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(t, te, want1)

tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"})
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]topodatapb.TabletAlias))
require.NoError(t, err)

tablet, err := tp.PickForStreaming(ctx)
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestTabletAppearsDuringSleep(t *testing.T) {
ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond)

te := newPickerTestEnv(t, ctx, []string{"cell"})
tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{})
tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias))
require.NoError(t, err)

delay := GetTabletPickerRetryDelay()
Expand Down Expand Up @@ -472,10 +472,11 @@ func TestPickErrorLocalPreferenceDefault(t *testing.T) {
ctx := utils.LeakCheckContext(t)

te := newPickerTestEnv(t, ctx, []string{"cell"})
_, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{})
var ignoreTablets map[string]topodatapb.TabletAlias
_, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "badtype", TabletPickerOptions{}, ignoreTablets)
assert.EqualError(t, err, "failed to parse list of tablet types: badtype")

tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{})
tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{}, ignoreTablets)
require.NoError(t, err)
delay := GetTabletPickerRetryDelay()
defer func() {
Expand Down Expand Up @@ -503,7 +504,7 @@ func TestPickErrorOnlySpecified(t *testing.T) {

te := newPickerTestEnv(t, ctx, []string{"cell"})

tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"})
tp, err := NewTabletPicker(ctx, te.topoServ, te.cells, "cell", te.keyspace, te.shard, "replica", TabletPickerOptions{CellPreference: "OnlySpecified"}, make(map[string]topodatapb.TabletAlias))
require.NoError(t, err)
delay := GetTabletPickerRetryDelay()
defer func() {
Expand Down Expand Up @@ -559,7 +560,7 @@ func TestPickFallbackType(t *testing.T) {
})
require.NoError(t, err)

tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options)
tp, err := NewTabletPicker(ctx, te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options, make(map[string]topodatapb.TabletAlias))
require.NoError(t, err)
ctx2, cancel2 := context.WithTimeout(ctx, 1*time.Second)
defer cancel2()
Expand Down
20 changes: 18 additions & 2 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// journalDone is assigned a channel when a journal event is encountered.
// It will be closed when all journal events converge.
var journalDone chan struct{}
var ignoreTablets map[string]topodatapb.TabletAlias

errCount := 0
for {
Expand All @@ -490,7 +491,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var eventss [][]*binlogdatapb.VEvent
var err error
cells := vs.getCells()
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions)
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets)
if err != nil {
log.Errorf(err.Error())
return err
Expand Down Expand Up @@ -670,10 +671,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Unreachable.
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly")
}
if vterrors.Code(err) != vtrpcpb.Code_FAILED_PRECONDITION && vterrors.Code(err) != vtrpcpb.Code_UNAVAILABLE {
if !vs.isRetriableError(err) {
log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
}
ignoreTablets[tablet.Alias.String()] = *tablet.GetAlias()
errCount++
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
Expand All @@ -683,6 +685,20 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
}

func (vs *vstream) isRetriableError(err error) bool {
errCode := vterrors.Code(err)

if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE || errCode == vtrpcpb.Code_NOT_FOUND {
return true
}

if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.HasPrefix(err.Error(), "GTIDSet Mismatch") {
return true
}

return false
}

// sendAll sends a group of events together while holding the lock.
func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error {
vs.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
}

func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{}, make(map[string]topodata.TabletAlias))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
return nil, err
}
}
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{})
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, cells, ct.vre.cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ func (wr *Wrangler) areTabletsAvailableToStreamFrom(ctx context.Context, ts *tra
if cells == nil {
cells = append(cells, shard.PrimaryAlias.Cell)
}
tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{})
tp, err := discovery.NewTabletPicker(ctx, wr.ts, cells, shard.PrimaryAlias.Cell, keyspace, shard.ShardName(), tabletTypes, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias))
if err != nil {
allErrors.RecordError(err)
return
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ func (df *vdiff) selectTablets(ctx context.Context, ts *trafficSwitcher) error {
if ts.ExternalTopo() != nil {
sourceTopo = ts.ExternalTopo()
}
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{})
tp, err := discovery.NewTabletPicker(ctx, sourceTopo, []string{df.sourceCell}, df.sourceCell, df.ts.SourceKeyspaceName(), shard, df.tabletTypesStr, discovery.TabletPickerOptions{}, make(map[string]topodatapb.TabletAlias))
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions tools/rowlog/rowlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func getTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace st
discovery.TabletPickerOptions{
CellPreference: "OnlySpecified",
},
make(map[string]topodatapb.TabletAlias),
)
if err != nil {
return ""
Expand Down

0 comments on commit c4e4311

Please sign in to comment.