diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index d8089b36324..6b63dbea2e8 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -170,8 +170,9 @@ type txThrottlerStateImpl struct { // tabletTypes stores the tablet types for throttling tabletTypes map[topodatapb.TabletType]bool - shardMaxLag int64 - endChannel chan bool + shardMaxLag int64 + endChannel chan bool + endWaitGroup sync.WaitGroup } // NewTxThrottler tries to construct a txThrottler from the relevant @@ -289,7 +290,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi tabletTypes: tabletTypes, throttler: t, txThrottler: txThrottler, - endChannel: make(chan bool), + endChannel: make(chan bool, 1), } // get cells from topo if none defined in tabletenv config @@ -304,6 +305,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi state.stopHealthCheck = cancel state.initHealthCheckStream(txThrottler.topoServer, target) go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) + state.endWaitGroup.Add(1) go state.updateMaxShardLag() return state, nil @@ -371,11 +373,13 @@ func (ts *txThrottlerStateImpl) throttle() bool { } func (ts *txThrottlerStateImpl) updateMaxShardLag() { + defer ts.endWaitGroup.Done() // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second) +outerloop: for { select { - case _ = <-ticker.C: + case <-ticker.C: var maxLag uint32 for tabletType := range ts.tabletTypes { @@ -385,8 +389,8 @@ func (ts *txThrottlerStateImpl) updateMaxShardLag() { } } atomic.StoreInt64(&ts.shardMaxLag, int64(maxLag)) - case _ = <-ts.endChannel: - break + case <-ts.endChannel: + break outerloop } } } @@ -396,6 +400,8 @@ func (ts *txThrottlerStateImpl) deallocateResources() { ts.closeHealthCheckStream() ts.healthCheck = nil + ts.endChannel <- true + ts.endWaitGroup.Wait() // After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not // to be executing, so we can safely close the throttler. ts.throttler.Close()