Skip to content

Commit 13a798c

Browse files
authored
CBG-3834-prereq Add cancellable context to blip.Context (#77)
* CBG-3834-prereq Add cancellable context to blip.Context Allows callers to specify an optional cancellable context on a blipContext disconnect any connected blip clients when that context is used to create a WebSocketServer. * Test cleanup, accessors for optional cancelCtx * lint fix * Move cancellation context to ContextOptions * Tidy example go.sum
1 parent 73db215 commit 13a798c

File tree

3 files changed

+149
-2
lines changed

3 files changed

+149
-2
lines changed

context.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ type Context struct {
6767

6868
bytesSent atomic.Uint64 // Number of bytes sent
6969
bytesReceived atomic.Uint64 // Number of bytes received
70+
71+
cancelCtx context.Context // When cancelled, closes all connections. Terminates receiveLoop(s), which triggers sender and parseLoop stop
7072
}
7173

7274
// Defines a logging interface for use within the blip codebase. Implemented by Context.
@@ -84,6 +86,9 @@ type ContextOptions struct {
8486
ProtocolIds []string
8587
// Patterns that the Origin header must match (if non-empty). This matches only on hostname: ["example.com", "*"]
8688
Origin []string
89+
// Cancellation context. If specified, when context is cancelled the websocket connect will be closed,
90+
// by terminating receiveLoop (which triggers sender and parseLoop stop). This will not send a close message.
91+
CancelCtx context.Context
8792
}
8893

8994
// Creates a new Context with an empty dispatch table.
@@ -106,6 +111,7 @@ func NewContextCustomID(id string, opts ContextOptions) (*Context, error) {
106111
ID: id,
107112
SupportedSubProtocols: formatWebSocketSubProtocols(opts.ProtocolIds...),
108113
origin: opts.Origin,
114+
cancelCtx: opts.CancelCtx,
109115
}, nil
110116
}
111117

@@ -133,6 +139,14 @@ func (blipCtx *Context) GetBytesReceived() uint64 {
133139
return blipCtx.bytesReceived.Load()
134140
}
135141

142+
// GetCancelCtx returns a cancellation context if it has been set in the ContextOptions. Otherwise returns non-cancellable context.
143+
func (blipCtx *Context) GetCancelCtx() context.Context {
144+
if blipCtx.cancelCtx != nil {
145+
return blipCtx.cancelCtx
146+
}
147+
return context.TODO()
148+
}
149+
136150
// DialOptions is used by DialConfig to oepn a BLIP connection.
137151
type DialOptions struct {
138152
URL string
@@ -208,6 +222,7 @@ func (blipCtx *Context) ActiveSubprotocol() string {
208222

209223
type BlipWebsocketServer struct {
210224
blipCtx *Context
225+
ctx context.Context // Cancellable context to trigger server stop
211226
PostHandshakeCallback func(err error)
212227
}
213228

context_test.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ licenses/APL2.txt.
1111
package blip
1212

1313
import (
14+
"context"
1415
"fmt"
1516
"log"
1617
"net"
@@ -577,6 +578,138 @@ func TestOrigin(t *testing.T) {
577578
}
578579
}
579580

581+
// TestServerContextClose tests closing server using cancellable context, ensure that clients are disconnected
582+
//
583+
// Test:
584+
// - Start two blip contexts: an echo server and an echo client
585+
// - The echo server is configured to respond to incoming echo requests and return responses
586+
// - The echo client sends echo requests on a loop
587+
// - Expected: the echo client should receive some sort of error when the server closes the connection, and should not block
588+
func TestServerContextClose(t *testing.T) {
589+
590+
serverCancelCtx, cancelFunc := context.WithCancel(context.Background())
591+
contextOptionsWithCancel := ContextOptions{
592+
ProtocolIds: []string{BlipTestAppProtocolId},
593+
CancelCtx: serverCancelCtx,
594+
}
595+
blipContextEchoServer, err := NewContext(contextOptionsWithCancel)
596+
if err != nil {
597+
t.Fatal(err)
598+
}
599+
600+
receivedRequests := sync.WaitGroup{}
601+
602+
// ----------------- Setup Echo Server that will be closed via cancellation context -------------------------
603+
604+
// Create a blip profile handler to respond to echo requests
605+
dispatchEcho := func(request *Message) {
606+
defer receivedRequests.Done()
607+
body, err := request.Body()
608+
if err != nil {
609+
log.Printf("ERROR reading body of %s: %s", request, err)
610+
return
611+
}
612+
if request.Properties["Content-Type"] != "application/octet-stream" {
613+
t.Fatalf("Incorrect properties: %#x", request.Properties)
614+
}
615+
if response := request.Response(); response != nil {
616+
response.SetBody(body)
617+
response.Properties["Content-Type"] = request.Properties["Content-Type"]
618+
}
619+
}
620+
621+
// Blip setup
622+
blipContextEchoServer.HandlerForProfile["BLIPTest/EchoData"] = dispatchEcho
623+
blipContextEchoServer.LogMessages = true
624+
blipContextEchoServer.LogFrames = true
625+
626+
// Websocket Server
627+
server := blipContextEchoServer.WebSocketServer()
628+
629+
// HTTP Handler wrapping websocket server
630+
http.Handle("/TestServerContextClose", server)
631+
listener, err := net.Listen("tcp", ":0")
632+
if err != nil {
633+
t.Fatal(err)
634+
}
635+
defer listener.Close()
636+
go func() {
637+
err := http.Serve(listener, nil)
638+
log.Printf("server goroutine closed with error: %v", err)
639+
}()
640+
641+
// ----------------- Setup Echo Client ----------------------------------------
642+
blipContextEchoClient, err := NewContext(defaultContextOptions)
643+
if err != nil {
644+
t.Fatal(err)
645+
}
646+
port := listener.Addr().(*net.TCPAddr).Port
647+
destUrl := fmt.Sprintf("ws://localhost:%d/TestServerContextClose", port)
648+
sender, err := blipContextEchoClient.Dial(destUrl)
649+
if err != nil {
650+
t.Fatalf("Error opening WebSocket: %v", err)
651+
}
652+
653+
var closeWg, delayWg sync.WaitGroup
654+
655+
// Start a goroutine to send echo request every 100 ms, time out after 30s (if test fails)
656+
delayWg.Add(1) // wait for connection and messages to be sent before cancelling server context
657+
closeWg.Add(1) // wait for client to disconnect before exiting test
658+
go func() {
659+
defer closeWg.Done()
660+
timeout := time.After(time.Second * 30)
661+
ticker := time.NewTicker(time.Millisecond * 50)
662+
echoCount := 0
663+
for {
664+
select {
665+
case <-timeout:
666+
t.Error("Echo client connection wasn't closed before timeout expired")
667+
return
668+
case <-ticker.C:
669+
{
670+
echoCount++
671+
// After sending 10 echoes, close delayWg to trigger server-side cancellation
672+
log.Printf("Sending echo %v", echoCount)
673+
if echoCount == 10 {
674+
delayWg.Done()
675+
}
676+
// Create echo request
677+
echoResponseBody := []byte("hello")
678+
echoRequest := NewRequest()
679+
echoRequest.SetProfile("BLIPTest/EchoData")
680+
echoRequest.Properties["Content-Type"] = "application/octet-stream"
681+
echoRequest.SetBody(echoResponseBody)
682+
receivedRequests.Add(1)
683+
sent := sender.Send(echoRequest)
684+
assert.True(t, sent)
685+
686+
// Read the echo response. Closed connection will result in empty response, as EOF message
687+
// isn't currently returned by blip client
688+
response := echoRequest.Response()
689+
responseBody, err := response.Body()
690+
assert.True(t, err == nil)
691+
if len(responseBody) == 0 {
692+
log.Printf("empty response, connection closed")
693+
return
694+
}
695+
696+
assert.Equal(t, echoResponseBody, responseBody)
697+
}
698+
}
699+
}
700+
}()
701+
702+
// Wait for client to start sending echo messages before stopping server
703+
delayWg.Wait()
704+
705+
// Cancel context on server
706+
cancelFunc()
707+
708+
// Wait for client echo loop to exit due to closed connection before exiting test
709+
closeWg.Wait()
710+
711+
}
712+
580713
// assert that the server handshake callback is called with an error.
581714
func assertHandlerError(t *testing.T, server *BlipWebsocketServer, wg *sync.WaitGroup) {
582715
wg.Add(1)

receiver.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ package blip
1212

1313
import (
1414
"bytes"
15-
"context"
1615
"encoding/binary"
1716
"fmt"
1817
"io"
@@ -74,7 +73,7 @@ func (r *receiver) receiveLoop() error {
7473

7574
for {
7675
// Receive the next raw WebSocket frame:
77-
_, frame, err := r.conn.Read(context.TODO())
76+
_, frame, err := r.conn.Read(r.context.GetCancelCtx())
7877
if err != nil {
7978
if isCloseError(err) {
8079
// lower log level for close

0 commit comments

Comments
 (0)