@@ -22,6 +22,7 @@ import (
22
22
"fmt"
23
23
"math/rand/v2"
24
24
"net"
25
+ "slices"
25
26
"strconv"
26
27
"strings"
27
28
"sync"
@@ -35,9 +36,11 @@ import (
35
36
"vitess.io/vitess/go/test/endtoend/cluster"
36
37
"vitess.io/vitess/go/test/endtoend/throttler"
37
38
"vitess.io/vitess/go/vt/log"
39
+ "vitess.io/vitess/go/vt/topo/topoproto"
38
40
"vitess.io/vitess/go/vt/wrangler"
39
41
40
42
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
43
+ topodatapb "vitess.io/vitess/go/vt/proto/topodata"
41
44
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
42
45
)
43
46
@@ -169,9 +172,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
169
172
args = append (args , "--tablet-types" , tabletTypes )
170
173
}
171
174
args = append (args , "--action_timeout=10m" ) // At this point something is up so fail the test
172
- if debugMode {
173
- t .Logf ("Executing workflow command: vtctldclient %v" , strings .Join (args , " " ))
174
- }
175
+ t .Logf ("Executing workflow command: vtctldclient %s" , strings .Join (args , " " ))
175
176
output , err := vc .VtctldClient .ExecuteCommandWithOutput (args ... )
176
177
lastOutput = output
177
178
if err != nil {
@@ -334,33 +335,44 @@ func tstWorkflowCancel(t *testing.T) error {
334
335
return tstWorkflowAction (t , workflowActionCancel , "" , "" )
335
336
}
336
337
337
- func validateReadsRoute (t * testing.T , tabletTypes string , tablet * cluster.VttabletProcess ) {
338
+ func validateReadsRoute (t * testing.T , tabletType string , tablet * cluster.VttabletProcess ) {
338
339
if tablet == nil {
339
340
return
340
341
}
341
- if tabletTypes == "" {
342
- tabletTypes = "replica,rdonly"
343
- }
344
342
vtgateConn , closeConn := getVTGateConn ()
345
343
defer closeConn ()
346
- for _ , tt := range [] string { "replica" , "rdonly" } {
347
- destination := fmt . Sprintf ( "%s:%s@%s" , tablet . Keyspace , tablet . Shard , tt )
348
- if strings . Contains ( tabletTypes , tt ) {
349
- readQuery := "select cid from customer limit 10"
350
- assertQueryExecutesOnTablet ( t , vtgateConn , tablet , destination , readQuery , "select cid from customer limit :vtg1" )
351
- }
352
- }
344
+ // We do NOT want to target a shard as that goes around the routing rules and
345
+ // defeats the purpose here. We are using a query w/o a WHERE clause so for
346
+ // sharded keyspaces it should hit all shards as a SCATTER query. So all we
347
+ // care about is the keyspace and tablet type.
348
+ destination := fmt . Sprintf ( "%s@%s" , tablet . Keyspace , strings . ToLower ( tabletType ) )
349
+ readQuery := "select cid from customer limit 50"
350
+ assertQueryExecutesOnTablet ( t , vtgateConn , tablet , destination , readQuery , "select cid from customer limit :vtg1" )
353
351
}
354
352
355
353
func validateReadsRouteToSource (t * testing.T , tabletTypes string ) {
356
- if sourceReplicaTab != nil {
357
- validateReadsRoute (t , tabletTypes , sourceReplicaTab )
354
+ tt , err := topoproto .ParseTabletTypes (tabletTypes )
355
+ require .NoError (t , err )
356
+ if slices .Contains (tt , topodatapb .TabletType_REPLICA ) {
357
+ require .NotNil (t , sourceReplicaTab )
358
+ validateReadsRoute (t , topodatapb .TabletType_REPLICA .String (), sourceReplicaTab )
359
+ }
360
+ if slices .Contains (tt , topodatapb .TabletType_RDONLY ) {
361
+ require .NotNil (t , sourceRdonlyTab )
362
+ validateReadsRoute (t , topodatapb .TabletType_RDONLY .String (), sourceRdonlyTab )
358
363
}
359
364
}
360
365
361
366
func validateReadsRouteToTarget (t * testing.T , tabletTypes string ) {
362
- if targetReplicaTab1 != nil {
363
- validateReadsRoute (t , tabletTypes , targetReplicaTab1 )
367
+ tt , err := topoproto .ParseTabletTypes (tabletTypes )
368
+ require .NoError (t , err )
369
+ if slices .Contains (tt , topodatapb .TabletType_REPLICA ) {
370
+ require .NotNil (t , targetReplicaTab1 )
371
+ validateReadsRoute (t , topodatapb .TabletType_REPLICA .String (), targetReplicaTab1 )
372
+ }
373
+ if slices .Contains (tt , topodatapb .TabletType_RDONLY ) {
374
+ require .NotNil (t , targetRdonlyTab1 )
375
+ validateReadsRoute (t , topodatapb .TabletType_RDONLY .String (), targetRdonlyTab1 )
364
376
}
365
377
}
366
378
@@ -411,6 +423,13 @@ func getCurrentStatus(t *testing.T) string {
411
423
// but CI currently fails on creating multiple clusters even after the previous ones are torn down
412
424
413
425
func TestBasicV2Workflows (t * testing.T ) {
426
+ ogReplicas := defaultReplicas
427
+ ogRdOnly := defaultRdonly
428
+ defer func () {
429
+ defaultReplicas = ogReplicas
430
+ defaultRdonly = ogRdOnly
431
+ }()
432
+ defaultReplicas = 1
414
433
defaultRdonly = 1
415
434
extraVTTabletArgs = []string {
416
435
parallelInsertWorkers ,
@@ -664,7 +683,7 @@ func testMoveTablesV2Workflow(t *testing.T) {
664
683
// If it's not then we'll get an error as the table doesn't exist in the vschema
665
684
createMoveTablesWorkflow (t , "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431" )
666
685
waitForWorkflowState (t , vc , ksWorkflow , binlogdatapb .VReplicationWorkflowState_Running .String ())
667
- validateReadsRouteToSource (t , "replica" )
686
+ validateReadsRouteToSource (t , "replica,rdonly " )
668
687
validateWritesRouteToSource (t )
669
688
670
689
// Verify that we've properly ignored any internal operational tables
@@ -725,6 +744,12 @@ func testPartialSwitches(t *testing.T) {
725
744
tstWorkflowSwitchReads (t , "" , "" )
726
745
checkStates (t , nextState , nextState ) // idempotency
727
746
747
+ tstWorkflowReverseReads (t , "replica,rdonly" , "" )
748
+ checkStates (t , wrangler .WorkflowStateReadsSwitched , wrangler .WorkflowStateNotSwitched )
749
+
750
+ tstWorkflowSwitchReads (t , "" , "" )
751
+ checkStates (t , wrangler .WorkflowStateNotSwitched , wrangler .WorkflowStateReadsSwitched )
752
+
728
753
tstWorkflowSwitchWrites (t )
729
754
currentState = nextState
730
755
nextState = wrangler .WorkflowStateAllSwitched
@@ -771,12 +796,12 @@ func testRestOfWorkflow(t *testing.T) {
771
796
waitForLowLag (t , "customer" , "wf1" )
772
797
tstWorkflowSwitchReads (t , "" , "" )
773
798
checkStates (t , wrangler .WorkflowStateNotSwitched , wrangler .WorkflowStateReadsSwitched )
774
- validateReadsRouteToTarget (t , "replica" )
799
+ validateReadsRouteToTarget (t , "replica,rdonly " )
775
800
validateWritesRouteToSource (t )
776
801
777
802
tstWorkflowSwitchWrites (t )
778
803
checkStates (t , wrangler .WorkflowStateReadsSwitched , wrangler .WorkflowStateAllSwitched )
779
- validateReadsRouteToTarget (t , "replica" )
804
+ validateReadsRouteToTarget (t , "replica,rdonly " )
780
805
validateWritesRouteToTarget (t )
781
806
782
807
// this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces
@@ -787,42 +812,45 @@ func testRestOfWorkflow(t *testing.T) {
787
812
waitForLowLag (t , keyspace , "wf1_reverse" )
788
813
tstWorkflowReverseReads (t , "" , "" )
789
814
checkStates (t , wrangler .WorkflowStateAllSwitched , wrangler .WorkflowStateWritesSwitched )
790
- validateReadsRouteToSource (t , "replica" )
815
+ validateReadsRouteToSource (t , "replica,rdonly " )
791
816
validateWritesRouteToTarget (t )
792
817
793
818
tstWorkflowReverseWrites (t )
794
819
checkStates (t , wrangler .WorkflowStateWritesSwitched , wrangler .WorkflowStateNotSwitched )
795
- validateReadsRouteToSource (t , "replica" )
820
+ validateReadsRouteToSource (t , "replica,rdonly " )
796
821
validateWritesRouteToSource (t )
797
822
798
823
waitForLowLag (t , "customer" , "wf1" )
799
824
tstWorkflowSwitchWrites (t )
800
825
checkStates (t , wrangler .WorkflowStateNotSwitched , wrangler .WorkflowStateWritesSwitched )
801
- validateReadsRouteToSource (t , "replica" )
826
+ validateReadsRouteToSource (t , "replica,rdonly " )
802
827
validateWritesRouteToTarget (t )
803
828
804
829
waitForLowLag (t , keyspace , "wf1_reverse" )
805
830
tstWorkflowReverseWrites (t )
806
- validateReadsRouteToSource (t , "replica" )
831
+ checkStates (t , wrangler .WorkflowStateWritesSwitched , wrangler .WorkflowStateNotSwitched )
832
+ validateReadsRouteToSource (t , "replica,rdonly" )
807
833
validateWritesRouteToSource (t )
808
834
809
835
waitForLowLag (t , "customer" , "wf1" )
810
836
tstWorkflowSwitchReads (t , "" , "" )
811
- validateReadsRouteToTarget (t , "replica" )
837
+ checkStates (t , wrangler .WorkflowStateNotSwitched , wrangler .WorkflowStateReadsSwitched )
838
+ validateReadsRouteToTarget (t , "replica,rdonly" )
812
839
validateWritesRouteToSource (t )
813
840
814
841
tstWorkflowReverseReads (t , "" , "" )
815
- validateReadsRouteToSource (t , "replica" )
842
+ checkStates (t , wrangler .WorkflowStateReadsSwitched , wrangler .WorkflowStateNotSwitched )
843
+ validateReadsRouteToSource (t , "replica,rdonly" )
816
844
validateWritesRouteToSource (t )
817
845
818
846
tstWorkflowSwitchReadsAndWrites (t )
819
- validateReadsRouteToTarget (t , "replica" )
820
- validateReadsRoute (t , "rdonly" , targetRdonlyTab1 )
847
+ checkStates (t , wrangler . WorkflowStateNotSwitched , wrangler . WorkflowStateAllSwitched )
848
+ validateReadsRouteToTarget (t , "replica, rdonly" )
821
849
validateWritesRouteToTarget (t )
822
850
waitForLowLag (t , keyspace , "wf1_reverse" )
823
851
tstWorkflowReverseReadsAndWrites (t )
824
- validateReadsRoute (t , "rdonly" , sourceRdonlyTab )
825
- validateReadsRouteToSource (t , "replica" )
852
+ checkStates (t , wrangler . WorkflowStateAllSwitched , wrangler . WorkflowStateNotSwitched )
853
+ validateReadsRouteToSource (t , "replica,rdonly " )
826
854
validateWritesRouteToSource (t )
827
855
828
856
// trying to complete an unswitched workflow should error
@@ -835,8 +863,7 @@ func testRestOfWorkflow(t *testing.T) {
835
863
waitForLowLag (t , "customer" , "customer_name" )
836
864
waitForLowLag (t , "customer" , "enterprise_customer" )
837
865
tstWorkflowSwitchReadsAndWrites (t )
838
- validateReadsRoute (t , "rdonly" , targetRdonlyTab1 )
839
- validateReadsRouteToTarget (t , "replica" )
866
+ validateReadsRouteToTarget (t , "replica,rdonly" )
840
867
validateWritesRouteToTarget (t )
841
868
842
869
err = tstWorkflowComplete (t )
@@ -899,7 +926,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {
899
926
900
927
zone1 := vc .Cells ["zone1" ]
901
928
902
- vc .AddKeyspace (t , []* Cell {zone1 }, "product" , "0" , initialProductVSchema , initialProductSchema , 0 , 0 , 100 , nil )
929
+ vc .AddKeyspace (t , []* Cell {zone1 }, "product" , "0" , initialProductVSchema , initialProductSchema , defaultReplicas , defaultRdonly , 100 , nil )
903
930
904
931
verifyClusterHealth (t , vc )
905
932
insertInitialData (t )
@@ -912,7 +939,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {
912
939
func setupMinimalCustomerKeyspace (t * testing.T ) map [string ]* cluster.VttabletProcess {
913
940
tablets := make (map [string ]* cluster.VttabletProcess )
914
941
if _ , err := vc .AddKeyspace (t , []* Cell {vc .Cells ["zone1" ]}, "customer" , "-80,80-" ,
915
- customerVSchema , customerSchema , 0 , 0 , 200 , nil ); err != nil {
942
+ customerVSchema , customerSchema , defaultReplicas , defaultRdonly , 200 , nil ); err != nil {
916
943
t .Fatal (err )
917
944
}
918
945
defaultCell := vc .Cells [vc .CellNames [0 ]]
@@ -1048,6 +1075,7 @@ func createAdditionalCustomerShards(t *testing.T, shards string) {
1048
1075
targetTab2 = custKs .Shards ["80-c0" ].Tablets ["zone1-600" ].Vttablet
1049
1076
targetTab1 = custKs .Shards ["40-80" ].Tablets ["zone1-500" ].Vttablet
1050
1077
targetReplicaTab1 = custKs .Shards ["-40" ].Tablets ["zone1-401" ].Vttablet
1078
+ targetRdonlyTab1 = custKs .Shards ["-40" ].Tablets ["zone1-402" ].Vttablet
1051
1079
1052
1080
sourceTab = custKs .Shards ["-80" ].Tablets ["zone1-200" ].Vttablet
1053
1081
sourceReplicaTab = custKs .Shards ["-80" ].Tablets ["zone1-201" ].Vttablet
@@ -1059,3 +1087,34 @@ func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) {
1059
1087
"--sql" , sql , keyspace )
1060
1088
require .NoError (t , err , fmt .Sprintf ("ApplySchema Error: %s" , err ))
1061
1089
}
1090
+
1091
+ func validateTableRoutingRule (t * testing.T , table , tabletType , fromKeyspace , toKeyspace string ) {
1092
+ tabletType = strings .ToLower (strings .TrimSpace (tabletType ))
1093
+ rr := getRoutingRules (t )
1094
+ // We set matched = true by default because it is possible, if --no-routing-rules is set while creating
1095
+ // a workflow, that the routing rules are empty when the workflow starts.
1096
+ // We set it to false below when the rule is found, but before matching the routed keyspace.
1097
+ matched := true
1098
+ for _ , r := range rr .GetRules () {
1099
+ fromRule := fmt .Sprintf ("%s.%s" , fromKeyspace , table )
1100
+ if tabletType != "" && tabletType != "primary" {
1101
+ fromRule = fmt .Sprintf ("%s@%s" , fromRule , tabletType )
1102
+ }
1103
+ if r .FromTable == fromRule {
1104
+ // We found the rule, so we can set matched to false here and check for the routed keyspace below.
1105
+ matched = false
1106
+ require .NotEmpty (t , r .ToTables )
1107
+ toTable := r .ToTables [0 ]
1108
+ // The ToTables value is of the form "routedKeyspace.table".
1109
+ routedKeyspace , routedTable , ok := strings .Cut (toTable , "." )
1110
+ require .True (t , ok )
1111
+ require .Equal (t , table , routedTable )
1112
+ if routedKeyspace == toKeyspace {
1113
+ // We found the rule, the table and keyspace matches, so our search is done.
1114
+ matched = true
1115
+ break
1116
+ }
1117
+ }
1118
+ }
1119
+ require .Truef (t , matched , "routing rule for %s.%s from %s to %s not found" , fromKeyspace , table , tabletType , toKeyspace )
1120
+ }
0 commit comments