Skip to content

Commit 4e05446

Browse files
committed
change *Action to Action, add a new func named Map
1 parent c060cce commit 4e05446

File tree

3 files changed

+93
-34
lines changed

3 files changed

+93
-34
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ Features:
1212
- [x] `ipfilter`: a high performance IP filter based on AVL tree
1313
- [x] `concurrent`: some well-tested concurrent features, such as Goroutine pool
1414

15-
More common algorithms and data structures will be implemented here.
15+
More and more common algorithms and data structures will be implemented here.
1616

1717

1818
## Install

concurrent/pool.go

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,31 @@ type Action struct {
2929
// Runner is a func need to be executed. It may return a list of new
3030
// actions. These actions will be executed after the current action had
3131
// finished
32-
Runner func() []*Action
32+
Runner func() []Action
3333

3434
// Dependencies for the current action. The current action will only be
3535
// executed after all dependencies had been finished
36-
Dependencies []*Action
36+
Dependencies []Action
3737
}
3838

3939
// ActionPool is an action pool based on goroutines
4040
type ActionPool interface {
4141
// Do an action. It's panic-free if using it correctly.
42-
// It will panic if you do a nil action, or do an action in a closed pool.
43-
Do(*Action)
42+
// It will panic if you do an action in a closed pool.
43+
Do(Action)
44+
45+
// Map returns a slice of result that applies `f` to every item of `input`.
46+
// The `ret` is in-order.
47+
Map(input []interface{}, f func(interface{}) interface{}) (ret []interface{})
48+
49+
// Wait until all the actions had been finished.
50+
Wait()
4451

4552
// Wait until all the actions had been finished, and close the pool.
4653
WaitAndClose()
4754

4855
// Set a logger to output some err msg when panic. os.Stderr will be used
49-
// by default
56+
// by default.
5057
SetLogger(io.Writer)
5158
}
5259

@@ -59,11 +66,7 @@ type actionPool struct {
5966
}
6067

6168
// Do implements ActionPool's method.
62-
func (p *actionPool) Do(a *Action) {
63-
if a == nil {
64-
panic("ActionPool: the action is nil")
65-
}
66-
69+
func (p *actionPool) Do(a Action) {
6770
if atomic.LoadUint32(&p.closed) > 0 {
6871
panic("ActionPool: the current pool had already been closed.")
6972
}
@@ -76,7 +79,7 @@ func (p *actionPool) Do(a *Action) {
7679
}
7780

7881
// do an action and free a slot when finished executing.
79-
func (p *actionPool) do(a *Action, parentWg *sync.WaitGroup) {
82+
func (p *actionPool) do(a Action, parentWg *sync.WaitGroup) {
8083
// clean up
8184
defer func() {
8285
// parent waitgroup
@@ -122,6 +125,11 @@ func (p *actionPool) panicCatcher() {
122125
}
123126
}
124127

128+
// Wait implements ActionPool's method.
129+
func (p *actionPool) Wait() {
130+
p.wg.Wait()
131+
}
132+
125133
// WaitAndClose implements ActionPool's method.
126134
func (p *actionPool) WaitAndClose() {
127135
atomic.StoreUint32(&p.closed, 1)
@@ -132,3 +140,28 @@ func (p *actionPool) WaitAndClose() {
132140
func (p *actionPool) SetLogger(l io.Writer) {
133141
p.logger = l
134142
}
143+
144+
// Map implements ActionPool's method.
145+
func (p *actionPool) Map(input []interface{}, f func(interface{}) interface{}) (ret []interface{}) {
146+
if len(input) == 0 {
147+
return
148+
}
149+
150+
ret = make([]interface{}, len(input))
151+
mu := sync.Mutex{}
152+
for tmpIdx := range input {
153+
idx := tmpIdx // just copy the value
154+
p.Do(Action{
155+
Runner: func() []Action {
156+
r := f(input[idx])
157+
mu.Lock()
158+
ret[idx] = r
159+
mu.Unlock()
160+
return nil
161+
},
162+
})
163+
}
164+
165+
p.Wait()
166+
return
167+
}

concurrent/pool_test.go

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package concurrent_test
22

33
import (
44
"bytes"
5+
"fmt"
56
"strconv"
67
"strings"
78
"sync/atomic"
@@ -19,8 +20,8 @@ func TestPoolWithOneSlot(t *testing.T) {
1920
for i := 0; i < 100; i++ {
2021
c := strconv.Itoa(i) + "."
2122
expected.WriteString(c)
22-
pool.Do(&concurrent.Action{
23-
Runner: func() []*concurrent.Action {
23+
pool.Do(concurrent.Action{
24+
Runner: func() []concurrent.Action {
2425
buf.WriteString(c)
2526
return nil
2627
},
@@ -45,22 +46,22 @@ func TestPoolWithOneSlotAndDeps(t *testing.T) {
4546
expected.WriteString("2@")
4647
expected.WriteString(c)
4748

48-
pool.Do(&concurrent.Action{
49-
Runner: func() []*concurrent.Action {
49+
pool.Do(concurrent.Action{
50+
Runner: func() []concurrent.Action {
5051
buf.WriteString(strconv.Itoa(int(atomic.LoadInt32(p))))
5152
buf.WriteString("@")
5253
buf.WriteString(c)
5354
return nil
5455
},
55-
Dependencies: []*concurrent.Action{
56+
Dependencies: []concurrent.Action{
5657
{
57-
Runner: func() []*concurrent.Action {
58+
Runner: func() []concurrent.Action {
5859
atomic.AddInt32(p, 1)
5960
return nil
6061
},
6162
},
6263
{
63-
Runner: func() []*concurrent.Action {
64+
Runner: func() []concurrent.Action {
6465
atomic.AddInt32(p, 1)
6566
return nil
6667
},
@@ -81,27 +82,27 @@ func TestPoolConcurrency(t *testing.T) {
8182
tStart := time.Now()
8283

8384
for i := 0; i < 5; i++ {
84-
pool.Do(&concurrent.Action{
85-
Runner: func() []*concurrent.Action {
85+
pool.Do(concurrent.Action{
86+
Runner: func() []concurrent.Action {
8687
// 2s will elapse
8788
time.Sleep(time2sleep)
8889

8990
// 2s will elapse
90-
return []*concurrent.Action{
91+
return []concurrent.Action{
9192
{
92-
Runner: func() []*concurrent.Action {
93+
Runner: func() []concurrent.Action {
9394
time.Sleep(time2sleep)
9495
return nil
9596
},
9697
},
9798
{
98-
Runner: func() []*concurrent.Action {
99+
Runner: func() []concurrent.Action {
99100
time.Sleep(time2sleep)
100101
return nil
101102
},
102103
},
103104
{
104-
Runner: func() []*concurrent.Action {
105+
Runner: func() []concurrent.Action {
105106
time.Sleep(time2sleep)
106107
return nil
107108
},
@@ -119,25 +120,22 @@ func TestPoolConcurrency(t *testing.T) {
119120

120121
func TestPanic(t *testing.T) {
121122
pool := concurrent.NewActionPool(0)
122-
expectPanic(func() {
123-
pool.Do(nil)
124-
}, t)
125-
126-
pool.Do(&concurrent.Action{})
123+
pool.Map(nil, nil)
124+
pool.Do(concurrent.Action{})
127125
pool.WaitAndClose()
128126
expectPanic(func() {
129-
pool.Do(&concurrent.Action{})
127+
pool.Do(concurrent.Action{})
130128
}, t)
131129

132130
pool = concurrent.NewActionPool(0)
133131
buf := new(bytes.Buffer)
134132
pool.SetLogger(buf)
135-
pool.Do(&concurrent.Action{
136-
Runner: func() []*concurrent.Action {
133+
pool.Do(concurrent.Action{
134+
Runner: func() []concurrent.Action {
137135
panic("panic test")
138136
},
139137
})
140-
pool.WaitAndClose()
138+
pool.Wait()
141139
if !strings.HasPrefix(buf.String(), "panic test") {
142140
t.Fatalf("unexpected result, expect logger with panic msg")
143141
}
@@ -152,3 +150,31 @@ func expectPanic(f func(), t *testing.T) {
152150

153151
f()
154152
}
153+
154+
func ExampleActionPool_Map() {
155+
pool := concurrent.NewActionPool(10)
156+
input := []interface{}{
157+
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
158+
}
159+
transFunc := func(i interface{}) interface{} {
160+
return 12 - i.(int)
161+
}
162+
163+
ret := pool.Map(input, transFunc)
164+
for _, item := range ret {
165+
fmt.Println(item)
166+
}
167+
168+
// Output:
169+
// 11
170+
// 10
171+
// 9
172+
// 8
173+
// 7
174+
// 6
175+
// 5
176+
// 4
177+
// 3
178+
// 2
179+
// 1
180+
}

0 commit comments

Comments
 (0)