-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathinput.go
95 lines (79 loc) · 2.64 KB
/
input.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package helper
import (
"context"
"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/errors"
"github.com/observiq/stanza/operator"
"go.uber.org/zap"
)
// NewInputConfig creates a new input config with default values.
func NewInputConfig(operatorID, operatorType string) InputConfig {
return InputConfig{
LabelerConfig: NewLabelerConfig(),
IdentifierConfig: NewIdentifierConfig(),
WriterConfig: NewWriterConfig(operatorID, operatorType),
WriteTo: entry.NewRecordField(),
}
}
// InputConfig provides a basic implementation of an input operator config.
type InputConfig struct {
LabelerConfig `yaml:",inline"`
IdentifierConfig `yaml:",inline"`
WriterConfig `yaml:",inline"`
WriteTo entry.Field `json:"write_to" yaml:"write_to"`
}
// Build will build a base producer.
func (c InputConfig) Build(context operator.BuildContext) (InputOperator, error) {
writerOperator, err := c.WriterConfig.Build(context)
if err != nil {
return InputOperator{}, errors.WithDetails(err, "operator_id", c.ID())
}
labeler, err := c.LabelerConfig.Build()
if err != nil {
return InputOperator{}, errors.WithDetails(err, "operator_id", c.ID())
}
identifier, err := c.IdentifierConfig.Build()
if err != nil {
return InputOperator{}, errors.WithDetails(err, "operator_id", c.ID())
}
inputOperator := InputOperator{
Labeler: labeler,
Identifier: identifier,
WriterOperator: writerOperator,
WriteTo: c.WriteTo,
}
return inputOperator, nil
}
// InputOperator provides a basic implementation of an input operator.
type InputOperator struct {
Labeler
Identifier
WriterOperator
WriteTo entry.Field
}
// NewEntry will create a new entry using the `write_to`, `labels`, and `resource` configuration.
func (i *InputOperator) NewEntry(value interface{}) (*entry.Entry, error) {
entry := entry.New()
if err := entry.Set(i.WriteTo, value); err != nil {
return nil, errors.Wrap(err, "add record to entry")
}
if err := i.Label(entry); err != nil {
return nil, errors.Wrap(err, "add labels to entry")
}
if err := i.Identify(entry); err != nil {
return nil, errors.Wrap(err, "add resource keys to entry")
}
return entry, nil
}
// CanProcess will always return false for an input operator.
func (i *InputOperator) CanProcess() bool {
return false
}
// Process will always return an error if called.
func (i *InputOperator) Process(_ context.Context, entry *entry.Entry) error {
i.Errorw("Operator received an entry, but can not process", zap.Any("entry", entry))
return errors.NewError(
"Operator can not process logs.",
"Ensure that operator is not configured to receive logs from other operators",
)
}