Skip to content

Commit

Permalink
Continue splitting up participant composite (#467)
Browse files Browse the repository at this point in the history
* continue splitting up participant composite

* go 1.20
  • Loading branch information
frostbyte73 authored Aug 16, 2023
1 parent cf060cc commit 918f62c
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish-egress.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.18
go-version: 1.20.x

- name: Download Go modules
run: go mod download
Expand Down
7 changes: 6 additions & 1 deletion build/egress/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ ARG TARGETPLATFORM
WORKDIR /workspace

# install go
RUN apt-get update && apt-get install -y golang
RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd64; fi && \
wget https://go.dev/dl/go1.20.7.linux-${GOARCH}.tar.gz && \
rm -rf /usr/local/go && \
tar -C /usr/local -xzf go1.20.7.linux-${GOARCH}.tar.gz
ENV PATH="/usr/local/go/bin:${PATH}"


# download go modules
COPY go.mod .
Expand Down
14 changes: 12 additions & 2 deletions build/test/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ WORKDIR /workspace
ARG TARGETPLATFORM

# install go
RUN apt-get update && apt-get install -y golang
RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd64; fi && \
wget https://go.dev/dl/go1.20.7.linux-${GOARCH}.tar.gz && \
rm -rf /usr/local/go && \
tar -C /usr/local -xzf go1.20.7.linux-${GOARCH}.tar.gz
ENV PATH="/usr/local/go/bin:${PATH}"

# download go modules
COPY go.mod .
Expand Down Expand Up @@ -55,13 +59,19 @@ RUN apt-get update && \
ffmpeg \
fonts-noto \
gnupg \
golang \
pulseaudio \
unzip \
wget \
xvfb \
gstreamer1.0-plugins-base-

# install go
RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd64; fi && \
wget https://go.dev/dl/go1.20.7.linux-${GOARCH}.tar.gz && \
rm -rf /usr/local/go && \
tar -C /usr/local -xzf go1.20.7.linux-${GOARCH}.tar.gz
ENV PATH="/usr/local/go/bin:${PATH}"

# install chrome
COPY --from=livekit/chrome-installer:117.0.5874.0 /chrome-installer /chrome-installer
RUN /chrome-installer/install-chrome "$TARGETPLATFORM"
Expand Down
4 changes: 2 additions & 2 deletions build/test/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ pulseaudio -D --verbose --exit-idle-time=-1 --disallow-exit

# Run tests
if [[ -z ${GITHUB_WORKFLOW+x} ]]; then
exec ./test.test -test.v -test.timeout 20m
exec ./test.test -test.v -test.timeout 30m
else
go install github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest
exec go tool test2json -p egress ./test.test -test.v -test.timeout 20m 2>&1 | "$HOME"/go/bin/gotestfmt
exec go tool test2json -p egress ./test.test -test.v -test.timeout 30m 2>&1 | "$HOME"/go/bin/gotestfmt
fi
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/livekit/egress

go 1.18
go 1.20

replace github.com/tinyzimmer/go-glib v0.0.25 => github.com/livekit/go-glib v0.0.0-20230223001336-834490045522

Expand Down
5 changes: 4 additions & 1 deletion pkg/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"gopkg.in/yaml.v3"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)

Expand Down Expand Up @@ -57,10 +58,12 @@ type CPUCostConfig struct {
func NewServiceConfig(confString string) (*ServiceConfig, error) {
conf := &ServiceConfig{
BaseConfig: BaseConfig{
Logging: logger.Config{
Level: "info",
},
ApiKey: os.Getenv("LIVEKIT_API_KEY"),
ApiSecret: os.Getenv("LIVEKIT_API_SECRET"),
WsUrl: os.Getenv("LIVEKIT_WS_URL"),
LogLevel: "info",
},
TemplatePort: defaultTemplatePort,
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs")
ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs")
ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU")
ErrInvalidTrack = psrpc.NewErrorf(psrpc.Internal, "unexpected track type")
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Internal, "failed to subscribe to track")
)

func New(err string) error {
Expand Down Expand Up @@ -96,6 +96,10 @@ func ErrTrackNotFound(trackID string) error {
return psrpc.NewErrorf(psrpc.NotFound, "track %s not found", trackID)
}

func ErrParticipantNotFound(identity string) error {
return psrpc.NewErrorf(psrpc.NotFound, "participant %s not found", identity)
}

func ErrPadLinkFailed(src, sink, status string) error {
return psrpc.NewErrorf(psrpc.Internal, "failed to link %s to %s: %s", src, sink, status)
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/pipeline/input/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func New(ctx context.Context, pipeline *gst.Pipeline, p *config.PipelineConfig)
b := &Bin{
bin: gst.NewBin("input"),
}

// build input
if p.AudioEnabled {
if err := b.buildAudioInput(p); err != nil {
return nil, err
Expand All @@ -49,6 +51,8 @@ func New(ctx context.Context, pipeline *gst.Pipeline, p *config.PipelineConfig)
return nil, err
}
}

// add bin to pipeline
if err := pipeline.Add(b.bin.Element); err != nil {
return nil, errors.ErrGstPipelineError(err)
}
Expand All @@ -59,6 +63,7 @@ func New(ctx context.Context, pipeline *gst.Pipeline, p *config.PipelineConfig)
func (b *Bin) buildAudioInput(p *config.PipelineConfig) error {
a := &audioInput{}

// build input
switch p.SourceType {
case types.SourceTypeSDK:
if err := a.buildSDKInput(p); err != nil {
Expand All @@ -71,13 +76,14 @@ func (b *Bin) buildAudioInput(p *config.PipelineConfig) error {
}
}

// build encoder
if p.AudioTranscoding {
if err := a.buildEncoder(p); err != nil {
return err
}
}

// Add elements to bin
// add elements to bin
if err := b.bin.AddMany(a.src...); err != nil {
return errors.ErrGstPipelineError(err)
}
Expand All @@ -100,6 +106,7 @@ func (b *Bin) buildAudioInput(p *config.PipelineConfig) error {
func (b *Bin) buildVideoInput(p *config.PipelineConfig) error {
v := &videoInput{}

// build input
switch p.SourceType {
case types.SourceTypeSDK:
if err := v.buildSDKInput(p); err != nil {
Expand All @@ -112,13 +119,14 @@ func (b *Bin) buildVideoInput(p *config.PipelineConfig) error {
}
}

// build encoder
if p.VideoTranscoding {
if err := v.buildEncoder(p); err != nil {
return err
}
}

// Add elements to bin
// add elements to bin
if err := b.bin.AddMany(v.src...); err != nil {
return errors.ErrGstPipelineError(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/input/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (v *videoInput) buildEncoder(p *config.PipelineConfig) error {
}

v.encoder = append(v.encoder, vp9Enc)
return errors.ErrNotSupported(fmt.Sprintf("%s encoding", p.VideoOutCodec))
fallthrough

default:
return errors.ErrNotSupported(fmt.Sprintf("%s encoding", p.VideoOutCodec))
Expand Down
1 change: 0 additions & 1 deletion pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package pipeline

import "C"
import (
"context"
"sync"
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/source/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (s *SDKSource) subscribe(track lksdk.TrackPublication) error {
return pub.SetSubscribed(true)
}

return errors.ErrInvalidTrack
return errors.ErrSubscriptionFailed
}

// ----- Callbacks -----
Expand Down
2 changes: 1 addition & 1 deletion test/ffprobe.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre
d, err := strconv.ParseFloat(frac[1], 64)
require.NoError(t, err)
require.NotZero(t, d)
require.Less(t, n/d, float64(p.Framerate)*1.05)
require.Less(t, n/d, float64(p.Framerate)*1.5)
require.Greater(t, n/d, float64(sourceFramerate)*0.8)

}
Expand Down
3 changes: 2 additions & 1 deletion test/track_composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func (r *Runner) testTrackComposite(t *testing.T) {
r.testTrackCompositeMulti(t)
}

func (r *Runner) runTrackTest(t *testing.T, name string, audioCodec, videoCodec types.MimeType,
func (r *Runner) runTrackTest(
t *testing.T, name string, audioCodec, videoCodec types.MimeType,
f func(t *testing.T, audioTrackID, videoTrackID string),
) {
t.Run(name, func(t *testing.T) {
Expand Down

0 comments on commit 918f62c

Please sign in to comment.