Skip to content

Commit

Permalink
Implements graceful shutdown methods
Browse files Browse the repository at this point in the history
  • Loading branch information
ksysoev committed Apr 27, 2024
1 parent feebf01 commit 98802fc
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 13 deletions.
8 changes: 8 additions & 0 deletions channel/channel.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package channel

import (
"context"
"net/http"

"github.com/ksysoev/wasabi"
Expand Down Expand Up @@ -84,6 +85,13 @@ func (c *Channel) Use(middlewere Middlewere) {
c.middlewares = append(c.middlewares, middlewere)
}

// Shutdown gracefully shuts down the Channel by shutting down the underlying connection registry.
// It waits for all active connections to be closed or until the context is canceled.
// Returns an error if the shutdown process encounters any issues.
func (srv *Channel) Shutdown(ctx context.Context) error {

Check warning on line 91 in channel/channel.go

View workflow job for this annotation

GitHub Actions / tests

receiver-naming: receiver name srv should be consistent with previous receiver name c for Channel (revive)
return srv.connRegistry.Shutdown(ctx)
}

// useMiddleware applies middlewares to handler
func (c *Channel) wrapMiddleware(handler http.Handler) http.Handler {
for i := len(c.middlewares) - 1; i >= 0; i-- {
Expand Down
29 changes: 29 additions & 0 deletions channel/connection_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ConnectionRegistry struct {
concurrencyLimit uint
mu sync.RWMutex
frameSizeLimit int64
isClosed bool
}

type ConnectionRegistryOption func(*ConnectionRegistry)
Expand All @@ -33,6 +34,7 @@ func NewConnectionRegistry(opts ...ConnectionRegistryOption) *ConnectionRegistry
concurrencyLimit: concurencyLimitPerConnection,
bufferPool: newBufferPool(),
frameSizeLimit: frameSizeLimitInBytes,
isClosed: false,
}

for _, opt := range opts {
Expand All @@ -53,6 +55,10 @@ func (r *ConnectionRegistry) AddConnection(
r.mu.Lock()
defer r.mu.Unlock()

if r.isClosed {
return nil
}

conn := NewConnection(ctx, ws, cb, r.onClose, r.bufferPool, r.concurrencyLimit)
r.connections[conn.ID()] = conn

Expand All @@ -78,6 +84,29 @@ func (r *ConnectionRegistry) handleClose() {
}
}

func (r *ConnectionRegistry) Shutdown(ctx context.Context) error {
r.mu.Lock()
r.isClosed = true
connections := make([]wasabi.Connection, 0, len(r.connections))
for _, conn := range r.connections {
connections = append(connections, conn)
}
r.mu.Unlock()

wg := sync.WaitGroup{}
for _, conn := range connections {
c := conn
wg.Add(1)
go func() {
c.Close(ctx, websocket.StatusServiceRestart, "")
wg.Done()
}()
}

wg.Wait()
return nil
}

// WithMaxFrameLimit sets the maximum frame size limit for incomming messages to the ConnectionRegistry.
// The limit parameter specifies the maximum frame size limit in bytes.
// This option can be used when creating a new ConnectionRegistry instance.
Expand Down
2 changes: 2 additions & 0 deletions interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type RequestHandler interface {
type Channel interface {
Path() string
Handler() http.Handler
Shutdown(ctx context.Context) error
}

// ConnectionRegistry is interface for connection registries
Expand All @@ -61,4 +62,5 @@ type ConnectionRegistry interface {
cb OnMessage,
) Connection
GetConnection(id string) Connection
Shutdown(ctx context.Context) error
}
2 changes: 1 addition & 1 deletion loadtesting/k6.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export default function () {
const url = 'ws://localhost:8080/';
const params = { tags: { my_tag: 'hello' } };

let counter = 100;
let counter = 2000;

const res = ws.connect(url, params, function (socket) {
socket.on('open', () => {
Expand Down
37 changes: 25 additions & 12 deletions mocks/mock_Channel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions mocks/mock_ConnectionRegistry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,21 @@ func (s *Server) Shutdown(ctx context.Context) error {
}
}()

wg := sync.WaitGroup{}
for _, channel := range s.channels {
c := channel
wg.Add(1)
go func() {
defer wg.Done()
err := c.Shutdown(ctx)
if err != nil {
slog.Error("Error shutting down channel:" + err.Error())
}
}()
}

wg.Wait()

select {
case <-ctx.Done():
return ctx.Err()
Expand Down

0 comments on commit 98802fc

Please sign in to comment.