Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory logging #821

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/gorilla/websocket v1.5.3
github.com/livekit/livekit-server v1.8.1-0.20241129023712-3372e6e28532
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.28.2-0.20241128110641-c9f20a60249a
github.com/livekit/protocol v1.29.4-0.20241205204141-8cdf1765bcc7
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8
github.com/livekit/server-sdk-go/v2 v2.4.0
github.com/pion/rtp v1.8.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98 h1:QA7DqIC/ZSsMj8HC0+zNfMMwssHbA0alZALK68r30LQ=
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98/go.mod h1:WIVFAGzVZ7VMjPC5+nbSfwdFjWcbuLgx97KeNSUDTEo=
github.com/livekit/protocol v1.28.2-0.20241128110641-c9f20a60249a h1:dI26u+l1pBBwojkTCuY8VVbNbwoWBaXM9nRY3uDg3p8=
github.com/livekit/protocol v1.28.2-0.20241128110641-c9f20a60249a/go.mod h1:mqXSWNHbENjxM0/HG25wZ7wgja/K9fA0PeQxi+MPmWw=
github.com/livekit/protocol v1.29.4-0.20241205204141-8cdf1765bcc7 h1:d/1pOqRaaaf/l301f2dWVAcKyCeCMxKRuBhSzd8SDz0=
github.com/livekit/protocol v1.29.4-0.20241205204141-8cdf1765bcc7/go.mod h1:NDg1btMpKCzr/w6QR5kDuXw/e4Y7yOBE+RUAHsc+Y/M=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 h1:Ibh0LoFl5NW5a1KFJEE0eLxxz7dqqKmYTj/BfCb0PbY=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.4.0 h1:ide41hppBf7btHLz/nj6rLIQSkaIOxP5tVSki74ZDhg=
Expand Down
6 changes: 0 additions & 6 deletions pkg/info/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,6 @@ func (c *ioClient) sendUpdate(u *update) {
}

func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error {
_, err := c.IOInfoClient.UpdateMetrics(ctx, req)
if err != nil {
logger.Errorw("failed to update metrics", err, "egressID", req.Info.EgressId)
return err
}

return nil
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/server/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,14 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI
logger.Errorw("process failed", err)
}

avgCPU, maxCPU := s.monitor.EgressEnded(req)
avgCPU, maxCPU, maxMemory := s.monitor.EgressEnded(req)
if maxCPU > 0 {
_ = s.ioClient.UpdateMetrics(context.Background(), &rpc.UpdateMetricsRequest{
Info: info,
AvgCpuUsage: float32(avgCPU),
MaxCpuUsage: float32(maxCPU),
})
logger.Debugw("egress metrics",
"egressID", info.EgressId,
"avgCPU", avgCPU,
"maxCPU", maxCPU,
"maxMemory", maxMemory,
)
}

s.ProcessFinished(info.EgressId)
Expand Down
4 changes: 3 additions & 1 deletion pkg/service/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ func (p *Process) Gather() ([]*dto.MetricFamily, error) {
// Get the ms from the handler via IPC
metricsResponse, err := p.ipcHandlerClient.GetMetrics(context.Background(), &ipc.MetricsRequest{})
if err != nil {
logger.Warnw("failed to obtain ms from handler", err, "egressID", p.req.EgressId)
if !p.closed.IsBroken() {
logger.Warnw("failed to obtain ms from handler", err, "egressID", p.req.EgressId)
}
return make([]*dto.MetricFamily, 0), nil // don't return an error, just skip this handler
}

Expand Down
60 changes: 36 additions & 24 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,14 @@ type Monitor struct {
type processStats struct {
egressID string

pendingUsage float64
lastUsage float64
allowedUsage float64
pendingCPU float64
lastCPU float64
allowedCPU float64

totalCPU float64
cpuCounter int
maxCPU float64
maxMemory int
}

func NewMonitor(conf *config.ServiceConfig, svc Service) (*Monitor, error) {
Expand All @@ -85,7 +86,7 @@ func NewMonitor(conf *config.ServiceConfig, svc Service) (*Monitor, error) {
procStats: make(map[int]*processStats),
}

procStats, err := hwstats.NewProcCPUStats(m.updateEgressStats)
procStats, err := hwstats.NewProcMonitor(m.updateEgressStats)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -248,11 +249,11 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
}

ps := &processStats{
egressID: req.EgressId,
pendingUsage: cpuHold,
allowedUsage: cpuHold,
egressID: req.EgressId,
pendingCPU: cpuHold,
allowedCPU: cpuHold,
}
time.AfterFunc(cpuHoldDuration, func() { ps.pendingUsage = 0 })
time.AfterFunc(cpuHoldDuration, func() { ps.pendingCPU = 0 })
m.pending[req.EgressId] = ps

return nil
Expand All @@ -268,8 +269,8 @@ func (m *Monitor) UpdatePID(egressID string, pid int) {
if ps == nil {
logger.Warnw("missing pending procStats", nil, "egressID", egressID)
ps = &processStats{
egressID: egressID,
allowedUsage: m.cpuCostConfig.WebCpuCost,
egressID: egressID,
allowedCPU: m.cpuCostConfig.WebCpuCost,
}
}

Expand Down Expand Up @@ -308,7 +309,7 @@ func (m *Monitor) EgressStarted(req *rpc.StartEgressRequest) {
}
}

func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64) {
func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64, int) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -333,11 +334,11 @@ func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64) {
for pid, ps := range m.procStats {
if ps.egressID == req.EgressId {
delete(m.procStats, pid)
return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU
return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU, ps.maxMemory
}
}

return 0, 0
return 0, 0, 0
}

func (m *Monitor) GetAvailableCPU() float64 {
Expand All @@ -357,17 +358,17 @@ func (m *Monitor) getCPUUsageLocked() (total, available, pending, used float64)
}

for _, ps := range m.pending {
if ps.pendingUsage > ps.lastUsage {
pending += ps.pendingUsage
if ps.pendingCPU > ps.lastCPU {
pending += ps.pendingCPU
} else {
pending += ps.lastUsage
pending += ps.lastCPU
}
}
for _, ps := range m.procStats {
if ps.pendingUsage > ps.lastUsage {
used += ps.pendingUsage
if ps.pendingCPU > ps.lastCPU {
used += ps.pendingCPU
} else {
used += ps.lastUsage
used += ps.lastCPU
}
}

Expand All @@ -376,34 +377,45 @@ func (m *Monitor) getCPUUsageLocked() (total, available, pending, used float64)
return
}

func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) {
load := 1 - idle/m.cpuStats.NumCPU()
func (m *Monitor) updateEgressStats(stats *hwstats.ProcStats) {
load := 1 - stats.CpuIdle/m.cpuStats.NumCPU()
m.promCPULoad.Set(load)

m.mu.Lock()
defer m.mu.Unlock()

maxUsage := 0.0
var maxEgress string
for pid, cpuUsage := range usage {
for pid, cpuUsage := range stats.Cpu {
procStats := m.procStats[pid]
if procStats == nil {
continue
}

procStats.lastUsage = cpuUsage
procStats.lastCPU = cpuUsage
procStats.totalCPU += cpuUsage
procStats.cpuCounter++
if cpuUsage > procStats.maxCPU {
procStats.maxCPU = cpuUsage
}

if cpuUsage > procStats.allowedUsage && cpuUsage > maxUsage {
if cpuUsage > procStats.allowedCPU && cpuUsage > maxUsage {
maxUsage = cpuUsage
maxEgress = procStats.egressID
}
}

for pid, memUsage := range stats.Memory {
procStats := m.procStats[pid]
if procStats == nil {
continue
}

if memUsage > procStats.maxMemory {
procStats.maxMemory = memUsage
}
}

killThreshold := defaultKillThreshold
if killThreshold <= m.cpuCostConfig.MaxCpuUtilization {
killThreshold = (1 + m.cpuCostConfig.MaxCpuUtilization) / 2
Expand Down
3 changes: 1 addition & 2 deletions test/ioserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func (s *ioTestServer) UpdateEgress(_ context.Context, info *livekit.EgressInfo)
return &emptypb.Empty{}, nil
}

func (s *ioTestServer) UpdateMetrics(_ context.Context, req *rpc.UpdateMetricsRequest) (*emptypb.Empty, error) {
logger.Infow("egress metrics", "egressID", req.Info.EgressId, "avgCPU", req.AvgCpuUsage, "maxCPU", req.MaxCpuUsage)
func (s *ioTestServer) UpdateMetrics(_ context.Context, _ *rpc.UpdateMetricsRequest) (*emptypb.Empty, error) {
return &emptypb.Empty{}, nil
}
1 change: 0 additions & 1 deletion test/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ func (r *Runner) StartServer(t *testing.T, svc Server, bus psrpc.MessageBus, tem
r.room.Disconnect()
}
r.svc.Shutdown(false, true)
r.svc.Drain()
})

// connect to room
Expand Down
Loading