Skip to content

Commit

Permalink
Fix unit tests race
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>
Signed-off-by: Eduardo J. Ortega U. <5791035+ejortegau@users.noreply.github.com>
  • Loading branch information
ejortegau committed Jan 24, 2024
1 parent d994497 commit a96b135
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ type txThrottlerStateImpl struct {
// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool

shardMaxLag int64
endChannel chan bool
shardMaxLag int64
endChannel chan bool
endWaitGroup sync.WaitGroup
}

// NewTxThrottler tries to construct a txThrottler from the relevant
Expand Down Expand Up @@ -289,7 +290,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi
tabletTypes: tabletTypes,
throttler: t,
txThrottler: txThrottler,
endChannel: make(chan bool),
endChannel: make(chan bool, 1),
}

// get cells from topo if none defined in tabletenv config
Expand All @@ -304,6 +305,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi
state.stopHealthCheck = cancel
state.initHealthCheckStream(txThrottler.topoServer, target)
go state.healthChecksProcessor(ctx, txThrottler.topoServer, target)
state.endWaitGroup.Add(1)
go state.updateMaxShardLag()

return state, nil
Expand Down Expand Up @@ -371,11 +373,13 @@ func (ts *txThrottlerStateImpl) throttle() bool {
}

func (ts *txThrottlerStateImpl) updateMaxShardLag() {
defer ts.endWaitGroup.Done()
// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value
ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second)
outerloop:
for {
select {
case _ = <-ticker.C:
case <-ticker.C:
var maxLag uint32

for tabletType := range ts.tabletTypes {
Expand All @@ -385,8 +389,8 @@ func (ts *txThrottlerStateImpl) updateMaxShardLag() {
}
}
atomic.StoreInt64(&ts.shardMaxLag, int64(maxLag))
case _ = <-ts.endChannel:
break
case <-ts.endChannel:
break outerloop
}
}
}
Expand All @@ -396,6 +400,8 @@ func (ts *txThrottlerStateImpl) deallocateResources() {
ts.closeHealthCheckStream()
ts.healthCheck = nil

ts.endChannel <- true
ts.endWaitGroup.Wait()
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
// to be executing, so we can safely close the throttler.
ts.throttler.Close()
Expand Down

0 comments on commit a96b135

Please sign in to comment.