Skip to content

Commit

Permalink
clean up websocket logging and flow (#456)
Browse files Browse the repository at this point in the history
* clean up websocket logging and flow

* remove errChan
  • Loading branch information
frostbyte73 authored Aug 9, 2023
1 parent a42558c commit fd026c7
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 40 deletions.
23 changes: 0 additions & 23 deletions pkg/pipeline/output/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
package output

import (
"io"

"github.com/tinyzimmer/go-gst/gst"
"github.com/tinyzimmer/go-gst/gst/app"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/pipeline/sink"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/logger"
)

type WebsocketOutput struct {
Expand Down Expand Up @@ -63,20 +60,12 @@ func (o *WebsocketOutput) SetSink(writer *sink.WebsocketSink, eosFunc func(*app.
// Pull the sample that triggered this callback
sample := appSink.PullSample()
if sample == nil {
logger.Debugw("unexpected flow return",
"flow", "EOS",
"reason", "nil sample",
)
return gst.FlowEOS
}

// Retrieve the buffer from the sample
buffer := sample.GetBuffer()
if buffer == nil {
logger.Debugw("unexpected flow return",
"flow", "Error",
"reason", "nil buffer",
)
return gst.FlowError
}

Expand All @@ -86,19 +75,7 @@ func (o *WebsocketOutput) SetSink(writer *sink.WebsocketSink, eosFunc func(*app.
// From the extracted bytes, send to writer
_, err := writer.Write(samples)
if err != nil {
if err == io.EOF {
logger.Debugw("unexpected flow return",
"flow", "EOS",
"reason", "Write returned EOF",
)
return gst.FlowEOS
}
o.conf.OnFailure(err)
logger.Debugw("unexpected flow return",
"flow", "Error",
"reason", err.Error(),
)
return gst.FlowError
}

return gst.FlowOK
Expand Down
24 changes: 7 additions & 17 deletions pkg/pipeline/sink/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ import (
const pingPeriod = time.Second * 30

type WebsocketSink struct {
mu sync.Mutex
conn *websocket.Conn
closed atomic.Bool
errChan chan error
mu sync.Mutex
conn *websocket.Conn
closed atomic.Bool
}

func newWebsocketSink(o *config.StreamConfig, mimeType types.MimeType) (*WebsocketSink, error) {
Expand All @@ -50,8 +49,7 @@ func newWebsocketSink(o *config.StreamConfig, mimeType types.MimeType) (*Websock
return nil, err
}
return &WebsocketSink{
conn: conn,
errChan: make(chan error, 1),
conn: conn,
}, nil
}

Expand All @@ -76,8 +74,6 @@ func (s *WebsocketSink) Start() error {
if err != nil {
_, isCloseError := err.(*websocket.CloseError)
if isCloseError || errors.Is(err, io.EOF) || strings.HasSuffix(err.Error(), "use of closed network connection") {
s.errChan <- err
s.closed.Store(true)
return
}
errCount++
Expand Down Expand Up @@ -114,20 +110,14 @@ func (s *WebsocketSink) Write(p []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed.Load() {
select {
case err := <-s.errChan:
return 0, err
default:
return 0, errors.ErrWebsocketClosed(s.conn.RemoteAddr().String())
}
return 0, nil
}

return len(p), s.conn.WriteMessage(websocket.BinaryMessage, p)
}

func (s *WebsocketSink) OnTrackMuted(muted bool) {
err := s.writeMutedMessage(muted)
if err != nil {
if err := s.writeMutedMessage(muted); err != nil {
logger.Errorw("failed to write muted message", err)
}
}
Expand All @@ -147,7 +137,7 @@ func (s *WebsocketSink) writeMutedMessage(muted bool) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed.Load() {
return errors.ErrWebsocketClosed(s.conn.RemoteAddr().String())
return nil
}

return s.conn.WriteMessage(websocket.TextMessage, data)
Expand Down

0 comments on commit fd026c7

Please sign in to comment.