diff --git a/concurrency/worker.go b/concurrency/worker.go index 72acc7dd0..10a59e600 100644 --- a/concurrency/worker.go +++ b/concurrency/worker.go @@ -1,20 +1,22 @@ package concurrency -import ( - "sync" -) - // NewReusableGoroutinesPool creates a new worker pool with the given size. // These workers will run the workloads passed through Go() calls. // If all workers are busy, Go() will spawn a new goroutine to run the workload. func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool { p := &ReusableGoroutinesPool{ - jobs: make(chan func()), + jobs: make(chan func()), + closed: make(chan struct{}), } for i := 0; i < size; i++ { go func() { - for f := range p.jobs { - f() + for { + select { + case f := <-p.jobs: + f() + case <-p.closed: + return + } } }() } @@ -22,23 +24,13 @@ func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool { } type ReusableGoroutinesPool struct { - jobsMu sync.RWMutex - closed bool jobs chan func() + closed chan struct{} } // Go will run the given function in a worker of the pool. // If all workers are busy, Go() will spawn a new goroutine to run the workload. func (p *ReusableGoroutinesPool) Go(f func()) { - p.jobsMu.RLock() - defer p.jobsMu.RUnlock() - - // If the pool is closed, run the function in a new goroutine. - if p.closed { - go f() - return - } - select { case p.jobs <- f: default: @@ -51,8 +43,5 @@ func (p *ReusableGoroutinesPool) Go(f func()) { // Close does NOT wait for all jobs to finish, it is the caller's responsibility to ensure that in the provided workloads. // Close is intended to be used in tests to ensure that no goroutines are leaked. func (p *ReusableGoroutinesPool) Close() { - p.jobsMu.Lock() - defer p.jobsMu.Unlock() - p.closed = true - close(p.jobs) + close(p.closed) }