-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
130 lines (109 loc) · 3.1 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package conductor
import (
"io"
"log"
"net"
"net/http"
"net/url"
"github.com/gorilla/websocket"
)
// bufferSize is the size handed to websocket.NewClient for both the read and
// the write buffer of every connection created by this package.
const bufferSize = 1024
// Client is a basic websocket connection to a server. Incoming messages are
// consumed from the Read channel; outgoing messages are sent with the methods
// defined on *Client.
type Client struct {
// url is the parsed server URL. We hold on to it for when/if we need to
// reconnect.
url *url.URL
// headers are the handshake headers. We hold on to them for reconnecting as
// well.
headers http.Header
// ws is the underlying websocket connection that all reads and writes go
// through.
ws *websocket.Conn
// Read is the receive-only channel on which the goroutine started by
// NewClient delivers every message decoded from ws.
Read <-chan *Message
}
// NewClient connects to the server at serverURL, performs the websocket
// handshake and returns a ready-to-use Client.
//
// serverURL is the URL of the server to connect to. Its host part is dialed
// directly over TCP, so it has to include a port (for example
// "ws://localhost:8080/").
//
// A goroutine is started that decodes incoming messages and delivers them on
// the Read channel. As soon as a message can no longer be decoded, the
// goroutine closes the websocket connection and then closes Read, so that
// receivers ranging over the channel terminate instead of blocking forever.
//
// An error is returned when the URL cannot be parsed, the TCP connection
// cannot be established, or the websocket handshake fails.
func NewClient(serverURL string) (*Client, error) {
	u, err := url.Parse(serverURL)
	if err != nil {
		return nil, err
	}

	header := make(http.Header)
	header.Add("Sec-WebSocket-Protocol", "chat, superchat")
	header.Add("Origin", u.String())

	conn, err := net.Dial("tcp", u.Host)
	if err != nil {
		return nil, err
	}

	ws, _, err := websocket.NewClient(conn, u, header, bufferSize, bufferSize)
	if err != nil {
		// Do not leak the TCP connection when the handshake fails. Closing a
		// connection that the websocket package may already have closed is
		// harmless, so the returned error is deliberately ignored.
		conn.Close()
		return nil, err
	}

	channel := make(chan *Message)
	c := &Client{ws: ws, url: u, headers: header, Read: channel}

	go func() {
		// Deferred calls run last-in first-out: the connection is closed
		// first, then the channel. This goroutine is the only sender, so
		// closing the channel here is safe.
		defer close(channel)
		defer c.ws.Close()

		for {
			message := c.decodeMessage()
			if message == nil {
				// decodeMessage signals "no more messages" with nil.
				return
			}
			channel <- message
		}
	}()

	return c, nil
}
// Bind sends a bind request for the channel named channelName.
func (c *Client) Bind(channelName string) {
	request := &Message{
		Opcode:      BindOpcode,
		ChannelName: channelName,
		Uuid:        newUUID(),
	}
	c.write(request)
}
// Unbind sends an unbind request for the channel named channelName.
func (c *Client) Unbind(channelName string) {
	request := &Message{
		Opcode:      UnbindOpcode,
		ChannelName: channelName,
		Uuid:        newUUID(),
	}
	c.write(request)
}
// Write sends messageBody as a single message to the channel named
// channelName.
func (c *Client) Write(channelName string, messageBody []byte) {
	outgoing := &Message{
		Opcode:      WriteOpcode,
		ChannelName: channelName,
		Uuid:        newUUID(),
		Body:        messageBody,
	}
	c.write(outgoing)
}
// ServerMessage sends messageBody to the server itself rather than to a
// channel. It is meant for server operations (like getting message history or
// something), so the channel name is left empty.
func (c *Client) ServerMessage(messageBody []byte) {
	outgoing := &Message{
		Opcode:      ServerOpcode,
		ChannelName: "",
		Uuid:        newUUID(),
		Body:        messageBody,
	}
	c.write(outgoing)
}
// WriteStream sends the whole content of reader to the channel named
// channelName as a stream. The data is chunked into pieces of at most 32 KiB
// and framed with the dedicated stream opcodes: one StreamStartOpcode message,
// one StreamWriteOpcode message per chunk, and a final StreamEndOpcode
// message.
//
// It returns nil once reader reports io.EOF. A read error, or a failure to
// send the start message or a chunk, aborts the stream and is returned. Once
// the start message has been sent, the end message is always attempted on
// return, even after a failure; a failure to send the end message itself is
// only logged.
func (c *Client) WriteStream(channelName string, reader io.Reader) error {
	buf := make([]byte, 32*1024)

	if err := c.sendMessage(&Message{Opcode: StreamStartOpcode, ChannelName: channelName, Uuid: newUUID()}); err != nil {
		return err
	}
	// The end message is built here but only sent when the function returns.
	defer c.write(&Message{Opcode: StreamEndOpcode, ChannelName: channelName, Uuid: newUUID()})

	for {
		nr, err := reader.Read(buf)
		if nr > 0 {
			// Handle the bytes that were read before looking at err: a reader
			// may return data together with io.EOF. The chunk is marshaled
			// and written before buf is reused by the next Read.
			chunk := &Message{Opcode: StreamWriteOpcode, ChannelName: channelName, Uuid: newUUID(), Body: buf[:nr]}
			if werr := c.sendMessage(chunk); werr != nil {
				return werr
			}
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

// write sends message over the websocket connection. It has no result, so a
// failure is logged and the message is dropped; it no longer terminates the
// whole process, which a library must not do on behalf of its caller.
func (c *Client) write(message *Message) {
	if err := c.sendMessage(message); err != nil {
		log.Printf("conductor: sending message: %v", err)
	}
}

// sendMessage marshals message and writes it to the websocket connection as
// one binary message. It returns the marshaling or write error, if any.
//
// NOTE(review): no locking is done here; gorilla/websocket documents that a
// connection supports only one concurrent writer — confirm callers respect
// that.
func (c *Client) sendMessage(message *Message) error {
	buf, err := message.Marshal()
	if err != nil {
		return err
	}
	return c.ws.WriteMessage(websocket.BinaryMessage, buf)
}
// decodeMessage blocks until the next websocket message arrives and decodes
// its payload with Unmarshal.
//
// It returns the decoded message, or nil when the message could not be read
// (for example because the connection was closed) or its payload could not be
// decoded. The cause is logged in both cases. The reader goroutine started by
// NewClient treats a nil result as the signal to stop reading.
func (c *Client) decodeMessage() *Message {
	_, buf, err := c.ws.ReadMessage()
	if err != nil {
		// Stop here: there is no payload to decode after a failed read.
		log.Printf("conductor: reading message: %v", err)
		return nil
	}

	message, err := Unmarshal(buf)
	if err != nil {
		// Never hand a message to the caller that failed to decode.
		log.Printf("conductor: decoding message: %v", err)
		return nil
	}
	return message
}