From 0e0085e119521e6c23b58fccfa04f8276666d5f3 Mon Sep 17 00:00:00 2001 From: Matt Braymer-Hayes Date: Thu, 21 Dec 2023 14:48:20 -0500 Subject: [PATCH] More tests --- opwindow_test.go | 62 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 4 deletions(-) diff --git a/opwindow_test.go b/opwindow_test.go index a25427b..a781814 100644 --- a/opwindow_test.go +++ b/opwindow_test.go @@ -30,11 +30,11 @@ func TestOpWindow(t *testing.T) { ctx := context.Background() completed1 := 0 completed2 := 0 - cg1 := NewCallGroup(func(finalState map[ID]*Response) { + cg1 := NewCallGroup(func(map[ID]*Response) { completed1++ }) - cg2 := NewCallGroup(func(finalState map[ID]*Response) { + cg2 := NewCallGroup(func(map[ID]*Response) { completed2++ }) @@ -79,8 +79,8 @@ func TestOpWindowClose(t *testing.T) { winTime := 100 * time.Hour // we want everything to hang until we close the queue. - cg1 := NewCallGroup(func(finalState map[ID]*Response) {}) - cg2 := NewCallGroup(func(finalState map[ID]*Response) {}) + cg1 := NewCallGroup(func(map[ID]*Response) {}) + cg2 := NewCallGroup(func(map[ID]*Response) {}) now := time.Now() @@ -123,4 +123,58 @@ func TestOpWindowClose(t *testing.T) { time.Sleep(1000 * time.Millisecond) assert.Equal(t, uint64(workers), atomic.LoadUint64(&closes)) assert.Equal(t, uint64(2), atomic.LoadUint64(&ops)) // 2 uniq keys are enqueued + + err = window.Enqueue(ctx, op1_1.Key, op1_1) + require.ErrorIs(t, err, ErrQueueClosed) +} + +func TestOpWindowErrQueueSaturatedWidth(t *testing.T) { + t.Parallel() + cg := NewCallGroup(func(map[ID]*Response) {}) + now := time.Now() + + op1 := cg.Add(1, &tsMsg{123, now}) + op2 := cg.Add(1, &tsMsg{123, now}) + + window := NewOpWindow(2, 1, time.Millisecond) + ctx := context.Background() + err := window.Enqueue(ctx, op1.Key, op1) + require.NoError(t, err) + + err = window.Enqueue(ctx, op2.Key, op2) + require.ErrorIs(t, err, ErrQueueSaturatedWidth) + + _, err = window.Dequeue(ctx) + require.NoError(t, err) + + err = window.Enqueue(ctx, op2.Key, op2) + require.NoError(t, err) +} + +func TestOpWindowErrQueueSaturatedDepth(t *testing.T) { + t.Parallel() + cg := NewCallGroup(func(map[ID]*Response) {}) + now := time.Now() + op1 := cg.Add(1, &tsMsg{123, now}) + op2 := cg.Add(2, &tsMsg{234, now}) + + window := NewOpWindow(1, 1, time.Millisecond) + err := window.Enqueue(context.Background(), op1.Key, op1) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) // let it run for a sec for coverage ¯\_(ツ)_/¯ + defer cancel() + go func() { + <-ctx.Done() + // pretend we dequeued but were full again + window.fullCond.Signal() + }() + err = window.Enqueue(ctx, op2.Key, op2) + require.ErrorIs(t, err, ErrQueueSaturatedDepth) + + _, err = window.Dequeue(context.Background()) + require.NoError(t, err) + + err = window.Enqueue(context.Background(), op2.Key, op2) + require.NoError(t, err) }