
Commit e5e64b8

Merge remote-tracking branch 'origin/main' into benjamin/storage_metric_push
2 parents: 784cc27 + 71a7187

6 files changed, +104 -68 lines

.github/workflows/publish-egress.yaml

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ jobs:
             type=semver,pattern=v{{major}}.{{minor}}

       - name: Set up Go
-        uses: actions/setup-go@v4
+        uses: actions/setup-go@v5
         with:
           go-version: 1.21.4

pkg/service/process.go

Lines changed: 0 additions & 18 deletions
@@ -35,9 +35,6 @@ type Process struct {
     cmd              *exec.Cmd
     ipcHandlerClient ipc.EgressHandlerClient
     ready            chan struct{}
-    totalCPU         float64
-    cpuCounter       int
-    maxCPU           float64
     closed           core.Fuse
 }

@@ -68,21 +65,6 @@ func NewProcess(
     return p, nil
 }

-func (p *Process) updateCPU(cpu float64) {
-    p.totalCPU += cpu
-    p.cpuCounter++
-    if cpu > p.maxCPU {
-        p.maxCPU = cpu
-    }
-}
-
-func (p *Process) getUsageStats() (float64, float64) {
-    if p.cpuCounter == 0 {
-        return 0, 0
-    }
-    return p.totalCPU / float64(p.cpuCounter), p.maxCPU
-}
-
 // Gather implements the prometheus.Gatherer interface on server-side to allow aggregation of handler metrics
 func (p *Process) Gather() ([]*dto.MetricFamily, error) {
     // Get the metrics from the handler via IPC

pkg/service/service.go

Lines changed: 0 additions & 1 deletion
@@ -88,7 +88,6 @@ func NewService(conf *config.ServiceConfig, ioClient rpc.IOInfoClient) (*Service
     if err := s.Start(s.conf,
         s.promIsIdle,
         s.promCanAcceptRequest,
-        s.promProcUpdate,
     ); err != nil {
         return nil, err
     }

pkg/service/service_prom.go

Lines changed: 0 additions & 19 deletions
@@ -89,25 +89,6 @@ func (s *Service) promCanAcceptRequest() float64 {
     return 0
 }

-func (s *Service) promProcUpdate(pUsage map[int]float64) map[string]float64 {
-    s.mu.RLock()
-    defer s.mu.RUnlock()
-
-    eUsage := make(map[string]float64)
-    for _, h := range s.activeHandlers {
-        if cmd := h.cmd; cmd != nil {
-            if process := cmd.Process; process != nil {
-                if usage, ok := pUsage[process.Pid]; ok {
-                    eUsage[h.req.EgressId] = usage
-                    h.updateCPU(usage)
-                }
-            }
-        }
-    }
-
-    return eUsage
-}
-
 func (s *Service) storeProcessEndedMetrics(egressID string, metrics string) error {
     m, err := deserializeMetrics(egressID, metrics)
     if err != nil {

pkg/service/service_rpc.go

Lines changed: 2 additions & 1 deletion
@@ -154,6 +154,7 @@ func (s *Service) AddHandler(egressID string, p *Process) error {

     select {
     case <-p.ready:
+        s.UpdatePID(egressID, p.cmd.Process.Pid)
         go func() {
             err := p.cmd.Wait()
             s.processEnded(p, err)
@@ -178,7 +179,7 @@ func (s *Service) processEnded(p *Process, err error) {
         s.Stop(false)
     }

-    avgCPU, maxCPU := p.getUsageStats()
+    avgCPU, maxCPU := s.CloseEgressStats(p.info.EgressId)
     logger.Infow("egress stats", "egressID", p.req.EgressId, "avgCPU", avgCPU, "maxCPU", maxCPU)

     s.EgressEnded(p.req)

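The new UpdatePID call in AddHandler reads p.cmd.Process.Pid, which only exists once the handler command has actually been started: exec.Cmd.Process stays nil until Start succeeds, which is one reason the registration can safely happen in the <-p.ready branch. A small self-contained illustration of that ordering (the sleep command and variable names are just for demonstration, not taken from the repository):

package main

import (
    "fmt"
    "os/exec"
)

func main() {
    cmd := exec.Command("sleep", "1")

    // Before Start there is no OS process yet, so cmd.Process is nil.
    fmt.Println("process started:", cmd.Process != nil) // false

    if err := cmd.Start(); err != nil {
        panic(err)
    }

    // After Start the PID is available and could be handed to a stats
    // monitor, the way AddHandler now calls UpdatePID once the handler
    // signals ready.
    fmt.Println("pid:", cmd.Process.Pid)

    _ = cmd.Wait()
}
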
pkg/stats/monitor.go

Lines changed: 101 additions & 28 deletions
@@ -39,40 +39,62 @@ type Monitor struct {
     requestGauge *prometheus.GaugeVec

     cpuStats *utils.CPUStats
+    requests atomic.Int32

-    pendingCPUs atomic.Float64
+    mu        sync.Mutex
+    pending   map[string]*processStats
+    procStats map[int]*processStats
+}
+
+type processStats struct {
+    egressID string
+
+    pendingUsage float64
+    lastUsage    float64

-    mu              sync.Mutex
-    requests        atomic.Int32
-    prevEgressUsage map[string]float64
+    totalCPU   float64
+    cpuCounter int
+    maxCPU     float64
 }

-const cpuHoldDuration = time.Second * 5
+const cpuHoldDuration = time.Second * 30

 func NewMonitor(conf *config.ServiceConfig) *Monitor {
     return &Monitor{
         cpuCostConfig: conf.CPUCostConfig,
+        pending:       make(map[string]*processStats),
+        procStats:     make(map[int]*processStats),
     }
 }

 func (m *Monitor) Start(
     conf *config.ServiceConfig,
     isIdle func() float64,
     canAcceptRequest func() float64,
-    procUpdate func(map[int]float64) map[string]float64,
 ) error {
     procStats, err := utils.NewProcCPUStats(func(idle float64, usage map[int]float64) {
         m.promCPULoad.Set(1 - idle/m.cpuStats.NumCPU())
-        egressUsage := procUpdate(usage)
-        for egressID, cpuUsage := range egressUsage {
-            m.promProcCPULoad.With(prometheus.Labels{"egress_id": egressID}).Set(cpuUsage)
-        }
-        for egressID := range m.prevEgressUsage {
-            if _, ok := egressUsage[egressID]; !ok {
-                m.promProcCPULoad.With(prometheus.Labels{"egress_id": egressID}).Set(0)
+
+        m.mu.Lock()
+        defer m.mu.Unlock()
+
+        for pid, cpuUsage := range usage {
+            if m.procStats[pid] == nil {
+                m.procStats[pid] = &processStats{}
+            }
+            procStats := m.procStats[pid]
+
+            procStats.lastUsage = cpuUsage
+            procStats.totalCPU += cpuUsage
+            procStats.cpuCounter++
+            if cpuUsage > procStats.maxCPU {
+                procStats.maxCPU = cpuUsage
+            }
+
+            if procStats.egressID != "" {
+                m.promProcCPULoad.With(prometheus.Labels{"egress_id": procStats.egressID}).Set(cpuUsage)
             }
         }
-        m.prevEgressUsage = egressUsage
     })
     if err != nil {
         return err
@@ -168,33 +190,78 @@ func (m *Monitor) GetRequestCount() int {
     return int(m.requests.Load())
 }

+func (m *Monitor) UpdatePID(egressID string, pid int) {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    ps := m.pending[egressID]
+    delete(m.pending, egressID)
+
+    if existing := m.procStats[pid]; existing != nil {
+        ps.maxCPU = existing.maxCPU
+        ps.totalCPU = existing.totalCPU
+        ps.cpuCounter = existing.cpuCounter
+    }
+    m.procStats[pid] = ps
+}
+
+func (m *Monitor) CloseEgressStats(egressID string) (float64, float64) {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+
+    for pid, ps := range m.procStats {
+        if ps.egressID == egressID {
+            delete(m.procStats, pid)
+            return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU
+        }
+    }
+
+    return 0, 0
+}
+
 func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool {
     m.mu.Lock()
     defer m.mu.Unlock()

-    return m.canAcceptRequest(req)
+    return m.canAcceptRequestLocked(req)
 }

-func (m *Monitor) canAcceptRequest(req *rpc.StartEgressRequest) bool {
+func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
     accept := false

     total := m.cpuStats.NumCPU()
-    available := m.cpuStats.GetCPUIdle() - m.pendingCPUs.Load()
-
-    logger.Debugw("cpu check",
-        "total", total,
-        "available", available,
-        "active_requests", m.requests,
-    )

+    var available float64
     if m.requests.Load() == 0 {
         // if no requests, use total
         available = total
     } else {
+        var used float64
+        for _, ps := range m.pending {
+            if ps.pendingUsage > ps.lastUsage {
+                used += ps.pendingUsage
+            } else {
+                used += ps.lastUsage
+            }
+        }
+        for _, ps := range m.procStats {
+            if ps.pendingUsage > ps.lastUsage {
+                used += ps.pendingUsage
+            } else {
+                used += ps.lastUsage
+            }
+        }
+
         // if already running requests, cap usage at MaxCpuUtilization
-        available -= (1 - m.cpuCostConfig.MaxCpuUtilization) * total
+        available = total - used - (total * (1 - m.cpuCostConfig.MaxCpuUtilization))
     }

+    logger.Debugw("cpu check",
+        "total", total,
+        "available", available,
+        "active_requests", m.requests,
+    )
+
     switch req.Request.(type) {
     case *rpc.StartEgressRequest_RoomComposite:
         accept = available >= m.cpuCostConfig.RoomCompositeCpuCost
@@ -215,10 +282,12 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
     m.mu.Lock()
     defer m.mu.Unlock()

-    if !m.canAcceptRequest(req) {
+    if !m.canAcceptRequestLocked(req) {
         return errors.ErrResourceExhausted
     }

+    m.requests.Inc()
+
     var cpuHold float64
     switch req.Request.(type) {
     case *rpc.StartEgressRequest_RoomComposite:
@@ -233,9 +302,13 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
         cpuHold = m.cpuCostConfig.TrackCpuCost
     }

-    m.requests.Inc()
-    m.pendingCPUs.Add(cpuHold)
-    time.AfterFunc(cpuHoldDuration, func() { m.pendingCPUs.Sub(cpuHold) })
+    ps := &processStats{
+        egressID:     req.EgressId,
+        pendingUsage: cpuHold,
+    }
+    time.AfterFunc(cpuHoldDuration, func() { ps.pendingUsage = 0 })
+    m.pending[req.EgressId] = ps
+
     return nil
 }

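To make the new accounting in monitor.go easier to follow as a whole, here is a stripped-down, self-contained sketch of the lifecycle it implements: a pending CPU hold is recorded when a request is accepted, the entry is re-keyed by PID once the handler reports its process ID, each CPU sample updates last/total/max usage, and closing the egress returns the average and peak. The tracker type and method names below are hypothetical simplifications, not the actual Monitor; the Prometheus gauges, config wiring, and the cpuHoldDuration timer are left out.

// Standalone sketch with hypothetical names (tracker, accept, observe, closeStats);
// it mirrors the pending -> per-PID handoff of the new Monitor.
package main

import (
    "fmt"
    "sync"
)

type procStats struct {
    egressID     string
    pendingUsage float64 // CPU reserved while the handler starts up
    lastUsage    float64 // most recent CPU sample
    totalCPU     float64
    cpuCounter   int
    maxCPU       float64
}

type tracker struct {
    mu      sync.Mutex
    pending map[string]*procStats // keyed by egress ID until the PID is known
    byPID   map[int]*procStats    // keyed by PID once the process is running
}

func newTracker() *tracker {
    return &tracker{
        pending: make(map[string]*procStats),
        byPID:   make(map[int]*procStats),
    }
}

// accept reserves cpuHold CPUs for an egress whose handler has not started yet.
func (t *tracker) accept(egressID string, cpuHold float64) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.pending[egressID] = &procStats{egressID: egressID, pendingUsage: cpuHold}
}

// updatePID re-keys the pending entry by PID, keeping any samples already
// recorded for that PID before the egress ID was known.
func (t *tracker) updatePID(egressID string, pid int) {
    t.mu.Lock()
    defer t.mu.Unlock()
    ps := t.pending[egressID]
    if ps == nil {
        ps = &procStats{egressID: egressID} // guard added for safety in this sketch
    }
    delete(t.pending, egressID)
    if existing := t.byPID[pid]; existing != nil {
        ps.totalCPU = existing.totalCPU
        ps.cpuCounter = existing.cpuCounter
        ps.maxCPU = existing.maxCPU
    }
    t.byPID[pid] = ps
}

// observe records one CPU sample for a PID, as the NewProcCPUStats callback does.
func (t *tracker) observe(pid int, usage float64) {
    t.mu.Lock()
    defer t.mu.Unlock()
    ps := t.byPID[pid]
    if ps == nil {
        ps = &procStats{}
        t.byPID[pid] = ps
    }
    ps.lastUsage = usage
    ps.totalCPU += usage
    ps.cpuCounter++
    if usage > ps.maxCPU {
        ps.maxCPU = usage
    }
}

// closeStats removes the entry and returns (average, max) CPU usage.
func (t *tracker) closeStats(egressID string) (float64, float64) {
    t.mu.Lock()
    defer t.mu.Unlock()
    for pid, ps := range t.byPID {
        if ps.egressID == egressID {
            delete(t.byPID, pid)
            if ps.cpuCounter == 0 {
                return 0, 0 // guard against division by zero
            }
            return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU
        }
    }
    return 0, 0
}

func main() {
    t := newTracker()
    t.accept("EG_example", 3.0)     // hypothetical egress ID and CPU hold
    t.updatePID("EG_example", 4242) // handler reported its PID
    t.observe(4242, 1.5)
    t.observe(4242, 2.5)
    avg, peak := t.closeStats("EG_example")
    fmt.Println(avg, peak) // 2 2.5
}

With the monitor tracking both pending holds and live samples, the capacity check takes whichever is larger per entry and computes available = total - used - total*(1 - MaxCpuUtilization) once at least one request is running. As a worked example with assumed numbers: on a 16-CPU host with MaxCpuUtilization set to 0.8 and 6 CPUs currently used or held pending, 16 - 6 - 16*0.2 = 6.8 CPUs remain available to weigh against the per-request CPU cost.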