Skip to content

Commit

Permalink
Misc updates (#436)
Browse files Browse the repository at this point in the history
* Misc updates

* use service for track websocket test
  • Loading branch information
frostbyte73 authored Jul 21, 2023
1 parent 333989a commit 60e37ed
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 29 deletions.
1 change: 1 addition & 0 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type BaseConfig struct {

type DebugConfig struct {
EnableProfiling bool `yaml:"enable_profiling"` // create dot file and pprof on internal error
PathPrefix string `yaml:"path_prefix"` // filepath prefix for uploads
StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS)
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/pipeline/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p *Pipeline) uploadDotFile(u uploader.Uploader) {
}

func (p *Pipeline) uploadPProf(u uploader.Uploader) {
b, err := pprof.GetProfileData(context.Background(), "heap", 0, 0)
b, err := pprof.GetProfileData(context.Background(), "goroutine", 0, 0)
if err != nil {
logger.Errorw("failed to get profile data", err)
return
Expand All @@ -70,9 +70,10 @@ func (p *Pipeline) uploadPProf(u uploader.Uploader) {

func (p *Pipeline) uploadDebugFile(u uploader.Uploader, data []byte, fileExtension string) {
filename := fmt.Sprintf("%s%s", p.Info.EgressId, fileExtension)
filepath := path.Join(config.TmpDir, filename)
local := path.Join(config.TmpDir, filename)
storage := path.Join(p.Debug.PathPrefix, filename)

f, err := os.Create(filepath)
f, err := os.Create(local)
if err != nil {
logger.Errorw("failed to create dotfile", err)
return
Expand All @@ -85,7 +86,7 @@ func (p *Pipeline) uploadDebugFile(u uploader.Uploader, data []byte, fileExtensi
return
}

_, _, err = u.Upload(filepath, filename, types.OutputTypeBlob, false)
_, _, err = u.Upload(local, storage, types.OutputTypeBlob, false)
if err != nil {
logger.Errorw("failed to upload dotfile", err)
return
Expand Down
5 changes: 3 additions & 2 deletions pkg/pipeline/output/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/tinyzimmer/go-gst/gst"
"github.com/tinyzimmer/go-gst/gst/app"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
Expand Down Expand Up @@ -195,12 +196,12 @@ func (b *Bin) RemoveStream(url string) error {
return o.(*StreamOutput).RemoveSink(b.bin, url)
}

func (b *Bin) SetWebsocketSink(writer *sink.WebsocketSink) error {
func (b *Bin) SetWebsocketSink(writer *sink.WebsocketSink, eosFunc func(*app.Sink)) error {
o := b.outputs[types.EgressTypeWebsocket]
if o == nil {
return psrpc.NewErrorf(psrpc.Internal, "missing websocket output")
}

o.(*WebsocketOutput).SetSink(writer)
o.(*WebsocketOutput).SetSink(writer, eosFunc)
return nil
}
26 changes: 19 additions & 7 deletions pkg/pipeline/output/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,27 @@ func (b *Bin) buildWebsocketOutput(p *config.PipelineConfig) (*WebsocketOutput,
}, nil
}

func (o *WebsocketOutput) SetSink(writer *sink.WebsocketSink) {
func (o *WebsocketOutput) SetSink(writer *sink.WebsocketSink, eosFunc func(*app.Sink)) {
o.sink.SetCallbacks(&app.SinkCallbacks{
EOSFunc: func(appSink *app.Sink) {
// Close writer on EOS
if err := writer.Close(); err != nil && !errors.Is(err, io.EOF) {
logger.Errorw("cannot close WS sink", err)
}
},
EOSFunc: eosFunc,
NewSampleFunc: func(appSink *app.Sink) gst.FlowReturn {
// Pull the sample that triggered this callback
sample := appSink.PullSample()
if sample == nil {
logger.Debugw("unexpected flow return",
"flow", "EOS",
"reason", "nil sample",
)
return gst.FlowEOS
}

// Retrieve the buffer from the sample
buffer := sample.GetBuffer()
if buffer == nil {
logger.Debugw("unexpected flow return",
"flow", "Error",
"reason", "nil buffer",
)
return gst.FlowError
}

Expand All @@ -70,11 +73,20 @@ func (o *WebsocketOutput) SetSink(writer *sink.WebsocketSink) {
_, err := writer.Write(samples)
if err != nil {
if err == io.EOF {
logger.Debugw("unexpected flow return",
"flow", "EOS",
"reason", "Write returned EOF",
)
return gst.FlowEOS
}
o.conf.Failure <- err
logger.Debugw("unexpected flow return",
"flow", "Error",
"reason", err.Error(),
)
return gst.FlowError
}

return gst.FlowOK
},
})
Expand Down
6 changes: 5 additions & 1 deletion pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/frostbyte73/core"
"github.com/tinyzimmer/go-glib/glib"
"github.com/tinyzimmer/go-gst/gst"
"github.com/tinyzimmer/go-gst/gst/app"
"go.uber.org/zap"

"github.com/livekit/egress/pkg/config"
Expand Down Expand Up @@ -120,7 +121,10 @@ func New(ctx context.Context, conf *config.PipelineConfig, onStatusUpdate Update
if s, ok := p.sinks[types.EgressTypeWebsocket]; ok {
websocketSink := s.(*sink.WebsocketSink)
p.src.(*source.SDKSource).OnTrackMuted(websocketSink.OnTrackMuted)
if err = p.out.SetWebsocketSink(websocketSink); err != nil {
if err = p.out.SetWebsocketSink(websocketSink, func(appSink *app.Sink) {
_ = websocketSink.Close()
p.pipeline.GetPipelineBus().Post(gst.NewEOSMessage(appSink))
}); err != nil {
return nil, err
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/pipeline/sink/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func (s *WebsocketSink) Finalize() error {
}

func (s *WebsocketSink) Close() error {
logger.Debugw("closing websocket sink")

s.mu.Lock()
defer s.mu.Unlock()
if !s.closed.Swap(true) {
Expand Down
1 change: 0 additions & 1 deletion pkg/pipeline/source/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ func (s *SDKSource) joinRoom(p *config.PipelineConfig) error {
var wg sync.WaitGroup
cb.OnTrackSubscribed = func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
defer wg.Done()
logger.Debugw("track subscribed", "trackID", track.ID(), "mime", track.Codec().MimeType)

s.active.Inc()
t := s.sync.AddTrack(track, rp.Identity())
Expand Down
3 changes: 2 additions & 1 deletion test/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"os"
"path"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -272,5 +273,5 @@ func (r *Runner) getFilePath(filename string) string {
return filename
}

return fmt.Sprintf("%s/%s", r.FilePrefix, filename)
return path.Join(r.FilePrefix, filename)
}
18 changes: 5 additions & 13 deletions test/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
package test

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"os"
"path"
"strings"
"testing"
"time"
Expand All @@ -16,7 +16,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/pipeline"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
Expand Down Expand Up @@ -116,7 +115,7 @@ func (r *Runner) testTrackStream(t *testing.T) {
trackID = videoTrackID
}

filepath := r.getFilePath(test.filename)
filepath := path.Join(r.FilePrefix, test.filename)
wss := newTestWebsocketServer(filepath)
s := httptest.NewServer(http.HandlerFunc(wss.handleWebsocket))
defer func() {
Expand All @@ -137,21 +136,14 @@ func (r *Runner) testTrackStream(t *testing.T) {
},
}

ctx := context.Background()
egressID := r.startEgress(t, req)

p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req)
require.NoError(t, err)
p.GstReady = make(chan struct{})

rec, err := pipeline.New(ctx, p, func(_ context.Context, _ *livekit.EgressInfo) {})
require.NoError(t, err)

go func() {
time.Sleep(time.Second * 35)
rec.SendEOS(ctx)
}()
time.Sleep(time.Second * 30)

res := rec.Run(ctx)
res := r.stopEgress(t, egressID)
verify(t, filepath, p, res, types.EgressTypeWebsocket, r.Muting, r.sourceFramerate)
})
if r.Short {
Expand Down

0 comments on commit 60e37ed

Please sign in to comment.