From d994497471a2a877323250fd2eddf6162967a64d Mon Sep 17 00:00:00 2001 From: "Eduardo J. Ortega U" <5791035+ejortegau@users.noreply.github.com> Date: Thu, 18 Jan 2024 11:47:50 +0100 Subject: [PATCH] Address PR comments Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> Signed-off-by: Eduardo J. Ortega U. <5791035+ejortegau@users.noreply.github.com> --- .../tabletserver/txthrottler/tx_throttler.go | 36 ++++++++++++++---- .../txthrottler/tx_throttler_test.go | 38 +++++++++---------- 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index d5e266a5b4d..d8089b36324 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -22,6 +22,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "time" "vitess.io/vitess/go/stats" @@ -168,6 +169,9 @@ type txThrottlerStateImpl struct { // tabletTypes stores the tablet types for throttling tabletTypes map[topodatapb.TabletType]bool + + shardMaxLag int64 + endChannel chan bool } // NewTxThrottler tries to construct a txThrottler from the relevant @@ -285,6 +289,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi tabletTypes: tabletTypes, throttler: t, txThrottler: txThrottler, + endChannel: make(chan bool), } // get cells from topo if none defined in tabletenv config @@ -299,6 +304,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi state.stopHealthCheck = cancel state.initHealthCheckStream(txThrottler.topoServer, target) go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) + go state.updateMaxShardLag() return state, nil } @@ -358,17 +364,31 @@ func (ts *txThrottlerStateImpl) throttle() bool { ts.throttleMu.Lock() defer ts.throttleMu.Unlock() - var maxLag uint32 + maxLag := atomic.LoadInt64(&ts.shardMaxLag) + + return ts.throttler.Throttle(0 /* threadId */) > 0 && + maxLag > ts.config.TxThrottlerConfig.TargetReplicationLagSec +} - for tabletType := range ts.tabletTypes { - maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType) - if maxLagPerTabletType > maxLag { - maxLag = maxLagPerTabletType +func (ts *txThrottlerStateImpl) updateMaxShardLag() { + // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value + ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second) + for { + select { + case _ = <-ticker.C: + var maxLag uint32 + + for tabletType := range ts.tabletTypes { + maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType) + if maxLagPerTabletType > maxLag { + maxLag = maxLagPerTabletType + } + } + atomic.StoreInt64(&ts.shardMaxLag, int64(maxLag)) + case _ = <-ts.endChannel: + break } } - - return ts.throttler.Throttle(0 /* threadId */) > 0 && - int64(maxLag) > ts.config.TxThrottlerConfig.TargetReplicationLagSec } func (ts *txThrottlerStateImpl) deallocateResources() { diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index bdf6632c234..3e465d9ea81 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -22,6 +22,7 @@ package txthrottler import ( "context" + "sync/atomic" "testing" "time" @@ -86,10 +87,7 @@ func TestEnabledThrottler(t *testing.T) { call := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) calls = append(calls, call) - // No underlying throttling & lag present - call = mockThrottler.EXPECT().LastMaxLagNotIgnoredForTabletType(topodatapb.TabletType_REPLICA) - call.Return(uint32(20)) - calls = append(calls, call) + // 1 call = mockThrottler.EXPECT().Throttle(0) call.Return(0 * time.Second) calls = append(calls, call) @@ -104,26 +102,17 @@ func TestEnabledThrottler(t *testing.T) { call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) calls = append(calls, call) - // Underlying throttling & lag present - call = mockThrottler.EXPECT().LastMaxLagNotIgnoredForTabletType(topodatapb.TabletType_REPLICA) - call.Return(uint32(20)) - calls = append(calls, call) + // 2 call = mockThrottler.EXPECT().Throttle(0) call.Return(1 * time.Second) calls = append(calls, call) - // Underlying throttling & lag present - call = mockThrottler.EXPECT().LastMaxLagNotIgnoredForTabletType(topodatapb.TabletType_REPLICA) - call.Return(uint32(20)) - calls = append(calls, call) + // 3 call = mockThrottler.EXPECT().Throttle(0) call.Return(1 * time.Second) calls = append(calls, call) - // Underlying throttling & no lag present - call = mockThrottler.EXPECT().LastMaxLagNotIgnoredForTabletType(topodatapb.TabletType_REPLICA) - call.Return(uint32(1)) - calls = append(calls, call) + // 4 call = mockThrottler.EXPECT().Throttle(0) call.Return(1 * time.Second) calls = append(calls, call) @@ -150,11 +139,17 @@ func TestEnabledThrottler(t *testing.T) { }) assert.Nil(t, throttlerImpl.Open()) - throttlerStateImpl := throttlerImpl.state.(*txThrottlerStateImpl) + throttlerStateImpl, ok := throttlerImpl.state.(*txThrottlerStateImpl) + assert.True(t, ok) assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) - // No underlying throttling & lag present - Don't throttle despite priority + // Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a + // way that will interfere with how we manipulate that value in our tests to evaluate different cases: + throttlerStateImpl.endChannel <- true + + // 1 should not throttle due to return value of underlying Throttle(), despite high lag + atomic.StoreInt64(&throttlerStateImpl.shardMaxLag, 20) assert.False(t, throttlerImpl.Throttle(100, "some-workload")) assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some-workload"]) assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some-workload"]) @@ -173,17 +168,18 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts()) assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) - // Underlying throttling & lag present - Throttle due to priority + // 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100 assert.True(t, throttlerImpl.Throttle(100, "some-workload")) assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some-workload"]) assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) - // Underlying throttling & lag present - Do not throttle due to priority + // 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0 assert.False(t, throttlerImpl.Throttle(0, "some-workload")) assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some-workload"]) assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"]) - // Underlying throttling & no lag present - Do no throttle despite priority + // 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag + atomic.StoreInt64(&throttlerStateImpl.shardMaxLag, 1) assert.False(t, throttler.Throttle(100, "some-workload")) assert.Equal(t, int64(4), throttlerImpl.requestsTotal.Counts()["some-workload"]) assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"])