Skip to content

Commit

Permalink
read loop required for websocket sink
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Jul 6, 2023
1 parent 1a39a3d commit 8d61bbd
Showing 1 changed file with 41 additions and 19 deletions.
60 changes: 41 additions & 19 deletions pkg/pipeline/sink/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"github.com/livekit/protocol/logger"
)

const (
pingPeriod = time.Second * 30
readTimeout = time.Minute
)

type WebsocketSink struct {
mu sync.Mutex
conn *websocket.Conn
Expand All @@ -30,33 +35,50 @@ func newWebsocketSink(o *config.StreamConfig, mimeType types.MimeType) (*Websock
if err != nil {
return nil, err
}

s := &WebsocketSink{
return &WebsocketSink{
conn: conn,
}
go s.keepAlive()

return s, nil
}, nil
}

func (s *WebsocketSink) Start() error {
return nil
}
// override default ping handler to include locking
s.conn.SetPingHandler(func(_ string) error {
s.mu.Lock()
defer s.mu.Unlock()

func (s *WebsocketSink) keepAlive() {
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
_ = s.conn.WriteMessage(websocket.PongMessage, []byte("pong"))
return nil
})

for {
<-ticker.C
s.mu.Lock()
if s.closed.Load() {
// read loop is required for the ping handler to receive pings
go func() {
for {
_ = s.conn.SetReadDeadline(time.Now().Add(readTimeout))
_, _, _ = s.conn.ReadMessage()
if s.closed.Load() {
return
}
}
}()

// write loop for sending pings
go func() {
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()

for {
<-ticker.C
s.mu.Lock()
if s.closed.Load() {
s.mu.Unlock()
return
}
_ = s.conn.WriteMessage(websocket.PingMessage, []byte("ping"))
s.mu.Unlock()
return
}
_ = s.conn.WriteMessage(websocket.PingMessage, []byte("ping"))
s.mu.Unlock()
}
}()

return nil
}

func (s *WebsocketSink) Write(p []byte) (int, error) {
Expand Down

0 comments on commit 8d61bbd

Please sign in to comment.