Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid flaky topo concurrency test #17407

Merged
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 90 additions & 102 deletions go/vt/topo/stats_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,18 @@ import (
"vitess.io/vitess/go/vt/vterrors"
)

// testStatsConnReadSem is a semaphore for unit tests.
// It intentionally has a concurrency limit of '1' to
// allow semaphore contention in tests.
var testStatsConnReadSem = semaphore.NewWeighted(1)

// testStatsConnStatsReset resets StatsConn-based stats.
func testStatsConnStatsReset() {
topoStatsConnErrors.ResetAll()
topoStatsConnReadWaitTimings.Reset()
topoStatsConnTimings.Reset()
}

// The fakeConn is a wrapper for a Conn that emits stats for every operation
type fakeConn struct {
v Version
Expand Down Expand Up @@ -185,238 +195,216 @@ func (st *fakeConn) IsReadOnly() bool {
}

// createTestReadSemaphoreContention simulates semaphore contention on the test read semaphore.
func createTestReadSemaphoreContention(ctx context.Context, duration time.Duration) {
func createTestReadSemaphoreContention(ctx context.Context, duration time.Duration, semAcquiredChan chan bool) {
if err := testStatsConnReadSem.Acquire(ctx, 1); err != nil {
panic(err)
}
defer testStatsConnReadSem.Release(1)
semAcquiredChan <- true
time.Sleep(duration)
}

// TestStatsConnTopoListDir emits stats on ListDir
func TestStatsConnTopoListDir(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

go createTestReadSemaphoreContention(ctx, 100*time.Millisecond)
semAcquiredChan := make(chan bool)
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
go createTestReadSemaphoreContention(ctx, 100*time.Millisecond, semAcquiredChan)
<-semAcquiredChan
statsConn.ListDir(ctx, "", true)
timingCounts := topoStatsConnTimings.Counts()["ListDir.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["ListDir.global"])
require.NotZero(t, topoStatsConnTimings.Time())

waitTimingsCounts := topoStatsConnReadWaitTimings.Counts()["ListDir.global"]
if got := waitTimingsCounts; got != 1 {
t.Errorf("stats were not properly recorded: got = %d, want = 1", got)
}
require.Equal(t, int64(1), topoStatsConnReadWaitTimings.Counts()["ListDir.global"])
require.NotZero(t, topoStatsConnReadWaitTimings.Time())

// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["ListDir.global"]
if got, want := errorCount, int64(0); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Zero(t, topoStatsConnErrors.Counts()["ListDir.global"])

statsConn.ListDir(ctx, "error", true)

// error stats gets emitted
errorCount = topoStatsConnErrors.Counts()["ListDir.global"]
if got, want := errorCount, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["ListDir.global"])
}

// TestStatsConnTopoCreate emits stats on Create
func TestStatsConnTopoCreate(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

statsConn.Create(ctx, "", []byte{})
timingCounts := topoStatsConnTimings.Counts()["Create.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Create.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())

// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["Create.global"]
if got, want := errorCount, int64(0); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Zero(t, topoStatsConnErrors.Counts()["Create.global"])

statsConn.Create(ctx, "error", []byte{})

// error stats gets emitted
errorCount = topoStatsConnErrors.Counts()["Create.global"]
if got, want := errorCount, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Create.global"])
}

// TestStatsConnTopoUpdate emits stats on Update
func TestStatsConnTopoUpdate(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

statsConn.Update(ctx, "", []byte{}, conn.v)
timingCounts := topoStatsConnTimings.Counts()["Update.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Update.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())

// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["Update.global"]
if got, want := errorCount, int64(0); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Zero(t, topoStatsConnErrors.Counts()["Update.global"])

statsConn.Update(ctx, "error", []byte{}, conn.v)

// error stats gets emitted
errorCount = topoStatsConnErrors.Counts()["Update.global"]
if got, want := errorCount, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Update.global"])
}

// TestStatsConnTopoGet emits stats on Get
func TestStatsConnTopoGet(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

go createTestReadSemaphoreContention(ctx, time.Millisecond*100)
semAcquiredChan := make(chan bool)
go createTestReadSemaphoreContention(ctx, time.Millisecond*100, semAcquiredChan)
<-semAcquiredChan
statsConn.Get(ctx, "")
timingCounts := topoStatsConnTimings.Counts()["Get.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Get.global"])
require.NotZero(t, topoStatsConnTimings.Time())

waitTimingsCounts := topoStatsConnReadWaitTimings.Counts()["Get.global"]
if got := waitTimingsCounts; got != 1 {
t.Errorf("stats were not properly recorded: got = %d, want = 1", got)
}
require.Equal(t, int64(1), topoStatsConnReadWaitTimings.Counts()["Get.global"])
require.NotZero(t, topoStatsConnReadWaitTimings.Time())

// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["Get.global"]
if got, want := errorCount, int64(0); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Zero(t, topoStatsConnErrors.Counts()["Get.global"])

statsConn.Get(ctx, "error")

// error stats gets emitted
errorCount = topoStatsConnErrors.Counts()["Get.global"]
if got, want := errorCount, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Get.global"])
}

// TestStatsConnTopoDelete emits stats on Delete
func TestStatsConnTopoDelete(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

statsConn.Delete(ctx, "", conn.v)
timingCounts := topoStatsConnTimings.Counts()["Delete.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Delete.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())

// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["Delete.global"]
if got, want := errorCount, int64(0); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Zero(t, topoStatsConnErrors.Counts()["Delete.global"])

statsConn.Delete(ctx, "error", conn.v)

// error stats gets emitted
errorCount = topoStatsConnErrors.Counts()["Delete.global"]
if got, want := errorCount, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Delete.global"])
}

// TestStatsConnTopoLock emits stats on Lock
func TestStatsConnTopoLock(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

statsConn.Lock(ctx, "", "")
timingCounts := topoStatsConnTimings.Counts()["Lock.global"]
require.Equal(t, timingCounts, int64(1))
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Lock.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())

statsConn.LockWithTTL(ctx, "", "", time.Second)
timingCounts = topoStatsConnTimings.Counts()["LockWithTTL.global"]
require.Equal(t, timingCounts, int64(1))
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["LockWithTTL.global"])

statsConn.LockName(ctx, "", "")
timingCounts = topoStatsConnTimings.Counts()["LockName.global"]
require.Equal(t, timingCounts, int64(1))
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["LockName.global"])

// Error is zero before getting an error.
errorCount := topoStatsConnErrors.Counts()["Lock.global"]
require.Equal(t, errorCount, int64(0))
require.Zero(t, topoStatsConnErrors.Counts()["Lock.global"])

statsConn.Lock(ctx, "error", "")

// Error stats gets emitted.
errorCount = topoStatsConnErrors.Counts()["Lock.global"]
require.Equal(t, errorCount, int64(1))
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Lock.global"])
}

// TestStatsConnTopoWatch emits stats on Watch
func TestStatsConnTopoWatch(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()

statsConn.Watch(ctx, "")
timingCounts := topoStatsConnTimings.Counts()["Watch.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}

require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Watch.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())
}

// TestStatsConnTopoNewLeaderParticipation emits stats on NewLeaderParticipation
func TestStatsConnTopoNewLeaderParticipation(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)

_, _ = statsConn.NewLeaderParticipation("", "")
timingCounts := topoStatsConnTimings.Counts()["NewLeaderParticipation.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["NewLeaderParticipation.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())

// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["NewLeaderParticipation.global"]
if got, want := errorCount, int64(0); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Zero(t, topoStatsConnErrors.Counts()["NewLeaderParticipation.global"])

_, _ = statsConn.NewLeaderParticipation("error", "")

// error stats gets emitted
errorCount = topoStatsConnErrors.Counts()["NewLeaderParticipation.global"]
if got, want := errorCount, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnErrors.Counts()["NewLeaderParticipation.global"])
}

// TestStatsConnTopoClose emits stats on Close
func TestStatsConnTopoClose(t *testing.T) {
testStatsConnStatsReset()
defer testStatsConnStatsReset()

conn := &fakeConn{}
statsConn := NewStatsConn("global", conn, testStatsConnReadSem)

statsConn.Close()
timingCounts := topoStatsConnTimings.Counts()["Close.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Close.global"])
require.NotZero(t, topoStatsConnTimings.Time())
require.Zero(t, topoStatsConnReadWaitTimings.Time())
}
Loading