-
Notifications
You must be signed in to change notification settings - Fork 2
/
net.go
178 lines (151 loc) · 3.87 KB
/
net.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package net
import (
"context"
"errors"
"fmt"
"net"
"sync"
"github.com/deixis/spine/log"
)
const (
// StateDown mode is the default state. The handler is not ready to accept
// new connections
StateDown uint32 = iota
// StateUp mode is when a handler accepts connections
StateUp
// StateDrain mode is when a handler stops accepting new connection, but wait
// for all existing in-flight requests to complete
StateDrain
)
var (
// ErrEmptyReg is the error returned when there are no servers registered
ErrEmptyReg = errors.New("there must be at least one registered server")
// ErrDraining occurs when there is an attempt to access a draining `Server`
ErrDraining = errors.New("server is draining")
)
// Server is the interface to implement to be a valid server
type Server interface {
Serve(ctx context.Context, addr string) error
Drain()
}
// Reg (registry) holds a list of H
type Reg struct {
mu sync.Mutex
log log.Logger
l map[string]Server
drain bool
}
// NewReg builds a new registry
func NewReg(log log.Logger) *Reg {
return &Reg{
log: log,
l: map[string]Server{},
}
}
// Add adds the given server to the list of servers
func (r *Reg) Add(addr string, h Server) {
r.mu.Lock()
defer r.mu.Unlock()
err := r.register(addr, h)
if err != nil {
// If we attempt to register on the same address, we can assume it is a
// config error, therefore we should fail loudly and as fast as possible,
// hence the panic.
panic(err)
}
}
// Serve starts all registered servers
func (r *Reg) Serve(ctx context.Context) error {
r.mu.Lock()
defer r.mu.Unlock()
if len(r.l) == 0 {
return ErrEmptyReg
}
r.log.Trace("server.serve.init", "Starting servers...")
wg := sync.WaitGroup{}
wg.Add(len(r.l))
for addr, h := range r.l {
go func(addr string, s Server) {
// Deregister itself upon completion
defer func() {
r.log.Trace("spine.serve.s.stop", "Server has stopped running",
log.String("addr", addr),
log.Type("server", s),
)
r.mu.Lock()
r.deregister(addr)
r.mu.Unlock()
}()
r.log.Trace("spine.serve.s", "Server starts serving",
log.String("addr", addr),
log.Type("server", s),
)
wg.Done()
// TODO: Send pre-flight requests to make sure the server is ready
err := s.Serve(ctx, addr)
if err != nil {
r.log.Error("spine.serve.s", "Server error",
log.String("addr", addr),
log.Error(err),
)
}
}(addr, h)
}
wg.Wait() // Wait to boot all servers
r.log.Trace("server.serve.ready", "All servers are running")
return nil
}
// Drain notify all servers to enter in draining mode. It means they are no
// longer accepting new requests, but they can finish all in-flight requests
func (r *Reg) Drain() {
r.mu.Lock()
defer r.mu.Unlock()
// Check if we are already draining
if r.drain {
return
}
// Flag registry as draining
r.drain = true
// Build WG
l := len(r.l)
wg := sync.WaitGroup{}
wg.Add(l)
// Drain servers
r.log.Trace("server.drain.init", "Start draining",
log.Int("servers", l),
)
for _, s := range r.l {
r.log.Trace("server.drain.s", "Drain server",
log.Type("server", s),
)
go func(s Server) {
s.Drain()
wg.Done()
}(s)
}
wg.Wait()
r.drain = false
r.log.Trace("server.drain.done", "All servers have been drained")
}
func (r *Reg) register(addr string, s Server) error {
if _, ok := r.l[addr]; ok {
return fmt.Errorf(
"server listening on <%s> has already been registered (%T)",
addr,
r.l[addr],
)
}
r.l[addr] = s
return nil
}
func (r *Reg) deregister(addr string) {
delete(r.l, addr)
}
// JoinHostPort combines host and port into a network address of the
// form "host:port". If host contains a colon, as found in literal
// IPv6 addresses, then JoinHostPort returns "[host]:port".
//
// See func Dial for a description of the host and port parameters.
func JoinHostPort(host, port string) string {
return net.JoinHostPort(host, port)
}