6
6
"net/http"
7
7
"strings"
8
8
9
- "github.com/gorilla/websocket"
10
9
ws "github.com/gorilla/websocket"
11
10
"github.com/ybizeul/ybfeed/internal/utils"
12
11
"github.com/ybizeul/ybfeed/pkg/yblog"
@@ -15,16 +14,17 @@ import (
15
14
16
15
var wsL = yblog .NewYBLogger ("push" , []string {"DEBUG" , "DEBUG_WEBSOCKET" })
17
16
17
+ // upgrader is used to upgrade a connection to a websocket
18
+ var upgrader = ws.Upgrader {} // use default options
19
+
20
+ // FeedSockets maintains a list of active websockets for a specific feed
21
+ // designated by feedName
18
22
type FeedSockets struct {
19
23
feedName string
20
24
websockets []* ws.Conn
21
25
}
22
26
23
- type FeedNotification struct {
24
- Action string `json:"action"`
25
- Item PublicFeedItem `json:"item"`
26
- }
27
-
27
+ // RemoveConn removes the websocket c from the list of active websockets
28
28
func (fs * FeedSockets ) RemoveConn (c * ws.Conn ) {
29
29
wsL .Logger .Debug ("Removing connection" ,
30
30
slog .Int ("count" , len (fs .websockets )),
@@ -40,22 +40,32 @@ func (fs *FeedSockets) RemoveConn(c *ws.Conn) {
40
40
}
41
41
}
42
42
43
+ // FeedNotification is used to marshall notification information message
44
+ // to the push service
45
+ type FeedNotification struct {
46
+ Action string `json:"action"`
47
+ Item PublicFeedItem `json:"item"`
48
+ }
49
+
50
+ // WebSocketManager bridges a FeedManager with a FeedSockets struct
43
51
type WebSocketManager struct {
44
52
FeedSockets []* FeedSockets
45
53
FeedManager * FeedManager
46
54
}
47
55
56
+ // NewWebSocketManager creates a new WebSocketManager. There is typically one
57
+ // WebSocketManager per ybFeed deployment.
48
58
func NewWebSocketManager (fm * FeedManager ) * WebSocketManager {
49
59
return & WebSocketManager {
50
60
FeedManager : fm ,
51
61
}
52
62
}
53
63
54
- var upgrader = websocket.Upgrader {} // use default options
55
-
64
+ // FeedSocketsForFeed returns the FeedSockets for feed feedName
56
65
func (m * WebSocketManager ) FeedSocketsForFeed (feedName string ) * FeedSockets {
57
66
wsL .Logger .Debug ("Searching FeedSockets" , slog .Int ("count" , len (m .FeedSockets )), slog .String ("feedName" , feedName ))
58
67
68
+ // Loop through all FeedSockets to find the one for this feed
59
69
for _ , fs := range m .FeedSockets {
60
70
if fs .feedName == feedName {
61
71
return fs
@@ -64,29 +74,44 @@ func (m *WebSocketManager) FeedSocketsForFeed(feedName string) *FeedSockets {
64
74
return nil
65
75
}
66
76
77
+ // RunSocketForFeed promotes an HTTP connection to a websocket and starts
78
+ // waiting for data. This function is blocking and typically runs from
79
+ // a http handler.
67
80
func (m * WebSocketManager ) RunSocketForFeed (feedName string , w http.ResponseWriter , r * http.Request ) {
68
- c , err := upgrader .Upgrade (w , r , nil )
69
-
81
+ // Check if we already have websockets for this feed
70
82
feedSockets := m .FeedSocketsForFeed (feedName )
71
83
72
- if feedSockets == nil {
84
+ if feedSockets == nil { // No, then we create a new FeedSockets
73
85
wsL .Logger .Debug ("Adding FeedSockets" , slog .Int ("count_before" , len (m .FeedSockets )), slog .String ("feedName" , feedName ))
74
86
feedSockets = & FeedSockets {
75
87
feedName : feedName ,
76
88
}
77
89
m .FeedSockets = append (m .FeedSockets , feedSockets )
78
90
}
79
91
80
- wsL .Logger .Debug ("Adding connection" , slog .Int ("count" , len (feedSockets .websockets )))
92
+ c , err := upgrader .Upgrade (w , r , nil )
93
+ if err != nil {
94
+ utils .CloseWithCodeAndMessage (w , 500 , "Unable to upgrade WebSocket" )
95
+ }
96
+
81
97
feedSockets .websockets = append (feedSockets .websockets , c )
82
- wsL .Logger .Debug ("Added connection" , slog .Int ("count" , len (feedSockets .websockets )))
83
98
84
- wsL .Logger .Debug ("WebSocket added" , slog .Int ("array size" , len (feedSockets .websockets )))
99
+ secret , _ := utils .GetSecret (r )
100
+
101
+ f , err := m .FeedManager .GetFeedWithAuth (feedName , secret )
85
102
86
103
if err != nil {
87
- utils .CloseWithCodeAndMessage (w , 500 , "Unable to upgrade WebSocket" )
104
+ switch {
105
+ case errors .Is (err , FeedErrorNotFound ):
106
+ utils .CloseWithCodeAndMessage (w , 404 , "feed not found" )
107
+ case errors .Is (err , FeedErrorInvalidSecret ):
108
+ utils .CloseWithCodeAndMessage (w , 401 , "invalid secret" )
109
+ case errors .Is (err , FeedErrorIncorrectSecret ):
110
+ utils .CloseWithCodeAndMessage (w , 401 , "incorrect secret" )
111
+ default :
112
+ utils .CloseWithCodeAndMessage (w , 500 , err .Error ())
113
+ }
88
114
}
89
- secret , _ := utils .GetSecret (r )
90
115
91
116
defer func () {
92
117
feedSockets .RemoveConn (c )
@@ -102,15 +127,6 @@ func (m *WebSocketManager) RunSocketForFeed(feedName string, w http.ResponseWrit
102
127
}
103
128
switch strings .TrimSpace (string (message )) {
104
129
case "feed" :
105
- f , err := m .FeedManager .GetFeed (feedName )
106
- if ferr := f .IsSecretValid (secret ); err != nil {
107
- if errors .Is (ferr , FeedErrorInvalidSecret ) {
108
- utils .CloseWithCodeAndMessage (w , 401 , "invalid secret" )
109
- }
110
- }
111
- if err != nil {
112
- utils .CloseWithCodeAndMessage (w , 500 , err .Error ())
113
- }
114
130
pf , err := f .Public ()
115
131
if err != nil {
116
132
utils .CloseWithCodeAndMessage (w , 500 , err .Error ())
0 commit comments