Skip to content

Commit 534b326

Browse files
authored
Slack vitess r14.0.5 dsdefense throttle only if lag 2 (#172)
TxThrottler only throttles if current lag is above threshold.
1 parent 6be0320 commit 534b326

File tree

5 files changed

+153
-40
lines changed

5 files changed

+153
-40
lines changed

go.sum

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,6 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
169169
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
170170
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
171171
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
172-
github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI=
173172
github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
174173
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
175174
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=

go/vt/throttler/throttler.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import (
3333
"sync"
3434
"time"
3535

36+
"vitess.io/vitess/go/vt/proto/topodata"
37+
3638
"vitess.io/vitess/go/vt/discovery"
3739
"vitess.io/vitess/go/vt/log"
3840

@@ -224,6 +226,28 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
224226
return t.threadThrottlers[threadID].throttle(t.nowFunc())
225227
}
226228

229+
// MaxLag returns the max of all the last replication lag values seen across all tablets of
230+
// the provided type, excluding ignored tablets.
231+
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
232+
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)
233+
234+
var maxLag uint32
235+
cacheEntries := cache.entries
236+
237+
for key := range cacheEntries {
238+
if cache.isIgnored(key) {
239+
continue
240+
}
241+
242+
lag := cache.latest(key).Stats.ReplicationLagSeconds
243+
if lag > maxLag {
244+
maxLag = lag
245+
}
246+
}
247+
248+
return maxLag
249+
}
250+
227251
// ThreadFinished marks threadID as finished and redistributes the thread's
228252
// rate allotment across the other threads.
229253
// After ThreadFinished() is called, Throttle() must not be called anymore.

go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go

Lines changed: 36 additions & 21 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"math/rand"
2323
"sync"
24+
"sync/atomic"
2425
"time"
2526

2627
"google.golang.org/protobuf/encoding/prototext"
@@ -86,6 +87,7 @@ type ThrottlerInterface interface {
8687
GetConfiguration() *throttlerdatapb.Configuration
8788
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
8889
ResetConfiguration()
90+
MaxLag(tabletType topodatapb.TabletType) uint32
8991
}
9092

9193
// TopologyWatcherInterface defines the public interface that is implemented by
@@ -184,6 +186,10 @@ type txThrottlerStateImpl struct {
184186

185187
healthCheck discovery.LegacyHealthCheck
186188
topologyWatchers []TopologyWatcherInterface
189+
190+
maxLag int64
191+
done chan bool
192+
waitForTermination sync.WaitGroup
187193
}
188194

189195
// NewTxThrottler tries to construct a txThrottler from the
@@ -301,7 +307,7 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) {
301307

302308
// Throttle according to both what the throttler state says and the priority. Workloads with lower priority value
303309
// are less likely to be throttled.
304-
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority
310+
result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle()
305311

306312
t.requestsTotal.Add(workload, 1)
307313
if result {
@@ -331,6 +337,7 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
331337
result := &txThrottlerStateImpl{
332338
config: config,
333339
throttler: t,
340+
done: make(chan bool, 1),
334341
}
335342
result.healthCheck = healthCheckFactory()
336343
result.healthCheck.SetListener(result, false /* sendDownEvents */)
@@ -348,6 +355,10 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
348355
discovery.DefaultTopologyWatcherRefreshInterval,
349356
discovery.DefaultTopoReadConcurrency))
350357
}
358+
359+
result.waitForTermination.Add(1)
360+
go result.updateMaxLag()
361+
351362
return result, nil
352363
}
353364

@@ -359,7 +370,34 @@ func (ts *txThrottlerStateImpl) throttle() bool {
359370
// Serialize calls to ts.throttle.Throttle()
360371
ts.throttleMu.Lock()
361372
defer ts.throttleMu.Unlock()
362-
return ts.throttler.Throttle(0 /* threadId */) > 0
373+
374+
maxLag := atomic.LoadInt64(&ts.maxLag)
375+
376+
return maxLag > ts.config.throttlerConfig.TargetReplicationLagSec &&
377+
ts.throttler.Throttle(0 /* threadId */) > 0
378+
}
379+
380+
func (ts *txThrottlerStateImpl) updateMaxLag() {
381+
defer ts.waitForTermination.Done()
382+
// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value
383+
ticker := time.NewTicker(time.Duration(ts.config.throttlerConfig.TargetReplicationLagSec/2) * time.Second)
384+
outerloop:
385+
for {
386+
select {
387+
case <-ticker.C:
388+
var maxLag uint32
389+
390+
for _, tabletType := range ts.config.tabletTypes {
391+
maxLagPerTabletType := ts.throttler.MaxLag(tabletType)
392+
if maxLagPerTabletType > maxLag {
393+
maxLag = maxLagPerTabletType
394+
}
395+
}
396+
atomic.StoreInt64(&ts.maxLag, int64(maxLag))
397+
case <-ts.done:
398+
break outerloop
399+
}
400+
}
363401
}
364402

365403
func (ts *txThrottlerStateImpl) deallocateResources() {
@@ -374,6 +412,8 @@ func (ts *txThrottlerStateImpl) deallocateResources() {
374412
ts.healthCheck.Close()
375413
ts.healthCheck = nil
376414

415+
ts.done <- true
416+
ts.waitForTermination.Wait()
377417
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
378418
// to be executing, so we can safely close the throttler.
379419
ts.throttler.Close()

go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package txthrottler
2222
//go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface
2323

2424
import (
25+
"sync/atomic"
2526
"testing"
2627
"time"
2728

@@ -84,32 +85,50 @@ func TestEnabledThrottler(t *testing.T) {
8485
}
8586

8687
mockThrottler := NewMockThrottlerInterface(mockCtrl)
88+
8789
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
8890
assert.Equal(t, 1, threadCount)
8991
return mockThrottler, nil
9092
}
9193

92-
call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
93-
call1 := mockThrottler.EXPECT().Throttle(0)
94-
call1.Return(0 * time.Second)
94+
var calls []*gomock.Call
95+
96+
call := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
97+
calls = append(calls, call)
98+
99+
// 1
100+
call = mockThrottler.EXPECT().Throttle(0)
101+
call.Return(0 * time.Second)
102+
calls = append(calls, call)
103+
95104
tabletStats := &discovery.LegacyTabletStats{
96105
Target: &querypb.Target{
97106
TabletType: topodatapb.TabletType_REPLICA,
98107
},
99108
}
100-
call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
101-
call3 := mockThrottler.EXPECT().Throttle(0)
102-
call3.Return(1 * time.Second)
103109

104-
call4 := mockThrottler.EXPECT().Throttle(0)
105-
call4.Return(1 * time.Second)
106-
calllast := mockThrottler.EXPECT().Close()
110+
call = mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
111+
calls = append(calls, call)
107112

108-
call1.After(call0)
109-
call2.After(call1)
110-
call3.After(call2)
111-
call4.After(call3)
112-
calllast.After(call4)
113+
// 2
114+
call = mockThrottler.EXPECT().Throttle(0)
115+
call.Return(1 * time.Second)
116+
calls = append(calls, call)
117+
118+
// 3
119+
// Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first
120+
// whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle()
121+
122+
// 4
123+
// Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first
124+
// whether there is lag or not, so no call to the underlying mockThrottler is issued.
125+
126+
call = mockThrottler.EXPECT().Close()
127+
calls = append(calls, call)
128+
129+
for i := 1; i < len(calls); i++ {
130+
calls[i].After(calls[i-1])
131+
}
113132

114133
config := tabletenv.NewDefaultConfig()
115134
config.EnableTxThrottler = true
@@ -126,6 +145,14 @@ func TestEnabledThrottler(t *testing.T) {
126145
assert.Nil(t, throttler.Open())
127146
assert.Equal(t, int64(1), throttler.throttlerRunning.Get())
128147

148+
throttlerImpl, ok := throttler.state.(*txThrottlerStateImpl)
149+
assert.True(t, ok)
150+
// Stop the go routine that keeps updating the cached shard's max lag to preventi it from changing the value in a
151+
// way that will interfere with how we manipulate that value in our tests to evaluate different cases:
152+
throttlerImpl.done <- true
153+
154+
// 1 should not throttle due to return value of underlying Throttle(), despite high lag
155+
atomic.StoreInt64(&throttlerImpl.maxLag, 20)
129156
assert.False(t, throttler.Throttle(100, "some-workload"))
130157
assert.Equal(t, int64(1), throttler.requestsTotal.Counts()["some-workload"])
131158
assert.Zero(t, throttler.requestsThrottled.Counts()["some-workload"])
@@ -138,15 +165,23 @@ func TestEnabledThrottler(t *testing.T) {
138165
}
139166
// This call should not be forwarded to the go/vt/throttler.Throttler object.
140167
hcListener.StatsUpdate(rdonlyTabletStats)
141-
// The second throttle call should reject.
168+
169+
// 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100
142170
assert.True(t, throttler.Throttle(100, "some-workload"))
143171
assert.Equal(t, int64(2), throttler.requestsTotal.Counts()["some-workload"])
144172
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])
145173

146-
// This call should not throttle due to priority. Check that's the case and counters agree.
174+
// 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0
147175
assert.False(t, throttler.Throttle(0, "some-workload"))
148176
assert.Equal(t, int64(3), throttler.requestsTotal.Counts()["some-workload"])
149177
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])
178+
179+
// 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag
180+
atomic.StoreInt64(&throttlerImpl.maxLag, 1)
181+
assert.False(t, throttler.Throttle(100, "some-workload"))
182+
assert.Equal(t, int64(4), throttler.requestsTotal.Counts()["some-workload"])
183+
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])
184+
150185
throttler.Close()
151186
assert.Zero(t, throttler.throttlerRunning.Get())
152187
}

0 commit comments

Comments
 (0)