Skip to content

Commit

Permalink
more refactoring for PC (#457)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored Aug 9, 2023
1 parent fd026c7 commit 6a79200
Show file tree
Hide file tree
Showing 16 changed files with 299 additions and 195 deletions.
4 changes: 2 additions & 2 deletions build/egress/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM livekit/gstreamer:1.22.4-dev
FROM livekit/gstreamer:1.22.5-dev

ARG TARGETPLATFORM

Expand Down Expand Up @@ -40,7 +40,7 @@ RUN find cmd/server/templates/ -name *.map | xargs rm
RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd64; fi && \
CGO_ENABLED=1 GOOS=linux GOARCH=${GOARCH} GO111MODULE=on go build -a -o egress ./cmd/server

FROM livekit/gstreamer:1.22.4-prod
FROM livekit/gstreamer:1.22.5-prod

ARG TARGETPLATFORM

Expand Down
4 changes: 2 additions & 2 deletions build/test/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM livekit/gstreamer:1.22.4-dev
FROM livekit/gstreamer:1.22.5-dev

WORKDIR /workspace

Expand Down Expand Up @@ -44,7 +44,7 @@ RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd
CGO_ENABLED=1 GOOS=linux GOARCH=${GOARCH} GO111MODULE=on go test -c -v --tags=integration ./test


FROM livekit/gstreamer:1.22.4-prod
FROM livekit/gstreamer:1.22.5-prod

ARG TARGETPLATFORM

Expand Down
2 changes: 1 addition & 1 deletion magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

const (
gstVersion = "1.22.4"
gstVersion = "1.22.5"
chromiumVersion = "117.0.5874.0"
dockerBuild = "docker build"
dockerBuildX = "docker buildx build --push --platform linux/amd64,linux/arm64"
Expand Down
38 changes: 23 additions & 15 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/tracer"
"github.com/livekit/protocol/utils"
lksdk "github.com/livekit/server-sdk-go"
)

const (
Expand Down Expand Up @@ -76,18 +77,23 @@ type WebSourceParams struct {
}

type SDKSourceParams struct {
TrackID string
TrackSource string
TrackKind string
AudioTrackID string
VideoTrackID string
Identity string
AudioSrc *app.Source
VideoSrc *app.Source
AudioInCodec types.MimeType
VideoInCodec types.MimeType
AudioCodecParams webrtc.RTPCodecParameters
VideoCodecParams webrtc.RTPCodecParameters
TrackID string
AudioTrackID string
VideoTrackID string
Identity string
TrackSource string
TrackKind string
AudioInCodec types.MimeType
VideoInCodec types.MimeType
AudioTrack *TrackSource
VideoTrack *TrackSource
}

type TrackSource struct {
TrackID string
Kind lksdk.TrackKind
AppSrc *app.Source
Codec webrtc.RTPCodecParameters
}

type AudioConfig struct {
Expand All @@ -112,9 +118,11 @@ type VideoConfig struct {
}

type Callbacks struct {
GstReady chan struct{} `yaml:"-"`
OnTrackMuted []func(bool) `yaml:"-"`
OnFailure func(error) `yaml:"-"`
GstReady chan struct{} `yaml:"-"`
OnTrackMuted []func(bool) `yaml:"-"`
OnTrackAdded func(*TrackSource) `yaml:"-"`
OnTrackRemoved func(trackID string) `yaml:"-"`
OnFailure func(error) `yaml:"-"`
}

func NewPipelineConfig(confString string, req *rpc.StartEgressRequest) (*PipelineConfig, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs")
ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU")
ErrVideoWebsocket = psrpc.NewErrorf(psrpc.InvalidArgument, "cannot send video over websocket")
ErrInvalidTrack = psrpc.NewErrorf(psrpc.Internal, "unexpected track type")
)

func New(err string) error {
Expand Down
106 changes: 76 additions & 30 deletions pkg/pipeline/input/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type audioInput struct {
encoder *gst.Element
}

func (a *audioInput) buildWebSource(p *config.PipelineConfig) error {
func (a *audioInput) buildWebInput(p *config.PipelineConfig) error {
pulseSrc, err := gst.NewElement("pulsesrc")
if err != nil {
return errors.ErrGstPipelineError(err)
Expand All @@ -48,20 +48,36 @@ func (a *audioInput) buildWebSource(p *config.PipelineConfig) error {
return a.buildConverter(p)
}

func (a *audioInput) buildAppSource(p *config.PipelineConfig) error {
src := p.AudioSrc
src.Element.SetArg("format", "time")
if err := src.Element.SetProperty("is-live", true); err != nil {
func (a *audioInput) buildSDKInput(p *config.PipelineConfig) error {
if p.AudioTrack != nil {
if err := a.buildAppSource(p.AudioTrack); err != nil {
return err
}
if err := a.buildConverter(p); err != nil {
return err
}
}

if err := a.buildTestSrc(p); err != nil {
return err
}

return a.buildMixer(p)
}

func (a *audioInput) buildAppSource(track *config.TrackSource) error {
track.AppSrc.Element.SetArg("format", "time")
if err := track.AppSrc.Element.SetProperty("is-live", true); err != nil {
return err
}
a.src = []*gst.Element{src.Element}
a.src = []*gst.Element{track.AppSrc.Element}

switch {
case strings.EqualFold(p.AudioCodecParams.MimeType, string(types.MimeTypeOpus)):
if err := src.Element.SetProperty("caps", gst.NewCapsFromString(
case strings.EqualFold(track.Codec.MimeType, string(types.MimeTypeOpus)):
if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(
fmt.Sprintf(
"application/x-rtp,media=audio,payload=%d,encoding-name=OPUS,clock-rate=%d",
p.AudioCodecParams.PayloadType, p.AudioCodecParams.ClockRate,
track.Codec.PayloadType, track.Codec.ClockRate,
),
)); err != nil {
return errors.ErrGstPipelineError(err)
Expand All @@ -80,14 +96,10 @@ func (a *audioInput) buildAppSource(p *config.PipelineConfig) error {
a.src = append(a.src, rtpOpusDepay, opusDec)

default:
return errors.ErrNotSupported(p.AudioCodecParams.MimeType)
return errors.ErrNotSupported(track.Codec.MimeType)
}

if err := a.buildConverter(p); err != nil {
return err
}

return a.buildMixer(p)
return nil
}

func (a *audioInput) buildConverter(p *config.PipelineConfig) error {
Expand Down Expand Up @@ -115,7 +127,7 @@ func (a *audioInput) buildConverter(p *config.PipelineConfig) error {
return nil
}

func (a *audioInput) buildMixer(p *config.PipelineConfig) error {
func (a *audioInput) buildTestSrc(p *config.PipelineConfig) error {
audioTestSrc, err := gst.NewElement("audiotestsrc")
if err != nil {
return errors.ErrGstPipelineError(err)
Expand All @@ -129,25 +141,31 @@ func (a *audioInput) buildMixer(p *config.PipelineConfig) error {
if err = audioTestSrc.SetProperty("is-live", true); err != nil {
return errors.ErrGstPipelineError(err)
}

audioCaps, err := newAudioCapsFilter(p)
if err != nil {
return err
}

a.testSrc = []*gst.Element{audioTestSrc, audioCaps}
return nil
}

func (a *audioInput) buildMixer(p *config.PipelineConfig) error {
audioMixer, err := gst.NewElement("audiomixer")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = audioMixer.SetProperty("latency", audioMixerLatency); err != nil {
return errors.ErrGstPipelineError(err)
}

mixedCaps, err := newAudioCapsFilter(p)
if err != nil {
return err
}
a.mixer = []*gst.Element{audioMixer, mixedCaps}

a.mixer = []*gst.Element{audioMixer, mixedCaps}
return nil
}

Expand Down Expand Up @@ -211,42 +229,70 @@ func newAudioCapsFilter(p *config.PipelineConfig) (*gst.Element, error) {

func (a *audioInput) link() (*gst.GhostPad, error) {
if a.src != nil {
// link src elements
if err := gst.ElementLinkMany(a.src...); err != nil {
return nil, errors.ErrGstPipelineError(err)
}
}
if a.mixer != nil {
if err := builder.LinkPads("audio decoder", builder.GetSrcPad(a.src), "audio mixer", a.mixer[0].GetRequestPad("sink_%u")); err != nil {
return nil, err
}

if a.testSrc != nil {
// link test src elements
if err := gst.ElementLinkMany(a.testSrc...); err != nil {
return nil, errors.ErrGstPipelineError(err)
}
if err := builder.LinkPads("audio test src", builder.GetSrcPad(a.testSrc), "audio mixer", a.mixer[0].GetRequestPad("sink_%u")); err != nil {
return nil, err
}

if a.mixer != nil {
if a.src != nil {
// link src to mixer
if err := builder.LinkPads(
"audio src", builder.GetSrcPad(a.src),
"audio mixer", a.mixer[0].GetRequestPad("sink_%u"),
); err != nil {
return nil, err
}
}

if a.testSrc != nil {
// link test src to mixer
if err := builder.LinkPads(
"audio test src", builder.GetSrcPad(a.testSrc),
"audio mixer", a.mixer[0].GetRequestPad("sink_%u"),
); err != nil {
return nil, err
}
}

if err := gst.ElementLinkMany(a.mixer...); err != nil {
return nil, errors.ErrGstPipelineError(err)
}
}

var srcPad *gst.Pad
var ghostPad *gst.Pad
if a.encoder != nil {
if a.mixer != nil {
if err := builder.LinkPads("audio mixer", builder.GetSrcPad(a.mixer), "audio encoder", a.encoder.GetStaticPad("sink")); err != nil {
// link mixer to encoder
if err := builder.LinkPads(
"audio mixer", builder.GetSrcPad(a.mixer),
"audio encoder", a.encoder.GetStaticPad("sink"),
); err != nil {
return nil, err
}
} else {
if err := builder.LinkPads("audio decoder", builder.GetSrcPad(a.src), "audio encoder", a.encoder.GetStaticPad("sink")); err != nil {
// link src to encoder
if err := builder.LinkPads(
"audio src", builder.GetSrcPad(a.src),
"audio encoder", a.encoder.GetStaticPad("sink"),
); err != nil {
return nil, err
}
}
srcPad = a.encoder.GetStaticPad("src")
ghostPad = a.encoder.GetStaticPad("src")
} else if a.mixer != nil {
srcPad = builder.GetSrcPad(a.mixer)
ghostPad = builder.GetSrcPad(a.mixer)
} else {
srcPad = builder.GetSrcPad(a.src)
ghostPad = builder.GetSrcPad(a.src)
}

return gst.NewGhostPad("audio_src", srcPad), nil
return gst.NewGhostPad("audio_src", ghostPad), nil
}
8 changes: 4 additions & 4 deletions pkg/pipeline/input/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ func (b *Bin) buildAudioInput(p *config.PipelineConfig) error {

switch p.SourceType {
case types.SourceTypeSDK:
if err := a.buildAppSource(p); err != nil {
if err := a.buildSDKInput(p); err != nil {
return err
}

case types.SourceTypeWeb:
if err := a.buildWebSource(p); err != nil {
if err := a.buildWebInput(p); err != nil {
return err
}
}
Expand Down Expand Up @@ -102,12 +102,12 @@ func (b *Bin) buildVideoInput(p *config.PipelineConfig) error {

switch p.SourceType {
case types.SourceTypeSDK:
if err := v.buildAppSource(p); err != nil {
if err := v.buildSDKInput(p); err != nil {
return err
}

case types.SourceTypeWeb:
if err := v.buildWebSource(p); err != nil {
if err := v.buildWebInput(p); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit 6a79200

Please sign in to comment.