Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>
Signed-off-by: Eduardo J. Ortega U. <5791035+ejortegau@users.noreply.github.com>
  • Loading branch information
ejortegau committed Jan 24, 2024
1 parent 3f9ab25 commit d994497
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 29 deletions.
36 changes: 28 additions & 8 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/stats"
Expand Down Expand Up @@ -168,6 +169,9 @@ type txThrottlerStateImpl struct {

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool

shardMaxLag int64
endChannel chan bool
}

// NewTxThrottler tries to construct a txThrottler from the relevant
Expand Down Expand Up @@ -285,6 +289,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi
tabletTypes: tabletTypes,
throttler: t,
txThrottler: txThrottler,
endChannel: make(chan bool),
}

// get cells from topo if none defined in tabletenv config
Expand All @@ -299,6 +304,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi
state.stopHealthCheck = cancel
state.initHealthCheckStream(txThrottler.topoServer, target)
go state.healthChecksProcessor(ctx, txThrottler.topoServer, target)
go state.updateMaxShardLag()

return state, nil
}
Expand Down Expand Up @@ -358,17 +364,31 @@ func (ts *txThrottlerStateImpl) throttle() bool {
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()

var maxLag uint32
maxLag := atomic.LoadInt64(&ts.shardMaxLag)

return ts.throttler.Throttle(0 /* threadId */) > 0 &&
maxLag > ts.config.TxThrottlerConfig.TargetReplicationLagSec
}

for tabletType := range ts.tabletTypes {
maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
func (ts *txThrottlerStateImpl) updateMaxShardLag() {
// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value
ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second)
for {
select {
case _ = <-ticker.C:
var maxLag uint32

for tabletType := range ts.tabletTypes {
maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
}
}
atomic.StoreInt64(&ts.shardMaxLag, int64(maxLag))
case _ = <-ts.endChannel:
break
}
}

return ts.throttler.Throttle(0 /* threadId */) > 0 &&
int64(maxLag) > ts.config.TxThrottlerConfig.TargetReplicationLagSec
}

func (ts *txThrottlerStateImpl) deallocateResources() {
Expand Down
38 changes: 17 additions & 21 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package txthrottler

import (
"context"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -86,10 +87,7 @@ func TestEnabledThrottler(t *testing.T) {
call := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
calls = append(calls, call)

// No underlying throttling & lag present
call = mockThrottler.EXPECT().LastMaxLagNotIgnoredForTabletType(topodatapb.TabletType_REPLICA)
call.Return(uint32(20))
calls = append(calls, call)
// 1
call = mockThrottler.EXPECT().Throttle(0)
call.Return(0 * time.Second)
calls = append(calls, call)
Expand All @@ -104,26 +102,17 @@ func TestEnabledThrottler(t *testing.T) {
call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
calls = append(calls, call)

// Underlying throttling & lag present
call = mockThrottler.EXPECT().LastMaxLagNotIgnoredForTabletType(topodatapb.TabletType_REPLICA)
call.Return(uint32(20))
calls = append(calls, call)
// 2
call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)

// Underlying throttling & lag present
call = mockThrottler.EXPECT().LastMaxLagNotIgnoredForTabletType(topodatapb.TabletType_REPLICA)
call.Return(uint32(20))
calls = append(calls, call)
// 3
call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)

// Underlying throttling & no lag present
call = mockThrottler.EXPECT().LastMaxLagNotIgnoredForTabletType(topodatapb.TabletType_REPLICA)
call.Return(uint32(1))
calls = append(calls, call)
// 4
call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)
Expand All @@ -150,11 +139,17 @@ func TestEnabledThrottler(t *testing.T) {
})

assert.Nil(t, throttlerImpl.Open())
throttlerStateImpl := throttlerImpl.state.(*txThrottlerStateImpl)
throttlerStateImpl, ok := throttlerImpl.state.(*txThrottlerStateImpl)
assert.True(t, ok)
assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes)
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())

// No underlying throttling & lag present - Don't throttle despite priority
// Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a
// way that will interfere with how we manipulate that value in our tests to evaluate different cases:
throttlerStateImpl.endChannel <- true

// 1 should not throttle due to return value of underlying Throttle(), despite high lag
atomic.StoreInt64(&throttlerStateImpl.shardMaxLag, 20)
assert.False(t, throttlerImpl.Throttle(100, "some-workload"))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some-workload"])
assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some-workload"])
Expand All @@ -173,17 +168,18 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts())
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts())

// Underlying throttling & lag present - Throttle due to priority
// 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100
assert.True(t, throttlerImpl.Throttle(100, "some-workload"))
assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"])

// Underlying throttling & lag present - Do not throttle due to priority
// 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0
assert.False(t, throttlerImpl.Throttle(0, "some-workload"))
assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"])

// Underlying throttling & no lag present - Do no throttle despite priority
// 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag
atomic.StoreInt64(&throttlerStateImpl.shardMaxLag, 1)
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(4), throttlerImpl.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"])
Expand Down

0 comments on commit d994497

Please sign in to comment.