diff --git a/pkg/config/manifest.go b/pkg/config/manifest.go index 04c099ba..c3735547 100644 --- a/pkg/config/manifest.go +++ b/pkg/config/manifest.go @@ -15,6 +15,7 @@ package config import ( + "bytes" "encoding/json" "sync" "time" @@ -153,5 +154,12 @@ func (m *Manifest) AddImage(filename string, ts time.Time, location, presignedUr func (m *Manifest) Close(endedAt int64) ([]byte, error) { m.EndedAt = endedAt - return json.Marshal(m) + buf := bytes.NewBuffer(nil) + enc := json.NewEncoder(buf) + enc.SetEscapeHTML(false) + if err := enc.Encode(m); err != nil { + return nil, err + } + + return buf.Bytes(), nil } diff --git a/pkg/pipeline/builder/websocket.go b/pkg/pipeline/builder/websocket.go index 55ed0f44..de808826 100644 --- a/pkg/pipeline/builder/websocket.go +++ b/pkg/pipeline/builder/websocket.go @@ -18,6 +18,7 @@ import ( "github.com/go-gst/go-gst/gst" "github.com/go-gst/go-gst/gst/app" + "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/gstreamer" ) @@ -26,7 +27,7 @@ func BuildWebsocketBin(pipeline *gstreamer.Pipeline, appSinkCallbacks *app.SinkC appSink, err := app.NewAppSink() if err != nil { - return nil, err + return nil, errors.ErrGstPipelineError(err) } appSink.SetCallbacks(appSinkCallbacks) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 788fb81b..bc820411 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -39,6 +39,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/tracer" + "github.com/livekit/psrpc" ) const ( @@ -349,7 +350,7 @@ func (c *Controller) streamFailed(ctx context.Context, stream *config.Stream, st // fail egress if no outputs remaining if c.OutputCount.Load() == 0 { - return streamErr + return psrpc.NewError(psrpc.Unavailable, streamErr) } logger.Infow("stream failed", diff --git a/pkg/pipeline/sink/uploader/gcp.go b/pkg/pipeline/sink/uploader/gcp.go index 4d0114fb..aa2d6b33 100644 --- a/pkg/pipeline/sink/uploader/gcp.go +++ b/pkg/pipeline/sink/uploader/gcp.go @@ -59,7 +59,7 @@ func newGCPUploader(c *config.StorageConfig) (uploader, error) { if conf.CredentialsJSON != "" { jwtConfig, err := google.JWTConfigFromJSON([]byte(conf.CredentialsJSON), storageScope) if err != nil { - return nil, err + return nil, errors.ErrUploadFailed("GCP", err) } opts = append(opts, option.WithTokenSource(jwtConfig.TokenSource(context.Background()))) } @@ -80,12 +80,12 @@ func newGCPUploader(c *config.StorageConfig) (uploader, error) { defaultTransport.ProxyConnectHeader.Add("Proxy-Authorization", basicAuth) } } - client, err := storage.NewClient(context.Background(), opts...) + client, err := storage.NewClient(context.Background(), opts...) // restore default transport http.DefaultTransport = transportClone if err != nil { - return nil, err + return nil, errors.ErrUploadFailed("GCP", err) } u.client = client diff --git a/pkg/pipeline/sink/websocket.go b/pkg/pipeline/sink/websocket.go index 67cc7f05..26febb35 100644 --- a/pkg/pipeline/sink/websocket.go +++ b/pkg/pipeline/sink/websocket.go @@ -32,6 +32,7 @@ import ( "github.com/livekit/egress/pkg/gstreamer" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/logger" + "github.com/livekit/psrpc" ) const pingPeriod = time.Second * 30 @@ -56,7 +57,7 @@ func newWebsocketSink(o *config.StreamConfig, mimeType types.MimeType, callbacks conn, _, err := websocket.DefaultDialer.Dial(wsUrl, header) if err != nil { - return nil, err + return nil, psrpc.NewError(psrpc.InvalidArgument, err) } s := &WebsocketSink{ @@ -88,7 +89,7 @@ func newWebsocketSink(o *config.StreamConfig, mimeType types.MimeType, callbacks if err == io.EOF { return gst.FlowEOS } - callbacks.OnError(err) + callbacks.OnError(psrpc.NewError(psrpc.Unavailable, err)) } return gst.FlowOK diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index 2d3b1213..0807bddd 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -54,7 +54,7 @@ type SDKSource struct { mu sync.RWMutex initialized core.Fuse filenameReplacements map[string]string - errors chan error + errors chan *subscriptionInfo writers map[string]*sdk.AppWriter subLock sync.RWMutex @@ -65,6 +65,11 @@ type SDKSource struct { endRecording chan struct{} } +type subscriptionInfo struct { + trackID string + err error +} + func NewSDKSource(ctx context.Context, p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (*SDKSource, error) { ctx, span := tracer.Start(ctx, "SDKInput.New") defer span.End() @@ -77,7 +82,7 @@ func NewSDKSource(ctx context.Context, p *config.PipelineConfig, callbacks *gstr close(startRecording) }), filenameReplacements: make(map[string]string), - errors: make(chan error, 2), + errors: make(chan *subscriptionInfo, 2), writers: make(map[string]*sdk.AppWriter), startRecording: startRecording, endRecording: make(chan struct{}), @@ -224,9 +229,9 @@ func (s *SDKSource) awaitParticipantTracks(identity string) (uint32, uint32, err done := false for !done { select { - case err = <-s.errors: - if err != nil { - return 0, 0, err + case sub := <-s.errors: + if sub.err != nil { + return 0, 0, sub.err } subscribed++ if subscribed == expected { @@ -244,9 +249,9 @@ func (s *SDKSource) awaitParticipantTracks(identity string) (uint32, uint32, err for { select { // check errors from any tracks published in the meantime - case err = <-s.errors: - if err != nil { - return 0, 0, err + case sub := <-s.errors: + if sub.err != nil { + return 0, 0, sub.err } default: // get dimensions after subscribing so that track info exists @@ -289,12 +294,15 @@ func (s *SDKSource) awaitTracks(expecting map[string]struct{}) (uint32, uint32, for i := 0; i < trackCount; i++ { select { - case err = <-s.errors: - if err != nil { - return 0, 0, err + case sub := <-s.errors: + if sub.err != nil { + return 0, 0, sub.err } + delete(expecting, sub.trackID) case <-deadline: - return 0, 0, errors.ErrSubscriptionFailed + for trackID := range expecting { + return 0, 0, errors.ErrTrackNotFound(trackID) + } } } @@ -380,7 +388,10 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo s.callbacks.OnError(onSubscribeErr) } } else { - s.errors <- onSubscribeErr + s.errors <- &subscriptionInfo{ + trackID: pub.SID(), + err: onSubscribeErr, + } } s.subLock.RUnlock() }()