forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnats_consumer.go
215 lines (178 loc) · 4.95 KB
/
nats_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package natsconsumer
import (
"fmt"
"log"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/nats-io/nats"
)
// natsError pairs an error reported by the NATS client with the connection
// and subscription it was reported for, so that the rendered message can
// identify where the failure happened.
type natsError struct {
	// conn is the connection handed to the error handler.
	conn *nats.Conn
	// sub is the subscription handed to the error handler.
	sub *nats.Subscription
	// err is the underlying error reported by the client.
	err error
}
// Error implements the error interface. The message contains the underlying
// error text followed by the connected server URL, the connected server id,
// and the subject and queue of the affected subscription.
//
// The subscription is optional: when sub is nil the subject and queue are
// rendered as empty strings instead of dereferencing a nil pointer.
// NOTE(review): the client appears to invoke the error handler without a
// subscription for connection-level errors — confirm against the vendored
// client version.
func (e natsError) Error() string {
	var subject, queue string
	if e.sub != nil {
		subject, queue = e.sub.Subject, e.sub.Queue
	}
	return fmt.Sprintf("%s url:%s id:%s sub:%s queue:%s",
		e.err.Error(), e.conn.ConnectedUrl(), e.conn.ConnectedServerId(), subject, queue)
}
// natsConsumer is an input that subscribes to one or more NATS subjects and
// converts every received message into metrics using the configured parser.
type natsConsumer struct {
	// QueueGroup is the queue group name used for every subscription.
	QueueGroup string
	// Subjects lists the subjects to subscribe to.
	Subjects []string
	// Servers lists the server URLs copied into the client options.
	Servers []string
	// Secure is copied into the Secure flag of the client options.
	Secure bool

	// Client pending limits, applied to each subscription after it is created.
	PendingMessageLimit int
	PendingBytesLimit   int

	// Legacy metric buffer support; the field is not read anywhere in this file.
	MetricBuffer int

	// parser turns a message payload into metrics; it is set via SetParser.
	parser parsers.Parser

	// The embedded mutex serializes Start and Stop.
	sync.Mutex
	// wg tracks the receiver goroutine so Stop can wait for it to exit.
	wg sync.WaitGroup
	// Conn is the NATS connection; Start reuses it while it is still open.
	Conn *nats.Conn
	// Subs holds the subscriptions created by Start.
	Subs []*nats.Subscription

	// in carries every incoming NATS message to the receiver goroutine.
	in chan *nats.Msg
	// errs carries errors reported by the NATS client to the receiver goroutine.
	errs chan error
	// done is closed by Stop to make the receiver goroutine return.
	done chan struct{}
	// acc is the accumulator that metrics and errors are reported to.
	acc telegraf.Accumulator
}
// sampleConfig is the example configuration text returned by SampleConfig.
// It lists the plugin's settings (servers, secure, subjects, queue_group,
// pending limits) as commented-out entries plus the data_format to consume.
var sampleConfig = `
## urls of NATS servers
# servers = ["nats://localhost:4222"]
## Use Transport Layer Security
# secure = false
## subject(s) to consume
# subjects = ["telegraf"]
## name a queue group
# queue_group = "telegraf_consumers"
## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
# pending_bytes_limit = 67108864
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
// SampleConfig returns the example configuration text for this plugin.
func (n *natsConsumer) SampleConfig() string {
	return sampleConfig
}
// Description returns a one-line summary of what this plugin does.
func (n *natsConsumer) Description() string {
	const summary = "Read metrics from NATS subject(s)"
	return summary
}
// SetParser stores the parser that the receiver goroutine uses to decode
// message payloads into metrics.
func (n *natsConsumer) SetParser(parser parsers.Parser) {
	n.parser = parser
}
// natsErrHandler is installed on the connection as its error handler. It
// wraps the reported error together with its connection and subscription and
// offers it to the errs channel without blocking: if nothing is ready to
// receive at that moment, the error is dropped.
func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e error) {
	wrapped := natsError{conn: c, sub: s, err: e}

	select {
	case n.errs <- wrapped:
		// delivered to the receiver
	default:
		// nobody is listening right now; drop it rather than stall the caller
	}
}
// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
//
// Start records acc as the destination for metrics and errors. Unless an open
// connection already exists, it connects to the configured servers and creates
// one queue subscription per configured subject, applying the pending limits
// to each. It then launches the receiver goroutine.
//
// If any step of the subscription setup fails, the freshly opened connection
// is closed and the partial subscription list is discarded before the error
// is returned, so that a failed Start leaks nothing and a later Start begins
// from a clean state.
func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
	n.Lock()
	defer n.Unlock()

	n.acc = acc

	// set default NATS connection options
	opts := nats.DefaultOptions

	// override max reconnection tries
	opts.MaxReconnect = -1

	// override servers if any were specified
	opts.Servers = n.Servers

	opts.Secure = n.Secure

	if n.Conn == nil || n.Conn.IsClosed() {
		var connectErr error
		n.Conn, connectErr = opts.Connect()
		if connectErr != nil {
			return connectErr
		}

		// Anything still recorded in Subs belongs to a connection that is
		// gone; start the list afresh for the new connection.
		n.Subs = nil

		// abort tears down the half-initialized connection so that the
		// failure does not leak it and the next Start reconnects.
		abort := func(err error) error {
			n.Conn.Close()
			n.Subs = nil
			return err
		}

		// Setup message and error channels
		n.errs = make(chan error)
		n.Conn.SetErrorHandler(n.natsErrHandler)

		n.in = make(chan *nats.Msg, 1000)
		for _, subj := range n.Subjects {
			sub, err := n.Conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) {
				n.in <- m
			})
			if err != nil {
				return abort(err)
			}

			// ensure that the subscription has been processed by the server
			if err = n.Conn.Flush(); err != nil {
				return abort(err)
			}

			// set the subscription pending limits
			if err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit); err != nil {
				return abort(err)
			}

			n.Subs = append(n.Subs, sub)
		}
	}

	n.done = make(chan struct{})

	// Start the message reader
	n.wg.Add(1)
	go n.receiver()
	log.Printf("I! Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n",
		n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup)

	return nil
}
// receiver is the consumer's worker loop. It runs until done is closed and,
// in the meantime, handles two kinds of events:
//   - a message on in is parsed and each resulting metric is added to the
//     accumulator; a parse error is reported to the accumulator as well;
//   - an error on errs is reported to the accumulator.
//
// It signals the wait group on exit so that Stop can wait for it.
func (n *natsConsumer) receiver() {
	defer n.wg.Done()

	for {
		select {
		case msg := <-n.in:
			parsed, parseErr := n.parser.Parse(msg.Data)
			if parseErr != nil {
				n.acc.AddError(fmt.Errorf("E! subject: %s, error: %s", msg.Subject, parseErr.Error()))
			}

			// Whatever the parser returned is still forwarded, even when it
			// also reported an error.
			for i := range parsed {
				m := parsed[i]
				n.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
			}

		case readErr := <-n.errs:
			n.acc.AddError(fmt.Errorf("E! error reading from %s\n", readErr.Error()))

		case <-n.done:
			return
		}
	}
}
// clean releases the consumer's NATS resources: it unsubscribes every recorded
// subscription, reporting failures to the accumulator, and closes the
// connection if it is still open.
//
// The subscription list is emptied afterwards. Without that, a later Start
// would append to the stale list and the next clean would try to unsubscribe
// the dead subscriptions again, reporting spurious errors.
func (n *natsConsumer) clean() {
	for _, sub := range n.Subs {
		if err := sub.Unsubscribe(); err != nil {
			n.acc.AddError(fmt.Errorf("E! Error unsubscribing from subject %s in queue %s: %s\n",
				sub.Subject, sub.Queue, err.Error()))
		}
	}
	n.Subs = nil

	if n.Conn != nil && !n.Conn.IsClosed() {
		n.Conn.Close()
	}
}
// Stop shuts the consumer down: it signals the receiver goroutine to exit,
// waits for it, and then releases the subscriptions and the connection.
//
// Stop is safe to call when no receiver is running (Start was never called,
// Start failed before launching it, or Stop was already called): closing a nil
// or already-closed channel would panic, so the signal/wait step is skipped in
// that case and only the resource cleanup runs.
func (n *natsConsumer) Stop() {
	n.Lock()
	// Deferred so the mutex is released on every exit path, matching Start.
	defer n.Unlock()

	if n.done != nil {
		close(n.done)
		n.wg.Wait()
		// The receiver has exited, so nothing else reads done any more;
		// clearing it marks the consumer as stopped.
		n.done = nil
	}

	n.clean()
}
// Gather does nothing and always returns nil. Metrics are not collected on
// demand; the receiver goroutine adds them to the accumulator as messages
// arrive.
func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
	return nil
}
func init() {
inputs.Add("nats_consumer", func() telegraf.Input {
return &natsConsumer{
Servers: []string{"nats://localhost:4222"},
Secure: false,
Subjects: []string{"telegraf"},
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
}
})
}