diff --git a/.github/workflows/publish-egress.yaml b/.github/workflows/publish-egress.yaml index 13eaaf78..91afef44 100644 --- a/.github/workflows/publish-egress.yaml +++ b/.github/workflows/publish-egress.yaml @@ -48,7 +48,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: 1.18 + go-version: 1.20.x - name: Download Go modules run: go mod download diff --git a/build/egress/Dockerfile b/build/egress/Dockerfile index d29b3bd4..70aeb09d 100644 --- a/build/egress/Dockerfile +++ b/build/egress/Dockerfile @@ -19,7 +19,12 @@ ARG TARGETPLATFORM WORKDIR /workspace # install go -RUN apt-get update && apt-get install -y golang +RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd64; fi && \ + wget https://go.dev/dl/go1.20.7.linux-${GOARCH}.tar.gz && \ + rm -rf /usr/local/go && \ + tar -C /usr/local -xzf go1.20.7.linux-${GOARCH}.tar.gz +ENV PATH="/usr/local/go/bin:${PATH}" + # download go modules COPY go.mod . diff --git a/build/test/Dockerfile b/build/test/Dockerfile index 6ee4d6f7..1cdcd460 100644 --- a/build/test/Dockerfile +++ b/build/test/Dockerfile @@ -19,7 +19,11 @@ WORKDIR /workspace ARG TARGETPLATFORM # install go -RUN apt-get update && apt-get install -y golang +RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd64; fi && \ + wget https://go.dev/dl/go1.20.7.linux-${GOARCH}.tar.gz && \ + rm -rf /usr/local/go && \ + tar -C /usr/local -xzf go1.20.7.linux-${GOARCH}.tar.gz +ENV PATH="/usr/local/go/bin:${PATH}" # download go modules COPY go.mod . @@ -55,13 +59,19 @@ RUN apt-get update && \ ffmpeg \ fonts-noto \ gnupg \ - golang \ pulseaudio \ unzip \ wget \ xvfb \ gstreamer1.0-plugins-base- +# install go +RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd64; fi && \ + wget https://go.dev/dl/go1.20.7.linux-${GOARCH}.tar.gz && \ + rm -rf /usr/local/go && \ + tar -C /usr/local -xzf go1.20.7.linux-${GOARCH}.tar.gz +ENV PATH="/usr/local/go/bin:${PATH}" + # install chrome COPY --from=livekit/chrome-installer:117.0.5874.0 /chrome-installer /chrome-installer RUN /chrome-installer/install-chrome "$TARGETPLATFORM" diff --git a/build/test/entrypoint.sh b/build/test/entrypoint.sh index 0314e373..2cfc9b1d 100755 --- a/build/test/entrypoint.sh +++ b/build/test/entrypoint.sh @@ -24,8 +24,8 @@ pulseaudio -D --verbose --exit-idle-time=-1 --disallow-exit # Run tests if [[ -z ${GITHUB_WORKFLOW+x} ]]; then - exec ./test.test -test.v -test.timeout 20m + exec ./test.test -test.v -test.timeout 30m else go install github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest - exec go tool test2json -p egress ./test.test -test.v -test.timeout 20m 2>&1 | "$HOME"/go/bin/gotestfmt + exec go tool test2json -p egress ./test.test -test.v -test.timeout 30m 2>&1 | "$HOME"/go/bin/gotestfmt fi diff --git a/go.mod b/go.mod index e74d05bc..28535e8c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/livekit/egress -go 1.18 +go 1.20 replace github.com/tinyzimmer/go-glib v0.0.25 => github.com/livekit/go-glib v0.0.0-20230223001336-834490045522 diff --git a/pkg/config/service.go b/pkg/config/service.go index 757a55e1..04a68829 100644 --- a/pkg/config/service.go +++ b/pkg/config/service.go @@ -21,6 +21,7 @@ import ( "gopkg.in/yaml.v3" "github.com/livekit/egress/pkg/errors" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" ) @@ -57,10 +58,12 @@ type CPUCostConfig struct { func NewServiceConfig(confString string) (*ServiceConfig, error) { conf := &ServiceConfig{ BaseConfig: BaseConfig{ + Logging: logger.Config{ + Level: "info", + }, ApiKey: os.Getenv("LIVEKIT_API_KEY"), ApiSecret: os.Getenv("LIVEKIT_API_SECRET"), WsUrl: os.Getenv("LIVEKIT_WS_URL"), - LogLevel: "info", }, TemplatePort: defaultTemplatePort, } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 6cc46756..7b4af40d 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -31,7 +31,7 @@ var ( ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs") ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs") ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU") - ErrInvalidTrack = psrpc.NewErrorf(psrpc.Internal, "unexpected track type") + ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Internal, "failed to subscribe to track") ) func New(err string) error { @@ -96,6 +96,10 @@ func ErrTrackNotFound(trackID string) error { return psrpc.NewErrorf(psrpc.NotFound, "track %s not found", trackID) } +func ErrParticipantNotFound(identity string) error { + return psrpc.NewErrorf(psrpc.NotFound, "participant %s not found", identity) +} + func ErrPadLinkFailed(src, sink, status string) error { return psrpc.NewErrorf(psrpc.Internal, "failed to link %s to %s: %s", src, sink, status) } diff --git a/pkg/pipeline/input/bin.go b/pkg/pipeline/input/bin.go index ae2fff0a..2b53b661 100644 --- a/pkg/pipeline/input/bin.go +++ b/pkg/pipeline/input/bin.go @@ -39,6 +39,8 @@ func New(ctx context.Context, pipeline *gst.Pipeline, p *config.PipelineConfig) b := &Bin{ bin: gst.NewBin("input"), } + + // build input if p.AudioEnabled { if err := b.buildAudioInput(p); err != nil { return nil, err @@ -49,6 +51,8 @@ func New(ctx context.Context, pipeline *gst.Pipeline, p *config.PipelineConfig) return nil, err } } + + // add bin to pipeline if err := pipeline.Add(b.bin.Element); err != nil { return nil, errors.ErrGstPipelineError(err) } @@ -59,6 +63,7 @@ func New(ctx context.Context, pipeline *gst.Pipeline, p *config.PipelineConfig) func (b *Bin) buildAudioInput(p *config.PipelineConfig) error { a := &audioInput{} + // build input switch p.SourceType { case types.SourceTypeSDK: if err := a.buildSDKInput(p); err != nil { @@ -71,13 +76,14 @@ func (b *Bin) buildAudioInput(p *config.PipelineConfig) error { } } + // build encoder if p.AudioTranscoding { if err := a.buildEncoder(p); err != nil { return err } } - // Add elements to bin + // add elements to bin if err := b.bin.AddMany(a.src...); err != nil { return errors.ErrGstPipelineError(err) } @@ -100,6 +106,7 @@ func (b *Bin) buildAudioInput(p *config.PipelineConfig) error { func (b *Bin) buildVideoInput(p *config.PipelineConfig) error { v := &videoInput{} + // build input switch p.SourceType { case types.SourceTypeSDK: if err := v.buildSDKInput(p); err != nil { @@ -112,13 +119,14 @@ func (b *Bin) buildVideoInput(p *config.PipelineConfig) error { } } + // build encoder if p.VideoTranscoding { if err := v.buildEncoder(p); err != nil { return err } } - // Add elements to bin + // add elements to bin if err := b.bin.AddMany(v.src...); err != nil { return errors.ErrGstPipelineError(err) } diff --git a/pkg/pipeline/input/video.go b/pkg/pipeline/input/video.go index d979af87..aca65aae 100644 --- a/pkg/pipeline/input/video.go +++ b/pkg/pipeline/input/video.go @@ -309,7 +309,7 @@ func (v *videoInput) buildEncoder(p *config.PipelineConfig) error { } v.encoder = append(v.encoder, vp9Enc) - return errors.ErrNotSupported(fmt.Sprintf("%s encoding", p.VideoOutCodec)) + fallthrough default: return errors.ErrNotSupported(fmt.Sprintf("%s encoding", p.VideoOutCodec)) diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index c922fd74..a9fccade 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -14,7 +14,6 @@ package pipeline -import "C" import ( "context" "sync" diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index ca4a0024..2f2ba2dc 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -234,7 +234,7 @@ func (s *SDKSource) subscribe(track lksdk.TrackPublication) error { return pub.SetSubscribed(true) } - return errors.ErrInvalidTrack + return errors.ErrSubscriptionFailed } // ----- Callbacks ----- diff --git a/test/ffprobe.go b/test/ffprobe.go index ed5abab7..d10a4eab 100644 --- a/test/ffprobe.go +++ b/test/ffprobe.go @@ -247,7 +247,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre d, err := strconv.ParseFloat(frac[1], 64) require.NoError(t, err) require.NotZero(t, d) - require.Less(t, n/d, float64(p.Framerate)*1.05) + require.Less(t, n/d, float64(p.Framerate)*1.5) require.Greater(t, n/d, float64(sourceFramerate)*0.8) } diff --git a/test/track_composite.go b/test/track_composite.go index 9eb4fd92..90c9ea92 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -37,7 +37,8 @@ func (r *Runner) testTrackComposite(t *testing.T) { r.testTrackCompositeMulti(t) } -func (r *Runner) runTrackTest(t *testing.T, name string, audioCodec, videoCodec types.MimeType, +func (r *Runner) runTrackTest( + t *testing.T, name string, audioCodec, videoCodec types.MimeType, f func(t *testing.T, audioTrackID, videoTrackID string), ) { t.Run(name, func(t *testing.T) {