[receiver/filelog] Fix issue where flushed tokens could be truncated (#37596)

Fixes #35042 (and #32100 again)

The issue affected unterminated logs in a particular length range:
longer than our internal `scanner.DefaultBufferSize` (16kB) but shorter
than `max_log_size`.

The failure mode was described in #32100 but was apparently only fixed
in some circumstances. I believe this is a more robust fix. I'll
articulate the exact failure mode again here:
1. During a poll cycle, `reader.ReadToEnd` is called. Within this, a
scanner is created which starts with the default buffer size. The buffer
is filled, but no terminator is found. Therefore the scanner resizes the
buffer to accommodate more data, hoping to find a terminator.
Eventually, the buffer is large enough to contain all content until EOF,
but still no terminator is found. At this time, the flush timer has not
expired, so `reader.ReadToEnd` returns without emitting anything.
2. During the _next_ poll cycle, `reader.ReadToEnd` creates a new
scanner, also with the default buffer size. The first time it looks for a
terminator, it of course doesn't find one, but by now the flush
timer has expired. Therefore, instead of resizing the buffer and
continuing to look for a terminator, it just emits what it has.

What should happen instead is the scanner continues to resize the buffer
to find as much of the unterminated token as possible before emitting
it. Therefore, this fix introduces a simple layer into the split func
stack which allows us to reason about unterminated tokens more
carefully. It captures the length of unterminated tokens and ensures
that when we recreate a scanner, we will start with a buffer size that
is appropriate to read the same content as last time, plus one
additional byte. The extra byte allows us to check whether new content has
been added, in which case we resume resizing. If no new content is
found, the flusher emits the entire unterminated token at once.
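
In code, the new layering looks roughly like this (a condensed sketch of the `factory.go` and `reader.go` changes in the diff below; the identifiers match that diff):

```go
// Wrap the configured split func so the length of any unterminated token is
// recorded before the flusher and max_log_size trimming are applied.
tokenLenFunc := m.TokenLenState.Func(f.SplitFunc)
flushFunc := m.FlushState.Func(tokenLenFunc, f.FlushTimeout)
r.contentSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.MaxLogSize), f.TrimFunc)

// On the next poll cycle, start the scanner with a buffer at least one byte
// larger than the unterminated token we saw last time, so we can tell
// whether new data has arrived before deciding to flush.
bufferSize := r.initialBufferSize
if r.TokenLenState.MinimumLength > bufferSize {
	bufferSize = r.TokenLenState.MinimumLength + 1
}
s := scanner.New(r, r.maxLogSize, bufferSize, r.Offset, r.contentSplitFunc)
```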
djaglowski authored Feb 3, 2025
1 parent 1438738 commit 68b24ea
Showing 7 changed files with 282 additions and 4 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix-flush-short.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where flushed tokens could be truncated.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35042]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
18 changes: 15 additions & 3 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
@@ -20,6 +20,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

@@ -56,14 +57,23 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader
if err != nil {
return nil, err
}
m := &Metadata{Fingerprint: fp, FileAttributes: attributes}
m := &Metadata{
Fingerprint: fp,
FileAttributes: attributes,
TokenLenState: &tokenlen.State{},
}
if f.FlushTimeout > 0 {
m.FlushState = &flush.State{LastDataChange: time.Now()}
}
return f.NewReaderFromMetadata(file, m)
}

func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) {
// Ensure TokenLenState is initialized
if m.TokenLenState == nil {
m.TokenLenState = &tokenlen.State{}
}

r = &Reader{
Metadata: m,
set: f.TelemetrySettings,
@@ -77,6 +87,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
includeFileRecordNum: f.IncludeFileRecordNumber,
compression: f.Compression,
acquireFSLock: f.AcquireFSLock,
emitFunc: f.EmitFunc,
}
r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))

@@ -100,9 +111,10 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
r.Offset = info.Size()
}

flushFunc := m.FlushState.Func(f.SplitFunc, f.FlushTimeout)
tokenLenFunc := m.TokenLenState.Func(f.SplitFunc)
flushFunc := m.FlushState.Func(tokenLenFunc, f.FlushTimeout)
r.contentSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.MaxLogSize), f.TrimFunc)
r.emitFunc = f.EmitFunc

if f.HeaderConfig != nil && !m.HeaderFinalized {
r.headerSplitFunc = f.HeaderConfig.SplitFunc
r.headerReader, err = header.NewReader(f.TelemetrySettings, *f.HeaderConfig)
11 changes: 10 additions & 1 deletion pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -21,6 +21,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
)

type Metadata struct {
@@ -30,6 +31,7 @@ type Metadata struct {
FileAttributes map[string]any
HeaderFinalized bool
FlushState *flush.State
TokenLenState *tokenlen.State
}

// Reader manages a single file
@@ -177,7 +179,14 @@ func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {

func (r *Reader) readContents(ctx context.Context) {
// Create the scanner to read the contents of the file.
s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.contentSplitFunc)
bufferSize := r.initialBufferSize
if r.TokenLenState.MinimumLength > bufferSize {
// If we previously saw a potential token larger than the default buffer,
// size the buffer to be at least one byte larger so we can see if there's more data
bufferSize = r.TokenLenState.MinimumLength + 1
}

s := scanner.New(r, r.maxLogSize, bufferSize, r.Offset, r.contentSplitFunc)

// Iterate over the contents of the file.
for {
99 changes: 99 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader_test.go
@@ -189,6 +189,7 @@ func TestFingerprintChangeSize(t *testing.T) {
func TestFlushPeriodEOF(t *testing.T) {
tempDir := t.TempDir()
temp := filetest.OpenTemp(t, tempDir)

// Create a long enough initial token, so the scanner can't read the whole file at once
aContentLength := 2 * 16 * 1024
content := []byte(strings.Repeat("a", aContentLength))
@@ -223,3 +224,101 @@
r.ReadToEnd(context.Background())
sink.ExpectToken(t, []byte{'b'})
}

func TestUnterminatedLongLogEntry(t *testing.T) {
tempDir := t.TempDir()
temp := filetest.OpenTemp(t, tempDir)

// Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
content := filetest.TokenWithLength(20 * 1024) // 20KB
_, err := temp.WriteString(string(content)) // no newline
require.NoError(t, err)

// Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
// a few times during a call to ReadToEnd.
clock := internaltime.NewAlwaysIncreasingClock()
internaltime.Now = clock.Now
internaltime.Since = clock.Since
defer func() {
internaltime.Now = time.Now
internaltime.Since = time.Since
}()

// Use a long flush period to ensure it does not expire DURING a ReadToEnd
flushPeriod := time.Second

f, sink := testFactory(t, withFlushPeriod(flushPeriod))
fp, err := f.NewFingerprint(temp)
require.NoError(t, err)
r, err := f.NewReader(temp, fp)
require.NoError(t, err)
assert.Equal(t, int64(0), r.Offset)

// First ReadToEnd should not emit anything as flush period hasn't expired
r.ReadToEnd(context.Background())
sink.ExpectNoCalls(t)

// Advance time past the flush period to test behavior after timer is expired
clock.Advance(2 * flushPeriod)

// Second ReadToEnd should emit the full untruncated token
r.ReadToEnd(context.Background())
sink.ExpectToken(t, content)

sink.ExpectNoCalls(t)
}

func TestUnterminatedLogEntryGrows(t *testing.T) {
tempDir := t.TempDir()
temp := filetest.OpenTemp(t, tempDir)

// Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
content := filetest.TokenWithLength(20 * 1024) // 20KB
_, err := temp.WriteString(string(content)) // no newline
require.NoError(t, err)

// Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
// a few times during a call to ReadToEnd.
clock := internaltime.NewAlwaysIncreasingClock()
internaltime.Now = clock.Now
internaltime.Since = clock.Since
defer func() {
internaltime.Now = time.Now
internaltime.Since = time.Since
}()

// Use a long flush period to ensure it does not expire DURING a ReadToEnd
flushPeriod := time.Second

f, sink := testFactory(t, withFlushPeriod(flushPeriod))
fp, err := f.NewFingerprint(temp)
require.NoError(t, err)
r, err := f.NewReader(temp, fp)
require.NoError(t, err)
assert.Equal(t, int64(0), r.Offset)

// First ReadToEnd should not emit anything as flush period hasn't expired
r.ReadToEnd(context.Background())
sink.ExpectNoCalls(t)

// Advance time past the flush period to test behavior after timer is expired
clock.Advance(2 * flushPeriod)

// Write additional unterminated content to ensure all is picked up in the same token
// The flusher should notice new data and not return anything on the next call
additionalContent := filetest.TokenWithLength(1024)
_, err = temp.WriteString(string(additionalContent)) // no newline
require.NoError(t, err)

r.ReadToEnd(context.Background())
sink.ExpectNoCalls(t)

// Advance time past the flush period to test behavior after timer is expired
clock.Advance(2 * flushPeriod)

// Finally, since we haven't seen new data, flusher should emit the token
r.ReadToEnd(context.Background())
sink.ExpectToken(t, append(content, additionalContent...))

sink.ExpectNoCalls(t)
}
4 changes: 4 additions & 0 deletions pkg/stanza/internal/time/time.go
@@ -35,3 +35,7 @@ func (c AlwaysIncreasingClock) Since(t time.Time) time.Duration {
c.FakeClock.Advance(time.Nanosecond)
return c.FakeClock.Since(t)
}

func (c AlwaysIncreasingClock) Advance(d time.Duration) {
c.FakeClock.Advance(d)
}
36 changes: 36 additions & 0 deletions pkg/stanza/tokenlen/tokenlen.go
@@ -0,0 +1,36 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tokenlen // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"

import "bufio"

// State tracks the potential length of a token before any terminator checking
type State struct {
MinimumLength int
}

// Func wraps a bufio.SplitFunc to track potential token lengths
// Records the length of the data before delegating to the wrapped function
func (s *State) Func(splitFunc bufio.SplitFunc) bufio.SplitFunc {
if s == nil {
return splitFunc
}

return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
// Note the potential token length but don't update state until we know
// whether or not a token is actually returned
potentialLen := len(data)

advance, token, err = splitFunc(data, atEOF)
if advance == 0 && token == nil && err == nil {
// The splitFunc is asking for more data. Remember how much
// we saw previously so the buffer can be sized appropriately.
s.MinimumLength = potentialLen
} else {
// A token was returned. This state represented that token, so clear it.
s.MinimumLength = 0
}
return advance, token, err
}
}
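
For reference, here is a minimal standalone sketch of how this state behaves (illustrative only, not part of the diff; it exercises the `tokenlen` package above with `bufio.ScanLines`):

```go
package main

import (
	"bufio"
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
)

func main() {
	state := &tokenlen.State{}
	split := state.Func(bufio.ScanLines)

	// No terminator yet: the wrapped ScanLines asks for more data and the
	// state records how many bytes were already available.
	adv, tok, _ := split([]byte("partial"), false)
	fmt.Println(adv, tok, state.MinimumLength) // 0 [] 7

	// A reader recreating its scanner can size the buffer as
	// state.MinimumLength+1 to detect whether the token has grown since.

	// Once a terminator appears, a token is returned and the state is cleared.
	adv, tok, _ = split([]byte("partial line\nmore"), false)
	fmt.Println(adv, string(tok), state.MinimumLength) // 13 partial line 0
}
```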
91 changes: 91 additions & 0 deletions pkg/stanza/tokenlen/tokenlen_test.go
@@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tokenlen

import (
"bufio"
"testing"

"github.com/stretchr/testify/require"
)

func TestTokenLenState_Func(t *testing.T) {
cases := []struct {
name string
input []byte
atEOF bool
expectedLen int
expectedToken []byte
expectedAdv int
expectedErr error
}{
{
name: "no token yet",
input: []byte("partial"),
atEOF: false,
expectedLen: len("partial"),
},
{
name: "complete token",
input: []byte("complete\ntoken"),
atEOF: false,
expectedLen: 0, // should clear state after finding token
expectedToken: []byte("complete"),
expectedAdv: len("complete\n"),
},
{
name: "growing token",
input: []byte("growing"),
atEOF: false,
expectedLen: len("growing"),
},
{
name: "flush at EOF",
input: []byte("flush"),
atEOF: true,
expectedLen: 0, // should clear state after flushing
expectedToken: []byte("flush"),
expectedAdv: len("flush"),
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
state := &State{}
splitFunc := state.Func(bufio.ScanLines)

adv, token, err := splitFunc(tc.input, tc.atEOF)
require.Equal(t, tc.expectedErr, err)
require.Equal(t, tc.expectedToken, token)
require.Equal(t, tc.expectedAdv, adv)
require.Equal(t, tc.expectedLen, state.MinimumLength)
})
}
}

func TestTokenLenState_GrowingToken(t *testing.T) {
state := &State{}
splitFunc := state.Func(bufio.ScanLines)

// First call with partial token
adv, token, err := splitFunc([]byte("part"), false)
require.NoError(t, err)
require.Nil(t, token)
require.Equal(t, 0, adv)
require.Equal(t, len("part"), state.MinimumLength)

// Second call with longer partial token
adv, token, err = splitFunc([]byte("partial"), false)
require.NoError(t, err)
require.Nil(t, token)
require.Equal(t, 0, adv)
require.Equal(t, len("partial"), state.MinimumLength)

// Final call with complete token
adv, token, err = splitFunc([]byte("partial\ntoken"), false)
require.NoError(t, err)
require.Equal(t, []byte("partial"), token)
require.Equal(t, len("partial\n"), adv)
require.Equal(t, 0, state.MinimumLength) // State should be cleared after emitting token
}
