From e21588ebf020ed4e45914d94ec1098279e215e46 Mon Sep 17 00:00:00 2001 From: twthorn Date: Mon, 12 Aug 2024 13:05:07 -0400 Subject: [PATCH 1/2] VReplication: Make Source Tablet Selection More Robust (#13582) Signed-off-by: Matt Lord Signed-off-by: twthorn --- go/vt/discovery/tablet_picker.go | 101 ++++---- go/vt/discovery/tablet_picker_test.go | 65 +++++- .../tabletmanager/vreplication/controller.go | 67 ++++-- .../tabletmanager/vreplication/engine.go | 2 +- .../tabletmanager/vreplication/stats.go | 12 +- .../tabletmanager/vreplication/stats_test.go | 17 +- go/vt/wrangler/fake_tablet_test.go | 25 +- go/vt/wrangler/traffic_switcher_env_test.go | 219 ++++++++++++++++++ go/vt/wrangler/traffic_switcher_test.go | 26 +-- go/vt/wrangler/vdiff_env_test.go | 16 ++ go/vt/wrangler/wrangler.go | 3 + 11 files changed, 447 insertions(+), 106 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index c9537d3851e..167708c5e2d 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -17,7 +17,9 @@ limitations under the License. package discovery import ( + "context" "fmt" + "io" "math/rand" "sort" "strings" @@ -25,20 +27,16 @@ import ( "time" "vitess.io/vitess/go/stats" - + "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" - - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletconn" - "vitess.io/vitess/go/vt/log" - - "context" - + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/vterrors" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type TabletPickerCellPreference int @@ -291,13 +289,12 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo return candidates } -// PickForStreaming picks an available tablet. +// PickForStreaming picks a tablet that is healthy and serving. // Selection is based on CellPreference. // See prioritizeTablets for prioritization logic. func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { - rand.Seed(time.Now().UnixNano()) - // keep trying at intervals (tabletPickerRetryDelay) until a tablet is found - // or the context is canceled + // Keep trying at intervals (tabletPickerRetryDelay) until a healthy + // serving tablet is found or the context is cancelled. for { select { case <-ctx.Done(): @@ -330,15 +327,15 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } else if tp.inOrder { candidates = tp.orderByTabletType(candidates) } else { - // Randomize candidates + // Randomize candidates. rand.Shuffle(len(candidates), func(i, j int) { candidates[i], candidates[j] = candidates[j], candidates[i] }) } if len(candidates) == 0 { - // if no candidates were found, sleep and try again + // If no viable candidates were found, sleep and try again. tp.incNoTabletFoundStat() - log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds", + log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.", tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0) timer := time.NewTimer(GetTabletPickerRetryDelay()) select { @@ -349,34 +346,24 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } continue } - for _, ti := range candidates { - // try to connect to tablet - if conn, err := tabletconn.GetDialer()(ti.Tablet, true); err == nil { - // OK to use ctx here because it is not actually used by the underlying Close implementation - _ = conn.Close(ctx) - log.Infof("tablet picker found tablet %s", ti.Tablet.String()) - return ti.Tablet, nil - } - // err found - log.Warningf("unable to connect to tablet for alias %v", ti.Alias) - } - // Got here? Means we iterated all tablets and did not find a healthy one - tp.incNoTabletFoundStat() + log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String()) + return candidates[0].Tablet, nil } } -// GetMatchingTablets returns a list of TabletInfo for tablets -// that match the cells, keyspace, shard and tabletTypes for this TabletPicker +// GetMatchingTablets returns a list of TabletInfo for healthy +// serving tablets that match the cells, keyspace, shard and +// tabletTypes for this TabletPicker. func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletInfo { - // Special handling for PRIMARY tablet type - // Since there is only one primary, we ignore cell and find the primary + // Special handling for PRIMARY tablet type: since there is only + // one primary per shard, we ignore cell and find the primary. aliases := make([]*topodatapb.TabletAlias, 0) if len(tp.tabletTypes) == 1 && tp.tabletTypes[0] == topodatapb.TabletType_PRIMARY { shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() si, err := tp.ts.GetShard(shortCtx, tp.keyspace, tp.shard) if err != nil { - log.Errorf("error getting shard %s/%s: %s", tp.keyspace, tp.shard, err.Error()) + log.Errorf("Error getting shard %s/%s: %v", tp.keyspace, tp.shard, err) return nil } if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore { @@ -385,24 +372,25 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn } else { actualCells := make([]string, 0) for _, cell := range tp.cells { - // check if cell is actually an alias - // non-blocking read so that this is fast + // Check if cell is actually an alias; using a + // non-blocking read so that this is fast. shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() _, err := tp.ts.GetCellInfo(shortCtx, cell, false) if err != nil { - // not a valid cell, check whether it is a cell alias + // Not a valid cell, check whether it is a cell alias... shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() alias, err := tp.ts.GetCellsAlias(shortCtx, cell, false) - // if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue + // If we get an error, either cellAlias doesn't exist or + // it isn't a cell alias at all; ignore and continue. if err == nil { actualCells = append(actualCells, alias.Cells...) } else { log.Infof("Unable to resolve cell %s, ignoring", cell) } } else { - // valid cell, add it to our list + // Valid cell, add it to our list. actualCells = append(actualCells, cell) } } @@ -410,12 +398,11 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn for _, cell := range actualCells { shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() - // match cell, keyspace and shard + // Match cell, keyspace, and shard. sri, err := tp.ts.GetShardReplication(shortCtx, cell, tp.keyspace, tp.shard) if err != nil { continue } - for _, node := range sri.Nodes { if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore { aliases = append(aliases, node.TabletAlias) @@ -427,33 +414,47 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn if len(aliases) == 0 { return nil } + shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases) if err != nil { - log.Warningf("error fetching tablets from topo: %v", err) - // If we get a partial result we can still use it, otherwise return + log.Warningf("Error fetching tablets from topo: %v", err) + // If we get a partial result we can still use it, otherwise return. if len(tabletMap) == 0 { return nil } } + tablets := make([]*topo.TabletInfo, 0, len(aliases)) for _, tabletAlias := range aliases { tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)] if !ok { - // Either tablet disappeared on us, or we got a partial result (GetTabletMap ignores - // topo.ErrNoNode). Just log a warning - log.Warningf("failed to load tablet %v", tabletAlias) + // Either tablet disappeared on us, or we got a partial result + // (GetTabletMap ignores topo.ErrNoNode); just log a warning. + log.Warningf("Tablet picker failed to load tablet %v", tabletAlias) } else if topoproto.IsTypeInList(tabletInfo.Type, tp.tabletTypes) { - tablets = append(tablets, tabletInfo) + // Try to connect to the tablet and confirm that it's usable. + if conn, err := tabletconn.GetDialer()(tabletInfo.Tablet, grpcclient.FailFast(true)); err == nil { + // Ensure that the tablet is healthy and serving. + shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error { + if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" { + return io.EOF // End the stream + } + return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") + }); err == nil || err == io.EOF { + tablets = append(tablets, tabletInfo) + } + _ = conn.Close(ctx) + } } } return tablets } func init() { - // TODO(sougou): consolidate this call to be once per process. - rand.Seed(time.Now().UnixNano()) globalTPStats = newTabletPickerStats() } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index fd2c1635359..91b936303df 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -22,10 +22,11 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) func TestPickPrimary(t *testing.T) { @@ -503,6 +504,45 @@ func TestPickErrorOnlySpecified(t *testing.T) { require.Greater(t, globalTPStats.noTabletFoundError.Counts()["cell.ks.0.replica"], int64(0)) } +// TestPickFallbackType tests that when providing a list of tablet types to +// pick from, with the list in preference order, that when the primary/first +// type has no available healthy serving tablets that we select a healthy +// serving tablet from the secondary/second type. +func TestPickFallbackType(t *testing.T) { + cells := []string{"cell1", "cell2"} + localCell := cells[0] + tabletTypes := "replica,primary" + options := TabletPickerOptions{ + TabletOrder: "InOrder", + } + te := newPickerTestEnv(t, cells) + + // This one should be selected even though it's the secondary type + // as it is healthy and serving. + primaryTablet := addTablet(te, 100, topodatapb.TabletType_PRIMARY, localCell, true, true) + defer deleteTablet(t, te, primaryTablet) + + // Replica tablet should not be selected as it is unhealthy. + replicaTablet := addTablet(te, 200, topodatapb.TabletType_REPLICA, localCell, false, false) + defer deleteTablet(t, te, replicaTablet) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + _, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = primaryTablet.Alias + return nil + }) + require.NoError(t, err) + + tp, err := NewTabletPicker(context.Background(), te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options) + require.NoError(t, err) + ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel2() + tablet, err := tp.PickForStreaming(ctx2) + require.NoError(t, err) + assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet) +} + type pickerTestEnv struct { t *testing.T keyspace string @@ -551,18 +591,21 @@ func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell err := te.topoServ.CreateTablet(context.Background(), tablet) require.NoError(te.t, err) + shr := &querypb.StreamHealthResponse{ + Serving: serving, + Target: &querypb.Target{ + Keyspace: te.keyspace, + Shard: te.shard, + TabletType: tabletType, + }, + RealtimeStats: &querypb.RealtimeStats{HealthError: "tablet is unhealthy"}, + } if healthy { - _ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{ - Serving: serving, - Target: &querypb.Target{ - Keyspace: te.keyspace, - Shard: te.shard, - TabletType: tabletType, - }, - RealtimeStats: &querypb.RealtimeStats{HealthError: ""}, - }) + shr.RealtimeStats.HealthError = "" } + _ = createFixedHealthConn(tablet, shr) + return tablet } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 0eed7de71ee..31fbefedeb2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -20,6 +20,7 @@ import ( "fmt" "strconv" "strings" + "sync/atomic" "time" "google.golang.org/protobuf/encoding/prototext" @@ -29,7 +30,6 @@ import ( "context" - "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/tb" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" @@ -59,11 +59,18 @@ type controller struct { done chan struct{} // The following fields are updated after start. So, they need synchronization. - sourceTablet sync2.AtomicString + sourceTablet atomic.Value lastWorkflowError *lastError } +const ( + // How many times to retry tablet selection before we + // give up and return an error message that the user + // can see and act upon if needed. + tabletPickerRetries = 5 +) + // newController creates a new controller. Unless a stream is explicitly 'Stopped', // this function launches a goroutine to perform continuous vreplication. func newController(ctx context.Context, params map[string]string, dbClientFactory func() binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, ts *topo.Server, cell, tabletTypesStr string, blpStats *binlogplayer.Stats, vre *Engine) (*controller, error) { @@ -79,6 +86,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor done: make(chan struct{}), source: &binlogdatapb.BinlogSource{}, } + ct.sourceTablet.Store(&topodatapb.TabletAlias{}) log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params) // id @@ -173,7 +181,7 @@ func (ct *controller) run(ctx context.Context) { func (ct *controller) runBlp(ctx context.Context) (err error) { defer func() { - ct.sourceTablet.Set("") + ct.sourceTablet.Store(&topodatapb.TabletAlias{}) if x := recover(); x != nil { log.Errorf("stream %v: caught panic: %v\n%s", ct.id, x, tb.Stack(4)) err = fmt.Errorf("panic: %v", x) @@ -203,23 +211,11 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { } defer dbClient.Close() - var tablet *topodatapb.Tablet - if ct.source.GetExternalMysql() == "" { - log.Infof("trying to find a tablet eligible for vreplication. stream id: %v", ct.id) - tablet, err = ct.tabletPicker.PickForStreaming(ctx) - if err != nil { - select { - case <-ctx.Done(): - default: - ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) - ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error())) - } - return err - } - ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) - log.Infof("found a tablet eligible for vreplication. stream id: %v tablet: %s", ct.id, tablet.Alias.String()) - ct.sourceTablet.Set(tablet.Alias.String()) + tablet, err := ct.pickSourceTablet(ctx, dbClient) + if err != nil { + return err } + switch { case len(ct.source.Tables) > 0: // Table names can have search patterns. Resolve them against the schema. @@ -268,8 +264,10 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { vr := newVReplicator(ct.id, ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld, ct.vre) err = vr.Replicate(ctx) ct.lastWorkflowError.record(err) + // If this is a mysql error that we know needs manual intervention OR - // we cannot identify this as non-recoverable, but it has persisted beyond the retry limit (maxTimeToRetryError) + // we cannot identify this as non-recoverable, but it has persisted + // beyond the retry limit (maxTimeToRetryError). if isUnrecoverableError(err) || !ct.lastWorkflowError.shouldRetry() { log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err) if errSetState := vr.setState(binlogplayer.BlpError, err.Error()); errSetState != nil { @@ -294,6 +292,35 @@ func (ct *controller) setMessage(dbClient binlogplayer.DBClient, message string) } return nil } + +// pickSourceTablet picks a healthy serving tablet to source for +// the vreplication stream. If the source is marked as external, it +// returns nil. +func (ct *controller) pickSourceTablet(ctx context.Context, dbClient binlogplayer.DBClient) (*topodatapb.Tablet, error) { + if ct.source.GetExternalMysql() != "" { + return nil, nil + } + log.Infof("Trying to find an eligible source tablet for vreplication stream id %d for workflow: %s", + ct.id, ct.workflow) + tpCtx, tpCancel := context.WithTimeout(ctx, discovery.GetTabletPickerRetryDelay()*tabletPickerRetries) + defer tpCancel() + tablet, err := ct.tabletPicker.PickForStreaming(tpCtx) + if err != nil { + select { + case <-ctx.Done(): + default: + ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1) + ct.setMessage(dbClient, fmt.Sprintf("Error picking tablet: %s", err.Error())) + } + return tablet, err + } + ct.setMessage(dbClient, fmt.Sprintf("Picked source tablet: %s", tablet.Alias.String())) + log.Infof("Found eligible source tablet %s for vreplication stream id %d for workflow %s", + tablet.Alias.String(), ct.id, ct.workflow) + ct.sourceTablet.Store(tablet.Alias) + return tablet, err +} + func (ct *controller) Stop() { ct.cancel() <-ct.done diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index afce87aa630..ef0d1376857 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -739,7 +739,7 @@ func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error { return fmt.Errorf("unexpected result: %v", qr) } - // When err is not nil then we got a retryable error and will loop again + // When err is not nil then we got a retryable error and will loop again. if err == nil { current, dcerr := binlogplayer.DecodePosition(qr.Rows[0][0].ToString()) if dcerr != nil { diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index c8c242bab05..727d32b9d8d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -27,6 +27,8 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/servenv" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( @@ -142,7 +144,10 @@ func (st *vrStats) register() { defer st.mu.Unlock() result := make(map[string]string, len(st.controllers)) for _, ct := range st.controllers { - result[fmt.Sprintf("%v", ct.id)] = ct.sourceTablet.Get() + ta := ct.sourceTablet.Load() + if ta != nil { + result[fmt.Sprintf("%v", ct.id)] = ta.(*topodatapb.TabletAlias).String() + } } return result })) @@ -394,8 +399,7 @@ func (st *vrStats) status() *EngineStatus { ReplicationLagSeconds: ct.blpStats.ReplicationLagSeconds.Get(), Counts: ct.blpStats.Timings.Counts(), Rates: ct.blpStats.Rates.Get(), - State: ct.blpStats.State.Get(), - SourceTablet: ct.sourceTablet.Get(), + SourceTablet: ct.sourceTablet.Load().(*topodatapb.TabletAlias), Messages: ct.blpStats.MessageHistory(), QueryCounts: ct.blpStats.QueryCount.Counts(), PhaseTimings: ct.blpStats.PhaseTimings.Counts(), @@ -427,7 +431,7 @@ type ControllerStatus struct { Counts map[string]int64 Rates map[string][]float64 State string - SourceTablet string + SourceTablet *topodatapb.TabletAlias Messages []string QueryCounts map[string]int64 PhaseTimings map[string]int64 diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index 2accc3cfa24..b63583e57ee 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -28,6 +28,8 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/proto/binlogdata" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var wantOut = ` @@ -107,8 +109,14 @@ func TestStatusHtml(t *testing.T) { done: make(chan struct{}), }, } - testStats.controllers[1].sourceTablet.Set("src1") - testStats.controllers[2].sourceTablet.Set("src2") + testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 01, + }) + testStats.controllers[2].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 02, + }) close(testStats.controllers[2].done) tpl := template.Must(template.New("test").Parse(vreplicationTemplate)) @@ -135,7 +143,10 @@ func TestVReplicationStats(t *testing.T) { done: make(chan struct{}), }, } - testStats.controllers[1].sourceTablet.Set("src1") + testStats.controllers[1].sourceTablet.Store(&topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 01, + }) sleepTime := 1 * time.Millisecond record := func(phase string) { diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index 9fdb6e616a1..254e1813d8d 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -32,6 +32,8 @@ import ( "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet/grpctmserver" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager" "vitess.io/vitess/go/vt/vttablet/tabletservermock" @@ -48,6 +50,12 @@ import ( _ "vitess.io/vitess/go/vt/vttablet/grpctabletconn" ) +func init() { + // Ensure we will use the right protocol (gRPC) in all unit tests. + tabletconntest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") + tmclienttest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") +} + // This file was copied from testlib. All tests from testlib should be moved // to the current directory. In order to move tests from there, we have to // remove the circular dependency it causes (through vtctl dependence). @@ -81,6 +89,8 @@ type fakeTablet struct { StartHTTPServer bool HTTPListener net.Listener HTTPServer *http.Server + + queryservice.QueryService } // TabletOption is an interface for changing tablet parameters. @@ -141,6 +151,7 @@ func newFakeTablet(t *testing.T, wr *Wrangler, cell string, uid uint32, tabletTy Tablet: tablet, FakeMysqlDaemon: fakeMysqlDaemon, RPCServer: grpc.NewServer(), + QueryService: fakes.ErrorQueryService, } } @@ -238,8 +249,14 @@ func (ft *fakeTablet) Target() querypb.Target { } } -func init() { - // enforce we will use the right protocol (gRPC) in all unit tests - tabletconntest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") - tmclienttest.SetProtocol("go.vt.wrangler.fake_tablet_test", "grpc") +func (ft *fakeTablet) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error { + return callback(&querypb.StreamHealthResponse{ + Serving: true, + Target: &querypb.Target{ + Keyspace: ft.Tablet.Keyspace, + Shard: ft.Tablet.Shard, + TabletType: ft.Tablet.Type, + }, + RealtimeStats: &querypb.RealtimeStats{}, + }) } diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 02eb5afb377..cf19cb5353c 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -18,9 +18,14 @@ package wrangler import ( "fmt" + "math/rand" + "sync" "testing" "time" + "github.com/stretchr/testify/require" + "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/mysql/fakesqldb" @@ -30,6 +35,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/logutil" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -39,6 +45,9 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tmclient" ) @@ -74,6 +83,7 @@ type testMigraterEnv struct { sourceKeyRanges []*topodatapb.KeyRange targetKeyRanges []*topodatapb.KeyRange tmeDB *fakesqldb.DB + mu sync.Mutex } // testShardMigraterEnv has some convenience functions for adding expected queries. @@ -135,6 +145,19 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName) + vs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschemapb.Vindex{ @@ -260,6 +283,169 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, return tme } +// newTestTablePartialMigrater creates a test tablet migrater +// specifially for partial or shard by shard migrations. +// The shards must be the same on the source and target, and we +// must be moving a subset of them. +// fmtQuery should be of the form: 'select a, b %s group by a'. +// The test will Sprintf a from clause and where clause as needed. +func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shardsToMove []string, fmtQuery string) *testMigraterEnv { + require.Greater(t, len(shards), 1, "shard by shard migrations can only be done on sharded keyspaces") + tme := &testMigraterEnv{} + tme.ts = memorytopo.NewServer("cell1", "cell2") + tme.wr = New(logutil.NewConsoleLogger(), tme.ts, tmclient.NewTabletManagerClient()) + tme.wr.sem = semaphore.NewWeighted(1) + tme.sourceShards = shards + tme.targetShards = shards + tme.tmeDB = fakesqldb.New(t) + expectVDiffQueries(tme.tmeDB) + tabletID := 10 + for _, shard := range tme.sourceShards { + tme.sourcePrimaries = append(tme.sourcePrimaries, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_PRIMARY, tme.tmeDB, TabletKeyspaceShard(t, "ks1", shard))) + tabletID += 10 + + _, sourceKeyRange, err := topo.ValidateShardName(shard) + if err != nil { + t.Fatal(err) + } + tme.sourceKeyRanges = append(tme.sourceKeyRanges, sourceKeyRange) + } + tpChoiceTablet := tme.sourcePrimaries[0].Tablet + tpChoice = &testTabletPickerChoice{ + keyspace: tpChoiceTablet.Keyspace, + shard: tpChoiceTablet.Shard, + } + for _, shard := range tme.targetShards { + tme.targetPrimaries = append(tme.targetPrimaries, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_PRIMARY, tme.tmeDB, TabletKeyspaceShard(t, "ks2", shard))) + tabletID += 10 + + _, targetKeyRange, err := topo.ValidateShardName(shard) + if err != nil { + t.Fatal(err) + } + tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) + } + + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName) + + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "c1", + Name: "hash", + }}, + }, + }, + } + err := tme.ts.SaveVSchema(ctx, "ks1", vs) + require.NoError(t, err) + err = tme.ts.SaveVSchema(ctx, "ks2", vs) + require.NoError(t, err) + err = tme.ts.RebuildSrvVSchema(ctx, nil) + require.NoError(t, err) + err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks1", []string{"cell1"}, false) + require.NoError(t, err) + err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks2", []string{"cell1"}, false) + require.NoError(t, err) + + tme.startTablets(t) + tme.createDBClients(ctx, t) + tme.setPrimaryPositions() + now := time.Now().Unix() + + for i, shard := range shards { + for _, shardToMove := range shardsToMove { + var streamInfoRows []string + var streamExtInfoRows []string + if shardToMove == shard { + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks1", + Shard: shard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard)), + }, { + Match: "t2", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", shard)), + }}, + }, + } + streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls)) + streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0|0||||0", i+1, now, now)) + } + tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult) + tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types", + "int64|varchar|varchar|varchar|varchar"), + streamInfoRows...)) + tme.dbTargetClients[i].addInvariant(streamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys", + "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"), + streamExtInfoRows...)) + tme.dbTargetClients[i].addInvariant(reverseStreamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys", + "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"), + streamExtInfoRows...)) + } + } + + for i, shard := range shards { + for _, shardToMove := range shardsToMove { + var streamInfoRows []string + if shardToMove == shard { + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks2", + Shard: shard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard)), + }, { + Match: "t2", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", shard)), + }}, + }, + } + streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls)) + tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult) + } + tme.dbSourceClients[i].addInvariant(reverseStreamInfoKs1, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types", + "int64|varchar|varchar|varchar|varchar"), + streamInfoRows...), + ) + } + } + + tme.targetKeyspace = "ks2" + return tme +} + func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targetShards []string) *testShardMigraterEnv { tme := &testShardMigraterEnv{} tme.ts = memorytopo.NewServer("cell1", "cell2") @@ -296,6 +482,19 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange) } + dialerName := fmt.Sprintf("TrafficSwitcherTest-%s-%d", t.Name(), rand.Intn(1000000000)) + tabletconn.RegisterDialer(dialerName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { + tme.mu.Lock() + defer tme.mu.Unlock() + for _, ft := range append(tme.sourcePrimaries, tme.targetPrimaries...) { + if ft.Tablet.Alias.Uid == tablet.Alias.Uid { + return ft, nil + } + } + return nil, nil + }) + tabletconntest.SetProtocol("go.vt.wrangler.traffic_switcher_env_test", dialerName) + vs := &vschemapb.Keyspace{ Sharded: true, Vindexes: map[string]*vschema.Vindex{ @@ -394,6 +593,8 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe } func (tme *testMigraterEnv) startTablets(t *testing.T) { + tme.mu.Lock() + defer tme.mu.Unlock() allPrimarys := append(tme.sourcePrimaries, tme.targetPrimaries...) for _, primary := range allPrimarys { primary.StartActionLoop(t, tme.wr) @@ -428,6 +629,8 @@ func (tme *testMigraterEnv) stopTablets(t *testing.T) { } func (tme *testMigraterEnv) createDBClients(ctx context.Context, t *testing.T) { + tme.mu.Lock() + defer tme.mu.Unlock() for _, primary := range tme.sourcePrimaries { dbclient := newFakeDBClient(primary.Tablet.Alias.String()) tme.dbSourceClients = append(tme.dbSourceClients, dbclient) @@ -592,3 +795,19 @@ func (tme *testShardMigraterEnv) expectNoPreviousJournals() { dbclient.addQueryRE(tsCheckJournals, &sqltypes.Result{}, nil) } } + +func (tme *testMigraterEnv) close(t *testing.T) { + tme.mu.Lock() + defer tme.mu.Unlock() + tme.stopTablets(t) + for _, dbclient := range tme.dbSourceClients { + dbclient.Close() + } + for _, dbclient := range tme.dbTargetClients { + dbclient.Close() + } + tme.tmeDB.CloseAllConnections() + tme.ts.Close() + tme.wr.tmc.Close() + tme.wr = nil +} diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 957454eaaee..7deea9c6077 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -76,7 +76,7 @@ const ( func TestTableMigrateMainflow(t *testing.T) { ctx := context.Background() tme := newTestTableMigrater(ctx, t) - defer tme.stopTablets(t) + defer tme.close(t) checkCellRouting(t, tme.wr, "cell1", map[string][]string{ "t1": {"ks1.t1"}, @@ -492,7 +492,7 @@ func TestTableMigrateMainflow(t *testing.T) { func TestShardMigrateMainflow(t *testing.T) { ctx := context.Background() tme := newTestShardMigrater(ctx, t, []string{"-40", "40-"}, []string{"-80", "80-"}) - defer tme.stopTablets(t) + defer tme.close(t) // Initial check checkServedTypes(t, tme.ts, "ks:-40", 3) @@ -789,7 +789,7 @@ func TestTableMigrateOneToManyKeepAllArtifacts(t *testing.T) { func testTableMigrateOneToMany(t *testing.T, keepData, keepRoutingRules bool) { ctx := context.Background() tme := newTestTableMigraterCustom(ctx, t, []string{"0"}, []string{"-80", "80-"}, "select * %s") - defer tme.stopTablets(t) + defer tme.close(t) tme.expectNoPreviousJournals() _, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, nil, workflow.DirectionForward, false) @@ -979,7 +979,7 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) { var err error ctx := context.Background() tme := newTestTableMigraterCustom(ctx, t, []string{"0"}, []string{"-80", "80-"}, "select * %s") - defer tme.stopTablets(t) + defer tme.close(t) wantdryRunReads := []string{ "Lock keyspace ks1", @@ -1094,7 +1094,7 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) { func TestMigrateFailJournal(t *testing.T) { ctx := context.Background() tme := newTestTableMigrater(ctx, t) - defer tme.stopTablets(t) + defer tme.close(t) tme.expectNoPreviousJournals() _, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, nil, workflow.DirectionForward, false) @@ -1191,7 +1191,7 @@ func TestMigrateFailJournal(t *testing.T) { func TestTableMigrateJournalExists(t *testing.T) { ctx := context.Background() tme := newTestTableMigrater(ctx, t) - defer tme.stopTablets(t) + defer tme.close(t) tme.expectNoPreviousJournals() _, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, nil, workflow.DirectionForward, false) @@ -1269,7 +1269,7 @@ func TestTableMigrateJournalExists(t *testing.T) { func TestShardMigrateJournalExists(t *testing.T) { ctx := context.Background() tme := newTestShardMigrater(ctx, t, []string{"-40", "40-"}, []string{"-80", "80-"}) - defer tme.stopTablets(t) + defer tme.close(t) tme.expectNoPreviousJournals() _, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, nil, workflow.DirectionForward, false) @@ -1331,7 +1331,7 @@ func TestShardMigrateJournalExists(t *testing.T) { func TestTableMigrateCancel(t *testing.T) { ctx := context.Background() tme := newTestTableMigrater(ctx, t) - defer tme.stopTablets(t) + defer tme.close(t) tme.expectNoPreviousJournals() _, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, nil, workflow.DirectionForward, false) @@ -1383,7 +1383,7 @@ func TestTableMigrateCancel(t *testing.T) { func TestTableMigrateCancelDryRun(t *testing.T) { ctx := context.Background() tme := newTestTableMigrater(ctx, t) - defer tme.stopTablets(t) + defer tme.close(t) want := []string{ "Lock keyspace ks1", @@ -1441,7 +1441,7 @@ func TestTableMigrateCancelDryRun(t *testing.T) { func TestTableMigrateNoReverse(t *testing.T) { ctx := context.Background() tme := newTestTableMigrater(ctx, t) - defer tme.stopTablets(t) + defer tme.close(t) tme.expectNoPreviousJournals() _, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, nil, workflow.DirectionForward, false) @@ -1542,7 +1542,7 @@ func TestTableMigrateNoReverse(t *testing.T) { func TestMigrateFrozen(t *testing.T) { ctx := context.Background() tme := newTestTableMigrater(ctx, t) - defer tme.stopTablets(t) + defer tme.close(t) tme.expectNoPreviousJournals() _, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, nil, workflow.DirectionForward, false) @@ -1584,7 +1584,7 @@ func TestMigrateFrozen(t *testing.T) { func TestMigrateNoStreamsFound(t *testing.T) { ctx := context.Background() tme := newTestTableMigrater(ctx, t) - defer tme.stopTablets(t) + defer tme.close(t) tme.dbTargetClients[0].addQuery(streamInfoKs2, &sqltypes.Result{}, nil) tme.dbTargetClients[1].addQuery(streamInfoKs2, &sqltypes.Result{}, nil) @@ -1600,7 +1600,7 @@ func TestMigrateNoStreamsFound(t *testing.T) { func TestMigrateDistinctSources(t *testing.T) { ctx := context.Background() tme := newTestTableMigrater(ctx, t) - defer tme.stopTablets(t) + defer tme.close(t) bls := &binlogdatapb.BinlogSource{ Keyspace: "ks2", diff --git a/go/vt/wrangler/vdiff_env_test.go b/go/vt/wrangler/vdiff_env_test.go index 1d50f4dc28a..ca456867340 100644 --- a/go/vt/wrangler/vdiff_env_test.go +++ b/go/vt/wrangler/vdiff_env_test.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" + "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/logutil" @@ -342,3 +343,18 @@ func (tmc *testVDiffTMClient) PrimaryPosition(ctx context.Context, tablet *topod } return pos, nil } + +func expectVDiffQueries(db *fakesqldb.DB) { + res := &sqltypes.Result{} + queries := []string{ + "USE `vt_ks`", + "USE `vt_ks1`", + "USE `vt_ks2`", + "optimize table _vt.copy_state", + "alter table _vt.copy_state auto_increment = 1", + } + for _, query := range queries { + db.AddQuery(query, res) + } + db.AddQueryPattern("delete from vd, vdt, vdl.*", res) +} diff --git a/go/vt/wrangler/wrangler.go b/go/vt/wrangler/wrangler.go index ea04baf6569..142c051da9f 100644 --- a/go/vt/wrangler/wrangler.go +++ b/go/vt/wrangler/wrangler.go @@ -21,6 +21,8 @@ package wrangler import ( "context" + "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/topo" @@ -53,6 +55,7 @@ type Wrangler struct { // VExecFunc is a test-only fixture that allows us to short circuit vexec commands. // DO NOT USE in production code. VExecFunc func(ctx context.Context, workflow, keyspace, query string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) + sem *semaphore.Weighted } // New creates a new Wrangler object. From 934acc7afdb8f9163a785e71c278beeb61ebfcfe Mon Sep 17 00:00:00 2001 From: twthorn Date: Mon, 12 Aug 2024 13:20:29 -0400 Subject: [PATCH 2/2] Fix format --- go/vt/wrangler/traffic_switcher_env_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index cf19cb5353c..c39e6f0fd49 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -19,6 +19,8 @@ package wrangler import ( "fmt" "math/rand" + "strconv" + "strings" "sync" "testing" "time" @@ -378,9 +380,10 @@ func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shar now := time.Now().Unix() for i, shard := range shards { - for _, shardToMove := range shardsToMove { + for j, shardToMove := range shardsToMove { var streamInfoRows []string var streamExtInfoRows []string + var vreplIDs []string if shardToMove == shard { bls := &binlogdatapb.BinlogSource{ Keyspace: "ks1", @@ -397,8 +400,10 @@ func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shar } streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls)) streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0|0||||0", i+1, now, now)) + vreplIDs = append(vreplIDs, strconv.FormatInt(int64(j+1), 10)) } - tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult) + vreplIDsJoined := strings.Join(vreplIDs, ", ") + tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, vreplIDsJoined, vreplIDsJoined), noResult) tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|source|message|cell|tablet_types", "int64|varchar|varchar|varchar|varchar"),