diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index 4e50ea12af3..d19eb42f95d 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -1431,7 +1431,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t
 			action, workflow, output)
 	}
 	if err != nil {
-		t.Fatalf("Reshard %s command failed with %+v\n", action, err)
+		t.Fatalf("Reshard %s command failed with %+v\nOutput: %s", action, err, output)
 	}
 }
diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go
index 8b21cf6fb60..988deea94a9 100644
--- a/go/test/endtoend/vreplication/vstream_test.go
+++ b/go/test/endtoend/vreplication/vstream_test.go
@@ -223,7 +223,7 @@ func insertRow(keyspace, table string, id int) {
 	vtgateConn.ExecuteFetch("begin", 1000, false)
 	_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false)
 	if err != nil {
-		log.Infof("error inserting row %d: %v", id, err)
+		log.Errorf("error inserting row %d: %v", id, err)
 	}
 	vtgateConn.ExecuteFetch("commit", 1000, false)
 }
@@ -387,13 +387,15 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
 	defer vc.TearDown()
 
 	defaultCell := vc.Cells[vc.CellNames[0]]
-	vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
+	_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
+	require.NoError(t, err)
 
 	vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
 	defer vtgateConn.Close()
 	verifyClusterHealth(t, vc)
 
-	vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
+	_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
+	require.NoError(t, err)
 
 	ctx := context.Background()
 	vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
@@ -512,6 +514,196 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
 	return ne
 }
 
+// Validate that we can resume a VStream when the keyspace has been resharded
+// while not streaming. Ensure that we successfully transition from the
+// old shards -- which are in the VGTID from the previous stream -- and that
+// we miss no row events during the process.
+func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
+	ctx := context.Background()
+	ks := "testks"
+	wf := "multiVStreamsKeyspaceReshard"
+	baseTabletID := 100
+	tabletType := topodatapb.TabletType_PRIMARY.String()
+	oldShards := "-80,80-"
+	newShards := "-40,40-80,80-c0,c0-"
+	oldShardRowEvents, newShardRowEvents := 0, 0
+	vc = NewVitessCluster(t, nil)
+	defer vc.TearDown()
+	defaultCell := vc.Cells[vc.CellNames[0]]
+	ogdr := defaultReplicas
+	defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets
+	defer func(dr int) { defaultReplicas = dr }(ogdr)
+
+	// For our sequences etc.
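+	// (A sharded table's sequence must be backed by a table in an unsharded
+	// keyspace, which is what this global keyspace provides for customer.)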
+ _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil) + require.NoError(t, err) + + // Setup the keyspace with our old/original shards. + keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil) + require.NoError(t, err) + + // Add the new shards. + err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts) + require.NoError(t, err) + + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + verifyClusterHealth(t, vc) + + vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) + require.NoError(t, err) + defer vstreamConn.Close() + + // Ensure that we're starting with a clean slate. + _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false) + require.NoError(t, err) + + // Coordinate go-routines. + streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute) + defer streamCancel() + done := make(chan struct{}) + + // First goroutine that keeps inserting rows into the table being streamed until the + // stream context is cancelled. + go func() { + id := 1 + for { + select { + case <-streamCtx.Done(): + // Give the VStream a little catch-up time before telling it to stop + // via the done channel. + time.Sleep(10 * time.Second) + close(done) + return + default: + insertRow(ks, "customer", id) + time.Sleep(250 * time.Millisecond) + id++ + } + } + }() + + // Create the Reshard workflow and wait for it to finish the copy phase. + reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType) + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String()) + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "/.*", // Match all keyspaces just to be more realistic. + }}} + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + // Only stream the customer table and its sequence backing table. + Match: "/customer.*", + }}, + } + flags := &vtgatepb.VStreamFlags{} + + // Stream events but stop once we have a VGTID with positions for the old/original shards. + var newVGTID *binlogdatapb.VGtid + func() { + var reader vtgateconn.VStreamReader + reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + require.NoError(t, err) + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + switch ev.Type { + case binlogdatapb.VEventType_ROW: + shard := ev.GetRowEvent().GetShard() + switch shard { + case "-80", "80-": + oldShardRowEvents++ + case "0": + // We expect some for the sequence backing table, but don't care. + default: + require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + } + case binlogdatapb.VEventType_VGTID: + newVGTID = ev.GetVgtid() + if len(newVGTID.GetShardGtids()) == 3 { + // We want a VGTID with a position for the global shard and the old shards. 
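+							// That is three ShardGtids in total, each of which must also
+							// carry a non-empty GTID position before we can safely stop.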
+ canStop := true + for _, sg := range newVGTID.GetShardGtids() { + if sg.GetGtid() == "" { + canStop = false + } + } + if canStop { + return + } + } + } + } + default: + require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) + } + select { + case <-streamCtx.Done(): + return + default: + } + } + }() + + // Confirm that we have shard GTIDs for the global shard and the old/original shards. + require.Len(t, newVGTID.GetShardGtids(), 3) + + // Switch the traffic to the new shards. + reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType) + + // Now start a new VStream from our previous VGTID which only has the old/original shards. + func() { + var reader vtgateconn.VStreamReader + reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags) + require.NoError(t, err) + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + switch ev.Type { + case binlogdatapb.VEventType_ROW: + shard := ev.RowEvent.Shard + switch shard { + case "-80", "80-": + oldShardRowEvents++ + case "-40", "40-80", "80-c0", "c0-": + newShardRowEvents++ + case "0": + // Again, we expect some for the sequence backing table, but don't care. + default: + require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + } + } + } + default: + require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) + } + select { + case <-done: + return + default: + } + } + }() + + // We should have a mix of events across the old and new shards. + require.NotZero(t, oldShardRowEvents) + require.NotZero(t, newShardRowEvents) + + // The number of row events streamed by the VStream API should match the number of rows inserted. + customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer") + customerCount, err := customerResult.Rows[0][0].ToInt64() + require.NoError(t, err) + require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents)) +} + func TestVStreamFailover(t *testing.T) { testVStreamWithFailover(t, true) } diff --git a/go/vt/topo/faketopo/faketopo.go b/go/vt/topo/faketopo/faketopo.go index 8601d28f5b6..69ccf08a969 100644 --- a/go/vt/topo/faketopo/faketopo.go +++ b/go/vt/topo/faketopo/faketopo.go @@ -21,12 +21,11 @@ import ( "strings" "sync" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" - "vitess.io/vitess/go/vt/log" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - - "vitess.io/vitess/go/vt/topo" ) // FakeFactory implements the Factory interface. This is supposed to be used only for testing diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index d56584a1287..c5bc460de09 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -25,8 +25,11 @@ import ( "sync" "time" + "golang.org/x/exp/maps" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/srvtopo" @@ -498,24 +501,40 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var err error cells := vs.getCells() - tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...) 
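+	// Start from a copy of the stream's default tablet picker options as we
+	// may need to adjust them for this particular shard.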
+ tpo := vs.tabletPickerOptions + resharded, err := vs.keyspaceHasBeenResharded(ctx, sgtid.Keyspace) if err != nil { - log.Errorf(err.Error()) - return err + return vterrors.Wrapf(err, "failed to determine if keyspace %s has been resharded", sgtid.Keyspace) + } + if resharded { + // The non-serving tablet in the old / non-serving shard will contain all of + // the GTIDs that we need before transitioning to the new shards along with + // the journal event that will then allow us to automatically transition to + // the new shards (provided the stop_on_reshard option is not set). + tpo.IncludeNonServingTablets = true } + tabletPickerErr := func(err error) error { + tperr := vterrors.Wrapf(err, "failed to find a %s tablet for VStream in %s/%s within the %s cell(s)", + vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) + log.Errorf("%v", tperr) + return tperr + } + tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.GetKeyspace(), sgtid.GetShard(), vs.tabletType.String(), tpo, ignoreTablets...) + if err != nil { + return tabletPickerErr(err) + } // Create a child context with a stricter timeout when picking a tablet. // This will prevent hanging in the case no tablets are found. tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout) defer tpCancel() - tablet, err := tp.PickForStreaming(tpCtx) if err != nil { - log.Errorf(err.Error()) - return err + return tabletPickerErr(err) } - log.Infof("Picked tablet %s for for %s/%s/%s/%s", tablet.Alias.String(), strings.Join(cells, ","), - sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()) + log.Infof("Picked a %s tablet for VStream in %s/%s within the %s cell(s)", + vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) + target := &querypb.Target{ Keyspace: sgtid.Keyspace, Shard: sgtid.Shard, @@ -742,7 +761,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e if err := vs.getError(); err != nil { return err } - // convert all gtids to vgtids. This should be done here while holding the lock. + // Convert all gtids to vgtids. This should be done here while holding the lock. for j, event := range events { if event.Type == binlogdatapb.VEventType_GTID { // Update the VGtid and send that instead. @@ -926,3 +945,56 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar close(je.done) return je, nil } + +// keyspaceHasBeenResharded returns true if the keyspace's serving shard set has changed +// since the last VStream as indicated by the shard definitions provided in the VGTID. +func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string) (bool, error) { + shards, err := vs.ts.FindAllShardsInKeyspace(ctx, keyspace, nil) + if err != nil || len(shards) == 0 { + return false, err + } + + // First check the typical case, where the VGTID shards match the serving shards. + // In that case it's NOT possible that an applicable reshard has happened because + // the VGTID contains shards that are all serving. 
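+	// If any shard in the VGTID is no longer the serving primary, then a reshard
+	// MAY have happened and we have to compare keyranges to know for sure.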
+	reshardPossible := false
+	ksShardGTIDs := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids))
+	for _, s := range vs.vgtid.ShardGtids {
+		if s.GetKeyspace() == keyspace {
+			ksShardGTIDs = append(ksShardGTIDs, s)
+		}
+	}
+	for _, s := range ksShardGTIDs {
+		shard := shards[s.GetShard()]
+		if shard == nil {
+			return false, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard provided in VGTID, %s, not found in the %s keyspace", s.GetShard(), keyspace)
+		}
+		if !shard.GetIsPrimaryServing() {
+			reshardPossible = true
+			break
+		}
+	}
+	if !reshardPossible {
+		return false, nil
+	}
+
+	// Now that we know there MAY have been an applicable reshard, let's make a
+	// definitive determination by looking at the shard keyranges.
+	// All we care about are the shard info records now.
+	sis := maps.Values(shards)
+	for i := range sis {
+		for j := range sis {
+			if sis[i].ShardName() == sis[j].ShardName() && key.KeyRangeEqual(sis[i].GetKeyRange(), sis[j].GetKeyRange()) {
+				// It's the same shard so skip it.
+				continue
+			}
+			if key.KeyRangeIntersect(sis[i].GetKeyRange(), sis[j].GetKeyRange()) {
+				// We have different shards with overlapping keyranges so we know
+				// that a reshard has happened.
+				return true, nil
+			}
+		}
+	}
+
+	return false, nil
+}
diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go
index fe3201ad630..6eec06a1bac 100644
--- a/go/vt/vtgate/vstream_manager_test.go
+++ b/go/vt/vtgate/vstream_manager_test.go
@@ -25,28 +25,22 @@ import (
 	"testing"
 	"time"
 
-	"google.golang.org/protobuf/proto"
-
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/proto"
 
 	"vitess.io/vitess/go/stats"
+	"vitess.io/vitess/go/test/utils"
 	"vitess.io/vitess/go/vt/discovery"
-	"vitess.io/vitess/go/vt/log"
-	"vitess.io/vitess/go/vt/logutil"
 	"vitess.io/vitess/go/vt/srvtopo"
 	"vitess.io/vitess/go/vt/topo"
-	"vitess.io/vitess/go/vt/topo/topoproto"
 	"vitess.io/vitess/go/vt/vterrors"
 	"vitess.io/vitess/go/vt/vttablet/sandboxconn"
 
 	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
-	querypb "vitess.io/vitess/go/vt/proto/query"
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
 	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
-
-	"vitess.io/vitess/go/test/utils"
 )
 
 var mu sync.Mutex
@@ -1283,123 +1277,284 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
 	}
 }
 
-// TestVStreamManagerHealthCheckResponseHandling tests the handling of healthcheck responses by
-// the vstream manager to confirm that we are correctly restarting the vstream when we should.
-func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) {
+func TestKeyspaceHasBeenResharded(t *testing.T) {
 	ctx := utils.LeakCheckContext(t)
-	// Capture the vstream warning log. Otherwise we need to re-implement the vstream error
-	// handling in SandboxConn's implementation and then we're not actually testing the
-	// production code.
-	logger := logutil.NewMemoryLogger()
-	log.Warningf = logger.Warningf
-
-	cell := "aa"
-	ks := "TestVStream"
-	shard := "0"
-	tabletType := topodatapb.TabletType_REPLICA
-	_ = createSandbox(ks)
-	hc := discovery.NewFakeHealthCheck(nil)
-	st := getSandboxTopo(ctx, cell, ks, []string{shard})
-	vsm := newTestVStreamManager(ctx, hc, st, cell)
-	vgtid := &binlogdatapb.VGtid{
-		ShardGtids: []*binlogdatapb.ShardGtid{{
-			Keyspace: ks,
-			Shard:    shard,
-		}},
-	}
-	source := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, shard, tabletType, true, 0, nil)
-	tabletAlias := topoproto.TabletAliasString(source.Tablet().Alias)
-	addTabletToSandboxTopo(t, ctx, st, ks, shard, source.Tablet())
-	target := &querypb.Target{
-		Cell:       cell,
-		Keyspace:   ks,
-		Shard:      shard,
-		TabletType: tabletType,
-	}
-	highLag := uint32(discovery.GetLowReplicationLag().Seconds()) + 1
+	cell := "zone1"
+	ks := "testks"
 
 	type testcase struct {
-		name    string
-		hcRes   *querypb.StreamHealthResponse
-		wantErr string
+		name            string
+		oldshards       []string
+		newshards       []string
+		vgtid           *binlogdatapb.VGtid
+		trafficSwitched bool
+		want            bool
+		wantErr         string
 	}
 	testcases := []testcase{
 		{
-			name: "all healthy", // Will hit the context timeout
+			name: "2 to 4, split both, traffic not switched",
+			oldshards: []string{
+				"-80",
+				"80-",
+			},
+			newshards: []string{
+				"-40",
+				"40-80",
+				"80-c0",
+				"c0-",
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-",
+					},
+				},
+			},
+			trafficSwitched: false,
+			want:            false,
+		},
+		{
+			name: "2 to 4, split both, traffic switched",
+			oldshards: []string{
+				"-80",
+				"80-",
+			},
+			newshards: []string{
+				"-40",
+				"40-80",
+				"80-c0",
+				"c0-",
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-",
+					},
+				},
+			},
+			trafficSwitched: true,
+			want:            true,
+		},
+		{
+			name: "2 to 8, split both, traffic switched",
+			oldshards: []string{
+				"-80",
+				"80-",
+			},
+			newshards: []string{
+				"-20",
+				"20-40",
+				"40-60",
+				"60-80",
+				"80-a0",
+				"a0-c0",
+				"c0-e0",
+				"e0-",
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-",
+					},
+				},
+			},
+			trafficSwitched: true,
+			want:            true,
 		},
 		{
-			name: "failure",
-			hcRes: &querypb.StreamHealthResponse{
-				TabletAlias: source.Tablet().Alias,
-				Target:      nil, // This is seen as a healthcheck stream failure
+			name: "2 to 4, split only first shard, traffic switched",
+			oldshards: []string{
+				"-80",
+				"80-",
+			},
+			newshards: []string{
+				"-20",
+				"20-40",
+				"40-60",
+				"60-80",
+				// 80- is not being resharded.
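+				// Splitting -80 into four new shards while 80- keeps serving.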
}, - wantErr: fmt.Sprintf("health check failed on %s", tabletAlias), + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-80", + }, + { + Keyspace: ks, + Shard: "80-", + }, + }, + }, + trafficSwitched: true, + want: true, }, { - name: "tablet type changed", - hcRes: &querypb.StreamHealthResponse{ - TabletAlias: source.Tablet().Alias, - Target: &querypb.Target{ - Cell: cell, - Keyspace: ks, - Shard: shard, - TabletType: topodatapb.TabletType_PRIMARY, + name: "4 to 2, merge both shards, traffic switched", + oldshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + newshards: []string{ + "-80", + "80-", + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-40", + }, + { + Keyspace: ks, + Shard: "40-80", + }, + { + Keyspace: ks, + Shard: "80-c0", + }, + { + Keyspace: ks, + Shard: "c0-", + }, }, - PrimaryTermStartTimestamp: time.Now().Unix(), - RealtimeStats: &querypb.RealtimeStats{}, }, - wantErr: fmt.Sprintf("tablet %s type has changed from %s to %s", - tabletAlias, tabletType, topodatapb.TabletType_PRIMARY.String()), + trafficSwitched: true, + want: true, }, { - name: "unhealthy", - hcRes: &querypb.StreamHealthResponse{ - TabletAlias: source.Tablet().Alias, - Target: target, - RealtimeStats: &querypb.RealtimeStats{ - HealthError: "unhealthy", + name: "4 to 3, merge second half, traffic not switched", + oldshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + newshards: []string{ + // -40 and 40-80 are not being resharded. + "80-", // Merge of 80-c0 and c0- + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-40", + }, + { + Keyspace: ks, + Shard: "40-80", + }, + { + Keyspace: ks, + Shard: "80-c0", + }, + { + Keyspace: ks, + Shard: "c0-", + }, }, }, - wantErr: fmt.Sprintf("tablet %s is no longer healthy", tabletAlias), + trafficSwitched: false, + want: false, }, { - name: "replication lag too high", - hcRes: &querypb.StreamHealthResponse{ - TabletAlias: source.Tablet().Alias, - Target: target, - RealtimeStats: &querypb.RealtimeStats{ - ReplicationLagSeconds: highLag, + name: "4 to 3, merge second half, traffic switched", + oldshards: []string{ + "-40", + "40-80", + "80-c0", + "c0-", + }, + newshards: []string{ + // -40 and 40-80 are not being resharded. 
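+				// Same merge as above, but with traffic switched this time, so
+				// the reshard should be detected.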
+ "80-", // Merge of 80-c0 and c0- + }, + vgtid: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: ks, + Shard: "-40", + }, + { + Keyspace: ks, + Shard: "40-80", + }, + { + Keyspace: ks, + Shard: "80-c0", + }, + { + Keyspace: ks, + Shard: "c0-", + }, }, }, - wantErr: fmt.Sprintf("%s has a replication lag of %d seconds which is beyond the value provided", - tabletAlias, highLag), + trafficSwitched: true, + want: true, }, } + addTablet := func(t *testing.T, ctx context.Context, host string, port int32, cell, ks, shard string, ts *topo.Server, hc *discovery.FakeHealthCheck, serving bool) { + tabletconn := hc.AddTestTablet(cell, host, port, ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil) + err := ts.CreateTablet(ctx, tabletconn.Tablet()) + require.NoError(t, err) + var alias *topodatapb.TabletAlias + if serving { + alias = tabletconn.Tablet().Alias + } + _, err = ts.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = alias + si.IsPrimaryServing = serving + return nil + }) + require.NoError(t, err) + } + for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - done := make(chan struct{}) - go func() { - sctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - defer close(done) - // SandboxConn's VStream implementation always waits for the context to timeout. - err := vsm.VStream(sctx, tabletType, vgtid, nil, nil, func(events []*binlogdatapb.VEvent) error { - require.Fail(t, "unexpected event", "Received unexpected events: %v", events) - return nil - }) - if tc.wantErr != "" { // Otherwise we simply expect the context to timeout - if !strings.Contains(logger.String(), tc.wantErr) { - require.Fail(t, "unexpected vstream error", "vstream ended with error: %v, which did not contain: %s", err, tc.wantErr) - } - } - }() + hc := discovery.NewFakeHealthCheck(nil) + _ = createSandbox(ks) + st := getSandboxTopo(ctx, cell, ks, append(tc.oldshards, tc.newshards...)) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vs := vstream{ + vgtid: tc.vgtid, + tabletType: topodatapb.TabletType_PRIMARY, + optCells: cell, + vsm: vsm, + ts: st.topoServer, + } + for i, shard := range tc.oldshards { + addTablet(t, ctx, fmt.Sprintf("1.1.0.%d", i), int32(1000+i), cell, ks, shard, st.topoServer, hc, !tc.trafficSwitched) + } + for i, shard := range tc.newshards { + addTablet(t, ctx, fmt.Sprintf("1.1.1.%d", i), int32(2000+i), cell, ks, shard, st.topoServer, hc, tc.trafficSwitched) + } + got, err := vs.keyspaceHasBeenResharded(ctx, ks) if tc.wantErr != "" { - source.SetStreamHealthResponse(tc.hcRes) + require.EqualError(t, err, tc.wantErr) + } else { + require.NoError(t, err) } - <-done - logger.Clear() + require.Equal(t, tc.want, got) }) } } diff --git a/test/config.json b/test/config.json index 53ef22a5e8f..3fd00df4126 100644 --- a/test/config.json +++ b/test/config.json @@ -1130,6 +1130,15 @@ "RetryMax": 1, "Tags": [] }, + "multi_vstreams_keyspace_reshard": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMultiVStreamsKeyspaceReshard", "-timeout", "15m"], + "Command": [], + "Manual": false, + "Shard": "vstream", + "RetryMax": 1, + "Tags": [] + }, "vstream_failover": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "VStreamFailover"],