@@ -128,10 +128,11 @@ func TestCreateVReplicationWorkflow(t *testing.T) {
128
128
ws := workflow .NewServer (vtenv .NewTestEnv (), tenv .ts , tenv .tmc )
129
129
130
130
tests := []struct {
131
- name string
132
- req * vtctldatapb.MoveTablesCreateRequest
133
- schema * tabletmanagerdatapb.SchemaDefinition
134
- query string
131
+ name string
132
+ req * vtctldatapb.MoveTablesCreateRequest
133
+ schema * tabletmanagerdatapb.SchemaDefinition
134
+ query string
135
+ selectTableQuery string
135
136
}{
136
137
{
137
138
name : "defaults" ,
@@ -144,6 +145,7 @@ func TestCreateVReplicationWorkflow(t *testing.T) {
144
145
},
145
146
query : fmt .Sprintf (`%s values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"}}', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 0, '{}')` ,
146
147
insertVReplicationPrefix , wf , sourceKs , shard , tenv .cells [0 ], tenv .dbName ),
148
+ selectTableQuery : "(select 't1' from t1 limit 1)" ,
147
149
},
148
150
{
149
151
name : "all values" ,
@@ -179,6 +181,7 @@ func TestCreateVReplicationWorkflow(t *testing.T) {
179
181
},
180
182
query : fmt .Sprintf (`%s values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"}} on_ddl:EXEC stop_after_copy:true source_time_zone:"EDT" target_time_zone:"UTC"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1, '{}')` ,
181
183
insertVReplicationPrefix , wf , sourceKs , shard , tenv .cells [0 ], tenv .dbName ),
184
+ selectTableQuery : "(select 't1' from t1 limit 1)" ,
182
185
},
183
186
{
184
187
name : "binlog source order with include" ,
@@ -219,6 +222,7 @@ func TestCreateVReplicationWorkflow(t *testing.T) {
219
222
},
220
223
query : fmt .Sprintf (`%s values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"} rules:{match:"wut" filter:"select * from wut"} rules:{match:"zt" filter:"select * from zt"}} on_ddl:EXEC stop_after_copy:true source_time_zone:"EDT" target_time_zone:"UTC"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1, '{}')` ,
221
224
insertVReplicationPrefix , wf , sourceKs , shard , tenv .cells [0 ], tenv .dbName ),
225
+ selectTableQuery : "/.*union all.*union all.*" ,
222
226
},
223
227
{
224
228
name : "binlog source order with all-tables" ,
@@ -259,6 +263,7 @@ func TestCreateVReplicationWorkflow(t *testing.T) {
259
263
},
260
264
query : fmt .Sprintf (`%s values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"} rules:{match:"wut" filter:"select * from wut"} rules:{match:"zt" filter:"select * from zt"}} on_ddl:EXEC stop_after_copy:true source_time_zone:"EDT" target_time_zone:"UTC"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1, '{}')` ,
261
265
insertVReplicationPrefix , wf , sourceKs , shard , tenv .cells [0 ], tenv .dbName ),
266
+ selectTableQuery : "/.*union all.*union all.*" ,
262
267
},
263
268
}
264
269
@@ -289,6 +294,7 @@ func TestCreateVReplicationWorkflow(t *testing.T) {
289
294
targetTablet .vrdbClient .ExpectRequest (fmt .Sprintf (readAllWorkflows , tenv .dbName , "" ), & sqltypes.Result {}, nil )
290
295
targetTablet .vrdbClient .ExpectRequest (fmt .Sprintf ("use %s" , sidecar .GetIdentifier ()), & sqltypes.Result {}, nil )
291
296
targetTablet .vrdbClient .ExpectRequest (tt .query , & sqltypes.Result {}, errShortCircuit )
297
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , tt .selectTableQuery , & sqltypes.Result {})
292
298
_ , err := ws .MoveTablesCreate (ctx , tt .req )
293
299
tenv .tmc .tablets [targetTabletUID ].vrdbClient .Wait ()
294
300
require .ErrorIs (t , err , errShortCircuit )
@@ -712,6 +718,7 @@ func TestMoveTablesSharded(t *testing.T) {
712
718
fmt .Sprintf ("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}" , wf , vreplID , bls , position , targetKs ),
713
719
), nil )
714
720
tenv .tmc .setVReplicationExecResults (ftc .tablet , fmt .Sprintf (getLatestCopyState , vreplID , vreplID ), & sqltypes.Result {})
721
+ tenv .tmc .setVReplicationExecResults (ftc .tablet , "(select 't1' from t1 limit 1)" , & sqltypes.Result {})
715
722
}
716
723
717
724
// We use the tablet's UID in the mocked results for the max value used on each target shard.
@@ -1334,6 +1341,7 @@ func TestSourceShardSelection(t *testing.T) {
1334
1341
tt := targetTablets [uid ]
1335
1342
tt .vrdbClient .ExpectRequest (fmt .Sprintf ("use %s" , sidecar .GetIdentifier ()), & sqltypes.Result {}, nil )
1336
1343
tt .vrdbClient .ExpectRequest (fmt .Sprintf (readAllWorkflows , tenv .dbName , "" ), & sqltypes.Result {}, nil )
1344
+ tenv .tmc .setVReplicationExecResults (tt .tablet , "(select 't1' from t1 limit 1)" , & sqltypes.Result {})
1337
1345
for i , sourceShard := range streams {
1338
1346
var err error
1339
1347
if i == len (streams )- 1 {
@@ -1484,6 +1492,7 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) {
1484
1492
fmt .Sprintf (deleteWorkflow , targetKs , wf ),
1485
1493
& sqltypes.Result {RowsAffected : 1 },
1486
1494
)
1495
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "(select 't1' from t1 limit 1)" , & sqltypes.Result {})
1487
1496
1488
1497
// Save the current target vschema.
1489
1498
vs , err := tenv .ts .GetVSchema (ctx , targetKs )
@@ -2112,6 +2121,7 @@ func TestMaterializerOneToOne(t *testing.T) {
2112
2121
fmt .Sprintf (` values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"} rules:{match:"t2" filter:"select * from t3"} rules:{match:"t4"}}', '', 0, 0, '%s', 'primary,rdonly', now(), 0, 'Stopped', '%s', 0, 0, 0, '{}')` ,
2113
2122
wf , sourceKs , shard , tenv .cells [0 ], tenv .dbName )
2114
2123
targetTablet .vrdbClient .ExpectRequest (insert , & sqltypes.Result {}, errShortCircuit )
2124
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "/.*union all.*union all.*" , & sqltypes.Result {})
2115
2125
2116
2126
err := ws .Materialize (ctx , ms )
2117
2127
targetTablet .vrdbClient .Wait ()
@@ -2198,6 +2208,7 @@ func TestMaterializerManyToOne(t *testing.T) {
2198
2208
} else {
2199
2209
targetTablet .vrdbClient .ExpectRequest (insert , & sqltypes.Result {InsertID : uint64 (vreplID )}, errShortCircuit )
2200
2210
}
2211
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "/.*union all.*" , & sqltypes.Result {})
2201
2212
}
2202
2213
2203
2214
err := ws .Materialize (ctx , ms )
@@ -2300,6 +2311,7 @@ func TestMaterializerOneToMany(t *testing.T) {
2300
2311
} else {
2301
2312
targetTablet .vrdbClient .ExpectRequest (insert , & sqltypes.Result {InsertID : uint64 (vreplID )}, errShortCircuit )
2302
2313
}
2314
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "(select 't1' from t1 limit 1)" , & sqltypes.Result {})
2303
2315
}
2304
2316
2305
2317
err = ws .Materialize (ctx , ms )
@@ -2408,6 +2420,7 @@ func TestMaterializerManyToMany(t *testing.T) {
2408
2420
fmt .Sprintf ("%d|%s" , vreplID , bls ),
2409
2421
), nil )
2410
2422
}
2423
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "(select 't1' from t1 limit 1)" , & sqltypes.Result {})
2411
2424
}
2412
2425
}
2413
2426
@@ -2516,6 +2529,7 @@ func TestMaterializerMulticolumnVindex(t *testing.T) {
2516
2529
} else {
2517
2530
targetTablet .vrdbClient .ExpectRequest (insert , & sqltypes.Result {InsertID : uint64 (vreplID )}, errShortCircuit )
2518
2531
}
2532
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "(select 't1' from t1 limit 1)" , & sqltypes.Result {})
2519
2533
}
2520
2534
2521
2535
err = ws .Materialize (ctx , ms )
@@ -2587,6 +2601,7 @@ func TestMaterializerDeploySchema(t *testing.T) {
2587
2601
fmt .Sprintf (` values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"} rules:{match:"t2" filter:"select * from t3"}}', '', 0, 0, '%s', 'primary,rdonly', now(), 0, 'Stopped', '%s', 0, 0, 0, '{}')` ,
2588
2602
wf , sourceKs , shard , tenv .cells [0 ], tenv .dbName )
2589
2603
targetTablet .vrdbClient .ExpectRequest (insert , & sqltypes.Result {}, errShortCircuit )
2604
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "(select 't1' from t1 limit 1)" , & sqltypes.Result {})
2590
2605
2591
2606
err := ws .Materialize (ctx , ms )
2592
2607
targetTablet .vrdbClient .Wait ()
@@ -2657,6 +2672,7 @@ func TestMaterializerCopySchema(t *testing.T) {
2657
2672
fmt .Sprintf (` values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"} rules:{match:"t2" filter:"select * from t3"}}', '', 0, 0, '%s', 'primary,rdonly', now(), 0, 'Stopped', '%s', 0, 0, 0, '{}')` ,
2658
2673
wf , sourceKs , shard , tenv .cells [0 ], tenv .dbName )
2659
2674
targetTablet .vrdbClient .ExpectRequest (insert , & sqltypes.Result {}, errShortCircuit )
2675
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "/.*union all.*" , & sqltypes.Result {})
2660
2676
2661
2677
err := ws .Materialize (ctx , ms )
2662
2678
targetTablet .vrdbClient .Wait ()
@@ -2763,6 +2779,7 @@ func TestMaterializerExplicitColumns(t *testing.T) {
2763
2779
} else {
2764
2780
targetTablet .vrdbClient .ExpectRequest (insert , & sqltypes.Result {InsertID : uint64 (vreplID )}, errShortCircuit )
2765
2781
}
2782
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "(select 't1' from t1 limit 1)" , & sqltypes.Result {})
2766
2783
}
2767
2784
2768
2785
err = ws .Materialize (ctx , ms )
@@ -2870,6 +2887,7 @@ func TestMaterializerRenamedColumns(t *testing.T) {
2870
2887
} else {
2871
2888
targetTablet .vrdbClient .ExpectRequest (insert , & sqltypes.Result {InsertID : uint64 (vreplID )}, errShortCircuit )
2872
2889
}
2890
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "(select 't1' from t1 limit 1)" , & sqltypes.Result {})
2873
2891
}
2874
2892
2875
2893
err = ws .Materialize (ctx , ms )
@@ -2932,6 +2950,7 @@ func TestMaterializerStopAfterCopy(t *testing.T) {
2932
2950
fmt .Sprintf (` values ('%s', 'keyspace:"%s" shard:"%s" filter:{rules:{match:"t1" filter:"select * from t1"} rules:{match:"t2" filter:"select * from t3"}} stop_after_copy:true', '', 0, 0, '%s', 'primary,rdonly', now(), 0, 'Stopped', '%s', 0, 0, 0, '{}')` ,
2933
2951
wf , sourceKs , shard , tenv .cells [0 ], tenv .dbName )
2934
2952
targetTablet .vrdbClient .ExpectRequest (insert , & sqltypes.Result {}, errShortCircuit )
2953
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "/.*union all.*" , & sqltypes.Result {})
2935
2954
2936
2955
err := ws .Materialize (ctx , ms )
2937
2956
targetTablet .vrdbClient .Wait ()
@@ -3386,6 +3405,7 @@ func TestMaterializerNoGoodVindex(t *testing.T) {
3386
3405
targetTablet := targetShards [targetShard ]
3387
3406
addInvariants (targetTablet .vrdbClient , vreplID , sourceTabletUID , position , wf , tenv .cells [0 ])
3388
3407
targetTablet .vrdbClient .ExpectRequest (fmt .Sprintf (readAllWorkflows , tenv .dbName , "" ), & sqltypes.Result {}, nil )
3408
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "(select 't1' from t1 limit 1)" , & sqltypes.Result {})
3389
3409
errs = append (errs , errNoVindex )
3390
3410
}
3391
3411
@@ -3461,6 +3481,7 @@ func TestMaterializerComplexVindexExpression(t *testing.T) {
3461
3481
targetTablet := targetShards [targetShard ]
3462
3482
addInvariants (targetTablet .vrdbClient , vreplID , sourceTabletUID , position , wf , tenv .cells [0 ])
3463
3483
targetTablet .vrdbClient .ExpectRequest (fmt .Sprintf (readAllWorkflows , tenv .dbName , "" ), & sqltypes.Result {}, nil )
3484
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "(select 't1' from t1 limit 1)" , & sqltypes.Result {})
3464
3485
errs = append (errs , errNoVindex )
3465
3486
}
3466
3487
@@ -3536,6 +3557,7 @@ func TestMaterializerNoVindexInExpression(t *testing.T) {
3536
3557
targetTablet := targetShards [targetShard ]
3537
3558
addInvariants (targetTablet .vrdbClient , vreplID , sourceTabletUID , position , wf , tenv .cells [0 ])
3538
3559
targetTablet .vrdbClient .ExpectRequest (fmt .Sprintf (readAllWorkflows , tenv .dbName , "" ), & sqltypes.Result {}, nil )
3560
+ tenv .tmc .setVReplicationExecResults (targetTablet .tablet , "(select 't1' from t1 limit 1)" , & sqltypes.Result {})
3539
3561
errs = append (errs , errNoVindex )
3540
3562
}
3541
3563
0 commit comments