diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom.go b/pkg/stanza/fileconsumer/internal/splitter/custom.go index e8941b3bdfc7..f10c9b349042 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom.go +++ b/pkg/stanza/fileconsumer/internal/splitter/custom.go @@ -7,6 +7,7 @@ import ( "bufio" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) type customFactory struct { @@ -17,10 +18,13 @@ type customFactory struct { var _ Factory = (*customFactory)(nil) func NewCustomFactory(flusherCfg tokenize.FlusherConfig, splitFunc bufio.SplitFunc) Factory { - return &customFactory{flusherCfg: flusherCfg, splitFunc: splitFunc} + return &customFactory{ + flusherCfg: flusherCfg, + splitFunc: splitFunc, + } } // Build builds Multiline Splitter struct func (f *customFactory) Build() (bufio.SplitFunc, error) { - return f.flusherCfg.Wrap(f.splitFunc), nil + return f.flusherCfg.Wrap(f.splitFunc, trim.Whitespace(true, true)), nil } diff --git a/pkg/stanza/operator/input/tcp/tcp.go b/pkg/stanza/operator/input/tcp/tcp.go index 191d15a9d50a..cf60af119a37 100644 --- a/pkg/stanza/operator/input/tcp/tcp.go +++ b/pkg/stanza/operator/input/tcp/tcp.go @@ -25,6 +25,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) const ( @@ -83,7 +84,8 @@ type BaseConfig struct { type MultiLineBuilderFunc func(enc encoding.Encoding) (bufio.SplitFunc, error) func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) { - splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, int(c.MaxLogSize)) + trimFunc := trim.Whitespace(c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces) + splitFunc, err := c.Multiline.Build(enc, true, int(c.MaxLogSize), trimFunc) if err != nil { return nil, err } diff --git a/pkg/stanza/operator/input/udp/udp.go b/pkg/stanza/operator/input/udp/udp.go index 4072fa7564c2..cdb2ac539f93 100644 --- a/pkg/stanza/operator/input/udp/udp.go +++ b/pkg/stanza/operator/input/udp/udp.go @@ -19,6 +19,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) const ( @@ -91,7 +92,8 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { } // Build multiline - splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, MaxUDPSize) + trimFunc := trim.Whitespace(c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces) + splitFunc, err := c.Multiline.Build(enc, true, MaxUDPSize, trimFunc) if err != nil { return nil, err } diff --git a/pkg/stanza/tokenize/flusher.go b/pkg/stanza/tokenize/flusher.go index dea09214d85c..60a44d4f62a8 100644 --- a/pkg/stanza/tokenize/flusher.go +++ b/pkg/stanza/tokenize/flusher.go @@ -6,6 +6,8 @@ package tokenize // import "github.com/open-telemetry/opentelemetry-collector-co import ( "bufio" "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) const DefaultFlushPeriod = 500 * time.Millisecond @@ -24,13 +26,13 @@ func NewFlusherConfig() FlusherConfig { } // Wrap a bufio.SplitFunc with a flusher -func (c *FlusherConfig) Wrap(splitFunc bufio.SplitFunc) bufio.SplitFunc { +func (c *FlusherConfig) Wrap(splitFunc bufio.SplitFunc, trimFunc trim.Func) bufio.SplitFunc { f := &flusher{ lastDataChange: time.Now(), forcePeriod: c.Period, previousDataLength: 0, } - return f.splitFunc(splitFunc) + return f.splitFunc(splitFunc, trimFunc) } // flusher keeps information about flush state @@ -71,7 +73,7 @@ func (f *flusher) shouldFlush() bool { return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0 } -func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { +func (f *flusher) splitFunc(splitFunc bufio.SplitFunc, trimFunc trim.Func) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { advance, token, err = splitFunc(data, atEOF) @@ -91,7 +93,7 @@ func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { if f.shouldFlush() { // Inform flusher that we just flushed f.flushed() - token = trimWhitespacesFunc(data) + token = trimFunc(data) advance = len(data) return } diff --git a/pkg/stanza/tokenize/multiline.go b/pkg/stanza/tokenize/multiline.go index 8b9a7a20ac07..8496d56090c9 100644 --- a/pkg/stanza/tokenize/multiline.go +++ b/pkg/stanza/tokenize/multiline.go @@ -10,6 +10,8 @@ import ( "regexp" "golang.org/x/text/encoding" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) // Multiline consists of splitFunc and variables needed to perform force flush @@ -32,12 +34,12 @@ type MultilineConfig struct { } // Build will build a Multiline operator. -func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool, maxLogSize int) (bufio.SplitFunc, error) { - return c.getSplitFunc(enc, flushAtEOF, maxLogSize, preserveLeadingWhitespaces, preserveTrailingWhitespaces) +func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) { + return c.getSplitFunc(enc, flushAtEOF, maxLogSize, trimFunc) } // getSplitFunc returns split function for bufio.Scanner basing on configured pattern -func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) (bufio.SplitFunc, error) { +func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) { endPattern := c.LineEndPattern startPattern := c.LineStartPattern @@ -54,7 +56,7 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, ma case enc == encoding.Nop: return NoSplitFunc(maxLogSize), nil case endPattern == "" && startPattern == "": - splitFunc, err = NewlineSplitFunc(enc, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces)) + splitFunc, err = NewlineSplitFunc(enc, flushAtEOF, trimFunc) if err != nil { return nil, err } @@ -63,13 +65,13 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, ma if err != nil { return nil, fmt.Errorf("compile line end regex: %w", err) } - splitFunc = LineEndSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces)) + splitFunc = LineEndSplitFunc(re, flushAtEOF, trimFunc) case startPattern != "": re, err := regexp.Compile("(?m)" + c.LineStartPattern) if err != nil { return nil, fmt.Errorf("compile line start regex: %w", err) } - splitFunc = LineStartSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces)) + splitFunc = LineStartSplitFunc(re, flushAtEOF, trimFunc) default: return nil, fmt.Errorf("unreachable") } @@ -78,7 +80,7 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, ma // LineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that start with a match to the regex pattern provided -func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc { +func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { firstLoc := re.FindIndex(data) if firstLoc == nil { @@ -132,7 +134,7 @@ func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) b // LineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that end with a match to the regex pattern provided -func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc { +func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { loc := re.FindIndex(data) if loc == nil { @@ -160,7 +162,7 @@ func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) buf // NewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but // never returning an token using EOF as a terminator -func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trimFunc) (bufio.SplitFunc, error) { +func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trim.Func) (bufio.SplitFunc, error) { newline, err := encodedNewline(enc) if err != nil { return nil, err @@ -224,45 +226,3 @@ func encodedCarriageReturn(enc encoding.Encoding) ([]byte, error) { nDst, _, err := enc.NewEncoder().Transform(out, []byte{'\r'}, true) return out[:nDst], err } - -type trimFunc func([]byte) []byte - -func noTrim(token []byte) []byte { - return token -} - -func trimLeadingWhitespacesFunc(data []byte) []byte { - // TrimLeft to strip EOF whitespaces in case of using $ in regex - // For some reason newline and carriage return are being moved to beginning of next log - token := bytes.TrimLeft(data, "\r\n\t ") - if token == nil { - return []byte{} - } - return token -} - -func trimTrailingWhitespacesFunc(data []byte) []byte { - // TrimRight to strip all whitespaces from the end of log - token := bytes.TrimRight(data, "\r\n\t ") - if token == nil { - return []byte{} - } - return token -} - -func trimWhitespacesFunc(data []byte) []byte { - return trimLeadingWhitespacesFunc(trimTrailingWhitespacesFunc(data)) -} - -func getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) trimFunc { - if preserveLeadingWhitespaces && preserveTrailingWhitespaces { - return noTrim - } - if preserveLeadingWhitespaces { - return trimTrailingWhitespacesFunc - } - if preserveTrailingWhitespaces { - return trimLeadingWhitespacesFunc - } - return trimWhitespacesFunc -} diff --git a/pkg/stanza/tokenize/multiline_test.go b/pkg/stanza/tokenize/multiline_test.go index 04ffe7b7ddb9..555009bf836a 100644 --- a/pkg/stanza/tokenize/multiline_test.go +++ b/pkg/stanza/tokenize/multiline_test.go @@ -17,8 +17,11 @@ import ( "golang.org/x/text/encoding/unicode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize/tokenizetest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) +var noTrim = trim.Whitespace(true, true) + const ( // Those values has been experimentally figured out for windows sleepDuration time.Duration = time.Millisecond * 80 @@ -215,10 +218,11 @@ func TestLineStartSplitFunc(t *testing.T) { LineStartPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces) + trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces) + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc) require.NoError(t, err) if tc.Flusher != nil { - splitFunc = tc.Flusher.Wrap(splitFunc) + splitFunc = tc.Flusher.Wrap(splitFunc, trimFunc) } t.Run(tc.Name, tc.Run(splitFunc)) } @@ -426,10 +430,11 @@ func TestLineEndSplitFunc(t *testing.T) { LineEndPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces) + trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces) + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc) require.NoError(t, err) if tc.Flusher != nil { - splitFunc = tc.Flusher.Wrap(splitFunc) + splitFunc = tc.Flusher.Wrap(splitFunc, trimFunc) } t.Run(tc.Name, tc.Run(splitFunc)) } @@ -591,10 +596,11 @@ func TestNewlineSplitFunc(t *testing.T) { } for _, tc := range testCases { - splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, getTrimFunc(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)) + trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces) + splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, trimFunc) require.NoError(t, err) if tc.Flusher != nil { - splitFunc = tc.Flusher.Wrap(splitFunc) + splitFunc = tc.Flusher.Wrap(splitFunc, trimFunc) } t.Run(tc.Name, tc.Run(splitFunc)) } @@ -666,14 +672,14 @@ func TestNoopEncodingError(t *testing.T) { LineEndPattern: "\n", } - _, err := cfg.getSplitFunc(encoding.Nop, false, 0, false, false) + _, err := cfg.getSplitFunc(encoding.Nop, false, 0, noTrim) require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) cfg = &MultilineConfig{ LineStartPattern: "\n", } - _, err = cfg.getSplitFunc(encoding.Nop, false, 0, false, false) + _, err = cfg.getSplitFunc(encoding.Nop, false, 0, noTrim) require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) } diff --git a/pkg/stanza/tokenize/splitter.go b/pkg/stanza/tokenize/splitter.go index dafa39379206..96c01a7c7146 100644 --- a/pkg/stanza/tokenize/splitter.go +++ b/pkg/stanza/tokenize/splitter.go @@ -7,15 +7,16 @@ import ( "bufio" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) // SplitterConfig consolidates MultilineConfig and FlusherConfig type SplitterConfig struct { - Encoding string `mapstructure:"encoding,omitempty"` - Flusher FlusherConfig `mapstructure:",squash,omitempty"` - Multiline MultilineConfig `mapstructure:"multiline,omitempty"` - PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"` - PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` + Encoding string `mapstructure:"encoding,omitempty"` + Flusher FlusherConfig `mapstructure:",squash,omitempty"` + Multiline MultilineConfig `mapstructure:"multiline,omitempty"` + PreserveLeading bool `mapstructure:"preserve_leading_whitespaces,omitempty"` + PreserveTrailing bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` } // NewSplitterConfig returns default SplitterConfig @@ -33,11 +34,11 @@ func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (bufio.SplitFunc if err != nil { return nil, err } - - splitFunc, err := c.Multiline.Build(enc, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, maxLogSize) + trimFunc := trim.Whitespace(c.PreserveLeading, c.PreserveTrailing) + splitFunc, err := c.Multiline.Build(enc, flushAtEOF, maxLogSize, trimFunc) if err != nil { return nil, err } - return c.Flusher.Wrap(splitFunc), nil + return c.Flusher.Wrap(splitFunc, trimFunc), nil } diff --git a/pkg/stanza/trim/trim.go b/pkg/stanza/trim/trim.go new file mode 100644 index 000000000000..b49e6f7af143 --- /dev/null +++ b/pkg/stanza/trim/trim.go @@ -0,0 +1,48 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package trim // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" + +import ( + "bytes" +) + +type Func func([]byte) []byte + +func Whitespace(preserveLeading, preserveTrailing bool) Func { + if preserveLeading && preserveTrailing { + return noTrim + } + if preserveLeading { + return trimTrailingWhitespacesFunc + } + if preserveTrailing { + return trimLeadingWhitespacesFunc + } + return trimWhitespacesFunc +} + +func noTrim(token []byte) []byte { + return token +} + +func trimLeadingWhitespacesFunc(data []byte) []byte { + // TrimLeft to strip EOF whitespaces in case of using $ in regex + // For some reason newline and carriage return are being moved to beginning of next log + token := bytes.TrimLeft(data, "\r\n\t ") + + // TrimLeft will return nil if data is an empty slice + if token == nil { + return []byte{} + } + return token +} + +func trimTrailingWhitespacesFunc(data []byte) []byte { + // TrimRight to strip all whitespaces from the end of log + return bytes.TrimRight(data, "\r\n\t ") +} + +func trimWhitespacesFunc(data []byte) []byte { + return trimLeadingWhitespacesFunc(trimTrailingWhitespacesFunc(data)) +} diff --git a/pkg/stanza/trim/trim_test.go b/pkg/stanza/trim/trim_test.go new file mode 100644 index 000000000000..c603d59f1c29 --- /dev/null +++ b/pkg/stanza/trim/trim_test.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package trim + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWhitespace(t *testing.T) { + // Test all permutations of trimming whitespace + testCases := []struct { + name string + preserveLeading bool + preserveTrailing bool + input string + expect string + }{ + { + name: "preserve both", + preserveLeading: true, + preserveTrailing: true, + input: " hello world ", + expect: " hello world ", + }, + { + name: "preserve leading", + preserveLeading: true, + preserveTrailing: false, + input: " hello world ", + expect: " hello world", + }, + { + name: "preserve trailing", + preserveLeading: false, + preserveTrailing: true, + input: " hello world ", + expect: "hello world ", + }, + { + name: "preserve neither", + preserveLeading: false, + preserveTrailing: false, + input: " hello world ", + expect: "hello world", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + trimFunc := Whitespace(tc.preserveLeading, tc.preserveTrailing) + assert.Equal(t, []byte(tc.expect), trimFunc([]byte(tc.input))) + + // Also test that regardless of configuration, an empty []byte in gives an empty []byte out + assert.Equal(t, []byte{}, trimFunc([]byte{})) + }) + } +}