update io client (#781)
* update io client

* avoid scenario where node will never become healthy again

* fix context cancel by grpc

* fix local/short testing

* add initDefaults for config

* avoid panic on full update channel, update logging
frostbyte73 authored Sep 25, 2024
1 parent ae762f1 commit 86eb9e9
Showing 12 changed files with 154 additions and 136 deletions.
65 changes: 35 additions & 30 deletions pkg/config/service.go
@@ -76,7 +76,6 @@ func NewServiceConfig(confString string) (*ServiceConfig, error) {
 			ApiSecret: os.Getenv("LIVEKIT_API_SECRET"),
 			WsUrl:     os.Getenv("LIVEKIT_WS_URL"),
 		},
-		TemplatePort:  defaultTemplatePort,
 		CPUCostConfig: &CPUCostConfig{},
 	}
 	if confString != "" {
@@ -87,46 +86,52 @@ func NewServiceConfig(confString string) (*ServiceConfig, error) {
 
 	// always create a new node ID
 	conf.NodeID = utils.NewGuid("NE_")
+	conf.InitDefaults()
 
-	// Setting CPU costs from config. Ensure that CPU costs are positive
-	if conf.RoomCompositeCpuCost <= 0 {
-		conf.RoomCompositeCpuCost = roomCompositeCpuCost
+	if err := conf.initLogger("nodeID", conf.NodeID, "clusterID", conf.ClusterID); err != nil {
+		return nil, err
 	}
-	if conf.AudioRoomCompositeCpuCost <= 0 {
-		conf.AudioRoomCompositeCpuCost = audioRoomCompositeCpuCost
+
+	return conf, nil
+}
+
+func (c *ServiceConfig) InitDefaults() {
+	if c.TemplatePort == 0 {
+		c.TemplatePort = defaultTemplatePort
 	}
-	if conf.WebCpuCost <= 0 {
-		conf.WebCpuCost = webCpuCost
+	if c.TemplateBase == "" {
+		c.TemplateBase = fmt.Sprintf(defaultTemplateBaseTemplate, c.TemplatePort)
 	}
-	if conf.AudioWebCpuCost <= 0 {
-		conf.AudioWebCpuCost = audioWebCpuCost
+
+	// Setting CPU costs from config. Ensure that CPU costs are positive
+	if c.RoomCompositeCpuCost <= 0 {
+		c.RoomCompositeCpuCost = roomCompositeCpuCost
 	}
-	if conf.ParticipantCpuCost <= 0 {
-		conf.ParticipantCpuCost = participantCpuCost
+	if c.AudioRoomCompositeCpuCost <= 0 {
+		c.AudioRoomCompositeCpuCost = audioRoomCompositeCpuCost
 	}
-	if conf.TrackCompositeCpuCost <= 0 {
-		conf.TrackCompositeCpuCost = trackCompositeCpuCost
+	if c.WebCpuCost <= 0 {
+		c.WebCpuCost = webCpuCost
 	}
-	if conf.TrackCpuCost <= 0 {
-		conf.TrackCpuCost = trackCpuCost
+	if c.AudioWebCpuCost <= 0 {
+		c.AudioWebCpuCost = audioWebCpuCost
 	}
-	if conf.MaxCpuUtilization <= 0 || conf.MaxCpuUtilization > 1 {
-		conf.MaxCpuUtilization = maxCpuUtilization
+	if c.ParticipantCpuCost <= 0 {
+		c.ParticipantCpuCost = participantCpuCost
 	}
-	if conf.MaxConcurrentWeb <= 0 {
-		conf.MaxConcurrentWeb = maxConcurrentWeb
+	if c.TrackCompositeCpuCost <= 0 {
+		c.TrackCompositeCpuCost = trackCompositeCpuCost
 	}
-	if conf.MaxUploadQueue <= 0 {
-		conf.MaxUploadQueue = maxUploadQueue
+	if c.TrackCpuCost <= 0 {
+		c.TrackCpuCost = trackCpuCost
 	}
-
-	if conf.TemplateBase == "" {
-		conf.TemplateBase = fmt.Sprintf(defaultTemplateBaseTemplate, conf.TemplatePort)
+	if c.MaxCpuUtilization <= 0 || c.MaxCpuUtilization > 1 {
+		c.MaxCpuUtilization = maxCpuUtilization
 	}
-
-	if err := conf.initLogger("nodeID", conf.NodeID, "clusterID", conf.ClusterID); err != nil {
-		return nil, err
+	if c.MaxConcurrentWeb <= 0 {
+		c.MaxConcurrentWeb = maxConcurrentWeb
 	}
-
-	return conf, nil
+	if c.MaxUploadQueue <= 0 {
+		c.MaxUploadQueue = maxUploadQueue
+	}
 }
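
Note on the new InitDefaults: every default is applied only when the corresponding field is still at its zero value, and TemplatePort is filled in before TemplateBase is derived from it, so an explicitly configured port still flows into the default template URL. A small self-contained sketch of that pattern (the struct, port number, and URL format below are illustrative stand-ins, not the real ServiceConfig):

package main

import "fmt"

// Stand-in for the zero-value-means-default pattern used by InitDefaults.
type templateConfig struct {
	TemplatePort int
	TemplateBase string
}

func (c *templateConfig) initDefaults() {
	if c.TemplatePort == 0 {
		c.TemplatePort = 7980 // hypothetical default port
	}
	if c.TemplateBase == "" {
		// derived after the port default, so an explicit port shows up in the URL
		c.TemplateBase = fmt.Sprintf("http://localhost:%d/", c.TemplatePort)
	}
}

func main() {
	explicit := &templateConfig{TemplatePort: 9000}
	explicit.initDefaults()
	fmt.Println(explicit.TemplatePort, explicit.TemplateBase) // 9000 http://localhost:9000/

	empty := &templateConfig{}
	empty.initDefaults()
	fmt.Println(empty.TemplatePort, empty.TemplateBase) // 7980 http://localhost:7980/
}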
173 changes: 95 additions & 78 deletions pkg/info/io.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/frostbyte73/core"
+	"go.uber.org/atomic"
 
 	"github.com/livekit/egress/pkg/errors"
 	"github.com/livekit/protocol/egress"
@@ -31,29 +32,37 @@ import (
 )
 
 const (
-	ioTimeout = time.Second * 30
+	ioTimeout  = time.Second * 30
+	maxBackoff = time.Minute * 10
 )
 
 type IOClient interface {
 	CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error
 	UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error
 	UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error
+	IsHealthy() bool
 	Drain()
 }
 
 type ioClient struct {
 	rpc.IOInfoClient
 
 	mu       sync.Mutex
-	egresses map[string]*egressIOClient
+	egresses map[string]*egressCreation
+	updates  chan *update
+
+	healthy  atomic.Bool
+	draining core.Fuse
+	done     core.Fuse
 }
 
-type egressIOClient struct {
-	created core.Fuse
-	aborted core.Fuse
+type egressCreation struct {
+	pending *update
+}
 
-	mu      sync.Mutex
-	pending chan *livekit.EgressInfo
+type update struct {
+	ctx  context.Context
+	info *livekit.EgressInfo
 }
 
 func NewIOClient(bus psrpc.MessageBus) (IOClient, error) {
@@ -62,123 +71,131 @@ func NewIOClient(bus psrpc.MessageBus) (IOClient, error) {
 		return nil, err
 	}
 
-	return &ioClient{
+	c := &ioClient{
 		IOInfoClient: client,
-		egresses:     make(map[string]*egressIOClient),
-	}, nil
+		egresses:     make(map[string]*egressCreation),
+		updates:      make(chan *update, 1000),
+	}
+	c.healthy.Store(true)
+	go c.updateWorker()
+
+	return c, nil
 }
 
 func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error {
-	e := &egressIOClient{
-		pending: make(chan *livekit.EgressInfo, 10),
-	}
+	e := &egressCreation{}
 
 	c.mu.Lock()
 	c.egresses[info.EgressId] = e
 	c.mu.Unlock()
 
 	errChan := make(chan error, 1)
 	go func() {
 		_, err := c.IOInfoClient.CreateEgress(ctx, info)
+
+		c.mu.Lock()
+		defer c.mu.Unlock()
+
+		delete(c.egresses, info.EgressId)
+
 		if err != nil {
-			logger.Errorw("failed to create egress", err)
-			e.aborted.Break()
+			logger.Errorw("failed to create egress", err, "egressID", info.EgressId)
 			errChan <- err
+			return
+		}
 
-			c.mu.Lock()
-			delete(c.egresses, info.EgressId)
-			c.mu.Unlock()
-		} else {
-			e.created.Break()
-			errChan <- nil
+		if e.pending != nil {
+			c.updates <- e.pending
 		}
+
+		errChan <- nil
 	}()
 
 	return errChan
 }
 
 func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error {
-	c.mu.Lock()
-	e, ok := c.egresses[info.EgressId]
-	c.mu.Unlock()
-	if !ok {
-		return errors.ErrEgressNotFound
+	u := &update{
+		ctx:  ctx,
+		info: info,
 	}
 
-	// ensure updates are sent sequentially
-	e.pending <- info
+	c.mu.Lock()
+	if e, ok := c.egresses[info.EgressId]; ok {
+		e.pending = u
+		c.mu.Unlock()
+		return nil
+	}
+	c.mu.Unlock()
 
 	select {
-	case <-e.created.Watch():
-		// egress was created, continue
-	case <-e.aborted.Watch():
-		// egress was aborted, ignore
+	case c.updates <- u:
 		return nil
+	default:
+		return errors.New("channel full or closed")
 	}
+}
 
-	// ensure only one thread is sending updates sequentially
-	e.mu.Lock()
-	defer e.mu.Unlock()
+func (c *ioClient) updateWorker() {
+	draining := c.draining.Watch()
 	for {
 		select {
-		case update := <-e.pending:
-			var err error
-			for i := 0; i < 10; i++ {
-				_, err = c.IOInfoClient.UpdateEgress(ctx, update)
-				if err == nil {
-					break
-				}
-				time.Sleep(time.Millisecond * 100 * time.Duration(i))
-			}
-			if err != nil {
-				logger.Warnw("failed to update egress", err, "egressID", update.EgressId)
-				return err
-			}
-
-			requestType, outputType := egress.GetTypes(update.Request)
-			logger.Infow(strings.ToLower(update.Status.String()),
-				"requestType", requestType,
-				"outputType", outputType,
-				"error", update.Error,
-				"code", update.ErrorCode,
-				"details", update.Details,
-			)
-
-			switch update.Status {
-			case livekit.EgressStatus_EGRESS_COMPLETE,
-				livekit.EgressStatus_EGRESS_FAILED,
-				livekit.EgressStatus_EGRESS_ABORTED,
-				livekit.EgressStatus_EGRESS_LIMIT_REACHED:
-				// egress is done, delete ioEgressClient
-				c.mu.Lock()
-				delete(c.egresses, info.EgressId)
-				c.mu.Unlock()
-			}
-
-		default:
-			return nil
+		case u := <-c.updates:
+			c.sendUpdate(u)
+		case <-draining:
+			c.done.Break()
+			return
 		}
 	}
 }
 
+func (c *ioClient) sendUpdate(u *update) {
+	d := time.Millisecond * 250
+	for {
+		if _, err := c.IOInfoClient.UpdateEgress(u.ctx, u.info); err != nil {
+			if errors.Is(err, psrpc.ErrRequestTimedOut) {
+				if c.healthy.Swap(false) {
+					logger.Infow("io connection unhealthy")
+				}
+				d = min(d*2, maxBackoff)
+				time.Sleep(d)
+				continue
+			}
+
+			logger.Errorw("failed to update egress", err, "egressID", u.info.EgressId)
+			return
+		}
+
+		if !c.healthy.Swap(true) {
+			logger.Infow("io connection restored")
+		}
+
+		requestType, outputType := egress.GetTypes(u.info.Request)
+		logger.Infow(strings.ToLower(u.info.Status.String()),
+			"requestType", requestType,
+			"outputType", outputType,
+			"error", u.info.Error,
+			"code", u.info.ErrorCode,
+			"details", u.info.Details,
+		)
+		return
+	}
+}
+
 func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error {
 	_, err := c.IOInfoClient.UpdateMetrics(ctx, req)
 	if err != nil {
-		logger.Errorw("failed to update ms", err)
+		logger.Errorw("failed to update metrics", err, "egressID", req.Info.EgressId)
 		return err
 	}
 
 	return nil
 }
 
+func (c *ioClient) IsHealthy() bool {
+	return c.healthy.Load()
+}
+
 func (c *ioClient) Drain() {
-	ticker := time.NewTicker(time.Second)
-	defer ticker.Stop()
-	for range ticker.C {
-		c.mu.Lock()
-		if len(c.egresses) == 0 {
-			c.mu.Unlock()
-			return
-		}
-		c.mu.Unlock()
-	}
+	c.draining.Break()
+	<-c.done.Watch()
 }
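
The rewritten update path queues updates on a buffered channel and drains it from a single worker; UpdateEgress uses a non-blocking send, so a full channel returns an error instead of blocking. Inside sendUpdate, a psrpc.ErrRequestTimedOut marks the io connection unhealthy and retries with exponential backoff, starting at 250ms and capped by maxBackoff (10 minutes); any other error is logged and the update is dropped. A self-contained sketch of that backoff schedule (it only prints the delays, it is not the client code):

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxBackoff = 10 * time.Minute // mirrors the new maxBackoff constant
	d := 250 * time.Millisecond         // mirrors sendUpdate's starting delay

	// Delays for a run of consecutive timeouts: 250ms, 500ms, 1s, ... capped at 10m.
	for attempt := 1; attempt <= 14; attempt++ {
		fmt.Printf("timeout #%2d: retry in %v\n", attempt, d)
		d = min(d*2, maxBackoff) // same doubling-with-cap as sendUpdate (Go 1.21+ builtin min)
	}
}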
10 changes: 4 additions & 6 deletions pkg/server/server.go
@@ -154,21 +154,19 @@ func (s *Server) Run() error {
 }
 
 func (s *Server) Status() ([]byte, error) {
-	info := map[string]interface{}{
+	status := map[string]interface{}{
 		"CpuLoad": s.monitor.GetAvailableCPU(),
 	}
-
-	s.GetStatus(info)
-
-	return json.Marshal(info)
+	s.GetStatus(status)
+	return json.Marshal(status)
 }
 
 func (s *Server) IsIdle() bool {
 	return s.activeRequests.Load() == 0
 }
 
 func (s *Server) IsDisabled() bool {
-	return s.shutdown.IsBroken()
+	return s.shutdown.IsBroken() || !s.ioClient.IsHealthy()
 }
 
 func (s *Server) IsTerminating() bool {
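
Because IsDisabled now also reflects io health, a node that loses its io connection advertises itself as unavailable but is not permanently lost: once sendUpdate succeeds again, the healthy flag flips back and the node can return to service, which is the "never become healthy again" scenario the commit message refers to. A minimal illustration of that gate, using stand-in fields rather than the real Server struct:

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// Stand-ins for the two conditions the new IsDisabled combines.
type server struct {
	shutdown  atomic.Bool // the real server uses a core.Fuse
	ioHealthy atomic.Bool
}

func (s *server) IsDisabled() bool {
	return s.shutdown.Load() || !s.ioHealthy.Load()
}

func main() {
	s := &server{}
	s.ioHealthy.Store(true)
	fmt.Println(s.IsDisabled()) // false: io healthy, not shutting down

	s.ioHealthy.Store(false)    // io updates started timing out
	fmt.Println(s.IsDisabled()) // true: node reports itself unavailable

	s.ioHealthy.Store(true)     // a later update succeeded
	fmt.Println(s.IsDisabled()) // false: healthy again without a restart
}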
