Skip to content

Commit

Permalink
[chore][pkg/stanza] Extract a trim package for managing trim functions (
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Sep 6, 2023
1 parent fe7ee75 commit a09b4d7
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 75 deletions.
8 changes: 6 additions & 2 deletions pkg/stanza/fileconsumer/internal/splitter/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bufio"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

type customFactory struct {
Expand All @@ -17,10 +18,13 @@ type customFactory struct {
var _ Factory = (*customFactory)(nil)

func NewCustomFactory(flusherCfg tokenize.FlusherConfig, splitFunc bufio.SplitFunc) Factory {
return &customFactory{flusherCfg: flusherCfg, splitFunc: splitFunc}
return &customFactory{
flusherCfg: flusherCfg,
splitFunc: splitFunc,
}
}

// Build builds Multiline Splitter struct
func (f *customFactory) Build() (bufio.SplitFunc, error) {
return f.flusherCfg.Wrap(f.splitFunc), nil
return f.flusherCfg.Wrap(f.splitFunc, trim.Whitespace(true, true)), nil
}
4 changes: 3 additions & 1 deletion pkg/stanza/operator/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

const (
Expand Down Expand Up @@ -83,7 +84,8 @@ type BaseConfig struct {
type MultiLineBuilderFunc func(enc encoding.Encoding) (bufio.SplitFunc, error)

func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) {
splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, int(c.MaxLogSize))
trimFunc := trim.Whitespace(c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces)
splitFunc, err := c.Multiline.Build(enc, true, int(c.MaxLogSize), trimFunc)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

const (
Expand Down Expand Up @@ -91,7 +92,8 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

// Build multiline
splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, MaxUDPSize)
trimFunc := trim.Whitespace(c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces)
splitFunc, err := c.Multiline.Build(enc, true, MaxUDPSize, trimFunc)
if err != nil {
return nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/stanza/tokenize/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package tokenize // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"bufio"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

const DefaultFlushPeriod = 500 * time.Millisecond
Expand All @@ -24,13 +26,13 @@ func NewFlusherConfig() FlusherConfig {
}

// Wrap a bufio.SplitFunc with a flusher
func (c *FlusherConfig) Wrap(splitFunc bufio.SplitFunc) bufio.SplitFunc {
func (c *FlusherConfig) Wrap(splitFunc bufio.SplitFunc, trimFunc trim.Func) bufio.SplitFunc {
f := &flusher{
lastDataChange: time.Now(),
forcePeriod: c.Period,
previousDataLength: 0,
}
return f.splitFunc(splitFunc)
return f.splitFunc(splitFunc, trimFunc)
}

// flusher keeps information about flush state
Expand Down Expand Up @@ -71,7 +73,7 @@ func (f *flusher) shouldFlush() bool {
return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0
}

func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
func (f *flusher) splitFunc(splitFunc bufio.SplitFunc, trimFunc trim.Func) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = splitFunc(data, atEOF)

Expand All @@ -91,7 +93,7 @@ func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
if f.shouldFlush() {
// Inform flusher that we just flushed
f.flushed()
token = trimWhitespacesFunc(data)
token = trimFunc(data)
advance = len(data)
return
}
Expand Down
62 changes: 11 additions & 51 deletions pkg/stanza/tokenize/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"regexp"

"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

// Multiline consists of splitFunc and variables needed to perform force flush
Expand All @@ -32,12 +34,12 @@ type MultilineConfig struct {
}

// Build will build a Multiline operator.
func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool, maxLogSize int) (bufio.SplitFunc, error) {
return c.getSplitFunc(enc, flushAtEOF, maxLogSize, preserveLeadingWhitespaces, preserveTrailingWhitespaces)
func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) {
return c.getSplitFunc(enc, flushAtEOF, maxLogSize, trimFunc)
}

// getSplitFunc returns a split function for bufio.Scanner based on the configured pattern
func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) (bufio.SplitFunc, error) {
func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) {
endPattern := c.LineEndPattern
startPattern := c.LineStartPattern

Expand All @@ -54,7 +56,7 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, ma
case enc == encoding.Nop:
return NoSplitFunc(maxLogSize), nil
case endPattern == "" && startPattern == "":
splitFunc, err = NewlineSplitFunc(enc, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
splitFunc, err = NewlineSplitFunc(enc, flushAtEOF, trimFunc)
if err != nil {
return nil, err
}
Expand All @@ -63,13 +65,13 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, ma
if err != nil {
return nil, fmt.Errorf("compile line end regex: %w", err)
}
splitFunc = LineEndSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
splitFunc = LineEndSplitFunc(re, flushAtEOF, trimFunc)
case startPattern != "":
re, err := regexp.Compile("(?m)" + c.LineStartPattern)
if err != nil {
return nil, fmt.Errorf("compile line start regex: %w", err)
}
splitFunc = LineStartSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
splitFunc = LineStartSplitFunc(re, flushAtEOF, trimFunc)
default:
return nil, fmt.Errorf("unreachable")
}
Expand All @@ -78,7 +80,7 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, ma

// LineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that start with a match to the regex pattern provided
func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc {
func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
firstLoc := re.FindIndex(data)
if firstLoc == nil {
Expand Down Expand Up @@ -132,7 +134,7 @@ func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) b

// LineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that end with a match to the regex pattern provided
func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc {
func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
loc := re.FindIndex(data)
if loc == nil {
Expand Down Expand Up @@ -160,7 +162,7 @@ func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) buf

// NewlineSplitFunc splits log lines by newline, just as bufio.ScanLines does, but
// never returns a token using EOF as a terminator
func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trimFunc) (bufio.SplitFunc, error) {
func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trim.Func) (bufio.SplitFunc, error) {
newline, err := encodedNewline(enc)
if err != nil {
return nil, err
Expand Down Expand Up @@ -224,45 +226,3 @@ func encodedCarriageReturn(enc encoding.Encoding) ([]byte, error) {
nDst, _, err := enc.NewEncoder().Transform(out, []byte{'\r'}, true)
return out[:nDst], err
}

type trimFunc func([]byte) []byte

func noTrim(token []byte) []byte {
return token
}

func trimLeadingWhitespacesFunc(data []byte) []byte {
// TrimLeft to strip EOF whitespaces in case of using $ in regex
// For some reason newline and carriage return are being moved to beginning of next log
token := bytes.TrimLeft(data, "\r\n\t ")
if token == nil {
return []byte{}
}
return token
}

func trimTrailingWhitespacesFunc(data []byte) []byte {
// TrimRight to strip all whitespaces from the end of log
token := bytes.TrimRight(data, "\r\n\t ")
if token == nil {
return []byte{}
}
return token
}

func trimWhitespacesFunc(data []byte) []byte {
return trimLeadingWhitespacesFunc(trimTrailingWhitespacesFunc(data))
}

func getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) trimFunc {
if preserveLeadingWhitespaces && preserveTrailingWhitespaces {
return noTrim
}
if preserveLeadingWhitespaces {
return trimTrailingWhitespacesFunc
}
if preserveTrailingWhitespaces {
return trimLeadingWhitespacesFunc
}
return trimWhitespacesFunc
}
22 changes: 14 additions & 8 deletions pkg/stanza/tokenize/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ import (
"golang.org/x/text/encoding/unicode"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize/tokenizetest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

var noTrim = trim.Whitespace(true, true)

const (
// These values have been determined experimentally for Windows
sleepDuration time.Duration = time.Millisecond * 80
Expand Down Expand Up @@ -215,10 +218,11 @@ func TestLineStartSplitFunc(t *testing.T) {
LineStartPattern: tc.Pattern,
}

splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc)
require.NoError(t, err)
if tc.Flusher != nil {
splitFunc = tc.Flusher.Wrap(splitFunc)
splitFunc = tc.Flusher.Wrap(splitFunc, trimFunc)
}
t.Run(tc.Name, tc.Run(splitFunc))
}
Expand Down Expand Up @@ -426,10 +430,11 @@ func TestLineEndSplitFunc(t *testing.T) {
LineEndPattern: tc.Pattern,
}

splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc)
require.NoError(t, err)
if tc.Flusher != nil {
splitFunc = tc.Flusher.Wrap(splitFunc)
splitFunc = tc.Flusher.Wrap(splitFunc, trimFunc)
}
t.Run(tc.Name, tc.Run(splitFunc))
}
Expand Down Expand Up @@ -591,10 +596,11 @@ func TestNewlineSplitFunc(t *testing.T) {
}

for _, tc := range testCases {
splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, getTrimFunc(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces))
trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, trimFunc)
require.NoError(t, err)
if tc.Flusher != nil {
splitFunc = tc.Flusher.Wrap(splitFunc)
splitFunc = tc.Flusher.Wrap(splitFunc, trimFunc)
}
t.Run(tc.Name, tc.Run(splitFunc))
}
Expand Down Expand Up @@ -666,14 +672,14 @@ func TestNoopEncodingError(t *testing.T) {
LineEndPattern: "\n",
}

_, err := cfg.getSplitFunc(encoding.Nop, false, 0, false, false)
_, err := cfg.getSplitFunc(encoding.Nop, false, 0, noTrim)
require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding"))

cfg = &MultilineConfig{
LineStartPattern: "\n",
}

_, err = cfg.getSplitFunc(encoding.Nop, false, 0, false, false)
_, err = cfg.getSplitFunc(encoding.Nop, false, 0, noTrim)
require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding"))
}

Expand Down
17 changes: 9 additions & 8 deletions pkg/stanza/tokenize/splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import (
"bufio"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

// SplitterConfig consolidates MultilineConfig and FlusherConfig
type SplitterConfig struct {
Encoding string `mapstructure:"encoding,omitempty"`
Flusher FlusherConfig `mapstructure:",squash,omitempty"`
Multiline MultilineConfig `mapstructure:"multiline,omitempty"`
PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"`
PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
Flusher FlusherConfig `mapstructure:",squash,omitempty"`
Multiline MultilineConfig `mapstructure:"multiline,omitempty"`
PreserveLeading bool `mapstructure:"preserve_leading_whitespaces,omitempty"`
PreserveTrailing bool `mapstructure:"preserve_trailing_whitespaces,omitempty"`
}

// NewSplitterConfig returns default SplitterConfig
Expand All @@ -33,11 +34,11 @@ func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (bufio.SplitFunc
if err != nil {
return nil, err
}

splitFunc, err := c.Multiline.Build(enc, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, maxLogSize)
trimFunc := trim.Whitespace(c.PreserveLeading, c.PreserveTrailing)
splitFunc, err := c.Multiline.Build(enc, flushAtEOF, maxLogSize, trimFunc)
if err != nil {
return nil, err
}

return c.Flusher.Wrap(splitFunc), nil
return c.Flusher.Wrap(splitFunc, trimFunc), nil
}
48 changes: 48 additions & 0 deletions pkg/stanza/trim/trim.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package trim // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"

import (
"bytes"
)

// Func is the signature shared by every trim function in this package: it
// accepts a byte slice and returns a byte slice.
type Func func([]byte) []byte

// Whitespace picks the trim function that matches the requested preservation
// flags:
//
//   - both flags set: the input is passed through untouched;
//   - only preserveLeading set: trailing whitespace is stripped;
//   - only preserveTrailing set: leading whitespace is stripped;
//   - neither flag set: whitespace is stripped from both ends.
func Whitespace(preserveLeading, preserveTrailing bool) Func {
	switch {
	case preserveLeading && preserveTrailing:
		return noTrim
	case preserveLeading:
		return trimTrailingWhitespacesFunc
	case preserveTrailing:
		return trimLeadingWhitespacesFunc
	default:
		return trimWhitespacesFunc
	}
}

// noTrim is the identity trim function: the token is handed back exactly as
// it was received, with no copying and no bytes removed.
func noTrim(token []byte) []byte { return token }

// trimLeadingWhitespacesFunc removes any leading carriage returns, newlines,
// tabs and spaces from data. The result is never nil: an empty, non-nil slice
// is returned when bytes.TrimLeft yields nil.
func trimLeadingWhitespacesFunc(data []byte) []byte {
	// Leading whitespace is stripped because, when a regex uses $, the
	// newline and carriage return end up at the beginning of the next log.
	if trimmed := bytes.TrimLeft(data, "\r\n\t "); trimmed != nil {
		return trimmed
	}

	// bytes.TrimLeft can return nil (e.g. for an empty input slice);
	// substitute an empty slice so callers always receive a non-nil token.
	return []byte{}
}

// trimTrailingWhitespacesFunc removes any trailing carriage returns, newlines,
// tabs and spaces from data and returns whatever bytes.TrimRight produces.
func trimTrailingWhitespacesFunc(data []byte) []byte {
	// Characters stripped from the end of the log.
	const cutset = "\r\n\t "

	trimmed := bytes.TrimRight(data, cutset)
	return trimmed
}

func trimWhitespacesFunc(data []byte) []byte {
return trimLeadingWhitespacesFunc(trimTrailingWhitespacesFunc(data))
}
Loading

0 comments on commit a09b4d7

Please sign in to comment.