Skip to content

Commit 4c5a1fd

Browse files
committed
feedback 3/n
1 parent 19ee6ed commit 4c5a1fd

File tree

5 files changed

+100
-62
lines changed

5 files changed

+100
-62
lines changed

connector/throughputanomalyconnector/anomaly.go

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
// Package loganomalyconnector provides an OpenTelemetry collector connector that detects
15+
// Package throughputanomalyconnector provides an OpenTelemetry collector connector that detects
1616
// anomalies in log throughput using statistical analysis. It monitors the rate of incoming
1717
// logs and generates alerts when significant deviations from the baseline are detected,
1818
// using both Z-score and Median Absolute Deviation (MAD) methods for anomaly detection.
@@ -63,7 +63,6 @@ func calculateStatistics(counts []int64) Statistics {
6363
for _, count := range counts {
6464
sum += float64(count)
6565
}
66-
6766
mean := sum / float64(len(counts))
6867

6968
// Calculate standard deviation
@@ -77,18 +76,34 @@ func calculateStatistics(counts []int64) Statistics {
7776
// Calculate median
7877
sortedCounts := make([]int64, len(counts))
7978
copy(sortedCounts, counts)
80-
sort.Slice(sortedCounts, func(i int, j int) bool {
79+
sort.Slice(sortedCounts, func(i, j int) bool {
8180
return sortedCounts[i] < sortedCounts[j]
8281
})
83-
median := float64(sortedCounts[len(sortedCounts)/2])
8482

85-
// Calculate MAD
83+
var median float64
84+
if len(sortedCounts)%2 == 0 {
85+
// If even number of samples, average the two middle values
86+
mid := len(sortedCounts) / 2
87+
median = (float64(sortedCounts[mid-1]) + float64(sortedCounts[mid])) / 2
88+
} else {
89+
// If odd number of samples, take the middle value
90+
median = float64(sortedCounts[len(sortedCounts)/2])
91+
}
92+
93+
// Calculate MAD (Median Absolute Deviation)
8694
deviations := make([]float64, len(counts))
8795
for i, count := range counts {
8896
deviations[i] = math.Abs(float64(count) - median)
8997
}
9098
sort.Float64s(deviations)
91-
mad := deviations[len(deviations)/2] * 1.4826
99+
100+
var mad float64
101+
if len(deviations)%2 == 0 {
102+
mid := len(deviations) / 2
103+
mad = (deviations[mid-1] + deviations[mid]) / 2 * 1.4826
104+
} else {
105+
mad = deviations[len(deviations)/2] * 1.4826
106+
}
92107

93108
return Statistics{
94109
mean: mean,
@@ -111,9 +126,20 @@ func (d *Detector) checkForAnomaly() *AnomalyStat {
111126

112127
stats := calculateStatistics(historicalCounts)
113128

114-
if stats.stdDev == 0 {
129+
var percentageDiff float64
130+
if stats.mean == 0 {
131+
if float64(currentCount) == 0 {
132+
percentageDiff = 0
133+
} else {
134+
percentageDiff = 100 // handle division by zero by allowing percentage diff to be 100%
135+
}
136+
} else {
137+
percentageDiff = ((float64(currentCount) - stats.mean) / stats.mean) * 100
138+
}
139+
percentageDiff = math.Abs(percentageDiff)
140+
141+
if stats.stdDev == 0 || stats.mad == 0 {
115142
if float64(currentCount) != stats.mean {
116-
percentageDiff := ((float64(currentCount) - stats.mean) / stats.mean) * 100
117143
anomalyType := "Drop"
118144
if float64(currentCount) > stats.mean {
119145
anomalyType = "Spike"
@@ -125,15 +151,15 @@ func (d *Detector) checkForAnomaly() *AnomalyStat {
125151
currentCount: currentCount,
126152
zScore: 0, // Not meaningful when stdDev is 0
127153
madScore: 0, // Not meaningful when MAD is 0
128-
percentageDiff: math.Abs(percentageDiff),
154+
percentageDiff: percentageDiff,
129155
timestamp: d.lastWindowEndTime,
130156
}
131157
}
132158
return nil
133159
}
160+
134161
zScore := (float64(currentCount) - stats.mean) / stats.stdDev
135162
madScore := (float64(currentCount) - stats.median) / stats.mad
136-
percentageDiff := ((float64(currentCount) - stats.mean) / stats.mean) * 100
137163

138164
// Check for anomaly using both Z-score and MAD
139165
if math.Abs(zScore) > d.config.ZScoreThreshold || math.Abs(madScore) > d.config.MADThreshold {
@@ -148,7 +174,7 @@ func (d *Detector) checkForAnomaly() *AnomalyStat {
148174
currentCount: currentCount,
149175
zScore: zScore,
150176
madScore: madScore,
151-
percentageDiff: math.Abs(percentageDiff),
177+
percentageDiff: percentageDiff,
152178
timestamp: d.lastWindowEndTime,
153179
}
154180
}

connector/throughputanomalyconnector/anomaly_test.go

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,40 +35,40 @@ func TestConfig_Validate(t *testing.T) {
3535
{
3636
name: "valid config",
3737
config: Config{
38-
SampleInterval: time.Minute,
39-
MaxWindowAge: time.Hour,
40-
ZScoreThreshold: 3.0,
41-
MADThreshold: 3.0,
38+
AnalysisInterval: time.Minute,
39+
MaxWindowAge: time.Hour,
40+
ZScoreThreshold: 3.0,
41+
MADThreshold: 3.0,
4242
},
4343
expectError: false,
4444
},
4545
{
4646
name: "invalid sample interval - too small",
4747
config: Config{
48-
SampleInterval: time.Second * 30,
49-
MaxWindowAge: time.Hour,
50-
ZScoreThreshold: 3.0,
51-
MADThreshold: 3.0,
48+
AnalysisInterval: time.Second * 30,
49+
MaxWindowAge: time.Hour,
50+
ZScoreThreshold: 3.0,
51+
MADThreshold: 3.0,
5252
},
5353
expectError: true,
5454
},
5555
{
5656
name: "invalid max window age - too small relative to sample interval",
5757
config: Config{
58-
SampleInterval: time.Minute,
59-
MaxWindowAge: time.Minute * 5,
60-
ZScoreThreshold: 3.0,
61-
MADThreshold: 3.0,
58+
AnalysisInterval: time.Minute,
59+
MaxWindowAge: time.Minute * 5,
60+
ZScoreThreshold: 3.0,
61+
MADThreshold: 3.0,
6262
},
6363
expectError: true,
6464
},
6565
{
6666
name: "invalid threshold - negative z-score",
6767
config: Config{
68-
SampleInterval: time.Minute,
69-
MaxWindowAge: time.Hour,
70-
ZScoreThreshold: -1.0,
71-
MADThreshold: 3.0,
68+
AnalysisInterval: time.Minute,
69+
MaxWindowAge: time.Hour,
70+
ZScoreThreshold: -1.0,
71+
MADThreshold: 3.0,
7272
},
7373
expectError: true,
7474
},
@@ -160,10 +160,10 @@ func (m *mockConsumer) Capabilities() consumer.Capabilities {
160160

161161
func TestDetector_AnomalyDetection(t *testing.T) {
162162
config := &Config{
163-
SampleInterval: time.Minute,
164-
MaxWindowAge: time.Hour,
165-
ZScoreThreshold: 2.0,
166-
MADThreshold: 2.0,
163+
AnalysisInterval: time.Minute,
164+
MaxWindowAge: time.Hour,
165+
ZScoreThreshold: 2.0,
166+
MADThreshold: 2.0,
167167
}
168168

169169
logger := zap.NewNop()
@@ -254,20 +254,26 @@ func TestDetector_AnomalyDetection(t *testing.T) {
254254

255255
func TestDetector_Shutdown(t *testing.T) {
256256
config := &Config{
257-
SampleInterval: time.Minute,
258-
MaxWindowAge: time.Hour,
259-
ZScoreThreshold: 3.0,
260-
MADThreshold: 3.0,
257+
AnalysisInterval: time.Minute,
258+
MaxWindowAge: time.Hour,
259+
ZScoreThreshold: 3.0,
260+
MADThreshold: 3.0,
261261
}
262262

263263
logger := zap.NewNop()
264264
mockConsumer := &mockConsumer{}
265265
detector := newDetector(config, logger, mockConsumer)
266266

267+
// Initialize the log channel with proper capacity
268+
detector.logChan = make(chan logBatch, 1)
269+
267270
// Start the detector
268271
err := detector.Start(context.Background(), nil)
269272
require.NoError(t, err)
270273

274+
// Allow some time for goroutine to start
275+
time.Sleep(100 * time.Millisecond)
276+
271277
// Test clean shutdown
272278
ctx := context.Background()
273279
err = detector.Shutdown(ctx)
@@ -283,9 +289,13 @@ func TestDetector_Shutdown(t *testing.T) {
283289

284290
// Test shutdown with canceled context
285291
detector = newDetector(config, logger, mockConsumer)
292+
detector.logChan = make(chan logBatch, 1) // Initialize channel for new detector
286293
err = detector.Start(context.Background(), nil)
287294
require.NoError(t, err)
288295

296+
// Allow some time for goroutine to start
297+
time.Sleep(100 * time.Millisecond)
298+
289299
ctx, cancel := context.WithCancel(context.Background())
290300
cancel()
291301
err = detector.Shutdown(ctx)

connector/throughputanomalyconnector/config.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ var _ component.Config = (*Config)(nil)
2626
// Config defines the configuration parameters for the log anomaly detector connector.
2727
type Config struct {
2828
// AnalysisInterval is how often a throughput measurement is taken; it is also the duration of each counting window.
29-
SampleInterval time.Duration `mapstructure:"sample_interval"`
29+
AnalysisInterval time.Duration `mapstructure:"analysis_interval"`
3030
// MaxWindowAge defines the maximum age of samples to retain in the detection window.
3131
// Samples older than this duration are pruned from the analysis window.
3232
// This duration determines how far back the detector looks when establishing baseline behavior.
@@ -46,14 +46,14 @@ type Config struct {
4646
// Validate checks whether the input configuration has all of the required fields for the connector.
4747
// An error is returned if there are any invalid inputs.
4848
func (config *Config) Validate() error {
49-
if config.SampleInterval <= 0 {
50-
return fmt.Errorf("sample_interval must be positive, got %v", config.SampleInterval)
49+
if config.AnalysisInterval <= 0 {
50+
return fmt.Errorf("analysis_interval must be positive, got %v", config.AnalysisInterval)
5151
}
52-
if config.SampleInterval < time.Minute {
53-
return fmt.Errorf("sample_interval must be at least 1 minute, got %v", config.SampleInterval)
52+
if config.AnalysisInterval < time.Minute {
53+
return fmt.Errorf("analysis_interval must be at least 1 minute, got %v", config.AnalysisInterval)
5454
}
55-
if config.SampleInterval > time.Hour {
56-
return fmt.Errorf("sample_interval must not exceed 1 hour, got %v", config.SampleInterval)
55+
if config.AnalysisInterval > time.Hour {
56+
return fmt.Errorf("analysis_interval must not exceed 1 hour, got %v", config.AnalysisInterval)
5757
}
5858
if config.MaxWindowAge <= 0 {
5959
return fmt.Errorf("max_window_age must be positive, got %v", config.MaxWindowAge)
@@ -62,9 +62,9 @@ func (config *Config) Validate() error {
6262
return fmt.Errorf("max_window_age must be at least 1 hour, got %v", config.MaxWindowAge)
6363
}
6464

65-
if config.MaxWindowAge < config.SampleInterval*10 {
66-
return fmt.Errorf("max_window_age (%v) must be at least 10 times larger than sample_interval (%v)",
67-
config.MaxWindowAge, config.SampleInterval)
65+
if config.MaxWindowAge < config.AnalysisInterval*10 {
66+
return fmt.Errorf("max_window_age (%v) must be at least 10 times larger than analysis_interval (%v)",
67+
config.MaxWindowAge, config.AnalysisInterval)
6868
}
6969

7070
if config.ZScoreThreshold <= 0 {

connector/throughputanomalyconnector/connector.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ func newDetector(config *Config, logger *zap.Logger, nextConsumer consumer.Logs)
6363
stateLock: sync.Mutex{},
6464
nextConsumer: nextConsumer,
6565
logChan: make(chan logBatch, 1000),
66-
counts: make([]int64, numWindows),
67-
lastWindowEndTime: time.Now().Truncate(time.Minute),
66+
counts: make([]int64, 0, numWindows),
67+
lastWindowEndTime: time.Now().Truncate(config.AnalysisInterval),
6868
}
6969
}
7070

@@ -73,7 +73,7 @@ func newDetector(config *Config, logger *zap.Logger, nextConsumer consumer.Logs)
7373
func (d *Detector) Start(ctx context.Context, _ component.Host) error {
7474
d.ctx, d.cancel = context.WithCancel(ctx)
7575

76-
ticker := time.NewTicker(d.config.SampleInterval)
76+
ticker := time.NewTicker(d.config.AnalysisInterval)
7777

7878
d.wg.Add(1)
7979

@@ -123,9 +123,6 @@ func (d *Detector) Shutdown(ctx context.Context) error {
123123
func (d *Detector) processLogBatch(batch logBatch) {
124124
logCount := batch.logs.LogRecordCount()
125125
d.currentCount.Add(int64(logCount))
126-
127-
err := d.nextConsumer.ConsumeLogs(batch.ctx, batch.logs)
128-
batch.errChan <- err
129126
}
130127

131128
func (d *Detector) analyzeTimeWindow() {
@@ -135,18 +132,15 @@ func (d *Detector) analyzeTimeWindow() {
135132
now := time.Now()
136133
currentCount := d.currentCount.Swap(0)
137134

138-
if len(d.counts) > 0 {
139-
d.counts[len(d.counts)-1] = currentCount
140-
}
141-
142135
// drop any windows that are too old
143136
maxAge := d.config.MaxWindowAge
144137
cutoffTime := now.Add(-maxAge)
138+
windowDuration := d.config.AnalysisInterval
145139

146140
// find the first window that is not too old
147141
var keepIndex int
148142
for i := range d.counts {
149-
windowTime := d.lastWindowEndTime.Add(-time.Duration(len(d.counts)-1-i) * time.Minute)
143+
windowTime := d.lastWindowEndTime.Add(-time.Duration(len(d.counts)-1-i) * windowDuration)
150144
if windowTime.After(cutoffTime) {
151145
keepIndex = i
152146
break
@@ -158,9 +152,17 @@ func (d *Detector) analyzeTimeWindow() {
158152
}
159153

160154
// add windows until we reach current time
161-
for d.lastWindowEndTime.Add(time.Minute).Before(now) {
155+
for d.lastWindowEndTime.Add(windowDuration).Before(now) {
162156
d.counts = append(d.counts, 0)
163-
d.lastWindowEndTime = d.lastWindowEndTime.Add(time.Minute)
157+
d.lastWindowEndTime = d.lastWindowEndTime.Add(windowDuration)
158+
}
159+
160+
if len(d.counts) > 0 {
161+
d.counts[len(d.counts)-1] = currentCount
162+
} else {
163+
// Initialize first window
164+
d.counts = append(d.counts, currentCount)
165+
d.lastWindowEndTime = now.Truncate(windowDuration)
164166
}
165167

166168
// Check for anomalies using the fixed window counts

connector/throughputanomalyconnector/factory.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ func NewFactory() connector.Factory {
3535

3636
func createDefaultConfig() component.Config {
3737
return &Config{
38-
SampleInterval: 1 * time.Minute,
39-
MaxWindowAge: 1 * time.Hour,
40-
ZScoreThreshold: 3.0,
41-
MADThreshold: 3.5,
38+
AnalysisInterval: 1 * time.Minute,
39+
MaxWindowAge: 1 * time.Hour,
40+
ZScoreThreshold: 3.0,
41+
MADThreshold: 3.5,
4242
}
4343
}
4444

0 commit comments

Comments
 (0)