mem: allow using io.WriterTo with a io.LimitedReader #8697

base: master

Conversation
The caller of this function can wrap the io.Reader in an `io.LimitedReader`. This happens if some max message size is set. If so, this `io.WriterTo` check doesn't work anymore. Work around this by checking whether the reader is maybe an `io.LimitedReader`.

Overall, the problem I'm trying to solve is that the constant in

```go
buf := pool.Get(readAllBufSize)
```

is way too much for our use case: 32KiB. Messages are typically at most about 1KiB in size, so even in the best case we overallocate by ~31KiB. We want to take the `io.WriterTo` branch so that we can size the buffer appropriately.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
Codecov Report — ✅ All modified and coverable lines are covered by tests. Additional details and impacted files:

```
@@ Coverage Diff @@
##           master    #8697   +/- ##
=========================================
  Coverage        ?   83.25%
=========================================
  Files           ?      416
  Lines           ?    32306
  Branches        ?        0
=========================================
  Hits            ?    26895
  Misses          ?     4027
  Partials        ?     1384
=========================================
```
@ash2k any thoughts?

@arjan-bal: Can you please take a first look and see if this seems ok.
```go
// them. E.g. might be a single big chunk, and we wouldn't chop it
// into pieces.
w := NewWriter(&result, pool)
_, err := wt.WriteTo(w)
```
I think this would circumvent the limit of the LimitedReader, wouldn't it? By directly accessing the underlying reader we are bypassing the limiter.
Yeah but I'm not sure how to do a "LimitedWriter" if we could say it that way. Should we add something like https://github.com/nanmu42/limitio/blob/master/limitio.go to grpc-go code (as a library or our own impl)?
Hi @GiedriusS, one solution I can think of is to create your own wrapper struct that wraps an io.LimitedReader and also implements io.WriterTo. This would allow your reader to control the size of the temporary buffer being used. Here's how an example implementation could work:

```go
type LimitWriterTo struct {
	Reader io.Reader // The underlying io.LimitedReader
}

func (l *LimitWriterTo) WriteTo(w io.Writer) (n int64, err error) {
	// Define your custom buffer size here (e.g., 64K, 128K).
	buffer := make([]byte, 1024) // You could get this from a buffer pool.
	// Use io.CopyBuffer internally with your custom buffer,
	// or implement the read/write loop manually for ultimate control.
	return io.CopyBuffer(w, l.Reader, buffer)
}
```
The core issue is that grpc-go does the assertion itself (and it wraps the io.Reader inside an io.LimitedReader itself): it checks whether the reader is an io.WriterTo, and io.LimitedReader is not an io.WriterTo, so I think this path would never be hit.
Oh, I overlooked the line in the PR description: "This happens if some max message size is set."
gRPC controls the reader type, not external code. Let me think about it a little more.
How about we set the size of the copy buffer in the for loop below as `min(readAllBufSize, LimitedReader.N)`?
The default is quite huge:

```go
// Line 59 in 1ed87ec
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
```

So, should we add a LimitedWriter to grpc-go code?
I see. Can you describe how you plan to use the new LimitedReader type to work around this?
I have one approach in mind which involves having the LimitedReader conditionally implement io.WriterTo using a type assertion if the wrapped reader also implements io.WriterTo. I wanted to know if there's a simpler solution for this.
I think you're correct: the decompressor, through WriterTo, would still be able to read as much as it wants. I'm now actually thinking that maybe the best way would be to provide a way to use some specific bucketed []byte pool; that way the buckets would be created according to how big the users of grpc-go anticipate the inputs will be.
In the end there is probably no best decision, and it depends on what you think an appropriate abstraction is here in the context of grpc-go.
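For concreteness, the bucketed pool idea might look roughly like this. This is a hypothetical sketch (the `bucketPool` type is illustrative, not grpc-go's actual `mem.BufferPool`): callers pick bucket sizes matching their expected message sizes, e.g. 1 KiB instead of a fixed 32 KiB, and `Get` serves from the smallest bucket that fits.

```go
package main

import (
	"fmt"
	"sync"
)

// bucketPool hands out []byte buffers from size-classed sync.Pools.
type bucketPool struct {
	sizes []int // ascending bucket sizes, chosen by the caller
	pools []*sync.Pool
}

func newBucketPool(sizes ...int) *bucketPool {
	p := &bucketPool{sizes: sizes}
	for _, s := range sizes {
		s := s // capture per-bucket size
		p.pools = append(p.pools, &sync.Pool{New: func() any {
			b := make([]byte, s)
			return &b
		}})
	}
	return p
}

// Get returns a buffer from the smallest bucket that can hold n bytes,
// falling back to a direct allocation when n exceeds every bucket.
func (p *bucketPool) Get(n int) *[]byte {
	for i, s := range p.sizes {
		if n <= s {
			return p.pools[i].Get().(*[]byte)
		}
	}
	b := make([]byte, n)
	return &b
}

// Put returns a buffer to the bucket matching its capacity, if any.
func (p *bucketPool) Put(b *[]byte) {
	for i, s := range p.sizes {
		if cap(*b) == s {
			p.pools[i].Put(b)
			return
		}
	}
}

func main() {
	pool := newBucketPool(1<<10, 32<<10) // 1 KiB and 32 KiB buckets
	b := pool.Get(800)
	fmt.Println(len(*b)) // 1024: smallest bucket that fits 800 bytes
	pool.Put(b)
}
```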
Here is the simplest solution I came up with:

- Introduce a `LimitWriter`: Create a wrapper around `io.Writer` that restricts the number of bytes written. If a write exceeds the limit, it returns a specific sentinel error (e.g., `ErrLimitExhausted`).
- Update `ReadAll`: In `ReadAll`, we check if the reader is an `*io.LimitedReader` and if the underlying `io.Reader` implements `io.WriterTo`. If so, we create a new writer using `NewWriter(&result, pool)`, wrap it in our `LimitWriter`, and call `WriteTo` on the underlying reader. This effectively transfers the limit constraint from the reader to the writer.
To keep this optimization transparent to callers, ReadAll must trap the error returned by LimitWriter. It should translate that error into a successful return (nil error) and update the N field on the *io.LimitedReader to reflect the bytes actually consumed.
Here are some snippets to explain this:
The Helper Type

```go
var ErrLimitExhausted = errors.New("limit exhausted")

type limitWriter struct {
	w io.Writer
	n int64
}

func (l *limitWriter) Write(p []byte) (n int, err error) {
	if l.n <= 0 {
		return 0, ErrLimitExhausted
	}
	// If p is larger than the remaining limit, truncate it.
	if int64(len(p)) > l.n {
		p = p[:l.n]
		err = ErrLimitExhausted // We will return this after the underlying write.
	}
	n, wErr := l.w.Write(p)
	l.n -= int64(n)
	if wErr != nil {
		return n, wErr
	}
	return n, err
}
```

The Updated ReadAll Function
Here is how the logic fits into your existing context.
```go
func ReadAll(r io.Reader, pool BufferPool) (BufferSlice, error) {
	var result BufferSlice
	// 1. Check for the specific optimization case: *io.LimitedReader wrapping an io.WriterTo.
	if lr, ok := r.(*io.LimitedReader); ok {
		if wt, ok := lr.R.(io.WriterTo); ok {
			w := NewWriter(&result, pool)
			// Wrap the writer to enforce the reader's limit.
			lw := &limitWriter{w: w, n: lr.N}
			// Delegate the heavy lifting to the underlying implementation.
			n, err := wt.WriteTo(lw)
			// Update the LimitedReader state.
			lr.N -= n
			// If we hit the limit, it's not a real error in this context;
			// it just means we read everything we were allowed to.
			if err == ErrLimitExhausted {
				return result, nil
			}
			return result, err
		}
	}
	// 2. Standard optimization for direct io.WriterTo.
	if wt, ok := r.(io.WriterTo); ok {
		w := NewWriter(&result, pool)
		_, err := wt.WriteTo(w)
		return result, err
	}
	// ... rest of fallback implementation ...
}
```

What do you think?
This PR is labeled as requiring an update from the reporter, and no update has been received after 6 days. If no update is provided in the next 7 days, this issue will be automatically closed.
Is this OK? Any suggestions on how to make the code prettier? Also, maybe some suggestions on how to do something like `io.LimitedReader` on the `io.WriterTo`?