Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More refactoring for PC #457

Merged
merged 1 commit into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build/egress/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM livekit/gstreamer:1.22.4-dev
FROM livekit/gstreamer:1.22.5-dev

ARG TARGETPLATFORM

Expand Down Expand Up @@ -40,7 +40,7 @@ RUN find cmd/server/templates/ -name *.map | xargs rm
RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd64; fi && \
CGO_ENABLED=1 GOOS=linux GOARCH=${GOARCH} GO111MODULE=on go build -a -o egress ./cmd/server

FROM livekit/gstreamer:1.22.4-prod
FROM livekit/gstreamer:1.22.5-prod

ARG TARGETPLATFORM

Expand Down
4 changes: 2 additions & 2 deletions build/test/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM livekit/gstreamer:1.22.4-dev
FROM livekit/gstreamer:1.22.5-dev

WORKDIR /workspace

Expand Down Expand Up @@ -44,7 +44,7 @@ RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd
CGO_ENABLED=1 GOOS=linux GOARCH=${GOARCH} GO111MODULE=on go test -c -v --tags=integration ./test


FROM livekit/gstreamer:1.22.4-prod
FROM livekit/gstreamer:1.22.5-prod

ARG TARGETPLATFORM

Expand Down
2 changes: 1 addition & 1 deletion magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

const (
gstVersion = "1.22.4"
gstVersion = "1.22.5"
chromiumVersion = "117.0.5874.0"
dockerBuild = "docker build"
dockerBuildX = "docker buildx build --push --platform linux/amd64,linux/arm64"
Expand Down
38 changes: 23 additions & 15 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/tracer"
"github.com/livekit/protocol/utils"
lksdk "github.com/livekit/server-sdk-go"
)

const (
Expand Down Expand Up @@ -76,18 +77,23 @@ type WebSourceParams struct {
}

type SDKSourceParams struct {
TrackID string
TrackSource string
TrackKind string
AudioTrackID string
VideoTrackID string
Identity string
AudioSrc *app.Source
VideoSrc *app.Source
AudioInCodec types.MimeType
VideoInCodec types.MimeType
AudioCodecParams webrtc.RTPCodecParameters
VideoCodecParams webrtc.RTPCodecParameters
TrackID string
AudioTrackID string
VideoTrackID string
Identity string
TrackSource string
TrackKind string
AudioInCodec types.MimeType
VideoInCodec types.MimeType
AudioTrack *TrackSource
VideoTrack *TrackSource
}

type TrackSource struct {
TrackID string
Kind lksdk.TrackKind
AppSrc *app.Source
Codec webrtc.RTPCodecParameters
}

type AudioConfig struct {
Expand All @@ -112,9 +118,11 @@ type VideoConfig struct {
}

type Callbacks struct {
GstReady chan struct{} `yaml:"-"`
OnTrackMuted []func(bool) `yaml:"-"`
OnFailure func(error) `yaml:"-"`
GstReady chan struct{} `yaml:"-"`
OnTrackMuted []func(bool) `yaml:"-"`
OnTrackAdded func(*TrackSource) `yaml:"-"`
OnTrackRemoved func(trackID string) `yaml:"-"`
OnFailure func(error) `yaml:"-"`
}

func NewPipelineConfig(confString string, req *rpc.StartEgressRequest) (*PipelineConfig, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs")
ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU")
ErrVideoWebsocket = psrpc.NewErrorf(psrpc.InvalidArgument, "cannot send video over websocket")
ErrInvalidTrack = psrpc.NewErrorf(psrpc.Internal, "unexpected track type")
)

func New(err string) error {
Expand Down
106 changes: 76 additions & 30 deletions pkg/pipeline/input/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type audioInput struct {
encoder *gst.Element
}

func (a *audioInput) buildWebSource(p *config.PipelineConfig) error {
func (a *audioInput) buildWebInput(p *config.PipelineConfig) error {
pulseSrc, err := gst.NewElement("pulsesrc")
if err != nil {
return errors.ErrGstPipelineError(err)
Expand All @@ -48,20 +48,36 @@ func (a *audioInput) buildWebSource(p *config.PipelineConfig) error {
return a.buildConverter(p)
}

func (a *audioInput) buildAppSource(p *config.PipelineConfig) error {
src := p.AudioSrc
src.Element.SetArg("format", "time")
if err := src.Element.SetProperty("is-live", true); err != nil {
func (a *audioInput) buildSDKInput(p *config.PipelineConfig) error {
if p.AudioTrack != nil {
if err := a.buildAppSource(p.AudioTrack); err != nil {
return err
}
if err := a.buildConverter(p); err != nil {
return err
}
}

if err := a.buildTestSrc(p); err != nil {
return err
}

return a.buildMixer(p)
}

func (a *audioInput) buildAppSource(track *config.TrackSource) error {
track.AppSrc.Element.SetArg("format", "time")
if err := track.AppSrc.Element.SetProperty("is-live", true); err != nil {
return err
}
a.src = []*gst.Element{src.Element}
a.src = []*gst.Element{track.AppSrc.Element}

switch {
case strings.EqualFold(p.AudioCodecParams.MimeType, string(types.MimeTypeOpus)):
if err := src.Element.SetProperty("caps", gst.NewCapsFromString(
case strings.EqualFold(track.Codec.MimeType, string(types.MimeTypeOpus)):
if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(
fmt.Sprintf(
"application/x-rtp,media=audio,payload=%d,encoding-name=OPUS,clock-rate=%d",
p.AudioCodecParams.PayloadType, p.AudioCodecParams.ClockRate,
track.Codec.PayloadType, track.Codec.ClockRate,
),
)); err != nil {
return errors.ErrGstPipelineError(err)
Expand All @@ -80,14 +96,10 @@ func (a *audioInput) buildAppSource(p *config.PipelineConfig) error {
a.src = append(a.src, rtpOpusDepay, opusDec)

default:
return errors.ErrNotSupported(p.AudioCodecParams.MimeType)
return errors.ErrNotSupported(track.Codec.MimeType)
}

if err := a.buildConverter(p); err != nil {
return err
}

return a.buildMixer(p)
return nil
}

func (a *audioInput) buildConverter(p *config.PipelineConfig) error {
Expand Down Expand Up @@ -115,7 +127,7 @@ func (a *audioInput) buildConverter(p *config.PipelineConfig) error {
return nil
}

func (a *audioInput) buildMixer(p *config.PipelineConfig) error {
func (a *audioInput) buildTestSrc(p *config.PipelineConfig) error {
audioTestSrc, err := gst.NewElement("audiotestsrc")
if err != nil {
return errors.ErrGstPipelineError(err)
Expand All @@ -129,25 +141,31 @@ func (a *audioInput) buildMixer(p *config.PipelineConfig) error {
if err = audioTestSrc.SetProperty("is-live", true); err != nil {
return errors.ErrGstPipelineError(err)
}

audioCaps, err := newAudioCapsFilter(p)
if err != nil {
return err
}

a.testSrc = []*gst.Element{audioTestSrc, audioCaps}
return nil
}

func (a *audioInput) buildMixer(p *config.PipelineConfig) error {
audioMixer, err := gst.NewElement("audiomixer")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = audioMixer.SetProperty("latency", audioMixerLatency); err != nil {
return errors.ErrGstPipelineError(err)
}

mixedCaps, err := newAudioCapsFilter(p)
if err != nil {
return err
}
a.mixer = []*gst.Element{audioMixer, mixedCaps}

a.mixer = []*gst.Element{audioMixer, mixedCaps}
return nil
}

Expand Down Expand Up @@ -211,42 +229,70 @@ func newAudioCapsFilter(p *config.PipelineConfig) (*gst.Element, error) {

func (a *audioInput) link() (*gst.GhostPad, error) {
if a.src != nil {
// link src elements
if err := gst.ElementLinkMany(a.src...); err != nil {
return nil, errors.ErrGstPipelineError(err)
}
}
if a.mixer != nil {
if err := builder.LinkPads("audio decoder", builder.GetSrcPad(a.src), "audio mixer", a.mixer[0].GetRequestPad("sink_%u")); err != nil {
return nil, err
}

if a.testSrc != nil {
// link test src elements
if err := gst.ElementLinkMany(a.testSrc...); err != nil {
return nil, errors.ErrGstPipelineError(err)
}
if err := builder.LinkPads("audio test src", builder.GetSrcPad(a.testSrc), "audio mixer", a.mixer[0].GetRequestPad("sink_%u")); err != nil {
return nil, err
}

if a.mixer != nil {
if a.src != nil {
// link src to mixer
if err := builder.LinkPads(
"audio src", builder.GetSrcPad(a.src),
"audio mixer", a.mixer[0].GetRequestPad("sink_%u"),
); err != nil {
return nil, err
}
}

if a.testSrc != nil {
// link test src to mixer
if err := builder.LinkPads(
"audio test src", builder.GetSrcPad(a.testSrc),
"audio mixer", a.mixer[0].GetRequestPad("sink_%u"),
); err != nil {
return nil, err
}
}

if err := gst.ElementLinkMany(a.mixer...); err != nil {
return nil, errors.ErrGstPipelineError(err)
}
}

var srcPad *gst.Pad
var ghostPad *gst.Pad
if a.encoder != nil {
if a.mixer != nil {
if err := builder.LinkPads("audio mixer", builder.GetSrcPad(a.mixer), "audio encoder", a.encoder.GetStaticPad("sink")); err != nil {
// link mixer to encoder
if err := builder.LinkPads(
"audio mixer", builder.GetSrcPad(a.mixer),
"audio encoder", a.encoder.GetStaticPad("sink"),
); err != nil {
return nil, err
}
} else {
if err := builder.LinkPads("audio decoder", builder.GetSrcPad(a.src), "audio encoder", a.encoder.GetStaticPad("sink")); err != nil {
// link src to encoder
if err := builder.LinkPads(
"audio src", builder.GetSrcPad(a.src),
"audio encoder", a.encoder.GetStaticPad("sink"),
); err != nil {
return nil, err
}
}
srcPad = a.encoder.GetStaticPad("src")
ghostPad = a.encoder.GetStaticPad("src")
} else if a.mixer != nil {
srcPad = builder.GetSrcPad(a.mixer)
ghostPad = builder.GetSrcPad(a.mixer)
} else {
srcPad = builder.GetSrcPad(a.src)
ghostPad = builder.GetSrcPad(a.src)
}

return gst.NewGhostPad("audio_src", srcPad), nil
return gst.NewGhostPad("audio_src", ghostPad), nil
}
8 changes: 4 additions & 4 deletions pkg/pipeline/input/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ func (b *Bin) buildAudioInput(p *config.PipelineConfig) error {

switch p.SourceType {
case types.SourceTypeSDK:
if err := a.buildAppSource(p); err != nil {
if err := a.buildSDKInput(p); err != nil {
return err
}

case types.SourceTypeWeb:
if err := a.buildWebSource(p); err != nil {
if err := a.buildWebInput(p); err != nil {
return err
}
}
Expand Down Expand Up @@ -102,12 +102,12 @@ func (b *Bin) buildVideoInput(p *config.PipelineConfig) error {

switch p.SourceType {
case types.SourceTypeSDK:
if err := v.buildAppSource(p); err != nil {
if err := v.buildSDKInput(p); err != nil {
return err
}

case types.SourceTypeWeb:
if err := v.buildWebSource(p); err != nil {
if err := v.buildWebInput(p); err != nil {
return err
}
}
Expand Down
Loading