forked from shaovie/goev
-
Notifications
You must be signed in to change notification settings - Fork 0
/
connector.go
194 lines (174 loc) · 5.31 KB
/
connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package goev
import (
"errors"
"net"
"strconv"
"strings"
"syscall"
)
// Sentinel errors reported by Connector.
var (
// ErrConnectFail means connection failure without a specific reason.
// It is passed to EvHandler.OnConnectFail by inProgressConnect.OnRead.
ErrConnectFail = errors.New("connect fail")
// ErrConnectTimeout means the connection did not complete before the
// timeout expired. It is passed to EvHandler.OnConnectFail by
// inProgressConnect.OnTimeout.
ErrConnectTimeout = errors.New("connect timeout")
// ErrConnectInprogress means the connect is still in progress (EINPROGRESS)
// and the caller supplied a timeout < 1, so no asynchronous wait was set up.
ErrConnectInprogress = errors.New("connect EINPROGRESS")
)
// Connector provides a fast asynchronous connector with an optional timeout.
// It uses a Reactor internally to achieve asynchronicity.
// Success or failure of a connection attempt is reported through the
// EvHandler passed to Connect (OnOpen on success, OnConnectFail on failure).
type Connector struct {
IOHandle
sockRcvBufSize int // SO_RCVBUF applied to TCP sockets before connecting; values <= 0 are ignored
}
// NewConnector returns a Connector bound to reactor r.
//
// opts are resolved through setOptions; only the socket receive-buffer size
// is kept on the Connector. The returned error is always nil.
func NewConnector(r *Reactor, opts ...Option) (*Connector, error) {
	c := new(Connector)
	c.sockRcvBufSize = setOptions(opts...).sockRcvBufSize
	c.setReactor(r)
	return c, nil
}
// Connect starts a connection to addr and reports the outcome through eh.
// The attempt may also complete (or fail) immediately, so always check the
// returned error.
//
// addr is either "ip:port" (for example 192.168.0.1:8080) or
// "unix:/path/to.sock" (for example unix:/tmp/xxxx.sock).
// Domain names such as qq.com:8080 are not supported; resolve the name to an
// IP address before calling Connect.
//
// timeout is a relative duration in milliseconds (for example 5 means 5msec)
// and must not be negative.
func (c *Connector) Connect(addr string, eh EvHandler, timeout int64) error {
	if timeout < 0 {
		return errors.New("Connector:Connect param:timeout < 0")
	}
	// A ':' separator is mandatory and must be followed by at least one character.
	if sep := strings.Index(addr, ":"); sep < 0 || sep+1 >= len(addr) {
		return errors.New("Connector:Connect param:addr invalid")
	}

	const udsScheme = "unix:"
	if len(addr) > len(udsScheme) && strings.HasPrefix(addr, udsScheme) {
		// Hand only the socket path to the unix-domain connector.
		return c.udsConnect(addr[len(udsScheme):], eh, timeout)
	}
	return c.tcpConnect(addr, eh, timeout)
}
// tcpConnect creates a non-blocking, close-on-exec IPv4 TCP socket and starts
// connecting it to addr.
//
// addr has the form "192.168.0.1:8080". An empty host part (":8080") is
// replaced by 0.0.0.0. If any setup or validation step fails after the socket
// was created, the socket is closed before the error is returned.
func (c *Connector) tcpConnect(addr string, eh EvHandler, timeout int64) error {
	const sockType = syscall.SOCK_STREAM | syscall.SOCK_NONBLOCK | syscall.SOCK_CLOEXEC
	fd, err := syscall.Socket(syscall.AF_INET, sockType, 0)
	if err != nil {
		return errors.New("Socket in connector.open: " + err.Error())
	}

	// abort releases the socket and turns msg into the returned error.
	abort := func(msg string) error {
		syscall.Close(fd)
		return errors.New(msg)
	}

	if c.sockRcvBufSize > 0 {
		// `sysctl -a | grep net.ipv4.tcp_rmem` prints: min default max.
		// By default the kernel tunes the buffer dynamically between min and
		// max, starting from default. Once SO_RCVBUF is set the size becomes
		// fixed and the kernel no longer tunes it.
		// It must be set before listen/connect,
		// and must be < `sysctl -a | grep net.core.rmem_max`.
		err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_RCVBUF, c.sockRcvBufSize)
		if err != nil {
			return abort("Set SO_RCVBUF: " + err.Error())
		}
	}

	parts := strings.Split(addr, ":")
	if len(parts) != 2 {
		return abort("address is invalid! 192.168.1.1:80")
	}

	host := parts[0]
	if host == "" {
		host = "0.0.0.0"
	}
	parsed := net.ParseIP(host)
	if parsed == nil {
		return abort("address is invalid! 192.168.1.1:80")
	}

	// The parse error is dropped on purpose: for malformed or out-of-range
	// input ParseInt yields 0 or an extreme value, which the range check
	// below rejects.
	port, _ := strconv.ParseInt(parts[1], 10, 64)
	if port < 1 || port > 65535 {
		return abort("port must in (0, 65536)")
	}

	sa := syscall.SockaddrInet4{Port: int(port)}
	copy(sa.Addr[:], parsed.To4())
	return c.connect(fd, &sa, eh, timeout)
}
// udsConnect creates a non-blocking, close-on-exec unix-domain stream socket
// and starts connecting it to the socket file at addr (a filesystem path,
// without the "unix:" prefix).
func (c *Connector) udsConnect(addr string, eh EvHandler, timeout int64) error {
	const sockType = syscall.SOCK_STREAM | syscall.SOCK_NONBLOCK | syscall.SOCK_CLOEXEC
	fd, err := syscall.Socket(syscall.AF_UNIX, sockType, 0)
	if err != nil {
		return errors.New("Socket in connector.open: " + err.Error())
	}
	// SO_RCVBUF is not applied here: it is invalid for unix sockets.
	return c.connect(fd, &syscall.SockaddrUnix{Name: addr}, eh, timeout)
}
// connect issues the non-blocking connect on fd and routes the outcome.
//
//   - Immediate success: eh is bound to the reactor and eh.OnOpen(fd) is
//     called; if OnOpen returns false, eh.OnClose() is called. Returns nil.
//   - EINPROGRESS with timeout >= 1: fd is registered with the reactor through
//     a temporary inProgressConnect handler and a timer of timeout
//     milliseconds is scheduled. Returns nil; the result is delivered later
//     through eh.
//   - EINPROGRESS with timeout < 1: fd is closed and ErrConnectInprogress is
//     returned.
//   - Any other error: fd is closed and a descriptive error is returned.
//
// Whenever a non-nil error is returned, fd has already been closed.
func (c *Connector) connect(fd int, sa syscall.Sockaddr, eh EvHandler, timeout int64) (err error) {
	reactor := c.GetReactor()
	for {
		// Retry if the call was interrupted.
		if err = syscall.Connect(fd, sa); err != syscall.EINTR {
			break
		}
	}

	switch err {
	case nil: // connected immediately
		eh.setReactor(reactor)
		if !eh.OnOpen(fd) {
			eh.OnClose()
		}
		return nil

	case syscall.EINPROGRESS:
		if timeout < 1 {
			// The caller does not want to wait, and nobody else holds fd:
			// close it here, otherwise the descriptor is leaked.
			syscall.Close(fd)
			return ErrConnectInprogress
		}
		inh := &inProgressConnect{eh: eh}
		if err = reactor.AddEvHandler(inh, fd, EvConnect); err != nil {
			syscall.Close(fd)
			return errors.New("InPorgress AddEvHandler in connector.Connect: " + err.Error())
		}
		inh.ScheduleTimer(inh, timeout, 0) // don't need to cancel it when conn error
		return nil
	}

	syscall.Close(fd)
	return errors.New("syscall connect: " + err.Error())
}
// inProgressConnect is the temporary handler registered with the reactor
// while a non-blocking connect is still in progress (EINPROGRESS). It relays
// the final outcome to the user-supplied handler eh.
type inProgressConnect struct {
IOHandle
eh EvHandler // user handler notified via OnOpen / OnConnectFail / OnClose
}
// OnRead reports the attempt to the user handler as failed with
// ErrConnectFail. Per the original author, the reactor calls this when an
// asynchronous connection fails.
func (p *inProgressConnect) OnRead() bool {
p.eh.OnConnectFail(ErrConnectFail)
return false // per the original note: returning false leads to p.OnClose()
}
// OnWrite hands the connected socket over to the user handler. Per the
// original author, the reactor calls this when an asynchronous connection
// succeeds.
//
// The temporary handler is removed from the reactor, its fd is reset to -1 so
// that OnClose will not close the socket, and then p.eh.OnOpen(fd) is called.
// If OnOpen returns false, p.eh.OnClose() is called.
//
// NOTE(review): SO_ERROR is not inspected here; presumably the reactor routes
// failed connects to OnRead instead — confirm.
func (p *inProgressConnect) OnWrite() bool {
// From here on, the `fd` resources will be managed by p.eh.
p.GetReactor().RemoveEvHandler(p, p.Fd()) // p will auto release
fd := p.Fd()
p.setFd(-1)
p.eh.setReactor(p.GetReactor())
if p.eh.OnOpen(fd) == false {
p.eh.OnClose()
}
return true
}
// OnTimeout is called if the connection times out before completing. It
// reports ErrConnectTimeout to the user handler and then closes the socket
// through p.OnClose. The now argument is not used.
// It always returns false — presumably so the timer is not rescheduled;
// confirm against the reactor's timer contract.
func (p *inProgressConnect) OnTimeout(now int64) bool {
// No I/O event was caught before the timer fired.
p.eh.OnConnectFail(ErrConnectTimeout)
p.OnClose()
return false
}
// OnClose closes the socket if this handler still holds one, then marks the
// fd as released (-1). When the fd is already -1 (for example after OnWrite
// transferred it to the user handler) nothing is done.
func (p *inProgressConnect) OnClose() {
	if p.Fd() == -1 {
		return
	}
	syscall.Close(p.Fd())
	p.setFd(-1)
}