update io client #781

Merged · 7 commits · Sep 25, 2024

Changes from all commits
pkg/config/service.go: 35 additions & 30 deletions

```diff
@@ -76,7 +76,6 @@ func NewServiceConfig(confString string) (*ServiceConfig, error) {
 			ApiSecret: os.Getenv("LIVEKIT_API_SECRET"),
 			WsUrl:     os.Getenv("LIVEKIT_WS_URL"),
 		},
-		TemplatePort:  defaultTemplatePort,
 		CPUCostConfig: &CPUCostConfig{},
 	}
 	if confString != "" {
@@ -87,46 +86,52 @@ func NewServiceConfig(confString string) (*ServiceConfig, error) {
 
 	// always create a new node ID
 	conf.NodeID = utils.NewGuid("NE_")
+	conf.InitDefaults()
 
-	// Setting CPU costs from config. Ensure that CPU costs are positive
-	if conf.RoomCompositeCpuCost <= 0 {
-		conf.RoomCompositeCpuCost = roomCompositeCpuCost
-	}
-	if conf.AudioRoomCompositeCpuCost <= 0 {
-		conf.AudioRoomCompositeCpuCost = audioRoomCompositeCpuCost
-	}
-	if conf.WebCpuCost <= 0 {
-		conf.WebCpuCost = webCpuCost
-	}
-	if conf.AudioWebCpuCost <= 0 {
-		conf.AudioWebCpuCost = audioWebCpuCost
-	}
-	if conf.ParticipantCpuCost <= 0 {
-		conf.ParticipantCpuCost = participantCpuCost
-	}
-	if conf.TrackCompositeCpuCost <= 0 {
-		conf.TrackCompositeCpuCost = trackCompositeCpuCost
-	}
-	if conf.TrackCpuCost <= 0 {
-		conf.TrackCpuCost = trackCpuCost
-	}
-	if conf.MaxCpuUtilization <= 0 || conf.MaxCpuUtilization > 1 {
-		conf.MaxCpuUtilization = maxCpuUtilization
-	}
-	if conf.MaxConcurrentWeb <= 0 {
-		conf.MaxConcurrentWeb = maxConcurrentWeb
-	}
-	if conf.MaxUploadQueue <= 0 {
-		conf.MaxUploadQueue = maxUploadQueue
-	}
-
-	if conf.TemplateBase == "" {
-		conf.TemplateBase = fmt.Sprintf(defaultTemplateBaseTemplate, conf.TemplatePort)
-	}
-
 	if err := conf.initLogger("nodeID", conf.NodeID, "clusterID", conf.ClusterID); err != nil {
 		return nil, err
 	}
 
 	return conf, nil
 }
+
+func (c *ServiceConfig) InitDefaults() {
+	if c.TemplatePort == 0 {
+		c.TemplatePort = defaultTemplatePort
+	}
+	if c.TemplateBase == "" {
+		c.TemplateBase = fmt.Sprintf(defaultTemplateBaseTemplate, c.TemplatePort)
+	}
+
+	// Setting CPU costs from config. Ensure that CPU costs are positive
+	if c.RoomCompositeCpuCost <= 0 {
+		c.RoomCompositeCpuCost = roomCompositeCpuCost
+	}
+	if c.AudioRoomCompositeCpuCost <= 0 {
+		c.AudioRoomCompositeCpuCost = audioRoomCompositeCpuCost
+	}
+	if c.WebCpuCost <= 0 {
+		c.WebCpuCost = webCpuCost
+	}
+	if c.AudioWebCpuCost <= 0 {
+		c.AudioWebCpuCost = audioWebCpuCost
+	}
+	if c.ParticipantCpuCost <= 0 {
+		c.ParticipantCpuCost = participantCpuCost
+	}
+	if c.TrackCompositeCpuCost <= 0 {
+		c.TrackCompositeCpuCost = trackCompositeCpuCost
+	}
+	if c.TrackCpuCost <= 0 {
+		c.TrackCpuCost = trackCpuCost
+	}
+	if c.MaxCpuUtilization <= 0 || c.MaxCpuUtilization > 1 {
+		c.MaxCpuUtilization = maxCpuUtilization
+	}
+	if c.MaxConcurrentWeb <= 0 {
+		c.MaxConcurrentWeb = maxConcurrentWeb
+	}
+	if c.MaxUploadQueue <= 0 {
+		c.MaxUploadQueue = maxUploadQueue
+	}
+}
```
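The service.go change lifts all default-filling out of `NewServiceConfig` into an exported `InitDefaults` method, so a `ServiceConfig` built programmatically (not only one parsed from a YAML string) can apply the same fallbacks before use. A minimal sketch of the pattern, with hypothetical field names and default values rather than the actual `ServiceConfig` fields:

```go
package main

import "fmt"

// Config stands in for a service config; the fields and defaults here are
// illustrative, not the real ServiceConfig fields.
type Config struct {
	Port    int
	BaseURL string
}

// InitDefaults fills any zero-valued fields. It is idempotent, so it is safe
// to call on a config parsed from YAML, read from the environment, or built
// by hand.
func (c *Config) InitDefaults() {
	if c.Port == 0 {
		c.Port = 7980
	}
	if c.BaseURL == "" {
		// derived default: depends on Port, so it runs after the Port check
		c.BaseURL = fmt.Sprintf("http://localhost:%d", c.Port)
	}
}

func main() {
	conf := &Config{Port: 9000} // partially populated, e.g. from flags
	conf.InitDefaults()
	fmt.Println(conf.Port, conf.BaseURL) // 9000 http://localhost:9000
}
```

Because every check only fires on the zero value, calling `InitDefaults` more than once is harmless, which matters when a config can reach it through several construction paths.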
pkg/info/io.go: 95 additions & 78 deletions

```diff
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/frostbyte73/core"
+	"go.uber.org/atomic"
 
 	"github.com/livekit/egress/pkg/errors"
 	"github.com/livekit/protocol/egress"
@@ -31,29 +32,37 @@ import (
 )
 
 const (
-	ioTimeout = time.Second * 30
+	ioTimeout  = time.Second * 30
+	maxBackoff = time.Minute * 10
 )
 
 type IOClient interface {
 	CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error
 	UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error
 	UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error
+	IsHealthy() bool
 	Drain()
 }
 
 type ioClient struct {
 	rpc.IOInfoClient
 
 	mu       sync.Mutex
-	egresses map[string]*egressIOClient
+	egresses map[string]*egressCreation
+	updates  chan *update
+
+	healthy  atomic.Bool
+	draining core.Fuse
+	done     core.Fuse
 }
 
-type egressIOClient struct {
-	created core.Fuse
-	aborted core.Fuse
-
-	mu      sync.Mutex
-	pending chan *livekit.EgressInfo
+type egressCreation struct {
+	pending *update
+}
+
+type update struct {
+	ctx  context.Context
+	info *livekit.EgressInfo
 }
@@ -62,123 +71,131 @@ func NewIOClient(bus psrpc.MessageBus) (IOClient, error) {
 		return nil, err
 	}
 
-	return &ioClient{
+	c := &ioClient{
 		IOInfoClient: client,
-		egresses:     make(map[string]*egressIOClient),
-	}, nil
+		egresses:     make(map[string]*egressCreation),
+		updates:      make(chan *update, 1000),
+	}
+	c.healthy.Store(true)
+	go c.updateWorker()
+
+	return c, nil
 }
 
 func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error {
-	e := &egressIOClient{
-		pending: make(chan *livekit.EgressInfo, 10),
-	}
+	e := &egressCreation{}
 
 	c.mu.Lock()
 	c.egresses[info.EgressId] = e
 	c.mu.Unlock()
 
 	errChan := make(chan error, 1)
 	go func() {
 		_, err := c.IOInfoClient.CreateEgress(ctx, info)
+
+		c.mu.Lock()
+		defer c.mu.Unlock()
+
+		delete(c.egresses, info.EgressId)
+
 		if err != nil {
-			logger.Errorw("failed to create egress", err)
-			e.aborted.Break()
+			logger.Errorw("failed to create egress", err, "egressID", info.EgressId)
 			errChan <- err
-
-			c.mu.Lock()
-			delete(c.egresses, info.EgressId)
-			c.mu.Unlock()
-		} else {
-			e.created.Break()
-			errChan <- nil
+			return
 		}
+
+		if e.pending != nil {
+			c.updates <- e.pending
+		}
+
+		errChan <- nil
 	}()
 
 	return errChan
 }
 
 func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error {
-	c.mu.Lock()
-	e, ok := c.egresses[info.EgressId]
-	c.mu.Unlock()
-	if !ok {
-		return errors.ErrEgressNotFound
+	u := &update{
+		ctx:  ctx,
+		info: info,
 	}
 
-	// ensure updates are sent sequentially
-	e.pending <- info
+	c.mu.Lock()
+	if e, ok := c.egresses[info.EgressId]; ok {
+		e.pending = u
+		c.mu.Unlock()
+		return nil
+	}
+	c.mu.Unlock()
 
 	select {
-	case <-e.created.Watch():
-		// egress was created, continue
-	case <-e.aborted.Watch():
-		// egress was aborted, ignore
+	case c.updates <- u:
 		return nil
+	default:
+		return errors.New("channel full or closed")
 	}
+}
 
-	// ensure only one thread is sending updates sequentially
-	e.mu.Lock()
-	defer e.mu.Unlock()
+func (c *ioClient) updateWorker() {
+	draining := c.draining.Watch()
 	for {
 		select {
-		case update := <-e.pending:
-			var err error
-			for i := 0; i < 10; i++ {
-				_, err = c.IOInfoClient.UpdateEgress(ctx, update)
-				if err == nil {
-					break
-				}
-				time.Sleep(time.Millisecond * 100 * time.Duration(i))
-			}
-			if err != nil {
-				logger.Warnw("failed to update egress", err, "egressID", update.EgressId)
-				return err
-			}
-
-			requestType, outputType := egress.GetTypes(update.Request)
-			logger.Infow(strings.ToLower(update.Status.String()),
-				"requestType", requestType,
-				"outputType", outputType,
-				"error", update.Error,
-				"code", update.ErrorCode,
-				"details", update.Details,
-			)
-
-			switch update.Status {
-			case livekit.EgressStatus_EGRESS_COMPLETE,
-				livekit.EgressStatus_EGRESS_FAILED,
-				livekit.EgressStatus_EGRESS_ABORTED,
-				livekit.EgressStatus_EGRESS_LIMIT_REACHED:
-				// egress is done, delete ioEgressClient
-				c.mu.Lock()
-				delete(c.egresses, info.EgressId)
-				c.mu.Unlock()
-			}
-
-		default:
-			return nil
+		case u := <-c.updates:
+			c.sendUpdate(u)
+		case <-draining:
+			c.done.Break()
+			return
+		}
+	}
+}
+
+func (c *ioClient) sendUpdate(u *update) {
+	d := time.Millisecond * 250
+	for {
+		if _, err := c.IOInfoClient.UpdateEgress(u.ctx, u.info); err != nil {
+			if errors.Is(err, psrpc.ErrRequestTimedOut) {
+				if c.healthy.Swap(false) {
+					logger.Infow("io connection unhealthy")
+				}
+				d = min(d*2, maxBackoff)
+				time.Sleep(d)
+				continue
+			}
+
+			logger.Errorw("failed to update egress", err, "egressID", u.info.EgressId)
+			return
+		}
+
+		if !c.healthy.Swap(true) {
+			logger.Infow("io connection restored")
 		}
+
+		requestType, outputType := egress.GetTypes(u.info.Request)
+		logger.Infow(strings.ToLower(u.info.Status.String()),
+			"requestType", requestType,
+			"outputType", outputType,
+			"error", u.info.Error,
+			"code", u.info.ErrorCode,
+			"details", u.info.Details,
+		)
+		return
 	}
 }
 
 func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error {
 	_, err := c.IOInfoClient.UpdateMetrics(ctx, req)
 	if err != nil {
-		logger.Errorw("failed to update ms", err)
+		logger.Errorw("failed to update metrics", err, "egressID", req.Info.EgressId)
 		return err
 	}
 
 	return nil
 }
 
+func (c *ioClient) IsHealthy() bool {
+	return c.healthy.Load()
+}
+
 func (c *ioClient) Drain() {
-	ticker := time.NewTicker(time.Second)
-	defer ticker.Stop()
-	for range ticker.C {
-		c.mu.Lock()
-		if len(c.egresses) == 0 {
-			c.mu.Unlock()
-			return
-		}
-		c.mu.Unlock()
-	}
+	c.draining.Break()
+	<-c.done.Watch()
 }
```
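The io.go rewrite replaces the per-egress pending channels and created/aborted fuses with a single buffered `updates` channel drained by one worker goroutine; `sendUpdate` retries timed-out RPCs with exponential backoff (250 ms, doubling up to `maxBackoff`) and toggles the `healthy` flag that `IsHealthy` exposes. Below is a self-contained sketch of that worker-plus-backoff shape, assuming a stand-in `send` function and a local `errTimeout` in place of the `IOInfoClient` RPC and `psrpc.ErrRequestTimedOut`:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// errTimeout stands in for psrpc.ErrRequestTimedOut.
var errTimeout = errors.New("request timed out")

const maxBackoff = 10 * time.Minute

type worker struct {
	updates chan string
	healthy atomic.Bool
	done    chan struct{}
}

// run drains the queue sequentially, so updates are sent in order. Timeouts
// are retried with capped exponential backoff; other errors drop the update.
func (w *worker) run(send func(string) error) {
	defer close(w.done)
	for u := range w.updates {
		d := 250 * time.Millisecond
		for {
			err := send(u)
			if err == nil {
				w.healthy.Store(true)
				break
			}
			if !errors.Is(err, errTimeout) {
				fmt.Println("dropping update:", err)
				break
			}
			w.healthy.Store(false) // timed out: mark unhealthy, back off
			time.Sleep(d)
			if d *= 2; d > maxBackoff {
				d = maxBackoff
			}
		}
	}
}

func main() {
	w := &worker{updates: make(chan string, 1000), done: make(chan struct{})}
	w.healthy.Store(true)
	go w.run(func(u string) error { fmt.Println("sent:", u); return nil })

	w.updates <- "egress update"
	close(w.updates) // drain: stop accepting, let the worker finish
	<-w.done
	fmt.Println("healthy:", w.healthy.Load())
}
```

The single-worker design trades the old per-egress ordering guarantee for a much simpler global one: all updates flow through one queue, so a stalled IO connection backs up in one place instead of leaking a goroutine per egress.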
pkg/server/server.go: 4 additions & 6 deletions

```diff
@@ -154,21 +154,19 @@ func (s *Server) Run() error {
 }
 
 func (s *Server) Status() ([]byte, error) {
-	info := map[string]interface{}{
+	status := map[string]interface{}{
 		"CpuLoad": s.monitor.GetAvailableCPU(),
 	}
 
-	s.GetStatus(info)
-
-	return json.Marshal(info)
+	s.GetStatus(status)
+	return json.Marshal(status)
 }
 
 func (s *Server) IsIdle() bool {
 	return s.activeRequests.Load() == 0
 }
 
 func (s *Server) IsDisabled() bool {
-	return s.shutdown.IsBroken()
+	return s.shutdown.IsBroken() || !s.ioClient.IsHealthy()
 }
 
 func (s *Server) IsTerminating() bool {
```
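The server.go change wires the new health signal into availability: `IsDisabled` now returns true while the IO connection is unhealthy, not only during shutdown. A tiny sketch of the combined gate, using hypothetical atomic flags rather than the real `Server` fields:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// server is a hypothetical stand-in: a shutdown flag plus the IO-health flag
// surfaced by IOClient.IsHealthy in the change above.
type server struct {
	shutdown  atomic.Bool
	ioHealthy atomic.Bool
}

// IsDisabled mirrors the updated check: unavailable while shutting down
// or while the IO connection is down.
func (s *server) IsDisabled() bool {
	return s.shutdown.Load() || !s.ioHealthy.Load()
}

func main() {
	s := &server{}
	s.ioHealthy.Store(true)
	fmt.Println(s.IsDisabled()) // false: healthy, not shutting down

	s.ioHealthy.Store(false)    // e.g. repeated RPC timeouts
	fmt.Println(s.IsDisabled()) // true: stop advertising availability
}
```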