Skip to content

Commit

Permalink
Add support of discard with treshold option, remake the solution usin…
Browse files Browse the repository at this point in the history
…g channels

The change is inspired by the comment go-pkgz#5 (comment).
The solution relies on channels as the main synchronization primitive, instead of semaphore.
  • Loading branch information
Mykyta committed Aug 25, 2024
1 parent d557ed0 commit e726d2d
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 34 deletions.
25 changes: 21 additions & 4 deletions group_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package syncs
import "context"

type options struct {
ctx context.Context
preLock bool
termOnError bool
discardIfFull bool
ctx context.Context
preLock bool
termOnError bool
discardIfFull bool
tresholdDiscard int
}

// GroupOption functional option type
Expand Down Expand Up @@ -34,3 +35,19 @@ func Discard(o *options) {
o.discardIfFull = true
o.preLock = true // discard implies preemptive
}

// DiscardAfterTreshold works similarly to Discard, but buffers task until buffer treshold reach
// For example, if 10 gouroutines are allowed and bufferTreshold is equal to 5, then 10 tasks
// can run simultaneously in gouroutines and 5 tasks can be kept in buffer until gouroutines become
// available.
func DiscardAfterTreshold(bufferSize int) GroupOption {
return func(o *options) {
o.discardIfFull = true
o.preLock = true

if (bufferSize < 1) {
bufferSize = 0
}
o.tresholdDiscard = bufferSize
}
}
117 changes: 87 additions & 30 deletions sizedgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,66 +10,123 @@ import (
// SizedGroup interface enforces constructor usage and doesn't allow direct creation of sizedGroup
type SizedGroup struct {
options
wg sync.WaitGroup
sema Locker
wg sync.WaitGroup
workers chan struct{}
scheduledJobs chan struct{}
jobQueue chan func(ctx context.Context)
workersMutex sync.Mutex
}

// NewSizedGroup makes wait group with limited size alive goroutines
func NewSizedGroup(size int, opts ...GroupOption) *SizedGroup {
res := SizedGroup{sema: NewSemaphore(size)}
if size < 0 {
size = 1
}
res := SizedGroup{workers: make(chan struct{}, size)}
res.options.ctx = context.Background()
for _, opt := range opts {
opt(&res.options)
}

// queue size either equal to number of workers or larger, otherwise does not make sense
queueSize := size
if res.tresholdDiscard > 0 {
queueSize += res.tresholdDiscard
}

res.jobQueue = make(chan func(ctx context.Context), queueSize)
res.scheduledJobs = make(chan struct{}, queueSize)
return &res
}

// Go calls the given function in a new goroutine.
// Every call will be unblocked, but some goroutines may wait if semaphore locked.
func (g *SizedGroup) Go(fn func(ctx context.Context)) {
canceled := func() bool {
select {
case <-g.ctx.Done():
return true
default:
return false
}
if g.canceled() {
return
}

if canceled() {
g.wg.Add(1)
if !g.preLock {
go func() {
defer g.wg.Done()
if g.canceled() {
return
}
g.scheduledJobs <- struct{}{}
fn(g.ctx)
<-g.scheduledJobs
}()
return
}

if g.preLock {
lockOk := g.sema.TryLock()
if !lockOk && g.discardIfFull {
// lock failed and discardIfFull is set, discard this goroutine

toRun := func(job func(ctx context.Context)) {
defer g.wg.Done()
if g.canceled() {
return
}
if !lockOk && !g.discardIfFull {
g.sema.Lock() // make sure we have block until lock is acquired
}
job(g.ctx)
<- g.scheduledJobs
}

g.wg.Add(1)
go func() {
defer g.wg.Done()

if canceled() {
return
startWorkerIfNeeded := func() {
g.workersMutex.Lock()
select {
case g.workers <- struct{}{}:
g.workersMutex.Unlock()
go func() {
for {
select {
case job := <-g.jobQueue:
toRun(job)
default:
g.workersMutex.Lock()
select {
case job := <-g.jobQueue:
g.workersMutex.Unlock()
toRun(job)
continue
default:
<-g.workers
g.workersMutex.Unlock()
}
return
}
}
}()
default:
g.workersMutex.Unlock()
}
}

if !g.preLock {
g.sema.Lock()
if g.discardIfFull {
select {
case g.scheduledJobs <- struct{}{}:
g.jobQueue <- fn
startWorkerIfNeeded()
default:
g.wg.Done()
}

fn(g.ctx)
g.sema.Unlock()
}()
return
}

g.scheduledJobs <- struct{}{}
g.jobQueue <- fn
startWorkerIfNeeded()
}

// Wait blocks until the SizedGroup counter is zero.
// See sync.WaitGroup documentation for more information.
func (g *SizedGroup) Wait() {
g.wg.Wait()
}

func (g *SizedGroup) canceled() bool {
select {
case <-g.ctx.Done():
return true
default:
return false
}
}
15 changes: 15 additions & 0 deletions sizedgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ func TestSizedGroup_Discard(t *testing.T) {
assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, not all routines have been executed", c))
}

func TestSizedGroup_DiscardAfterTreshold(t *testing.T) {
swg := NewSizedGroup(10, DiscardAfterTreshold(10))
var c uint32

for i := 0; i < 100; i++ {
swg.Go(func(ctx context.Context) {
time.Sleep(5 * time.Millisecond)
atomic.AddUint32(&c, 1)
})
}
assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine())
swg.Wait()
assert.Equal(t, uint32(20), c, fmt.Sprintf("%d, wrong number of routines have been executed", c))
}

func TestSizedGroup_Preemptive(t *testing.T) {
swg := NewSizedGroup(10, Preemptive)
var c uint32
Expand Down

0 comments on commit e726d2d

Please sign in to comment.