Skip to content

Commit

Permalink
Avoid flaky topo concurrency test (#17407)
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt authored Jan 10, 2025
1 parent 555f1d7 commit 1355018
Showing 1 changed file with 90 additions and 102 deletions.
192 changes: 90 additions & 102 deletions go/vt/topo/stats_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,18 @@ import (
"vitess.io/vitess/go/vt/vterrors"
)

// testStatsConnReadSem is a semaphore for unit tests.
// It intentionally has a concurrency limit of '1' to
// allow semaphore contention in tests.
var testStatsConnReadSem = semaphore.NewWeighted(1)

// testStatsConnStatsReset resets StatsConn-based stats.
func testStatsConnStatsReset() {
topoStatsConnErrors.ResetAll()
topoStatsConnReadWaitTimings.Reset()
topoStatsConnTimings.Reset()
}

// The fakeConn is a wrapper for a Conn that emits stats for every operation
type fakeConn struct {
v Version
Expand Down Expand Up @@ -185,238 +195,216 @@ func (st *fakeConn) IsReadOnly() bool {
}

// createTestReadSemaphoreContention simulates semaphore contention on the test read semaphore.
func createTestReadSemaphoreContention(ctx context.Context, duration time.Duration) {
func createTestReadSemaphoreContention(ctx context.Context, duration time.Duration, semAcquiredChan chan struct{}) {
if err := testStatsConnReadSem.Acquire(ctx, 1); err != nil {
panic(err)
}
defer testStatsConnReadSem.Release(1)
semAcquiredChan <- struct{}{}
time.Sleep(duration)
}

// TestStatsConnTopoListDir emits stats on ListDir
func TestStatsConnTopoListDir(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

go createTestReadSemaphoreContention(ctx, 100*time.Millisecond)
semAcquiredChan := make(chan struct{})
go createTestReadSemaphoreContention(ctx, 100*time.Millisecond, semAcquiredChan)
<-semAcquiredChan
statsConn.ListDir(ctx, "", true)
timingCounts := topoStatsConnTimings.Counts()["ListDir.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["ListDir.global"])
require.NotZero(t, topoStatsConnTimings.Time())

waitTimingsCounts := topoStatsConnReadWaitTimings.Counts()["ListDir.global"]
if got := waitTimingsCounts; got != 1 {
t.Errorf("stats were not properly recorded: got = %d, want = 1", got)
}
require.Equal(t, int64(1), topoStatsConnReadWaitTimings.Counts()["ListDir.global"])
require.NotZero(t, topoStatsConnReadWaitTimings.Time())

// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["ListDir.global"]
if got, want := errorCount, int64(0); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Zero(t, topoStatsConnErrors.Counts()["ListDir.global"])

statsConn.ListDir(ctx, "error", true)

// error stats gets emitted
errorCount = topoStatsConnErrors.Counts()["ListDir.global"]
if got, want := errorCount, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["ListDir.global"])
}

// TestStatsConnTopoCreate emits stats on Create
func TestStatsConnTopoCreate(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

statsConn.Create(ctx, "", []byte{})
timingCounts := topoStatsConnTimings.Counts()["Create.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Create.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())

// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["Create.global"]
if got, want := errorCount, int64(0); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Zero(t, topoStatsConnErrors.Counts()["Create.global"])

statsConn.Create(ctx, "error", []byte{})

// error stats gets emitted
errorCount = topoStatsConnErrors.Counts()["Create.global"]
if got, want := errorCount, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Create.global"])
}

// TestStatsConnTopoUpdate emits stats on Update
func TestStatsConnTopoUpdate(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

statsConn.Update(ctx, "", []byte{}, conn.v)
timingCounts := topoStatsConnTimings.Counts()["Update.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Update.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())

// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["Update.global"]
if got, want := errorCount, int64(0); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Zero(t, topoStatsConnErrors.Counts()["Update.global"])

statsConn.Update(ctx, "error", []byte{}, conn.v)

// error stats gets emitted
errorCount = topoStatsConnErrors.Counts()["Update.global"]
if got, want := errorCount, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Update.global"])
}

// TestStatsConnTopoGet emits stats on Get
func TestStatsConnTopoGet(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

go createTestReadSemaphoreContention(ctx, time.Millisecond*100)
semAcquiredChan := make(chan struct{})
go createTestReadSemaphoreContention(ctx, time.Millisecond*100, semAcquiredChan)
<-semAcquiredChan
statsConn.Get(ctx, "")
timingCounts := topoStatsConnTimings.Counts()["Get.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Get.global"])
require.NotZero(t, topoStatsConnTimings.Time())

waitTimingsCounts := topoStatsConnReadWaitTimings.Counts()["Get.global"]
if got := waitTimingsCounts; got != 1 {
t.Errorf("stats were not properly recorded: got = %d, want = 1", got)
}
require.Equal(t, int64(1), topoStatsConnReadWaitTimings.Counts()["Get.global"])
require.NotZero(t, topoStatsConnReadWaitTimings.Time())

// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["Get.global"]
if got, want := errorCount, int64(0); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Zero(t, topoStatsConnErrors.Counts()["Get.global"])

statsConn.Get(ctx, "error")

// error stats gets emitted
errorCount = topoStatsConnErrors.Counts()["Get.global"]
if got, want := errorCount, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Get.global"])
}

// TestStatsConnTopoDelete emits stats on Delete
func TestStatsConnTopoDelete(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

statsConn.Delete(ctx, "", conn.v)
timingCounts := topoStatsConnTimings.Counts()["Delete.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Delete.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())

// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["Delete.global"]
if got, want := errorCount, int64(0); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Zero(t, topoStatsConnErrors.Counts()["Delete.global"])

statsConn.Delete(ctx, "error", conn.v)

// error stats gets emitted
errorCount = topoStatsConnErrors.Counts()["Delete.global"]
if got, want := errorCount, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Delete.global"])
}

// TestStatsConnTopoLock emits stats on Lock
func TestStatsConnTopoLock(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

statsConn.Lock(ctx, "", "")
timingCounts := topoStatsConnTimings.Counts()["Lock.global"]
require.Equal(t, timingCounts, int64(1))
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Lock.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())

statsConn.LockWithTTL(ctx, "", "", time.Second)
timingCounts = topoStatsConnTimings.Counts()["LockWithTTL.global"]
require.Equal(t, timingCounts, int64(1))
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["LockWithTTL.global"])

statsConn.LockName(ctx, "", "")
timingCounts = topoStatsConnTimings.Counts()["LockName.global"]
require.Equal(t, timingCounts, int64(1))
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["LockName.global"])

// Error is zero before getting an error.
errorCount := topoStatsConnErrors.Counts()["Lock.global"]
require.Equal(t, errorCount, int64(0))
require.Zero(t, topoStatsConnErrors.Counts()["Lock.global"])

statsConn.Lock(ctx, "error", "")

// Error stats gets emitted.
errorCount = topoStatsConnErrors.Counts()["Lock.global"]
require.Equal(t, errorCount, int64(1))
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Lock.global"])
}

// TestStatsConnTopoWatch emits stats on Watch
func TestStatsConnTopoWatch(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

statsConn.Watch(ctx, "")
timingCounts := topoStatsConnTimings.Counts()["Watch.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}

require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Watch.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())
}

// TestStatsConnTopoNewLeaderParticipation emits stats on NewLeaderParticipation
func TestStatsConnTopoNewLeaderParticipation(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)

_, _ = statsConn.NewLeaderParticipation("", "")
timingCounts := topoStatsConnTimings.Counts()["NewLeaderParticipation.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["NewLeaderParticipation.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())

// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["NewLeaderParticipation.global"]
if got, want := errorCount, int64(0); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Zero(t, topoStatsConnErrors.Counts()["NewLeaderParticipation.global"])

_, _ = statsConn.NewLeaderParticipation("error", "")

// error stats gets emitted
errorCount = topoStatsConnErrors.Counts()["NewLeaderParticipation.global"]
if got, want := errorCount, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["NewLeaderParticipation.global"])
}

// TestStatsConnTopoClose emits stats on Close
func TestStatsConnTopoClose(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)

statsConn.Close()
timingCounts := topoStatsConnTimings.Counts()["Close.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Close.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())
}

0 comments on commit 1355018

Please sign in to comment.