-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
149 lines (134 loc) · 3.77 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package grpcmix
import (
"context"
"fmt"
"net"
"net/http"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
type Server interface {
grpc.ServiceRegistrar
reflection.GRPCServer
GetConnStateMap() map[net.Conn]http.ConnState
StartAndWait(ctx context.Context) error
}
type Config struct {
Port int
ShutdownDelay time.Duration
MaxHeaderBytes int
GrpcServerOptions []grpc.ServerOption
OnStarted func()
}
type server struct {
config Config
connStateMap map[net.Conn]http.ConnState
mutex sync.RWMutex // protects connStateMap
grpcServer *grpc.Server
httpHandler http.Handler
}
func (s *server) GetServiceInfo() map[string]grpc.ServiceInfo {
if s.grpcServer == nil {
return nil
}
return s.grpcServer.GetServiceInfo()
}
func (s *server) GetConnStateMap() map[net.Conn]http.ConnState {
s.mutex.RLock()
defer s.mutex.RUnlock()
connStateMap := make(map[net.Conn]http.ConnState, len(s.connStateMap))
for conn, state := range s.connStateMap {
connStateMap[conn] = state
}
return connStateMap
}
func (s *server) StartAndWait(ctx context.Context) error {
var connectionClose atomic.Bool
http2Server := &http2.Server{}
handler, wait := newHandler(s.grpcServer, http2Server, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if connectionClose.Load() {
w.Header().Set("Connection", "close")
}
s.httpHandler.ServeHTTP(w, r)
}))
httpServer := &http.Server{
Handler: handler,
MaxHeaderBytes: s.config.MaxHeaderBytes,
ConnState: s.updateConnState,
}
if err := http2.ConfigureServer(httpServer, http2Server); err != nil {
return fmt.Errorf("failed to configure HTTP/2 server: %v", err)
}
if ctx == nil {
ctx = context.Background()
}
listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: s.config.Port})
if err != nil {
return fmt.Errorf("failed to listen: %v", err)
}
done := make(chan error)
go func() {
defer close(done)
done <- httpServer.Serve(listener)
}()
if s.config.OnStarted != nil {
s.config.OnStarted()
}
select {
case <-ctx.Done():
// don't call server.SetKeepAlivesEnabled here because it will close idle connections immediately
connectionClose.Store(true)
// round up to integer second
shutdownDelay := (s.config.ShutdownDelay + time.Second - time.Nanosecond).Truncate(time.Second)
for shutdownDelay > 0 {
// check for connections every 100ms for total 1s. if all checks are negative, shutdown the server
hasConnections := len(s.GetConnStateMap()) > 0
for i := 0; i < 10; i++ {
time.Sleep(100 * time.Millisecond)
hasConnections = hasConnections || len(s.GetConnStateMap()) > 0
}
shutdownDelay -= time.Second
if !hasConnections {
break
}
}
_ = httpServer.Shutdown(context.Background())
wait()
<-done
return nil
case err := <-done:
// server.Serve() returned an error before context was canceled
return fmt.Errorf("failed to start server: %v", err)
}
}
func (s *server) updateConnState(conn net.Conn, state http.ConnState) {
s.mutex.Lock()
defer s.mutex.Unlock()
switch state {
case http.StateNew:
s.connStateMap[conn] = state
case http.StateActive, http.StateIdle:
if _, ok := s.connStateMap[conn]; ok {
s.connStateMap[conn] = state
}
case http.StateHijacked, http.StateClosed:
delete(s.connStateMap, conn)
}
}
func (s *server) RegisterService(desc *grpc.ServiceDesc, impl interface{}) {
s.grpcServer.RegisterService(desc, impl)
}
// NewServer creates a new Server instance.
func NewServer(config Config, httpHandler http.Handler) Server {
grpcServer := grpc.NewServer(config.GrpcServerOptions...)
return &server{
config: config,
connStateMap: make(map[net.Conn]http.ConnState),
grpcServer: grpcServer,
httpHandler: httpHandler,
}
}