Skip to content

Commit 4a46293

Browse files
committed
test: add a benchmarking code
Signed-off-by: Manan Gupta <manan@planetscale.com>
1 parent 327e5ce commit 4a46293

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

go/vt/concurrency/message_queue.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ func (mq *MessageQueue[T]) Send(value T) {
5050
mq.cond.Signal() // Notify a waiting receiver.
5151
}
5252

53+
// Length returns the length.
54+
func (mq *MessageQueue[T]) Length() int {
55+
mq.mu.Lock()
56+
defer mq.mu.Unlock()
57+
return len(mq.queue)
58+
}
59+
5360
// Receive fetches a message of type T from the queue.
5461
// Blocks if no messages are available or until the queue is closed.
5562
func (mq *MessageQueue[T]) Receive() (T, bool) {

go/vt/discovery/healthcheck_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/stretchr/testify/require"
3232

3333
"vitess.io/vitess/go/test/utils"
34+
"vitess.io/vitess/go/vt/concurrency"
3435
"vitess.io/vitess/go/vt/grpcclient"
3536
"vitess.io/vitess/go/vt/topo"
3637
"vitess.io/vitess/go/vt/topo/memorytopo"
@@ -1513,6 +1514,66 @@ func TestConcurrentUpdates(t *testing.T) {
15131514
}, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed")
15141515
}
15151516

1517+
// BenchmarkAccess_FastConsumer benchmarks the access time of the healthcheck for a fast consumer.
1518+
func BenchmarkAccess_FastConsumer(b *testing.B) {
1519+
ctx := context.Background()
1520+
// reset error counters
1521+
hcErrorCounters.ResetAll()
1522+
ts := memorytopo.NewServer(ctx, "cell")
1523+
defer ts.Close()
1524+
hc := createTestHc(ctx, ts)
1525+
// close healthcheck
1526+
defer hc.Close()
1527+
1528+
// Subscribe to the healthcheck with a fast consumer.
1529+
ch := hc.Subscribe()
1530+
go func() {
1531+
for range ch {
1532+
}
1533+
}()
1534+
1535+
for i := 0; i < b.N; i++ {
1536+
hc.broadcast(&TabletHealth{})
1537+
}
1538+
hc.Unsubscribe(ch)
1539+
waitForEmptyMessageQueue(hc.subscribers[ch])
1540+
}
1541+
1542+
// BenchmarkAccess_SlowConsumer benchmarks the access time of the healthcheck for a slow consumer.
1543+
func BenchmarkAccess_SlowConsumer(b *testing.B) {
1544+
ctx := context.Background()
1545+
// reset error counters
1546+
hcErrorCounters.ResetAll()
1547+
ts := memorytopo.NewServer(ctx, "cell")
1548+
defer ts.Close()
1549+
hc := createTestHc(ctx, ts)
1550+
// close healthcheck
1551+
defer hc.Close()
1552+
1553+
// Subscribe to the healthcheck with a fast consumer.
1554+
ch := hc.Subscribe()
1555+
go func() {
1556+
for range ch {
1557+
time.Sleep(100 * time.Second)
1558+
}
1559+
}()
1560+
1561+
for i := 0; i < b.N; i++ {
1562+
hc.broadcast(&TabletHealth{})
1563+
}
1564+
hc.Unsubscribe(ch)
1565+
waitForEmptyMessageQueue(hc.subscribers[ch])
1566+
}
1567+
1568+
func waitForEmptyMessageQueue(queue *concurrency.MessageQueue[*TabletHealth]) {
1569+
for {
1570+
if queue.Length() == 0 {
1571+
return
1572+
}
1573+
time.Sleep(100 * time.Millisecond)
1574+
}
1575+
}
1576+
15161577
func tabletDialer(ctx context.Context, tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) {
15171578
connMapMu.Lock()
15181579
defer connMapMu.Unlock()

0 commit comments

Comments
 (0)