Skip to content

Commit

Permalink
VStream: Allow for automatic resume after Reshard across VStreams (vi…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattlord authored and makinje16 committed Jan 16, 2025
1 parent bb6271b commit ab70165
Show file tree
Hide file tree
Showing 6 changed files with 539 additions and 112 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1431,7 +1431,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t
action, workflow, output)
}
if err != nil {
t.Fatalf("Reshard %s command failed with %+v\n", action, err)
t.Fatalf("Reshard %s command failed with %+v\nOutput: %s", action, err, output)
}
}

Expand Down
198 changes: 195 additions & 3 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func insertRow(keyspace, table string, id int) {
vtgateConn.ExecuteFetch("begin", 1000, false)
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false)
if err != nil {
log.Infof("error inserting row %d: %v", id, err)
log.Errorf("error inserting row %d: %v", id, err)
}
vtgateConn.ExecuteFetch("commit", 1000, false)
}
Expand Down Expand Up @@ -387,13 +387,15 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
defer vc.TearDown()

defaultCell := vc.Cells[vc.CellNames[0]]
vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
require.NoError(t, err)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
require.NoError(t, err)

ctx := context.Background()
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
Expand Down Expand Up @@ -512,6 +514,196 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
return ne
}

// Validate that we can resume a VStream when the keyspace has been resharded
// while not streaming. Ensure that there we successfully transition from the
// old shards -- which are in the VGTID from the previous stream -- and that
// we miss no row events during the process.
func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
ctx := context.Background()
ks := "testks"
wf := "multiVStreamsKeyspaceReshard"
baseTabletID := 100
tabletType := topodatapb.TabletType_PRIMARY.String()
oldShards := "-80,80-"
newShards := "-40,40-80,80-c0,c0-"
oldShardRowEvents, newShardRowEvents := 0, 0
vc = NewVitessCluster(t, nil)
defer vc.TearDown()
defaultCell := vc.Cells[vc.CellNames[0]]
ogdr := defaultReplicas
defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets
defer func(dr int) { defaultReplicas = dr }(ogdr)

// For our sequences etc.
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil)
require.NoError(t, err)

// Setup the keyspace with our old/original shards.
keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil)
require.NoError(t, err)

// Add the new shards.
err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts)
require.NoError(t, err)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

// Ensure that we're starting with a clean slate.
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false)
require.NoError(t, err)

// Coordinate go-routines.
streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute)
defer streamCancel()
done := make(chan struct{})

// First goroutine that keeps inserting rows into the table being streamed until the
// stream context is cancelled.
go func() {
id := 1
for {
select {
case <-streamCtx.Done():
// Give the VStream a little catch-up time before telling it to stop
// via the done channel.
time.Sleep(10 * time.Second)
close(done)
return
default:
insertRow(ks, "customer", id)
time.Sleep(250 * time.Millisecond)
id++
}
}
}()

// Create the Reshard workflow and wait for it to finish the copy phase.
reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String())

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "/.*", // Match all keyspaces just to be more realistic.
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
// Only stream the customer table and its sequence backing table.
Match: "/customer.*",
}},
}
flags := &vtgatepb.VStreamFlags{}

// Stream events but stop once we have a VGTID with positions for the old/original shards.
var newVGTID *binlogdatapb.VGtid
func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.GetRowEvent().GetShard()
switch shard {
case "-80", "80-":
oldShardRowEvents++
case "0":
// We expect some for the sequence backing table, but don't care.
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
case binlogdatapb.VEventType_VGTID:
newVGTID = ev.GetVgtid()
if len(newVGTID.GetShardGtids()) == 3 {
// We want a VGTID with a position for the global shard and the old shards.
canStop := true
for _, sg := range newVGTID.GetShardGtids() {
if sg.GetGtid() == "" {
canStop = false
}
}
if canStop {
return
}
}
}
}
default:
require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
}
select {
case <-streamCtx.Done():
return
default:
}
}
}()

// Confirm that we have shard GTIDs for the global shard and the old/original shards.
require.Len(t, newVGTID.GetShardGtids(), 3)

// Switch the traffic to the new shards.
reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType)

// Now start a new VStream from our previous VGTID which only has the old/original shards.
func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.RowEvent.Shard
switch shard {
case "-80", "80-":
oldShardRowEvents++
case "-40", "40-80", "80-c0", "c0-":
newShardRowEvents++
case "0":
// Again, we expect some for the sequence backing table, but don't care.
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
}
}
default:
require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
}
select {
case <-done:
return
default:
}
}
}()

// We should have a mix of events across the old and new shards.
require.NotZero(t, oldShardRowEvents)
require.NotZero(t, newShardRowEvents)

// The number of row events streamed by the VStream API should match the number of rows inserted.
customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer")
customerCount, err := customerResult.Rows[0][0].ToInt64()
require.NoError(t, err)
require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents))
}

func TestVStreamFailover(t *testing.T) {
testVStreamWithFailover(t, true)
}
Expand Down
5 changes: 2 additions & 3 deletions go/vt/topo/faketopo/faketopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
"strings"
"sync"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"

"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/vt/topo"
)

// FakeFactory implements the Factory interface. This is supposed to be used only for testing
Expand Down
90 changes: 81 additions & 9 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ import (
"sync"
"time"

"golang.org/x/exp/maps"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
Expand Down Expand Up @@ -498,24 +501,40 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var err error
cells := vs.getCells()

tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...)
tpo := vs.tabletPickerOptions
resharded, err := vs.keyspaceHasBeenResharded(ctx, sgtid.Keyspace)
if err != nil {
log.Errorf(err.Error())
return err
return vterrors.Wrapf(err, "failed to determine if keyspace %s has been resharded", sgtid.Keyspace)
}
if resharded {
// The non-serving tablet in the old / non-serving shard will contain all of
// the GTIDs that we need before transitioning to the new shards along with
// the journal event that will then allow us to automatically transition to
// the new shards (provided the stop_on_reshard option is not set).
tpo.IncludeNonServingTablets = true
}

tabletPickerErr := func(err error) error {
tperr := vterrors.Wrapf(err, "failed to find a %s tablet for VStream in %s/%s within the %s cell(s)",
vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))
log.Errorf("%v", tperr)
return tperr
}
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.GetKeyspace(), sgtid.GetShard(), vs.tabletType.String(), tpo, ignoreTablets...)
if err != nil {
return tabletPickerErr(err)
}
// Create a child context with a stricter timeout when picking a tablet.
// This will prevent hanging in the case no tablets are found.
tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout)
defer tpCancel()

tablet, err := tp.PickForStreaming(tpCtx)
if err != nil {
log.Errorf(err.Error())
return err
return tabletPickerErr(err)
}
log.Infof("Picked tablet %s for for %s/%s/%s/%s", tablet.Alias.String(), strings.Join(cells, ","),
sgtid.Keyspace, sgtid.Shard, vs.tabletType.String())
log.Infof("Picked a %s tablet for VStream in %s/%s within the %s cell(s)",
vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))

target := &querypb.Target{
Keyspace: sgtid.Keyspace,
Shard: sgtid.Shard,
Expand Down Expand Up @@ -742,7 +761,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
if err := vs.getError(); err != nil {
return err
}
// convert all gtids to vgtids. This should be done here while holding the lock.
// Convert all gtids to vgtids. This should be done here while holding the lock.
for j, event := range events {
if event.Type == binlogdatapb.VEventType_GTID {
// Update the VGtid and send that instead.
Expand Down Expand Up @@ -926,3 +945,56 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar
close(je.done)
return je, nil
}

// keyspaceHasBeenResharded returns true if the keyspace's serving shard set has changed
// since the last VStream as indicated by the shard definitions provided in the VGTID.
func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string) (bool, error) {
shards, err := vs.ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
if err != nil || len(shards) == 0 {
return false, err
}

// First check the typical case, where the VGTID shards match the serving shards.
// In that case it's NOT possible that an applicable reshard has happened because
// the VGTID contains shards that are all serving.
reshardPossible := false
ksShardGTIDs := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids))
for _, s := range vs.vgtid.ShardGtids {
if s.GetKeyspace() == keyspace {
ksShardGTIDs = append(ksShardGTIDs, s)
}
}
for _, s := range ksShardGTIDs {
shard := shards[s.GetShard()]
if shard == nil {
return false, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard provided in VGTID, %s, not found in the %s keyspace", s.GetShard(), keyspace)
}
if !shard.GetIsPrimaryServing() {
reshardPossible = true
break
}
}
if !reshardPossible {
return false, nil
}

// Now that we know there MAY have been an applicable reshard, let's make a
// definitive determination by looking at the shard keyranges.
// All we care about are the shard info records now.
sis := maps.Values(shards)
for i := range sis {
for j := range sis {
if sis[i].ShardName() == sis[j].ShardName() && key.KeyRangeEqual(sis[i].GetKeyRange(), sis[j].GetKeyRange()) {
// It's the same shard so skip it.
continue
}
if key.KeyRangeIntersect(sis[i].GetKeyRange(), sis[j].GetKeyRange()) {
// We have different shards with overlapping keyranges so we know
// that a reshard has happened.
return true, nil
}
}
}

return false, nil
}
Loading

0 comments on commit ab70165

Please sign in to comment.