Skip to content

Commit

Permalink
Moar
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Dec 27, 2024
1 parent b2a120f commit 3547f3e
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 74 deletions.
118 changes: 50 additions & 68 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,55 +345,53 @@ func assertQueryDoesNotExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *
}

func waitForWorkflowToBeCreated(t *testing.T, vc *VitessCluster, ksWorkflow string) {
keyspace, workflow, err := topoproto.ParseKeyspaceShard(ksWorkflow)
require.NoError(t, err)
require.NoError(t, waitForCondition("workflow to be created", func() bool {
_, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
return err == nil
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", keyspace, "show", "--workflow", workflow, "--compact", "--include-logs=false")
return err == nil && output != emptyWorkflowShowResponse
}, defaultTimeout))
}

// waitForWorkflowState waits for all of the given workflow's
// streams to reach the provided state. You can pass optional
// key value pairs of the form "key==value" to also wait for
// additional stream sub-state such as "Message==for vdiff".
// additional stream sub-state such as "message==for vdiff".
// Invalid checks are ignored.
func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wantState string, fieldEqualityChecks ...string) {
keyspace, workflow, err := topoproto.ParseKeyspaceShard(ksWorkflow)
require.NoError(t, err)
done := false
timer := time.NewTimer(workflowStateTimeout)
log.Infof("Waiting for workflow %q to fully reach %q state", ksWorkflow, wantState)
for {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", keyspace, "show", "--workflow", workflow, "--compact", "--include-logs=false")
require.NoError(t, err, output)
done = true
state := ""
result := gjson.Get(output, "ShardStatuses")
result.ForEach(func(tabletId, tabletStreams gjson.Result) bool { // for each participating tablet
tabletStreams.ForEach(func(streamId, streamInfos gjson.Result) bool { // for each stream
if streamId.String() == "PrimaryReplicationStatuses" {
streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream
// we need to wait for all streams to have the desired state
state = attributeValue.Get("State").String()
if state == wantState {
for i := 0; i < len(fieldEqualityChecks); i++ {
if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 {
key := kvparts[0]
val := kvparts[1]
res := attributeValue.Get(key).String()
if !strings.EqualFold(res, val) {
done = false
}
}
}
if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && attributeValue.Get("Pos").String() == "" {
done = false
}
} else {
streams := gjson.Get(output, "workflows.0.shard_streams.*.streams")
streams.ForEach(func(streamId, stream gjson.Result) bool { // For each stream
info := stream.Map()
// We need to wait for all streams to have the desired state.
state = info["state"].String()
if state == wantState {
for i := 0; i < len(fieldEqualityChecks); i++ {
if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 {
key := kvparts[0]
val := kvparts[1]
res := info[key].String()
if !strings.EqualFold(res, val) {
done = false
}
return true
})
}
}
if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() &&
(info["position"].Exists() && info["position"].String() == "") {
done = false
}
return true
})
} else {
done = false
}
return true
})
if done {
Expand Down Expand Up @@ -548,7 +546,7 @@ func checkIfTableExists(t *testing.T, vc *VitessCluster, tabletAlias string, tab
var err error
found := false

if output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetSchema", "--", "--tables", table, tabletAlias); err != nil {
if output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetSchema", "--tables", table, tabletAlias); err != nil {
return false, err
}
jsonparser.ArrayEach([]byte(output), func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
Expand All @@ -571,19 +569,10 @@ func validateTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, ta
}

func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table string) (bool, error) {
var output string
var err error
found := false
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard); err != nil {
require.Fail(t, "GetShard error", "%v %v", err, output)
return false, err
}
jsonparser.ArrayEach([]byte(output), func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
if string(value) == table {
found = true
}
}, "tablet_controls", "[0]", "denied_tables")
return found, nil
output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetShard", ksShard)
require.NoError(t, err, "GetShard error", "%v %v", err, output)
deniedTable := gjson.Get(output, fmt.Sprintf("shard.tablet_controls.0.denied_tables.#(==\"%s\"", table))
return deniedTable.Exists(), nil
}

// expectNumberOfStreams waits for the given number of streams to be present and
Expand All @@ -609,7 +598,7 @@ func confirmAllStreamsRunning(t *testing.T, vtgateConn *mysql.Conn, database str

func printShardPositions(vc *VitessCluster, ksShards []string) {
for _, ksShard := range ksShards {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("ShardReplicationPositions", ksShard)
output, err := vc.VtctldClient.ExecuteCommandWithOutput("ShardReplicationPositions", ksShard)
if err != nil {
fmt.Printf("Error in ShardReplicationPositions: %v, output %v", err, output)
} else {
Expand All @@ -621,7 +610,7 @@ func printShardPositions(vc *VitessCluster, ksShards []string) {
func printRoutingRules(t *testing.T, vc *VitessCluster, msg string) error {
var output string
var err error
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules"); err != nil {
if output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules", "--compact"); err != nil {
return err
}
fmt.Printf("Routing Rules::%s:\n%s\n", msg, output)
Expand All @@ -648,29 +637,22 @@ func getDebugVar(t *testing.T, port int, varPath []string) (string, error) {
func confirmWorkflowHasCopiedNoData(t *testing.T, targetKS, workflow string) {
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
ksWorkflow := fmt.Sprintf("%s.%s", targetKS, workflow)
for {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
require.NoError(t, err)
result := gjson.Get(output, "ShardStatuses")
result.ForEach(func(tabletId, tabletStreams gjson.Result) bool { // for each source tablet
tabletStreams.ForEach(func(streamId, streamInfos gjson.Result) bool { // for each stream
if streamId.String() == "PrimaryReplicationStatuses" {
streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream
state := attributeValue.Get("State").String()
pos := attributeValue.Get("Pos").String()
// If we've actually copied anything then we'll have a position in the stream
if (state == binlogdatapb.VReplicationWorkflowState_Running.String() || state == binlogdatapb.VReplicationWorkflowState_Copying.String()) && pos != "" {
require.FailNowf(t, "Unexpected data copied in workflow",
"The MoveTables workflow %q copied data in less than %s when it should have been waiting. Show output: %s",
ksWorkflow, defaultTimeout, output)
}
return true // end attribute loop
})
}
return true // end stream loop
})
return true // end tablet loop
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", workflow, "--compact", "--include-logs=false")
require.NoError(t, err, output)
streams := gjson.Get(output, "workflows.0.shard_streams.*.streams")
streams.ForEach(func(streamId, stream gjson.Result) bool { // For each stream
info := stream.Map()
state := info["state"]
pos := info["position"]
// If we've actually copied anything then we'll have a position in the stream
if (state.Exists() && (state.String() == binlogdatapb.VReplicationWorkflowState_Running.String() || state.String() == binlogdatapb.VReplicationWorkflowState_Copying.String())) &&
(pos.Exists() && pos.String() != "") {
require.FailNowf(t, "Unexpected data copied in workflow",
"The MoveTables workflow %q copied data in less than %s when it should have been waiting. Show output: %s",
ksWorkflow, defaultTimeout, output)
}
return true
})
select {
case <-timer.C:
Expand Down
12 changes: 6 additions & 6 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func TestVreplicationCopyThrottling(t *testing.T) {
moveTablesActionWithTabletTypes(t, "Create", defaultCell.Name, workflow, sourceKs, targetKs, table, "primary", true)
// Wait for the copy phase to start
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKs, workflow), binlogdatapb.VReplicationWorkflowState_Copying.String())
// The initial copy phase should be blocking on the history list
// The initial copy phase should be blocking on the history list.
confirmWorkflowHasCopiedNoData(t, targetKs, workflow)
releaseInnoDBRowHistory(t, trxConn)
trxConn.Close()
Expand Down Expand Up @@ -623,7 +623,7 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
testVStreamFrom(t, vtgate, keyspace, 2)
})
shardCustomer(t, true, []*Cell{cell1, cell2}, "alias", false)
isTableInDenyList(t, vc, "product:0", "customer")
isTableInDenyList(t, vc, "product/0", "customer")
// we tag along this test so as not to create the overhead of creating another cluster
testVStreamCellFlag(t)
}
Expand Down Expand Up @@ -878,13 +878,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
switchWrites(t, workflowType, ksWorkflow, false)

var exists bool
exists, err = isTableInDenyList(t, vc, "product:0", "customer")
exists, err = isTableInDenyList(t, vc, "product/0", "customer")
require.NoError(t, err, "Error getting denylist for customer:0")
require.True(t, exists)

moveTablesAction(t, "Complete", cellNames, workflow, sourceKs, targetKs, tables)

exists, err = isTableInDenyList(t, vc, "product:0", "customer")
exists, err = isTableInDenyList(t, vc, "product/0", "customer")
require.NoError(t, err, "Error getting denylist for customer:0")
require.False(t, exists)

Expand Down Expand Up @@ -1739,12 +1739,12 @@ func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, e
require.Equal(t, 1, len(res.Rows))
historyLen, err = res.Rows[0][0].ToInt64()
require.NoError(t, err)
if historyLen > expectedLength {
if historyLen >= expectedLength {
return
}
select {
case <-timer.C:
t.Fatalf("Did not reach the expected InnoDB history length of %d before the timeout of %s; last seen value: %d", expectedLength, defaultTimeout, historyLen)
require.FailNow(t, "Did not reach the minimum expected InnoDB history length of %d before the timeout of %s; last seen value: %d", expectedLength, defaultTimeout, historyLen)
default:
time.Sleep(defaultTick)
}
Expand Down

0 comments on commit 3547f3e

Please sign in to comment.