From 587d3b971f8673e22a1eeca3c7ce27de9d4d619e Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 30 Jan 2025 11:36:25 -0600 Subject: [PATCH 1/2] [receiver/filelog] Fix issue where flushed tokens could be truncated --- .chloggen/fix-flush-short.yaml | 27 +++++ .../fileconsumer/internal/reader/factory.go | 18 ++- .../fileconsumer/internal/reader/reader.go | 11 +- .../internal/reader/reader_test.go | 103 ++++++++++++++++++ pkg/stanza/internal/time/time.go | 4 + pkg/stanza/tokenlen/tokenlen.go | 46 ++++++++ 6 files changed, 205 insertions(+), 4 deletions(-) create mode 100644 .chloggen/fix-flush-short.yaml create mode 100644 pkg/stanza/tokenlen/tokenlen.go diff --git a/.chloggen/fix-flush-short.yaml b/.chloggen/fix-flush-short.yaml new file mode 100644 index 000000000000..7e9df8662ee6 --- /dev/null +++ b/.chloggen/fix-flush-short.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where flushed tokens could be truncated. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35042] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/pkg/stanza/fileconsumer/internal/reader/factory.go b/pkg/stanza/fileconsumer/internal/reader/factory.go index f314e4aacae8..ef35230bce36 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory.go @@ -20,6 +20,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) @@ -56,7 +57,11 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader if err != nil { return nil, err } - m := &Metadata{Fingerprint: fp, FileAttributes: attributes} + m := &Metadata{ + Fingerprint: fp, + FileAttributes: attributes, + TokenLenState: &tokenlen.State{}, + } if f.FlushTimeout > 0 { m.FlushState = &flush.State{LastDataChange: time.Now()} } @@ -64,6 +69,11 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader } func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) { + // Ensure TokenLenState is initialized + if m.TokenLenState == nil { + m.TokenLenState = &tokenlen.State{} + } + r = &Reader{ Metadata: m, set: f.TelemetrySettings, @@ -77,6 +87,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, includeFileRecordNum: f.IncludeFileRecordNumber, compression: f.Compression, acquireFSLock: f.AcquireFSLock, + emitFunc: f.EmitFunc, } r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName)) @@ -100,9 +111,10 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, r.Offset = info.Size() } - flushFunc := m.FlushState.Func(f.SplitFunc, f.FlushTimeout) + tokenLenFunc := m.TokenLenState.Func(f.SplitFunc) + flushFunc := m.FlushState.Func(tokenLenFunc, f.FlushTimeout) r.contentSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.MaxLogSize), f.TrimFunc) - r.emitFunc = f.EmitFunc + if f.HeaderConfig != nil && !m.HeaderFinalized { r.headerSplitFunc = f.HeaderConfig.SplitFunc r.headerReader, err = header.NewReader(f.TelemetrySettings, *f.HeaderConfig) diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index 3a591574fbc6..0ebd731880f7 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -21,6 +21,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen" ) type Metadata struct { @@ -30,6 +31,7 @@ type Metadata struct { FileAttributes map[string]any HeaderFinalized bool FlushState *flush.State + TokenLenState *tokenlen.State } // Reader manages a single file @@ -177,7 +179,14 @@ func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) { func (r *Reader) readContents(ctx context.Context) { // Create the scanner to read the contents of the file. 
- s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.contentSplitFunc)
+ bufferSize := r.initialBufferSize
+ if r.TokenLenState.PotentialLength > bufferSize {
+ // If we previously saw a potential token larger than the default buffer,
+ // size the buffer to be at least one byte larger so we can see if there's more data
+ bufferSize = r.TokenLenState.PotentialLength + 1
+ }
+
+ s := scanner.New(r, r.maxLogSize, bufferSize, r.Offset, r.contentSplitFunc)

 // Iterate over the contents of the file.
 for {
diff --git a/pkg/stanza/fileconsumer/internal/reader/reader_test.go b/pkg/stanza/fileconsumer/internal/reader/reader_test.go
index d7d4871f43e7..065b6528c099 100644
--- a/pkg/stanza/fileconsumer/internal/reader/reader_test.go
+++ b/pkg/stanza/fileconsumer/internal/reader/reader_test.go
@@ -189,6 +189,7 @@ func TestFingerprintChangeSize(t *testing.T) {
 func TestFlushPeriodEOF(t *testing.T) {
 tempDir := t.TempDir()
 temp := filetest.OpenTemp(t, tempDir)
+ // Create an initial token long enough that the scanner can't read the whole file at once
 aContentLength := 2 * 16 * 1024
 content := []byte(strings.Repeat("a", aContentLength))
@@ -223,3 +224,105 @@ func TestFlushPeriodEOF(t *testing.T) {
 r.ReadToEnd(context.Background())
 sink.ExpectToken(t, []byte{'b'})
 }
+
+func TestUnterminatedLongLogEntry(t *testing.T) {
+ tempDir := t.TempDir()
+ temp := filetest.OpenTemp(t, tempDir)
+
+ // Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
+ content := filetest.TokenWithLength(20 * 1024) // 20KB
+ _, err := temp.WriteString(string(content)) // no newline
+ require.NoError(t, err)
+
+ // Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
+ // a few times during a call to ReadToEnd.
+ clock := internaltime.NewAlwaysIncreasingClock()
+ internaltime.Now = clock.Now
+ internaltime.Since = clock.Since
+ defer func() {
+ internaltime.Now = time.Now
+ internaltime.Since = time.Since
+ }()
+
+ // Use a long flush period to ensure it does not expire DURING a ReadToEnd
+ flushPeriod := time.Second
+
+ f, sink := testFactory(t, withFlushPeriod(flushPeriod))
+ fp, err := f.NewFingerprint(temp)
+ require.NoError(t, err)
+ r, err := f.NewReader(temp, fp)
+ require.NoError(t, err)
+ assert.Equal(t, int64(0), r.Offset)
+
+ // First ReadToEnd should not emit anything as flush period hasn't expired
+ r.ReadToEnd(context.Background())
+ sink.ExpectNoCalls(t)
+
+ // Advance time past the flush period
+ clock.Advance(2 * flushPeriod)
+
+ // Second ReadToEnd should emit the full untruncated token
+ r.ReadToEnd(context.Background())
+ sink.ExpectToken(t, content)
+
+ sink.ExpectNoCalls(t)
+}
+
+func TestUnterminatedLogEntryGrows(t *testing.T) {
+ tempDir := t.TempDir()
+ temp := filetest.OpenTemp(t, tempDir)
+
+ // Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
+ content := filetest.TokenWithLength(20 * 1024) // 20KB
+ additionalContext := filetest.TokenWithLength(1024) // 1KB
+ _, err := temp.WriteString(string(content)) // no newline
+ require.NoError(t, err)
+
+ // Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
+ // a few times during a call to ReadToEnd.
+ clock := internaltime.NewAlwaysIncreasingClock() + internaltime.Now = clock.Now + internaltime.Since = clock.Since + defer func() { + internaltime.Now = time.Now + internaltime.Since = time.Since + }() + + // Use a long flush period to ensure it does not expire DURING a ReadToEnd + flushPeriod := time.Second + + f, sink := testFactory(t, withFlushPeriod(flushPeriod)) + fp, err := f.NewFingerprint(temp) + require.NoError(t, err) + r, err := f.NewReader(temp, fp) + require.NoError(t, err) + assert.Equal(t, int64(0), r.Offset) + + // First ReadToEnd should not emit anything as flush period hasn't expired + r.ReadToEnd(context.Background()) + sink.ExpectNoCalls(t) + + // Advance time past the flush period + clock.Advance(2 * flushPeriod) + + // Write additional unterminated content to the file. We want to ensure this + // is all picked up in the same token. + // Importantly, this resets the flush timer so the next call still will not + // return anything + _, err = temp.WriteString(string(additionalContext)) // no newline + require.NoError(t, err) + + // Next ReadToEnd should STILL not emit anything as flush period has been extended + // because we saw more data than last time + r.ReadToEnd(context.Background()) + sink.ExpectNoCalls(t) + + // Advance time past the flush period + clock.Advance(2 * flushPeriod) + + // Finally, since we haven't seen new data, we should emit the token + r.ReadToEnd(context.Background()) + sink.ExpectToken(t, append(content, additionalContext...)) + + sink.ExpectNoCalls(t) +} diff --git a/pkg/stanza/internal/time/time.go b/pkg/stanza/internal/time/time.go index 34269bbf3fbf..6d57c781aaf3 100644 --- a/pkg/stanza/internal/time/time.go +++ b/pkg/stanza/internal/time/time.go @@ -35,3 +35,7 @@ func (c AlwaysIncreasingClock) Since(t time.Time) time.Duration { c.FakeClock.Advance(time.Nanosecond) return c.FakeClock.Since(t) } + +func (c AlwaysIncreasingClock) Advance(d time.Duration) { + c.FakeClock.Advance(d) +} diff --git a/pkg/stanza/tokenlen/tokenlen.go b/pkg/stanza/tokenlen/tokenlen.go new file mode 100644 index 000000000000..e09c9a1dd3d7 --- /dev/null +++ b/pkg/stanza/tokenlen/tokenlen.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tokenlen // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen" + +import "bufio" + +// State tracks the potential length of a token before any terminator checking +type State struct { + PotentialLength int +} + +func (s *State) Copy() *State { + if s == nil { + return nil + } + return &State{ + PotentialLength: s.PotentialLength, + } +} + +// Func wraps a bufio.SplitFunc to track potential token lengths +// Records the length of the data before delegating to the wrapped function +func (s *State) Func(splitFunc bufio.SplitFunc) bufio.SplitFunc { + if s == nil { + return splitFunc + } + + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + // Note the potential token length but don't update state yet + potentialLen := len(data) + + // Delegate to the wrapped split function + advance, token, err = splitFunc(data, atEOF) + + // Only update state if we didn't find a token (delegate returned 0, nil, nil) + if advance == 0 && token == nil && err == nil { + s.PotentialLength = potentialLen + } else { + // Clear the state if we got a token or error + s.PotentialLength = 0 + } + + return advance, token, err + } +} From 1202b3356e3080f3dd95ee12d7137a7078d1f396 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 30 Jan 
2025 13:20:43 -0600
Subject: [PATCH 2/2] Clean up comments, add tests

---
 .../fileconsumer/internal/reader/reader.go | 4 +-
 .../internal/reader/reader_test.go | 22 ++---
 pkg/stanza/tokenlen/tokenlen.go | 26 ++----
 pkg/stanza/tokenlen/tokenlen_test.go | 91 +++++++++++++++++++
 4 files changed, 110 insertions(+), 33 deletions(-)
 create mode 100644 pkg/stanza/tokenlen/tokenlen_test.go

diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go
index 0ebd731880f7..838add80cfb0 100644
--- a/pkg/stanza/fileconsumer/internal/reader/reader.go
+++ b/pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -180,10 +180,10 @@ func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {
 func (r *Reader) readContents(ctx context.Context) {
 // Create the scanner to read the contents of the file.
 bufferSize := r.initialBufferSize
- if r.TokenLenState.PotentialLength > bufferSize {
+ if r.TokenLenState.MinimumLength > bufferSize {
 // If we previously saw a potential token larger than the default buffer,
 // size the buffer to be at least one byte larger so we can see if there's more data
- bufferSize = r.TokenLenState.PotentialLength + 1
+ bufferSize = r.TokenLenState.MinimumLength + 1
 }

 s := scanner.New(r, r.maxLogSize, bufferSize, r.Offset, r.contentSplitFunc)
diff --git a/pkg/stanza/fileconsumer/internal/reader/reader_test.go b/pkg/stanza/fileconsumer/internal/reader/reader_test.go
index 065b6528c099..e7ff78f8a4ff 100644
--- a/pkg/stanza/fileconsumer/internal/reader/reader_test.go
+++ b/pkg/stanza/fileconsumer/internal/reader/reader_test.go
@@ -258,7 +258,7 @@ func TestUnterminatedLongLogEntry(t *testing.T) {
 r.ReadToEnd(context.Background())
 sink.ExpectNoCalls(t)

- // Advance time past the flush period
+ // Advance time past the flush period to test behavior after timer is expired
 clock.Advance(2 * flushPeriod)

 // Second ReadToEnd should emit the full untruncated token
@@ -273,9 +273,8 @@ func TestUnterminatedLogEntryGrows(t *testing.T) {
 temp := filetest.OpenTemp(t, tempDir)

 // Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
- content := filetest.TokenWithLength(20 * 1024) // 20KB
- additionalContext := filetest.TokenWithLength(1024) // 1KB
- _, err := temp.WriteString(string(content)) // no newline
+ content := filetest.TokenWithLength(20 * 1024) // 20KB
+ _, err := temp.WriteString(string(content)) // no newline
 require.NoError(t, err)

 // Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
@@ -302,25 +301,22 @@ func TestUnterminatedLogEntryGrows(t *testing.T) {
 r.ReadToEnd(context.Background())
 sink.ExpectNoCalls(t)

- // Advance time past the flush period
+ // Advance time past the flush period to test behavior after timer is expired
 clock.Advance(2 * flushPeriod)

- // Write additional unterminated content to the file. We want to ensure this
- // is all picked up in the same token.
- // Importantly, this resets the flush timer so the next call still will not
- // return anything
+ // Write additional unterminated content to ensure it is all picked up in the same token
+ // The flusher should notice new data and not return anything on the next call
+ additionalContext := filetest.TokenWithLength(1024)
 _, err = temp.WriteString(string(additionalContext)) // no newline
 require.NoError(t, err)

- // Next ReadToEnd should STILL not emit anything as flush period has been extended
- // because we saw more data than last time
 r.ReadToEnd(context.Background())
 sink.ExpectNoCalls(t)

- // Advance time past the flush period
+ // Advance time past the flush period to test behavior after timer is expired
 clock.Advance(2 * flushPeriod)

- // Finally, since we haven't seen new data, we should emit the token
+ // Finally, since we haven't seen new data, the flusher should emit the token
 r.ReadToEnd(context.Background())
 sink.ExpectToken(t, append(content, additionalContext...))

diff --git a/pkg/stanza/tokenlen/tokenlen.go b/pkg/stanza/tokenlen/tokenlen.go
index e09c9a1dd3d7..326e64c3a950 100644
--- a/pkg/stanza/tokenlen/tokenlen.go
+++ b/pkg/stanza/tokenlen/tokenlen.go
@@ -7,16 +7,7 @@ import "bufio"

 // State tracks the potential length of a token before any terminator checking
 type State struct {
- PotentialLength int
-}
-
-func (s *State) Copy() *State {
- if s == nil {
- return nil
- }
- return &State{
- PotentialLength: s.PotentialLength,
- }
+ MinimumLength int
 }

 // Func wraps a bufio.SplitFunc to track potential token lengths
@@ -27,20 +18,19 @@ func (s *State) Func(splitFunc bufio.SplitFunc) bufio.SplitFunc {
 }

 return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
- // Note the potential token length but don't update state yet
+ // Note the potential token length but don't update state until we know
+ // whether or not a token is actually returned
 potentialLen := len(data)

- // Delegate to the wrapped split function
 advance, token, err = splitFunc(data, atEOF)
-
- // Only update state if we didn't find a token (delegate returned 0, nil, nil)
 if advance == 0 && token == nil && err == nil {
- s.PotentialLength = potentialLen
+ // The splitFunc is asking for more data. Remember how much
+ // we saw previously so the buffer can be sized appropriately.
+ s.MinimumLength = potentialLen
 } else {
- // Clear the state if we got a token or error
- s.PotentialLength = 0
+ // A token was returned. This state represented that token, so clear it.
+ s.MinimumLength = 0 } - return advance, token, err } } diff --git a/pkg/stanza/tokenlen/tokenlen_test.go b/pkg/stanza/tokenlen/tokenlen_test.go new file mode 100644 index 000000000000..c91cd628d305 --- /dev/null +++ b/pkg/stanza/tokenlen/tokenlen_test.go @@ -0,0 +1,91 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tokenlen + +import ( + "bufio" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTokenLenState_Func(t *testing.T) { + cases := []struct { + name string + input []byte + atEOF bool + expectedLen int + expectedToken []byte + expectedAdv int + expectedErr error + }{ + { + name: "no token yet", + input: []byte("partial"), + atEOF: false, + expectedLen: len("partial"), + }, + { + name: "complete token", + input: []byte("complete\ntoken"), + atEOF: false, + expectedLen: 0, // should clear state after finding token + expectedToken: []byte("complete"), + expectedAdv: len("complete\n"), + }, + { + name: "growing token", + input: []byte("growing"), + atEOF: false, + expectedLen: len("growing"), + }, + { + name: "flush at EOF", + input: []byte("flush"), + atEOF: true, + expectedLen: 0, // should clear state after flushing + expectedToken: []byte("flush"), + expectedAdv: len("flush"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + state := &State{} + splitFunc := state.Func(bufio.ScanLines) + + adv, token, err := splitFunc(tc.input, tc.atEOF) + require.Equal(t, tc.expectedErr, err) + require.Equal(t, tc.expectedToken, token) + require.Equal(t, tc.expectedAdv, adv) + require.Equal(t, tc.expectedLen, state.MinimumLength) + }) + } +} + +func TestTokenLenState_GrowingToken(t *testing.T) { + state := &State{} + splitFunc := state.Func(bufio.ScanLines) + + // First call with partial token + adv, token, err := splitFunc([]byte("part"), false) + require.NoError(t, err) + require.Nil(t, token) + require.Equal(t, 0, adv) + require.Equal(t, len("part"), state.MinimumLength) + + // Second call with longer partial token + adv, token, err = splitFunc([]byte("partial"), false) + require.NoError(t, err) + require.Nil(t, token) + require.Equal(t, 0, adv) + require.Equal(t, len("partial"), state.MinimumLength) + + // Final call with complete token + adv, token, err = splitFunc([]byte("partial\ntoken"), false) + require.NoError(t, err) + require.Equal(t, []byte("partial"), token) + require.Equal(t, len("partial\n"), adv) + require.Equal(t, 0, state.MinimumLength) // State should be cleared after emitting token +}
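--

As an illustration of the mechanism above: the tokenlen.State wrapper records
how much unterminated data the split function has seen, and readContents uses
that to size the next scanner buffer. A minimal standalone sketch, not part of
the patch; it assumes only the tokenlen package added above, and the input
bytes are arbitrary:

    package main

    import (
    	"bufio"
    	"fmt"

    	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
    )

    func main() {
    	// Illustrative only: wrap bufio.ScanLines so pending token lengths are tracked.
    	state := &tokenlen.State{}
    	split := state.Func(bufio.ScanLines)

    	// An unterminated 10-byte token: ScanLines asks for more data (0, nil, nil),
    	// so the wrapper records the length seen so far.
    	adv, tok, err := split([]byte("aaaaaaaaaa"), false)
    	fmt.Println(adv, tok, err, state.MinimumLength) // 0 [] <nil> 10

    	// Once a terminator arrives, a token is returned and the state is cleared.
    	// Between polls, readContents sizes its buffer to MinimumLength+1 so the
    	// split func always sees the whole pending token plus at least one new byte.
    	adv, tok, err = split([]byte("aaaaaaaaaa\n"), false)
    	fmt.Println(adv, string(tok), err, state.MinimumLength) // 11 aaaaaaaaaa <nil> 0
    }

The MinimumLength+1 sizing is what prevents the truncation described in the
changelog entry: the scanner can no longer hand the split function a view that
ends exactly at the previously seen data, so a flush never emits a prefix of a
token that has since grown.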