diff --git a/exporter/exporterqueue/bounded_memory_queue.go b/exporter/exporterqueue/bounded_memory_queue.go deleted file mode 100644 index bd7b22a9d24..00000000000 --- a/exporter/exporterqueue/bounded_memory_queue.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright The OpenTelemetry Authors -// Copyright (c) 2019 The Jaeger Authors. -// Copyright (c) 2017 Uber Technologies, Inc. -// SPDX-License-Identifier: Apache-2.0 - -package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" - -import ( - "context" - - "go.opentelemetry.io/collector/component" -) - -type noopDone struct{} - -func (*noopDone) OnDone(error) {} - -var noopDoneInst = &noopDone{} - -// boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue, -// where the queue is bounded and if it fills up due to slow consumers, the new items written by -// the producer are dropped. -type boundedMemoryQueue[T any] struct { - component.StartFunc - *sizedQueue[T] -} - -// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation. -type memoryQueueSettings[T any] struct { - sizer sizer[T] - capacity int64 - blocking bool -} - -// newBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional -// callback for dropped items (e.g. useful to emit metrics). -func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) readableQueue[T] { - return &boundedMemoryQueue[T]{ - sizedQueue: newSizedQueue[T](set.capacity, set.sizer, set.blocking), - } -} - -func (q *boundedMemoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool) { - ctx, req, ok := q.sizedQueue.pop() - return ctx, req, noopDoneInst, ok -} diff --git a/exporter/exporterqueue/bounded_memory_queue_test.go b/exporter/exporterqueue/bounded_memory_queue_test.go deleted file mode 100644 index ef30d938ae8..00000000000 --- a/exporter/exporterqueue/bounded_memory_queue_test.go +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright The OpenTelemetry Authors -// Copyright (c) 2019 The Jaeger Authors. -// Copyright (c) 2017 Uber Technologies, Inc. -// SPDX-License-Identifier: Apache-2.0 - -package exporterqueue - -import ( - "context" - "strconv" - "sync" - "sync/atomic" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component/componenttest" -) - -// In this test we run a queue with capacity 1 and a single consumer. -// We want to test the overflow behavior, so we block the consumer -// by holding a startLock before submitting items to the queue. -func TestBoundedQueue(t *testing.T) { - q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1}) - - require.NoError(t, q.Offer(context.Background(), "a")) - - numConsumed := 0 - assert.True(t, consume(q, func(_ context.Context, item string) error { - assert.Equal(t, "a", item) - numConsumed++ - return nil - })) - assert.Equal(t, 1, numConsumed) - assert.Equal(t, int64(0), q.Size()) - - // produce two more items. The first one should be accepted, but not consumed. 
- require.NoError(t, q.Offer(context.Background(), "b")) - assert.Equal(t, int64(1), q.Size()) - - // the second should be rejected since the queue is full - require.ErrorIs(t, q.Offer(context.Background(), "c"), ErrQueueIsFull) - assert.Equal(t, int64(1), q.Size()) - - assert.True(t, consume(q, func(_ context.Context, item string) error { - assert.Equal(t, "b", item) - numConsumed++ - return nil - })) - assert.Equal(t, 2, numConsumed) - - for _, toAddItem := range []string{"d", "e", "f"} { - require.NoError(t, q.Offer(context.Background(), toAddItem)) - assert.True(t, consume(q, func(_ context.Context, item string) error { - assert.Equal(t, toAddItem, item) - numConsumed++ - return nil - })) - } - assert.Equal(t, 5, numConsumed) - require.NoError(t, q.Shutdown(context.Background())) - assert.False(t, consume(q, func(_ context.Context, item string) error { - panic(item) - })) -} - -// In this test we run a queue with many items and a slow consumer. -// When the queue is stopped, the remaining items should be processed. -// Due to the way q.Stop() waits for all consumers to finish, the -// same lock strategy use above will not work, as calling Unlock -// only after Stop will mean the consumers are still locked while -// trying to perform the final consumptions. -func TestShutdownWhileNotEmpty(t *testing.T) { - q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1000}) - - require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) - for i := 0; i < 10; i++ { - require.NoError(t, q.Offer(context.Background(), strconv.FormatInt(int64(i), 10))) - } - require.NoError(t, q.Shutdown(context.Background())) - - assert.Equal(t, int64(10), q.Size()) - numConsumed := 0 - for i := 0; i < 10; i++ { - assert.True(t, consume(q, func(_ context.Context, item string) error { - assert.Equal(t, strconv.FormatInt(int64(i), 10), item) - numConsumed++ - return nil - })) - } - assert.Equal(t, 10, numConsumed) - assert.Equal(t, int64(0), q.Size()) - - assert.False(t, consume(q, func(_ context.Context, item string) error { - panic(item) - })) -} - -func TestQueueUsage(t *testing.T) { - tests := []struct { - name string - sizer sizer[uint64] - }{ - { - name: "requests_based", - sizer: &requestSizer[uint64]{}, - }, - { - name: "items_based", - sizer: &itemsSizer{}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100)}) - consumed := &atomic.Int64{} - ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done Done) { - consumed.Add(1) - done.OnDone(nil) - }) - require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost())) - for j := 0; j < 10; j++ { - require.NoError(t, q.Offer(context.Background(), uint64(10))) - } - require.NoError(t, ac.Shutdown(context.Background())) - assert.Equal(t, int64(10), consumed.Load()) - }) - } -} - -func TestBlockingQueueUsage(t *testing.T) { - tests := []struct { - name string - sizer sizer[uint64] - }{ - { - name: "requests_based", - sizer: &requestSizer[uint64]{}, - }, - { - name: "items_based", - sizer: &itemsSizer{}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100), blocking: true}) - consumed := &atomic.Int64{} - ac := newConsumerQueue(q, 10, func(_ context.Context, _ uint64, done Done) { - consumed.Add(1) - done.OnDone(nil) - }) - 
require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost())) - wg := &sync.WaitGroup{} - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for j := 0; j < 100_000; j++ { - assert.NoError(t, q.Offer(context.Background(), uint64(10))) - } - }() - } - wg.Wait() - require.NoError(t, ac.Shutdown(context.Background())) - assert.Equal(t, int64(1_000_000), consumed.Load()) - }) - } -} - -func TestZeroSizeNoConsumers(t *testing.T) { - q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 0}) - err := q.Start(context.Background(), componenttest.NewNopHost()) - require.NoError(t, err) - require.ErrorIs(t, q.Offer(context.Background(), "a"), ErrQueueIsFull) // in process - assert.NoError(t, q.Shutdown(context.Background())) -} - -func consume[T any](q readableQueue[T], consumeFunc func(context.Context, T) error) bool { - ctx, req, done, ok := q.Read(context.Background()) - if !ok { - return false - } - done.OnDone(consumeFunc(ctx, req)) - return true -} - -func BenchmarkOffer(b *testing.B) { - tests := []struct { - name string - sizer sizer[uint64] - }{ - { - name: "requests_based", - sizer: &requestSizer[uint64]{}, - }, - { - name: "items_based", - sizer: &itemsSizer{}, - }, - } - for _, tt := range tests { - b.Run(tt.name, func(b *testing.B) { - q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(10 * b.N)}) - consumed := &atomic.Int64{} - require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost())) - ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done Done) { - consumed.Add(1) - done.OnDone(nil) - }) - require.NoError(b, ac.Start(context.Background(), componenttest.NewNopHost())) - b.ResetTimer() - b.ReportAllocs() - for j := 0; j < b.N; j++ { - require.NoError(b, q.Offer(context.Background(), uint64(10))) - } - require.NoError(b, ac.Shutdown(context.Background())) - assert.Equal(b, int64(b.N), consumed.Load()) - }) - } -} diff --git a/exporter/exporterqueue/sized_queue.go b/exporter/exporterqueue/memory_queue.go similarity index 69% rename from exporter/exporterqueue/sized_queue.go rename to exporter/exporterqueue/memory_queue.go index 20f0809abc2..3d027326d74 100644 --- a/exporter/exporterqueue/sized_queue.go +++ b/exporter/exporterqueue/memory_queue.go @@ -7,46 +7,22 @@ import ( "context" "errors" "sync" + + "go.opentelemetry.io/collector/component" ) var errInvalidSize = errors.New("invalid element size") -type node[T any] struct { - ctx context.Context - data T - size int64 - next *node[T] -} - -type linkedQueue[T any] struct { - head *node[T] - tail *node[T] -} - -func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) { - n := &node[T]{ctx: ctx, data: data, size: size} - if l.tail == nil { - l.head = n - l.tail = n - return - } - l.tail.next = n - l.tail = n -} - -func (l *linkedQueue[T]) pop() (context.Context, T, int64) { - n := l.head - l.head = n.next - if l.head == nil { - l.tail = nil - } - n.next = nil - return n.ctx, n.data, n.size +// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation. +type memoryQueueSettings[T any] struct { + sizer sizer[T] + capacity int64 + blocking bool } -// sizedQueue is a channel wrapper for sized elements with a capacity set to a total size of all the elements. -// The channel will accept elements until the total size of the elements reaches the capacity. -type sizedQueue[T any] struct { +// memoryQueue is an in-memory implementation of a Queue. 
+type memoryQueue[T any] struct { + component.StartFunc sizer sizer[T] cap int64 @@ -59,14 +35,14 @@ type sizedQueue[T any] struct { blocking bool } -// newSizedQueue creates a sized elements channel. Each element is assigned a size by the provided sizer. +// newMemoryQueue creates a sized elements channel. Each element is assigned a size by the provided sizer. // capacity is the capacity of the queue. -func newSizedQueue[T any](capacity int64, sizer sizer[T], blocking bool) *sizedQueue[T] { - sq := &sizedQueue[T]{ - sizer: sizer, - cap: capacity, +func newMemoryQueue[T any](set memoryQueueSettings[T]) readableQueue[T] { + sq := &memoryQueue[T]{ + sizer: set.sizer, + cap: set.capacity, items: &linkedQueue[T]{}, - blocking: blocking, + blocking: set.blocking, } sq.hasMoreElements = sync.NewCond(&sq.mu) sq.hasMoreSpace = newCond(&sq.mu) @@ -75,7 +51,7 @@ func newSizedQueue[T any](capacity int64, sizer sizer[T], blocking bool) *sizedQ // Offer puts the element into the queue with the given sized if there is enough capacity. // Returns an error if the queue is full. -func (sq *sizedQueue[T]) Offer(ctx context.Context, el T) error { +func (sq *memoryQueue[T]) Offer(ctx context.Context, el T) error { elSize := sq.sizer.Sizeof(el) if elSize == 0 { return nil @@ -105,10 +81,10 @@ func (sq *sizedQueue[T]) Offer(ctx context.Context, el T) error { return nil } -// pop removes the element from the queue and returns it. +// Read removes the element from the queue and returns it. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped and emptied. -func (sq *sizedQueue[T]) pop() (context.Context, T, bool) { +func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool) { sq.mu.Lock() defer sq.mu.Unlock() @@ -117,12 +93,12 @@ func (sq *sizedQueue[T]) pop() (context.Context, T, bool) { elCtx, el, elSize := sq.items.pop() sq.size -= elSize sq.hasMoreSpace.Signal() - return elCtx, el, true + return elCtx, el, noopDoneInst, true } if sq.stopped { var el T - return context.Background(), el, false + return context.Background(), el, nil, false } // TODO: Need to change the Queue interface to return an error to allow distinguish between shutdown and context canceled. @@ -132,7 +108,7 @@ func (sq *sizedQueue[T]) pop() (context.Context, T, bool) { } // Shutdown closes the queue channel to initiate draining of the queue. 
-func (sq *sizedQueue[T]) Shutdown(context.Context) error { +func (sq *memoryQueue[T]) Shutdown(context.Context) error { sq.mu.Lock() defer sq.mu.Unlock() sq.stopped = true @@ -140,12 +116,51 @@ func (sq *sizedQueue[T]) Shutdown(context.Context) error { return nil } -func (sq *sizedQueue[T]) Size() int64 { +func (sq *memoryQueue[T]) Size() int64 { sq.mu.Lock() defer sq.mu.Unlock() return sq.size } -func (sq *sizedQueue[T]) Capacity() int64 { +func (sq *memoryQueue[T]) Capacity() int64 { return sq.cap } + +type node[T any] struct { + ctx context.Context + data T + size int64 + next *node[T] +} + +type linkedQueue[T any] struct { + head *node[T] + tail *node[T] +} + +func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) { + n := &node[T]{ctx: ctx, data: data, size: size} + if l.tail == nil { + l.head = n + l.tail = n + return + } + l.tail.next = n + l.tail = n +} + +func (l *linkedQueue[T]) pop() (context.Context, T, int64) { + n := l.head + l.head = n.next + if l.head == nil { + l.tail = nil + } + n.next = nil + return n.ctx, n.data, n.size +} + +type noopDone struct{} + +func (*noopDone) OnDone(error) {} + +var noopDoneInst = &noopDone{} diff --git a/exporter/exporterqueue/memory_queue_test.go b/exporter/exporterqueue/memory_queue_test.go new file mode 100644 index 00000000000..9985729fcd7 --- /dev/null +++ b/exporter/exporterqueue/memory_queue_test.go @@ -0,0 +1,175 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue + +import ( + "context" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" +) + +type sizerInt64 struct{} + +func (s sizerInt64) Sizeof(el int64) int64 { + return el +} + +func TestMemoryQueue(t *testing.T) { + q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 7}) + require.NoError(t, q.Offer(context.Background(), 1)) + assert.EqualValues(t, 1, q.Size()) + assert.EqualValues(t, 7, q.Capacity()) + + require.NoError(t, q.Offer(context.Background(), 3)) + assert.EqualValues(t, 4, q.Size()) + + // should not be able to send to the full queue + require.ErrorIs(t, q.Offer(context.Background(), 4), ErrQueueIsFull) + assert.EqualValues(t, 4, q.Size()) + + assert.True(t, consume(q, func(_ context.Context, el int64) error { + assert.EqualValues(t, 1, el) + return nil + })) + assert.EqualValues(t, 3, q.Size()) + + assert.True(t, consume(q, func(_ context.Context, el int64) error { + assert.EqualValues(t, 3, el) + return nil + })) + assert.EqualValues(t, 0, q.Size()) + + require.NoError(t, q.Shutdown(context.Background())) + assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil })) +} + +func TestMemoryQueueBlockingCancelled(t *testing.T) { + q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 5, blocking: true}) + require.NoError(t, q.Offer(context.Background(), 3)) + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.ErrorIs(t, q.Offer(ctx, 3), context.Canceled) + }() + cancel() + wg.Wait() + assert.EqualValues(t, 3, q.Size()) + assert.True(t, consume(q, func(_ context.Context, el int64) error { + assert.EqualValues(t, 3, el) + return nil + })) + require.NoError(t, q.Shutdown(context.Background())) +} + +func TestMemoryQueueDrainWhenShutdown(t *testing.T) { + q := 
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 7}) + require.NoError(t, q.Offer(context.Background(), 1)) + require.NoError(t, q.Offer(context.Background(), 3)) + + assert.True(t, consume(q, func(_ context.Context, el int64) error { + assert.EqualValues(t, 1, el) + return nil + })) + assert.EqualValues(t, 3, q.Size()) + require.NoError(t, q.Shutdown(context.Background())) + assert.EqualValues(t, 3, q.Size()) + assert.True(t, consume(q, func(_ context.Context, el int64) error { + assert.EqualValues(t, 3, el) + return nil + })) + assert.EqualValues(t, 0, q.Size()) + assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil })) +} + +func TestMemoryQueueOfferInvalidSize(t *testing.T) { + q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 1}) + require.ErrorIs(t, q.Offer(context.Background(), -1), errInvalidSize) +} + +func TestMemoryQueueOfferZeroSize(t *testing.T) { + q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 1}) + require.NoError(t, q.Offer(context.Background(), 0)) + require.NoError(t, q.Shutdown(context.Background())) + // Because the size 0 is ignored, nothing to drain. + assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil })) +} + +func TestMemoryQueueZeroCapacity(t *testing.T) { + q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 0}) + require.ErrorIs(t, q.Offer(context.Background(), 1), ErrQueueIsFull) + require.NoError(t, q.Shutdown(context.Background())) +} + +func TestAsyncMemoryQueue(t *testing.T) { + q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100}) + consumed := &atomic.Int64{} + ac := newConsumerQueue(q, 1, func(_ context.Context, _ int64, done Done) { + consumed.Add(1) + done.OnDone(nil) + }) + require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost())) + for j := 0; j < 10; j++ { + require.NoError(t, q.Offer(context.Background(), 10)) + } + require.NoError(t, ac.Shutdown(context.Background())) + assert.EqualValues(t, 10, consumed.Load()) +} + +func TestAsyncMemoryQueueBlocking(t *testing.T) { + q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100, blocking: true}) + consumed := &atomic.Int64{} + ac := newConsumerQueue(q, 4, func(_ context.Context, _ int64, done Done) { + consumed.Add(1) + done.OnDone(nil) + }) + require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost())) + wg := &sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100_000; j++ { + assert.NoError(t, q.Offer(context.Background(), 10)) + } + }() + } + wg.Wait() + require.NoError(t, ac.Shutdown(context.Background())) + assert.EqualValues(t, 1_000_000, consumed.Load()) +} + +func BenchmarkAsyncMemoryQueue(b *testing.B) { + q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: int64(10 * b.N)}) + consumed := &atomic.Int64{} + require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost())) + ac := newConsumerQueue(q, 1, func(_ context.Context, _ int64, done Done) { + consumed.Add(1) + done.OnDone(nil) + }) + require.NoError(b, ac.Start(context.Background(), componenttest.NewNopHost())) + b.ResetTimer() + b.ReportAllocs() + for j := 0; j < b.N; j++ { + require.NoError(b, q.Offer(context.Background(), 10)) + } + require.NoError(b, ac.Shutdown(context.Background())) + assert.EqualValues(b, b.N, 
consumed.Load()) +} + +func consume[T any](q readableQueue[T], consumeFunc func(context.Context, T) error) bool { + ctx, req, done, ok := q.Read(context.Background()) + if !ok { + return false + } + done.OnDone(consumeFunc(ctx, req)) + return true +} diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go index 0044b53ec7e..7791915d4be 100644 --- a/exporter/exporterqueue/queue.go +++ b/exporter/exporterqueue/queue.go @@ -81,7 +81,7 @@ type Factory[T any] func(context.Context, Settings, Config, ConsumeFunc[T]) Queu // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func NewMemoryQueueFactory[T any]() Factory[T] { return func(_ context.Context, _ Settings, cfg Config, consume ConsumeFunc[T]) Queue[T] { - q := newBoundedMemoryQueue[T](memoryQueueSettings[T]{ + q := newMemoryQueue[T](memoryQueueSettings[T]{ sizer: &requestSizer[T]{}, capacity: int64(cfg.QueueSize), blocking: cfg.Blocking, diff --git a/exporter/exporterqueue/sized_queue_test.go b/exporter/exporterqueue/sized_queue_test.go deleted file mode 100644 index aef119802e5..00000000000 --- a/exporter/exporterqueue/sized_queue_test.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package exporterqueue - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -type sizerInt struct{} - -func (s sizerInt) Sizeof(el int) int64 { - return int64(el) -} - -func TestSizedQueue(t *testing.T) { - q := newSizedQueue[int](7, sizerInt{}, false) - require.NoError(t, q.Offer(context.Background(), 1)) - assert.Equal(t, int64(1), q.Size()) - assert.Equal(t, int64(7), q.Capacity()) - - require.NoError(t, q.Offer(context.Background(), 3)) - assert.Equal(t, int64(4), q.Size()) - - // should not be able to send to the full queue - require.Error(t, q.Offer(context.Background(), 4)) - assert.Equal(t, int64(4), q.Size()) - - _, el, ok := q.pop() - assert.Equal(t, 1, el) - assert.True(t, ok) - assert.Equal(t, int64(3), q.Size()) - - _, el, ok = q.pop() - assert.Equal(t, 3, el) - assert.True(t, ok) - assert.Equal(t, int64(0), q.Size()) - - require.NoError(t, q.Shutdown(context.Background())) - _, el, ok = q.pop() - assert.False(t, ok) - assert.Equal(t, 0, el) -} - -func TestSizedQueue_DrainAllElements(t *testing.T) { - q := newSizedQueue[int](7, sizerInt{}, false) - require.NoError(t, q.Offer(context.Background(), 1)) - require.NoError(t, q.Offer(context.Background(), 3)) - - _, el, ok := q.pop() - assert.Equal(t, 1, el) - assert.True(t, ok) - assert.Equal(t, int64(3), q.Size()) - - require.NoError(t, q.Shutdown(context.Background())) - _, el, ok = q.pop() - assert.Equal(t, 3, el) - assert.True(t, ok) - assert.Equal(t, int64(0), q.Size()) - - _, el, ok = q.pop() - assert.False(t, ok) - assert.Equal(t, 0, el) -} - -func TestSizedChannel_OfferInvalidSize(t *testing.T) { - q := newSizedQueue[int](1, sizerInt{}, false) - require.ErrorIs(t, q.Offer(context.Background(), -1), errInvalidSize) -} - -func TestSizedChannel_OfferZeroSize(t *testing.T) { - q := newSizedQueue[int](1, sizerInt{}, false) - require.NoError(t, q.Offer(context.Background(), 0)) - require.NoError(t, q.Shutdown(context.Background())) - // Because the size 0 is ignored, nothing to drain. - _, el, ok := q.pop() - assert.False(t, ok) - assert.Equal(t, 0, el) -}
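
Below is a minimal sketch (illustrative only, not part of the patch) of how the consolidated memoryQueue is exercised after this change. It assumes it lives inside the exporterqueue package, since newMemoryQueue, memoryQueueSettings, readableQueue, and Done are all unexported, and it reuses the test-only sizerInt64 sizer defined in memory_queue_test.go above.

package exporterqueue

import "context"

// exampleMemoryQueueUsage walks the post-change flow: construction via
// memoryQueueSettings, producing with Offer, consuming with Read, and
// draining with Shutdown.
func exampleMemoryQueueUsage() error {
	q := newMemoryQueue[int64](memoryQueueSettings[int64]{
		sizer:    sizerInt64{}, // each element counts as its own value toward capacity
		capacity: 10,           // total size admitted before Offer rejects (or blocks)
		blocking: false,        // false: Offer returns ErrQueueIsFull instead of waiting for space
	})

	// Producer side: element sizes are summed, so this element uses 4 of the 10 capacity.
	if err := q.Offer(context.Background(), 4); err != nil {
		return err
	}

	// Consumer side: Read blocks until an element is available or the queue is
	// stopped and fully drained. For the in-memory queue the returned Done is the
	// package-level noopDoneInst, so OnDone(nil) is a no-op acknowledgement.
	if _, el, done, ok := q.Read(context.Background()); ok {
		_ = el // hand the element to the exporter here
		done.OnDone(nil)
	}

	return q.Shutdown(context.Background())
}

This mirrors the design choice in the patch: the old boundedMemoryQueue wrapper existed only to attach a no-op Done to sizedQueue.pop, so the two types are collapsed into a single memoryQueue that implements readableQueue directly.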