@@ -27,15 +27,27 @@ import (
27
27
"time"
28
28
)
29
29
30
- // maxDequeFlushPeriod specifies the maximum period
30
+ // maxQueueTTLPeriod specifies the maximum period
31
31
// for the events to reside in the queue.
32
- var maxDequeFlushPeriod = time .Second * 30
32
+ var maxQueueTTLPeriod = time .Second * 10
33
33
34
34
// flusherInterval specifies the interval for the queue flushing.
35
35
var flusherInterval = time .Second * 5
36
36
37
- // callstackFlushes computes overall callstack dequeue flushes
38
- var callstackFlushes = expvar .NewInt ("callstack.flushes" )
37
+ // stackwalkFlushes computes overall flushes for unmatched stackwalk events
38
+ var stackwalkFlushes = expvar .NewInt ("stackwalk.flushes" )
39
+
40
+ // stackwalkFlushesProcs computes overall flushes for unmatched stackwalk events per process
41
+ var stackwalkFlushesProcs = expvar .NewMap ("stackwalk.flushes.procs" )
42
+
43
+ // stackwalkFlushesEvents computes overall flushes for unmatched stackwalks per event type
44
+ var stackwalkFlushesEvents = expvar .NewMap ("stackwalk.flushes.events" )
45
+
46
+ // stackwalkEnqueued counts the number of enqueued events in individual buckets
47
+ var stackwalkEnqueued = expvar .NewInt ("stackwalk.enqueued" )
48
+
49
+ // stackwalkBuckets counts the number of overall stackwalk buckets per stack id
50
+ var stackwalkBuckets = expvar .NewInt ("stackwalk.buckets" )
39
51
40
52
// StackwalkDecorator maintains a FIFO queue where events
41
53
// eligible for stack enrichment are queued. Upon arrival
@@ -80,6 +92,9 @@ func (s *StackwalkDecorator) Push(e *Kevent) {
80
92
} else {
81
93
s .buckets [id ] = append (q , e )
82
94
}
95
+
96
+ stackwalkBuckets .Set (int64 (len (s .buckets )))
97
+ stackwalkEnqueued .Add (int64 (len (s .buckets [id ])))
83
98
}
84
99
85
100
// Pop receives the stack walk event and pops the oldest
@@ -99,6 +114,7 @@ func (s *StackwalkDecorator) Pop(e *Kevent) *Kevent {
99
114
var evt * Kevent
100
115
if len (q ) > 0 {
101
116
evt , s .buckets [id ] = q [0 ], q [1 :]
117
+ stackwalkEnqueued .Add (- int64 (len (s .buckets [id ])))
102
118
}
103
119
104
120
if evt == nil {
@@ -121,6 +137,7 @@ func (s *StackwalkDecorator) RemoveBucket(id uint64) {
121
137
s .mux .Lock ()
122
138
defer s .mux .Unlock ()
123
139
delete (s .buckets , id )
140
+ stackwalkBuckets .Set (int64 (len (s .buckets )))
124
141
}
125
142
126
143
func (s * StackwalkDecorator ) doFlush () {
@@ -138,8 +155,8 @@ func (s *StackwalkDecorator) doFlush() {
138
155
}
139
156
140
157
// flush pushes events to the event queue if they have
141
- // been living in the deque more than the maximum allowed
142
- // flush period.
158
+ // been living in the queue more than the maximum allowed
159
+ // TTL period.
143
160
func (s * StackwalkDecorator ) flush () []error {
144
161
s .mux .Lock ()
145
162
defer s .mux .Unlock ()
@@ -152,15 +169,22 @@ func (s *StackwalkDecorator) flush() []error {
152
169
153
170
for id , q := range s .buckets {
154
171
for i , evt := range q {
155
- if time .Since (evt .Timestamp ) < maxDequeFlushPeriod {
172
+ if time .Since (evt .Timestamp ) < maxQueueTTLPeriod {
156
173
continue
157
174
}
158
- callstackFlushes .Add (1 )
175
+ stackwalkFlushes .Add (1 )
159
176
err := s .q .push (evt )
160
177
if err != nil {
161
178
errs = append (errs , err )
162
179
}
163
180
s .buckets [id ] = append (q [:i ], q [i + 1 :]... )
181
+ if stackwalkEnqueued .Value () > 0 {
182
+ stackwalkEnqueued .Add (- 1 )
183
+ }
184
+ if evt .PS != nil {
185
+ stackwalkFlushesProcs .Add (evt .PS .Name , 1 )
186
+ }
187
+ stackwalkFlushesEvents .Add (evt .Name , 1 )
164
188
}
165
189
}
166
190
0 commit comments