@@ -73,6 +73,7 @@
 	clusterInstance *cluster.LocalProcessCluster
 	shards          []cluster.Shard
 	vtParams        mysql.ConnParams
+	primary         *cluster.Vttablet

 	normalWaitTime   = 20 * time.Second
 	extendedWaitTime = 60 * time.Second
@@ -310,6 +311,7 @@ func testScheduler(t *testing.T) {
 	defer cluster.PanicHandler(t)
 	shards = clusterInstance.Keyspaces[0].Shards
 	require.Equal(t, 1, len(shards))
+	primary = shards[0].PrimaryTablet()

 	ddlStrategy := "vitess"

@@ -334,7 +336,7 @@ func testScheduler(t *testing.T) {
 		}
 	}

-	mysqlVersion := onlineddl.GetMySQLVersion(t, clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet())
+	mysqlVersion := onlineddl.GetMySQLVersion(t, primary)
 	require.NotEmpty(t, mysqlVersion)
 	capableOf := mysql.ServerVersionCapableOf(mysqlVersion)

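For orientation, the `capableOf` closure built here is only consumed much later, to gate version-dependent subtests (see the final hunks below). A condensed sketch of the pattern, using only calls that appear in this file:

```go
// Condensed from this file's flow: probe the primary's MySQL version once,
// then gate feature-dependent subtests on server capabilities.
mysqlVersion := onlineddl.GetMySQLVersion(t, primary)
require.NotEmpty(t, mysqlVersion)
capableOf := mysql.ServerVersionCapableOf(mysqlVersion)

checkConstraintCapable, err := capableOf(capabilities.CheckConstraintsCapability) // 8.0.16 and above
require.NoError(t, err)
if checkConstraintCapable {
	// CHECK-constraint subtests run only on capable servers
}
```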
@@ -586,7 +588,7 @@ func testScheduler(t *testing.T) {
 		commitTransactionChan := make(chan any)
 		transactionErrorChan := make(chan error)
 		t.Run("locking table rows", func(t *testing.T) {
-			go runInTransaction(t, ctx, shards[0].Vttablets[0], "select * from t1_test for update", commitTransactionChan, transactionErrorChan)
+			go runInTransaction(t, ctx, primary, "select * from t1_test for update", commitTransactionChan, transactionErrorChan)
 		})
 		t.Run("check no force_cutover", func(t *testing.T) {
 			rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
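`runInTransaction` is defined elsewhere in this file and is not shown in the diff; only its call site changes (it now receives the cached `primary`). A plausible sketch of such a helper, with the signature taken from the call site above; the body is illustrative only, and the assumption that it locks via a vtgate connection (package-level `vtParams`) rather than the tablet directly is mine:

```go
// Illustrative sketch: hold an open transaction with a row lock until the test
// signals commit, then report the commit error. The real helper may instead
// connect to the tablet passed in (unused here, kept to mirror the call site).
func runInTransactionSketch(t *testing.T, ctx context.Context, tablet *cluster.Vttablet, query string, commitTransactionChan chan any, errorChan chan error) {
	conn, err := mysql.Connect(ctx, &vtParams) // assumption: lock via vtgate
	require.NoError(t, err)
	defer conn.Close()

	_, err = conn.ExecuteFetch("begin", 0, false)
	require.NoError(t, err)
	_, err = conn.ExecuteFetch(query, 10000, false) // e.g. "select * from t1_test for update"
	require.NoError(t, err)

	<-commitTransactionChan // hold the row locks until the test is ready
	_, err = conn.ExecuteFetch("commit", 0, false)
	errorChan <- err
}
```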
@@ -1035,6 +1037,22 @@ func testScheduler(t *testing.T) {
 		})
 	})

+	readCleanupsTimetamps := func(t *testing.T, migrationsLike string) (rows int64, cleanedUp int64, needCleanup int64, artifacts []string) {
+		rs := onlineddl.ReadMigrations(t, &vtParams, migrationsLike)
+		require.NotNil(t, rs)
+		for _, row := range rs.Named().Rows {
+			rows++
+			if row["cleanup_timestamp"].IsNull() {
+				needCleanup++
+			} else {
+				cleanedUp++
+			}
+			migrationArtifacts := textutil.SplitDelimitedList(row.AsString("artifacts", ""))
+			artifacts = append(artifacts, migrationArtifacts...)
+		}
+		return
+	}
+
 	t.Run("Cleanup artifacts", func(t *testing.T) {
 		// Create a migration with a low --retain-artifacts value.
 		// We will cancel the migration and expect the artifact to be cleaned.
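For intuition, the per-row tally that the new `readCleanupsTimetamps` helper computes is equivalent to a single aggregate over the migrations table. A sketch only: the test goes through `onlineddl.ReadMigrations` rather than raw SQL, and the `_vt.schema_migrations` table/column naming is my assumption, apart from `cleanup_timestamp`, which the helper itself reads:

```go
// Conceptual SQL equivalent of readCleanupsTimetamps (sketch; not used by the test).
const cleanupTallySketch = `
	SELECT
		COUNT(*)                           AS total_rows,
		SUM(cleanup_timestamp IS NOT NULL) AS cleaned_up,
		SUM(cleanup_timestamp IS NULL)     AS need_cleanup
	FROM _vt.schema_migrations
	WHERE migration_uuid LIKE '%'
`
```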
@@ -1071,14 +1089,14 @@ func testScheduler(t *testing.T) {
 			defer cancel()

 			for {
-				rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
-				require.NotNil(t, rs)
-				row := rs.Named().Row()
-				require.NotNil(t, row)
-				if !row["cleanup_timestamp"].IsNull() {
+				rows, cleanedUp, needCleanup, _ := readCleanupsTimetamps(t, t1uuid)
+				assert.EqualValues(t, 1, rows)
+				if cleanedUp == 1 {
 					// This is what we've been waiting for
 					break
 				}
+				assert.EqualValues(t, 0, cleanedUp)
+				assert.EqualValues(t, 1, needCleanup)
 				select {
 				case <-ctx.Done():
 					assert.Fail(t, "timeout waiting for cleanup")
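This loop and the two added in the next hunk share the same poll-until-deadline shape. As a generic sketch of the idiom (a hypothetical helper, not part of this change):

```go
// Hypothetical helper capturing the polling idiom used by the cleanup waits:
// evaluate a predicate once per second until it holds or the context expires.
func waitForSketch(t *testing.T, ctx context.Context, predicate func() bool) {
	for {
		if predicate() {
			return // this is what we've been waiting for
		}
		select {
		case <-ctx.Done():
			assert.Fail(t, "timeout waiting for condition")
			return
		case <-time.After(time.Second):
		}
	}
}
```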
@@ -1092,6 +1110,108 @@ func testScheduler(t *testing.T) {
 		})
 	})

+	t.Run("cleanup artifacts with CLEANUP ALL", func(t *testing.T) {
+		// First, clean up any existing migrations. We haven't kept an exact count of how many ran so far.
+		t.Run("initial cleanup all", func(t *testing.T) {
+			t.Run("validate migrations exist that need cleanup", func(t *testing.T) {
+				_, _, needCleanup, _ := readCleanupsTimetamps(t, "%")
+				assert.Greater(t, needCleanup, int64(1))
+			})
+			t.Run("issue cleanup all", func(t *testing.T) {
+				cleanedUp := onlineddl.CheckCleanupAllMigrations(t, &vtParams, -1)
+				t.Logf("marked %d migrations for cleanup", cleanedUp)
+			})
+			t.Run("wait for all migrations cleanup", func(t *testing.T) {
+				ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime)
+				defer cancel()
+
+				for {
+					rows, cleanedUp, needCleanup, artifacts := readCleanupsTimetamps(t, "%")
+					if needCleanup == 0 {
+						// This is what we've been waiting for
+						assert.NotZero(t, rows)
+						assert.Equal(t, rows, cleanedUp)
+						assert.Empty(t, artifacts)
+						t.Logf("rows needing cleanup: %v", needCleanup)
+						return
+					}
+					select {
+					case <-ctx.Done():
+						assert.Fail(t, "timeout waiting for cleanup", "rows needing cleanup: %v. artifacts: %v", needCleanup, artifacts)
+						return
+					case <-time.After(time.Second):
+					}
+					t.Logf("rows needing cleanup: %v. artifacts: %v", needCleanup, artifacts)
+				}
+			})
+		})
+		// Create a migration with a long --retain-artifacts value, complete it,
+		// then expect CLEANUP ALL to clean its artifact regardless.
+		t.Run("start migration", func(t *testing.T) {
+			// Intentionally set `--retain-artifacts=1h`, which is a long time. Then we will issue
+			// `ALTER VITESS_MIGRATION CLEANUP ALL` and expect the artifact to be cleaned.
+			t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion --retain-artifacts=1h", "vtctl", "", "", true)) // skip wait
+			onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
+		})
+		t.Run("wait for ready_to_complete", func(t *testing.T) {
+			waitForReadyToComplete(t, t1uuid, true)
+		})
+		var artifacts []string
+		t.Run("validate artifact exists", func(t *testing.T) {
+			rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
+			require.NotNil(t, rs)
+			row := rs.Named().Row()
+			require.NotNil(t, row)

+			artifacts = textutil.SplitDelimitedList(row.AsString("artifacts", ""))
+			assert.Len(t, artifacts, 1)
+			checkTable(t, artifacts[0], true)
+
+			retainArtifactsSeconds := row.AsInt64("retain_artifacts_seconds", 0)
+			assert.EqualValues(t, 3600, retainArtifactsSeconds) // due to --retain-artifacts=1h
+		})
+		t.Run("check needs cleanup", func(t *testing.T) {
+			_, _, needCleanup, _ := readCleanupsTimetamps(t, "%")
+			assert.EqualValues(t, 1, needCleanup)
+		})
+		t.Run("complete migration", func(t *testing.T) {
+			onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
+			status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled)
+			fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
+			onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
+		})
+		t.Run("cleanup all", func(t *testing.T) {
+			onlineddl.CheckCleanupAllMigrations(t, &vtParams, 1)
+		})
+		t.Run("wait for migration cleanup", func(t *testing.T) {
+			ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime)
+			defer cancel()
+
+			for {
+				rows, cleanedUp, needCleanup, artifacts := readCleanupsTimetamps(t, "%")
+				if needCleanup == 0 {
+					// This is what we've been waiting for
+					assert.NotZero(t, rows)
+					assert.Equal(t, rows, cleanedUp)
+					assert.Empty(t, artifacts)
+					t.Logf("rows needing cleanup: %v", needCleanup)
+					return
+				}
+				select {
+				case <-ctx.Done():
+					assert.Fail(t, "timeout waiting for cleanup", "rows needing cleanup: %v. artifacts: %v", needCleanup, artifacts)
+					return
+				case <-time.After(time.Second):
+				}
+				t.Logf("rows needing cleanup: %v. artifacts: %v", needCleanup, artifacts)
+			}
+		})
+
+		t.Run("validate artifact does not exist", func(t *testing.T) {
+			checkTable(t, artifacts[0], false)
+		})
+	})
+
 	checkConstraintCapable, err := capableOf(capabilities.CheckConstraintsCapability) // 8.0.16 and above
 	require.NoError(t, err)
 	if checkConstraintCapable {
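For reference, `onlineddl.CheckCleanupAllMigrations` exercises the statement named in the comments above. Issued by hand over the same vtgate connection it would look roughly like this; a sketch with a hypothetical helper name, shown next to the pre-existing per-migration form for comparison:

```go
// Sketch: the cleanup statements behind the helpers above, issued directly.
func cleanupByHandSketch(t *testing.T, ctx context.Context, uuid string) {
	conn, err := mysql.Connect(ctx, &vtParams)
	require.NoError(t, err)
	defer conn.Close()

	// Pre-existing per-migration form:
	_, err = conn.ExecuteFetch(fmt.Sprintf("alter vitess_migration '%s' cleanup", uuid), 0, false)
	require.NoError(t, err)

	// The form exercised by this change; RowsAffected counts migrations marked for cleanup:
	qr, err := conn.ExecuteFetch("alter vitess_migration cleanup all", 0, false)
	require.NoError(t, err)
	t.Logf("marked %d migrations for cleanup", qr.RowsAffected)
}
```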
@@ -1105,7 +1225,7 @@ func testScheduler(t *testing.T) {
 		// name it `with_constraint_chk_1`. But we expect Online DDL to explicitly
 		// modify the constraint name, specifically to get rid of the <table-name> prefix,
 		// so that we don't get into https://bugs.mysql.com/bug.php?id=107772 situation.
-		createStatement := getCreateTableStatement(t, shards[0].Vttablets[0], "with_constraint")
+		createStatement := getCreateTableStatement(t, primary, "with_constraint")
 		assert.NotContains(t, createStatement, "with_constraint_chk")
 	})
 })
@@ -2433,7 +2553,7 @@ func testForeignKeys(t *testing.T) {
 	//
 	// In this stress test, we enable Online DDL if the variable 'rename_table_preserve_foreign_key' is present. The Online DDL mechanism will in turn
 	// query for this variable, and manipulate it, when starting the migration and when cutting over.
-	rs, err := shards[0].Vttablets[0].VttabletProcess.QueryTablet("show global variables like 'rename_table_preserve_foreign_key'", keyspaceName, false)
+	rs, err := primary.VttabletProcess.QueryTablet("show global variables like 'rename_table_preserve_foreign_key'", keyspaceName, false)
 	require.NoError(t, err)
 	fkOnlineDDLPossible = len(rs.Rows) > 0
 	t.Logf("MySQL support for 'rename_table_preserve_foreign_key': %v", fkOnlineDDLPossible)