Skip to content

Commit

Permalink
Example chanegs for closing websocket with an error code
Browse files Browse the repository at this point in the history
  • Loading branch information
torcolvin committed Nov 19, 2024
1 parent 13a798c commit 00eab7b
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 7 deletions.
35 changes: 30 additions & 5 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (blipCtx *Context) GetCancelCtx() context.Context {
return context.TODO()
}

// DialOptions is used by DialConfig to oepn a BLIP connection.
// DialOptions is used by DialConfig to open a BLIP connection.
type DialOptions struct {
URL string
HTTPClient *http.Client
Expand Down Expand Up @@ -197,7 +197,8 @@ func (blipCtx *Context) DialConfig(opts *DialOptions) (*Sender, error) {
incrReceiverGoroutines()
defer decrReceiverGoroutines()

err := sender.receiver.receiveLoop()
var handlersStopped atomic.Bool
err := sender.receiver.receiveLoop(&handlersStopped)
if err != nil {
if isCloseError(err) {
// lower log level for close
Expand All @@ -224,22 +225,26 @@ type BlipWebsocketServer struct {
blipCtx *Context
ctx context.Context // Cancellable context to trigger server stop
PostHandshakeCallback func(err error)
websockets map[*websocket.Conn]struct{}
handlersStopped atomic.Bool
}

var _ http.Handler = &BlipWebsocketServer{}

// Creates an HTTP handler that accepts WebSocket connections and dispatches BLIP messages
// to the Context.
func (blipCtx *Context) WebSocketServer() *BlipWebsocketServer {
return &BlipWebsocketServer{blipCtx: blipCtx}
return &BlipWebsocketServer{blipCtx: blipCtx, websockets: make(map[*websocket.Conn]struct{})}
}

func (bwss *BlipWebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ws, err := bwss.handshake(w, r)
if err != nil {
return
}
bwss.websockets[ws] = struct{}{}
bwss.handle(ws)
delete(bwss.websockets, ws)
}

func (bwss *BlipWebsocketServer) handshake(w http.ResponseWriter, r *http.Request) (conn *websocket.Conn, err error) {
Expand Down Expand Up @@ -275,9 +280,12 @@ func (bwss *BlipWebsocketServer) handshake(w http.ResponseWriter, r *http.Reques
func (bwss *BlipWebsocketServer) handle(ws *websocket.Conn) {
bwss.blipCtx.log("Start BLIP/Websocket handler")
sender := bwss.blipCtx.start(ws)
err := sender.receiver.receiveLoop()
err := sender.receiver.receiveLoop(&bwss.handlersStopped)
sender.Stop()
if err != nil && !isCloseError(err) {
// if handlerStopped is true, it means the handler was stopped by StopHandler
if bwss.handlersStopped.Load() {
return
} else if err != nil && !isCloseError(err) {
bwss.blipCtx.log("BLIP/Websocket Handler exited with error: %v", err)
if bwss.blipCtx.FatalErrorHandler != nil {
bwss.blipCtx.FatalErrorHandler(err)
Expand All @@ -286,6 +294,23 @@ func (bwss *BlipWebsocketServer) handle(ws *websocket.Conn) {
ws.Close(websocket.StatusNormalClosure, "")
}

func (bwss *BlipWebsocketServer) StopHandlers(status websocket.StatusCode) error {
bwss.handlersStopped.Store(true)
fmt.Printf("Closing websocket connection with status: %v\n", status)
var errs []error
for ws := range bwss.websockets {
fmt.Printf("Closing websocket connection\n")
err := ws.Close(status, "")
if err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return fmt.Errorf("errors closing websockets: %w", errors.Join(errs...))
}
return nil
}

//////// DISPATCHING MESSAGES:

func (blipCtx *Context) dispatchRequest(request *Message, sender *Sender) {
Expand Down
72 changes: 72 additions & 0 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,78 @@ func assertHandlerNoError(t *testing.T, server *BlipWebsocketServer, wg *sync.Wa
}
}

// TestWebSocketServerStopHandler tests stopping the handler with a specific error code.
func TestWebSocketServerStopHandler(t *testing.T) {

opts := ContextOptions{
ProtocolIds: []string{BlipTestAppProtocolId},
}
blipContextEchoServer, err := NewContext(opts)
require.NoError(t, err)

receivedRequests := sync.WaitGroup{}

// ----------------- Setup Echo Server that will be closed via cancellation context -------------------------

// Create a blip profile handler to respond to echo requests
dispatchEcho := func(request *Message) {
defer receivedRequests.Done()
body, err := request.Body()
require.NoError(t, err)
require.Equal(t, "application/octet-stream", request.Properties["Content-Type"])
if response := request.Response(); response != nil {
response.SetBody(body)
response.Properties["Content-Type"] = request.Properties["Content-Type"]
}
}

// Blip setup
blipContextEchoServer.HandlerForProfile["BLIPTest/EchoData"] = dispatchEcho
blipContextEchoServer.LogMessages = true
blipContextEchoServer.LogFrames = true

// Websocket Server
server := blipContextEchoServer.WebSocketServer()

// HTTP Handler wrapping websocket server
http.Handle("/TestServerContextClose", server)
listener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
defer listener.Close()
go func() {
_ = http.Serve(listener, nil)
}()

// ----------------- Setup Echo Client ----------------------------------------
blipContextEchoClient, err := NewContext(defaultContextOptions)
require.NoError(t, err)
port := listener.Addr().(*net.TCPAddr).Port
destUrl := fmt.Sprintf("ws://localhost:%d/TestServerContextClose", port)
sender, err := blipContextEchoClient.Dial(destUrl)
require.NoError(t, err)

// Create echo request
echoResponseBody := []byte("hello")
echoRequest := NewRequest()
echoRequest.SetProfile("BLIPTest/EchoData")
echoRequest.Properties["Content-Type"] = "application/octet-stream"
echoRequest.SetBody(echoResponseBody)
receivedRequests.Add(1)
require.True(t, sender.Send(echoRequest))

// Read the echo response. Closed connection will result in empty response, as EOF message
// isn't currently returned by blip client
response := echoRequest.Response()
responseBody, err := response.Body()
require.NoError(t, err)
require.Equal(t, echoResponseBody, responseBody)

fmt.Printf("Closing connection\n")
server.StopHandler(websocket.StatusAbnormalClosure)

Check failure on line 798 in context_test.go

View workflow job for this annotation

GitHub Actions / test (macos-latest)

server.StopHandler undefined (type *BlipWebsocketServer has no field or method StopHandler)

Check failure on line 798 in context_test.go

View workflow job for this annotation

GitHub Actions / test-race

server.StopHandler undefined (type *BlipWebsocketServer has no field or method StopHandler)

Check failure on line 798 in context_test.go

View workflow job for this annotation

GitHub Actions / test (windows-latest)

server.StopHandler undefined (type *BlipWebsocketServer has no field or method StopHandler)

Check failure on line 798 in context_test.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

server.StopHandler undefined (type *BlipWebsocketServer has no field or method StopHandler)

Check failure on line 798 in context_test.go

View workflow job for this annotation

GitHub Actions / test (macos-latest)

server.StopHandler undefined (type *BlipWebsocketServer has no field or method StopHandler)

Check failure on line 798 in context_test.go

View workflow job for this annotation

GitHub Actions / test-race

server.StopHandler undefined (type *BlipWebsocketServer has no field or method StopHandler)

Check failure on line 798 in context_test.go

View workflow job for this annotation

GitHub Actions / test (windows-latest)

server.StopHandler undefined (type *BlipWebsocketServer has no field or method StopHandler)

Check failure on line 798 in context_test.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

server.StopHandler undefined (type *BlipWebsocketServer has no field or method StopHandler)
//fmt.Printf("sender=%+v\n", sender.conn)
require.True(t, false)
}

// Wait for the WaitGroup, or return an error if the wg.Wait() doesn't return within timeout
// TODO: this code is duplicated with code in Sync Gateway utilities_testing.go. Should be refactored to common repo.
func WaitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
Expand Down
7 changes: 5 additions & 2 deletions receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type receiver struct {
pendingRequests msgStreamerMap // Unfinished REQ messages being assembled
pendingResponses msgStreamerMap // Unfinished RES messages being assembled
maxPendingResponseNumber MessageNumber // Largest RES # I've seen
stopped atomic.Bool // True if I've been stopped by the caller
}

func newReceiver(context *Context, conn *websocket.Conn) *receiver {
Expand All @@ -64,7 +65,7 @@ func newReceiver(context *Context, conn *websocket.Conn) *receiver {
}
}

func (r *receiver) receiveLoop() error {
func (r *receiver) receiveLoop(handlerStopped *atomic.Bool) error {
defer atomic.AddInt32(&r.activeGoroutines, -1)
atomic.AddInt32(&r.activeGoroutines, 1)
go r.parseLoop()
Expand All @@ -75,7 +76,9 @@ func (r *receiver) receiveLoop() error {
// Receive the next raw WebSocket frame:
_, frame, err := r.conn.Read(r.context.GetCancelCtx())
if err != nil {
if isCloseError(err) {
if handlerStopped.Load() {
return nil
} else if isCloseError(err) {
// lower log level for close
r.context.logFrame("receiveLoop stopped: %v", err)
} else if parseErr := errorFromChannel(r.parseError); parseErr != nil {
Expand Down

0 comments on commit 00eab7b

Please sign in to comment.