From 872bbcd7569147a561539cd193c0f59bd4dd86cd Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 26 Dec 2024 16:26:04 +0100 Subject: [PATCH 1/4] Handle race condition in TestMoveTables(Un)sharded where vplayer can start in some runs and update mock support for it Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletconntest/fakequeryservice.go | 3 ++- go/vt/vttablet/tabletmanager/rpc_vreplication_test.go | 10 ++++++++++ go/vt/vttablet/tabletmanager/vreplication/vplayer.go | 4 ++-- .../vttablet/tabletmanager/vreplication/vreplicator.go | 1 + 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletconntest/fakequeryservice.go b/go/vt/vttablet/tabletconntest/fakequeryservice.go index 2d62b017433..e63cd028d05 100644 --- a/go/vt/vttablet/tabletconntest/fakequeryservice.go +++ b/go/vt/vttablet/tabletconntest/fakequeryservice.go @@ -702,7 +702,8 @@ func (f *FakeQueryService) StreamHealth(ctx context.Context, callback func(*quer // VStream is part of the queryservice.QueryService interface func (f *FakeQueryService) VStream(ctx context.Context, request *binlogdatapb.VStreamRequest, send func([]*binlogdatapb.VEvent) error) error { - panic("not implemented") + // This is called as part of vreplication unit tests, so we don't panic here. + return fmt.Errorf("VStream not implemented") } // VStreamRows is part of the QueryService interface. diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 3f8bc85ac7f..06a6eb8b0a7 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -27,6 +27,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/constants/sidecar" @@ -1783,6 +1785,14 @@ func addInvariants(dbClient *binlogplayer.MockDBClient, vreplID, sourceTabletUID )) dbClient.AddInvariant(fmt.Sprintf(updatePickedSourceTablet, cell, sourceTabletUID, vreplID), &sqltypes.Result{}) dbClient.AddInvariant("update _vt.vreplication set state='Running', message='' where id=1", &sqltypes.Result{}) + dbClient.AddInvariant(vreplication.SqlMaxAllowedPacket, sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "@@max_allowed_packet", + "int64", + ), + "65536", + )) + dbClient.AddInvariant("update _vt.vreplication set message", &sqltypes.Result{}) } func addMaterializeSettingsTablesToSchema(ms *vtctldatapb.MaterializeSettings, tenv *testEnv, venv *vtenv.Environment) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 98e36119622..96dcd9884f3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -125,7 +125,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map settings.StopPos = pausePos saveStop = false } - + log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vr.id, settings.StartPos, settings.StopPos, vr.source) queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) { return vr.dbClient.ExecuteWithRetry(ctx, sql) } @@ -142,7 +142,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map maxAllowedPacket := int64(vr.workflowConfig.RelayLogMaxSize) // We explicitly do NOT want to batch this, we want to send it down the wire // immediately so we use ExecuteFetch directly. - res, err := vr.dbClient.ExecuteFetch("select @@session.max_allowed_packet as max_allowed_packet", 1) + res, err := vr.dbClient.ExecuteFetch(SqlMaxAllowedPacket, 1) if err != nil { log.Errorf("Error getting max_allowed_packet, will use the relay_log_max_size value of %d bytes: %v", vr.workflowConfig.RelayLogMaxSize, err) } else { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 42701288a44..76177b56b5b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -89,6 +89,7 @@ const ( json_unquote(json_extract(action, '$.type'))=%a and vrepl_id=%a and table_name=%a` sqlDeletePostCopyAction = `delete from _vt.post_copy_action where vrepl_id=%a and table_name=%a and id=%a` + SqlMaxAllowedPacket = "select @@session.max_allowed_packet as max_allowed_packet" ) // vreplicator provides the core logic to start vreplication streams From 606240bcc6a9fb56bf54ab4737cdd90d3430ab9c Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 26 Dec 2024 19:13:09 +0100 Subject: [PATCH 2/4] Comment FnDateFormat for now since it is failing consistently Signed-off-by: Rohit Nayak --- go/vt/vtgate/evalengine/testcases/cases.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtgate/evalengine/testcases/cases.go b/go/vt/vtgate/evalengine/testcases/cases.go index ff6c0c0f311..a9deb91fd70 100644 --- a/go/vt/vtgate/evalengine/testcases/cases.go +++ b/go/vt/vtgate/evalengine/testcases/cases.go @@ -126,7 +126,7 @@ var Cases = []TestCase{ {Run: FnSHA1}, {Run: FnSHA2}, {Run: FnRandomBytes}, - {Run: FnDateFormat}, + // Temporarily commenting failing test // {Run: FnDateFormat}, {Run: FnConvertTz}, {Run: FnDate}, {Run: FnDayOfMonth}, From 7065d270f4031d4458cfc284af2b86a1d6f2cb67 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 27 Dec 2024 10:17:28 +0100 Subject: [PATCH 3/4] Remove skipping FnDateFormat since it has been fixed Signed-off-by: Rohit Nayak --- go/vt/vtgate/evalengine/testcases/cases.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtgate/evalengine/testcases/cases.go b/go/vt/vtgate/evalengine/testcases/cases.go index a9deb91fd70..ff6c0c0f311 100644 --- a/go/vt/vtgate/evalengine/testcases/cases.go +++ b/go/vt/vtgate/evalengine/testcases/cases.go @@ -126,7 +126,7 @@ var Cases = []TestCase{ {Run: FnSHA1}, {Run: FnSHA2}, {Run: FnRandomBytes}, - // Temporarily commenting failing test // {Run: FnDateFormat}, + {Run: FnDateFormat}, {Run: FnConvertTz}, {Run: FnDate}, {Run: FnDayOfMonth}, From e072939999cc6430da3a96591f61a70108823751 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 30 Dec 2024 17:27:56 +0100 Subject: [PATCH 4/4] Address review comments Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/rpc_vreplication_test.go | 2 +- go/vt/vttablet/tabletmanager/vreplication/vplayer.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 06a6eb8b0a7..762b384a5f6 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -1787,7 +1787,7 @@ func addInvariants(dbClient *binlogplayer.MockDBClient, vreplID, sourceTabletUID dbClient.AddInvariant("update _vt.vreplication set state='Running', message='' where id=1", &sqltypes.Result{}) dbClient.AddInvariant(vreplication.SqlMaxAllowedPacket, sqltypes.MakeTestResult( sqltypes.MakeTestFields( - "@@max_allowed_packet", + "max_allowed_packet", "int64", ), "65536", diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 96dcd9884f3..31ab895934c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -125,7 +125,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map settings.StopPos = pausePos saveStop = false } - log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vr.id, settings.StartPos, settings.StopPos, vr.source) + log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %+v", + vr.id, settings.StartPos, settings.StopPos, vr.source.Filter) queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) { return vr.dbClient.ExecuteWithRetry(ctx, sql) }