Skip to content

Commit

Permalink
Multi-tenant MoveTables: Create vreplication streams only on specifie…
Browse files Browse the repository at this point in the history
…d shards (#15746)

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps authored Apr 29, 2024
1 parent 11c8d3e commit c9a81e3
Show file tree
Hide file tree
Showing 12 changed files with 2,290 additions and 2,038 deletions.
9 changes: 9 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/movetables/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ var (
if err := checkAtomicCopyOptions(); err != nil {
return err
}

tenantId := createOptions.WorkflowOptions.GetTenantId()
if len(createOptions.WorkflowOptions.GetShards()) > 0 && tenantId == "" {
return fmt.Errorf("--shards specified, but not --tenant-id: you can only specify target shards for multi-tenant migrations")
}
if tenantId != "" && len(createOptions.SourceShards) > 0 {
return fmt.Errorf("cannot specify both --tenant-id (i.e. a multi-tenant migration) and --source-shards (i.e. a shard-by-shard migration)")
}

return nil
},
RunE: commandCreate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func registerCommands(root *cobra.Command) {
create.Flags().StringSliceVar(&createOptions.ExcludeTables, "exclude-tables", nil, "Source tables to exclude from copying.")
create.Flags().BoolVar(&createOptions.NoRoutingRules, "no-routing-rules", false, "(Advanced) Do not create routing rules while creating the workflow. See the reference documentation for limitations if you use this flag.")
create.Flags().BoolVar(&createOptions.AtomicCopy, "atomic-copy", false, "(EXPERIMENTAL) A single copy phase is run for all tables from the source. Use this, for example, if your source keyspace has tables which use foreign key constraints.")
create.Flags().StringVar(&createOptions.WorkflowOptions.TenantId, "tenant-id", "", "(EXPERIMENTAL) The tenant ID to use for the MoveTables workflow into a multi-tenant keyspace.")
create.Flags().StringVar(&createOptions.WorkflowOptions.TenantId, "tenant-id", "", "(EXPERIMENTAL: Multi-tenant migrations only) The tenant ID to use for the MoveTables workflow into a multi-tenant keyspace.")
create.Flags().BoolVar(&createOptions.WorkflowOptions.StripShardedAutoIncrement, "remove-sharded-auto-increment", true, "If moving the table(s) to a sharded keyspace, remove any auto_increment clauses when copying the schema to the target as sharded keyspaces should rely on either user/application generated values or Vitess sequences to ensure uniqueness.")
create.Flags().StringSliceVar(&createOptions.WorkflowOptions.Shards, "shards", nil, "(EXPERIMENTAL: Multi-tenant migrations only) Specify that vreplication streams should only be created on this subset of target shards. Warning: you should first ensure that all rows on the source route to the specified subset of target shards using your VIndex of choice or you could lose data during the migration.")
base.AddCommand(create)

opts := &common.SubCommandsOpts{
Expand Down
112 changes: 112 additions & 0 deletions go/test/endtoend/vreplication/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ import (
"testing"
"time"

"google.golang.org/protobuf/encoding/protojson"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/proto/vtctldata"
)

type tenantMigrationStatus int
Expand Down Expand Up @@ -97,6 +100,30 @@ const (
"t1": {}
}
}
`
mtShardedVSchema = `
{
"sharded": true,
"multi_tenant_spec": {
"tenant_id_column_name": "tenant_id",
"tenant_id_column_type": "INT64"
},
"vindexes": {
"reverse_bits": {
"type": "reverse_bits"
}
},
"tables": {
"t1": {
"column_vindexes": [
{
"column": "tenant_id",
"name": "reverse_bits"
}
]
}
}
}
`
stSchema = mtSchema
stVSchema = `
Expand Down Expand Up @@ -224,6 +251,91 @@ func confirmOnlyWritesSwitched(t *testing.T) {
validateKeyspaceRoutingRules(t, vc, rules)
}

// TestMultiTenantSimpleSharded tests a single tenant migration to a sharded target. The aim is to test
// the specification of the target shards in all the MoveTables subcommands, including creating only one stream
// for a tenant on the shard to which this tenant id will be routed, using the specified Vindex.
func TestMultiTenantSimpleSharded(t *testing.T) {
setSidecarDBName("_vt")
// Don't create RDONLY tablets to reduce number of tablets created to reduce resource requirements for the test.
origDefaultRdonly := defaultRdonly
defer func() {
defaultRdonly = origDefaultRdonly
}()
defaultRdonly = 0
vc = setupMinimalCluster(t)
defer vc.TearDown()

targetKeyspace := "mt"
_, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, targetKeyspace, "-40,40-80,80-a0,a0-", mtShardedVSchema, mtSchema, 1, 0, 200, nil)
require.NoError(t, err)

tenantId := int64(1)
tenantShard := "80-a0" // matches the vindex
sourceKeyspace := getSourceKeyspace(tenantId)
_, err = vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, sourceKeyspace, "0", stVSchema, stSchema, 1, 0, getInitialTabletIdForTenant(tenantId), nil)
require.NoError(t, err)

vtgateConn, closeConn := getVTGateConn()
defer closeConn()
numRows := 10
lastIndex := int64(0)
insertRows := func(lastIndex int64, keyspace string) int64 {
for i := 1; i <= numRows; i++ {
execQueryWithRetry(t, vtgateConn,
fmt.Sprintf("insert into %s.t1(id, tenant_id) values(%d, %d)", keyspace, int64(i)+lastIndex, tenantId), queryTimeout)
}
return int64(numRows) + lastIndex
}
lastIndex = insertRows(lastIndex, sourceKeyspace)

mt := newVtctldMoveTables(&moveTablesWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
workflowName: fmt.Sprintf("wf%d", tenantId),
targetKeyspace: targetKeyspace,
},
sourceKeyspace: sourceKeyspace,
createFlags: []string{
"--tenant-id", strconv.FormatInt(tenantId, 10),
"--shards", tenantShard, // create the workflow for tenantid 1 in shard 80-a0: matches the vindex
},
switchFlags: []string{
"--shards", tenantShard,
},
completeFlags: []string{
"--shards", tenantShard,
},
showFlags: []string{
"--shards", tenantShard,
},
})

mt.Create()
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, mt.workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
mt.Show()
var workflowState vtctldata.GetWorkflowsResponse
err = protojson.Unmarshal([]byte(mt.lastOutput), &workflowState)
require.NoError(t, err)
require.Equal(t, 1, len(workflowState.Workflows))
wf := workflowState.Workflows[0]
// Verifies that only one stream is created for the tenant on the shard to which this tenant id will be routed.
require.Equal(t, 1, len(wf.ShardStreams))

// Note: we cannot insert into the target keyspace since that is never routed to the source keyspace.
lastIndex = insertRows(lastIndex, sourceKeyspace)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, mt.workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
mt.SwitchReadsAndWrites()
// Note: here we have already switched, and we can insert into the target keyspace, and it should get reverse
// replicated to the source keyspace. The source keyspace is routed to the target keyspace at this point.
lastIndex = insertRows(lastIndex, sourceKeyspace)
mt.Complete()
require.Zero(t, len(getKeyspaceRoutingRules(t, vc).Rules))
actualRowsInserted := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1"))
require.Equal(t, lastIndex, int64(actualRowsInserted))
require.Equal(t, lastIndex, int64(getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1"))))
log.Infof("Migration completed, total rows in target: %d", actualRowsInserted)
}

func confirmBothReadsAndWritesSwitched(t *testing.T) {
confirmKeyspacesRoutedTo(t, "s1", "mt", "t1", []string{"rdonly", "replica"})
confirmKeyspacesRoutedTo(t, "s1", "mt", "t1", []string{"primary"})
Expand Down
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type moveTablesWorkflow struct {
createFlags []string
completeFlags []string
switchFlags []string
showFlags []string
}

type iMoveTables interface {
Expand Down Expand Up @@ -228,7 +229,9 @@ func (v VtctldMoveTables) ReverseReadsAndWrites() {
}

func (v VtctldMoveTables) Show() {
v.exec("Show")
args := []string{"Show"}
args = append(args, v.showFlags...)
v.exec(args...)
}

func (v VtctldMoveTables) SwitchReads() {
Expand Down
Loading

0 comments on commit c9a81e3

Please sign in to comment.