Skip to content

Commit

Permalink
Enable VPlayerBatching in unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Dec 5, 2024
1 parent 2da3893 commit fc4187d
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ func TestExternalConnectorPlay(t *testing.T) {

expectDBClientAndVreplicationQueries(t, []string{
"begin",
"insert into tab1(id,val) values (1,_binary'a')",
"insert into tab1(id,val) values (2,_binary'b')",
"insert into tab1(id,val) values (1,_binary'a'), (2,_binary'b')",
"/update _vt.vreplication set pos=",
"commit",
}, pos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func setup(ctx context.Context) (func(), int) {
resetBinlogClient()

vttablet.InitVReplicationConfigDefaults()
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0

// Engines cannot be initialized in testenv because it introduces circular dependencies.
streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0])
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,14 @@ func testPlayerCopyBigTable(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

// The test is written to match the behavior w/o
// VReplicationExperimentalFlagOptimizeInserts enabled.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond
Expand Down Expand Up @@ -814,6 +822,14 @@ func testPlayerCopyWildcardRule(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

// The test is written to match the behavior w/o
// VReplicationExperimentalFlagOptimizeInserts enabled.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond
Expand Down
21 changes: 12 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,19 @@ func (vc *vdbClient) Begin() error {
if vc.InTransaction {
return nil
}
if err := vc.DBClient.Begin(); err != nil {
return err
if vc.maxBatchSize == 0 {
// We're not batching so we BEGIN the transaction here.
if err := vc.DBClient.Begin(); err != nil {
return err
}
} else {
// If we're batching, we batch the contents of the
// transaction, which starts with the BEGIN and ends with
// the COMMIT, so we do not send a BEGIN down the wire
// ahead of time.
vc.queriesPos = int64(len(vc.queries))
vc.batchSize = 6 // begin and semicolon
}

// If we're batching, we only batch the contents of the
// transaction, which starts with the begin and ends with
// the commit.
vc.queriesPos = int64(len(vc.queries))
vc.batchSize = 6 // begin and semicolon

vc.queries = append(vc.queries, "begin")
vc.InTransaction = true
vc.startTime = time.Now()
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
return vr.dbClient.Commit()
}
batchMode := false
if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
// We only do batching in the running/replicating phase via vreplicator and not
// when used for FF and catchup via vcopier.
if len(copyState) == 0 && vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
batchMode = true
}
if batchMode {
Expand Down
35 changes: 25 additions & 10 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,6 @@ func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) {

// It does not work when filter is enabled
output := qh.Expect(
"begin",
"rollback",
fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg),
)
Expand Down Expand Up @@ -975,8 +974,7 @@ func TestPlayerFilters(t *testing.T) {
input: "insert into src4 values (1,100,'aaa'),(2,200,'bbb'),(3,100,'ccc')",
output: qh.Expect(
"begin",
"insert into dst4(id1,val) values (1,_binary'aaa')",
"insert into dst4(id1,val) values (3,_binary'ccc')",
"insert into dst4(id1,val) values (1,_binary'aaa'), (3,_binary'ccc')",
"/update _vt.vreplication set pos=",
"commit",
),
Expand All @@ -987,8 +985,7 @@ func TestPlayerFilters(t *testing.T) {
input: "insert into src5 values (1,100,'abc'),(2,200,'xyz'),(3,100,'xyz'),(4,300,'abc'),(5,200,'xyz')",
output: qh.Expect(
"begin",
"insert into dst5(id1,val) values (1,_binary'abc')",
"insert into dst5(id1,val) values (4,_binary'abc')",
"insert into dst5(id1,val) values (1,_binary'abc'), (4,_binary'abc')",
"/update _vt.vreplication set pos=",
"commit",
),
Expand Down Expand Up @@ -1495,17 +1492,15 @@ func TestPlayerRowMove(t *testing.T) {
})
expectDBClientQueries(t, qh.Expect(
"begin",
"insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"insert into dst(val1,sval2,rcount) values (2,ifnull(2, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"insert into dst(val1,sval2,rcount) values (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1), (2,ifnull(2, 0),1), (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"/update _vt.vreplication set pos=",
"commit",
))
expectData(t, "dst", [][]string{
{"1", "1", "1"},
{"2", "5", "2"},
})
validateQueryCountStat(t, "replicate", 3)
validateQueryCountStat(t, "replicate", 1)

execStatements(t, []string{
"update src set val1=1, val2=4 where id=3",
Expand All @@ -1521,7 +1516,7 @@ func TestPlayerRowMove(t *testing.T) {
{"1", "5", "2"},
{"2", "2", "1"},
})
validateQueryCountStat(t, "replicate", 5)
validateQueryCountStat(t, "replicate", 3)
}

func TestPlayerTypes(t *testing.T) {
Expand Down Expand Up @@ -1737,6 +1732,7 @@ func TestPlayerDDL(t *testing.T) {
pos1 := primaryPosition(t)
// The stop position must be the GTID of the first DDL
expectDBClientQueries(t, qh.Expect(
"/update _vt.vreplication set state='Stopped'",
"begin",
fmt.Sprintf("/update _vt.vreplication set pos='%s'", pos1),
"/update _vt.vreplication set state='Stopped'",
Expand All @@ -1757,6 +1753,7 @@ func TestPlayerDDL(t *testing.T) {
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/update.*'Running'",
"/update _vt.vreplication set state='Stopped'",
"begin",
fmt.Sprintf("/update.*'%s'", pos2),
"/update _vt.vreplication set state='Stopped'",
Expand Down Expand Up @@ -1922,6 +1919,7 @@ func TestPlayerStopPos(t *testing.T) {
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/update.*'Running'",
"/update.*'Stopped'",
"begin",
"insert into yes(id,val) values (1,'aaa')",
fmt.Sprintf("/update.*compress.*'%s'", stopPos),
Expand All @@ -1947,6 +1945,7 @@ func TestPlayerStopPos(t *testing.T) {
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/update.*'Running'",
"/update.*'Stopped'",
"begin",
// Since 'no' generates empty transactions that are skipped by
// vplayer, a commit is done only for the stop position event.
Expand Down Expand Up @@ -2179,6 +2178,14 @@ func TestPlayerSplitTransaction(t *testing.T) {
func TestPlayerLockErrors(t *testing.T) {
defer deleteTablet(addTablet(100))

// The immediate retry behavior does not apply when doing
// VPlayer Batching.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

execStatements(t, []string{
"create table t1(id int, val varchar(128), primary key(id))",
fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb),
Expand Down Expand Up @@ -2258,6 +2265,14 @@ func TestPlayerLockErrors(t *testing.T) {
func TestPlayerCancelOnLock(t *testing.T) {
defer deleteTablet(addTablet(100))

// The immediate retry behavior does not apply when doing
// VPlayer Batching.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

execStatements(t, []string{
"create table t1(id int, val varchar(128), primary key(id))",
fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb),
Expand Down

0 comments on commit fc4187d

Please sign in to comment.