diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index c3f3e4e6557..6a372b3fd09 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -58,7 +58,7 @@ var ( targetKsOpts = make(map[string]string) httpClient = throttlebase.SetupHTTPClient(time.Second) sourceThrottlerAppName = throttlerapp.VStreamerName - targetThrottlerAppName = throttlerapp.VReplicationName + targetThrottlerAppName = throttlerapp.VPlayerName ) const ( @@ -1228,18 +1228,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { for _, tab := range customerTablets { waitForRowCountInTablet(t, tab, keyspace, workflow, 5) // Confirm that we updated the stats on the target tablets as expected. - jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) - require.NoError(t, err) - require.NotEqual(t, "{}", jsVal) - // The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2} - vstreamerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vstreamer`, workflow)).Int() - require.Greater(t, vstreamerThrottledCount, int64(0)) - // We only need to do this stat check once. - val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"}) - require.NoError(t, err) - throttledCount, err := strconv.ParseInt(val, 10, 64) - require.NoError(t, err) - require.GreaterOrEqual(t, throttledCount, vstreamerThrottledCount) + confirmVReplicationThrottling(t, tab, sourceKs, workflow, sourceThrottlerAppName) } }) t.Run("unthrottle-app-product", func(t *testing.T) { @@ -1274,12 +1263,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { for _, tab := range customerTablets { waitForRowCountInTablet(t, tab, keyspace, workflow, 8) // Confirm that we updated the stats on the target tablets as expected. - jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) - require.NoError(t, err) - require.NotEqual(t, "{}", jsVal) - // The JSON value now looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4} - vplayerThrottledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.vplayer`, workflow)).Int() - require.Greater(t, vplayerThrottledCount, int64(0)) + confirmVReplicationThrottling(t, tab, sourceKs, workflow, targetThrottlerAppName) } }) t.Run("unthrottle-app-customer", func(t *testing.T) { @@ -1709,3 +1693,52 @@ func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, e func releaseInnoDBRowHistory(t *testing.T, dbConn *mysql.Conn) { execQuery(t, dbConn, "rollback") } + +// confirmVReplicationThrottling confirms that the throttling related metrics reflect that +// the workflow is being throttled as expected, via the expected app name, and that this +// is impacting the lag as expected. +// The tablet passed should be a target tablet for the given workflow while the keyspace +// name provided should be the source keyspace as the target tablet stats note the stream's +// source keyspace and shard. +func confirmVReplicationThrottling(t *testing.T, tab *cluster.VttabletProcess, keyspace, workflow string, appname throttlerapp.Name) { + const ( + sleepTime = 5 * time.Second + zv = int64(0) + ) + time.Sleep(sleepTime) // To be sure that we accrue some lag + + jsVal, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCounts"}) + require.NoError(t, err) + require.NotEqual(t, "{}", jsVal) + // The JSON value looks like this: {"cproduct.4.tablet.vstreamer": 2, "cproduct.4.tablet.vplayer": 4} + throttledCount := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.tablet\.%s`, workflow, appname)).Int() + require.Greater(t, throttledCount, zv, "JSON value: %s", jsVal) + + val, err := getDebugVar(t, tab.Port, []string{"VReplicationThrottledCountTotal"}) + require.NoError(t, err) + require.NotEqual(t, "", val) + throttledCountTotal, err := strconv.ParseInt(val, 10, 64) + require.NoError(t, err) + require.GreaterOrEqual(t, throttledCountTotal, throttledCount, "Value: %s", val) + + // We do not calculate replication lag for the vcopier as it's not replicating + // events. + if appname != throttlerapp.VCopierName { + jsVal, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSeconds"}) + require.NoError(t, err) + require.NotEqual(t, "{}", jsVal) + // The JSON value looks like this: {"product.0.cproduct.4": 6} + vreplLagSeconds := gjson.Get(jsVal, fmt.Sprintf(`%s\.*\.%s\.*`, keyspace, workflow)).Int() + require.NoError(t, err) + // Take off 1 second to deal with timing issues in the test. + minLagSecs := int64(int64(sleepTime.Seconds()) - 1) + require.GreaterOrEqual(t, vreplLagSeconds, minLagSecs, "JSON value: %s", jsVal) + + val, err = getDebugVar(t, tab.Port, []string{"VReplicationLagSecondsMax"}) + require.NoError(t, err) + require.NotEqual(t, "", val) + vreplLagSecondsMax, err := strconv.ParseInt(val, 10, 64) + require.NoError(t, err) + require.GreaterOrEqual(t, vreplLagSecondsMax, vreplLagSeconds, "Value: %s", val) + } +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 2b8b8130f89..b1a84dd90b1 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -476,12 +476,18 @@ func (vp *vplayer) recordHeartbeat() error { func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { defer vp.vr.dbClient.Rollback() + estimateLag := func() { + behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs + vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9) + vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second) + } + // If we're not running, set ReplicationLagSeconds to be very high. // TODO(sougou): if we also stored the time of the last event, we // can estimate this value more accurately. defer vp.vr.stats.ReplicationLagSeconds.Store(math.MaxInt64) defer vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), math.MaxInt64) - var sbm int64 = -1 + var lagSecs int64 for { if ctx.Err() != nil { return ctx.Err() @@ -489,6 +495,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { // Check throttler. if checkResult, ok := vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)); !ok { _ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary()) + estimateLag() continue } @@ -496,13 +503,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { if err != nil { return err } - // No events were received. This likely means that there's a network partition. - // So, we should assume we're falling behind. - if len(items) == 0 { - behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs - vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9) - vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second) - } + // Empty transactions are saved at most once every idleTimeout. // This covers two situations: // 1. Fetch was idle for idleTimeout. @@ -520,12 +521,20 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { } } + lagSecs = -1 for i, events := range items { for j, event := range events { if event.Timestamp != 0 { - vp.lastTimestampNs = event.Timestamp * 1e9 - vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime - sbm = event.CurrentTime/1e9 - event.Timestamp + // If the event is a heartbeat sent while throttled then do not update + // the lag based on it. + // If the batch consists only of throttled heartbeat events then we cannot + // determine the actual lag, as the vstreamer is fully throttled, and we + // will estimate it after processing the batch. + if !(event.Type == binlogdatapb.VEventType_HEARTBEAT && event.Throttled) { + vp.lastTimestampNs = event.Timestamp * 1e9 + vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime + lagSecs = event.CurrentTime/1e9 - event.Timestamp + } } mustSave := false switch event.Type { @@ -566,11 +575,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { } } - if sbm >= 0 { - vp.vr.stats.ReplicationLagSeconds.Store(sbm) - vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(sbm)*time.Second) + if lagSecs >= 0 { + vp.vr.stats.ReplicationLagSeconds.Store(lagSecs) + vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(lagSecs)*time.Second) + } else { // We couldn't determine the lag, so we need to estimate it + estimateLag() } - } } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 634c9a5d40c..824f79e20f1 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -31,7 +31,6 @@ import ( "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/binlog" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" @@ -288,11 +287,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog defer hbTimer.Stop() injectHeartbeat := func(throttled bool, throttledReason string) error { - now := time.Now().UnixNano() select { case <-ctx.Done(): return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") default: + now := time.Now().UnixNano() err := bufferAndTransmit(&binlogdatapb.VEvent{ Type: binlogdatapb.VEventType_HEARTBEAT, Timestamp: now / 1e9, @@ -305,24 +304,22 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } logger := logutil.NewThrottledLogger(vs.vse.GetTabletInfo(), throttledLoggerInterval) + wfNameLog := "" + if vs.filter != nil && vs.filter.WorkflowName != "" { + wfNameLog = fmt.Sprintf(" in workflow %s", vs.filter.WorkflowName) + } throttleEvents := func(throttledEvents chan mysql.BinlogEvent) { - throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime) - defer throttledHeartbeatsRateLimiter.Stop() for { - // check throttler. + // Check throttler. if checkResult, ok := vs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vs.throttlerApp); !ok { - // make sure to leave if context is cancelled + // Make sure to leave if context is cancelled. select { case <-ctx.Done(): return default: - // do nothing special + // Do nothing special. } - throttledHeartbeatsRateLimiter.Do(func() error { - return injectHeartbeat(true, checkResult.Summary()) - }) - // we won't process events, until we're no longer throttling - logger.Infof("throttled.") + logger.Infof("vstreamer throttled%s: %s.", wfNameLog, checkResult.Summary()) continue } select { @@ -394,7 +391,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog case <-ctx.Done(): return nil case <-hbTimer.C: - if err := injectHeartbeat(false, ""); err != nil { + checkResult, ok := vs.vse.throttlerClient.ThrottleCheckOK(ctx, vs.throttlerApp) + if err := injectHeartbeat(!ok, checkResult.Summary()); err != nil { if err == io.EOF { return nil }