Commit

Clean up comments, add tests
djaglowski committed Jan 31, 2025
1 parent 587d3b9 commit 1202b33
Showing 4 changed files with 110 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -180,10 +180,10 @@ func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {
 func (r *Reader) readContents(ctx context.Context) {
 	// Create the scanner to read the contents of the file.
 	bufferSize := r.initialBufferSize
-	if r.TokenLenState.PotentialLength > bufferSize {
+	if r.TokenLenState.MinimumLength > bufferSize {
 		// If we previously saw a potential token larger than the default buffer,
 		// size the buffer to be at least one byte larger so we can see if there's more data
-		bufferSize = r.TokenLenState.PotentialLength + 1
+		bufferSize = r.TokenLenState.MinimumLength + 1
 	}

 	s := scanner.New(r, r.maxLogSize, bufferSize, r.Offset, r.contentSplitFunc)
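
The one byte of headroom is the load-bearing detail here: if the scanner handed the split function exactly MinimumLength bytes and it still found no terminator, the reader could not tell "no new data arrived" apart from "the token kept growing". A minimal sketch of that reasoning using plain bufio (the sizes are made up for illustration):

package main

import (
	"bufio"
	"fmt"
)

func main() {
	previous := 20             // bytes of unterminated token seen on the last read
	bufferSize := previous + 1 // one byte of headroom, as in readContents above

	// Simulate the file having grown: 25 unterminated bytes are now on disk,
	// but the scanner only shows the split function bufferSize of them.
	onDisk := make([]byte, 25) // zero bytes, so no '\n' terminator anywhere
	window := onDisk[:bufferSize]

	advance, token, err := bufio.ScanLines(window, false)
	fmt.Println(advance, token == nil, err) // 0 true <nil>: still no terminator

	// No token was produced, but seeing previous+1 bytes proves the token
	// is still growing, so the reader keeps waiting instead of flushing.
	fmt.Println(len(window) > previous) // true
}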
22 changes: 9 additions & 13 deletions pkg/stanza/fileconsumer/internal/reader/reader_test.go
@@ -258,7 +258,7 @@ func TestUntermintedLongLogEntry(t *testing.T) {
 	r.ReadToEnd(context.Background())
 	sink.ExpectNoCalls(t)

-	// Advance time past the flush period
+	// Advance time past the flush period to test behavior after timer is expired
 	clock.Advance(2 * flushPeriod)

 	// Second ReadToEnd should emit the full untruncated token
@@ -273,9 +273,8 @@ func TestUntermintedLogEntryGrows(t *testing.T) {
 	temp := filetest.OpenTemp(t, tempDir)

 	// Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
-	content := filetest.TokenWithLength(20 * 1024)      // 20KB
-	additionalContext := filetest.TokenWithLength(1024) // 1KB
-	_, err := temp.WriteString(string(content))         // no newline
+	content := filetest.TokenWithLength(20 * 1024) // 20KB
+	_, err := temp.WriteString(string(content))    // no newline
 	require.NoError(t, err)

 	// Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
@@ -302,25 +301,22 @@ func TestUntermintedLogEntryGrows(t *testing.T) {
 	r.ReadToEnd(context.Background())
 	sink.ExpectNoCalls(t)

-	// Advance time past the flush period
+	// Advance time past the flush period to test behavior after timer is expired
 	clock.Advance(2 * flushPeriod)

-	// Write additional unterminated content to the file. We want to ensure this
-	// is all picked up in the same token.
-	// Importantly, this resets the flush timer so the next call still will not
-	// return anything
+	// Write additional unterminated content to ensure all is picked up in the same token
+	// The flusher should notice new data and not return anything on the next call
+	additionalContext := filetest.TokenWithLength(1024)
 	_, err = temp.WriteString(string(additionalContext)) // no newline
 	require.NoError(t, err)

-	// Next ReadToEnd should STILL not emit anything as flush period has been extended
-	// because we saw more data than last time
 	r.ReadToEnd(context.Background())
 	sink.ExpectNoCalls(t)

-	// Advance time past the flush period
+	// Advance time past the flush period to test behavior after timer is expired
 	clock.Advance(2 * flushPeriod)

-	// Finally, since we haven't seen new data, we should emit the token
+	// Finally, since we haven't seen new data, flusher should emit the token
 	r.ReadToEnd(context.Background())
 	sink.ExpectToken(t, append(content, additionalContext...))
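
For readers following the flush logic these assertions exercise: the general pattern is a split-function wrapper that emits stale unterminated data once nothing new has arrived for a full flush period. The sketch below is a hypothetical paraphrase of that idea, not the actual pkg/stanza/flush implementation (flushFunc, its signature, and the injected now clock are all illustrative assumptions):

package flushsketch

import (
	"bufio"
	"time"
)

// flushFunc wraps a split function so an unterminated token is emitted only
// after the data has stopped growing for longer than the flush period.
// Hypothetical sketch; the real logic lives in pkg/stanza/flush.
func flushFunc(split bufio.SplitFunc, period time.Duration, now func() time.Time) bufio.SplitFunc {
	var lastLen int
	var lastChange time.Time
	return func(data []byte, atEOF bool) (int, []byte, error) {
		advance, token, err := split(data, atEOF)
		if advance > 0 || token != nil || err != nil {
			return advance, token, err
		}
		if len(data) > lastLen {
			// New data since the last call: restart the flush timer. This is
			// why the second ReadToEnd in the test above still emits nothing.
			lastLen, lastChange = len(data), now()
			return 0, nil, nil
		}
		if len(data) > 0 && now().Sub(lastChange) > period {
			// Data went stale: flush the whole unterminated token.
			lastLen = 0
			return len(data), data, nil
		}
		return 0, nil, nil
	}
}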

26 changes: 8 additions & 18 deletions pkg/stanza/tokenlen/tokenlen.go
@@ -7,16 +7,7 @@ import "bufio"

 // State tracks the potential length of a token before any terminator checking
 type State struct {
-	PotentialLength int
-}
-
-func (s *State) Copy() *State {
-	if s == nil {
-		return nil
-	}
-	return &State{
-		PotentialLength: s.PotentialLength,
-	}
+	MinimumLength int
 }

 // Func wraps a bufio.SplitFunc to track potential token lengths
@@ -27,20 +18,19 @@ func (s *State) Func(splitFunc bufio.SplitFunc) bufio.SplitFunc {
 	}

 	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
-		// Note the potential token length but don't update state yet
+		// Note the potential token length but don't update state until we know
+		// whether or not a token is actually returned
 		potentialLen := len(data)
-
-		// Delegate to the wrapped split function
 		advance, token, err = splitFunc(data, atEOF)
-
-		// Only update state if we didn't find a token (delegate returned 0, nil, nil)
 		if advance == 0 && token == nil && err == nil {
-			s.PotentialLength = potentialLen
+			// The splitFunc is asking for more data. Remember how much
+			// we saw previously so the buffer can be sized appropriately.
+			s.MinimumLength = potentialLen
 		} else {
-			// Clear the state if we got a token or error
-			s.PotentialLength = 0
+			// A token was returned. This state represented that token, so clear it.
+			s.MinimumLength = 0
 		}

 		return advance, token, err
 	}
 }
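
Usage mirrors the tests below; a minimal sketch (the import path assumes this repository's pkg/stanza module layout):

package main

import (
	"bufio"
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
)

func main() {
	state := &tokenlen.State{}
	splitFunc := state.Func(bufio.ScanLines)

	// An unterminated token: the inner ScanLines asks for more data,
	// and State remembers how much it has already seen.
	adv, token, err := splitFunc([]byte("partial"), false)
	fmt.Println(adv, token == nil, err, state.MinimumLength) // 0 true <nil> 7

	// A terminated token: the state is cleared because a token was returned.
	adv, token, err = splitFunc([]byte("partial\n"), false)
	fmt.Printf("%d %q %v %d\n", adv, token, err, state.MinimumLength) // 8 "partial" <nil> 0
}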
91 changes: 91 additions & 0 deletions pkg/stanza/tokenlen/tokenlen_test.go
@@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tokenlen

import (
	"bufio"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestTokenLenState_Func(t *testing.T) {
	cases := []struct {
		name          string
		input         []byte
		atEOF         bool
		expectedLen   int
		expectedToken []byte
		expectedAdv   int
		expectedErr   error
	}{
		{
			name:        "no token yet",
			input:       []byte("partial"),
			atEOF:       false,
			expectedLen: len("partial"),
		},
		{
			name:          "complete token",
			input:         []byte("complete\ntoken"),
			atEOF:         false,
			expectedLen:   0, // should clear state after finding token
			expectedToken: []byte("complete"),
			expectedAdv:   len("complete\n"),
		},
		{
			name:        "growing token",
			input:       []byte("growing"),
			atEOF:       false,
			expectedLen: len("growing"),
		},
		{
			name:          "flush at EOF",
			input:         []byte("flush"),
			atEOF:         true,
			expectedLen:   0, // should clear state after flushing
			expectedToken: []byte("flush"),
			expectedAdv:   len("flush"),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			state := &State{}
			splitFunc := state.Func(bufio.ScanLines)

			adv, token, err := splitFunc(tc.input, tc.atEOF)
			require.Equal(t, tc.expectedErr, err)
			require.Equal(t, tc.expectedToken, token)
			require.Equal(t, tc.expectedAdv, adv)
			require.Equal(t, tc.expectedLen, state.MinimumLength)
		})
	}
}

func TestTokenLenState_GrowingToken(t *testing.T) {
	state := &State{}
	splitFunc := state.Func(bufio.ScanLines)

	// First call with partial token
	adv, token, err := splitFunc([]byte("part"), false)
	require.NoError(t, err)
	require.Nil(t, token)
	require.Equal(t, 0, adv)
	require.Equal(t, len("part"), state.MinimumLength)

	// Second call with longer partial token
	adv, token, err = splitFunc([]byte("partial"), false)
	require.NoError(t, err)
	require.Nil(t, token)
	require.Equal(t, 0, adv)
	require.Equal(t, len("partial"), state.MinimumLength)

	// Final call with complete token
	adv, token, err = splitFunc([]byte("partial\ntoken"), false)
	require.NoError(t, err)
	require.Equal(t, []byte("partial"), token)
	require.Equal(t, len("partial\n"), adv)
	require.Equal(t, 0, state.MinimumLength) // State should be cleared after emitting token
}
