-
Notifications
You must be signed in to change notification settings - Fork 15
/
redhub.go
149 lines (128 loc) · 3.74 KB
/
redhub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package redhub
import (
"bytes"
"sync"
"time"
"github.com/IceFireDB/redhub/pkg/resp"
"github.com/panjf2000/gnet"
)
// Action represents the type of action to be taken after an event
type Action int
const (
// None indicates that no action should occur following an event
None Action = iota
// Close indicates that the connection should be closed
Close
// Shutdown indicates that the server should be shut down
Shutdown
)
// Conn wraps a gnet.Conn
type Conn struct {
gnet.Conn
}
// Options defines the configuration options for the RedHub server
type Options struct {
Multicore bool
LockOSThread bool
ReadBufferCap int
LB gnet.LoadBalancing
NumEventLoop int
ReusePort bool
Ticker bool
TCPKeepAlive time.Duration
TCPNoDelay gnet.TCPSocketOpt
SocketRecvBuffer int
SocketSendBuffer int
Codec gnet.ICodec
}
// RedHub represents the main server structure
type RedHub struct {
*gnet.EventServer
onOpened func(c *Conn) (out []byte, action Action)
onClosed func(c *Conn, err error) (action Action)
handler func(cmd resp.Command, out []byte) ([]byte, Action)
redHubBufMap map[gnet.Conn]*connBuffer
connSync *sync.RWMutex
}
// connBuffer holds the buffer and commands for each connection
type connBuffer struct {
buf bytes.Buffer
command []resp.Command
}
// NewRedHub creates a new RedHub instance
func NewRedHub(
onOpened func(c *Conn) (out []byte, action Action),
onClosed func(c *Conn, err error) (action Action),
handler func(cmd resp.Command, out []byte) ([]byte, Action),
) *RedHub {
return &RedHub{
redHubBufMap: make(map[gnet.Conn]*connBuffer),
connSync: &sync.RWMutex{},
onOpened: onOpened,
onClosed: onClosed,
handler: handler,
}
}
// OnOpened is called when a new connection is opened
func (rs *RedHub) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
rs.connSync.Lock()
rs.redHubBufMap[c] = new(connBuffer)
rs.connSync.Unlock()
out, act := rs.onOpened(&Conn{Conn: c})
return out, gnet.Action(act)
}
// OnClosed is called when a connection is closed
func (rs *RedHub) OnClosed(c gnet.Conn, err error) (action gnet.Action) {
rs.connSync.Lock()
delete(rs.redHubBufMap, c)
rs.connSync.Unlock()
return gnet.Action(rs.onClosed(&Conn{Conn: c}, err))
}
// React handles incoming data from connections
func (rs *RedHub) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
rs.connSync.RLock()
cb, ok := rs.redHubBufMap[c]
rs.connSync.RUnlock()
if !ok {
return resp.AppendError(out, "ERR Client is closed"), gnet.None
}
cb.buf.Write(frame)
cmds, lastbyte, err := resp.ReadCommands(cb.buf.Bytes())
if err != nil {
return resp.AppendError(out, "ERR "+err.Error()), gnet.None
}
cb.command = append(cb.command, cmds...)
cb.buf.Reset()
if len(lastbyte) == 0 {
for len(cb.command) > 0 {
cmd := cb.command[0]
cb.command = cb.command[1:]
var status Action
out, status = rs.handler(cmd, out)
if status == Close {
return out, gnet.Close
}
}
} else {
cb.buf.Write(lastbyte)
}
return out, gnet.None
}
// ListenAndServe starts the RedHub server
func ListenAndServe(addr string, options Options, rh *RedHub) error {
serveOptions := gnet.Options{
Multicore: options.Multicore,
LockOSThread: options.LockOSThread,
ReadBufferCap: options.ReadBufferCap,
LB: options.LB,
NumEventLoop: options.NumEventLoop,
ReusePort: options.ReusePort,
Ticker: options.Ticker,
TCPKeepAlive: options.TCPKeepAlive,
TCPNoDelay: options.TCPNoDelay,
SocketRecvBuffer: options.SocketRecvBuffer,
SocketSendBuffer: options.SocketSendBuffer,
Codec: options.Codec,
}
return gnet.Serve(rh, addr, gnet.WithOptions(serveOptions))
}