-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathprocessor.go
executable file
·307 lines (279 loc) · 9.32 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
// Package go4data is used to create processors that run any kind of handler on a flow of payloads.
// The payloads are transferred between processors that have a relationship assigned.
package go4data
import (
"context"
"errors"
"fmt"
"sync"
"github.com/percybolmer/go4data/handlers"
"github.com/percybolmer/go4data/metric"
"github.com/percybolmer/go4data/property"
"github.com/percybolmer/go4data/pubsub"
// Add shadow import to all known Handler categories?
_ "github.com/percybolmer/go4data/handlers/databases"
_ "github.com/percybolmer/go4data/handlers/files"
_ "github.com/percybolmer/go4data/handlers/filters"
// Removed network from shadow since we dont wanna force users to download libpcap
//_ "github.com/percybolmer/go4data/handlers/network"
_ "github.com/percybolmer/go4data/handlers/parsers"
_ "github.com/percybolmer/go4data/handlers/terminal"
)
// Processor runs a Handler on every payload that is ingressed through its
// subscriptions and publishes onto its Topics.
//
// The embedded Mutex is taken by methods such as SetHandler, Subscribe and
// AddTopics while they mutate the processor.
type Processor struct {
	// ID is a unique identifier for each processor.
	ID uint `json:"id" yaml:"id"`
	// Name is a user configured name that makes it easier to refer to a
	// processor than its ID does.
	Name string `json:"name" yaml:"name"`
	// Running indicates whether the processor is currently running.
	Running bool `json:"running" yaml:"running"`
	// Workers determines how many concurrent workers the processor starts
	// for each subscription.
	Workers int `json:"workers" yaml:"workers"`
	// FailureHandler is called with every Failure the processor produces.
	FailureHandler func(f Failure) `json:"-" yaml:"-"`
	// Handler is the handler to perform on the payloads received.
	Handler handlers.Handler `json:"handler" yaml:"handler"`
	// subscriptions holds all current subscriptions.
	// A subscription feeds payloads into the processor.
	subscriptions []*pubsub.Pipe
	// Topics are the topics to publish payloads onto.
	Topics []string `json:"topics" yaml:"topics"`
	// QueueSize is how many payloads are accepted on the output channels to subscribers.
	QueueSize int `json:"queuesize" yaml:"queuesize"`
	// Metric is used to store metrics.
	Metric metric.Provider `json:"-" yaml:"-"`
	// cancel stops the goroutines launched by Start; it is set by Start and
	// invoked by Stop.
	cancel     context.CancelFunc
	sync.Mutex `json:"-" yaml:"-"`
}
// IDCounter holds the next ID that NewID hands out, which keeps generated
// processors from sharing an ID.
var IDCounter uint = 1

// DefaultQueueSize is the queue size given to processors created by NewProcessor.
var DefaultQueueSize = 1000

// Sentinel errors exposed by the package.
var (
	// ErrProcessorHasNoHandlerApplied is returned when starting a processor that has a nil Handler.
	ErrProcessorHasNoHandlerApplied = errors.New("the processor has no Handler set. Please assign a Handler to it before running")

	// ErrNilContext is returned when a processor is started with a nil context.
	ErrNilContext = errors.New("nil context is not allowed when starting a processor")

	// ErrProcessorAlreadyStopped is returned when stopping a processor that is already stopped.
	ErrProcessorAlreadyStopped = errors.New("the processor is already stopped")

	// ErrRequiredPropertiesNotFulfilled is returned when starting a Handler whose configuration does not validate.
	ErrRequiredPropertiesNotFulfilled = errors.New("the Handler needs additional properties to work, see the Handlers documentation")

	// ErrHandlerDoesNotAcceptPublishers is for registering a publisher on a processor that has a self-publishing Handler.
	ErrHandlerDoesNotAcceptPublishers = errors.New("the used Handler does not allow publishers")

	// ErrDuplicateTopic is returned when registering a topic that is already registered.
	ErrDuplicateTopic = errors.New("the topic is already registered")

	// ErrFailedToUnmarshal is for unmarshalling that fails because the provided data is not correct.
	ErrFailedToUnmarshal = errors.New("failed to unmarshal since data provided is not correct")
)
// NewID returns the current value of IDCounter and then advances the counter
// by one, so consecutive calls yield consecutive IDs.
func NewID() uint {
	id := IDCounter
	IDCounter++
	return id
}
// NewProcessor creates a processor called name with a freshly generated ID,
// one worker, the default queue size, PrintFailure as failure handler and a
// Prometheus metric provider.
//
// The returned processor has no Handler yet; one has to be assigned before
// Start will succeed. Any topics given are the topics the processor
// publishes its payloads to.
func NewProcessor(name string, topics ...string) *Processor {
	proc := new(Processor)
	proc.ID = NewID()
	proc.Name = name
	proc.FailureHandler = PrintFailure
	proc.Handler = nil
	proc.Workers = 1
	proc.subscriptions = make([]*pubsub.Pipe, 0)
	// Appending an empty vararg leaves the empty, non-nil slice untouched.
	proc.Topics = append(make([]string, 0), topics...)
	proc.QueueSize = DefaultQueueSize
	proc.Metric = metric.NewPrometheusProvider()
	return proc
}
// Start runs the processor and executes its Handler on any incoming payloads.
//
// It is a no-op returning nil when the processor is already running.
// Otherwise it returns ErrProcessorHasNoHandlerApplied for a nil Handler,
// ErrNilContext for a nil ctx, ErrRequiredPropertiesNotFulfilled when the
// Handler's configuration does not validate, or the error reported by the
// Handler's SetMetricProvider.
//
// On success the worker goroutines and the error-channel monitor run on a
// context derived from ctx until Stop is called or ctx is cancelled.
func (p *Processor) Start(ctx context.Context) error {
	// Already running: nothing to do.
	if p.Running {
		return nil
	}
	// Validate the settings before anything is started.
	if p.Handler == nil {
		return ErrProcessorHasNoHandlerApplied
	}
	if ctx == nil {
		return ErrNilContext
	}
	// The second return value is discarded; callers only learn that
	// validation failed (presumably it carries details — TODO confirm).
	if ok, _ := p.Handler.ValidateConfiguration(); !ok {
		return ErrRequiredPropertiesNotFulfilled
	}
	// Run the last fallible step before deriving the cancellable context, so
	// a failure here cannot leak a context whose cancel func is never called.
	if err := p.Handler.SetMetricProvider(p.Metric, fmt.Sprintf("%s_%d", p.Name, p.ID)); err != nil {
		return err
	}

	c, cancel := context.WithCancel(ctx)
	p.cancel = cancel

	if p.Handler.Subscriptionless() {
		go p.HandleSubscriptionless(c)
	} else {
		for _, sub := range p.subscriptions {
			go p.handleSubscription(c, sub)
		}
	}
	// Listen on the Handler's error channel, turn errors into Failures and
	// apply the FailureHandler on them.
	go p.MonitorErrChannel(c)
	p.Running = true
	return nil
}
// MonitorErrChannel forwards every error received on the Handler's error
// channel to the FailureHandler as a Failure without payload.
//
// It returns immediately when the Handler has no error channel, and otherwise
// runs until ctx is cancelled or the error channel is closed.
func (p *Processor) MonitorErrChannel(ctx context.Context) {
	// The channel is fetched once, under the lock that SetHandler also takes.
	p.Lock()
	errChan := p.Handler.GetErrorChannel()
	p.Unlock()
	if errChan == nil {
		return
	}
	for {
		select {
		case <-ctx.Done():
			return
		case err, ok := <-errChan:
			if !ok {
				// A closed channel yields zero values forever; stop instead
				// of spinning and reporting empty failures.
				return
			}
			p.FailureHandler(Failure{
				Err:       err,
				Payload:   nil,
				Processor: p.ID,
			})
		}
	}
}
// Stop cancels the context the processor's goroutines run on and marks the
// processor as not running.
// It returns ErrProcessorAlreadyStopped when the processor is not running or
// has no cancel function to call.
func (p *Processor) Stop() error {
	if !p.Running || p.cancel == nil {
		return ErrProcessorAlreadyStopped
	}
	p.cancel()
	p.Running = false
	return nil
}
// GetConfiguration returns the configuration of the processor's Handler.
// It returns nil when no Handler has been assigned yet, which is the state a
// processor is in right after NewProcessor.
func (p *Processor) GetConfiguration() *property.Configuration {
	if p.Handler == nil {
		return nil
	}
	return p.Handler.GetConfiguration()
}
// SetID overwrites the processor's ID with i, for example the ID that
// NewProcessor generated. This is mostly used when loading processors from a
// Daisy file. IDCounter is left untouched.
func (p *Processor) SetID(i uint) {
	p.ID = i
}
// SetName replaces the processor's name with n.
func (p *Processor) SetName(n string) {
	p.Name = n
}
// SetHandler replaces the Handler the processor performs on incoming
// payloads, holding the processor's lock while doing so.
//
// NOTE(review): goroutines launched by Start are not restarted here. Whether
// hot-swapping the Handler of a running processor is acceptable, or whether a
// Stop / Start cycle is needed afterwards, is an open question left by the
// original author.
func (p *Processor) SetHandler(a handlers.Handler) {
	p.Lock()
	defer p.Unlock()
	p.Handler = a
}
// HandleSubscriptionless runs the Handler once with a nil payload, for
// Handlers that do not require any subscriptions. An error returned by the
// Handler is passed to the FailureHandler as a Failure without payload.
func (p *Processor) HandleSubscriptionless(ctx context.Context) {
	err := p.Handler.Handle(ctx, nil, p.Topics...)
	if err == nil {
		return
	}
	failure := Failure{
		Err:       err,
		Payload:   nil,
		Processor: p.ID,
	}
	p.FailureHandler(failure)
}
// handleSubscription starts p.Workers goroutines that each run the assigned
// Handler on payloads arriving on sub, and then blocks until ctx is done.
func (p *Processor) handleSubscription(ctx context.Context, sub *pubsub.Pipe) {
	for w := 0; w < p.Workers; w++ {
		go p.runHandle(ctx, sub)
	}
	// Block until cancellation; the workers watch the same ctx and exit on
	// their own.
	<-ctx.Done()
}
// runHandle is the worker loop started concurrently by handleSubscription.
// It executes the processor's Handler on every payload received from jobs
// until ctx is done. When the Handler fails, the processor's failure metric
// is incremented and the FailureHandler receives the error together with the
// payload that caused it.
func (p *Processor) runHandle(ctx context.Context, jobs *pubsub.Pipe) {
	for {
		select {
		case <-ctx.Done():
			return
		case payload := <-jobs.Flow:
			err := p.Handler.Handle(ctx, payload, p.Topics...)
			if err == nil {
				continue
			}
			failureMetric := fmt.Sprintf("%s_%d_failures", p.Name, p.ID)
			p.Metric.IncrementMetric(failureMetric, 1)
			p.FailureHandler(Failure{
				Err:       err,
				Payload:   payload,
				Processor: p.ID,
			})
		}
	}
}
// Subscribe subscribes the processor to the given topics so that their
// payloads are ingested by it.
//
// It returns ErrDuplicateTopic, without subscribing to anything, when any of
// the topics matches an existing subscription. If subscribing to one topic
// fails, that error is returned; the topics subscribed before it stay
// subscribed.
func (p *Processor) Subscribe(topics ...string) error {
	// Reject the whole call if any requested topic is already subscribed.
	for _, topic := range topics {
		for i := range p.subscriptions {
			if p.subscriptions[i].Topic == topic {
				return ErrDuplicateTopic
			}
		}
	}

	for _, topic := range topics {
		pipe, err := pubsub.Subscribe(topic, p.ID, p.QueueSize)
		if err != nil {
			return err
		}
		p.Lock()
		p.subscriptions = append(p.subscriptions, pipe)
		p.Unlock()
	}
	return nil
}
// AddTopics appends topics to the list of topics the processor publishes
// onto. It returns ErrDuplicateTopic, without adding anything, when any of
// them is already in that list.
func (p *Processor) AddTopics(topics ...string) error {
	for i := range p.Topics {
		for j := range topics {
			if p.Topics[i] == topics[j] {
				return ErrDuplicateTopic
			}
		}
	}

	p.Lock()
	defer p.Unlock()
	p.Topics = append(p.Topics, topics...)
	return nil
}
// ConvertToLoader converts the processor into its savable LoaderProccessor
// form. Subscriptions are reduced to their topic names and the Handler to its
// name and configuration.
func (p *Processor) ConvertToLoader() *LoaderProccessor {
	// Flatten the subscription pipes into their topic names; the slice stays
	// nil when there are no subscriptions.
	var subscribed []string
	for i := range p.subscriptions {
		subscribed = append(subscribed, p.subscriptions[i].Topic)
	}

	handler := LoaderHandler{
		Cfg:  p.Handler.GetConfiguration(),
		Name: p.Handler.GetHandlerName(),
	}

	loader := &LoaderProccessor{
		ID:            p.ID,
		Name:          p.Name,
		QueueSize:     p.QueueSize,
		Workers:       p.Workers,
		Running:       p.Running,
		Topics:        p.Topics,
		Subscriptions: subscribed,
		Handler:       handler,
	}
	return loader
}