-
Notifications
You must be signed in to change notification settings - Fork 0
/
async_parallel.go
108 lines (90 loc) · 1.66 KB
/
async_parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package async
import (
"sync"
"time"
)
// Parallel return parallel handler
func Parallel() *parallel {
p := new(parallel)
p.init()
return p
}
type parallel struct {
wg *sync.WaitGroup
f []Func
e errMap
eC chan err
timeout time.Duration
poolNum int
poolChan chan Func
}
type Func struct {
Tag string
F func() error
}
type errMap struct {
mu sync.Mutex
errors map[string]error
}
type err struct {
tag string
err error
}
func (p *parallel) init() {
p.wg = &sync.WaitGroup{}
p.f = []Func{}
p.e = errMap{mu: sync.Mutex{}, errors: map[string]error{}}
p.timeout = defaultTimeout
p.poolNum = defaultPoolNum
p.poolChan = make(chan Func)
}
// AddFunc add one func to handler
func (p *parallel) AddFunc(f Func) {
p.f = append(p.f, f)
}
// AddFunc add more functions to handler
func (p *parallel) AddFuncs(f ...Func) {
p.f = append(p.f, f...)
}
// SetTimeout ...
func (p *parallel) SetTimeout(timeout time.Duration) *parallel {
p.timeout = timeout
return p
}
// SetPoolNum ...
func (p *parallel) SetPoolNum(n int) {
p.poolNum = n
}
// Run return all errors if error exists
func (p *parallel) Run() map[string]error {
p.eC = make(chan err, len(p.f))
if p.poolNum > len(p.f) {
p.poolNum = len(p.f)
}
go func() {
for i := range p.f {
p.poolChan <- p.f[i]
}
close(p.poolChan)
}()
p.wg.Add(p.poolNum)
for i := 0; i < p.poolNum; i++ {
go func() {
defer p.wg.Done()
for f := range p.poolChan {
p.eC <- err{
tag: f.Tag,
err: handle(p.timeout, f.F),
}
}
}()
}
p.wg.Wait()
close(p.eC)
for e := range p.eC {
p.e.errors[e.tag] = e.err
}
result := p.e.errors
p.init()
return result
}