@@ -649,38 +649,14 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
649
649
workflow .WorkflowSubType = res .WorkflowSubType .String ()
650
650
workflow .DeferSecondaryKeys = res .DeferSecondaryKeys
651
651
652
- // MaxVReplicationTransactionLag estimates the actual statement processing lag
653
- // between the source and the target. If we are still processing source events it
654
- // is the difference b/w current time and the timestamp of the last event. If
655
- // heartbeats are more recent than the last event, then the lag is the time since
656
- // the last heartbeat as there can be an actual event immediately after the
657
- // heartbeat, but which has not yet been processed on the target.
658
- // We don't allow switching during the copy phase, so in that case we just return
659
- // a large lag. All timestamps are in seconds since epoch.
652
+ // MaxVReplicationTransactionLag estimates the max statement processing lag
653
+ // between the source and the target across all of the workflow streams.
660
654
if _ , ok := maxVReplicationTransactionLagByWorkflow [workflow .Name ]; ! ok {
661
655
maxVReplicationTransactionLagByWorkflow [workflow .Name ] = 0
662
656
}
663
- if rstream .TransactionTimestamp == nil {
664
- rstream .TransactionTimestamp = & vttimepb.Time {}
665
- }
666
- lastTransactionTime := rstream .TransactionTimestamp .Seconds
667
- if rstream .TimeHeartbeat == nil {
668
- rstream .TimeHeartbeat = & vttimepb.Time {}
669
- }
670
- lastHeartbeatTime := rstream .TimeHeartbeat .Seconds
671
- if stream .State == binlogdatapb .VReplicationWorkflowState_Copying .String () {
672
- maxVReplicationTransactionLagByWorkflow [workflow .Name ] = math .MaxInt64
673
- } else {
674
- if lastTransactionTime == 0 /* no new events after copy */ ||
675
- lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ {
676
-
677
- lastTransactionTime = lastHeartbeatTime
678
- }
679
- now := time .Now ().Unix () /* seconds since epoch */
680
- transactionReplicationLag := float64 (now - lastTransactionTime )
681
- if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow [workflow .Name ] {
682
- maxVReplicationTransactionLagByWorkflow [workflow .Name ] = transactionReplicationLag
683
- }
657
+ transactionReplicationLag := getVReplicationTrxLag (rstream .TransactionTimestamp , rstream .TimeUpdated , rstream .TimeHeartbeat , rstream .State )
658
+ if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow [workflow .Name ] {
659
+ maxVReplicationTransactionLagByWorkflow [workflow .Name ] = transactionReplicationLag
684
660
}
685
661
}
686
662
@@ -4520,3 +4496,41 @@ func (s *Server) Logger() logutil.Logger {
4520
4496
}
4521
4497
return s .options .logger
4522
4498
}
4499
+
4500
+ // getVReplicationTrxLag estimates the actual statement processing lag between the
4501
+ // source and the target. If we are still processing source events it is the
4502
+ // difference between current time and the timestamp of the last event. If
4503
+ // heartbeats are more recent than the last event, then the lag is the time since
4504
+ // the last heartbeat as there can be an actual event immediately after the
4505
+ // heartbeat, but which has not yet been processed on the target. We don't allow
4506
+ // switching during the copy phase, so in that case we just return a large lag.
4507
+ // All timestamps are in seconds since epoch.
4508
+ func getVReplicationTrxLag (trxTs , updatedTs , heartbeatTs * vttimepb.Time , state binlogdatapb.VReplicationWorkflowState ) float64 {
4509
+ if state == binlogdatapb .VReplicationWorkflowState_Copying {
4510
+ return math .MaxInt64
4511
+ }
4512
+ if trxTs == nil {
4513
+ trxTs = & vttimepb.Time {}
4514
+ }
4515
+ lastTransactionTime := trxTs .Seconds
4516
+ if updatedTs == nil {
4517
+ updatedTs = & vttimepb.Time {}
4518
+ }
4519
+ lastUpdatedTime := updatedTs .Seconds
4520
+ if heartbeatTs == nil {
4521
+ heartbeatTs = & vttimepb.Time {}
4522
+ }
4523
+ lastHeartbeatTime := heartbeatTs .Seconds
4524
+ // We do NOT update the heartbeat timestamp when we are regularly updating the
4525
+ // position as we replicate transactions (GTIDs).
4526
+ // When we DO record a heartbeat, we set the updated time to the same value.
4527
+ // When recording that we are throttled, we update the updated time but NOT
4528
+ // the heartbeat time.
4529
+ if lastTransactionTime == 0 /* No replicated events after copy */ ||
4530
+ (lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */
4531
+ lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */ ) {
4532
+ lastTransactionTime = lastUpdatedTime
4533
+ }
4534
+ now := time .Now ().Unix () // Seconds since epoch
4535
+ return float64 (now - lastTransactionTime )
4536
+ }
0 commit comments