Skip to content

Commit

Permalink
make list of tablet alias instead of string
Browse files Browse the repository at this point in the history
Signed-off-by: Priya Bibra <pbibra@slack-corp.com>
  • Loading branch information
pbibra committed Oct 17, 2023
1 parent c052bc2 commit eaad958
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 32 deletions.
25 changes: 15 additions & 10 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"io"
"math/rand"
"slices"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -136,7 +135,8 @@ type TabletPicker struct {
inOrder bool
cellPref TabletPickerCellPreference
localCellInfo localCellInfo
ignoreTablets []string
// This map is keyed on the results of TabletAlias.String().
ignoreTablets map[string]struct{}
}

// NewTabletPicker returns a TabletPicker.
Expand All @@ -146,7 +146,7 @@ func NewTabletPicker(
cells []string,
localCell, keyspace, shard, tabletTypesStr string,
options TabletPickerOptions,
ignoreTablets ...string,
ignoreTablets ...*topodatapb.TabletAlias,
) (*TabletPicker, error) {
// Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted.
if tabletTypesStr == "" {
Expand Down Expand Up @@ -222,7 +222,7 @@ func NewTabletPicker(
}
}

return &TabletPicker{
tp := &TabletPicker{
ts: ts,
cells: dedupeCells(cells),
localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap},
Expand All @@ -231,8 +231,15 @@ func NewTabletPicker(
tabletTypes: tabletTypes,
inOrder: inOrder,
cellPref: cellPref,
ignoreTablets: ignoreTablets,
}, nil
ignoreTablets: make(map[string]struct{}, len(ignoreTablets)),
}

for _, ignoreTablet := range ignoreTablets {
tp.ignoreTablets[ignoreTablet.String()] = struct{}{}
}

return tp, nil

}

// dedupeCells is used to remove duplicates in the cell list in case it is passed in
Expand Down Expand Up @@ -420,8 +427,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn

tablets := make([]*topo.TabletInfo, 0, len(aliases))
for _, tabletAlias := range aliases {
tabletAliasString := topoproto.TabletAliasString(tabletAlias)
tabletInfo, ok := tabletMap[tabletAliasString]
tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)]
if !ok {
// Either tablet disappeared on us, or we got a partial result
// (GetTabletMap ignores topo.ErrNoNode); just log a warning.
Expand All @@ -438,8 +444,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
}); err == nil || err == io.EOF {
// If this tablet is not in the ignore list, then add it as a candidate.
if !slices.Contains(tp.ignoreTablets, tabletAliasString) {
if _, ignore := tp.ignoreTablets[tabletAlias.String()]; !ignore {
tablets = append(tablets, tabletInfo)
}
}
Expand Down
7 changes: 3 additions & 4 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topo/topoproto"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -404,14 +403,14 @@ func TestPickWithIgnoreList(t *testing.T) {
defer deleteTablet(t, te, dontWant)

// Specify the alias as the cell.
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, topoproto.TabletAliasString(dontWant.GetAlias()))
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, dontWant.GetAlias())
require.NoError(t, err)

// Try it many times to be sure we don't ever pick from the ignore list
// Try it many times to be sure we don't ever pick from the ignore list.
for i := 0; i < 100; i++ {
tablet, err := tp.PickForStreaming(ctx)
require.NoError(t, err)
assert.True(t, proto.Equal(want, tablet), "Pick: %v, want %v", tablet, want)
require.False(t, proto.Equal(dontWant, tablet), "Picked the tablet we shouldn't have: %v", dontWant)
}
}

Expand Down
38 changes: 20 additions & 18 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,17 @@ import (

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
)

// vstreamManager manages vstream requests.
Expand Down Expand Up @@ -475,7 +474,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// journalDone is assigned a channel when a journal event is encountered.
// It will be closed when all journal events converge.
var journalDone chan struct{}
ignoreTablets := []string{}
ignoreTablets := make([]*topodatapb.TabletAlias, 0)

errCount := 0
for {
Expand All @@ -494,15 +493,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var err error
cells := vs.getCells()

// Create a child context with a stricter timeout.
tpCtx, tpCancel := context.WithTimeout(context.Background(), 60*time.Second)
defer tpCancel()

tp, err := discovery.NewTabletPicker(tpCtx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...)
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...)
if err != nil {
log.Errorf(err.Error())
return err
}

// Create a child context with a stricter timeout when picking a tablet.
// This will prevent hanging in the case no tablets are found
tpCtx, tpCancel := context.WithTimeout(context.Background(), 60*time.Second)
defer tpCancel()

tablet, err := tp.PickForStreaming(tpCtx)
if err != nil {
log.Errorf(err.Error())
Expand Down Expand Up @@ -685,7 +686,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
return err
}
if ignoreTablet {
ignoreTablets = append(ignoreTablets, topoproto.TabletAliasString(tablet.GetAlias()))
ignoreTablets = append(ignoreTablets, tablet.GetAlias())
}

errCount++
Expand All @@ -699,21 +700,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}

// shouldRetry determines whether we should exit immediately or retry the vstream.
// The first return value determines if the error can be retried, the second indicates whether
// the tablet on which the error occurred should be ommitted from the candidate list of tablets
// to choose from on the retry.
// The first return value determines if the error can be retried, while the second
// indicates whether the tablet with which the error occurred should be ommitted
// from the candidate list of tablets to choose from on the retry.
//
// An error should be retried if it is expected to be transient.
// A tablet should be ignored upon retry if it's likely another tablet will succeed without the same error.
// A tablet should be ignored upon retry if it's likely another tablet will not
// produce the same error.
func (vs *vstream) shouldRetry(err error) (bool, bool) {
errCode := vterrors.Code(err)

if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE {
return true, false
}

// If there is a GTIDSet Mismatch on the tablet,
// omit it from the candidate list in the TabletPicker on retry.
// If there is a GTIDSet Mismatch on the tablet, omit it from the candidate
// list in the TabletPicker on retry.
if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") {
return true, true
}
Expand Down

0 comments on commit eaad958

Please sign in to comment.