-
Notifications
You must be signed in to change notification settings - Fork 182
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chanutils: add concurrent queue impl
- Loading branch information
1 parent
16ac6c0
commit 6551148
Showing
1 changed file
with
132 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
package chanutils | ||
|
||
import ( | ||
"container/list" | ||
"sync" | ||
) | ||
|
||
const ( | ||
// DefaultQueueSize is the default size to use for concurrent queues. | ||
DefaultQueueSize = 10 | ||
) | ||
|
||
// ConcurrentQueue is a typed concurrent-safe FIFO queue with unbounded | ||
// capacity. Clients interact with the queue by pushing items into the in | ||
// channel and popping items from the out channel. There is a goroutine that | ||
// manages moving items from the in channel to the out channel in the correct | ||
// order that must be started by calling Start(). | ||
type ConcurrentQueue[T any] struct { | ||
started sync.Once | ||
stopped sync.Once | ||
|
||
chanIn chan T | ||
chanOut chan T | ||
overflow *list.List | ||
|
||
wg sync.WaitGroup | ||
quit chan struct{} | ||
} | ||
|
||
// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is | ||
// the capacity of the output channel. When the size of the queue is below this | ||
// threshold, pushes do not incur the overhead of the less efficient overflow | ||
// structure. | ||
func NewConcurrentQueue[T any](bufferSize int) *ConcurrentQueue[T] { | ||
return &ConcurrentQueue[T]{ | ||
chanIn: make(chan T), | ||
chanOut: make(chan T, bufferSize), | ||
overflow: list.New(), | ||
quit: make(chan struct{}), | ||
} | ||
} | ||
|
||
// ChanIn returns a channel that can be used to push new items into the queue. | ||
func (cq *ConcurrentQueue[T]) ChanIn() chan<- T { | ||
return cq.chanIn | ||
} | ||
|
||
// ChanOut returns a channel that can be used to pop items from the queue. | ||
func (cq *ConcurrentQueue[T]) ChanOut() <-chan T { | ||
return cq.chanOut | ||
} | ||
|
||
// Start begins a goroutine that manages moving items from the in channel to the | ||
// out channel. The queue tries to move items directly to the out channel | ||
// minimize overhead, but if the out channel is full it pushes items to an | ||
// overflow queue. This must be called before using the queue. | ||
func (cq *ConcurrentQueue[T]) Start() { | ||
cq.started.Do(cq.start) | ||
} | ||
|
||
func (cq *ConcurrentQueue[T]) start() { | ||
cq.wg.Add(1) | ||
go func() { | ||
defer cq.wg.Done() | ||
|
||
readLoop: | ||
for { | ||
nextElement := cq.overflow.Front() | ||
if nextElement == nil { | ||
// Overflow queue is empty so incoming items can | ||
// be pushed directly to the output channel. If | ||
// output channel is full though, push to | ||
// overflow. | ||
select { | ||
case item, ok := <-cq.chanIn: | ||
if !ok { | ||
break readLoop | ||
} | ||
select { | ||
case cq.chanOut <- item: | ||
// Optimistically push directly | ||
// to chanOut. | ||
default: | ||
cq.overflow.PushBack(item) | ||
} | ||
case <-cq.quit: | ||
return | ||
} | ||
} else { | ||
// Overflow queue is not empty, so any new items | ||
// get pushed to the back to preserve order. | ||
select { | ||
case item, ok := <-cq.chanIn: | ||
if !ok { | ||
break readLoop | ||
} | ||
cq.overflow.PushBack(item) | ||
case cq.chanOut <- nextElement.Value.(T): | ||
cq.overflow.Remove(nextElement) | ||
case <-cq.quit: | ||
return | ||
} | ||
} | ||
} | ||
|
||
// Incoming channel has been closed. Empty overflow queue into | ||
// the outgoing channel. | ||
nextElement := cq.overflow.Front() | ||
for nextElement != nil { | ||
select { | ||
case cq.chanOut <- nextElement.Value.(T): | ||
cq.overflow.Remove(nextElement) | ||
case <-cq.quit: | ||
return | ||
} | ||
nextElement = cq.overflow.Front() | ||
} | ||
|
||
// Close outgoing channel. | ||
close(cq.chanOut) | ||
}() | ||
} | ||
|
||
// Stop ends the goroutine that moves items from the in channel to the out | ||
// channel. This does not clear the queue state, so the queue can be restarted | ||
// without dropping items. | ||
func (cq *ConcurrentQueue[T]) Stop() { | ||
cq.stopped.Do(func() { | ||
close(cq.quit) | ||
cq.wg.Wait() | ||
}) | ||
} |