diff --git a/queue/ring.go b/queue/ring.go index 13d532f..be530bb 100644 --- a/queue/ring.go +++ b/queue/ring.go @@ -19,6 +19,7 @@ package queue import ( "runtime" "sync/atomic" + "time" ) // roundUp takes a uint64 greater than 0 and rounds it up to the next @@ -128,9 +129,24 @@ L: // to the queue or Dispose is called on the queue. An error will be returned // if the queue is disposed. func (rb *RingBuffer) Get() (interface{}, error) { - var n *node - pos := atomic.LoadUint64(&rb.dequeue) - i := 0 + return rb.Poll(0) +} + +// Poll will return the next item in the queue. This call will block +// if the queue is empty. This call will unblock when an item is added +// to the queue, Dispose is called on the queue, or the timeout is reached. An +// error will be returned if the queue is disposed or a timeout occurs. A +// non-positive timeout will block indefinitely. +func (rb *RingBuffer) Poll(timeout time.Duration) (interface{}, error) { + var ( + n *node + pos = atomic.LoadUint64(&rb.dequeue) + i = 0 + start time.Time + ) + if timeout > 0 { + start = time.Now() + } L: for { if atomic.LoadUint64(&rb.disposed) == 1 { @@ -150,6 +166,10 @@ L: pos = atomic.LoadUint64(&rb.dequeue) } + if timeout > 0 && time.Since(start) >= timeout { + return nil, ErrTimeout + } + if i == 10000 { runtime.Gosched() // free up the cpu before the next iteration i = 0 diff --git a/queue/ring_test.go b/queue/ring_test.go index e558135..8fcc6da 100644 --- a/queue/ring_test.go +++ b/queue/ring_test.go @@ -20,6 +20,7 @@ import ( "sync" "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -176,6 +177,54 @@ func TestRingGetEmpty(t *testing.T) { wg.Wait() } +func TestRingPollEmpty(t *testing.T) { + rb := NewRingBuffer(3) + + _, err := rb.Poll(1) + assert.Equal(t, ErrTimeout, err) +} + +func TestRingPoll(t *testing.T) { + rb := NewRingBuffer(10) + + // should be able to Poll() before anything is present, without breaking future Puts + rb.Poll(time.Millisecond) + + rb.Put(`test`) + result, err := rb.Poll(0) + if !assert.Nil(t, err) { + return + } + + assert.Equal(t, `test`, result) + assert.Equal(t, uint64(0), rb.Len()) + + rb.Put(`1`) + rb.Put(`2`) + + result, err = rb.Poll(time.Millisecond) + if !assert.Nil(t, err) { + return + } + + assert.Equal(t, `1`, result) + assert.Equal(t, uint64(1), rb.Len()) + + result, err = rb.Poll(time.Millisecond) + if !assert.Nil(t, err) { + return + } + + assert.Equal(t, `2`, result) + + before := time.Now() + _, err = rb.Poll(5 * time.Millisecond) + // This delta is normally 1-3 ms but running tests in CI with -race causes + // this to run much slower. For now, just bump up the threshold. + assert.InDelta(t, 5, time.Since(before).Seconds()*1000, 10) + assert.Equal(t, ErrTimeout, err) +} + func TestRingLen(t *testing.T) { rb := NewRingBuffer(4) assert.Equal(t, uint64(0), rb.Len())