Skip to content

Commit

Permalink
add disallow_local_storage config option (#795)
Browse files Browse the repository at this point in the history
* add disallow-local config option

* only upload manifest on success

* use estimated cpu from request if available
  • Loading branch information
frostbyte73 authored Oct 24, 2024
1 parent 1f35c4b commit ea5cdb3
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 64 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/gorilla/websocket v1.5.3
github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b
github.com/pion/rtp v1.8.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752 h1:EgULMfdFSW/3ZZckhiF+CwDApYTD3SkqR3MYazKeE5w=
github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292 h1:wVzOLGSjJpCsdKHKKpPxYhXW/JL90l0XYFQbeINSdP4=
github.com/livekit/protocol v1.27.1-0.20241022061022-caa595ed3292/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o=
Expand Down
11 changes: 6 additions & 5 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ type BaseConfig struct {
WsUrl string `yaml:"ws_url"` // (env LIVEKIT_WS_URL)

// optional
Logging *logger.Config `yaml:"logging"` // logging config
TemplateBase string `yaml:"template_base"` // custom template base url
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration
MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes
Logging *logger.Config `yaml:"logging"` // logging config
TemplateBase string `yaml:"template_base"` // custom template base url
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration
MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes
DisallowLocalStorage bool `yaml:"disallow_local_storage"` // require an upload config for all requests

SessionLimits `yaml:"session_limits"` // session duration limits
StorageConfig *StorageConfig `yaml:"storage,omitempty"` // storage config
Expand Down
7 changes: 6 additions & 1 deletion pkg/config/output_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,17 @@ type fileRequest interface {
}

func (p *PipelineConfig) getFileConfig(outputType types.OutputType, req fileRequest) (*FileConfig, error) {
sc, err := p.getStorageConfig(req)
if err != nil {
return nil, err
}

conf := &FileConfig{
outputConfig: outputConfig{OutputType: outputType},
FileInfo: &livekit.FileInfo{},
StorageFilepath: clean(req.GetFilepath()),
DisableManifest: req.GetDisableManifest(),
StorageConfig: p.getStorageConfig(req),
StorageConfig: sc,
}

// filename
Expand Down
7 changes: 6 additions & 1 deletion pkg/config/output_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf
return nil, err
}

sc, err := p.getStorageConfig(images)
if err != nil {
return nil, err
}

filenamePrefix := clean(images.FilenamePrefix)
conf := &ImageConfig{
outputConfig: outputConfig{
Expand All @@ -77,7 +82,7 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf
ImagePrefix: filenamePrefix,
ImageSuffix: images.FilenameSuffix,
DisableManifest: images.DisableManifest,
StorageConfig: p.getStorageConfig(images),
StorageConfig: sc,
CaptureInterval: images.CaptureInterval,
Width: images.Width,
Height: images.Height,
Expand Down
10 changes: 7 additions & 3 deletions pkg/config/output_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func (p *PipelineConfig) GetSegmentConfig() *SegmentConfig {

// segments should always be added last, so we can check keyframe interval from file/stream
func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput) (*SegmentConfig, error) {
sc, err := p.getStorageConfig(segments)
if err != nil {
return nil, err
}

conf := &SegmentConfig{
SegmentsInfo: &livekit.SegmentsInfo{},
SegmentPrefix: clean(segments.FilenamePrefix),
Expand All @@ -60,7 +65,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput)
LivePlaylistFilename: clean(segments.LivePlaylistName),
SegmentDuration: int(segments.SegmentDuration),
DisableManifest: segments.DisableManifest,
StorageConfig: p.getStorageConfig(segments),
StorageConfig: sc,
}

if conf.SegmentDuration == 0 {
Expand All @@ -74,8 +79,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput)
}

// filename
err := conf.updatePrefixAndPlaylist(p)
if err != nil {
if err = conf.updatePrefixAndPlaylist(p); err != nil {
return nil, err
}

Expand Down
22 changes: 16 additions & 6 deletions pkg/config/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package config
import (
"time"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/protocol/egress"
)

Expand Down Expand Up @@ -61,7 +62,7 @@ type GCPConfig struct {
ProxyConfig *ProxyConfig `yaml:"proxy_config"`
}

func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConfig {
func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) (*StorageConfig, error) {
sc := &StorageConfig{}
if p.StorageConfig != nil {
sc.PathPrefix = p.StorageConfig.PathPrefix
Expand Down Expand Up @@ -102,7 +103,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf
if sc.S3.MinRetryDelay == 0 {
sc.S3.MinRetryDelay = time.Millisecond * 100
}
return sc
return sc, nil
}

if gcp := req.GetGcp(); gcp != nil {
Expand All @@ -117,7 +118,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf
Password: gcp.Proxy.Password,
}
}
return sc
return sc, nil
}

if azure := req.GetAzure(); azure != nil {
Expand All @@ -126,7 +127,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf
AccountKey: azure.AccountKey,
ContainerName: azure.ContainerName,
}
return sc
return sc, nil
}

if ali := req.GetAliOSS(); ali != nil {
Expand All @@ -137,8 +138,17 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf
Endpoint: ali.Endpoint,
Bucket: ali.Bucket,
}
return sc
return sc, nil
}

return p.StorageConfig
sc = p.StorageConfig
if p.DisallowLocalStorage && (sc == nil || sc.IsLocal()) {
return nil, errors.ErrInvalidInput("output")
}

return sc, nil
}

func (c *StorageConfig) IsLocal() bool {
return c.S3 == nil && c.GCP == nil && c.Azure == nil && c.AliOSS == nil
}
9 changes: 6 additions & 3 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,13 @@ func (c *Controller) Close() {
case livekit.EgressStatus_EGRESS_ACTIVE,
livekit.EgressStatus_EGRESS_ENDING:
c.Info.SetComplete()
}
fallthrough

// upload manifest and add location to egress info
c.uploadManifest()
case livekit.EgressStatus_EGRESS_LIMIT_REACHED,
livekit.EgressStatus_EGRESS_COMPLETE:
// upload manifest and add location to egress info
c.uploadManifest()
}
}

func (c *Controller) startSessionLimitTimer(ctx context.Context) {
Expand Down
40 changes: 25 additions & 15 deletions pkg/server/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,30 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (
"request", p.Info.Request,
)

errChan := s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info))
s.launchProcess(req, (*livekit.EgressInfo)(p.Info))
if err = <-errChan; err != nil {
s.AbortProcess(req.EgressId, err)
s.monitor.EgressAborted(req)
s.activeRequests.Dec()
return nil, err
info := (*livekit.EgressInfo)(p.Info)

errChan := s.ioClient.CreateEgress(ctx, info)
launchErr := s.launchProcess(req, info)
createErr := <-errChan

if launchErr != nil {
if createErr == nil {
// send failed update if it was saved to db
s.processEnded(req, info, launchErr)
}
return nil, launchErr
} else if createErr != nil {
// launched but failed to save - abort and return error
info.Error = createErr.Error()
info.ErrorCode = int32(http.StatusInternalServerError)
s.AbortProcess(req.EgressId, createErr)
return nil, createErr
}

return (*livekit.EgressInfo)(p.Info), nil
}

func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) {
func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) error {
_, span := tracer.Start(context.Background(), "Service.launchProcess")
defer span.End()

Expand All @@ -102,16 +113,14 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress
if err != nil {
span.RecordError(err)
logger.Errorw("could not marshal config", err)
s.processEnded(req, info, err)
return
return err
}

reqString, err := protojson.Marshal(req)
if err != nil {
span.RecordError(err)
logger.Errorw("could not marshal request", err)
s.processEnded(req, info, err)
return
return err
}

cmd := exec.Command("egress",
Expand All @@ -125,13 +134,14 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}

if err = s.Launch(context.Background(), handlerID, req, info, cmd); err != nil {
s.processEnded(req, info, err)
return err
} else {
s.monitor.UpdatePID(info.EgressId, cmd.Process.Pid)
go func() {
err = cmd.Wait()
s.processEnded(req, info, err)
}()
return nil
}
}

Expand All @@ -145,8 +155,8 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI
info.Error = "internal error"
info.ErrorCode = int32(http.StatusInternalServerError)
_ = s.ioClient.UpdateEgress(context.Background(), info)
logger.Errorw("process failed, shutting down", err)
s.Shutdown(false, false)

logger.Errorw("process failed", err)
}

avgCPU, maxCPU := s.monitor.EgressEnded(req)
Expand Down
55 changes: 28 additions & 27 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,36 +163,37 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) ([]interfa
"activeWeb", m.webRequests.Load(),
}

var accept bool
var required float64
switch r := req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb {
return fields, false
}
if r.RoomComposite.AudioOnly {
required = m.cpuCostConfig.AudioRoomCompositeCpuCost
} else {
required = m.cpuCostConfig.RoomCompositeCpuCost
}
case *rpc.StartEgressRequest_Web:
if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb {
return fields, false
}
if r.Web.AudioOnly {
required = m.cpuCostConfig.AudioWebCpuCost
} else {
required = m.cpuCostConfig.WebCpuCost
required := req.EstimatedCpu
if required == 0 {
switch r := req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb {
return fields, false
}
if r.RoomComposite.AudioOnly {
required = m.cpuCostConfig.AudioRoomCompositeCpuCost
} else {
required = m.cpuCostConfig.RoomCompositeCpuCost
}
case *rpc.StartEgressRequest_Web:
if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb {
return fields, false
}
if r.Web.AudioOnly {
required = m.cpuCostConfig.AudioWebCpuCost
} else {
required = m.cpuCostConfig.WebCpuCost
}
case *rpc.StartEgressRequest_Participant:
required = m.cpuCostConfig.ParticipantCpuCost
case *rpc.StartEgressRequest_TrackComposite:
required = m.cpuCostConfig.TrackCompositeCpuCost
case *rpc.StartEgressRequest_Track:
required = m.cpuCostConfig.TrackCpuCost
}
case *rpc.StartEgressRequest_Participant:
required = m.cpuCostConfig.ParticipantCpuCost
case *rpc.StartEgressRequest_TrackComposite:
required = m.cpuCostConfig.TrackCompositeCpuCost
case *rpc.StartEgressRequest_Track:
required = m.cpuCostConfig.TrackCpuCost
}
accept = available >= required

accept := available >= required
fields = append(fields,
"required", required,
"canAccept", accept,
Expand Down

0 comments on commit ea5cdb3

Please sign in to comment.