-
Notifications
You must be signed in to change notification settings - Fork 0
/
listener.go
189 lines (167 loc) · 4.42 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package xlistener
import (
"net"
"os"
"sync"
"syscall"
"time"
"github.com/sandwich-go/xlistener/internal/ip"
"github.com/sandwich-go/xlistener/internal/sync2"
)
var _ net.Listener = &Listener{}
type Listener struct {
base net.Listener
closed sync2.AtomicBool
acceptChan chan net.Conn
closeOnce sync.Once
closeChan chan struct{}
epoll *epoll
conf *Conf
addr []byte
connId sync2.AtomicUint64
}
func (l *Listener) Accept() (net.Conn, error) {
select {
case conn := <-l.acceptChan:
return conn, nil
case <-l.closeChan:
}
return nil, os.ErrClosed
}
func (l *Listener) Addr() net.Addr {
return l.base.Addr()
}
func (l *Listener) Close() error {
l.closeOnce.Do(func() {
l.closed.Set(true)
close(l.closeChan)
})
return l.base.Close()
}
func Listen(baseListen func() (net.Listener, error), opts ...ConfOption) (net.Listener, error) {
base, err := baseListen()
if err != nil {
return nil, err
}
l := &Listener{base: base, closeChan: make(chan struct{}), conf: NewConf(opts...)}
l.acceptChan = make(chan net.Conn, l.conf.BacklogAccept)
if l.epoll, err = epollCreate(l.conf.TimeoutCanRead, l.onWaitError); err != nil {
return nil, err
}
_, port, err := net.SplitHostPort(base.Addr().String())
if err != nil {
return nil, err
}
l.addr = []byte(net.JoinHostPort(ip.GetLocalIP(), port))
go l.acceptLoop()
return l, nil
}
func (l *Listener) onWaitError(err error) {
l.conf.Warningf("epoll: wait loop error: %s", err)
}
func (l *Listener) acceptLoop() {
for {
conn, err := l.base.Accept()
if err != nil {
if isTemporaryErr(err) {
l.conf.Warningf("accept failed: %v", err)
continue
}
if !l.closed.Get() {
l.conf.Warningf("accept failed: %v", err)
}
break
}
l.conf.Debugf("accept conn remote:%s", conn.RemoteAddr().String())
if err := l.handlerConn(conn); err != nil {
l.conf.Warningf("epoll handler conn with error:%s,fallback to go classic", err.Error())
l.toBacklogAccept(conn)
}
}
}
func isTemporaryErr(err error) bool {
netErr, ok := err.(net.Error)
if ok && netErr.Timeout() && netErr.Temporary() {
return true
}
return false
}
func (l *Listener) toBacklogAccept(conn net.Conn) {
if l.conf.EnableHandshake {
xc, err := newXConn(conn, l.connId.Add(1))
if err != nil {
l.conf.Warningf("new xconn error:%s addr:%s", err.Error(), conn.RemoteAddr())
_ = conn.Close()
return
}
go func(xcIn *xConn) {
if err := xcIn.handshake(l.addr, l.conf.HandshakeTimeout); err != nil {
l.conf.Warningf("handshake error:%s addr:%s", err.Error(), xcIn.RemoteAddr())
_ = xcIn.Close()
return
} else {
select {
case l.acceptChan <- xcIn:
case <-l.closeChan:
default:
_ = xcIn.Close()
l.conf.Warningf("the accept queue blocked, close the connection:%s", xcIn.RemoteAddr())
}
}
}(xc)
} else {
select {
case l.acceptChan <- conn:
case <-l.closeChan:
default:
_ = conn.Close()
l.conf.Warningf("the accept queue blocked, close the connection:%s", conn.RemoteAddr())
}
}
}
func (l *Listener) onEpollEvent(fi *fdInfo, evt epollEvent) {
_ = l.epoll.Del(fi.fd)
_ = fi.file.Close()
// we must check EpollRdHup first
if evt&EpollRdHup != 0 || evt&EpollClosed != 0 {
_ = fi.conn.Close()
} else if evt&EpollIn != 0 {
l.toBacklogAccept(fi.conn)
} else {
// fixme should just pass it to acceptChan?
_ = fi.conn.Close()
l.conf.Warningf("onEpollEvent got unexpected event:%v", evt)
}
}
// filer describes an object that has ability to return os.File.
type filer interface {
// File returns a copy of object's file descriptor.
File() (*os.File, error)
}
func (l *Listener) handlerConn(conn net.Conn) error {
f, err := conn.(filer).File()
if err != nil {
return err
}
fd := int(f.Fd())
// Set the file back to non blocking mode since calling Fd() sets underlying
// os.File to blocking mode. This is useful to get conn.Set{Read}Deadline
// methods still working on source Conn.
//
// See https://golang.org/pkg/net/#TCPConn.File
// See /usr/local/go/src/net/net.go: conn.File()
if err = syscall.SetNonblock(fd, true); err != nil {
_ = f.Close()
return os.NewSyscallError("setnonblock", err)
}
// we must hold this os.File and close it when finished.
return l.epoll.Add(fd,
EpollRdHup|EpollIn|EpollOneShot, // use one shot
&fdInfo{
file: f,
fd: fd,
conn: conn,
tsTimeout: time.Now().Unix() + int64(l.conf.TimeoutCanRead.Seconds()),
action: l.onEpollEvent,
})
}