per-process cpu reservations #566

Merged 3 commits on Dec 21, 2023
18 changes: 0 additions & 18 deletions pkg/service/process.go
@@ -38,9 +38,6 @@ type Process struct {
cmd *exec.Cmd
ipcHandlerClient ipc.EgressHandlerClient
ready chan struct{}
totalCPU float64
cpuCounter int
maxCPU float64
closed core.Fuse
}

@@ -71,21 +68,6 @@ func NewProcess(
return p, nil
}

func (p *Process) updateCPU(cpu float64) {
p.totalCPU += cpu
p.cpuCounter++
if cpu > p.maxCPU {
p.maxCPU = cpu
}
}

func (p *Process) getUsageStats() (float64, float64) {
if p.cpuCounter == 0 {
return 0, 0
}
return p.totalCPU / float64(p.cpuCounter), p.maxCPU
}

// Gather implements the prometheus.Gatherer interface on server-side to allow aggregation of handler metrics
func (p *Process) Gather() ([]*dto.MetricFamily, error) {
// Get the metrics from the handler via IPC
1 change: 0 additions & 1 deletion pkg/service/service.go
@@ -86,7 +86,6 @@ func NewService(conf *config.ServiceConfig, ioClient rpc.IOInfoClient) (*Service
if err := s.Start(s.conf,
s.promIsIdle,
s.promCanAcceptRequest,
s.promProcUpdate,
); err != nil {
return nil, err
}
19 changes: 0 additions & 19 deletions pkg/service/service_prom.go
@@ -72,22 +72,3 @@ func (s *Service) promCanAcceptRequest() float64 {
}
return 0
}

func (s *Service) promProcUpdate(pUsage map[int]float64) map[string]float64 {
s.mu.RLock()
defer s.mu.RUnlock()

eUsage := make(map[string]float64)
for _, h := range s.activeHandlers {
if cmd := h.cmd; cmd != nil {
if process := cmd.Process; process != nil {
if usage, ok := pUsage[process.Pid]; ok {
eUsage[h.req.EgressId] = usage
h.updateCPU(usage)
}
}
}
}

return eUsage
}
3 changes: 2 additions & 1 deletion pkg/service/service_rpc.go
@@ -154,6 +154,7 @@ func (s *Service) AddHandler(egressID string, p *Process) error {

select {
case <-p.ready:
s.UpdatePID(egressID, p.cmd.Process.Pid)
go func() {
err := p.cmd.Wait()
s.processEnded(p, err)
@@ -178,7 +179,7 @@ func (s *Service) processEnded(p *Process, err error) {
s.Stop(false)
}

avgCPU, maxCPU := p.getUsageStats()
avgCPU, maxCPU := s.CloseEgressStats(p.info.EgressId)
logger.Infow("egress stats", "egressID", p.req.EgressId, "avgCPU", avgCPU, "maxCPU", maxCPU)

s.EgressEnded(p.req)
129 changes: 101 additions & 28 deletions pkg/stats/monitor.go
@@ -39,40 +39,62 @@ type Monitor struct {
requestGauge *prometheus.GaugeVec

cpuStats *utils.CPUStats
requests atomic.Int32

pendingCPUs atomic.Float64
mu sync.Mutex
pending map[string]*processStats
procStats map[int]*processStats
}

type processStats struct {
egressID string

pendingUsage float64
lastUsage float64

mu sync.Mutex
requests atomic.Int32
prevEgressUsage map[string]float64
totalCPU float64
cpuCounter int
maxCPU float64
}

const cpuHoldDuration = time.Second * 5
const cpuHoldDuration = time.Second * 30

func NewMonitor(conf *config.ServiceConfig) *Monitor {
return &Monitor{
cpuCostConfig: conf.CPUCostConfig,
pending: make(map[string]*processStats),
procStats: make(map[int]*processStats),
}
}

func (m *Monitor) Start(
conf *config.ServiceConfig,
isIdle func() float64,
canAcceptRequest func() float64,
procUpdate func(map[int]float64) map[string]float64,
) error {
procStats, err := utils.NewProcCPUStats(func(idle float64, usage map[int]float64) {
m.promCPULoad.Set(1 - idle/m.cpuStats.NumCPU())
egressUsage := procUpdate(usage)
for egressID, cpuUsage := range egressUsage {
m.promProcCPULoad.With(prometheus.Labels{"egress_id": egressID}).Set(cpuUsage)
}
for egressID := range m.prevEgressUsage {
if _, ok := egressUsage[egressID]; !ok {
m.promProcCPULoad.With(prometheus.Labels{"egress_id": egressID}).Set(0)

m.mu.Lock()
defer m.mu.Unlock()

for pid, cpuUsage := range usage {
if m.procStats[pid] == nil {
m.procStats[pid] = &processStats{}
}
procStats := m.procStats[pid]

procStats.lastUsage = cpuUsage
procStats.totalCPU += cpuUsage
procStats.cpuCounter++
if cpuUsage > procStats.maxCPU {
procStats.maxCPU = cpuUsage
}

if procStats.egressID != "" {
m.promProcCPULoad.With(prometheus.Labels{"egress_id": procStats.egressID}).Set(cpuUsage)
}
}
m.prevEgressUsage = egressUsage
})
if err != nil {
return err
@@ -168,33 +190,78 @@ func (m *Monitor) GetRequestCount() int {
return int(m.requests.Load())
}

func (m *Monitor) UpdatePID(egressID string, pid int) {
m.mu.Lock()
defer m.mu.Unlock()

ps := m.pending[egressID]
delete(m.pending, egressID)

if existing := m.procStats[pid]; existing != nil {
ps.maxCPU = existing.maxCPU
ps.totalCPU = existing.totalCPU
ps.cpuCounter = existing.cpuCounter
}
m.procStats[pid] = ps
}

func (m *Monitor) CloseEgressStats(egressID string) (float64, float64) {
m.mu.Lock()
defer m.mu.Unlock()

for pid, ps := range m.procStats {
if ps.egressID == egressID {
delete(m.procStats, pid)
Member: why do we delete it here? seems like a GetXX should not have a side effect like this?

Member Author: I'll rename, it's only called after the egress ends

return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU
}
}

return 0, 0
}
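As context for the thread above: the entry is removed when its stats are read because this path only runs after the egress has ended (it is called from processEnded in pkg/service/service_rpc.go). A minimal, self-contained sketch of that finalization step, with names and types simplified rather than taken verbatim from the repository:

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-in for the per-process bookkeeping added in this PR.
type procStats struct {
	totalCPU   float64
	cpuCounter int
	maxCPU     float64
}

type monitor struct {
	mu    sync.Mutex
	stats map[int]*procStats // keyed by PID
}

// closeStats finalizes and removes a process entry, returning average and peak CPU.
// The delete is intentional: callers only invoke this after the egress process exits.
func (m *monitor) closeStats(pid int) (avgCPU, peakCPU float64) {
	m.mu.Lock()
	defer m.mu.Unlock()

	ps, ok := m.stats[pid]
	if !ok || ps.cpuCounter == 0 {
		return 0, 0
	}
	delete(m.stats, pid)
	return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU
}

func main() {
	m := &monitor{stats: map[int]*procStats{
		1234: {totalCPU: 12.0, cpuCounter: 4, maxCPU: 4.5},
	}}
	avg, peak := m.closeStats(1234)
	fmt.Printf("avgCPU=%.2f maxCPU=%.2f\n", avg, peak) // avgCPU=3.00 maxCPU=4.50
}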

func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool {
m.mu.Lock()
defer m.mu.Unlock()

return m.canAcceptRequest(req)
return m.canAcceptRequestLocked(req)
}

func (m *Monitor) canAcceptRequest(req *rpc.StartEgressRequest) bool {
func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
accept := false

total := m.cpuStats.NumCPU()
available := m.cpuStats.GetCPUIdle() - m.pendingCPUs.Load()

logger.Debugw("cpu check",
"total", total,
"available", available,
"active_requests", m.requests,
)

var available float64
if m.requests.Load() == 0 {
// if no requests, use total
available = total
} else {
var used float64
for _, ps := range m.pending {
if ps.pendingUsage > ps.lastUsage {
used += ps.pendingUsage
} else {
used += ps.lastUsage
}
}
for _, ps := range m.procStats {
if ps.pendingUsage > ps.lastUsage {
used += ps.pendingUsage
} else {
used += ps.lastUsage
}
}

// if already running requests, cap usage at MaxCpuUtilization
available -= (1 - m.cpuCostConfig.MaxCpuUtilization) * total
available = total - used - (total * (1 - m.cpuCostConfig.MaxCpuUtilization))
}

logger.Debugw("cpu check",
"total", total,
"available", available,
"active_requests", m.requests,
)

switch req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
accept = available >= m.cpuCostConfig.RoomCompositeCpuCost
@@ -215,10 +282,12 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
m.mu.Lock()
defer m.mu.Unlock()

if !m.canAcceptRequest(req) {
if !m.canAcceptRequestLocked(req) {
return errors.ErrResourceExhausted
}

m.requests.Inc()

var cpuHold float64
switch req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
@@ -233,9 +302,13 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
cpuHold = m.cpuCostConfig.TrackCpuCost
}

m.requests.Inc()
m.pendingCPUs.Add(cpuHold)
time.AfterFunc(cpuHoldDuration, func() { m.pendingCPUs.Sub(cpuHold) })
ps := &processStats{
egressID: req.EgressId,
pendingUsage: cpuHold,
}
time.AfterFunc(cpuHoldDuration, func() { ps.pendingUsage = 0 })
m.pending[req.EgressId] = ps

return nil
}

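To make the new admission math in canAcceptRequestLocked concrete, a rough, self-contained sketch follows. The numbers, the 0.8 utilization cap, and the request cost are illustrative assumptions, not repository defaults: every tracked egress reserves the larger of its pending hold and its last measured usage, and a request is accepted only if the remaining headroom covers its configured CPU cost.

package main

import "fmt"

// Sketch of the per-process CPU reservation check introduced in this PR.
// All values below are assumptions for illustration only.
func main() {
	totalCPU := 8.0          // NumCPU()
	maxUtilization := 0.8    // stand-in for cpuCostConfig.MaxCpuUtilization
	roomCompositeCost := 3.0 // stand-in for cpuCostConfig.RoomCompositeCpuCost

	// One egress still inside its reservation hold, one with live usage only.
	reservations := []struct{ pendingUsage, lastUsage float64 }{
		{pendingUsage: 3.0, lastUsage: 2.5}, // hold not yet released
		{pendingUsage: 0.0, lastUsage: 1.2}, // hold expired, real usage counted
	}

	used := 0.0
	for _, r := range reservations {
		if r.pendingUsage > r.lastUsage {
			used += r.pendingUsage
		} else {
			used += r.lastUsage
		}
	}

	// Cap total usage at maxUtilization of the machine.
	available := totalCPU - used - totalCPU*(1-maxUtilization)
	fmt.Printf("available: %.1f cores\n", available) // 8 - 4.2 - 1.6 = 2.2

	fmt.Println("accept room composite:", available >= roomCompositeCost) // false
}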