-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.go
145 lines (132 loc) · 3.85 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package goproc
import (
"context"
"sync"
"time"
)
// Goroutine defines the function type for Controller.
type Goroutine func(ctx context.Context)
// Recover defines the recover handler function type for Controller.
type Recover func(r interface{})
// Controller implements a simple controller of goroutines, which can cancel
// or wait for all under control goroutines to return.
type Controller struct {
name string
ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup
}
// NewController creates a new goproc Controller.
func NewController(ctx context.Context, name string) *Controller {
child, cancel := context.WithCancel(ctx)
return &Controller{
name: name,
ctx: child,
cancel: cancel,
wg: &sync.WaitGroup{},
}
}
// Go initiates a new goroutine for g and gains control on the goroutine through
// a context.Context argument.
func (c *Controller) Go(g Goroutine) *Controller {
if err := c.ctx.Err(); err != nil {
panic(err)
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
g(c.ctx)
}()
return c
}
// GoWithRecover initiates a new goroutine for g and gains control on the goroutine
// through a context.Context argument.
// Any panic from g will be captured and handled by rf.
func (c *Controller) GoWithRecover(g Goroutine, rf Recover) *Controller {
if err := c.ctx.Err(); err != nil {
panic(err)
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer func() {
if r := recover(); r != nil {
rf(r)
}
}()
g(c.ctx)
}()
return c
}
// WithValue returns a copy of c with key->value added to internal context object, which will be
// passed to the Goroutine functions in subsequent c.Go* calls.
// For good practice of context key-value usage, reference context package docs.
//
// Note that unlike a child context, the returned object still holds the control of c, which means
// cancelling the returned Controller would actually cancel all goroutines started by c.
func (c *Controller) WithValue(key interface{}, value interface{}) *Controller {
if err := c.ctx.Err(); err != nil {
panic(err)
}
return &Controller{
name: c.name,
ctx: context.WithValue(c.ctx, key, value),
cancel: c.cancel,
wg: c.wg,
}
}
// WithDeadline returns a copy of c with deadline set to internal context object, which will be
// passed to the Goroutine functions in subsequent c.Go* calls.
//
// Note that unlike a child context, the returned object still holds the control of c, which means
// cancelling the returned Controller would actually cancel all goroutines started by c.
func (c *Controller) WithDeadline(deadline time.Time) *Controller {
if err := c.ctx.Err(); err != nil {
panic(err)
}
var child, cancel = context.WithDeadline(c.ctx, deadline)
return &Controller{
name: c.name,
ctx: child,
cancel: func() {
cancel()
c.cancel()
},
wg: c.wg,
}
}
// WithTimeout returns a copy of c with timeout set to internal context object, which will be
// passed to the Goroutine functions in subsequent c.Go* calls.
//
// Note that unlike a child context, the returned object still holds the control of c, which means
// cancelling the returned Controller would actually cancel all goroutines started by c.
func (c *Controller) WithTimeout(timeout time.Duration) *Controller {
if err := c.ctx.Err(); err != nil {
panic(err)
}
var child, cancel = context.WithTimeout(c.ctx, timeout)
return &Controller{
name: c.name,
ctx: child,
cancel: func() {
cancel()
c.cancel()
},
wg: c.wg,
}
}
// Shutdown cancels and waits for any goroutine under control.
func (c *Controller) Shutdown() {
c.cancel()
c.wg.Wait()
}
// Wait waits for any goroutine under control to exit.
func (c *Controller) Wait() {
defer c.cancel()
c.wg.Wait()
}
// Die tells whether c is already cancelled - it always returns true after the first time
// c.Shutdown() or c.Wait() is called.
func (c *Controller) Die() bool {
return c.ctx.Err() != nil
}