[receiver/filelog] Fix issue where flushed tokens could be truncated #37596
Conversation
```diff
@@ -100,9 +111,10 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
 		r.Offset = info.Size()
 	}

-	flushFunc := m.FlushState.Func(f.SplitFunc, f.FlushTimeout)
+	tokenLenFunc := m.TokenLenState.Func(f.SplitFunc)
+	flushFunc := m.FlushState.Func(tokenLenFunc, f.FlushTimeout)
```
Please correct me if I'm wrong, but I'm confused. Wouldn't the flush function time out regardless of buffer resizing?
The timer would indeed be expired. What this change does, though, is make sure that the token we're passing into the flush function is at least as long as the one we passed in previously.
Example:
Poll 1: Buffer starts at 16kB, grows to 20kB, finds no terminator. Flush hasn't expired, so we return. (At this point, the tokenlen function has captured knowledge that there is 20kB of unterminated content at the EOF.)
Poll 2: We see that there was 20kB unterminated content left after the previous poll, so we initialize the scanner with a 20kB buffer. This means it will read the entire token on the first try. The timer has expired, so we flush the token.
The fix here is that we are avoiding the situation where Poll 2 starts with a 16kB buffer and ends up flushing only a 16kB token.
Okay now I understand.
So, instead of resizing the buffer x times, we configure it to be of the "potential length" at the beginning, and we will read at least "potential length" worth of data.
That's the idea. I think the name of the field could be clearer. MinimumLength
is better because it conveys that we know the next token will be at least that long. I'll push the update shortly.
Fixes #35042 (and #32100 again)
The issue affected unterminated logs of particular lengths: specifically, longer than our internal `scanner.DefaultBufferSize` (16kB) and shorter than `max_log_size`. The failure mode was described in #32100 but was apparently only fixed in some circumstances. I believe this is a more robust fix. I'll articulate the exact failure mode again here:

1. `reader.ReadToEnd` is called. Within this, a scanner is created which starts with a default buffer size. The buffer is filled, but no terminator is found. Therefore the scanner resizes the buffer to accommodate more data, hoping to find a terminator. Eventually, the buffer is large enough to contain all content until EOF, but still no terminator was found. At this time, the flush timer has not expired, so `reader.ReadToEnd` returns without emitting anything.
2. `reader.ReadToEnd` is called again and creates a new scanner, also with default buffer size. The first time it looks for a terminator, it of course doesn't find one, but at this time the flush timer has expired. Therefore, instead of resizing the buffer and continuing to look for a terminator, it just emits what it has.

What should happen instead is that the scanner continues to resize the buffer to find as much of the unterminated token as possible before emitting it. Therefore, this fix introduces a simple layer into the split func stack which allows us to reason about unterminated tokens more carefully. It captures the length of unterminated tokens and ensures that when we recreate a scanner, we start with a buffer size that is appropriate to read the same content as last time, plus one additional byte. The extra byte allows us to check whether new content has been added, in which case we resume resizing. If no new content is found, the flusher emits the entire unterminated token as one.