-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathbroker.go
604 lines (505 loc) · 16.7 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package eventlogger
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/hashicorp/go-multierror"
)
// RegistrationPolicy controls what happens when a component (for example a
// Pipeline or a Node) is registered with the Broker under an ID that is
// already taken. The policy is stored with a registration and governs whether
// that registration may later be replaced.
type RegistrationPolicy string

// Supported registration policies.
const (
	// AllowOverwrite lets a later registration with the same ID replace the
	// component that was registered with this policy.
	AllowOverwrite = RegistrationPolicy("AllowOverwrite")
	// DenyOverwrite makes a later registration with the same ID fail while the
	// component registered with this policy remains registered.
	DenyOverwrite = RegistrationPolicy("DenyOverwrite")
)
// Broker is the top-level entity used in the library for configuring the system
// and for sending events.
//
// Brokers have registered Nodes which may be composed into registered Pipelines
// for EventTypes.
//
// A Node may be a filter, formatter or sink (see NodeType).
//
// A Broker may have multiple Pipelines.
//
// EventTypes may have multiple Pipelines.
//
// A Pipeline for an EventType may contain multiple filters, one formatter and
// one sink.
//
// If a Pipeline does not have a formatter, then the event will not be written
// to the Sink.
//
// A Node can be shared across multiple pipelines.
//
// Methods that read or modify nodes or graphs take lock. The exception is
// StopTimeAt, which replaces the clock without locking.
type Broker struct {
// nodes holds every registered Node keyed by its ID. Each entry also records
// how many registered pipelines reference the node and its registration policy.
nodes map[NodeID]*nodeUsage
// graphs holds one graph per EventType. A graph stores the pipelines registered
// for that type (roots) and its success thresholds.
graphs map[EventType]*graph
// lock guards nodes and graphs.
lock sync.RWMutex
// clock supplies Event.CreatedAt. It stays nil until StopTimeAt is called;
// calling Now on a nil *clock returns time.Now().
*clock
}
// nodeUsage tracks a registered Node and how many times it is referenced by
// registered pipelines.
type nodeUsage struct {
// node is the registered Node instance handed to pipelines when they are registered.
node Node
// referenceCount is incremented once per occurrence of the node's ID in a
// registered pipeline's NodeIDs. Unless forced, removeNode refuses to remove a
// node whose count is non-zero.
referenceCount int
// registrationPolicy decides whether a later RegisterNode call with the same ID
// may overwrite this entry.
registrationPolicy RegistrationPolicy
}
// Option allows options to be passed as arguments. An Option updates the
// options it is given and returns an error when its argument is invalid.
type Option func(*options) error
// options are used to represent configuration for the broker.
type options struct {
// withPipelineRegistrationPolicy is the policy RegisterPipeline stores with the
// pipeline being registered.
withPipelineRegistrationPolicy RegistrationPolicy
// withNodeRegistrationPolicy is the policy RegisterNode stores with the node
// being registered.
withNodeRegistrationPolicy RegistrationPolicy
}
// getDefaultOptions returns the configuration used when no Option overrides
// it: both the pipeline and the node registration policy are AllowOverwrite.
func getDefaultOptions() options {
	var defaults options
	defaults.withPipelineRegistrationPolicy = AllowOverwrite
	defaults.withNodeRegistrationPolicy = AllowOverwrite
	return defaults
}
// getOpts starts from getDefaultOptions and applies each supplied Option in
// argument order, skipping nil entries. Later options overwrite whatever
// earlier ones set, so when the same Option is repeated the last one wins.
// The first Option that returns an error stops processing. In that case the
// zero options value is returned together with that error.
func getOpts(opt ...Option) (options, error) {
	result := getDefaultOptions()
	for i := range opt {
		apply := opt[i]
		if apply == nil {
			continue
		}
		if err := apply(&result); err != nil {
			return options{}, err
		}
	}
	return result, nil
}
// WithPipelineRegistrationPolicy returns an Option that sets the registration
// policy stored with a pipeline by RegisterPipeline. Only AllowOverwrite and
// DenyOverwrite are accepted. Any other value makes the Option fail with an
// error wrapping ErrInvalidParameter and leaves the options untouched.
func WithPipelineRegistrationPolicy(policy RegistrationPolicy) Option {
	return func(o *options) error {
		if policy != AllowOverwrite && policy != DenyOverwrite {
			return fmt.Errorf("'%s' is not a valid pipeline registration policy: %w", policy, ErrInvalidParameter)
		}
		o.withPipelineRegistrationPolicy = policy
		return nil
	}
}
// WithNodeRegistrationPolicy returns an Option that sets the registration
// policy stored with a node by RegisterNode. Only AllowOverwrite and
// DenyOverwrite are accepted. Any other value makes the Option fail with an
// error wrapping ErrInvalidParameter and leaves the options untouched.
func WithNodeRegistrationPolicy(policy RegistrationPolicy) Option {
	return func(o *options) error {
		if policy != AllowOverwrite && policy != DenyOverwrite {
			return fmt.Errorf("'%s' is not a valid node registration policy: %w", policy, ErrInvalidParameter)
		}
		o.withNodeRegistrationPolicy = policy
		return nil
	}
}
// NewBroker returns an empty Broker with no registered nodes or pipelines.
// Options are accepted in the signature but not evaluated, so even invalid
// options are ignored and the returned error is always nil.
func NewBroker(_ ...Option) (*Broker, error) {
	return &Broker{
		graphs: make(map[EventType]*graph),
		nodes:  make(map[NodeID]*nodeUsage),
	}, nil
}
// clock is a frozen time source that only exists to make testing simpler.
// A nil *clock stands for the real wall clock.
type clock struct {
	// now is the instant reported by every call to Now.
	now time.Time
}

// Now returns the frozen instant, or time.Now() when c is nil.
func (c *clock) Now() time.Time {
	if c != nil {
		return c.now
	}
	return time.Now()
}
// StopTimeAt freezes the Broker's timestamp clock at now. Every Event created
// by Send afterwards gets CreatedAt equal to now, which keeps timestamps
// predictable in tests. The clock is replaced without taking the Broker's
// lock, so do not call this concurrently with Send.
func (b *Broker) StopTimeAt(now time.Time) {
	frozen := clock{now: now}
	b.clock = &frozen
}
// Status describes the result of a Send. Its completion lists are exposed
// through Complete and CompleteSinks.
type Status struct {
// complete lists the IDs of 'filter' and 'sink' type nodes that successfully
// processed the Event, resulting in immediate completion of a particular Pipeline.
// getError compares its length against its threshold argument.
complete []NodeID
// completeSinks lists the IDs of 'sink' type nodes that successfully processed
// the Event, resulting in immediate completion of a particular Pipeline.
// getError compares its length against its thresholdSinks argument.
completeSinks []NodeID
// Warnings lists any non-fatal errors that occurred while sending an Event.
Warnings []error
}
// Complete reports the IDs of the 'filter' and 'sink' nodes whose successful
// processing of the Event immediately completed a particular Pipeline.
// The Status's own slice is returned, not a copy.
func (s Status) Complete() []NodeID {
	ids := s.complete
	return ids
}
// CompleteSinks reports the IDs of the 'sink' nodes whose successful
// processing of the Event immediately completed a particular Pipeline.
// The Status's own slice is returned, not a copy.
func (s Status) CompleteSinks() []NodeID {
	sinks := s.completeSinks
	return sinks
}
// getError checks the Status against the supplied thresholds. It returns nil
// when at least threshold 'filter'/'sink' completions and at least
// thresholdSinks 'sink' completions were recorded. Otherwise it describes the
// first unmet threshold (the combined threshold is checked first), joined with
// ctxErr. errors.Join drops ctxErr when it is nil.
func (s Status) getError(ctxErr error, threshold, thresholdSinks int) error {
	if len(s.complete) < threshold {
		return errors.Join(errors.New("event not processed by enough 'filter' and 'sink' nodes"), ctxErr)
	}
	if len(s.completeSinks) < thresholdSinks {
		return errors.Join(errors.New("event not processed by enough 'sink' nodes"), ctxErr)
	}
	return nil
}
// Send wraps payload in an Event of type t, timestamped by the Broker's clock,
// and hands it to the graph registered for t. That graph writes it to the
// registered pipelines and reports on the result. The Broker's read lock is
// held only while looking up the graph; processing happens after it is
// released. An error is returned when no graph exists for t. Otherwise the
// graph's Status and error are returned unchanged; errors stem from unmet
// delivery policies.
func (b *Broker) Send(ctx context.Context, t EventType, payload interface{}) (Status, error) {
	b.lock.RLock()
	g, found := b.graphs[t]
	b.lock.RUnlock()

	if !found {
		return Status{}, fmt.Errorf("no graph for EventType %s", t)
	}

	return g.process(ctx, &Event{
		Type:      t,
		CreatedAt: b.clock.Now(),
		Formatted: make(map[string][]byte),
		Payload:   payload,
	})
}
// Reopen asks every registered Node to reopen, by calling reopen on the graph
// of each EventType. The intention is to have nodes reopen any files they hold
// open. This is typically part of log rotation: after rotating, the rotator
// signals the application, which then invokes this method. Another typical use
// case is to have all Nodes re-evaluate any external configuration they might
// have.
//
// The Broker's read lock is held throughout. Graphs are visited in map
// iteration order. The first error stops the walk and is returned unchanged,
// so graphs not yet visited are not reopened.
func (b *Broker) Reopen(ctx context.Context) error {
	b.lock.RLock()
	defer b.lock.RUnlock()

	var err error
	for _, g := range b.graphs {
		if err = g.reopen(ctx); err != nil {
			break
		}
	}
	return err
}
// NodeID is a string that uniquely identifies a Node registered with a Broker.
// RegisterNode, RemoveNode and Pipeline validation all reject an empty NodeID.
type NodeID string
// RegisterNode registers node under id so that pipelines can reference it.
// Node IDs should be unique. A Node may be a filter, formatter or sink (see
// NodeType), and one Node can be shared across multiple pipelines.
//
// Accepted options: WithNodeRegistrationPolicy (default: AllowOverwrite).
// The policy is stored with the registration and governs a later
// RegisterNode call with the same id:
//   - AllowOverwrite: the new node replaces the old entry and inherits its
//     pipeline reference count. Pipelines registered earlier keep the node
//     instance that RegisterPipeline linked when they were registered.
//   - DenyOverwrite: the later call fails and the existing entry is kept.
//
// An error wrapping ErrInvalidParameter is returned when id is empty or node is
// nil. A nil node would otherwise be stored and only fail later, when a
// pipeline or removeNode uses it. Invalid options also produce an error.
func (b *Broker) RegisterNode(id NodeID, node Node, opt ...Option) error {
	if id == "" {
		return fmt.Errorf("unable to register node, node ID cannot be empty: %w", ErrInvalidParameter)
	}
	if node == nil {
		return fmt.Errorf("unable to register node, node cannot be nil: %w", ErrInvalidParameter)
	}

	opts, err := getOpts(opt...)
	if err != nil {
		return fmt.Errorf("cannot register node: %w", err)
	}

	b.lock.Lock()
	defer b.lock.Unlock()

	registration := &nodeUsage{
		node:               node,
		registrationPolicy: opts.withNodeRegistrationPolicy,
	}

	if existing, ok := b.nodes[id]; ok {
		switch existing.registrationPolicy {
		case AllowOverwrite:
			// Pipelines registered earlier still reference this ID, so the
			// replacement inherits their reference count.
			registration.referenceCount = existing.referenceCount
		case DenyOverwrite:
			return fmt.Errorf("node ID %q is already registered, configured policy prevents overwriting", id)
		}
	}

	b.nodes[id] = registration
	return nil
}
// RemoveNode unregisters and closes the node with the given id, but only if no
// registered pipeline currently references it. Otherwise an error is returned
// and the node is left in place. Errors are also returned for an empty id or
// an unknown id (wrapping ErrNodeNotFound). RemoveNode is useful for cleaning
// up nodes registered via RegisterNode after a RegisterPipeline call
// referencing them has failed.
func (b *Broker) RemoveNode(ctx context.Context, id NodeID) error {
	const force = false

	b.lock.Lock()
	defer b.lock.Unlock()

	return b.removeNode(ctx, id, force)
}
// removeNode unregisters and closes the node with the given id. When force is
// true and the node is still referenced by more than one pipeline, it only
// decrements the node's reference count instead.
//
//   - An empty id yields an error wrapping ErrInvalidParameter.
//   - An unknown id yields an error wrapping ErrNodeNotFound.
//   - Without force, a node whose reference count is non-zero is rejected.
//   - With a reference count of 0 or 1, the node is closed through a
//     NodeController and deleted from the Broker. The entry is deleted even if
//     Close fails, in which case the wrapped Close error is returned.
//   - With any other reference count (only reachable with force), the count
//     is decremented and the node stays registered.
//
// The caller must hold b.lock for writing.
func (b *Broker) removeNode(ctx context.Context, id NodeID, force bool) error {
	if id == "" {
		return fmt.Errorf("unable to remove node, node ID cannot be empty: %w", ErrInvalidParameter)
	}

	usage, registered := b.nodes[id]
	if !registered {
		return fmt.Errorf("%w: %q", ErrNodeNotFound, id)
	}

	if !force && usage.referenceCount > 0 {
		return fmt.Errorf("cannot remove node, as it is still in use by 1 or more pipelines: %q", id)
	}

	if refs := usage.referenceCount; refs != 0 && refs != 1 {
		// Other pipelines still rely on this node: just drop one reference.
		usage.referenceCount--
		return nil
	}

	closeErr := NewNodeController(usage.node).Close(ctx)
	delete(b.nodes, id)
	if closeErr != nil {
		return fmt.Errorf("unable to close node ID %q: %w", id, closeErr)
	}
	return nil
}
// PipelineID is a string that uniquely identifies a Pipeline within a given EventType.
type PipelineID string
// Pipeline defines a pipe: its ID, the EventType it's for, and the nodes
// that it contains. Nodes can be shared across multiple pipelines.
//
// To pass validation, a Pipeline needs a non-empty PipelineID, a non-empty
// EventType and at least one NodeID, and none of its NodeIDs may be empty.
type Pipeline struct {
// PipelineID uniquely identifies the Pipeline within its EventType
PipelineID PipelineID
// EventType defines the type of event the Pipeline processes
EventType EventType
// NodeIDs lists the IDs of the Pipeline's nodes. RegisterPipeline passes them
// to linkNodes in this order, and each must already be registered.
NodeIDs []NodeID
}
// RegisterPipeline adds a pipeline to the broker.
// Accepted options: WithPipelineRegistrationPolicy (default: AllowOverwrite).
func (b *Broker) RegisterPipeline(def Pipeline, opt ...Option) error {
err := def.validate()
if err != nil {
return err
}
opts, err := getOpts(opt...)
if err != nil {
return fmt.Errorf("cannot register pipeline: %w", err)
}
b.lock.Lock()
defer b.lock.Unlock()
g, exists := b.graphs[def.EventType]
if !exists {
g = &graph{}
b.graphs[def.EventType] = g
}
// Get the configured policy
pol := AllowOverwrite
g.roots.Range(func(key PipelineID, v *registeredPipeline) bool {
if key == def.PipelineID {
pol = v.registrationPolicy
return false
}
return true
})
if pol == DenyOverwrite {
return fmt.Errorf("pipeline ID %q is already registered, configured policy prevents overwriting", def.PipelineID)
}
// Gather the registered nodes, so they can be referenced for this pipeline.
nodes := make([]Node, len(def.NodeIDs))
for i, n := range def.NodeIDs {
nodeUsage, ok := b.nodes[n]
if !ok {
return fmt.Errorf("node ID %q not registered", n)
}
nodes[i] = nodeUsage.node
}
root, err := linkNodes(nodes, def.NodeIDs)
if err != nil {
return err
}
err = g.doValidate(nil, root)
if err != nil {
return err
}
// Create the pipeline registration using the optional policy (or default).
pipelineReg := ®isteredPipeline{
rootNode: root,
registrationPolicy: opts.withPipelineRegistrationPolicy,
}
// Store the pipeline and then update the reference count of the nodes in that pipeline.
g.roots.Store(def.PipelineID, pipelineReg)
for _, id := range def.NodeIDs {
nodeUsage, ok := b.nodes[id]
// We can be optimistic about this as we would have already errored above.
if ok {
nodeUsage.referenceCount++
}
}
return nil
}
// RemovePipeline removes the pipeline with the given ID from the graph for
// EventType t. The pipeline's nodes stay registered with the Broker, but the
// reference the pipeline held on each of them is released. Nodes no longer
// used by any pipeline can therefore be removed afterwards with RemoveNode;
// previously their counts stayed inflated and RemoveNode refused them forever.
//
// An error is returned if t or id is empty, if no graph exists for t, or if
// the nodes of the registered pipeline cannot be listed (in which case nothing
// is removed). Removing a pipeline ID that is not registered for t returns nil
// without touching any reference count.
func (b *Broker) RemovePipeline(t EventType, id PipelineID) error {
	switch {
	case t == "":
		return errors.New("event type cannot be empty")
	case id == "":
		return errors.New("pipeline ID cannot be empty")
	}

	b.lock.Lock()
	defer b.lock.Unlock()

	g, ok := b.graphs[t]
	if !ok {
		return fmt.Errorf("no graph for EventType %s", t)
	}

	registered := false
	g.roots.Range(func(key PipelineID, _ *registeredPipeline) bool {
		if key == id {
			registered = true
			return false
		}
		return true
	})

	if registered {
		// Collect the nodes before deleting the pipeline so their references
		// can be released; nothing below can fail once this succeeds.
		nodeIDs, err := g.roots.Nodes(id)
		if err != nil {
			return fmt.Errorf("unable to retrieve all nodes referenced by pipeline ID %q: %w", id, err)
		}
		for _, nodeID := range nodeIDs {
			if usage, ok := b.nodes[nodeID]; ok && usage.referenceCount > 0 {
				usage.referenceCount--
			}
		}
	}

	g.roots.Delete(id)
	return nil
}
// RemovePipelineAndNodes removes pipeline id from the graph for EventType t and
// then releases every node the pipeline referenced. Nodes referenced only by
// this pipeline are closed and unregistered. Nodes shared with other pipelines
// merely have their reference count decremented and are not removed.
//
// If a precondition fails (empty t or id, no graph for t, or the pipeline's
// nodes cannot be listed), it returns false and an error, and neither the
// pipeline nor any node is touched.
//
// Once deletion has started it runs to completion. The result is then true,
// together with any node-removal errors accumulated as a *multierror.Error
// (nil if every node was released cleanly).
func (b *Broker) RemovePipelineAndNodes(ctx context.Context, t EventType, id PipelineID) (bool, error) {
	if t == "" {
		return false, errors.New("event type cannot be empty")
	}
	if id == "" {
		return false, errors.New("pipeline ID cannot be empty")
	}

	b.lock.Lock()
	defer b.lock.Unlock()

	g, found := b.graphs[t]
	if !found {
		return false, fmt.Errorf("no graph for EventType %s", t)
	}

	nodeIDs, err := g.roots.Nodes(id)
	if err != nil {
		return false, fmt.Errorf("unable to retrieve all nodes referenced by pipeline ID %q: %w", id, err)
	}

	g.roots.Delete(id)

	var removalErrs error
	for _, nodeID := range nodeIDs {
		if rmErr := b.removeNode(ctx, nodeID, true); rmErr != nil {
			removalErrs = multierror.Append(removalErrs, rmErr)
		}
	}
	return true, removalErrs
}
// SetSuccessThreshold sets the success threshold for EventType t. For the
// overall processing of an event to count as a success, at least this many
// pipelines must successfully process it. A filter that drops an event before
// it reaches the pipeline's sink still counts as a success here. This makes
// the threshold suitable when filtering events should not cause an error.
//
// The graph for t is created if it does not exist yet. An error is returned if
// t is empty or successThreshold is negative.
func (b *Broker) SetSuccessThreshold(t EventType, successThreshold int) error {
	if t == "" {
		return errors.New("event type cannot be empty")
	}
	if successThreshold < 0 {
		return errors.New("successThreshold must be 0 or greater")
	}

	b.lock.Lock()
	defer b.lock.Unlock()

	g, registered := b.graphs[t]
	if !registered {
		g = new(graph)
		b.graphs[t] = g
	}
	g.successThreshold = successThreshold
	return nil
}
// SetSuccessThresholdSinks sets the sink success threshold for EventType t.
// For the overall processing of an event to count as a success, at least this
// many sinks must successfully process it.
//
// The graph for t is created if it does not exist yet. An error is returned if
// t is empty or successThresholdSinks is negative.
func (b *Broker) SetSuccessThresholdSinks(t EventType, successThresholdSinks int) error {
	if t == "" {
		return errors.New("event type cannot be empty")
	}
	if successThresholdSinks < 0 {
		return errors.New("successThresholdSinks must be 0 or greater")
	}

	b.lock.Lock()
	defer b.lock.Unlock()

	g, registered := b.graphs[t]
	if !registered {
		g = new(graph)
		b.graphs[t] = g
	}
	g.successThresholdSinks = successThresholdSinks
	return nil
}
// SuccessThreshold reports the success threshold configured for EventType t:
// the minimum number of 'filter' or 'sink' completions an event needs for its
// overall processing to count as a success.
// The second result reports whether a graph exists for t. Graphs are created
// by RegisterPipeline, SetSuccessThreshold and SetSuccessThresholdSinks. Only
// when it is true is the threshold meaningful; otherwise 0 is returned.
func (b *Broker) SuccessThreshold(t EventType) (int, bool) {
	b.lock.RLock()
	defer b.lock.RUnlock()

	var threshold int
	g, registered := b.graphs[t]
	if registered {
		threshold = g.successThreshold
	}
	return threshold, registered
}
// SuccessThresholdSinks reports the sink success threshold configured for
// EventType t: the minimum number of sink completions an event needs for its
// overall processing to count as a success.
// The second result reports whether a graph exists for t. Graphs are created
// by RegisterPipeline, SetSuccessThreshold and SetSuccessThresholdSinks. Only
// when it is true is the threshold meaningful; otherwise 0 is returned.
func (b *Broker) SuccessThresholdSinks(t EventType) (int, bool) {
	b.lock.RLock()
	defer b.lock.RUnlock()

	var threshold int
	g, registered := b.graphs[t]
	if registered {
		threshold = g.successThresholdSinks
	}
	return threshold, registered
}
// IsAnyPipelineRegistered reports whether at least one pipeline is currently
// registered for EventType e. It returns false when e has no graph, and also
// when e has a graph that holds no pipelines (for example one created only by
// SetSuccessThreshold).
func (b *Broker) IsAnyPipelineRegistered(e EventType) bool {
	b.lock.RLock()
	defer b.lock.RUnlock()

	g, ok := b.graphs[e]
	if !ok {
		return false
	}

	registered := false
	g.roots.Range(func(PipelineID, *registeredPipeline) bool {
		registered = true
		return false // one pipeline is enough; stop iterating
	})
	return registered
}
// validate checks that the Pipeline has the configuration needed for
// registration, removal or usage. It requires a PipelineID, an EventType and
// at least one node ID, and rejects empty node IDs. Every failed check is
// collected, in that order, into a single *multierror.Error; an empty node ID
// is reported once no matter how many there are. A nil result means the
// Pipeline is valid.
func (p Pipeline) validate() error {
	hasEmptyNodeID := false
	for _, id := range p.NodeIDs {
		if id == "" {
			hasEmptyNodeID = true
			break
		}
	}

	checks := []struct {
		failed bool
		msg    string
	}{
		{p.PipelineID == "", "pipeline ID is required"},
		{p.EventType == "", "event type is required"},
		{len(p.NodeIDs) == 0, "node IDs are required"},
		{hasEmptyNodeID, "node ID cannot be empty"},
	}

	var problems error
	for _, c := range checks {
		if c.failed {
			problems = multierror.Append(problems, errors.New(c.msg))
		}
	}
	return problems
}