@@ -33,12 +33,6 @@ import (
33
33
"vitess.io/vitess/go/vt/vtorc/config"
34
34
)
35
35
36
- // QueueMetric contains the queue's active and queued sizes
37
- type QueueMetric struct {
38
- Active int
39
- Queued int
40
- }
41
-
42
36
// Queue contains information for managing discovery requests
43
37
type Queue struct {
44
38
sync.Mutex
@@ -48,67 +42,16 @@ type Queue struct {
48
42
queue chan string
49
43
queuedKeys map [string ]time.Time
50
44
consumedKeys map [string ]time.Time
51
- metrics []QueueMetric
52
- }
53
-
54
- // DiscoveryQueue contains the discovery queue which can then be accessed via an API call for monitoring.
55
- // Currently this is accessed by ContinuousDiscovery() but also from http api calls.
56
- // I may need to protect this better?
57
- var discoveryQueue map [string ](* Queue )
58
- var dcLock sync.Mutex
59
-
60
- func init () {
61
- discoveryQueue = make (map [string ](* Queue ))
62
45
}
63
46
64
- // CreateOrReturnQueue allows for creation of a new discovery queue or
65
- // returning a pointer to an existing one given the name.
66
- func CreateOrReturnQueue (name string ) * Queue {
67
- dcLock .Lock ()
68
- defer dcLock .Unlock ()
69
- if q , found := discoveryQueue [name ]; found {
70
- return q
71
- }
72
-
73
- q := & Queue {
47
+ // CreateQueue allows for creation of a new discovery queue
48
+ func CreateQueue (name string ) * Queue {
49
+ return & Queue {
74
50
name : name ,
75
51
queuedKeys : make (map [string ]time.Time ),
76
52
consumedKeys : make (map [string ]time.Time ),
77
53
queue : make (chan string , config .DiscoveryQueueCapacity ),
78
54
}
79
- go q .startMonitoring ()
80
-
81
- discoveryQueue [name ] = q
82
-
83
- return q
84
- }
85
-
86
- // monitoring queue sizes until we are told to stop
87
- func (q * Queue ) startMonitoring () {
88
- log .Infof ("Queue.startMonitoring(%s)" , q .name )
89
- ticker := time .NewTicker (time .Second ) // hard-coded at every second
90
-
91
- for {
92
- select {
93
- case <- ticker .C : // do the periodic expiry
94
- q .collectStatistics ()
95
- case <- q .done :
96
- return
97
- }
98
- }
99
- }
100
-
101
- // do a check of the entries in the queue, both those active and queued
102
- func (q * Queue ) collectStatistics () {
103
- q .Lock ()
104
- defer q .Unlock ()
105
-
106
- q .metrics = append (q .metrics , QueueMetric {Queued : len (q .queuedKeys ), Active : len (q .consumedKeys )})
107
-
108
- // remove old entries if we get too big
109
- if len (q .metrics ) > config .DiscoveryQueueMaxStatisticsSize {
110
- q .metrics = q .metrics [len (q .metrics )- config .DiscoveryQueueMaxStatisticsSize :]
111
- }
112
55
}
113
56
114
57
// QueueLen returns the length of the queue (channel size + queued size)
0 commit comments