Skip to content

Commit 16077b7

Browse files
committed
refactor(adaptive_throttler, resource_monitor): implement shared monitor registry with reference counting
- Replace singleton ResourceMonitor with shared registry pattern for efficient resource sharing
- Add dynamic interval updates and improved CPU mode switching
- Enhance memory sampling with better fallback handling
- Add InDelta assertion helper for precise float comparisons
- Comprehensive test refactoring with improved mocking and coverage
- Update adaptive throttler tests to use new resource monitor architecture
1 parent 7dfff93 commit 16077b7

File tree

8 files changed

+2246
-2418
lines changed

8 files changed

+2246
-2418
lines changed

Dockerfile.test

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
FROM golang:1.21
2+
3+
WORKDIR /app
4+
5+
# Copy go mod files (go.sum may not exist)
6+
COPY go.mod ./
7+
COPY go.sum* ./
8+
9+
# Download dependencies
10+
RUN go mod download
11+
12+
# Copy source code
13+
COPY . .
14+
15+
# Run tests with coverage
16+
CMD ["sh", "-c", "echo '=== Running all sysmonitor tests with coverage ===' && go test -v -coverprofile=sysmonitor_coverage.out -covermode=atomic ./internal/sysmonitor && echo '' && echo '=== sysmonitor coverage ===' && go tool cover -func=sysmonitor_coverage.out && echo '' && echo '=== Running adaptive_throttler and resource_monitor tests with coverage ===' && go test -v -coverprofile=flow_coverage.out -covermode=atomic ./flow -run 'TestAdaptiveThrottler|TestResourceMonitor|TestInitSampler|TestClampPercent|TestValidatePercent|TestValidateResourceStats|TestMemoryUsagePercent' && echo '' && echo '=== flow coverage (adaptive_throttler + resource_monitor) ===' && go tool cover -func=flow_coverage.out | grep -E '(adaptive_throttler|resource_monitor|initSampler|clampPercent|validatePercent|validateResourceStats|memoryUsagePercent|total)' && echo '' && echo '=== Combined coverage summary ===' && go tool cover -func=sysmonitor_coverage.out | tail -1 && go tool cover -func=flow_coverage.out | tail -1"]
17+

examples/adaptive_throttler/demo/demo.go

Lines changed: 39 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,18 @@ import (
1414
// The demo:
1515
// 1. Produces 250 elements in bursts
1616
// 2. Processes elements with CPU-intensive work (50ms each)
17-
// 3. Simulates increasing memory pressure as more elements are processed
17+
// 3. Simulates memory pressure that increases then decreases (creating throttling-recovery cycle)
1818
// 4. The adaptive throttler adjusts throughput based on CPU/memory usage
19-
// 5. Stats are logged every 500ms showing rate adaptation
19+
// 5. Shows throttling down to ~1/sec during high memory, then recovery back to the 20/sec maximum
20+
// 6. Stats are logged every 500ms showing rate adaptation
2021
func main() {
2122
var elementsProcessed atomic.Int64
2223

2324
// Set up demo configuration with memory simulation
2425
throttler := setupDemoThrottler(&elementsProcessed)
25-
defer func() {
26-
throttler.Close()
27-
}()
2826

2927
in := make(chan any)
30-
out := make(chan any, 32)
28+
out := make(chan any) // Unbuffered channel to prevent apparent bursts
3129

3230
source := ext.NewChanSource(in)
3331
sink := ext.NewChanSink(out)
@@ -45,62 +43,64 @@ func main() {
4543

4644
go produceBurst(in, 250)
4745

48-
// Use a variable to prevent compiler optimization of CPU work
4946
var cpuWorkChecksum uint64
5047

48+
// Process the output
49+
elementsReceived := 0
5150
for element := range sink.Out {
5251
fmt.Printf("consumer received %v\n", element)
53-
elementsProcessed.Add(1) // Track processed elements for memory pressure simulation
52+
elementsProcessed.Add(1)
53+
elementsReceived++
5454

55-
// Perform CPU-intensive work that can't be optimized away
56-
// This ensures Windows GetProcessTimes can detect CPU usage
57-
// (Windows timer resolution is ~15.625ms, so we need at least 50-100ms of work)
55+
// Perform CPU-intensive work
5856
burnCPU(50*time.Millisecond, &cpuWorkChecksum)
5957

6058
time.Sleep(25 * time.Millisecond)
6159
}
6260

63-
// Print checksum to ensure CPU work wasn't optimized away
6461
fmt.Printf("CPU work checksum: %d\n", cpuWorkChecksum)
65-
62+
fmt.Printf("Total elements produced: 250, Total elements received: %d\n", elementsReceived)
63+
if elementsReceived == 250 {
64+
fmt.Println("✅ SUCCESS: All elements processed without dropping!")
65+
} else {
66+
fmt.Printf("❌ FAILURE: %d elements were dropped!\n", 250-elementsReceived)
67+
}
6668
fmt.Println("adaptive throttling pipeline completed")
6769
}
6870

6971
// setupDemoThrottler creates and configures an adaptive throttler with demo settings
7072
func setupDemoThrottler(elementsProcessed *atomic.Int64) *flow.AdaptiveThrottler {
7173
config := flow.DefaultAdaptiveThrottlerConfig()
72-
config.MinThroughput = 5
73-
config.MaxThroughput = 40
74+
75+
config.MinRate = 1
76+
config.MaxRate = 20
77+
config.InitialRate = 20
7478
config.SampleInterval = 200 * time.Millisecond
75-
config.BufferSize = 32
76-
config.AdaptationFactor = 0.5
77-
config.SmoothTransitions = true
78-
config.MaxMemoryPercent = 40.0
79-
config.MaxCPUPercent = 80.0
8079

80+
config.BackoffFactor = 0.5
81+
config.RecoveryFactor = 1.5
82+
83+
config.MaxMemoryPercent = 35.0
84+
config.RecoveryMemoryThreshold = 30.0
85+
86+
// Memory Reader Simulation - Creates a cycle: low -> high -> low memory usage
8187
config.MemoryReader = func() (float64, error) {
8288
elementCount := elementsProcessed.Load()
8389

84-
// Memory pressure increases with processed elements:
85-
// - 0-50 elements: 5% memory
86-
// - 51-100 elements: 15% memory
87-
// - 101-150 elements: 30% memory
88-
// - 151+ elements: 50%+ memory (increases gradually)
8990
var memoryPercent float64
9091
switch {
91-
case elementCount <= 50:
92-
memoryPercent = 5.0 + float64(elementCount)*0.2 // 5% to 15%
93-
case elementCount <= 100:
94-
memoryPercent = 15.0 + float64(elementCount-50)*0.3 // 15% to 30%
95-
case elementCount <= 150:
96-
memoryPercent = 30.0 + float64(elementCount-100)*0.4 // 30% to 50%
97-
default:
98-
memoryPercent = 50.0 + float64(elementCount-150)*0.3 // 50%+ (increases more slowly)
99-
if memoryPercent > 95.0 {
100-
memoryPercent = 95.0
92+
case elementCount <= 80: // Phase 1: Low memory, allow high throughput
93+
memoryPercent = 5.0 + float64(elementCount)*0.1 // 5% to 13%
94+
case elementCount <= 120: // Phase 2: Increasing memory pressure, cause throttling
95+
memoryPercent = 15.0 + float64(elementCount-80)*0.6 // 15% to 39%
96+
case elementCount <= 160: // Phase 3: High memory, keep throttled down to ~1/sec
97+
memoryPercent = 30.0 + float64(elementCount-120)*0.3 // 30% to 42%
98+
default: // Phase 4: Memory decreases, allow recovery back to the maximum rate
99+
memoryPercent = 25.0 - float64(elementCount-160)*1.5 // 25% down to ~5%
100+
if memoryPercent < 5.0 {
101+
memoryPercent = 5.0
101102
}
102103
}
103-
104104
return memoryPercent, nil
105105
}
106106

@@ -114,20 +114,17 @@ func setupDemoThrottler(elementsProcessed *atomic.Int64) *flow.AdaptiveThrottler
114114
func produceBurst(in chan<- any, total int) {
115115
defer close(in)
116116

117-
for i := 0; i < total; i++ {
117+
for i := range total {
118118
in <- fmt.Sprintf("job-%02d", i)
119119

120120
if (i+1)%10 == 0 {
121121
time.Sleep(180 * time.Millisecond)
122122
continue
123123
}
124-
125124
time.Sleep(time.Duration(2+rand.Intn(5)) * time.Millisecond)
126125
}
127126
}
128127

129-
// burnCPU performs CPU-intensive work for the specified duration
130-
// The checksum parameter prevents the compiler from optimizing away the work
131128
func burnCPU(duration time.Duration, checksum *uint64) {
132129
start := time.Now()
133130
for time.Since(start) < duration {
@@ -147,8 +144,8 @@ func logThrottlerStats(at *flow.AdaptiveThrottler, done <-chan struct{}) {
147144
return
148145
case <-ticker.C:
149146
stats := at.GetResourceStats()
150-
fmt.Printf("[stats] rate=%d eps memory=%.1f%% cpu=%.1f%% goroutines=%d\n",
151-
at.GetCurrentRate(), stats.MemoryUsedPercent, stats.CPUUsagePercent, stats.GoroutineCount)
147+
fmt.Printf("[stats] Rate: %.1f/sec, CPU: %.1f%%, Memory: %.1f%%\n",
148+
at.GetCurrentRate(), stats.CPUUsagePercent, stats.MemoryUsedPercent)
152149
}
153150
}
154151
}

examples/adaptive_throttler/main.go

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,33 +13,78 @@ func editMessage(msg string) string {
1313
return strings.ToUpper(msg)
1414
}
1515

16+
func addTimestamp(msg string) string {
17+
return fmt.Sprintf("[%s] %s", time.Now().Format("15:04:05"), msg)
18+
}
19+
1620
func main() {
17-
// To setup a custom throttler, you can modify the throttlerConfig struct with your desired values.
18-
// For all available options, see the flow.AdaptiveThrottlerConfig struct.
19-
throttlerConfig := flow.DefaultAdaptiveThrottlerConfig()
20-
throttler, err := flow.NewAdaptiveThrottler(throttlerConfig)
21+
// Configure first throttler - focuses on CPU limits
22+
throttler1Config := flow.DefaultAdaptiveThrottlerConfig()
23+
throttler1Config.MaxCPUPercent = 70.0
24+
throttler1Config.MaxMemoryPercent = 60.0
25+
throttler1Config.InitialRate = 15
26+
throttler1Config.MaxRate = 30
27+
throttler1Config.SampleInterval = 200 * time.Millisecond
28+
throttler1Config.CPUUsageMode = flow.CPUUsageModeMeasured // Use measured mode for better CPU visibility
29+
30+
throttler1, err := flow.NewAdaptiveThrottler(throttler1Config)
31+
if err != nil {
32+
panic(fmt.Sprintf("failed to create throttler1: %v", err))
33+
}
34+
35+
// Configure second throttler - focuses on memory limits
36+
throttler2Config := flow.DefaultAdaptiveThrottlerConfig()
37+
throttler2Config.MaxCPUPercent = 75.0
38+
throttler2Config.MaxMemoryPercent = 50.0
39+
throttler2Config.InitialRate = 10
40+
throttler2Config.MaxRate = 25
41+
throttler2Config.SampleInterval = 200 * time.Millisecond
42+
throttler2Config.CPUUsageMode = flow.CPUUsageModeMeasured // Use measured mode for better CPU visibility
43+
44+
throttler2, err := flow.NewAdaptiveThrottler(throttler2Config)
2145
if err != nil {
22-
panic(fmt.Sprintf("failed to create adaptive throttler: %v", err))
46+
panic(fmt.Sprintf("failed to create throttler2: %v", err))
2347
}
24-
defer throttler.Close()
2548

2649
in := make(chan any)
2750

2851
source := ext.NewChanSource(in)
2952
editMapFlow := flow.NewMap(editMessage, 1)
53+
timestampFlow := flow.NewMap(addTimestamp, 1)
3054
sink := ext.NewStdoutSink()
3155

56+
// Pipeline: Source -> Throttler1 -> Edit -> Throttler2 -> Timestamp -> Sink
57+
go func() {
58+
source.
59+
Via(throttler1).
60+
Via(editMapFlow).
61+
Via(throttler2).
62+
Via(timestampFlow).
63+
To(sink)
64+
}()
65+
66+
// Stats logging goroutine
3267
go func() {
33-
source.Via(throttler).Via(editMapFlow).To(sink)
68+
ticker := time.NewTicker(1 * time.Second)
69+
defer ticker.Stop()
70+
71+
for range ticker.C {
72+
stats := throttler1.GetResourceStats()
73+
fmt.Printf("[stats] T1-Rate: %.1f/s, T2-Rate: %.1f/s, CPU: %.1f%%, Mem: %.1f%%\n",
74+
throttler1.GetCurrentRate(),
75+
throttler2.GetCurrentRate(),
76+
stats.CPUUsagePercent,
77+
stats.MemoryUsedPercent)
78+
}
3479
}()
3580

3681
go func() {
3782
defer close(in)
3883

39-
for i := 1; i <= 50; i++ {
84+
for i := 1; i <= 100; i++ {
4085
message := fmt.Sprintf("message-%d", i)
4186
in <- message
42-
time.Sleep(50 * time.Millisecond)
87+
time.Sleep(30 * time.Millisecond)
4388
}
4489
}()
4590

0 commit comments

Comments
 (0)