Skip to content

Commit 9dc9ccb

Browse files
committed
Address PR comments
Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>
1 parent e53536b commit 9dc9ccb

File tree

1 file changed

+33
-11
lines changed

1 file changed

+33
-11
lines changed

go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"math/rand"
2323
"sync"
24+
"sync/atomic"
2425
"time"
2526

2627
"google.golang.org/protobuf/encoding/prototext"
@@ -185,6 +186,9 @@ type txThrottlerStateImpl struct {
185186

186187
healthCheck discovery.LegacyHealthCheck
187188
topologyWatchers []TopologyWatcherInterface
189+
190+
shardMaxLag atomic.Int64
191+
endChannel chan bool
188192
}
189193

190194
// NewTxThrottler tries to construct a txThrottler from the
@@ -330,8 +334,9 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
330334
return nil, err
331335
}
332336
result := &txThrottlerStateImpl{
333-
config: config,
334-
throttler: t,
337+
config: config,
338+
throttler: t,
339+
endChannel: make(chan bool),
335340
}
336341
result.healthCheck = healthCheckFactory()
337342
result.healthCheck.SetListener(result, false /* sendDownEvents */)
@@ -349,6 +354,9 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
349354
discovery.DefaultTopologyWatcherRefreshInterval,
350355
discovery.DefaultTopoReadConcurrency))
351356
}
357+
358+
go result.updateMaxShardLag()
359+
352360
return result, nil
353361
}
354362

@@ -361,18 +369,31 @@ func (ts *txThrottlerStateImpl) throttle() bool {
361369
ts.throttleMu.Lock()
362370
defer ts.throttleMu.Unlock()
363371

364-
var maxLag uint32
365-
366-
for _, tabletType := range ts.config.tabletTypes {
367-
maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType)
368-
if maxLagPerTabletType > maxLag {
369-
maxLag = maxLagPerTabletType
370-
}
371-
}
372+
maxLag := ts.shardMaxLag.Load()
372373

373374
return ts.throttler.Throttle(0 /* threadId */) > 0 &&
374-
int64(maxLag) > ts.config.throttlerConfig.TargetReplicationLagSec
375+
maxLag > ts.config.throttlerConfig.TargetReplicationLagSec
376+
}
375377

378+
func (ts *txThrottlerStateImpl) updateMaxShardLag() {
379+
// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value
380+
ticker := time.NewTicker(time.Duration(ts.config.throttlerConfig.TargetReplicationLagSec/2) * time.Second)
381+
for {
382+
select {
383+
case _ = <-ticker.C:
384+
var maxLag uint32
385+
386+
for _, tabletType := range ts.config.tabletTypes {
387+
maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType)
388+
if maxLagPerTabletType > maxLag {
389+
maxLag = maxLagPerTabletType
390+
}
391+
}
392+
ts.shardMaxLag.Store(int64(maxLag))
393+
case _ = <-ts.endChannel:
394+
break
395+
}
396+
}
376397
}
377398

378399
func (ts *txThrottlerStateImpl) deallocateResources() {
@@ -387,6 +408,7 @@ func (ts *txThrottlerStateImpl) deallocateResources() {
387408
ts.healthCheck.Close()
388409
ts.healthCheck = nil
389410

411+
ts.endChannel <- true
390412
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
391413
// to be executing, so we can safely close the throttler.
392414
ts.throttler.Close()

0 commit comments

Comments
 (0)