Skip to content

Commit e552d3c

Browse files
committed
Enable VPlayerBatching in unit tests
Signed-off-by: Matt Lord <mattalord@gmail.com>
1 parent 2da3893 commit e552d3c

File tree

6 files changed

+57
-23
lines changed

6 files changed

+57
-23
lines changed

go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,7 @@ func TestExternalConnectorPlay(t *testing.T) {
163163

164164
expectDBClientAndVreplicationQueries(t, []string{
165165
"begin",
166-
"insert into tab1(id,val) values (1,_binary'a')",
167-
"insert into tab1(id,val) values (2,_binary'b')",
166+
"insert into tab1(id,val) values (1,_binary'a'), (2,_binary'b')",
168167
"/update _vt.vreplication set pos=",
169168
"commit",
170169
}, pos)

go/vt/vttablet/tabletmanager/vreplication/framework_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ func setup(ctx context.Context) (func(), int) {
141141
resetBinlogClient()
142142

143143
vttablet.InitVReplicationConfigDefaults()
144-
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
145144

146145
// Engines cannot be initialized in testenv because it introduces circular dependencies.
147146
streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0])

go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,14 @@ func testPlayerCopyBigTable(t *testing.T) {
684684
reset := vstreamer.AdjustPacketSize(1)
685685
defer reset()
686686

687+
// The test is written to match the behavior w/o
688+
// VReplicationExperimentalFlagOptimizeInserts enabled.
689+
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
690+
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
691+
defer func() {
692+
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
693+
}()
694+
687695
savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration
688696
// copyPhaseDuration should be low enough to have time to send one row.
689697
vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond
@@ -814,6 +822,14 @@ func testPlayerCopyWildcardRule(t *testing.T) {
814822
reset := vstreamer.AdjustPacketSize(1)
815823
defer reset()
816824

825+
// The test is written to match the behavior w/o
826+
// VReplicationExperimentalFlagOptimizeInserts enabled.
827+
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
828+
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
829+
defer func() {
830+
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
831+
}()
832+
817833
savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration
818834
// copyPhaseDuration should be low enough to have time to send one row.
819835
vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond

go/vt/vttablet/tabletmanager/vreplication/vdbclient.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,19 @@ func (vc *vdbClient) Begin() error {
5656
if vc.InTransaction {
5757
return nil
5858
}
59-
if err := vc.DBClient.Begin(); err != nil {
60-
return err
59+
if vc.maxBatchSize == 0 {
60+
// We're not batching so we BEGIN the transaction here.
61+
if err := vc.DBClient.Begin(); err != nil {
62+
return err
63+
}
64+
} else {
65+
// If we're batching, we batch the contents of the
66+
// transaction, which starts with the BEGIN and ends with
67+
// the COMMIT, so we do not send a BEGIN down the wire
68+
// ahead of time.
69+
vc.queriesPos = int64(len(vc.queries))
70+
vc.batchSize = 6 // begin and semicolon
6171
}
62-
63-
// If we're batching, we only batch the contents of the
64-
// transaction, which starts with the begin and ends with
65-
// the commit.
66-
vc.queriesPos = int64(len(vc.queries))
67-
vc.batchSize = 6 // begin and semicolon
68-
6972
vc.queries = append(vc.queries, "begin")
7073
vc.InTransaction = true
7174
vc.startTime = time.Now()

go/vt/vttablet/tabletmanager/vreplication/vplayer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,9 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
133133
return vr.dbClient.Commit()
134134
}
135135
batchMode := false
136-
if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
136+
// We only do batching in the running/replicating phase via vreplicator and not when
137+
// used for FF and catchup via vcopier.
138+
if len(copyState) == 0 && vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
137139
batchMode = true
138140
}
139141
if batchMode {

go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,6 @@ func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) {
628628

629629
// It does not work when filter is enabled
630630
output := qh.Expect(
631-
"begin",
632631
"rollback",
633632
fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg),
634633
)
@@ -975,8 +974,7 @@ func TestPlayerFilters(t *testing.T) {
975974
input: "insert into src4 values (1,100,'aaa'),(2,200,'bbb'),(3,100,'ccc')",
976975
output: qh.Expect(
977976
"begin",
978-
"insert into dst4(id1,val) values (1,_binary'aaa')",
979-
"insert into dst4(id1,val) values (3,_binary'ccc')",
977+
"insert into dst4(id1,val) values (1,_binary'aaa'), (3,_binary'ccc')",
980978
"/update _vt.vreplication set pos=",
981979
"commit",
982980
),
@@ -987,8 +985,7 @@ func TestPlayerFilters(t *testing.T) {
987985
input: "insert into src5 values (1,100,'abc'),(2,200,'xyz'),(3,100,'xyz'),(4,300,'abc'),(5,200,'xyz')",
988986
output: qh.Expect(
989987
"begin",
990-
"insert into dst5(id1,val) values (1,_binary'abc')",
991-
"insert into dst5(id1,val) values (4,_binary'abc')",
988+
"insert into dst5(id1,val) values (1,_binary'abc'), (4,_binary'abc')",
992989
"/update _vt.vreplication set pos=",
993990
"commit",
994991
),
@@ -1495,17 +1492,15 @@ func TestPlayerRowMove(t *testing.T) {
14951492
})
14961493
expectDBClientQueries(t, qh.Expect(
14971494
"begin",
1498-
"insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
1499-
"insert into dst(val1,sval2,rcount) values (2,ifnull(2, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
1500-
"insert into dst(val1,sval2,rcount) values (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
1495+
"insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1), (2,ifnull(2, 0),1), (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
15011496
"/update _vt.vreplication set pos=",
15021497
"commit",
15031498
))
15041499
expectData(t, "dst", [][]string{
15051500
{"1", "1", "1"},
15061501
{"2", "5", "2"},
15071502
})
1508-
validateQueryCountStat(t, "replicate", 3)
1503+
validateQueryCountStat(t, "replicate", 1)
15091504

15101505
execStatements(t, []string{
15111506
"update src set val1=1, val2=4 where id=3",
@@ -1521,7 +1516,7 @@ func TestPlayerRowMove(t *testing.T) {
15211516
{"1", "5", "2"},
15221517
{"2", "2", "1"},
15231518
})
1524-
validateQueryCountStat(t, "replicate", 5)
1519+
validateQueryCountStat(t, "replicate", 3)
15251520
}
15261521

15271522
func TestPlayerTypes(t *testing.T) {
@@ -1737,6 +1732,7 @@ func TestPlayerDDL(t *testing.T) {
17371732
pos1 := primaryPosition(t)
17381733
// The stop position must be the GTID of the first DDL
17391734
expectDBClientQueries(t, qh.Expect(
1735+
"/update _vt.vreplication set state='Stopped'",
17401736
"begin",
17411737
fmt.Sprintf("/update _vt.vreplication set pos='%s'", pos1),
17421738
"/update _vt.vreplication set state='Stopped'",
@@ -1757,6 +1753,7 @@ func TestPlayerDDL(t *testing.T) {
17571753
// Second update is from vreplicator.
17581754
"/update _vt.vreplication set message='Picked source tablet.*",
17591755
"/update.*'Running'",
1756+
"/update _vt.vreplication set state='Stopped'",
17601757
"begin",
17611758
fmt.Sprintf("/update.*'%s'", pos2),
17621759
"/update _vt.vreplication set state='Stopped'",
@@ -1922,6 +1919,7 @@ func TestPlayerStopPos(t *testing.T) {
19221919
// Second update is from vreplicator.
19231920
"/update _vt.vreplication set message='Picked source tablet.*",
19241921
"/update.*'Running'",
1922+
"/update.*'Stopped'",
19251923
"begin",
19261924
"insert into yes(id,val) values (1,'aaa')",
19271925
fmt.Sprintf("/update.*compress.*'%s'", stopPos),
@@ -1947,6 +1945,7 @@ func TestPlayerStopPos(t *testing.T) {
19471945
// Second update is from vreplicator.
19481946
"/update _vt.vreplication set message='Picked source tablet.*",
19491947
"/update.*'Running'",
1948+
"/update.*'Stopped'",
19501949
"begin",
19511950
// Since 'no' generates empty transactions that are skipped by
19521951
// vplayer, a commit is done only for the stop position event.
@@ -2179,6 +2178,14 @@ func TestPlayerSplitTransaction(t *testing.T) {
21792178
func TestPlayerLockErrors(t *testing.T) {
21802179
defer deleteTablet(addTablet(100))
21812180

2181+
// The immediate retry behavior does not apply when doing
2182+
// VPlayer Batching.
2183+
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
2184+
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
2185+
defer func() {
2186+
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
2187+
}()
2188+
21822189
execStatements(t, []string{
21832190
"create table t1(id int, val varchar(128), primary key(id))",
21842191
fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb),
@@ -2258,6 +2265,14 @@ func TestPlayerLockErrors(t *testing.T) {
22582265
func TestPlayerCancelOnLock(t *testing.T) {
22592266
defer deleteTablet(addTablet(100))
22602267

2268+
// The immediate retry behavior does not apply when doing
2269+
// VPlayer Batching.
2270+
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
2271+
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
2272+
defer func() {
2273+
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
2274+
}()
2275+
22612276
execStatements(t, []string{
22622277
"create table t1(id int, val varchar(128), primary key(id))",
22632278
fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb),

0 commit comments

Comments
 (0)