Skip to content

Commit e645399

Browse files
committed
Add support of discard with treshold option, remake the solution using channels
The change is inspired by the comment go-pkgz#5 (comment). The solution relies on channels as the main synchronization primitive, instead of semaphore.
1 parent d557ed0 commit e645399

File tree

3 files changed

+123
-34
lines changed

3 files changed

+123
-34
lines changed

group_options.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ package syncs
33
import "context"
44

55
type options struct {
6-
ctx context.Context
7-
preLock bool
8-
termOnError bool
9-
discardIfFull bool
6+
ctx context.Context
7+
preLock bool
8+
termOnError bool
9+
discardIfFull bool
10+
tresholdDiscard int
1011
}
1112

1213
// GroupOption functional option type
@@ -34,3 +35,19 @@ func Discard(o *options) {
3435
o.discardIfFull = true
3536
o.preLock = true // discard implies preemptive
3637
}
38+
39+
// DiscardAfterTreshold works similarly to Discard, but buffers task until buffer treshold reach
40+
// For example, if 10 gouroutines are allowed and bufferTreshold is equal to 5, then 10 tasks
41+
// can run simultaneously in gouroutines and 5 tasks can be kept in buffer until gouroutines become
42+
// available.
43+
func DiscardAfterTreshold(bufferSize int) GroupOption {
44+
return func(o *options) {
45+
o.discardIfFull = true
46+
o.preLock = true
47+
48+
if (bufferSize < 1) {
49+
bufferSize = 0
50+
}
51+
o.tresholdDiscard = bufferSize
52+
}
53+
}

sizedgroup.go

Lines changed: 87 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,66 +10,123 @@ import (
1010
// SizedGroup interface enforces constructor usage and doesn't allow direct creation of sizedGroup
1111
type SizedGroup struct {
1212
options
13-
wg sync.WaitGroup
14-
sema Locker
13+
wg sync.WaitGroup
14+
workers chan struct{}
15+
scheduledJobs chan struct{}
16+
jobQueue chan func(ctx context.Context)
17+
workersMutex sync.Mutex
1518
}
1619

1720
// NewSizedGroup makes wait group with limited size alive goroutines
1821
func NewSizedGroup(size int, opts ...GroupOption) *SizedGroup {
19-
res := SizedGroup{sema: NewSemaphore(size)}
22+
if size < 0 {
23+
size = 1
24+
}
25+
res := SizedGroup{workers: make(chan struct{}, size)}
2026
res.options.ctx = context.Background()
2127
for _, opt := range opts {
2228
opt(&res.options)
2329
}
30+
31+
// queue size either equal to number of workers or larger, otherwise does not make sense
32+
queueSize := size
33+
if res.tresholdDiscard > 0 {
34+
queueSize += res.tresholdDiscard
35+
}
36+
37+
res.jobQueue = make(chan func(ctx context.Context), queueSize)
38+
res.scheduledJobs = make(chan struct{}, queueSize)
2439
return &res
2540
}
2641

2742
// Go calls the given function in a new goroutine.
2843
// Every call will be unblocked, but some goroutines may wait if semaphore locked.
2944
func (g *SizedGroup) Go(fn func(ctx context.Context)) {
30-
canceled := func() bool {
31-
select {
32-
case <-g.ctx.Done():
33-
return true
34-
default:
35-
return false
36-
}
45+
if g.canceled() {
46+
return
3747
}
3848

39-
if canceled() {
49+
g.wg.Add(1)
50+
if !g.preLock {
51+
go func() {
52+
defer g.wg.Done()
53+
if g.canceled() {
54+
return
55+
}
56+
g.scheduledJobs <- struct{}{}
57+
fn(g.ctx)
58+
<-g.scheduledJobs
59+
}()
4060
return
4161
}
42-
43-
if g.preLock {
44-
lockOk := g.sema.TryLock()
45-
if !lockOk && g.discardIfFull {
46-
// lock failed and discardIfFull is set, discard this goroutine
62+
63+
toRun := func(job func(ctx context.Context)) {
64+
defer g.wg.Done()
65+
if g.canceled() {
4766
return
4867
}
49-
if !lockOk && !g.discardIfFull {
50-
g.sema.Lock() // make sure we have block until lock is acquired
51-
}
68+
job(g.ctx)
69+
<- g.scheduledJobs
5270
}
5371

54-
g.wg.Add(1)
55-
go func() {
56-
defer g.wg.Done()
57-
58-
if canceled() {
59-
return
72+
startWorkerIfNeeded := func() {
73+
g.workersMutex.Lock()
74+
select {
75+
case g.workers <- struct{}{}:
76+
g.workersMutex.Unlock()
77+
go func() {
78+
for {
79+
select {
80+
case job := <-g.jobQueue:
81+
toRun(job)
82+
default:
83+
g.workersMutex.Lock()
84+
select {
85+
case job := <-g.jobQueue:
86+
g.workersMutex.Unlock()
87+
toRun(job)
88+
continue
89+
default:
90+
<-g.workers
91+
g.workersMutex.Unlock()
92+
}
93+
return
94+
}
95+
}
96+
}()
97+
default:
98+
g.workersMutex.Unlock()
6099
}
100+
}
61101

62-
if !g.preLock {
63-
g.sema.Lock()
102+
if g.discardIfFull {
103+
select {
104+
case g.scheduledJobs <- struct{}{}:
105+
g.jobQueue <- fn
106+
startWorkerIfNeeded()
107+
default:
108+
g.wg.Done()
64109
}
65110

66-
fn(g.ctx)
67-
g.sema.Unlock()
68-
}()
111+
return
112+
}
113+
114+
g.scheduledJobs <- struct{}{}
115+
g.jobQueue <- fn
116+
startWorkerIfNeeded()
69117
}
70118

71119
// Wait blocks until the SizedGroup counter is zero.
72120
// See sync.WaitGroup documentation for more information.
73121
func (g *SizedGroup) Wait() {
74122
g.wg.Wait()
75123
}
124+
125+
func (g *SizedGroup) canceled() bool {
126+
select {
127+
case <-g.ctx.Done():
128+
return true
129+
default:
130+
return false
131+
}
132+
}

sizedgroup_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,21 @@ func TestSizedGroup_Discard(t *testing.T) {
4242
assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, not all routines have been executed", c))
4343
}
4444

45+
func TestSizedGroup_DiscardAfterTreshold(t *testing.T) {
46+
swg := NewSizedGroup(10, DiscardAfterTreshold(10))
47+
var c uint32
48+
49+
for i := 0; i < 100; i++ {
50+
swg.Go(func(ctx context.Context) {
51+
time.Sleep(5 * time.Millisecond)
52+
atomic.AddUint32(&c, 1)
53+
})
54+
}
55+
assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine())
56+
swg.Wait()
57+
assert.Equal(t, uint32(20), c, fmt.Sprintf("%d, wrong number of routines have been executed", c))
58+
}
59+
4560
func TestSizedGroup_Preemptive(t *testing.T) {
4661
swg := NewSizedGroup(10, Preemptive)
4762
var c uint32

0 commit comments

Comments
 (0)