Skip to content

Commit 1c01374

Browse files
committed
feat(server): initial WebSocket hub and client
1 parent 999eed1 commit 1c01374

File tree

8 files changed

+259
-17
lines changed

8 files changed

+259
-17
lines changed

server/.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
PORT=8080
2+
CORS_ORIGIN=http://localhost:5173
23
LOG_LEVEL=debug

server/config/config.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,29 @@ import (
99
)
1010

1111
var logger = log.With().Str("pkg", "config").Logger()
12+
var conf *Config
1213

1314
type Config struct {
14-
Port uint16
15-
LogLevel zerolog.Level
15+
Port uint16
16+
CorsOrigin string
17+
LogLevel zerolog.Level
1618
}
1719

18-
func Init() *Config {
20+
func new() *Config {
1921
return &Config{
20-
Port: parseEnv("PORT", 8080, parseUint16),
21-
LogLevel: parseEnv("LOG_LEVEL", zerolog.InfoLevel, parseLogLevel),
22+
Port: parseEnv("PORT", 8080, parseUint16),
23+
CorsOrigin: parseEnv("CORS_ORIGIN", "*", nop),
24+
LogLevel: parseEnv("LOG_LEVEL", zerolog.InfoLevel, parseLogLevel),
2225
}
2326
}
2427

28+
func Get() *Config {
29+
if conf == nil {
30+
conf = new()
31+
}
32+
return conf
33+
}
34+
2535
// UTILS
2636

2737
func parseEnv[V any, P func(string) (V, error)](key string, defaultVal V, parser P) V {
@@ -38,6 +48,10 @@ func parseEnv[V any, P func(string) (V, error)](key string, defaultVal V, parser
3848

3949
// PARSERS
4050

51+
func nop(s string) (string, error) {
52+
return s, nil
53+
}
54+
4155
func parseUint(s string, bits int) (uint64, error) {
4256
return strconv.ParseUint(s, 10, bits)
4357
}

server/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ go 1.21.7
44

55
require github.com/rs/zerolog v1.31.0
66

7+
require golang.org/x/net v0.17.0 // indirect
8+
79
require (
10+
github.com/gorilla/websocket v1.5.1
811
github.com/joho/godotenv v1.5.1
912
github.com/mattn/go-colorable v0.1.13 // indirect
1013
github.com/mattn/go-isatty v0.0.19 // indirect

server/go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
22
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
3+
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
4+
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
35
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
46
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
57
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
@@ -11,6 +13,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
1113
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
1214
github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A=
1315
github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
16+
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
17+
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
1418
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
1519
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
1620
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

server/main.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,42 +2,37 @@ package main
22

33
import (
44
"fmt"
5-
"html"
65
"net/http"
76
"time"
87

98
"github.com/aboqasem/portfolio/server/config"
9+
"github.com/aboqasem/portfolio/server/ws"
1010
_ "github.com/joho/godotenv/autoload"
1111
"github.com/rs/zerolog"
1212
"github.com/rs/zerolog/log"
1313
)
1414

1515
var logger = log.With().Str("pkg", "main").Logger()
16-
var conf *config.Config
16+
var conf = config.Get()
1717

1818
func init() {
19-
conf = config.Init()
20-
2119
zerolog.SetGlobalLevel(conf.LogLevel)
2220
}
2321

2422
func main() {
2523
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
26-
logger.Debug().Any("method", r.Method).Str("path", r.URL.Path).Send()
27-
28-
w.Write([]byte("<h1>Hello, world!</h1><pre>"))
29-
w.Write([]byte(r.Method))
30-
w.Write([]byte{' '})
31-
w.Write([]byte(html.EscapeString(r.URL.Path)))
32-
w.Write([]byte("</pre>"))
24+
w.Header().Add("Access-Control-Allow-Origin", conf.CorsOrigin)
25+
w.Write([]byte("OK"))
3326
})
3427

28+
http.HandleFunc("/ws/hub", ws.ServeWsHub)
29+
3530
server := &http.Server{
3631
Addr: fmt.Sprintf(":%d", conf.Port),
3732
ReadHeaderTimeout: 3 * time.Second,
3833
}
3934

40-
logger.Info().Uint16("port", conf.Port).Msg("Running server...")
35+
logger.Info().Msgf("Running server on http://localhost:%d", conf.Port)
4136
err := server.ListenAndServe()
4237
if err != nil {
4338
logger.Fatal().Err(err).Msg("ListenAndServe")

server/ws/client.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package ws
2+
3+
import (
4+
"time"
5+
6+
"github.com/gorilla/websocket"
7+
)
8+
9+
const (
10+
// Time allowed to write a message to the peer.
11+
writeWait = 10 * time.Second
12+
13+
// Time allowed to read the next pong message from the peer.
14+
pongWait = 60 * time.Second
15+
16+
// Send pings to peer with this period. Must be less than pongWait.
17+
pingPeriod = (pongWait * 9) / 10
18+
19+
// Maximum message size allowed from peer.
20+
maxMessageSize = 1_024
21+
)
22+
23+
var (
24+
arrayStart = []byte{'['}
25+
arrayEnd = []byte{']'}
26+
comma = []byte{','}
27+
)
28+
29+
// Client is a middleman between the websocket connection and the hub.
30+
type Client struct {
31+
hub *Hub
32+
33+
// The websocket connection.
34+
conn *websocket.Conn
35+
36+
// Buffered channel of outbound messages.
37+
send chan []byte
38+
}
39+
40+
// readPump pumps messages from the websocket connection to the hub.
41+
//
42+
// The application runs readPump in a per-connection goroutine. The application
43+
// ensures that there is at most one reader on a connection by executing all
44+
// reads from this goroutine.
45+
func (c *Client) readPump() {
46+
defer func() {
47+
c.hub.unregister <- c
48+
c.conn.Close()
49+
}()
50+
51+
c.conn.SetReadLimit(maxMessageSize)
52+
c.conn.SetReadDeadline(time.Now().Add(pongWait))
53+
c.conn.SetPongHandler(func(string) error {
54+
c.conn.SetReadDeadline(time.Now().Add(pongWait))
55+
return nil
56+
})
57+
58+
for {
59+
_, message, err := c.conn.ReadMessage()
60+
if err != nil {
61+
if websocket.IsUnexpectedCloseError(
62+
err,
63+
websocket.CloseGoingAway,
64+
websocket.CloseAbnormalClosure,
65+
websocket.CloseNoStatusReceived,
66+
) {
67+
logger.Error().Err(err).Msg("unexpected close error")
68+
}
69+
break
70+
}
71+
72+
c.hub.broadcast <- message
73+
}
74+
}
75+
76+
// writePump pumps messages from the hub to the websocket connection.
77+
//
78+
// A goroutine running writePump is started for each connection. The
79+
// application ensures that there is at most one writer to a connection by
80+
// executing all writes from this goroutine.
81+
func (c *Client) writePump() {
82+
ticker := time.NewTicker(pingPeriod)
83+
defer func() {
84+
ticker.Stop()
85+
c.conn.Close()
86+
}()
87+
88+
for {
89+
select {
90+
case message, ok := <-c.send:
91+
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
92+
if !ok {
93+
// The hub closed the channel.
94+
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
95+
return
96+
}
97+
98+
w, err := c.conn.NextWriter(websocket.TextMessage)
99+
if err != nil {
100+
return
101+
}
102+
103+
n := len(c.send)
104+
if n == 0 {
105+
w.Write(message)
106+
} else {
107+
w.Write(arrayStart)
108+
w.Write(message)
109+
w.Write(comma)
110+
for i := 1; i < n; i++ {
111+
w.Write(<-c.send)
112+
w.Write(comma)
113+
}
114+
w.Write(<-c.send)
115+
w.Write(arrayEnd)
116+
}
117+
118+
if err := w.Close(); err != nil {
119+
return
120+
}
121+
122+
case <-ticker.C:
123+
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
124+
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
125+
return
126+
}
127+
}
128+
}
129+
}

server/ws/hub.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package ws
2+
3+
// Hub maintains the set of active clients and broadcasts messages to the
4+
// clients.
5+
type Hub struct {
6+
// Registered clients.
7+
clients map[*Client]bool
8+
9+
// Inbound messages from the clients.
10+
broadcast chan []byte
11+
12+
// Register requests from the clients.
13+
register chan *Client
14+
15+
// Unregister requests from clients.
16+
unregister chan *Client
17+
}
18+
19+
func newHub() *Hub {
20+
return &Hub{
21+
broadcast: make(chan []byte),
22+
register: make(chan *Client),
23+
unregister: make(chan *Client),
24+
clients: make(map[*Client]bool),
25+
}
26+
}
27+
28+
func (h *Hub) run() {
29+
for {
30+
select {
31+
case client := <-h.register:
32+
h.clients[client] = true
33+
34+
case client := <-h.unregister:
35+
if _, ok := h.clients[client]; ok {
36+
close(client.send)
37+
delete(h.clients, client)
38+
}
39+
40+
case message := <-h.broadcast:
41+
for client := range h.clients {
42+
select {
43+
case client.send <- message:
44+
default:
45+
close(client.send)
46+
delete(h.clients, client)
47+
}
48+
}
49+
}
50+
}
51+
}

server/ws/serve.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package ws
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/aboqasem/portfolio/server/config"
7+
"github.com/gorilla/websocket"
8+
"github.com/rs/zerolog/log"
9+
)
10+
11+
var (
12+
logger = log.With().Str("pkg", "ws").Logger()
13+
14+
conf = config.Get()
15+
16+
hub = newHub()
17+
18+
upgrader = websocket.Upgrader{
19+
ReadBufferSize: 1024,
20+
WriteBufferSize: 1024,
21+
CheckOrigin: func(r *http.Request) bool {
22+
return conf.CorsOrigin == "*" || conf.CorsOrigin == r.Header.Get("Origin")
23+
},
24+
}
25+
)
26+
27+
func init() {
28+
go hub.run()
29+
}
30+
31+
func ServeWsHub(w http.ResponseWriter, r *http.Request) {
32+
conn, err := upgrader.Upgrade(w, r, nil)
33+
if err != nil {
34+
logger.Error().Err(err).Msg("failed to upgrade connection")
35+
return
36+
}
37+
38+
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
39+
client.hub.register <- client
40+
41+
// Allow collection of memory referenced by the caller by doing all work in new goroutines.
42+
go client.writePump()
43+
go client.readPump()
44+
45+
}

0 commit comments

Comments
 (0)