-
Notifications
You must be signed in to change notification settings - Fork 0
/
socket.go
155 lines (132 loc) · 3.73 KB
/
socket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package main
import (
"encoding/json"
"log"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/moethu/gocodegraph/core"
"github.com/moethu/gocodegraph/node"
)
const (
writeTimeout = 10 * time.Second
readTimeout = 60 * time.Second
pingPeriod = (readTimeout * 9) / 10
maxMessageSize = 5120
)
var upgrader = websocket.Upgrader{}
type Client struct {
// The websocket connection.
conn *websocket.Conn
// Buffered channels messages.
write chan node.Result
expected int
}
// streamReader reads messages from the websocket
func (c *Client) streamReader() {
defer func() {
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(readTimeout))
// SetPongHandler sets the handler for pong messages received from the peer.
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(readTimeout)); return nil })
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
// deserialize and solve
var payload map[string]interface{}
err = json.Unmarshal(message, &payload)
if err != nil {
log.Println(err)
}
// generate nodes resporint to results channel
nodes := mapOperators(c.write, payload["operators"])
c.expected = len(nodes)
// generate links
mapLinks(payload["links"], nodes)
ns := []node.Node{}
for _, value := range nodes {
ns = append(ns, value)
}
// solve graph
core.Solve(ns, true)
}
}
// streamWriter writes messages from the write channel to the websocket connection
func (c *Client) streamWriter() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
ctr := 0
for {
// Go’s select lets you wait on multiple channel operations.
// We’ll use select to await both of these values simultaneously.
select {
case message, ok := <-c.write:
c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// NextWriter returns a writer for the next message to send.
// The writer's Close method flushes the complete message to the network.
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
payload, _ := json.Marshal(message)
w.Write(payload)
ctr++
// Add queued messages to the current websocket message
n := len(c.write)
for i := 0; i < n; i++ {
x := <-c.write
p, _ := json.Marshal(x)
w.Write(p)
ctr++
}
if ctr >= c.expected {
w.Close()
return
}
if err := w.Close(); err != nil {
return
}
//a channel that will send the time with a period specified by the duration argument
case <-ticker.C:
// SetWriteDeadline sets the deadline for future Write calls
// and any currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that
// some of the data was successfully written.
c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// serveWebsocket handles websocket requests from the peer.
func serveWebsocket(c *gin.Context) {
// upgrade connection to websocket
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Println(err)
return
}
conn.EnableWriteCompression(true)
// create two channels for read write concurrency
resultChannel := make(chan node.Result)
client := &Client{conn: conn, write: resultChannel}
// run reader and writer in two different go routines
// so they can act concurrently
go client.streamReader()
go client.streamWriter()
}