@@ -1294,85 +1294,86 @@ func (mysqld *Mysqld) ApplyBinlogFile(ctx context.Context, req *mysqlctlpb.Apply
1294
1294
}
1295
1295
1296
1296
// parseBinlogEntryTimestamp attempts to extract a timestamp from a binlog entry.
1297
- func parseBinlogEntryTimestamp (logEntry string ) (found bool , t time.Time , err error ) {
1297
+ func parseBinlogEntryTimestamp (logEntry string ) (t time.Time , err error ) {
1298
1298
if len (logEntry ) == 0 {
1299
- return false , t , nil
1299
+ return t , nil
1300
1300
}
1301
1301
if logEntry [0 ] != '#' {
1302
- return false , t , nil
1302
+ return t , nil
1303
1303
}
1304
1304
if submatch := binlogEntryCommittedTimestampRegex .FindStringSubmatch (logEntry ); submatch != nil {
1305
1305
// MySQL 8.0
1306
1306
binlogEntryCommittedTimestamp := submatch [1 ]
1307
1307
unixMicros , err := strconv .ParseInt (binlogEntryCommittedTimestamp , 10 , 64 )
1308
1308
if err != nil {
1309
- return false , t , err
1309
+ return t , err
1310
1310
}
1311
- return true , time .UnixMicro (unixMicros ), nil
1311
+ return time .UnixMicro (unixMicros ), nil
1312
1312
}
1313
1313
if submatch := binlogEntryTimestampGTIDRegexp .FindStringSubmatch (logEntry ); submatch != nil {
1314
1314
// MySQL 5.7
1315
1315
t , err = ParseBinlogTimestamp (submatch [1 ])
1316
1316
if err != nil {
1317
- return false , t , err
1317
+ return t , err
1318
1318
}
1319
- return true , t , nil
1319
+ return t , nil
1320
1320
}
1321
- return false , t , nil
1321
+ return t , nil
1322
1322
}
1323
1323
1324
1324
// scanBinlogTimestamp invokes a `mysqlbinlog` binary to look for a timestamp in the given binary. The function
1325
- // either looks for the first such timestamp or the last.
1326
- func (mysqld * Mysqld ) scanBinlogTimestamp (mysqlbinlogDir string , mysqlbinlogEnv []string , mysqlbinlogName string , binlogFile string , stopAtFirst bool ) (matchedTime time.Time , matchFound bool , err error ) {
1325
+ // looks for the first and last timestamps.
1326
+ func (mysqld * Mysqld ) scanBinlogTimestamp (
1327
+ mysqlbinlogDir string ,
1328
+ mysqlbinlogEnv []string ,
1329
+ mysqlbinlogName string ,
1330
+ binlogFile string ,
1331
+ stopAtFirst bool , // unused at this moment, to be used as an optimization hint
1332
+ ) (
1333
+ firstMatchedTime time.Time ,
1334
+ lastMatchedTime time.Time ,
1335
+ err error ,
1336
+ ) {
1327
1337
args := []string {binlogFile }
1328
1338
mysqlbinlogCmd := exec .Command (mysqlbinlogName , args ... )
1329
1339
mysqlbinlogCmd .Dir = mysqlbinlogDir
1330
1340
mysqlbinlogCmd .Env = mysqlbinlogEnv
1331
1341
log .Infof ("ApplyBinlogFile: running mysqlbinlog command: %#v" , mysqlbinlogCmd )
1332
1342
pipe , err := mysqlbinlogCmd .StdoutPipe () // to be piped into mysql
1333
1343
if err != nil {
1334
- return matchedTime , false , err
1335
- }
1336
- scanComplete := make (chan error )
1337
- intentionalKill := false
1338
- scan := func () {
1339
- defer close (scanComplete )
1340
- defer func () {
1341
- intentionalKill = true
1342
- mysqlbinlogCmd .Process .Kill () // ensures the binlog file is released
1343
- }()
1344
+ return firstMatchedTime , lastMatchedTime , err
1345
+ }
1346
+ scan := func () error {
1344
1347
// Read line by line and process it
1345
1348
scanner := bufio .NewScanner (pipe )
1346
1349
for scanner .Scan () {
1347
1350
logEntry := scanner .Text ()
1348
1351
1349
- found , t , err := parseBinlogEntryTimestamp (logEntry )
1352
+ t , err := parseBinlogEntryTimestamp (logEntry )
1350
1353
if err != nil {
1351
- scanComplete <- err
1352
- return
1354
+ return err
1353
1355
}
1354
- if found {
1355
- matchedTime = t
1356
- matchFound = true
1356
+ if t .IsZero () {
1357
+ continue
1357
1358
}
1358
- if found && stopAtFirst {
1359
- // Found the first timestamp and it's all we need. We won't scan any further and so we should also
1360
- // kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe).
1361
- return
1359
+ if firstMatchedTime .IsZero () {
1360
+ firstMatchedTime = t
1362
1361
}
1362
+ lastMatchedTime = t
1363
1363
}
1364
+ return nil
1364
1365
}
1365
- if err := mysqlbinlogCmd .Start (); err != nil {
1366
- return matchedTime , false , err
1366
+ if err := mysqlbinlogCmd .Start (); err != nil { // Start() is nonblockig
1367
+ return firstMatchedTime , lastMatchedTime , err
1367
1368
}
1368
- go scan ()
1369
- if err := mysqlbinlogCmd . Wait (); err != nil && ! intentionalKill {
1370
- return matchedTime , false , vterrors .Wrapf (err , "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps" )
1369
+ defer mysqlbinlogCmd . Process . Kill ()
1370
+ if err := scan (); err != nil { // We must first exhaust reading the command's output, before calling cmd.Wait()
1371
+ return firstMatchedTime , lastMatchedTime , vterrors .Wrapf (err , "scanning mysqlbinlog output in ReadBinlogFilesTimestamps" )
1371
1372
}
1372
- if err := <- scanComplete ; err != nil {
1373
- return matchedTime , false , vterrors .Wrapf (err , "scanning mysqlbinlog output in ReadBinlogFilesTimestamps " )
1373
+ if err := mysqlbinlogCmd . Wait () ; err != nil {
1374
+ return firstMatchedTime , lastMatchedTime , vterrors .Wrapf (err , "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps" )
1374
1375
}
1375
- return matchedTime , matchFound , nil
1376
+ return firstMatchedTime , lastMatchedTime , nil
1376
1377
}
1377
1378
1378
1379
// ReadBinlogFilesTimestamps reads all given binlog files via `mysqlbinlog` command and returns the first and last found transaction timestamps
@@ -1402,31 +1403,60 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc
1402
1403
return nil , err
1403
1404
}
1404
1405
1406
+ lastMatchedTimeMap := map [string ]time.Time {} // a simple cache to avoid rescanning same files. Key=binlog file name
1407
+
1405
1408
resp := & mysqlctlpb.ReadBinlogFilesTimestampsResponse {}
1406
1409
// Find first timestamp
1407
- for _ , binlogFile := range req .BinlogFileNames {
1408
- t , found , err := mysqld .scanBinlogTimestamp (dir , env , mysqlbinlogName , binlogFile , true )
1409
- if err != nil {
1410
- return nil , err
1411
- }
1412
- if found {
1413
- resp .FirstTimestamp = protoutil .TimeToProto (t )
1410
+ err = func () error {
1411
+ for _ , binlogFile := range req .BinlogFileNames {
1412
+ firstMatchedTime , lastMatchedTime , err := mysqld .scanBinlogTimestamp (dir , env , mysqlbinlogName , binlogFile , true )
1413
+ if err != nil {
1414
+ return vterrors .Wrapf (err , "while scanning for first binlog timestamp in %v" , binlogFile )
1415
+ }
1416
+ if ! lastMatchedTime .IsZero () {
1417
+ // cache result
1418
+ lastMatchedTimeMap [binlogFile ] = lastMatchedTime
1419
+ }
1420
+ if firstMatchedTime .IsZero () {
1421
+ // Timestamp not found in this file.
1422
+ continue
1423
+ }
1424
+ resp .FirstTimestamp = protoutil .TimeToProto (firstMatchedTime )
1414
1425
resp .FirstTimestampBinlog = binlogFile
1415
- break
1426
+ return nil // early break
1416
1427
}
1428
+ return nil
1429
+ }()
1430
+ if err != nil {
1431
+ return resp , err
1417
1432
}
1418
1433
// Find last timestamp
1419
- for i := len (req .BinlogFileNames ) - 1 ; i >= 0 ; i -- {
1420
- binlogFile := req .BinlogFileNames [i ]
1421
- t , found , err := mysqld .scanBinlogTimestamp (dir , env , mysqlbinlogName , binlogFile , false )
1422
- if err != nil {
1423
- return nil , err
1424
- }
1425
- if found {
1426
- resp .LastTimestamp = protoutil .TimeToProto (t )
1434
+ err = func () error {
1435
+ for i := len (req .BinlogFileNames ) - 1 ; i >= 0 ; i -- {
1436
+ binlogFile := req .BinlogFileNames [i ]
1437
+
1438
+ // See if we have a cached value for this file. This is certainly be the situation if there's a single binary log file in req.BinlogFileNames,
1439
+ // which means the first file and last file are the same, and so we have already parsed the file while searching for the first timestamp.
1440
+ lastMatchedTime , ok := lastMatchedTimeMap [binlogFile ]
1441
+ if ! ok {
1442
+ var err error
1443
+ _ , lastMatchedTime , err = mysqld .scanBinlogTimestamp (dir , env , mysqlbinlogName , binlogFile , false )
1444
+ if err != nil {
1445
+ return vterrors .Wrapf (err , "while scanning for last binlog timestamp in %v" , binlogFile )
1446
+ }
1447
+ }
1448
+ if lastMatchedTime .IsZero () {
1449
+ // Timestamp not found in this file.
1450
+ continue
1451
+ }
1452
+ resp .LastTimestamp = protoutil .TimeToProto (lastMatchedTime )
1427
1453
resp .LastTimestampBinlog = binlogFile
1428
- break
1454
+ return nil // early break
1429
1455
}
1456
+ return nil
1457
+ }()
1458
+ if err != nil {
1459
+ return resp , err
1430
1460
}
1431
1461
return resp , nil
1432
1462
}
0 commit comments