From 6a79200aef021de3d8c137193e1f88c73bc6eafe Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 8 Aug 2023 22:19:16 -0700 Subject: [PATCH] more refactoring for PC (#457) --- build/egress/Dockerfile | 4 +- build/test/Dockerfile | 4 +- magefile.go | 2 +- pkg/config/pipeline.go | 38 +++--- pkg/errors/errors.go | 1 + pkg/pipeline/input/audio.go | 106 ++++++++++++----- pkg/pipeline/input/bin.go | 8 +- pkg/pipeline/input/video.go | 35 ++++-- pkg/pipeline/output/stream.go | 5 +- pkg/pipeline/pipeline.go | 4 +- pkg/pipeline/source/sdk.go | 171 ++++++++++++++------------- pkg/pipeline/source/sdk/appwriter.go | 4 + pkg/pipeline/watch.go | 11 +- test/integration.go | 87 ++++++++++---- test/track.go | 4 +- test/track_composite.go | 10 +- 16 files changed, 299 insertions(+), 195 deletions(-) diff --git a/build/egress/Dockerfile b/build/egress/Dockerfile index d0bf33ec..d29b3bd4 100644 --- a/build/egress/Dockerfile +++ b/build/egress/Dockerfile @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM livekit/gstreamer:1.22.4-dev +FROM livekit/gstreamer:1.22.5-dev ARG TARGETPLATFORM @@ -40,7 +40,7 @@ RUN find cmd/server/templates/ -name *.map | xargs rm RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd64; fi && \ CGO_ENABLED=1 GOOS=linux GOARCH=${GOARCH} GO111MODULE=on go build -a -o egress ./cmd/server -FROM livekit/gstreamer:1.22.4-prod +FROM livekit/gstreamer:1.22.5-prod ARG TARGETPLATFORM diff --git a/build/test/Dockerfile b/build/test/Dockerfile index 52e30a9f..6ee4d6f7 100644 --- a/build/test/Dockerfile +++ b/build/test/Dockerfile @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM livekit/gstreamer:1.22.4-dev +FROM livekit/gstreamer:1.22.5-dev WORKDIR /workspace @@ -44,7 +44,7 @@ RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd CGO_ENABLED=1 GOOS=linux GOARCH=${GOARCH} GO111MODULE=on go test -c -v --tags=integration ./test -FROM livekit/gstreamer:1.22.4-prod +FROM livekit/gstreamer:1.22.5-prod ARG TARGETPLATFORM diff --git a/magefile.go b/magefile.go index 075de5d8..67ef6779 100644 --- a/magefile.go +++ b/magefile.go @@ -28,7 +28,7 @@ import ( ) const ( - gstVersion = "1.22.4" + gstVersion = "1.22.5" chromiumVersion = "117.0.5874.0" dockerBuild = "docker build" dockerBuildX = "docker buildx build --push --platform linux/amd64,linux/arm64" diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 9b3a5212..f086743c 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -33,6 +33,7 @@ import ( "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/tracer" "github.com/livekit/protocol/utils" + lksdk "github.com/livekit/server-sdk-go" ) const ( @@ -76,18 +77,23 @@ type WebSourceParams struct { } type SDKSourceParams struct { - TrackID string - TrackSource string - TrackKind string - AudioTrackID string - VideoTrackID string - Identity string - AudioSrc *app.Source - VideoSrc *app.Source - AudioInCodec types.MimeType - VideoInCodec types.MimeType - AudioCodecParams webrtc.RTPCodecParameters - VideoCodecParams webrtc.RTPCodecParameters + TrackID string + AudioTrackID string + VideoTrackID string + Identity string + TrackSource string + TrackKind string + AudioInCodec types.MimeType + VideoInCodec types.MimeType + AudioTrack *TrackSource + VideoTrack *TrackSource +} + +type TrackSource struct { + TrackID string + Kind lksdk.TrackKind + AppSrc *app.Source + Codec webrtc.RTPCodecParameters } type AudioConfig struct { @@ -112,9 +118,11 @@ type VideoConfig struct { } type Callbacks struct { - GstReady chan struct{} `yaml:"-"` - OnTrackMuted []func(bool) `yaml:"-"` - OnFailure func(error) `yaml:"-"` + GstReady chan struct{} `yaml:"-"` + OnTrackMuted []func(bool) `yaml:"-"` + OnTrackAdded func(*TrackSource) `yaml:"-"` + OnTrackRemoved func(trackID string) `yaml:"-"` + OnFailure func(error) `yaml:"-"` } func NewPipelineConfig(confString string, req *rpc.StartEgressRequest) (*PipelineConfig, error) { diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 1e92e5ea..9341ec69 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -32,6 +32,7 @@ var ( ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs") ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU") ErrVideoWebsocket = psrpc.NewErrorf(psrpc.InvalidArgument, "cannot send video over websocket") + ErrInvalidTrack = psrpc.NewErrorf(psrpc.Internal, "unexpected track type") ) func New(err string) error { diff --git a/pkg/pipeline/input/audio.go b/pkg/pipeline/input/audio.go index 75235b29..3446d8d2 100644 --- a/pkg/pipeline/input/audio.go +++ b/pkg/pipeline/input/audio.go @@ -35,7 +35,7 @@ type audioInput struct { encoder *gst.Element } -func (a *audioInput) buildWebSource(p *config.PipelineConfig) error { +func (a *audioInput) buildWebInput(p *config.PipelineConfig) error { pulseSrc, err := gst.NewElement("pulsesrc") if err != nil { return errors.ErrGstPipelineError(err) @@ -48,20 +48,36 @@ func (a *audioInput) buildWebSource(p *config.PipelineConfig) error { return a.buildConverter(p) } -func (a *audioInput) buildAppSource(p *config.PipelineConfig) error { - src := p.AudioSrc - src.Element.SetArg("format", "time") - if err := src.Element.SetProperty("is-live", true); err != nil { +func (a *audioInput) buildSDKInput(p *config.PipelineConfig) error { + if p.AudioTrack != nil { + if err := a.buildAppSource(p.AudioTrack); err != nil { + return err + } + if err := a.buildConverter(p); err != nil { + return err + } + } + + if err := a.buildTestSrc(p); err != nil { + return err + } + + return a.buildMixer(p) +} + +func (a *audioInput) buildAppSource(track *config.TrackSource) error { + track.AppSrc.Element.SetArg("format", "time") + if err := track.AppSrc.Element.SetProperty("is-live", true); err != nil { return err } - a.src = []*gst.Element{src.Element} + a.src = []*gst.Element{track.AppSrc.Element} switch { - case strings.EqualFold(p.AudioCodecParams.MimeType, string(types.MimeTypeOpus)): - if err := src.Element.SetProperty("caps", gst.NewCapsFromString( + case strings.EqualFold(track.Codec.MimeType, string(types.MimeTypeOpus)): + if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString( fmt.Sprintf( "application/x-rtp,media=audio,payload=%d,encoding-name=OPUS,clock-rate=%d", - p.AudioCodecParams.PayloadType, p.AudioCodecParams.ClockRate, + track.Codec.PayloadType, track.Codec.ClockRate, ), )); err != nil { return errors.ErrGstPipelineError(err) @@ -80,14 +96,10 @@ func (a *audioInput) buildAppSource(p *config.PipelineConfig) error { a.src = append(a.src, rtpOpusDepay, opusDec) default: - return errors.ErrNotSupported(p.AudioCodecParams.MimeType) + return errors.ErrNotSupported(track.Codec.MimeType) } - if err := a.buildConverter(p); err != nil { - return err - } - - return a.buildMixer(p) + return nil } func (a *audioInput) buildConverter(p *config.PipelineConfig) error { @@ -115,7 +127,7 @@ func (a *audioInput) buildConverter(p *config.PipelineConfig) error { return nil } -func (a *audioInput) buildMixer(p *config.PipelineConfig) error { +func (a *audioInput) buildTestSrc(p *config.PipelineConfig) error { audioTestSrc, err := gst.NewElement("audiotestsrc") if err != nil { return errors.ErrGstPipelineError(err) @@ -129,12 +141,17 @@ func (a *audioInput) buildMixer(p *config.PipelineConfig) error { if err = audioTestSrc.SetProperty("is-live", true); err != nil { return errors.ErrGstPipelineError(err) } + audioCaps, err := newAudioCapsFilter(p) if err != nil { return err } + a.testSrc = []*gst.Element{audioTestSrc, audioCaps} + return nil +} +func (a *audioInput) buildMixer(p *config.PipelineConfig) error { audioMixer, err := gst.NewElement("audiomixer") if err != nil { return errors.ErrGstPipelineError(err) @@ -142,12 +159,13 @@ func (a *audioInput) buildMixer(p *config.PipelineConfig) error { if err = audioMixer.SetProperty("latency", audioMixerLatency); err != nil { return errors.ErrGstPipelineError(err) } + mixedCaps, err := newAudioCapsFilter(p) if err != nil { return err } - a.mixer = []*gst.Element{audioMixer, mixedCaps} + a.mixer = []*gst.Element{audioMixer, mixedCaps} return nil } @@ -211,42 +229,70 @@ func newAudioCapsFilter(p *config.PipelineConfig) (*gst.Element, error) { func (a *audioInput) link() (*gst.GhostPad, error) { if a.src != nil { + // link src elements if err := gst.ElementLinkMany(a.src...); err != nil { return nil, errors.ErrGstPipelineError(err) } } - if a.mixer != nil { - if err := builder.LinkPads("audio decoder", builder.GetSrcPad(a.src), "audio mixer", a.mixer[0].GetRequestPad("sink_%u")); err != nil { - return nil, err - } + + if a.testSrc != nil { + // link test src elements if err := gst.ElementLinkMany(a.testSrc...); err != nil { return nil, errors.ErrGstPipelineError(err) } - if err := builder.LinkPads("audio test src", builder.GetSrcPad(a.testSrc), "audio mixer", a.mixer[0].GetRequestPad("sink_%u")); err != nil { - return nil, err + } + + if a.mixer != nil { + if a.src != nil { + // link src to mixer + if err := builder.LinkPads( + "audio src", builder.GetSrcPad(a.src), + "audio mixer", a.mixer[0].GetRequestPad("sink_%u"), + ); err != nil { + return nil, err + } + } + + if a.testSrc != nil { + // link test src to mixer + if err := builder.LinkPads( + "audio test src", builder.GetSrcPad(a.testSrc), + "audio mixer", a.mixer[0].GetRequestPad("sink_%u"), + ); err != nil { + return nil, err + } } + if err := gst.ElementLinkMany(a.mixer...); err != nil { return nil, errors.ErrGstPipelineError(err) } } - var srcPad *gst.Pad + var ghostPad *gst.Pad if a.encoder != nil { if a.mixer != nil { - if err := builder.LinkPads("audio mixer", builder.GetSrcPad(a.mixer), "audio encoder", a.encoder.GetStaticPad("sink")); err != nil { + // link mixer to encoder + if err := builder.LinkPads( + "audio mixer", builder.GetSrcPad(a.mixer), + "audio encoder", a.encoder.GetStaticPad("sink"), + ); err != nil { return nil, err } } else { - if err := builder.LinkPads("audio decoder", builder.GetSrcPad(a.src), "audio encoder", a.encoder.GetStaticPad("sink")); err != nil { + // link src to encoder + if err := builder.LinkPads( + "audio src", builder.GetSrcPad(a.src), + "audio encoder", a.encoder.GetStaticPad("sink"), + ); err != nil { return nil, err } } - srcPad = a.encoder.GetStaticPad("src") + ghostPad = a.encoder.GetStaticPad("src") } else if a.mixer != nil { - srcPad = builder.GetSrcPad(a.mixer) + ghostPad = builder.GetSrcPad(a.mixer) } else { - srcPad = builder.GetSrcPad(a.src) + ghostPad = builder.GetSrcPad(a.src) } - return gst.NewGhostPad("audio_src", srcPad), nil + return gst.NewGhostPad("audio_src", ghostPad), nil } diff --git a/pkg/pipeline/input/bin.go b/pkg/pipeline/input/bin.go index 0c5c05af..ae2fff0a 100644 --- a/pkg/pipeline/input/bin.go +++ b/pkg/pipeline/input/bin.go @@ -61,12 +61,12 @@ func (b *Bin) buildAudioInput(p *config.PipelineConfig) error { switch p.SourceType { case types.SourceTypeSDK: - if err := a.buildAppSource(p); err != nil { + if err := a.buildSDKInput(p); err != nil { return err } case types.SourceTypeWeb: - if err := a.buildWebSource(p); err != nil { + if err := a.buildWebInput(p); err != nil { return err } } @@ -102,12 +102,12 @@ func (b *Bin) buildVideoInput(p *config.PipelineConfig) error { switch p.SourceType { case types.SourceTypeSDK: - if err := v.buildAppSource(p); err != nil { + if err := v.buildSDKInput(p); err != nil { return err } case types.SourceTypeWeb: - if err := v.buildWebSource(p); err != nil { + if err := v.buildWebInput(p); err != nil { return err } } diff --git a/pkg/pipeline/input/video.go b/pkg/pipeline/input/video.go index e9cb3bbb..00803757 100644 --- a/pkg/pipeline/input/video.go +++ b/pkg/pipeline/input/video.go @@ -31,7 +31,7 @@ type videoInput struct { encoder []*gst.Element } -func (v *videoInput) buildWebSource(p *config.PipelineConfig) error { +func (v *videoInput) buildWebInput(p *config.PipelineConfig) error { xImageSrc, err := gst.NewElement("ximagesrc") if err != nil { return errors.ErrGstPipelineError(err) @@ -70,19 +70,28 @@ func (v *videoInput) buildWebSource(p *config.PipelineConfig) error { return nil } -func (v *videoInput) buildAppSource(p *config.PipelineConfig) error { - src := p.VideoSrc - src.Element.SetArg("format", "time") - if err := src.Element.SetProperty("is-live", true); err != nil { +func (v *videoInput) buildSDKInput(p *config.PipelineConfig) error { + if p.VideoTrack != nil { + if err := v.buildAppSource(p, p.VideoTrack); err != nil { + return err + } + } + + return nil +} + +func (v *videoInput) buildAppSource(p *config.PipelineConfig, track *config.TrackSource) error { + track.AppSrc.Element.SetArg("format", "time") + if err := track.AppSrc.Element.SetProperty("is-live", true); err != nil { return errors.ErrGstPipelineError(err) } - v.src = append(v.src, src.Element) + v.src = append(v.src, track.AppSrc.Element) switch { - case strings.EqualFold(p.VideoCodecParams.MimeType, string(types.MimeTypeH264)): - if err := src.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + case strings.EqualFold(track.Codec.MimeType, string(types.MimeTypeH264)): + if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( "application/x-rtp,media=video,payload=%d,encoding-name=H264,clock-rate=%d", - p.VideoCodecParams.PayloadType, p.VideoCodecParams.ClockRate, + track.Codec.PayloadType, track.Codec.ClockRate, ))); err != nil { return errors.ErrGstPipelineError(err) } @@ -122,11 +131,11 @@ func (v *videoInput) buildAppSource(p *config.PipelineConfig) error { return nil } - case strings.EqualFold(p.VideoCodecParams.MimeType, string(types.MimeTypeVP8)): - if err := src.Element.SetProperty("caps", gst.NewCapsFromString( + case strings.EqualFold(track.Codec.MimeType, string(types.MimeTypeVP8)): + if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString( fmt.Sprintf( "application/x-rtp,media=video,payload=%d,encoding-name=VP8,clock-rate=%d", - p.VideoCodecParams.PayloadType, p.VideoCodecParams.ClockRate, + track.Codec.PayloadType, track.Codec.ClockRate, ), )); err != nil { return errors.ErrGstPipelineError(err) @@ -150,7 +159,7 @@ func (v *videoInput) buildAppSource(p *config.PipelineConfig) error { v.src = append(v.src, vp8Dec) default: - return errors.ErrNotSupported(p.VideoCodecParams.MimeType) + return errors.ErrNotSupported(track.Codec.MimeType) } videoQueue, err := builder.BuildQueue("video_input_queue", p.Latency, true) diff --git a/pkg/pipeline/output/stream.go b/pkg/pipeline/output/stream.go index 63499892..95d19eb9 100644 --- a/pkg/pipeline/output/stream.go +++ b/pkg/pipeline/output/stream.go @@ -271,9 +271,6 @@ func (o *StreamOutput) RemoveSink(bin *gst.Bin, url string) error { srcPad := o.tee.GetStaticPad(sink.pad) srcPad.AddProbe(gst.PadProbeTypeBlockDownstream, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { - // remove probe - pad.RemoveProbe(uint64(info.ID())) - // unlink queue pad.Unlink(sink.queue.GetStaticPad("sink")) @@ -294,7 +291,7 @@ func (o *StreamOutput) RemoveSink(bin *gst.Bin, url string) error { // release tee src pad o.tee.ReleaseRequestPad(pad) - return gst.PadProbeOK + return gst.PadProbeRemove }) delete(o.sinks, url) diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 44854545..e4f3b6de 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -91,7 +91,7 @@ func New(ctx context.Context, conf *config.PipelineConfig, onStatusUpdate Update defer span.End() gst.Init(nil) gst.SetLogFunction(p.gstLog) - close(conf.GstReady) + close(p.GstReady) }() // create source @@ -107,7 +107,7 @@ func New(ctx context.Context, conf *config.PipelineConfig, onStatusUpdate Update } // create pipeline - <-conf.GstReady + <-p.GstReady p.pipeline, err = gst.NewPipeline("pipeline") if err != nil { return nil, errors.ErrGstPipelineError(err) diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index 4f15f149..b4660725 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -38,14 +38,11 @@ import ( ) const ( - AudioAppSource = "audioAppSrc" - VideoAppSource = "videoAppSrc" - subscriptionTimeout = time.Second * 30 ) type SDKSource struct { - *config.Callbacks + *config.PipelineConfig room *lksdk.Room sync *synchronizer.Synchronizer @@ -75,7 +72,7 @@ func NewSDKSource(ctx context.Context, p *config.PipelineConfig) (*SDKSource, er startRecording := make(chan struct{}) s := &SDKSource{ - Callbacks: p.Callbacks, + PipelineConfig: p, sync: synchronizer.NewSynchronizer(func() { close(startRecording) }), @@ -105,12 +102,9 @@ func (s *SDKSource) GetStartTime() int64 { return s.sync.GetStartedAt() } -func (s *SDKSource) Playing(name string) { - switch name { - case AudioAppSource: - s.audioWriter.Play() - case VideoAppSource: - s.videoWriter.Play() +func (s *SDKSource) Playing(trackID string) { + if w := s.getWriterForTrack(trackID); w != nil { + w.Play() } } @@ -143,19 +137,8 @@ func (s *SDKSource) CloseWriters() { wg.Wait() } -func (s *SDKSource) StreamStopped(name string) { - switch name { - case AudioAppSource: - s.audioWriter.Drain(true) - if s.active.Dec() == 0 { - s.onDisconnected() - } - case VideoAppSource: - s.videoWriter.Drain(true) - if s.active.Dec() == 0 { - s.onDisconnected() - } - } +func (s *SDKSource) StreamStopped(trackID string) { + s.onTrackFinished(trackID) } func (s *SDKSource) Close() { @@ -177,9 +160,9 @@ func (s *SDKSource) joinRoom(p *config.PipelineConfig) error { } var mu sync.Mutex + var onSubscribeErr error filenameReplacements := make(map[string]string) - var onSubscribeErr error cb.OnTrackSubscribed = func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { defer s.subscriptions.Done() @@ -187,12 +170,13 @@ func (s *SDKSource) joinRoom(p *config.PipelineConfig) error { t := s.sync.AddTrack(track, rp.Identity()) mu.Lock() - if p.Identity == "" || track.Kind() == webrtc.RTPCodecTypeVideo { - p.Identity = rp.Identity() - filenameReplacements["{publisher_identity}"] = p.Identity - } - - if p.TrackID != "" { + switch p.RequestType { + case types.RequestTypeTrackComposite: + if p.Identity == "" || track.Kind() == webrtc.RTPCodecTypeVideo { + p.Identity = rp.Identity() + filenameReplacements["{publisher_identity}"] = p.Identity + } + case types.RequestTypeTrack: if track.Kind() == webrtc.RTPCodecTypeAudio { p.TrackKind = "audio" } else { @@ -208,17 +192,16 @@ func (s *SDKSource) joinRoom(p *config.PipelineConfig) error { filenameReplacements["{track_id}"] = p.TrackID filenameReplacements["{track_type}"] = p.TrackKind filenameReplacements["{track_source}"] = p.TrackSource + filenameReplacements["{publisher_identity}"] = p.Identity } mu.Unlock() var codec types.MimeType - var appSrcName string + var writeBlanks bool var err error - writeBlanks := false switch { case strings.EqualFold(track.Codec().MimeType, string(types.MimeTypeOpus)): - appSrcName = AudioAppSource codec = types.MimeTypeOpus p.AudioEnabled = true @@ -229,14 +212,13 @@ func (s *SDKSource) joinRoom(p *config.PipelineConfig) error { } p.AudioTranscoding = true - if p.TrackID != "" { + if p.RequestType == types.RequestTypeTrack { if o := p.GetFileConfig(); o != nil { o.OutputType = types.OutputTypeOGG } } case strings.EqualFold(track.Codec().MimeType, string(types.MimeTypeVP8)): - appSrcName = VideoAppSource codec = types.MimeTypeVP8 p.VideoEnabled = true @@ -250,14 +232,13 @@ func (s *SDKSource) joinRoom(p *config.PipelineConfig) error { writeBlanks = true } - if p.TrackID != "" { + if p.RequestType == types.RequestTypeTrack { if o := p.GetFileConfig(); o != nil { o.OutputType = types.OutputTypeWebM } } case strings.EqualFold(track.Codec().MimeType, string(types.MimeTypeH264)): - appSrcName = VideoAppSource codec = types.MimeTypeH264 p.VideoEnabled = true @@ -267,7 +248,7 @@ func (s *SDKSource) joinRoom(p *config.PipelineConfig) error { p.VideoOutCodec = types.MimeTypeH264 } - if p.TrackID != "" { + if p.RequestType == types.RequestTypeTrack { if o := p.GetFileConfig(); o != nil { o.OutputType = types.OutputTypeMP4 } @@ -278,14 +259,6 @@ func (s *SDKSource) joinRoom(p *config.PipelineConfig) error { return } - <-p.GstReady - src, err := gst.NewElementWithName("appsrc", appSrcName) - if err != nil { - onSubscribeErr = errors.ErrGstPipelineError(err) - return - } - appSrc := app.SrcFromElement(src) - var logFilename string if p.Debug.EnableProfiling { if p.Debug.ToUploadConfig() == nil { @@ -295,6 +268,14 @@ func (s *SDKSource) joinRoom(p *config.PipelineConfig) error { } } + <-p.GstReady + src, err := gst.NewElementWithName("appsrc", track.ID()) + if err != nil { + onSubscribeErr = errors.ErrGstPipelineError(err) + return + } + + appSrc := app.SrcFromElement(src) writer, err := sdk.NewAppWriter(track, rp, codec, appSrc, s.sync, t, writeBlanks, logFilename) if err != nil { logger.Errorw("could not create app writer", err) @@ -302,16 +283,20 @@ func (s *SDKSource) joinRoom(p *config.PipelineConfig) error { return } - // write blank frames only when writing to mp4 + ts := &config.TrackSource{ + TrackID: pub.SID(), + Kind: pub.Kind(), + AppSrc: appSrc, + Codec: track.Codec(), + } + switch track.Kind() { case webrtc.RTPCodecTypeAudio: s.audioWriter = writer - p.AudioSrc = appSrc - p.AudioCodecParams = track.Codec() + p.AudioTrack = ts case webrtc.RTPCodecTypeVideo: s.videoWriter = writer - p.VideoSrc = appSrc - p.VideoCodecParams = track.Codec() + p.VideoTrack = ts } } @@ -366,18 +351,13 @@ func (s *SDKSource) subscribeToTracks(expecting map[string]struct{}) error { for _, track := range p.Tracks() { trackID := track.SID() if _, ok := expecting[trackID]; ok { - if pub, ok := track.(*lksdk.RemoteTrackPublication); ok { - pub.OnRTCP(s.sync.OnRTCP) - - err := pub.SetSubscribed(true) - if err != nil { - return err - } - - delete(expecting, track.SID()) - if len(expecting) == 0 { - return nil - } + if err := s.subscribe(track); err != nil { + return err + } + + delete(expecting, track.SID()) + if len(expecting) == 0 { + return nil } } } @@ -392,18 +372,29 @@ func (s *SDKSource) subscribeToTracks(expecting map[string]struct{}) error { return nil } -func (s *SDKSource) onTrackMuteChanged(pub lksdk.TrackPublication, muted bool) { - track := pub.Track() - if track == nil { - return - } +func (s *SDKSource) subscribe(track lksdk.TrackPublication) error { + if pub, ok := track.(*lksdk.RemoteTrackPublication); ok { + if pub.IsSubscribed() { + s.subscriptions.Done() + return nil + } - if w := s.getWriterForTrack(track.ID()); w != nil { - w.SetTrackMuted(muted) + s.active.Inc() + logger.Infow("subscribing to track", "trackID", track.SID()) + + pub.OnRTCP(s.sync.OnRTCP) + return pub.SetSubscribed(true) } - for _, onMute := range s.OnTrackMuted { - onMute(muted) + return errors.ErrInvalidTrack +} + +func (s *SDKSource) onTrackMuteChanged(pub lksdk.TrackPublication, muted bool) { + if w := s.getWriterForTrack(pub.SID()); w != nil { + w.SetTrackMuted(muted) + for _, onMute := range s.OnTrackMuted { + onMute(muted) + } } } @@ -416,6 +407,27 @@ func (s *SDKSource) onTrackUnpublished(pub *lksdk.RemoteTrackPublication, _ *lks } } +func (s *SDKSource) onTrackFinished(trackID string) { + var w *sdk.AppWriter + + if s.audioWriter != nil && s.audioWriter.TrackID() == trackID { + logger.Infow("removing audio writer") + w = s.audioWriter + s.audioWriter = nil + } else if s.videoWriter != nil && s.videoWriter.TrackID() == trackID { + logger.Infow("removing video writer") + w = s.videoWriter + s.videoWriter = nil + } else { + return + } + + w.Drain(true) + if s.active.Dec() == 0 { + s.onDisconnected() + } +} + func (s *SDKSource) onDisconnected() { select { case <-s.endRecording: @@ -426,18 +438,11 @@ func (s *SDKSource) onDisconnected() { } func (s *SDKSource) getWriterForTrack(trackID string) *sdk.AppWriter { - switch trackID { - case s.trackID: - if s.audioWriter != nil { - return s.audioWriter - } else if s.videoWriter != nil { - return s.videoWriter - } - case s.audioTrackID: + if s.audioWriter != nil && s.audioWriter.TrackID() == trackID { return s.audioWriter - case s.videoTrackID: + } + if s.videoWriter != nil && s.videoWriter.TrackID() == trackID { return s.videoWriter } - return nil } diff --git a/pkg/pipeline/source/sdk/appwriter.go b/pkg/pipeline/source/sdk/appwriter.go index d213ddbb..82c9c77b 100644 --- a/pkg/pipeline/source/sdk/appwriter.go +++ b/pkg/pipeline/source/sdk/appwriter.go @@ -146,6 +146,10 @@ func NewAppWriter( return w, nil } +func (w *AppWriter) TrackID() string { + return w.track.ID() +} + func (w *AppWriter) Play() { w.playing.Break() } diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 5dad0f27..a0b667fa 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -243,12 +243,8 @@ func (p *Pipeline) handleMessageStateChanged(msg *gst.Message) { return } - switch s := msg.Source(); s { - case source.AudioAppSource, source.VideoAppSource: - logger.Infow(fmt.Sprintf("%s playing", s)) - p.src.(*source.SDKSource).Playing(s) - - case pipelineSource: + s := msg.Source() + if s == pipelineSource { logger.Infow("pipeline playing") p.playing.Break() @@ -258,6 +254,9 @@ func (p *Pipeline) handleMessageStateChanged(msg *gst.Message) { case types.SourceTypeWeb: p.updateStartTime(time.Now().UnixNano()) } + } else if strings.HasPrefix(s, "TR_") { + logger.Infow(fmt.Sprintf("%s playing", s)) + p.src.(*source.SDKSource).Playing(s) } return diff --git a/test/integration.go b/test/integration.go index 0dbc6cec..5d708194 100644 --- a/test/integration.go +++ b/test/integration.go @@ -76,9 +76,16 @@ type testCase struct { playlist string filenameSuffix livekit.SegmentedFileSuffix - // used by track and track composite tests - audioCodec types.MimeType - videoCodec types.MimeType + // used by sdk tests + audioCodec types.MimeType + audioDelay time.Duration + audioUnpublish time.Duration + audioRepublish time.Duration + + videoCodec types.MimeType + videoDelay time.Duration + videoUnpublish time.Duration + videoRepublish time.Duration // used by track tests outputType types.OutputType @@ -114,32 +121,33 @@ func (r *Runner) publishSamplesToRoom(t *testing.T, audioCodec, videoCodec types return } -func (r *Runner) publishSampleToRoom(t *testing.T, codec types.MimeType, withMuting bool) string { - filename := samples[codec] - frameDuration := frameDurations[codec] - - var pub *lksdk.LocalTrackPublication - done := make(chan struct{}) - opts := []lksdk.ReaderSampleProviderOption{ - lksdk.ReaderTrackWithOnWriteComplete(func() { - close(done) - if pub != nil { +func (r *Runner) publishSampleOffset(t *testing.T, codec types.MimeType, publishAfter, unpublishAfter time.Duration) { + go func() { + time.Sleep(publishAfter) + done := make(chan struct{}) + pub := r.publish(t, codec, done) + if unpublishAfter != 0 { + time.AfterFunc(unpublishAfter, func() { + select { + case <-done: + return + default: + _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) + } + }) + } else { + t.Cleanup(func() { _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) - } - }), - } - - if frameDuration != 0 { - opts = append(opts, lksdk.ReaderTrackWithFrameDuration(frameDuration)) - } - - track, err := lksdk.NewLocalFileTrack(filename, opts...) - require.NoError(t, err) - - pub, err = r.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{Name: filename}) - require.NoError(t, err) + }) + } + }() +} +func (r *Runner) publishSampleToRoom(t *testing.T, codec types.MimeType, withMuting bool) string { + done := make(chan struct{}) + pub := r.publish(t, codec, done) trackID := pub.SID() + t.Cleanup(func() { _ = r.room.LocalParticipant.UnpublishTrack(trackID) }) @@ -164,6 +172,33 @@ func (r *Runner) publishSampleToRoom(t *testing.T, codec types.MimeType, withMut return trackID } +func (r *Runner) publish(t *testing.T, codec types.MimeType, done chan struct{}) *lksdk.LocalTrackPublication { + filename := samples[codec] + frameDuration := frameDurations[codec] + + var pub *lksdk.LocalTrackPublication + opts := []lksdk.ReaderSampleProviderOption{ + lksdk.ReaderTrackWithOnWriteComplete(func() { + close(done) + if pub != nil { + _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) + } + }), + } + + if frameDuration != 0 { + opts = append(opts, lksdk.ReaderTrackWithFrameDuration(frameDuration)) + } + + track, err := lksdk.NewLocalFileTrack(filename, opts...) + require.NoError(t, err) + + pub, err = r.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{Name: filename}) + require.NoError(t, err) + + return pub +} + func (r *Runner) startEgress(t *testing.T, req *rpc.StartEgressRequest) string { // send start request info, err := r.client.StartEgress(context.Background(), "", req) diff --git a/test/track.go b/test/track.go index 8f61f977..d1d814c2 100644 --- a/test/track.go +++ b/test/track.go @@ -76,7 +76,7 @@ func (r *Runner) testTrackFile(t *testing.T) { filename: "t_{track_id}_{time}.mp4", }, } { - r.runSDKTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { + r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { trackID := audioTrackID if trackID == "" { trackID = videoTrackID @@ -123,7 +123,7 @@ func (r *Runner) testTrackStream(t *testing.T) { filename: fmt.Sprintf("track-ws-%v.raw", now), }, } { - r.runSDKTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { + r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { trackID := audioTrackID if trackID == "" { trackID = videoTrackID diff --git a/test/track_composite.go b/test/track_composite.go index 6b4e0cab..9eb4fd92 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -37,7 +37,7 @@ func (r *Runner) testTrackComposite(t *testing.T) { r.testTrackCompositeMulti(t) } -func (r *Runner) runSDKTest(t *testing.T, name string, audioCodec, videoCodec types.MimeType, +func (r *Runner) runTrackTest(t *testing.T, name string, audioCodec, videoCodec types.MimeType, f func(t *testing.T, audioTrackID, videoTrackID string), ) { t.Run(name, func(t *testing.T) { @@ -73,7 +73,7 @@ func (r *Runner) testTrackCompositeFile(t *testing.T) { filename: "tc_{room_name}_h264_{time}.mp4", }, } { - r.runSDKTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { + r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { var aID, vID string if !test.audioOnly { vID = videoTrackID @@ -127,7 +127,7 @@ func (r *Runner) testTrackCompositeStream(t *testing.T) { return } - r.runSDKTest(t, "TrackComposite/Stream", types.MimeTypeOpus, types.MimeTypeVP8, + r.runTrackTest(t, "TrackComposite/Stream", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T, audioTrackID, videoTrackID string) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), @@ -170,7 +170,7 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) { playlist: "tcs_{room_name}_h264_{time}.m3u8", }, } { - r.runSDKTest(t, test.name, test.audioCodec, test.videoCodec, + r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { var aID, vID string if !test.audioOnly { @@ -227,7 +227,7 @@ func (r *Runner) testTrackCompositeMulti(t *testing.T) { return } - r.runSDKTest(t, "TrackComposite/Multi", types.MimeTypeOpus, types.MimeTypeVP8, + r.runTrackTest(t, "TrackComposite/Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T, audioTrackID, videoTrackID string) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix),