Skip to content

Commit

Permalink
Fixups for v18
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jun 28, 2024
1 parent 0881860 commit b949849
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 76 deletions.
60 changes: 1 addition & 59 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,17 @@ import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"

Expand Down Expand Up @@ -92,9 +89,8 @@ func newTestEnv(t *testing.T, ctx context.Context, cell string, sourceKeyspace,
tablets: make(map[string]map[int]*topodatapb.Tablet),
cell: cell,
}
venv := vtenv.NewTestEnv()
env.tmc = newTestTMClient(env)
env.ws = NewServer(venv, env.ts, env.tmc)
env.ws = NewServer(env.ts, env.tmc)

serving := true
tabletID := startingSourceTabletUID
Expand Down Expand Up @@ -429,60 +425,6 @@ func (tmc *testTMClient) VDiff(ctx context.Context, tablet *topodatapb.Tablet, r
}, nil
}

func (tmc *testTMClient) HasVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.HasVReplicationWorkflowsRequest) (*tabletmanagerdatapb.HasVReplicationWorkflowsResponse, error) {
return &tabletmanagerdatapb.HasVReplicationWorkflowsResponse{
Has: false,
}, nil
}

func (tmc *testTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables
if len(req.IncludeWorkflows) > 0 {
for _, wf := range req.IncludeWorkflows {
if strings.Contains(wf, "lookup") {
workflowType = binlogdatapb.VReplicationWorkflowType_CreateLookupIndex
}
}
ks := tmc.env.sourceKeyspace
if tmc.reverse.Load() {
ks = tmc.env.targetKeyspace
}
return &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{
Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{
{
Workflow: req.IncludeWorkflows[0],
WorkflowType: workflowType,
Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{
{
Id: 1,
State: binlogdatapb.VReplicationWorkflowState_Running,
Bls: &binlogdatapb.BinlogSource{
Keyspace: ks.KeyspaceName,
Shard: ks.ShardNames[0],
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "/.*/",
},
},
},
},
Pos: "MySQL56/" + position,
TimeUpdated: protoutil.TimeToProto(time.Now()),
TimeHeartbeat: protoutil.TimeToProto(time.Now()),
},
},
},
},
}, nil
} else {
return &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{}, nil
}
}

func (tmc *testTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowResponse, error) {
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{
Result: &querypb.QueryResult{
Expand Down
12 changes: 0 additions & 12 deletions go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ package workflow
import (
"context"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"sync"
"testing"

_flag "vitess.io/vitess/go/internal/flag"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/sqlparser"
Expand All @@ -41,11 +39,6 @@ import (
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

type queryResult struct {
query string
result *querypb.QueryResult
}

type testMaterializerEnv struct {
ws *Server
ms *vtctldatapb.MaterializeSettings
Expand All @@ -62,11 +55,6 @@ type testMaterializerEnv struct {
//----------------------------------------------
// testMaterializerEnv

func TestMain(m *testing.M) {
_flag.ParseFlagsForTest()
os.Exit(m.Run())
}

func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv {
t.Helper()
env := &testMaterializerEnv{
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

const position = "9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97"
const getWorkflowQuery = "select id from _vt.vreplication where db_name='vt_targetks' and workflow='workflow'"
const mzUpdateQuery = "update _vt.vreplication set state='Running' where db_name='vt_targetks' and workflow='workflow'"
const mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_targetks' and message='FROZEN' and workflow_sub_type != 1"
Expand All @@ -47,7 +48,6 @@ const mzGetWorkflowStatusQuery = "select id, workflow, source, pos, stop_pos, ma
const mzGetCopyState = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1"
const mzGetLatestCopyState = "select table_name, lastpk from _vt.copy_state where vrepl_id = 1 and id in (select max(id) from _vt.copy_state where vrepl_id = 1 group by vrepl_id, table_name)"
const insertPrefix = `/insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys\) values `
const eol = "$"

var (
defaultOnDDL = binlogdatapb.OnDDLAction_IGNORE.String()
Expand Down
46 changes: 43 additions & 3 deletions go/vt/vtctl/workflow/resharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
Expand Down Expand Up @@ -60,6 +61,17 @@ func TestReshardCreate(t *testing.T) {
},
}

var binlogSource = &binlogdatapb.BinlogSource{
Keyspace: sourceKeyspaceName,
Shard: "0",
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
}},
},
}

testcases := []struct {
name string
sourceKeyspace, targetKeyspace *testKeyspace
Expand Down Expand Up @@ -141,11 +153,22 @@ func TestReshardCreate(t *testing.T) {

for i := range tc.sourceKeyspace.ShardNames {
tabletUID := startingSourceTabletUID + (tabletUIDStep * i)
env.tmc.expectVRQuery(
tabletUID,
fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s' and message != 'FROZEN'", targetKeyspaceName),
&sqltypes.Result{},
)
env.tmc.expectVRQuery(
tabletUID,
"select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1",
&sqltypes.Result{},
)
env.tmc.expectVRQuery(
tabletUID,
fmt.Sprintf("select id, workflow, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags, workflow_type, workflow_sub_type, time_heartbeat, defer_secondary_keys, component_throttled, time_throttled, rows_copied from _vt.vreplication where workflow = '%s' and db_name = 'vt_%s'",
workflowName, targetKeyspaceName),
&sqltypes.Result{},
)
env.tmc.expectVRQuery(
tabletUID,
"select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)",
Expand All @@ -155,11 +178,16 @@ func TestReshardCreate(t *testing.T) {

for i, target := range tc.targetKeyspace.ShardNames {
tabletUID := startingTargetTabletUID + (tabletUIDStep * i)
env.tmc.expectVRQuery(
tabletUID,
fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", targetKeyspaceName),
&sqltypes.Result{},
)
env.tmc.expectVRQuery(
tabletUID,
insertPrefix+
`\('`+workflowName+`', 'keyspace:"`+targetKeyspaceName+`" shard:"0" filter:{rules:{match:"/.*" filter:"`+target+`"}}', '', [0-9]*, [0-9]*, '`+
env.cell+`', '`+tabletTypesStr+`', [0-9]*, 0, 'Stopped', 'vt_`+targetKeyspaceName+`', 4, 0, false, '{}'\)`+eol,
`\('`+workflowName+`', 'keyspace:\\"`+targetKeyspaceName+`\\" shard:\\"0\\" filter:{rules:{match:\\"/.*\\" filter:\\"`+target+`\\"}}', '', [0-9]*, [0-9]*, '`+
env.cell+`', '`+tabletTypesStr+`', [0-9]*, 0, 'Stopped', 'vt_`+targetKeyspaceName+`', 4, 0, false\)`+eol,
&sqltypes.Result{},
)
env.tmc.expectVRQuery(
Expand All @@ -169,7 +197,19 @@ func TestReshardCreate(t *testing.T) {
)
env.tmc.expectVRQuery(
tabletUID,
"select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)",
fmt.Sprintf("select id, workflow, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags, workflow_type, workflow_sub_type, time_heartbeat, defer_secondary_keys, component_throttled, time_throttled, rows_copied from _vt.vreplication where workflow = '%s' and db_name = 'vt_%s'",
workflowName, targetKeyspaceName),
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|workflow|source|pos|stop_pos|max_replication_log|state|db_name|time_updated|transaction_timestamp|message|tags|workflow_type|workflow_sub_type|time_heartbeat|defer_secondary_keys|component_throttled|time_throttled|rows_copied",
"int64|varchar|blob|varchar|varchar|int64|varchar|varchar|int64|int64|varchar|varchar|int64|int64|int64|int64|varchar|int64|int64",
),
fmt.Sprintf("1|%s|%s|MySQL56/%s|NULL|0|Running|vt_%s|1686577659|0|||1|0|0|0||0|10", workflowName, binlogSource, position, sourceKeyspaceName),
),
)
env.tmc.expectVRQuery(
tabletUID,
"select table_name, lastpk from _vt.copy_state where vrepl_id = 1 and id in (select max(id) from _vt.copy_state where vrepl_id = 1 group by vrepl_id, table_name)",
&sqltypes.Result{},
)
}
Expand Down
1 change: 0 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1611,7 +1611,6 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea
return s.WorkflowStatus(ctx, &vtctldatapb.WorkflowStatusRequest{
Keyspace: keyspace,
Workflow: req.Workflow,
Shards: req.TargetShards,
})
}

Expand Down

0 comments on commit b949849

Please sign in to comment.