Skip to content

Commit 0c35274

Browse files
GuptaManan100 authored and deepthi committed
[release-21.0] Increase health check buffer size (vitessio#17636)
Signed-off-by: Manan Gupta <manan@planetscale.com>
Signed-off-by: deepthi <deepthi@planetscale.com>
1 parent c1374d1 commit 0c35274

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

go/vt/discovery/healthcheck.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ var (
9191
// How much to sleep between each check.
9292
waitAvailableTabletInterval = 100 * time.Millisecond
9393

94+
// Size of channel buffer for each subscriber
95+
broadcastChannelBufferSize = 2048
96+
9497
// HealthCheckCacheTemplate uses healthCheckTemplate with the `HealthCheck - Cache` title to create the
9598
// HTML code required to render the cache of the HealthCheck.
9699
HealthCheckCacheTemplate = fmt.Sprintf(healthCheckTemplate, "HealthCheck - Cache")
@@ -624,7 +627,7 @@ func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) {
624627
func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth {
625628
hc.subMu.Lock()
626629
defer hc.subMu.Unlock()
627-
c := make(chan *TabletHealth, 2)
630+
c := make(chan *TabletHealth, broadcastChannelBufferSize)
628631
hc.subscribers[c] = struct{}{}
629632
return c
630633
}
@@ -643,6 +646,8 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
643646
select {
644647
case c <- th:
645648
default:
649+
// If the channel is full, we drop the message.
650+
log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
646651
}
647652
}
648653
}

go/vt/discovery/healthcheck_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1481,6 +1481,50 @@ func TestDebugURLFormatting(t *testing.T) {
14811481
require.Contains(t, wr.String(), expectedURL, "output missing formatted URL")
14821482
}
14831483

1484+
// TestConcurrentUpdates tests that concurrent updates from the HealthCheck implementation aren't dropped.
1485+
// Added in response to https://github.com/vitessio/vitess/issues/17629.
1486+
func TestConcurrentUpdates(t *testing.T) {
1487+
ctx := utils.LeakCheckContext(t)
1488+
var mu sync.Mutex
1489+
// reset error counters
1490+
hcErrorCounters.ResetAll()
1491+
ts := memorytopo.NewServer(ctx, "cell")
1492+
defer ts.Close()
1493+
hc := createTestHc(ctx, ts)
1494+
// close healthcheck
1495+
defer hc.Close()
1496+
1497+
// Subscribe to the healthcheck
1498+
// Make the receiver keep track of the updates received.
1499+
ch := hc.Subscribe()
1500+
totalCount := 0
1501+
go func() {
1502+
for range ch {
1503+
mu.Lock()
1504+
totalCount++
1505+
mu.Unlock()
1506+
// Simulate a somewhat slow consumer.
1507+
time.Sleep(100 * time.Millisecond)
1508+
}
1509+
}()
1510+
1511+
// Run multiple updates really quickly
1512+
// one after the other.
1513+
totalUpdates := 10
1514+
for i := 0; i < totalUpdates; i++ {
1515+
hc.broadcast(&TabletHealth{})
1516+
}
1517+
// Unsubscribe from the healthcheck
1518+
// and verify we process all the updates eventually.
1519+
hc.Unsubscribe(ch)
1520+
defer close(ch)
1521+
require.Eventuallyf(t, func() bool {
1522+
mu.Lock()
1523+
defer mu.Unlock()
1524+
return totalUpdates == totalCount
1525+
}, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed")
1526+
}
1527+
14841528
func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) {
14851529
connMapMu.Lock()
14861530
defer connMapMu.Unlock()

0 commit comments

Comments
 (0)