Skip to content

Commit 00ac76d

Browse files
slack-15.0: pre-backport txthrottler crash fixes (#480)
* `txthrottler`: move `ThrottlerInterface` to `go/vt/throttler`, use `slices` pkg, add stats (vitessio#16248) Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com> * revert to `reflect` Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com> * Support passing filters to `discovery.NewHealthCheck(...)` Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com> * Update go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go Co-authored-by: Matt Lord <mattalord@gmail.com> Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com> * Address some PR suggestions Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com> * PR ctx suggestion Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com> * fix test Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com> * simplify updateHealthCheckCells signature Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com> * Fix race in `replicationLagModule` of `go/vt/throttle` Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com> --------- Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com> Co-authored-by: Matt Lord <mattalord@gmail.com>
1 parent 3a56f70 commit 00ac76d

21 files changed

+453
-211
lines changed

go/vt/discovery/healthcheck.go

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"bytes"
3636
"context"
3737
"encoding/json"
38+
"errors"
3839
"fmt"
3940
"hash/crc32"
4041
"html/template"
@@ -98,6 +99,9 @@ var (
9899

99100
// How much to sleep between each check.
100101
waitAvailableTabletInterval = 100 * time.Millisecond
102+
103+
// errKeyspacesToWatchAndTabletFilters is an error for cases where incompatible filters are defined.
104+
errKeyspacesToWatchAndTabletFilters = errors.New("only one of --keyspaces_to_watch and --tablet_filters may be specified at a time")
101105
)
102106

103107
// See the documentation for NewHealthCheck below for an explanation of these parameters.
@@ -296,6 +300,27 @@ type HealthCheckImpl struct {
296300
healthCheckDialSem *semaphore.Weighted
297301
}
298302

303+
// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
304+
func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) {
305+
if len(tabletFilters) > 0 {
306+
if len(KeyspacesToWatch) > 0 {
307+
return nil, errKeyspacesToWatchAndTabletFilters
308+
}
309+
310+
fbs, err := NewFilterByShard(tabletFilters)
311+
if err != nil {
312+
return nil, fmt.Errorf("failed to parse tablet_filters value %q: %v", strings.Join(tabletFilters, ","), err)
313+
}
314+
filters = append(filters, fbs)
315+
} else if len(KeyspacesToWatch) > 0 {
316+
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
317+
}
318+
if len(tabletFilterTags) > 0 {
319+
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
320+
}
321+
return filters, nil
322+
}
323+
299324
// NewHealthCheck creates a new HealthCheck object.
300325
// Parameters:
301326
// retryDelay.
@@ -317,10 +342,14 @@ type HealthCheckImpl struct {
317342
//
318343
// The localCell for this healthcheck
319344
//
320-
// callback.
345+
// cellsToWatch.
321346
//
322-
// A function to call when there is a primary change. Used to notify vtgate's buffer to stop buffering.
323-
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl {
347+
// Is a list of cells to watch for tablets.
348+
//
349+
// filters.
350+
//
351+
// Is one or more filters to apply when determining what tablets we want to stream healthchecks from.
352+
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl {
324353
log.Infof("loading tablets for cells: %v", cellsToWatch)
325354

326355
hc := &HealthCheckImpl{
@@ -342,27 +371,10 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
342371
}
343372

344373
for _, c := range cells {
345-
var filters TabletFilters
346374
log.Infof("Setting up healthcheck for cell: %v", c)
347375
if c == "" {
348376
continue
349377
}
350-
if len(tabletFilters) > 0 {
351-
if len(KeyspacesToWatch) > 0 {
352-
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
353-
}
354-
355-
fbs, err := NewFilterByShard(tabletFilters)
356-
if err != nil {
357-
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
358-
}
359-
filters = append(filters, fbs)
360-
} else if len(KeyspacesToWatch) > 0 {
361-
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
362-
}
363-
if len(tabletFilterTags) > 0 {
364-
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
365-
}
366378
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
367379
}
368380

go/vt/discovery/healthcheck_test.go

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,77 @@ func init() {
6464
refreshInterval = time.Minute
6565
}
6666

67+
func TestNewVTGateHealthCheckFilters(t *testing.T) {
68+
defer func() {
69+
KeyspacesToWatch = nil
70+
tabletFilters = nil
71+
tabletFilterTags = nil
72+
}()
73+
74+
testCases := []struct {
75+
name string
76+
keyspacesToWatch []string
77+
tabletFilters []string
78+
tabletFilterTags map[string]string
79+
expectedError string
80+
expectedFilterTypes []any
81+
}{
82+
{
83+
name: "noFilters",
84+
},
85+
{
86+
name: "tabletFilters",
87+
tabletFilters: []string{"ks1|-80"},
88+
expectedFilterTypes: []any{&FilterByShard{}},
89+
},
90+
{
91+
name: "keyspacesToWatch",
92+
keyspacesToWatch: []string{"ks1"},
93+
expectedFilterTypes: []any{&FilterByKeyspace{}},
94+
},
95+
{
96+
name: "tabletFiltersAndTags",
97+
tabletFilters: []string{"ks1|-80"},
98+
tabletFilterTags: map[string]string{"test": "true"},
99+
expectedFilterTypes: []any{&FilterByShard{}, &FilterByTabletTags{}},
100+
},
101+
{
102+
name: "keyspacesToWatchAndTags",
103+
tabletFilterTags: map[string]string{"test": "true"},
104+
keyspacesToWatch: []string{"ks1"},
105+
expectedFilterTypes: []any{&FilterByKeyspace{}, &FilterByTabletTags{}},
106+
},
107+
{
108+
name: "failKeyspacesToWatchAndFilters",
109+
tabletFilters: []string{"ks1|-80"},
110+
keyspacesToWatch: []string{"ks1"},
111+
expectedError: errKeyspacesToWatchAndTabletFilters.Error(),
112+
},
113+
{
114+
name: "failInvalidTabletFilters",
115+
tabletFilters: []string{"shouldfail!@#!"},
116+
expectedError: "failed to parse tablet_filters value \"shouldfail!@#!\": invalid FilterByShard parameter: shouldfail!@#!",
117+
},
118+
}
119+
120+
for _, testCase := range testCases {
121+
t.Run(testCase.name, func(t *testing.T) {
122+
KeyspacesToWatch = testCase.keyspacesToWatch
123+
tabletFilters = testCase.tabletFilters
124+
tabletFilterTags = testCase.tabletFilterTags
125+
126+
filters, err := NewVTGateHealthCheckFilters()
127+
if testCase.expectedError != "" {
128+
assert.EqualError(t, err, testCase.expectedError)
129+
}
130+
assert.Len(t, filters, len(testCase.expectedFilterTypes))
131+
for i, filter := range filters {
132+
assert.IsType(t, testCase.expectedFilterTypes[i], filter)
133+
}
134+
})
135+
}
136+
}
137+
67138
func TestHealthCheck(t *testing.T) {
68139
// reset error counters
69140
hcErrorCounters.ResetAll()
@@ -943,7 +1014,7 @@ func TestGetHealthyTablets(t *testing.T) {
9431014

9441015
func TestPrimaryInOtherCell(t *testing.T) {
9451016
ts := memorytopo.NewServer("cell1", "cell2")
946-
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
1017+
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
9471018
defer hc.Close()
9481019

9491020
// add a tablet as primary in different cell
@@ -1000,7 +1071,7 @@ func TestPrimaryInOtherCell(t *testing.T) {
10001071

10011072
func TestReplicaInOtherCell(t *testing.T) {
10021073
ts := memorytopo.NewServer("cell1", "cell2")
1003-
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
1074+
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
10041075
defer hc.Close()
10051076

10061077
// add a tablet as replica
@@ -1102,7 +1173,7 @@ func TestReplicaInOtherCell(t *testing.T) {
11021173

11031174
func TestCellAliases(t *testing.T) {
11041175
ts := memorytopo.NewServer("cell1", "cell2")
1105-
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
1176+
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
11061177
defer hc.Close()
11071178

11081179
cellsAlias := &topodatapb.CellsAlias{
@@ -1248,7 +1319,7 @@ func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservic
12481319
}
12491320

12501321
func createTestHc(ts *topo.Server) *HealthCheckImpl {
1251-
return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "")
1322+
return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "", nil)
12521323
}
12531324

12541325
type fakeConn struct {

go/vt/discovery/keyspace_events_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) {
3939
factory.AddCell(cell)
4040
ts := faketopo.NewFakeTopoServer(factory)
4141
ts2 := &fakeTopoServer{}
42-
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "")
42+
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "", nil)
4343
defer hc.Close()
4444
kew := NewKeyspaceEventWatcher(context.Background(), ts2, hc, cell)
4545
kss := &keyspaceState{
@@ -82,7 +82,7 @@ func TestKeyspaceEventTypes(t *testing.T) {
8282
factory.AddCell(cell)
8383
ts := faketopo.NewFakeTopoServer(factory)
8484
ts2 := &fakeTopoServer{}
85-
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "")
85+
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "", nil)
8686
defer hc.Close()
8787
kew := NewKeyspaceEventWatcher(context.Background(), ts2, hc, cell)
8888

go/vt/throttler/demo/throttler_demo.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ type replica struct {
101101

102102
// throttler is used to enforce the maximum rate at which replica applies
103103
// transactions. It must not be confused with the client's throttler.
104-
throttler *throttler.Throttler
104+
throttler throttler.Throttler
105105
lastHealthUpdate time.Time
106106
lagUpdateInterval time.Duration
107107

@@ -224,7 +224,7 @@ type client struct {
224224
primary *primary
225225

226226
healthCheck discovery.HealthCheck
227-
throttler *throttler.Throttler
227+
throttler throttler.Throttler
228228

229229
stopChan chan struct{}
230230
wg sync.WaitGroup
@@ -237,7 +237,7 @@ func newClient(primary *primary, replica *replica, ts *topo.Server) *client {
237237
log.Fatal(err)
238238
}
239239

240-
healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "")
240+
healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "", nil)
241241
c := &client{
242242
primary: primary,
243243
healthCheck: healthCheck,

go/vt/throttler/manager.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,16 @@ type managerImpl struct {
6464
// mu guards all fields in this group.
6565
mu sync.Mutex
6666
// throttlers tracks all running throttlers (by their name).
67-
throttlers map[string]*Throttler
67+
throttlers map[string]Throttler
6868
}
6969

7070
func newManager() *managerImpl {
7171
return &managerImpl{
72-
throttlers: make(map[string]*Throttler),
72+
throttlers: make(map[string]Throttler),
7373
}
7474
}
7575

76-
func (m *managerImpl) registerThrottler(name string, throttler *Throttler) error {
76+
func (m *managerImpl) registerThrottler(name string, throttler Throttler) error {
7777
m.mu.Lock()
7878
defer m.mu.Unlock()
7979

@@ -207,7 +207,7 @@ func (m *managerImpl) throttlerNamesLocked() []string {
207207

208208
// log returns the most recent changes of the MaxReplicationLag module.
209209
// There will be one result for each processed replication lag record.
210-
func (m *managerImpl) log(throttlerName string) ([]result, error) {
210+
func (m *managerImpl) log(throttlerName string) ([]Result, error) {
211211
m.mu.Lock()
212212
defer m.mu.Unlock()
213213

@@ -216,5 +216,5 @@ func (m *managerImpl) log(throttlerName string) ([]result, error) {
216216
return nil, fmt.Errorf("throttler: %v does not exist", throttlerName)
217217
}
218218

219-
return t.log(), nil
219+
return t.Log(), nil
220220
}

go/vt/throttler/manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ var (
3737

3838
type managerTestFixture struct {
3939
m *managerImpl
40-
t1, t2 *Throttler
40+
t1, t2 Throttler
4141
}
4242

4343
func (f *managerTestFixture) setUp() error {

go/vt/throttler/max_replication_lag_module.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec
312312

313313
m.memory.ageBadRate(now)
314314

315-
r := result{
315+
r := Result{
316316
Now: now,
317317
RateChange: unchangedRate,
318318
lastRateChange: m.lastRateChange,
@@ -445,7 +445,7 @@ func stateGreater(a, b state) bool {
445445
// and we should not skip the current replica ("lagRecordNow").
446446
// Even if it's the same replica we may skip it and return false because
447447
// we want to wait longer for the propagation of the current rate change.
448-
func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool {
448+
func (m *MaxReplicationLagModule) isReplicaUnderTest(r *Result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool {
449449
if m.replicaUnderTest == nil {
450450
return true
451451
}
@@ -471,7 +471,7 @@ func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, t
471471
return true
472472
}
473473

474-
func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
474+
func (m *MaxReplicationLagModule) increaseRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
475475
m.markCurrentRateAsBadOrGood(r, now, stateIncreaseRate, unknown)
476476

477477
oldRate := m.rate.Get()
@@ -559,7 +559,7 @@ func (m *MaxReplicationLagModule) minTestDurationUntilNextIncrease(increase floa
559559
return minDuration
560560
}
561561

562-
func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
562+
func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
563563
// Guess replication rate based on the difference in the replication lag of this
564564
// particular replica.
565565
lagRecordBefore := m.lagCache(lagRecordNow).atOrAfter(discovery.TabletToMapKey(lagRecordNow.Tablet), m.lastRateChange)
@@ -630,7 +630,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time,
630630
// guessReplicationRate guesses the actual replication rate based on the new bac
631631
// Note that "lagDifference" can be positive (lag increased) or negative (lag
632632
// decreased).
633-
func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
633+
func (m *MaxReplicationLagModule) guessReplicationRate(r *Result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
634634
// avgReplicationRate is the average rate (per second) at which the replica
635635
// applied transactions from the replication stream. We infer the value
636636
// from the relative change in the replication lag.
@@ -675,14 +675,14 @@ func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate
675675
return int64(newRate), reason
676676
}
677677

678-
func (m *MaxReplicationLagModule) emergency(r *result, now time.Time, lagRecordNow replicationLagRecord) {
678+
func (m *MaxReplicationLagModule) emergency(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
679679
m.markCurrentRateAsBadOrGood(r, now, stateEmergency, unknown)
680680

681681
decreaseReason := fmt.Sprintf("replication lag went beyond max: %d > %d", lagRecordNow.lag(), m.config.MaxReplicationLagSec)
682682
m.decreaseRateByPercentage(r, now, lagRecordNow, stateEmergency, m.config.EmergencyDecrease, decreaseReason)
683683
}
684684

685-
func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) {
685+
func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *Result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) {
686686
oldRate := m.rate.Get()
687687
rate := int64(float64(oldRate) - float64(oldRate)*decrease)
688688
if rate == 0 {
@@ -694,7 +694,7 @@ func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.T
694694
m.updateRate(r, newState, rate, reason, now, lagRecordNow, m.config.MinDurationBetweenDecreases())
695695
}
696696

697-
func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) {
697+
func (m *MaxReplicationLagModule) updateRate(r *Result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) {
698698
oldRate := m.rate.Get()
699699

700700
m.currentState = newState
@@ -722,7 +722,7 @@ func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int
722722

723723
// markCurrentRateAsBadOrGood determines the actual rate between the last rate
724724
// change and "now" and determines if that rate was bad or good.
725-
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time.Time, newState state, replicationLagChange replicationLagChange) {
725+
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *Result, now time.Time, newState state, replicationLagChange replicationLagChange) {
726726
if m.lastRateChange.IsZero() {
727727
// Module was just started. We don't have any data points yet.
728728
r.GoodOrBad = ignoredRate
@@ -796,6 +796,6 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time
796796
}
797797
}
798798

799-
func (m *MaxReplicationLagModule) log() []result {
799+
func (m *MaxReplicationLagModule) log() []Result {
800800
return m.results.latestValues()
801801
}

0 commit comments

Comments
 (0)