-
Notifications
You must be signed in to change notification settings - Fork 8
/
websocket_conn.go
103 lines (86 loc) · 2.58 KB
/
websocket_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package bitstamp
import (
"time"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
)
const (
PingFrequency = time.Second * 10
PongWait = time.Second * 20
)
var (
logger = logrus.WithField("service", "bitstamp").WithField("module", "wsconn")
)
// WSConn драйвер для получения OrderBook с бирж, работащих с WebSocket
type WSConn struct {
conn *websocket.Conn
readerCh chan []byte
}
// NewWSConn создает новый экземпляр *WebSocket
func NewWSConn(conn *websocket.Conn) *WSConn {
return &WSConn{
conn: conn,
}
}
// keepalive отправляет ping сообщения, чтобы поддерживать websocket connection
func (ws *WSConn) keepalive(close chan struct{}) {
// создать тикер, чтобы каждые PingFrequency секунд отправлять ping-сообщения
tk := time.NewTicker(PingFrequency)
defer tk.Stop()
// если в ответ прислали pong, то обновляется deadline connection'a
ws.conn.SetPongHandler(func(appData string) error {
logger.Debug("got pong message")
if err := ws.conn.SetReadDeadline(time.Now().Add(PongWait)); err != nil {
logger.WithError(err).Error("could not update read-readline")
return err
}
return nil
})
for {
select {
case <-close:
return
case <-tk.C:
logger.Debug("sending ping")
err := ws.conn.WriteMessage(websocket.PingMessage, nil)
if err != nil {
logger.WithError(err).Error("could not send ping-message")
return
}
}
}
}
// reader читает данные из WebSocket'a
func (ws *WSConn) reader(timeout time.Duration) {
keepAliveStop := make(chan struct{})
defer func() {
close(keepAliveStop)
close(ws.readerCh)
}()
go ws.keepalive(keepAliveStop)
for {
if err := ws.conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
logger.WithError(err).Error("could not set deadline for websocket")
return
}
_, msg, err := ws.conn.ReadMessage()
if err != nil {
logger.WithError(err).Error("could not read from websocket")
return
}
ws.readerCh <- msg
}
}
// RunReader запускает процесс чтения данных из WebSocket'a
func (ws *WSConn) RunReader(wsTimeout time.Duration) <-chan []byte {
ws.readerCh = make(chan []byte, 256)
go ws.reader(wsTimeout)
return ws.readerCh
}
// SendMessage отправляет сообщение по WebSocket протоколу
func (ws *WSConn) SendMessage(msg string) error {
return ws.conn.WriteMessage(websocket.TextMessage, []byte(msg))
}
func (ws *WSConn) Stop() {
_ = ws.conn.Close()
}