Package goroutines is an efficient, flexible, and lightweight goroutine pool written in Go. It provides a easy way to deal with several kinds of concurrent tasks with limited resource.
Inspired by fastsocket, the implementation is based on channel. It adopts pubsub model for dispatching tasks, and holding surplus tasks in queue if submitted more than the capacity of pool.
- Spawning and managing arbitrary number of asynchronous goroutines as a worker pool.
- Dispatch tasks to workers through pubsub model with specified queue size.
- Adjust the worker numbers based on the usage periodically.
- Easy to use when dealing with concurrent one-time batch jobs.
- Monitor current status by metrics
go get github.com/viney-shih/goroutines
By calling Schedule()
, it schedules the task executed by worker (goroutines) in the Pool.
It will be blocked until the workers accepting the request.
taskN := 7
rets := make(chan int, taskN)
// allocate a pool with 5 goroutines to deal with those tasks
p := goroutines.NewPool(5)
// don't forget to release the pool in the end
defer p.Release()
// assign tasks to asynchronous goroutine pool
for i := 0; i < taskN; i++ {
idx := i
p.Schedule(func() {
// sleep and return the index
time.Sleep(20 * time.Millisecond)
rets <- idx
})
}
// wait until all tasks done
for i := 0; i < taskN; i++ {
fmt.Println("index:", <-rets)
}
// Unordered output:
// index: 3
// index: 1
// index: 2
// index: 4
// index: 5
// index: 6
// index: 0
By calling ScheduleWithTimeout()
, it schedules the task executed by worker (goroutines) in the Pool within the specified period.
If it exceeds the time and doesn't be accepted, it will return error ErrScheduleTimeout
.
totalN, taskN := 5, 5
pause := make(chan struct{})
rets := make(chan int, taskN)
// allocate a pool with 5 goroutines to deal with those 5 tasks
p := goroutines.NewPool(totalN)
// don't forget to release the pool in the end
defer p.Release()
// full the workers which are stopped with the `pause`
for i := 0; i < taskN; i++ {
idx := i
p.ScheduleWithTimeout(50*time.Millisecond, func() {
<-pause
rets <- idx
})
}
// no more chance to add any task in Pool, and return `ErrScheduleTimeout`
if err := p.ScheduleWithTimeout(50*time.Millisecond, func() {
<-pause
rets <- taskN
}); err != nil {
fmt.Println(err.Error())
}
close(pause)
for i := 0; i < taskN; i++ {
fmt.Println("index:", <-rets)
}
// Unordered output:
// schedule timeout
// index: 0
// index: 3
// index: 2
// index: 4
// index: 1
To deal with batch jobs and consider the performance, we need to run tasks concurrently. However, the use case usually happen once and need not maintain a Pool for reusing it. I wrap this patten and call it Batch
. Here comes an example.
taskN := 11
// allocate a one-time batch job with 3 goroutines to deal with those tasks.
// no need to spawn extra goroutine by specifing the batch size consisting with the number of tasks.
b := goroutines.NewBatch(3, goroutines.WithBatchSize(taskN))
// don't forget to close batch job in the end
defer b.Close()
// pull all tasks to this batch queue
for i := 0; i < taskN; i++ {
idx := i
b.Queue(func() (interface{}, error) {
// sleep and return the index
time.Sleep(10 * time.Millisecond)
return idx, nil
})
}
// tell the batch that's all need to do
// DO NOT FORGET THIS OR GOROUTINES WILL DEADLOCK
b.QueueComplete()
for ret := range b.Results() {
if ret.Error() != nil {
panic("not expected")
}
fmt.Println("index:", ret.Value().(int))
}
// Unordered output:
// index: 3
// index: 1
// index: 2
// index: 4
// index: 5
// index: 6
// index: 10
// index: 7
// index: 9
// index: 8
// index: 0
See the examples, documentation and article for more details.
The PoolOption
interface is passed to NewPool
when creating Pool.
• WithTaskQueueLength( length int
)
It sets up the length of task queue for buffering tasks before sending to goroutines. The default queue length is 0
.
• WithPreAllocWorkers( size int
)
It sets up the number of workers to spawn when initializing Pool. Without specifying this, It initialize all numbers of goroutines consisting with Pool size at the beginning.
• WithWorkerAdjustPeriod( period time.Duration
)
It sets up the duration to adjust the worker size, and needs to be used with WithPreAllocWorkers
at the same time. By specifying both, it enables the mechanism to adjust the number of goroutines according to the usage dynamically.
The BatchOption
interface is passed to NewBatch
when creating Batch.
• WithBatchSize( size int
)
It specifies the batch size used to forward tasks.
By default, it needs to spawn an extra goroutine to prevent deadlocks.
It's helpful by specifing the batch size consisting with the number of tasks without an extra goroutine (see the example). The default batch size is 10
.