Skip to content

Commit

Permalink
video selector
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Aug 24, 2023
1 parent e864c61 commit 9cb8b0c
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 267 deletions.
2 changes: 2 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var (
ErrNoConfig = psrpc.NewErrorf(psrpc.Internal, "missing config")
ErrGhostPadFailed = psrpc.NewErrorf(psrpc.Internal, "failed to add ghost pad to bin")
ErrStreamAlreadyExists = psrpc.NewErrorf(psrpc.AlreadyExists, "stream already exists")
ErrBinAlreadyAdded = psrpc.NewErrorf(psrpc.Internal, "bin already added to pipeline")
ErrWrongHierarchy = psrpc.NewErrorf(psrpc.Internal, "pipeline can contain bins or elements, not both")
ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress")
ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found")
ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs")
Expand Down
17 changes: 17 additions & 0 deletions pkg/gstreamer/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Bin struct {
getSrcPad func(string) *gst.Pad
getSinkPad func(string) *gst.Pad

added bool
srcs []*Bin // source bins
elements []*gst.Element // elements within this bin
queues map[string]*gst.Element // used with BinTypeMultiStream
Expand All @@ -59,6 +60,14 @@ func (b *Bin) AddSourceBin(src *Bin) error {
b.mu.Lock()
defer b.mu.Unlock()

src.mu.Lock()
alreadyAdded := src.added
src.added = true
src.mu.Unlock()
if alreadyAdded {
return errors.ErrBinAlreadyAdded
}

b.srcs = append(b.srcs, src)
if err := b.pipeline.Add(src.bin.Element); err != nil {
return errors.ErrGstPipelineError(err)
Expand Down Expand Up @@ -89,6 +98,14 @@ func (b *Bin) AddSinkBin(sink *Bin) error {
b.mu.Lock()
defer b.mu.Unlock()

sink.mu.Lock()
alreadyAdded := sink.added
sink.added = true
sink.mu.Unlock()
if alreadyAdded {
return errors.ErrBinAlreadyAdded
}

b.sinks = append(b.sinks, sink)
if err := b.pipeline.Add(sink.bin.Element); err != nil {
return errors.ErrGstPipelineError(err)
Expand Down
38 changes: 36 additions & 2 deletions pkg/gstreamer/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ type Pipeline struct {

loop *glib.MainLoop

started core.Fuse
running chan struct{}
binsAdded bool
elementsAdded bool
started core.Fuse
running chan struct{}
}

// A pipeline can have either elements or src and sink bins. If you add both you will get a wrong hierarchy error
Expand All @@ -54,6 +56,38 @@ func NewPipeline(name string, latency uint64, callbacks *Callbacks) (*Pipeline,
}, nil
}

func (p *Pipeline) AddSourceBin(src *Bin) error {
if p.elementsAdded {
return errors.ErrWrongHierarchy
}
p.binsAdded = true
return p.Bin.AddSourceBin(src)
}

func (p *Pipeline) AddSinkBin(sink *Bin) error {
if p.elementsAdded {
return errors.ErrWrongHierarchy
}
p.binsAdded = true
return p.Bin.AddSinkBin(sink)
}

func (p *Pipeline) AddElement(e *gst.Element) error {
if p.binsAdded {
return errors.ErrWrongHierarchy
}
p.elementsAdded = true
return p.Bin.AddElement(e)
}

func (p *Pipeline) AddElements(elements ...*gst.Element) error {
if p.binsAdded {
return errors.ErrWrongHierarchy
}
p.elementsAdded = true
return p.Bin.AddElements(elements...)
}

func (p *Pipeline) Link() error {
return p.link()
}
Expand Down
148 changes: 88 additions & 60 deletions pkg/pipeline/builder/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,7 @@ import (
"github.com/livekit/protocol/logger"
)

const videoTestSrc = "video_test_src"

type videoBin struct {
lastPTS atomic.Duration
nextPTS atomic.Duration
selectedPad string
nextPad *gst.Pad

mu sync.Mutex
pads map[string]*gst.Pad
selector *gst.Element
}
const videoTestSrcName = "video_test_src"

func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*gstreamer.Bin, error) {
b := pipeline.NewBin("video")
Expand Down Expand Up @@ -122,8 +111,21 @@ func buildWebVideoInput(b *gstreamer.Bin, p *config.PipelineConfig) error {
return nil
}

type videoSDKBin struct {
lastPTS atomic.Duration
nextPTS atomic.Duration
selectedPad string
nextPad string

mu sync.Mutex
pads map[string]*gst.Pad
selector *gst.Element
}

func buildSDKVideoInput(b *gstreamer.Bin, p *config.PipelineConfig) error {
v := &videoBin{}
v := &videoSDKBin{
pads: make(map[string]*gst.Pad),
}

if p.VideoTrack != nil {
if err := v.buildVideoAppSrcBin(b, p); err != nil {
Expand All @@ -135,10 +137,22 @@ func buildSDKVideoInput(b *gstreamer.Bin, p *config.PipelineConfig) error {
if err := v.buildVideoTestSrcBin(b, p); err != nil {
return err
}
if err := v.addVideoSelector(b); err != nil {
if err := v.addVideoSelector(b, p); err != nil {
return err
}

v.createTestSrcPad()
if p.VideoTrack != nil {
v.createSrcPad(p.VideoTrack.TrackID)
if err := v.setSelectorPad(p.VideoTrack.TrackID); err != nil {
return err
}
} else {
if err := v.setSelectorPad(videoTestSrcName); err != nil {
return err
}
}

b.SetGetSrcPad(v.getSrcPad)
b.Callbacks.AddOnTrackMuted(v.onTrackMuted)
b.Callbacks.AddOnTrackUnmuted(v.onTrackUnmuted)
Expand All @@ -151,7 +165,7 @@ func buildSDKVideoInput(b *gstreamer.Bin, p *config.PipelineConfig) error {
return nil
}

func (v *videoBin) buildVideoAppSrcBin(videoBin *gstreamer.Bin, p *config.PipelineConfig) error {
func (v *videoSDKBin) buildVideoAppSrcBin(videoBin *gstreamer.Bin, p *config.PipelineConfig) error {
track := p.VideoTrack

b := videoBin.NewBin(track.TrackID)
Expand All @@ -164,7 +178,6 @@ func (v *videoBin) buildVideoAppSrcBin(videoBin *gstreamer.Bin, p *config.Pipeli
if err := track.AppSrc.Element.SetProperty("is-live", true); err != nil {
return errors.ErrGstPipelineError(err)
}

if err := b.AddElement(track.AppSrc.Element); err != nil {
return err
}
Expand Down Expand Up @@ -316,17 +329,14 @@ func (v *videoBin) buildVideoAppSrcBin(videoBin *gstreamer.Bin, p *config.Pipeli
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = videoRate.SetProperty("max-duplication-time", uint64(time.Second)); err != nil {
return err
}

caps, err := gst.NewElement("capsfilter")
caps, err := newVideoCapsFilter(p, false)
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf(
"video/x-raw,framerate=%d/1,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1",
p.Framerate, p.Width, p.Height,
))); err != nil {
return errors.ErrGstPipelineError(err)
}

if err = b.AddElements(videoQueue, videoConvert, videoScale, videoRate, caps); err != nil {
return err
Expand All @@ -335,8 +345,8 @@ func (v *videoBin) buildVideoAppSrcBin(videoBin *gstreamer.Bin, p *config.Pipeli
return nil
}

func (v *videoBin) buildVideoTestSrcBin(videoBin *gstreamer.Bin, p *config.PipelineConfig) error {
b := videoBin.NewBin(videoTestSrc)
func (v *videoSDKBin) buildVideoTestSrcBin(videoBin *gstreamer.Bin, p *config.PipelineConfig) error {
b := videoBin.NewBin(videoTestSrcName)
if err := videoBin.AddSourceBin(b); err != nil {
return err
}
Expand All @@ -350,16 +360,10 @@ func (v *videoBin) buildVideoTestSrcBin(videoBin *gstreamer.Bin, p *config.Pipel
}
videoTestSrc.SetArg("pattern", "black")

caps, err := gst.NewElement("capsfilter")
caps, err := newVideoCapsFilter(p, true)
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf(
"video/x-raw,framerate=%d/1,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1",
p.Framerate, p.Width, p.Height,
))); err != nil {
return errors.ErrGstPipelineError(err)
}

if err = b.AddElements(videoTestSrc, caps); err != nil {
return err
Expand All @@ -368,19 +372,21 @@ func (v *videoBin) buildVideoTestSrcBin(videoBin *gstreamer.Bin, p *config.Pipel
return nil
}

func (v *videoBin) addVideoSelector(b *gstreamer.Bin) error {
func (v *videoSDKBin) addVideoSelector(b *gstreamer.Bin, p *config.PipelineConfig) error {
inputSelector, err := gst.NewElement("input-selector")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = b.AddElement(inputSelector); err != nil {

if err = b.AddElements(inputSelector); err != nil {
return err
}

v.selector = inputSelector
return nil
}

func addVideoEncoder(b *gstreamer.Bin, p *config.PipelineConfig) error {
// Put a queue in front of the encoder for pipelining with the stage before
videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", p.Latency, false)
if err != nil {
return err
Expand Down Expand Up @@ -466,86 +472,85 @@ func addVideoEncoder(b *gstreamer.Bin, p *config.PipelineConfig) error {
}
}

func (v *videoBin) getSrcPad(name string) *gst.Pad {
func (v *videoSDKBin) getSrcPad(name string) *gst.Pad {
v.mu.Lock()
defer v.mu.Unlock()

return v.pads[name]
}

func (v *videoBin) createSrcPad(trackID string) {
func (v *videoSDKBin) createSrcPad(trackID string) {
v.mu.Lock()
defer v.mu.Unlock()

pad := v.selector.GetRequestPad("sink_%u")
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buffer := info.GetBuffer()

for v.nextPTS.Load() != 0 {
time.Sleep(time.Millisecond * 100)
}

if buffer.PresentationTimestamp() < v.lastPTS.Load() {
return gst.PadProbeDrop
}
v.lastPTS.Store(buffer.PresentationTimestamp())

logger.Debugw("pushing src buffer", "pts", buffer.PresentationTimestamp())
return gst.PadProbeOK
})
v.pads[trackID] = pad
}

func (v *videoBin) createTestSrcPad() {
func (v *videoSDKBin) createTestSrcPad() {
v.mu.Lock()
defer v.mu.Unlock()

pad := v.selector.GetRequestPad("sink_%u")
pad.AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buffer := info.GetBuffer()

if buffer.PresentationTimestamp() < v.lastPTS.Load() {
return gst.PadProbeDrop
}
if nextPTS := v.nextPTS.Load(); nextPTS != 0 && buffer.PresentationTimestamp() >= nextPTS {
if err := v.setSelectorPad(v.nextPad); err != nil {
logger.Errorw("failed to unmute", err)
return gst.PadProbeDrop
}
v.nextPad = ""
v.nextPTS.Store(0)
} else if buffer.PresentationTimestamp() < v.lastPTS.Load() {
return gst.PadProbeDrop
}

logger.Debugw("pushing test src buffer", "pts", buffer.PresentationTimestamp())
v.lastPTS.Store(buffer.PresentationTimestamp())
return gst.PadProbeOK
})
v.pads[videoTestSrc] = pad
v.pads[videoTestSrcName] = pad
}

func (v *videoBin) onTrackMuted(trackID string) {
v.mu.Lock()
defer v.mu.Unlock()

func (v *videoSDKBin) onTrackMuted(trackID string) {
if v.selectedPad != trackID {
return
}

pad := v.pads[videoTestSrc]
if err := v.setSelectorPad(pad); err != nil {
if err := v.setSelectorPad(videoTestSrcName); err != nil {
logger.Errorw("failed to set selector pad", err)
}
}

func (v *videoBin) onTrackUnmuted(trackID string, pts time.Duration) {
func (v *videoSDKBin) onTrackUnmuted(trackID string, pts time.Duration) {
v.mu.Lock()
defer v.mu.Unlock()

pad := v.pads[trackID]
if pad == nil {
if v.pads[trackID] == nil {
return
}

v.nextPTS.Store(pts)
v.nextPad = pad
v.nextPad = trackID
}

func (v *videoBin) setSelectorPad(pad *gst.Pad) error {
// TODO: go-gst should accept objects directly and handle conversion to C
// TODO: go-gst should accept objects directly and handle conversion to C
func (v *videoSDKBin) setSelectorPad(name string) error {
v.mu.Lock()
defer v.mu.Unlock()

pad := v.pads[name]

pt, err := v.selector.GetPropertyType("active-pad")
if err != nil {
return errors.ErrGstPipelineError(err)
Expand All @@ -561,5 +566,28 @@ func (v *videoBin) setSelectorPad(pad *gst.Pad) error {
return errors.ErrGstPipelineError(err)
}

v.selectedPad = name
return nil
}

func newVideoCapsFilter(p *config.PipelineConfig, includeFramerate bool) (*gst.Element, error) {
caps, err := gst.NewElement("capsfilter")
if err != nil {
return nil, errors.ErrGstPipelineError(err)
}
if includeFramerate {
err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf(
"video/x-raw,framerate=%d/1,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1",
p.Framerate, p.Width, p.Height,
)))
} else {
err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf(
"video/x-raw,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1",
p.Width, p.Height,
)))
}
if err != nil {
return nil, errors.ErrGstPipelineError(err)
}
return caps, nil
}
Loading

0 comments on commit 9cb8b0c

Please sign in to comment.