diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go index e00a5578171..c671d2a086d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go @@ -163,8 +163,7 @@ func TestExternalConnectorPlay(t *testing.T) { expectDBClientAndVreplicationQueries(t, []string{ "begin", - "insert into tab1(id,val) values (1,_binary'a')", - "insert into tab1(id,val) values (2,_binary'b')", + "insert into tab1(id,val) values (1,_binary'a'), (2,_binary'b')", "/update _vt.vreplication set pos=", "commit", }, pos) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index fe8b62d3cef..12a05a69dbc 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -141,7 +141,6 @@ func setup(ctx context.Context) (func(), int) { resetBinlogClient() vttablet.InitVReplicationConfigDefaults() - vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 // Engines cannot be initialized in testenv because it introduces circular dependencies. streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index a95e0bf17c5..a7e4794ba76 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -684,6 +684,14 @@ func testPlayerCopyBigTable(t *testing.T) { reset := vstreamer.AdjustPacketSize(1) defer reset() + // The test is written to match the behavior w/o + // VReplicationExperimentalFlagOptimizeInserts enabled. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration // copyPhaseDuration should be low enough to have time to send one row. vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond @@ -814,6 +822,14 @@ func testPlayerCopyWildcardRule(t *testing.T) { reset := vstreamer.AdjustPacketSize(1) defer reset() + // The test is written to match the behavior w/o + // VReplicationExperimentalFlagOptimizeInserts enabled. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration // copyPhaseDuration should be low enough to have time to send one row. vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index 8a4409db06c..d1d8bd25187 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -56,16 +56,19 @@ func (vc *vdbClient) Begin() error { if vc.InTransaction { return nil } - if err := vc.DBClient.Begin(); err != nil { - return err + if vc.maxBatchSize == 0 { + // We're not batching so we BEGIN the transaction here. + if err := vc.DBClient.Begin(); err != nil { + return err + } + } else { + // If we're batching, we batch the contents of the + // transaction, which starts with the BEGIN and ends with + // the COMMIT, so we do not send a BEGIN down the wire + // ahead of time. + vc.queriesPos = int64(len(vc.queries)) + vc.batchSize = 6 // begin and semicolon } - - // If we're batching, we only batch the contents of the - // transaction, which starts with the begin and ends with - // the commit. - vc.queriesPos = int64(len(vc.queries)) - vc.batchSize = 6 // begin and semicolon - vc.queries = append(vc.queries, "begin") vc.InTransaction = true vc.startTime = time.Now() diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index db2f3f341ac..025d3512f9a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -133,7 +133,9 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map return vr.dbClient.Commit() } batchMode := false - if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { + // We only do batching in the running/replicating phase via vreplicator and not + // when used for FF and catchup via vcopier. + if len(copyState) == 0 && vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { batchMode = true } if batchMode { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 0cc568c1cf1..98178578958 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -628,7 +628,6 @@ func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) { // It does not work when filter is enabled output := qh.Expect( - "begin", "rollback", fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg), ) @@ -975,8 +974,7 @@ func TestPlayerFilters(t *testing.T) { input: "insert into src4 values (1,100,'aaa'),(2,200,'bbb'),(3,100,'ccc')", output: qh.Expect( "begin", - "insert into dst4(id1,val) values (1,_binary'aaa')", - "insert into dst4(id1,val) values (3,_binary'ccc')", + "insert into dst4(id1,val) values (1,_binary'aaa'), (3,_binary'ccc')", "/update _vt.vreplication set pos=", "commit", ), @@ -987,8 +985,7 @@ func TestPlayerFilters(t *testing.T) { input: "insert into src5 values (1,100,'abc'),(2,200,'xyz'),(3,100,'xyz'),(4,300,'abc'),(5,200,'xyz')", output: qh.Expect( "begin", - "insert into dst5(id1,val) values (1,_binary'abc')", - "insert into dst5(id1,val) values (4,_binary'abc')", + "insert into dst5(id1,val) values (1,_binary'abc'), (4,_binary'abc')", "/update _vt.vreplication set pos=", "commit", ), @@ -1495,9 +1492,7 @@ func TestPlayerRowMove(t *testing.T) { }) expectDBClientQueries(t, qh.Expect( "begin", - "insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", - "insert into dst(val1,sval2,rcount) values (2,ifnull(2, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", - "insert into dst(val1,sval2,rcount) values (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", + "insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1), (2,ifnull(2, 0),1), (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", "/update _vt.vreplication set pos=", "commit", )) @@ -1505,7 +1500,7 @@ func TestPlayerRowMove(t *testing.T) { {"1", "1", "1"}, {"2", "5", "2"}, }) - validateQueryCountStat(t, "replicate", 3) + validateQueryCountStat(t, "replicate", 1) execStatements(t, []string{ "update src set val1=1, val2=4 where id=3", @@ -1521,7 +1516,7 @@ func TestPlayerRowMove(t *testing.T) { {"1", "5", "2"}, {"2", "2", "1"}, }) - validateQueryCountStat(t, "replicate", 5) + validateQueryCountStat(t, "replicate", 3) } func TestPlayerTypes(t *testing.T) { @@ -1737,6 +1732,7 @@ func TestPlayerDDL(t *testing.T) { pos1 := primaryPosition(t) // The stop position must be the GTID of the first DDL expectDBClientQueries(t, qh.Expect( + "/update _vt.vreplication set state='Stopped'", "begin", fmt.Sprintf("/update _vt.vreplication set pos='%s'", pos1), "/update _vt.vreplication set state='Stopped'", @@ -1757,6 +1753,7 @@ func TestPlayerDDL(t *testing.T) { // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", "/update.*'Running'", + "/update _vt.vreplication set state='Stopped'", "begin", fmt.Sprintf("/update.*'%s'", pos2), "/update _vt.vreplication set state='Stopped'", @@ -1922,6 +1919,7 @@ func TestPlayerStopPos(t *testing.T) { // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", "/update.*'Running'", + "/update.*'Stopped'", "begin", "insert into yes(id,val) values (1,'aaa')", fmt.Sprintf("/update.*compress.*'%s'", stopPos), @@ -1947,6 +1945,7 @@ func TestPlayerStopPos(t *testing.T) { // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", "/update.*'Running'", + "/update.*'Stopped'", "begin", // Since 'no' generates empty transactions that are skipped by // vplayer, a commit is done only for the stop position event. @@ -2179,6 +2178,14 @@ func TestPlayerSplitTransaction(t *testing.T) { func TestPlayerLockErrors(t *testing.T) { defer deleteTablet(addTablet(100)) + // The immediate retry behavior does not apply when doing + // VPlayer Batching. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + execStatements(t, []string{ "create table t1(id int, val varchar(128), primary key(id))", fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb), @@ -2258,6 +2265,14 @@ func TestPlayerLockErrors(t *testing.T) { func TestPlayerCancelOnLock(t *testing.T) { defer deleteTablet(addTablet(100)) + // The immediate retry behavior does not apply when doing + // VPlayer Batching. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + execStatements(t, []string{ "create table t1(id int, val varchar(128), primary key(id))", fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb),