From c2a95668605c2a5869e0794ee8f958a8705cec5b Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 23 Sep 2024 17:38:59 -0700 Subject: [PATCH 1/6] update io client --- pkg/info/io.go | 173 ++++++++++++++++++++++++------------------- pkg/server/server.go | 10 +-- 2 files changed, 99 insertions(+), 84 deletions(-) diff --git a/pkg/info/io.go b/pkg/info/io.go index 62998c97..561a7de5 100644 --- a/pkg/info/io.go +++ b/pkg/info/io.go @@ -21,6 +21,7 @@ import ( "time" "github.com/frostbyte73/core" + "go.uber.org/atomic" "github.com/livekit/egress/pkg/errors" "github.com/livekit/protocol/egress" @@ -31,13 +32,15 @@ import ( ) const ( - ioTimeout = time.Second * 30 + ioTimeout = time.Second * 30 + maxBackoff = time.Minute * 10 ) type IOClient interface { CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error + IsHealthy() bool Drain() } @@ -45,15 +48,21 @@ type ioClient struct { rpc.IOInfoClient mu sync.Mutex - egresses map[string]*egressIOClient + egresses map[string]*egressCreation + updates chan *update + + healthy atomic.Bool + draining core.Fuse + done core.Fuse } -type egressIOClient struct { - created core.Fuse - aborted core.Fuse +type egressCreation struct { + pending *update +} - mu sync.Mutex - pending chan *livekit.EgressInfo +type update struct { + ctx context.Context + info *livekit.EgressInfo } func NewIOClient(bus psrpc.MessageBus) (IOClient, error) { @@ -62,16 +71,20 @@ func NewIOClient(bus psrpc.MessageBus) (IOClient, error) { return nil, err } - return &ioClient{ + c := &ioClient{ IOInfoClient: client, - egresses: make(map[string]*egressIOClient), - }, nil + egresses: make(map[string]*egressCreation), + updates: make(chan *update, 1000), + } + c.healthy.Store(true) + go c.updateWorker() + + return c, nil } func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error { - e := &egressIOClient{ - pending: make(chan *livekit.EgressInfo, 10), - } + e := &egressCreation{} + c.mu.Lock() c.egresses[info.EgressId] = e c.mu.Unlock() @@ -79,84 +92,92 @@ func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) c errChan := make(chan error, 1) go func() { _, err := c.IOInfoClient.CreateEgress(ctx, info) + + c.mu.Lock() + defer c.mu.Unlock() + + delete(c.egresses, info.EgressId) + if err != nil { logger.Errorw("failed to create egress", err) - e.aborted.Break() + if errors.Is(err, psrpc.ErrRequestTimedOut) && c.healthy.Swap(false) { + logger.Infow("io connection unhealthy") + } errChan <- err + return + } else if !c.healthy.Swap(true) { + logger.Infow("io connection restored") + } - c.mu.Lock() - delete(c.egresses, info.EgressId) - c.mu.Unlock() - } else { - e.created.Break() - errChan <- nil + if e.pending != nil { + c.updates <- e.pending } + errChan <- nil }() return errChan } func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error { - c.mu.Lock() - e, ok := c.egresses[info.EgressId] - c.mu.Unlock() - if !ok { - return errors.ErrEgressNotFound + u := &update{ + ctx: ctx, + info: info, } - // ensure updates are sent sequentially - e.pending <- info - - select { - case <-e.created.Watch(): - // egress was created, continue - case <-e.aborted.Watch(): - // egress was aborted, ignore + c.mu.Lock() + if e, ok := c.egresses[info.EgressId]; ok { + e.pending = u + c.mu.Unlock() return nil } + c.mu.Unlock() - // ensure only one thread is sending updates sequentially - e.mu.Lock() - defer e.mu.Unlock() + c.updates <- u + return nil +} + +func (c *ioClient) updateWorker() { + draining := c.draining.Watch() for { select { - case update := <-e.pending: - var err error - for i := 0; i < 10; i++ { - _, err = c.IOInfoClient.UpdateEgress(ctx, update) - if err == nil { - break + case u := <-c.updates: + c.sendUpdate(u) + case <-draining: + c.done.Break() + return + } + } +} + +func (c *ioClient) sendUpdate(u *update) { + d := time.Millisecond * 250 + for { + if _, err := c.IOInfoClient.UpdateEgress(u.ctx, u.info); err != nil { + if errors.Is(err, psrpc.ErrRequestTimedOut) { + if c.healthy.Swap(false) { + logger.Infow("io connection unhealthy") } - time.Sleep(time.Millisecond * 100 * time.Duration(i)) - } - if err != nil { - logger.Warnw("failed to update egress", err, "egressID", update.EgressId) - return err + d = min(d*2, maxBackoff) + time.Sleep(d) + continue } - requestType, outputType := egress.GetTypes(update.Request) - logger.Infow(strings.ToLower(update.Status.String()), - "requestType", requestType, - "outputType", outputType, - "error", update.Error, - "code", update.ErrorCode, - "details", update.Details, - ) - - switch update.Status { - case livekit.EgressStatus_EGRESS_COMPLETE, - livekit.EgressStatus_EGRESS_FAILED, - livekit.EgressStatus_EGRESS_ABORTED, - livekit.EgressStatus_EGRESS_LIMIT_REACHED: - // egress is done, delete ioEgressClient - c.mu.Lock() - delete(c.egresses, info.EgressId) - c.mu.Unlock() - } + logger.Errorw("failed to update egress", err) + return + } - default: - return nil + if !c.healthy.Swap(true) { + logger.Infow("io connection restored") } + requestType, outputType := egress.GetTypes(u.info.Request) + logger.Infow(strings.ToLower(u.info.Status.String()), + "requestType", requestType, + "outputType", outputType, + "error", u.info.Error, + "code", u.info.ErrorCode, + "details", u.info.Details, + ) + return } } @@ -170,15 +191,11 @@ func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequ return nil } +func (c *ioClient) IsHealthy() bool { + return c.healthy.Load() +} + func (c *ioClient) Drain() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for range ticker.C { - c.mu.Lock() - if len(c.egresses) == 0 { - c.mu.Unlock() - return - } - c.mu.Unlock() - } + c.draining.Break() + <-c.done.Watch() } diff --git a/pkg/server/server.go b/pkg/server/server.go index 6ba43e6f..9f9035a1 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -154,13 +154,11 @@ func (s *Server) Run() error { } func (s *Server) Status() ([]byte, error) { - info := map[string]interface{}{ + status := map[string]interface{}{ "CpuLoad": s.monitor.GetAvailableCPU(), } - - s.GetStatus(info) - - return json.Marshal(info) + s.GetStatus(status) + return json.Marshal(status) } func (s *Server) IsIdle() bool { @@ -168,7 +166,7 @@ func (s *Server) IsIdle() bool { } func (s *Server) IsDisabled() bool { - return s.shutdown.IsBroken() + return s.shutdown.IsBroken() || !s.ioClient.IsHealthy() } func (s *Server) IsTerminating() bool { From 2eec45dea8df25e1365e966034946a008c44ea0b Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 23 Sep 2024 17:45:10 -0700 Subject: [PATCH 2/6] avoid scenario where node will never become healthy again --- pkg/info/io.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/info/io.go b/pkg/info/io.go index 561a7de5..3e0141c4 100644 --- a/pkg/info/io.go +++ b/pkg/info/io.go @@ -100,18 +100,14 @@ func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) c if err != nil { logger.Errorw("failed to create egress", err) - if errors.Is(err, psrpc.ErrRequestTimedOut) && c.healthy.Swap(false) { - logger.Infow("io connection unhealthy") - } errChan <- err return - } else if !c.healthy.Swap(true) { - logger.Infow("io connection restored") } if e.pending != nil { c.updates <- e.pending } + errChan <- nil }() From 9ff7a999daf139ebb63e5ed6a5c4e7f6ba1a022d Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 23 Sep 2024 18:17:05 -0700 Subject: [PATCH 3/6] fix context cancel by grpc --- pkg/server/server_ipc.go | 8 ++++---- test/ioserver.go | 4 +++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/server/server_ipc.go b/pkg/server/server_ipc.go index 8fd0c623..393c83a4 100644 --- a/pkg/server/server_ipc.go +++ b/pkg/server/server_ipc.go @@ -34,8 +34,8 @@ func (s *Server) HandlerReady(_ context.Context, req *ipc.HandlerReadyRequest) ( return &emptypb.Empty{}, nil } -func (s *Server) HandlerUpdate(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { - if err := s.ioClient.UpdateEgress(ctx, info); err != nil { +func (s *Server) HandlerUpdate(_ context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { + if err := s.ioClient.UpdateEgress(context.Background(), info); err != nil { logger.Errorw("failed to update egress", err) } @@ -47,8 +47,8 @@ func (s *Server) HandlerUpdate(ctx context.Context, info *livekit.EgressInfo) (* return &emptypb.Empty{}, nil } -func (s *Server) HandlerFinished(ctx context.Context, req *ipc.HandlerFinishedRequest) (*emptypb.Empty, error) { - if err := s.ioClient.UpdateEgress(ctx, req.Info); err != nil { +func (s *Server) HandlerFinished(_ context.Context, req *ipc.HandlerFinishedRequest) (*emptypb.Empty, error) { + if err := s.ioClient.UpdateEgress(context.Background(), req.Info); err != nil { logger.Errorw("failed to update egress", err) } diff --git a/test/ioserver.go b/test/ioserver.go index 73247967..5c568943 100644 --- a/test/ioserver.go +++ b/test/ioserver.go @@ -45,11 +45,13 @@ func newIOTestServer(bus psrpc.MessageBus, updates chan *livekit.EgressInfo) (*i return s, nil } -func (s *ioTestServer) CreateEgress(_ context.Context, _ *livekit.EgressInfo) (*emptypb.Empty, error) { +func (s *ioTestServer) CreateEgress(_ context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { + logger.Infow("egress created", "egressID", info.EgressId) return &emptypb.Empty{}, nil } func (s *ioTestServer) UpdateEgress(_ context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { + logger.Infow("egress updated", "egressID", info.EgressId, "status", info.Status) s.updates <- info return &emptypb.Empty{}, nil } From 4573bd2e52ddc35de685aa62405af894ff778078 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 23 Sep 2024 19:38:37 -0700 Subject: [PATCH 4/6] fix local/short testing --- test/edge.go | 3 +-- test/file.go | 3 +-- test/images.go | 3 +-- test/integration.go | 6 ++++-- test/multi.go | 3 +-- test/segments.go | 3 +-- test/stream.go | 3 +-- 7 files changed, 10 insertions(+), 14 deletions(-) diff --git a/test/edge.go b/test/edge.go index 70e6438c..6f14b8e0 100644 --- a/test/edge.go +++ b/test/edge.go @@ -120,8 +120,7 @@ func (r *Runner) testEdgeCases(t *testing.T) { custom: r.testEmptyStreamBin, }, } { - r.run(t, test, test.custom) - if r.Short { + if !r.run(t, test, test.custom) { return } } diff --git a/test/file.go b/test/file.go index 0b0ca610..04f1cb66 100644 --- a/test/file.go +++ b/test/file.go @@ -205,8 +205,7 @@ func (r *Runner) testFile(t *testing.T) { // filename: "t_{track_type}_{time}.webm", // }, } { - r.run(t, test, r.runFileTest) - if r.Short { + if !r.run(t, test, r.runFileTest) { return } } diff --git a/test/images.go b/test/images.go index a60c5f37..960cbaaa 100644 --- a/test/images.go +++ b/test/images.go @@ -70,8 +70,7 @@ func (r *Runner) testImages(t *testing.T) { }, }, } { - r.run(t, test, r.runImagesTest) - if r.Short { + if !r.run(t, test, r.runImagesTest) { return } } diff --git a/test/integration.go b/test/integration.go index fe1380b1..16f4635d 100644 --- a/test/integration.go +++ b/test/integration.go @@ -46,9 +46,9 @@ func (r *Runner) RunTests(t *testing.T) { var testNumber int -func (r *Runner) run(t *testing.T, test *testCase, f func(*testing.T, *testCase)) { +func (r *Runner) run(t *testing.T, test *testCase, f func(*testing.T, *testCase)) bool { if !r.should(runRequestType[test.requestType]) { - return + return true } switch test.requestType { @@ -76,6 +76,8 @@ func (r *Runner) run(t *testing.T, test *testCase, f func(*testing.T, *testCase) f(t, test) }) + + return !r.Short } func (r *Runner) awaitIdle(t *testing.T) { diff --git a/test/multi.go b/test/multi.go index 3dcb288a..5a3b16d6 100644 --- a/test/multi.go +++ b/test/multi.go @@ -105,8 +105,7 @@ func (r *Runner) testMulti(t *testing.T) { multi: true, }, } { - r.run(t, test, r.runMultiTest) - if r.Short { + if !r.run(t, test, r.runMultiTest) { return } } diff --git a/test/segments.go b/test/segments.go index cacb7b32..6a4dfb09 100644 --- a/test/segments.go +++ b/test/segments.go @@ -151,8 +151,7 @@ func (r *Runner) testSegments(t *testing.T) { }, }, } { - r.run(t, test, r.runSegmentsTest) - if r.Short { + if !r.run(t, test, r.runSegmentsTest) { return } } diff --git a/test/stream.go b/test/stream.go index 0875ddd8..e66772a6 100644 --- a/test/stream.go +++ b/test/stream.go @@ -155,8 +155,7 @@ func (r *Runner) testStream(t *testing.T) { }, }, } { - r.run(t, test, r.runStreamTest) - if r.Short { + if !r.run(t, test, r.runStreamTest) { return } } From dccb4b7f34d4392b3e26db29e2ce7c2419dc878a Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 23 Sep 2024 20:24:23 -0700 Subject: [PATCH 5/6] add initDefaults for config --- pkg/config/service.go | 65 +++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/pkg/config/service.go b/pkg/config/service.go index 135ee2db..98fcff65 100644 --- a/pkg/config/service.go +++ b/pkg/config/service.go @@ -76,7 +76,6 @@ func NewServiceConfig(confString string) (*ServiceConfig, error) { ApiSecret: os.Getenv("LIVEKIT_API_SECRET"), WsUrl: os.Getenv("LIVEKIT_WS_URL"), }, - TemplatePort: defaultTemplatePort, CPUCostConfig: &CPUCostConfig{}, } if confString != "" { @@ -87,46 +86,52 @@ func NewServiceConfig(confString string) (*ServiceConfig, error) { // always create a new node ID conf.NodeID = utils.NewGuid("NE_") + conf.InitDefaults() - // Setting CPU costs from config. Ensure that CPU costs are positive - if conf.RoomCompositeCpuCost <= 0 { - conf.RoomCompositeCpuCost = roomCompositeCpuCost + if err := conf.initLogger("nodeID", conf.NodeID, "clusterID", conf.ClusterID); err != nil { + return nil, err + } + + return conf, nil +} + +func (c *ServiceConfig) InitDefaults() { + if c.TemplatePort == 0 { + c.TemplatePort = defaultTemplatePort } - if conf.AudioRoomCompositeCpuCost <= 0 { - conf.AudioRoomCompositeCpuCost = audioRoomCompositeCpuCost + if c.TemplateBase == "" { + c.TemplateBase = fmt.Sprintf(defaultTemplateBaseTemplate, c.TemplatePort) } - if conf.WebCpuCost <= 0 { - conf.WebCpuCost = webCpuCost + + // Setting CPU costs from config. Ensure that CPU costs are positive + if c.RoomCompositeCpuCost <= 0 { + c.RoomCompositeCpuCost = roomCompositeCpuCost } - if conf.AudioWebCpuCost <= 0 { - conf.AudioWebCpuCost = audioWebCpuCost + if c.AudioRoomCompositeCpuCost <= 0 { + c.AudioRoomCompositeCpuCost = audioRoomCompositeCpuCost } - if conf.ParticipantCpuCost <= 0 { - conf.ParticipantCpuCost = participantCpuCost + if c.WebCpuCost <= 0 { + c.WebCpuCost = webCpuCost } - if conf.TrackCompositeCpuCost <= 0 { - conf.TrackCompositeCpuCost = trackCompositeCpuCost + if c.AudioWebCpuCost <= 0 { + c.AudioWebCpuCost = audioWebCpuCost } - if conf.TrackCpuCost <= 0 { - conf.TrackCpuCost = trackCpuCost + if c.ParticipantCpuCost <= 0 { + c.ParticipantCpuCost = participantCpuCost } - if conf.MaxCpuUtilization <= 0 || conf.MaxCpuUtilization > 1 { - conf.MaxCpuUtilization = maxCpuUtilization + if c.TrackCompositeCpuCost <= 0 { + c.TrackCompositeCpuCost = trackCompositeCpuCost } - if conf.MaxConcurrentWeb <= 0 { - conf.MaxConcurrentWeb = maxConcurrentWeb + if c.TrackCpuCost <= 0 { + c.TrackCpuCost = trackCpuCost } - if conf.MaxUploadQueue <= 0 { - conf.MaxUploadQueue = maxUploadQueue + if c.MaxCpuUtilization <= 0 || c.MaxCpuUtilization > 1 { + c.MaxCpuUtilization = maxCpuUtilization } - - if conf.TemplateBase == "" { - conf.TemplateBase = fmt.Sprintf(defaultTemplateBaseTemplate, conf.TemplatePort) + if c.MaxConcurrentWeb <= 0 { + c.MaxConcurrentWeb = maxConcurrentWeb } - - if err := conf.initLogger("nodeID", conf.NodeID, "clusterID", conf.ClusterID); err != nil { - return nil, err + if c.MaxUploadQueue <= 0 { + c.MaxUploadQueue = maxUploadQueue } - - return conf, nil } From 975bad046e01dbbbb38b591660d613761f5d6180 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 23 Sep 2024 20:55:18 -0700 Subject: [PATCH 6/6] avoid panic on full update channel, update logging --- pkg/info/io.go | 14 +++++++++----- pkg/server/server_ipc.go | 6 +++--- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/info/io.go b/pkg/info/io.go index 3e0141c4..a30d45fe 100644 --- a/pkg/info/io.go +++ b/pkg/info/io.go @@ -99,7 +99,7 @@ func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) c delete(c.egresses, info.EgressId) if err != nil { - logger.Errorw("failed to create egress", err) + logger.Errorw("failed to create egress", err, "egressID", info.EgressId) errChan <- err return } @@ -128,8 +128,12 @@ func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) e } c.mu.Unlock() - c.updates <- u - return nil + select { + case c.updates <- u: + return nil + default: + return errors.New("channel full or closed") + } } func (c *ioClient) updateWorker() { @@ -158,7 +162,7 @@ func (c *ioClient) sendUpdate(u *update) { continue } - logger.Errorw("failed to update egress", err) + logger.Errorw("failed to update egress", err, "egressID", u.info.EgressId) return } @@ -180,7 +184,7 @@ func (c *ioClient) sendUpdate(u *update) { func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error { _, err := c.IOInfoClient.UpdateMetrics(ctx, req) if err != nil { - logger.Errorw("failed to update ms", err) + logger.Errorw("failed to update metrics", err, "egressID", req.Info.EgressId) return err } diff --git a/pkg/server/server_ipc.go b/pkg/server/server_ipc.go index 393c83a4..bb1a247e 100644 --- a/pkg/server/server_ipc.go +++ b/pkg/server/server_ipc.go @@ -36,7 +36,7 @@ func (s *Server) HandlerReady(_ context.Context, req *ipc.HandlerReadyRequest) ( func (s *Server) HandlerUpdate(_ context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { if err := s.ioClient.UpdateEgress(context.Background(), info); err != nil { - logger.Errorw("failed to update egress", err) + logger.Errorw("failed to update egress", err, "egressID", info.EgressId) } if info.ErrorCode == int32(http.StatusInternalServerError) { @@ -49,11 +49,11 @@ func (s *Server) HandlerUpdate(_ context.Context, info *livekit.EgressInfo) (*em func (s *Server) HandlerFinished(_ context.Context, req *ipc.HandlerFinishedRequest) (*emptypb.Empty, error) { if err := s.ioClient.UpdateEgress(context.Background(), req.Info); err != nil { - logger.Errorw("failed to update egress", err) + logger.Errorw("failed to update egress", err, "egressID", req.EgressId) } if err := s.StoreProcessEndedMetrics(req.EgressId, req.Metrics); err != nil { - logger.Errorw("failed to store ms", err) + logger.Errorw("failed to store metrics", err, "egressID", req.EgressId) } return &emptypb.Empty{}, nil