[receiver/filelog] Fix issue where flushed tokens could be truncated #37596

Merged: 3 commits, Feb 3, 2025
27 changes: 27 additions & 0 deletions .chloggen/fix-flush-short.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where flushed tokens could be truncated.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35042]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
18 changes: 15 additions & 3 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
@@ -20,6 +20,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

@@ -56,14 +57,23 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader
if err != nil {
return nil, err
}
m := &Metadata{Fingerprint: fp, FileAttributes: attributes}
m := &Metadata{
Fingerprint: fp,
FileAttributes: attributes,
TokenLenState: &tokenlen.State{},
}
if f.FlushTimeout > 0 {
m.FlushState = &flush.State{LastDataChange: time.Now()}
}
return f.NewReaderFromMetadata(file, m)
}

func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) {
// Ensure TokenLenState is initialized
if m.TokenLenState == nil {
m.TokenLenState = &tokenlen.State{}
}

r = &Reader{
Metadata: m,
set: f.TelemetrySettings,
@@ -77,6 +87,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
includeFileRecordNum: f.IncludeFileRecordNumber,
compression: f.Compression,
acquireFSLock: f.AcquireFSLock,
emitFunc: f.EmitFunc,
}
r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))

@@ -100,9 +111,10 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
r.Offset = info.Size()
}

flushFunc := m.FlushState.Func(f.SplitFunc, f.FlushTimeout)
tokenLenFunc := m.TokenLenState.Func(f.SplitFunc)
flushFunc := m.FlushState.Func(tokenLenFunc, f.FlushTimeout)
Contributor:

Please correct me if I'm wrong, but I'm confused. Wouldn't the flush function time out, regardless of buffer resizing?

djaglowski (Member, Author), Jan 31, 2025:
The timer would indeed be expired. What this change does, though, is make sure that the token we pass into the flush function is at least as long as the one we saw previously.

Example:

Poll 1: The buffer starts at 16kB, grows to 20kB, and finds no terminator. The flush timer hasn't expired, so we return. (At this point, the tokenlen function has captured the fact that there is 20kB of unterminated content at EOF.)

Poll 2: We see that 20kB of unterminated content was left after the previous poll, so we initialize the scanner with a 20kB buffer. This means it will read the entire token on the first try. The timer has expired, so we flush the token.

The fix is that we avoid the situation where Poll 2 starts with a 16kB buffer and ends up flushing only a 16kB token.
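
Here's a rough, self-contained sketch of the mechanism (illustrative names and a deliberately simplified splitter, not the exact code in this PR; the real splitter only withholds the EOF flush while the flush timer hasn't expired):

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// newlineOnly mimics the effective splitter while the flush timer has NOT
// expired: it emits a token only on a terminator, never at EOF.
func newlineOnly(data []byte, atEOF bool) (int, []byte, error) {
	if i := bytes.IndexByte(data, '\n'); i >= 0 {
		return i + 1, data[:i], nil
	}
	return 0, nil, nil // no terminator yet: ask for more data
}

func main() {
	minLen := 0 // stands in for tokenlen.State.MinimumLength

	// tokenlen-style wrapper: record how much unterminated data was seen.
	split := func(data []byte, atEOF bool) (int, []byte, error) {
		adv, tok, err := newlineOnly(data, atEOF)
		if adv == 0 && tok == nil && err == nil {
			minLen = len(data)
		} else {
			minLen = 0
		}
		return adv, tok, err
	}

	content := bytes.Repeat([]byte("a"), 20*1024) // 20kB, no newline

	// Poll 1: buffer starts at 16kB, grows, finds no terminator, emits nothing.
	s := bufio.NewScanner(bytes.NewReader(content))
	s.Buffer(make([]byte, 0, 16*1024), 1024*1024)
	s.Split(split)
	fmt.Println("poll 1 emitted a token:", s.Scan())    // false
	fmt.Println("known minimum token length:", minLen)  // 20480

	// Poll 2: start the buffer one byte past the known minimum so the whole
	// token is visible on the first read; an expired flush can't truncate it.
	bufSize := 16 * 1024
	if minLen > bufSize {
		bufSize = minLen + 1
	}
	fmt.Println("poll 2 initial buffer size:", bufSize) // 20481
}
```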

Contributor:
Okay, now I understand.
So instead of resizing the buffer x times, we configure it to the "potential length" at the beginning, and we will read at least "potential length" bytes of data.

djaglowski (Member, Author):

That's the idea. I think the name of the field could be clearer. MinimumLength is better because it conveys that we know the next token will be at least that long. I'll push the update shortly.

r.contentSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.MaxLogSize), f.TrimFunc)
r.emitFunc = f.EmitFunc

if f.HeaderConfig != nil && !m.HeaderFinalized {
r.headerSplitFunc = f.HeaderConfig.SplitFunc
r.headerReader, err = header.NewReader(f.TelemetrySettings, *f.HeaderConfig)
11 changes: 10 additions & 1 deletion pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -21,6 +21,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
)

type Metadata struct {
@@ -30,6 +31,7 @@ type Metadata struct {
FileAttributes map[string]any
HeaderFinalized bool
FlushState *flush.State
TokenLenState *tokenlen.State
}

// Reader manages a single file
@@ -177,7 +179,14 @@ func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {

func (r *Reader) readContents(ctx context.Context) {
// Create the scanner to read the contents of the file.
s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.contentSplitFunc)
bufferSize := r.initialBufferSize
if r.TokenLenState.MinimumLength > bufferSize {
// If we previously saw a potential token larger than the default buffer,
// size the buffer to be at least one byte larger so we can see if there's more data
bufferSize = r.TokenLenState.MinimumLength + 1
}

s := scanner.New(r, r.maxLogSize, bufferSize, r.Offset, r.contentSplitFunc)

// Iterate over the contents of the file.
for {
99 changes: 99 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader_test.go
@@ -189,6 +189,7 @@ func TestFingerprintChangeSize(t *testing.T) {
func TestFlushPeriodEOF(t *testing.T) {
tempDir := t.TempDir()
temp := filetest.OpenTemp(t, tempDir)

// Create a long enough initial token, so the scanner can't read the whole file at once
aContentLength := 2 * 16 * 1024
content := []byte(strings.Repeat("a", aContentLength))
@@ -223,3 +224,101 @@ r.ReadToEnd(context.Background())
r.ReadToEnd(context.Background())
sink.ExpectToken(t, []byte{'b'})
}

func TestUntermintedLongLogEntry(t *testing.T) {
tempDir := t.TempDir()
temp := filetest.OpenTemp(t, tempDir)

// Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
content := filetest.TokenWithLength(20 * 1024) // 20KB
_, err := temp.WriteString(string(content)) // no newline
require.NoError(t, err)

// Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
// a few times during a call to ReadToEnd.
clock := internaltime.NewAlwaysIncreasingClock()
internaltime.Now = clock.Now
internaltime.Since = clock.Since
defer func() {
internaltime.Now = time.Now
internaltime.Since = time.Since
}()

// Use a long flush period to ensure it does not expire DURING a ReadToEnd
flushPeriod := time.Second

f, sink := testFactory(t, withFlushPeriod(flushPeriod))
fp, err := f.NewFingerprint(temp)
require.NoError(t, err)
r, err := f.NewReader(temp, fp)
require.NoError(t, err)
assert.Equal(t, int64(0), r.Offset)

// First ReadToEnd should not emit anything as flush period hasn't expired
r.ReadToEnd(context.Background())
sink.ExpectNoCalls(t)

// Advance time past the flush period to test behavior after timer is expired
clock.Advance(2 * flushPeriod)

// Second ReadToEnd should emit the full untruncated token
r.ReadToEnd(context.Background())
sink.ExpectToken(t, content)

sink.ExpectNoCalls(t)
}

func TestUntermintedLogEntryGrows(t *testing.T) {
tempDir := t.TempDir()
temp := filetest.OpenTemp(t, tempDir)

// Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
content := filetest.TokenWithLength(20 * 1024) // 20KB
_, err := temp.WriteString(string(content)) // no newline
require.NoError(t, err)

// Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
// a few times during a call to ReadToEnd.
clock := internaltime.NewAlwaysIncreasingClock()
internaltime.Now = clock.Now
internaltime.Since = clock.Since
defer func() {
internaltime.Now = time.Now
internaltime.Since = time.Since
}()

// Use a long flush period to ensure it does not expire DURING a ReadToEnd
flushPeriod := time.Second

f, sink := testFactory(t, withFlushPeriod(flushPeriod))
fp, err := f.NewFingerprint(temp)
require.NoError(t, err)
r, err := f.NewReader(temp, fp)
require.NoError(t, err)
assert.Equal(t, int64(0), r.Offset)

// First ReadToEnd should not emit anything as flush period hasn't expired
r.ReadToEnd(context.Background())
sink.ExpectNoCalls(t)

// Advance time past the flush period to test behavior after timer is expired
clock.Advance(2 * flushPeriod)

// Write additional unterminated content to ensure all is picked up in the same token
// The flusher should notice new data and not return anything on the next call
additionalContext := filetest.TokenWithLength(1024)
_, err = temp.WriteString(string(additionalContext)) // no newline
require.NoError(t, err)

r.ReadToEnd(context.Background())
sink.ExpectNoCalls(t)

// Advance time past the flush period to test behavior after timer is expired
clock.Advance(2 * flushPeriod)

// Finally, since we haven't seen new data, flusher should emit the token
r.ReadToEnd(context.Background())
sink.ExpectToken(t, append(content, additionalContext...))

sink.ExpectNoCalls(t)
}
4 changes: 4 additions & 0 deletions pkg/stanza/internal/time/time.go
@@ -35,3 +35,7 @@ func (c AlwaysIncreasingClock) Since(t time.Time) time.Duration {
c.FakeClock.Advance(time.Nanosecond)
return c.FakeClock.Since(t)
}

func (c AlwaysIncreasingClock) Advance(d time.Duration) {
c.FakeClock.Advance(d)
}
36 changes: 36 additions & 0 deletions pkg/stanza/tokenlen/tokenlen.go
@@ -0,0 +1,36 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tokenlen // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"

import "bufio"

// State tracks the potential length of a token before any terminator checking
type State struct {
MinimumLength int
}

// Func wraps a bufio.SplitFunc to track potential token lengths
// Records the length of the data before delegating to the wrapped function
func (s *State) Func(splitFunc bufio.SplitFunc) bufio.SplitFunc {
if s == nil {
return splitFunc
}

return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
// Note the potential token length but don't update state until we know
// whether or not a token is actually returned
potentialLen := len(data)

advance, token, err = splitFunc(data, atEOF)
if advance == 0 && token == nil && err == nil {
// The splitFunc is asking for more data. Remember how much
// we saw previously so the buffer can be sized appropriately.
s.MinimumLength = potentialLen
} else {
// A token was returned. This state represented that token, so clear it.
s.MinimumLength = 0
}
return advance, token, err
}
}
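
For orientation, here is a minimal, self-contained sketch of how this state composes with the flush wrapper, mirroring the order used in factory.go above (schematic stand-ins, not the real flush.State and tokenlen.State types):

```go
package main

import (
	"bufio"
	"fmt"
)

func main() {
	minLen := 0      // schematic tokenlen.State.MinimumLength
	expired := false // schematic stand-in for flush.State's timeout check

	base := bufio.ScanLines

	// The tokenlen layer wraps the base splitter, as in factory.go above: it
	// must observe the base splitter's "need more data" answer before the
	// flush layer can override it.
	tokenLen := func(data []byte, atEOF bool) (int, []byte, error) {
		adv, tok, err := base(data, atEOF)
		if adv == 0 && tok == nil && err == nil {
			minLen = len(data) // the next token is at least this long
		} else {
			minLen = 0 // a token was produced; the recorded length no longer applies
		}
		return adv, tok, err
	}

	// The flush layer wraps the tokenlen layer: once the timer expires, it
	// forces out whatever unterminated data is buffered.
	flush := func(data []byte, atEOF bool) (int, []byte, error) {
		adv, tok, err := tokenLen(data, atEOF)
		if adv == 0 && tok == nil && err == nil && expired && len(data) > 0 {
			return len(data), data, nil // forced flush of the partial token
		}
		return adv, tok, err
	}

	// Timer not expired: no token is emitted, but the length is recorded.
	adv, tok, _ := flush([]byte("unterminated"), false)
	fmt.Println(adv, tok == nil, minLen) // 0 true 12

	// Timer expired: the partial token is flushed whole, never truncated.
	expired = true
	adv, tok, _ = flush([]byte("unterminated"), false)
	fmt.Println(adv, string(tok)) // 12 unterminated
}
```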
91 changes: 91 additions & 0 deletions pkg/stanza/tokenlen/tokenlen_test.go
@@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tokenlen

import (
"bufio"
"testing"

"github.com/stretchr/testify/require"
)

func TestTokenLenState_Func(t *testing.T) {
cases := []struct {
name string
input []byte
atEOF bool
expectedLen int
expectedToken []byte
expectedAdv int
expectedErr error
}{
{
name: "no token yet",
input: []byte("partial"),
atEOF: false,
expectedLen: len("partial"),
},
{
name: "complete token",
input: []byte("complete\ntoken"),
atEOF: false,
expectedLen: 0, // should clear state after finding token
expectedToken: []byte("complete"),
expectedAdv: len("complete\n"),
},
{
name: "growing token",
input: []byte("growing"),
atEOF: false,
expectedLen: len("growing"),
},
{
name: "flush at EOF",
input: []byte("flush"),
atEOF: true,
expectedLen: 0, // should clear state after flushing
expectedToken: []byte("flush"),
expectedAdv: len("flush"),
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
state := &State{}
splitFunc := state.Func(bufio.ScanLines)

adv, token, err := splitFunc(tc.input, tc.atEOF)
require.Equal(t, tc.expectedErr, err)
require.Equal(t, tc.expectedToken, token)
require.Equal(t, tc.expectedAdv, adv)
require.Equal(t, tc.expectedLen, state.MinimumLength)
})
}
}

func TestTokenLenState_GrowingToken(t *testing.T) {
state := &State{}
splitFunc := state.Func(bufio.ScanLines)

// First call with partial token
adv, token, err := splitFunc([]byte("part"), false)
require.NoError(t, err)
require.Nil(t, token)
require.Equal(t, 0, adv)
require.Equal(t, len("part"), state.MinimumLength)

// Second call with longer partial token
adv, token, err = splitFunc([]byte("partial"), false)
require.NoError(t, err)
require.Nil(t, token)
require.Equal(t, 0, adv)
require.Equal(t, len("partial"), state.MinimumLength)

// Final call with complete token
adv, token, err = splitFunc([]byte("partial\ntoken"), false)
require.NoError(t, err)
require.Equal(t, []byte("partial"), token)
require.Equal(t, len("partial\n"), adv)
require.Equal(t, 0, state.MinimumLength) // State should be cleared after emitting token
}