Skip to content

Commit e497060

Browse files
Fix logic that checks for workflow state
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
1 parent aabf1c9 commit e497060

File tree

1 file changed

+23
-19
lines changed

1 file changed

+23
-19
lines changed

go/test/endtoend/vreplication/helper_test.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -368,29 +368,33 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
368368
require.NoError(t, err, output)
369369
done = true
370370
state := ""
371-
streams := gjson.Get(output, "workflows.0.shard_streams.*.streams")
372-
streams.ForEach(func(streamId, stream gjson.Result) bool { // For each stream
373-
info := stream.Map()
374-
// We need to wait for all streams to have the desired state.
375-
state = info["state"].String()
376-
if state == wantState {
377-
for i := 0; i < len(fieldEqualityChecks); i++ {
378-
if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 {
379-
key := kvparts[0]
380-
val := kvparts[1]
381-
res := info[key].String()
382-
if !strings.EqualFold(res, val) {
383-
done = false
371+
shardStreams := gjson.Get(output, "workflows.0.shard_streams")
372+
// We need to wait for all streams in all shard streams to have the desired state.
373+
shardStreams.ForEach(func(shardStreamId, shardStream gjson.Result) bool {
374+
streams := shardStream.Get("*")
375+
streams.ForEach(func(streamId, stream gjson.Result) bool {
376+
info := stream.Map()
377+
state = info["state"].String()
378+
if state == wantState {
379+
for i := 0; i < len(fieldEqualityChecks); i++ {
380+
if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 {
381+
key := kvparts[0]
382+
val := kvparts[1]
383+
res := info[key].String()
384+
if !strings.EqualFold(res, val) {
385+
done = false
386+
}
384387
}
385388
}
386-
}
387-
if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() &&
388-
(info["position"].Exists() && info["position"].String() == "") {
389+
if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() &&
390+
(info["position"].Exists() && info["position"].String() == "") {
391+
done = false
392+
}
393+
} else {
389394
done = false
390395
}
391-
} else {
392-
done = false
393-
}
396+
return true
397+
})
394398
return true
395399
})
396400
if done {

0 commit comments

Comments
 (0)