Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline/Bin subclassing #468

Merged
merged 10 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type BaseConfig struct {
WsUrl string `yaml:"ws_url"` // (env LIVEKIT_WS_URL)

// optional
Logging logger.Config `yaml:"logging"` // logging config
Logging *logger.Config `yaml:"logging"` // logging config
TemplateBase string `yaml:"template_base"` // custom template base url
BackupStorage string `yaml:"backup_storage"` // backup file location for failed uploads
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
Expand Down Expand Up @@ -106,7 +106,7 @@ func (c *BaseConfig) initLogger(values ...interface{}) error {
return err
}

zl, err := logger.NewZapLogger(&c.Logging)
zl, err := logger.NewZapLogger(c.Logging)
if err != nil {
return err
}
Expand Down
24 changes: 9 additions & 15 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/tracer"
"github.com/livekit/protocol/utils"
Expand All @@ -51,12 +52,13 @@ type PipelineConfig struct {
SourceConfig `yaml:"-"`
AudioConfig `yaml:"-"`
VideoConfig `yaml:"-"`
*Callbacks `yaml:"-"`

Outputs map[types.EgressType]OutputConfig `yaml:"-"`
OutputCount int `yaml:"-"`
FinalizationRequired bool `yaml:"-"`

OnUpdate func(context.Context, *livekit.EgressInfo) `yaml:"-"`

Info *livekit.EgressInfo `yaml:"-"`
}

Expand Down Expand Up @@ -96,6 +98,7 @@ type TrackSource struct {
MimeType types.MimeType
PayloadType webrtc.PayloadType
ClockRate uint32
EOSFunc func()
}

type AudioConfig struct {
Expand All @@ -119,23 +122,14 @@ type VideoConfig struct {
KeyFrameInterval float64
}

type Callbacks struct {
GstReady chan struct{} `yaml:"-"`
OnTrackMuted func(string) `yaml:"-"`
OnTrackUnmuted func(string) `yaml:"-"`
OnTrackAdded func(*TrackSource) `yaml:"-"`
OnTrackRemoved func(trackID string) `yaml:"-"`
OnUpdate func(context.Context, *livekit.EgressInfo) `yaml:"-"`
OnFailure func(error) `yaml:"-"`
}

func NewPipelineConfig(confString string, req *rpc.StartEgressRequest) (*PipelineConfig, error) {
p := &PipelineConfig{
BaseConfig: BaseConfig{},
Outputs: make(map[types.EgressType]OutputConfig),
Callbacks: &Callbacks{
GstReady: make(chan struct{}),
BaseConfig: BaseConfig{
Logging: &logger.Config{
Level: "info",
},
},
Outputs: make(map[types.EgressType]OutputConfig),
}

if err := yaml.Unmarshal([]byte(confString), p); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type CPUCostConfig struct {
func NewServiceConfig(confString string) (*ServiceConfig, error) {
conf := &ServiceConfig{
BaseConfig: BaseConfig{
Logging: logger.Config{
Logging: &logger.Config{
Level: "info",
},
ApiKey: os.Getenv("LIVEKIT_API_KEY"),
Expand Down
13 changes: 13 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"strings"

"github.com/tinyzimmer/go-gst/gst"

"github.com/livekit/psrpc"
)

Expand All @@ -32,6 +34,7 @@ var (
ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs")
ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU")
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Internal, "failed to subscribe to track")
ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen")
)

func New(err string) error {
Expand Down Expand Up @@ -122,6 +125,10 @@ func ErrProcessStartFailed(err error) error {
return psrpc.NewError(psrpc.Internal, err)
}

func ErrStateChangeFailed(bin string, state gst.State) error {
return psrpc.NewErrorf(psrpc.Internal, "%s failed to change state to %s", bin, state.String())
}

type ErrArray struct {
errs []error
}
Expand All @@ -130,6 +137,12 @@ func (e *ErrArray) AppendErr(err error) {
e.errs = append(e.errs, err)
}

func (e *ErrArray) Check(err error) {
if err != nil {
e.errs = append(e.errs, err)
}
}

func (e *ErrArray) ToError() psrpc.Error {
if len(e.errs) == 0 {
return nil
Expand Down
Loading