Skip to content
This repository has been archived by the owner on Nov 24, 2024. It is now read-only.

Commit

Permalink
Use functional options to New to configure channelqueue (#6)
Browse files Browse the repository at this point in the history
- Calling New without options defaults to unbounded
- Add WithCapacity option to specify capacity
- Add WithInput option to use existing channel as input channel.
- Add WithOutput option to use existing channel as output channel.
- Add Shutdown function to close and drain ChannelQueue
- Test for goroutine leaks after Close and Shutdown
- Allow multiple calls to Close and Shutdown
  • Loading branch information
gammazero authored Nov 24, 2024
1 parent e92c93e commit ece24c7
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 74 deletions.
127 changes: 100 additions & 27 deletions channelqueue.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,86 @@
package channelqueue

import "github.com/gammazero/deque"
import (
"sync"

"github.com/gammazero/deque"
)

// ChannelQueue uses a queue to buffer data between input and output channels.
type ChannelQueue[T any] struct {
input, output chan T
length chan int
capacity int
closeOnce sync.Once
}

// New creates a new ChannelQueue with the specified buffer capacity.
type Option[T any] func(*ChannelQueue[T])

// WithCapacity sets the limit on the number of unread items that channelqueue
// will hold. Unbuffered behavior is not supported (use a normal channel for
// that), and a value of zero or less configures the default of no limit.
//
// A capacity < 0 specifies unlimited capacity. Unbuffered behavior is not
// supported; use a normal channel for that. Use caution if specifying an
// unlimited capacity since storage is still limited by system resources.
func New[T any](capacity int) *ChannelQueue[T] {
if capacity == 0 {
panic("unbuffered behavior not supported")
// Example:
//
// cq := channelqueue.New(channelqueue.WithCapacity[int](64))
func WithCapacity[T any](n int) func(*ChannelQueue[T]) {
return func(c *ChannelQueue[T]) {
if n < 1 {
n = -1
}
c.capacity = n
}
if capacity < 0 {
capacity = -1
}

// WithInput uses an existing channel as the input channel, which is the
// channel used to write to the queue. This is used when buffering items that
// must be read from an existing channel. Be aware that calling Close or
// Shutdown will close this channel.
//
// Example:
//
// in := make(chan int)
// cq := channelqueue.New(channelqueue.WithInput[int](in))
func WithInput[T any](in chan T) func(*ChannelQueue[T]) {
return func(c *ChannelQueue[T]) {
if in != nil {
c.input = in
}
}
}

// WithOutput uses an existing channel as the output channel, which is the
// channel used to read from the queue. This is used when buffering items that
// must be written to an existing channel. Be aware that ChannelQueue will
// close this channel when no more items are available.
//
// Example:
//
// out := make(chan int)
// cq := channelqueue.New(channelqueue.WithOutput[int](out))
func WithOutput[T any](out chan T) func(*ChannelQueue[T]) {
return func(c *ChannelQueue[T]) {
if out != nil {
c.output = out
}
}
}

// New creates a new ChannelQueue that, by default, holds an unbounded number
// of items of the specified type.
func New[T any](options ...Option[T]) *ChannelQueue[T] {
cq := &ChannelQueue[T]{
input: make(chan T),
output: make(chan T),
length: make(chan int),
capacity: capacity,
capacity: -1,
}
for _, opt := range options {
opt(cq)
}
if cq.input == nil {
cq.input = make(chan T)
}
if cq.output == nil {
cq.output = make(chan T)
}
go cq.bufferData()
return cq
Expand All @@ -34,18 +89,25 @@ func New[T any](capacity int) *ChannelQueue[T] {
// NewRing creates a new ChannelQueue with the specified buffer capacity, and
// circular buffer behavior. When the buffer is full, writing an additional
// item discards the oldest buffered item.
func NewRing[T any](capacity int) *ChannelQueue[T] {
if capacity < 1 {
return New[T](capacity)
}

func NewRing[T any](options ...Option[T]) *ChannelQueue[T] {
cq := &ChannelQueue[T]{
input: make(chan T),
output: make(chan T),
length: make(chan int),
capacity: capacity,
capacity: -1,
}
if capacity == 1 {
for _, opt := range options {
opt(cq)
}
if cq.capacity < 1 {
// Unbounded ring is the same as an unbounded queue.
return New(WithInput[T](cq.input))
}
if cq.input == nil {
cq.input = make(chan T)
}
if cq.output == nil {
cq.output = make(chan T)
}
if cq.capacity == 1 {
go cq.oneBufferData()
} else {
go cq.ringBufferData()
Expand All @@ -68,16 +130,27 @@ func (cq *ChannelQueue[T]) Len() int {
return <-cq.length
}

// Cap returns the capacity of the channel.
// Cap returns the capacity of the channelqueue. Returns -1 if unbounded.
func (cq *ChannelQueue[T]) Cap() int {
return cq.capacity
}

// Close closes the input channel. Additional input will panic, output will
// continue to be readable until there is no more data, and then the output
// channel is closed.
// Close closes the input channel. This is the same as calling the builtin
// close on the input channel, except Close can be called multiple times..
// Additional input will panic, output will continue to be readable until there
// is no more data, and then the output channel is closed.
func (cq *ChannelQueue[T]) Close() {
close(cq.input)
cq.closeOnce.Do(func() {
close(cq.input)
})
}

// Shutdown calls Close then drains the channel to ensure that the internal
// goroutine finishes.
func (cq *ChannelQueue[T]) Shutdown() {
cq.Close()
for range cq.output {
}
}

// bufferData is the goroutine that transfers data from the In() chan to the
Expand Down
Loading

0 comments on commit ece24c7

Please sign in to comment.