Skip to content

Commit

Permalink
[chore][fileconsumer] Fix flush test (#37598)
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Jan 31, 2025
1 parent 0c69e17 commit 4f429e5
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
23 changes: 18 additions & 5 deletions pkg/stanza/fileconsumer/internal/reader/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,17 +196,30 @@ func TestFlushPeriodEOF(t *testing.T) {
_, err := temp.WriteString(string(content))
require.NoError(t, err)

// Make sure FlushPeriod is small, so it is guaranteed to expire
f, sink := testFactory(t, withFlushPeriod(5*time.Nanosecond))
flushPeriod := time.Millisecond
f, sink := testFactory(t, withFlushPeriod(flushPeriod))
fp, err := f.NewFingerprint(temp)
require.NoError(t, err)
r, err := f.NewReader(temp, fp)
require.NoError(t, err)
assert.Equal(t, int64(0), r.Offset)

internaltime.Now = internaltime.NewAlwaysIncreasingClock().Now
defer func() { internaltime.Now = time.Now }()
clock := internaltime.NewAlwaysIncreasingClock()
internaltime.Now = clock.Now
internaltime.Since = clock.Since
defer func() {
internaltime.Now = time.Now
internaltime.Since = time.Since
}()

// First ReadToEnd should not emit only the terminated token
r.ReadToEnd(context.Background())
sink.ExpectTokens(t, content[0:aContentLength], []byte{'b'})
sink.ExpectToken(t, content[0:aContentLength])

// Advance time past the flush period
clock.Advance(2 * flushPeriod)

// Second ReadToEnd should emit the unterminated token because of flush timeout
r.ReadToEnd(context.Background())
sink.ExpectToken(t, []byte{'b'})
}
12 changes: 1 addition & 11 deletions pkg/stanza/flush/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@ type State struct {
LastDataLength int
}

func (s *State) Copy() *State {
if s == nil {
return nil
}
return &State{
LastDataChange: s.LastDataChange,
LastDataLength: s.LastDataLength,
}
}

// Func wraps a bufio.SplitFunc with a timer.
// When the timer expires, an incomplete token may be returned.
// The timer will reset any time the data parameter changes.
Expand Down Expand Up @@ -61,7 +51,7 @@ func (s *State) Func(splitFunc bufio.SplitFunc, period time.Duration) bufio.Spli
}

// Flush timed out
if time.Since(s.LastDataChange) > period {
if internaltime.Since(s.LastDataChange) > period {
s.LastDataChange = internaltime.Now()
s.LastDataLength = 0
return len(data), data, nil
Expand Down

0 comments on commit 4f429e5

Please sign in to comment.