From b949849fef31669dea5a9aea65807e07511f2ba9 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 28 Jun 2024 09:23:47 -0400 Subject: [PATCH] Fixups for v18 Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/framework_test.go | 60 +------------------ go/vt/vtctl/workflow/materializer_env_test.go | 12 ---- go/vt/vtctl/workflow/materializer_test.go | 2 +- go/vt/vtctl/workflow/resharder_test.go | 46 +++++++++++++- go/vt/vtctl/workflow/server.go | 1 - 5 files changed, 45 insertions(+), 76 deletions(-) diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index e2ccde0a0e7..81837ca9e12 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -26,20 +26,17 @@ import ( "sync" "sync/atomic" "testing" - "time" "github.com/stretchr/testify/require" "golang.org/x/exp/maps" "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" - "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tmclient" @@ -92,9 +89,8 @@ func newTestEnv(t *testing.T, ctx context.Context, cell string, sourceKeyspace, tablets: make(map[string]map[int]*topodatapb.Tablet), cell: cell, } - venv := vtenv.NewTestEnv() env.tmc = newTestTMClient(env) - env.ws = NewServer(venv, env.ts, env.tmc) + env.ws = NewServer(env.ts, env.tmc) serving := true tabletID := startingSourceTabletUID @@ -429,60 +425,6 @@ func (tmc *testTMClient) VDiff(ctx context.Context, tablet *topodatapb.Tablet, r }, nil } -func (tmc *testTMClient) HasVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.HasVReplicationWorkflowsRequest) (*tabletmanagerdatapb.HasVReplicationWorkflowsResponse, error) { - return &tabletmanagerdatapb.HasVReplicationWorkflowsResponse{ - Has: false, - }, nil -} - -func (tmc *testTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) { - tmc.mu.Lock() - defer tmc.mu.Unlock() - - workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables - if len(req.IncludeWorkflows) > 0 { - for _, wf := range req.IncludeWorkflows { - if strings.Contains(wf, "lookup") { - workflowType = binlogdatapb.VReplicationWorkflowType_CreateLookupIndex - } - } - ks := tmc.env.sourceKeyspace - if tmc.reverse.Load() { - ks = tmc.env.targetKeyspace - } - return &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{ - Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ - { - Workflow: req.IncludeWorkflows[0], - WorkflowType: workflowType, - Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ - { - Id: 1, - State: binlogdatapb.VReplicationWorkflowState_Running, - Bls: &binlogdatapb.BinlogSource{ - Keyspace: ks.KeyspaceName, - Shard: ks.ShardNames[0], - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{ - { - Match: "/.*/", - }, - }, - }, - }, - Pos: "MySQL56/" + position, - TimeUpdated: protoutil.TimeToProto(time.Now()), - TimeHeartbeat: protoutil.TimeToProto(time.Now()), - }, - }, - }, - }, - }, nil - } else { - return &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{}, nil - } -} - func (tmc *testTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowResponse, error) { return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{ Result: &querypb.QueryResult{ diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index f1ddf6be645..48ad6e548ea 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -19,14 +19,12 @@ package workflow import ( "context" "fmt" - "os" "regexp" "strconv" "strings" "sync" "testing" - _flag "vitess.io/vitess/go/internal/flag" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/sqlparser" @@ -41,11 +39,6 @@ import ( vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) -type queryResult struct { - query string - result *querypb.QueryResult -} - type testMaterializerEnv struct { ws *Server ms *vtctldatapb.MaterializeSettings @@ -62,11 +55,6 @@ type testMaterializerEnv struct { //---------------------------------------------- // testMaterializerEnv -func TestMain(m *testing.M) { - _flag.ParseFlagsForTest() - os.Exit(m.Run()) -} - func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv { t.Helper() env := &testMaterializerEnv{ diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 39aabcf45ad..df3d496a17b 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -39,6 +39,7 @@ import ( vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) +const position = "9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97" const getWorkflowQuery = "select id from _vt.vreplication where db_name='vt_targetks' and workflow='workflow'" const mzUpdateQuery = "update _vt.vreplication set state='Running' where db_name='vt_targetks' and workflow='workflow'" const mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_targetks' and message='FROZEN' and workflow_sub_type != 1" @@ -47,7 +48,6 @@ const mzGetWorkflowStatusQuery = "select id, workflow, source, pos, stop_pos, ma const mzGetCopyState = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1" const mzGetLatestCopyState = "select table_name, lastpk from _vt.copy_state where vrepl_id = 1 and id in (select max(id) from _vt.copy_state where vrepl_id = 1 group by vrepl_id, table_name)" const insertPrefix = `/insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys\) values ` -const eol = "$" var ( defaultOnDDL = binlogdatapb.OnDDLAction_IGNORE.String() diff --git a/go/vt/vtctl/workflow/resharder_test.go b/go/vt/vtctl/workflow/resharder_test.go index 1bb2f065e0f..a7a73d23ba9 100644 --- a/go/vt/vtctl/workflow/resharder_test.go +++ b/go/vt/vtctl/workflow/resharder_test.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" @@ -60,6 +61,17 @@ func TestReshardCreate(t *testing.T) { }, } + var binlogSource = &binlogdatapb.BinlogSource{ + Keyspace: sourceKeyspaceName, + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + }, + } + testcases := []struct { name string sourceKeyspace, targetKeyspace *testKeyspace @@ -141,11 +153,22 @@ func TestReshardCreate(t *testing.T) { for i := range tc.sourceKeyspace.ShardNames { tabletUID := startingSourceTabletUID + (tabletUIDStep * i) + env.tmc.expectVRQuery( + tabletUID, + fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s' and message != 'FROZEN'", targetKeyspaceName), + &sqltypes.Result{}, + ) env.tmc.expectVRQuery( tabletUID, "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", &sqltypes.Result{}, ) + env.tmc.expectVRQuery( + tabletUID, + fmt.Sprintf("select id, workflow, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags, workflow_type, workflow_sub_type, time_heartbeat, defer_secondary_keys, component_throttled, time_throttled, rows_copied from _vt.vreplication where workflow = '%s' and db_name = 'vt_%s'", + workflowName, targetKeyspaceName), + &sqltypes.Result{}, + ) env.tmc.expectVRQuery( tabletUID, "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", @@ -155,11 +178,16 @@ func TestReshardCreate(t *testing.T) { for i, target := range tc.targetKeyspace.ShardNames { tabletUID := startingTargetTabletUID + (tabletUIDStep * i) + env.tmc.expectVRQuery( + tabletUID, + fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", targetKeyspaceName), + &sqltypes.Result{}, + ) env.tmc.expectVRQuery( tabletUID, insertPrefix+ - `\('`+workflowName+`', 'keyspace:"`+targetKeyspaceName+`" shard:"0" filter:{rules:{match:"/.*" filter:"`+target+`"}}', '', [0-9]*, [0-9]*, '`+ - env.cell+`', '`+tabletTypesStr+`', [0-9]*, 0, 'Stopped', 'vt_`+targetKeyspaceName+`', 4, 0, false, '{}'\)`+eol, + `\('`+workflowName+`', 'keyspace:\\"`+targetKeyspaceName+`\\" shard:\\"0\\" filter:{rules:{match:\\"/.*\\" filter:\\"`+target+`\\"}}', '', [0-9]*, [0-9]*, '`+ + env.cell+`', '`+tabletTypesStr+`', [0-9]*, 0, 'Stopped', 'vt_`+targetKeyspaceName+`', 4, 0, false\)`+eol, &sqltypes.Result{}, ) env.tmc.expectVRQuery( @@ -169,7 +197,19 @@ func TestReshardCreate(t *testing.T) { ) env.tmc.expectVRQuery( tabletUID, - "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", + fmt.Sprintf("select id, workflow, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags, workflow_type, workflow_sub_type, time_heartbeat, defer_secondary_keys, component_throttled, time_throttled, rows_copied from _vt.vreplication where workflow = '%s' and db_name = 'vt_%s'", + workflowName, targetKeyspaceName), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|workflow|source|pos|stop_pos|max_replication_log|state|db_name|time_updated|transaction_timestamp|message|tags|workflow_type|workflow_sub_type|time_heartbeat|defer_secondary_keys|component_throttled|time_throttled|rows_copied", + "int64|varchar|blob|varchar|varchar|int64|varchar|varchar|int64|int64|varchar|varchar|int64|int64|int64|int64|varchar|int64|int64", + ), + fmt.Sprintf("1|%s|%s|MySQL56/%s|NULL|0|Running|vt_%s|1686577659|0|||1|0|0|0||0|10", workflowName, binlogSource, position, sourceKeyspaceName), + ), + ) + env.tmc.expectVRQuery( + tabletUID, + "select table_name, lastpk from _vt.copy_state where vrepl_id = 1 and id in (select max(id) from _vt.copy_state where vrepl_id = 1 group by vrepl_id, table_name)", &sqltypes.Result{}, ) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 54401c632ff..43d1f1a2b05 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1611,7 +1611,6 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea return s.WorkflowStatus(ctx, &vtctldatapb.WorkflowStatusRequest{ Keyspace: keyspace, Workflow: req.Workflow, - Shards: req.TargetShards, }) }