-
Notifications
You must be signed in to change notification settings - Fork 2.2k
brontide: use vectored I/O to send 50% less packets on the wire #10450
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -700,3 +700,135 @@ | |||||||||
| t.Fatalf("expected n: %d, got: %d", expN, nn) | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // writeRecorder records all data written to it, tracking each Write call | ||||||||||
| // separately to verify write coalescing behavior. | ||||||||||
| type writeRecorder struct { | ||||||||||
| writes [][]byte | ||||||||||
| } | ||||||||||
|
|
||||||||||
| func (w *writeRecorder) Write(p []byte) (int, error) { | ||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to the style guide (rule 17), every function should be commented with its purpose. This method is missing a comment. Also, rule 18 states that function comments must begin with the function name. I suggest adding a comment like: // Write records the written bytes. A copy of the slice is taken to avoid
// issues with buffer reuse.
Suggested change
References
|
||||||||||
| // Make a copy of the data to avoid issues with buffer reuse. | ||||||||||
| w.writes = append(w.writes, append([]byte{}, p...)) | ||||||||||
|
|
||||||||||
| return len(p), nil | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // totalBytes returns the total number of bytes written across all Write calls. | ||||||||||
| func (w *writeRecorder) totalBytes() int { | ||||||||||
|
Comment on lines
+717
to
+718
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to the style guide (rule 18), function comments must begin with the function name. This comment starts with a lowercase letter.
Suggested change
References
|
||||||||||
| total := 0 | ||||||||||
| for _, write := range w.writes { | ||||||||||
| total += len(write) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| return total | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // TestFlushCoalescedWrite verifies that the Flush method writes header and body | ||||||||||
| // data correctly. Note: For non-TCP writers (like bytes.Buffer), net.Buffers | ||||||||||
| // falls back to sequential writes. | ||||||||||
| func TestFlushCoalescedWrite(t *testing.T) { | ||||||||||
| t.Parallel() | ||||||||||
|
|
||||||||||
| testCases := []struct { | ||||||||||
| name string | ||||||||||
| payloadSize int | ||||||||||
| }{ | ||||||||||
| { | ||||||||||
| name: "small message", | ||||||||||
| payloadSize: 10, | ||||||||||
| }, | ||||||||||
| { | ||||||||||
| name: "medium message", | ||||||||||
| payloadSize: 500, | ||||||||||
| }, | ||||||||||
| { | ||||||||||
| name: "large message", | ||||||||||
| payloadSize: 1400, | ||||||||||
| }, | ||||||||||
| { | ||||||||||
| name: "max message", | ||||||||||
| payloadSize: math.MaxUint16, | ||||||||||
| }, | ||||||||||
| } | ||||||||||
|
|
||||||||||
| for _, tc := range testCases { | ||||||||||
| t.Run(tc.name, func(t *testing.T) { | ||||||||||
| var b Machine | ||||||||||
| b.split() | ||||||||||
|
|
||||||||||
| payload := bytes.Repeat([]byte("x"), tc.payloadSize) | ||||||||||
| err := b.WriteMessage(payload) | ||||||||||
| require.NoError(t, err) | ||||||||||
|
|
||||||||||
| recorder := &writeRecorder{} | ||||||||||
| n, err := b.Flush(recorder) | ||||||||||
| require.NoError(t, err) | ||||||||||
|
|
||||||||||
| // The returned value should be the plaintext payload size. | ||||||||||
| require.Equal( | ||||||||||
| t, tc.payloadSize, n, | ||||||||||
| "returned byte count should match payload size", | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| // Total bytes written should be header + payload + MAC. | ||||||||||
| expectedTotal := encHeaderSize + tc.payloadSize + | ||||||||||
| macSize | ||||||||||
| require.Equal( | ||||||||||
| t, expectedTotal, recorder.totalBytes(), | ||||||||||
| "total written bytes should be "+ | ||||||||||
| "header + payload + MAC", | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| // Verify that subsequent Flush is a no-op. | ||||||||||
| n, err = b.Flush(recorder) | ||||||||||
| require.NoError(t, err) | ||||||||||
| require.Equal( | ||||||||||
| t, 0, n, | ||||||||||
| "subsequent flush should be no-op", | ||||||||||
| ) | ||||||||||
| }) | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // TestFlushPartialWriteRecovery specifically tests that partial writes | ||||||||||
| // can be recovered by calling Flush again, and that the final result | ||||||||||
| // is correct. | ||||||||||
| func TestFlushPartialWriteRecovery(t *testing.T) { | ||||||||||
| t.Parallel() | ||||||||||
|
|
||||||||||
| // Create a test connection to get properly paired encrypt/decrypt | ||||||||||
| // machines. | ||||||||||
| localConn, remoteConn, err := establishTestConnection(t) | ||||||||||
| require.NoError(t, err) | ||||||||||
|
|
||||||||||
| localBrontide := localConn.(*Conn) | ||||||||||
|
|
||||||||||
| payload := []byte("hello world") | ||||||||||
| err = localBrontide.noise.WriteMessage(payload) | ||||||||||
| require.NoError(t, err) | ||||||||||
|
|
||||||||||
| // First, write only part of the header using a timeout writer. | ||||||||||
| var partialOutput bytes.Buffer | ||||||||||
| partialWriter := NewTimeoutWriter(&partialOutput, 5) | ||||||||||
|
|
||||||||||
| n, err := localBrontide.noise.Flush(partialWriter) | ||||||||||
| require.ErrorIs(t, err, iotest.ErrTimeout) | ||||||||||
| require.Equal(t, 0, n, "no payload bytes should be reported yet") | ||||||||||
|
|
||||||||||
| // Now write the rest directly to the underlying connection. We need to | ||||||||||
| // write the partial output first, then flush the rest. | ||||||||||
| _, err = localBrontide.conn.Write(partialOutput.Bytes()) | ||||||||||
| require.NoError(t, err) | ||||||||||
|
|
||||||||||
| // Flush remaining bytes to the connection. | ||||||||||
| n, err = localBrontide.noise.Flush(localBrontide.conn) | ||||||||||
| require.NoError(t, err) | ||||||||||
| require.Equal(t, len(payload), n) | ||||||||||
|
|
||||||||||
| // Read and verify the message on the remote end. | ||||||||||
| buf := make([]byte, len(payload)) | ||||||||||
| _, err = io.ReadFull(remoteConn, buf) | ||||||||||
| require.NoError(t, err) | ||||||||||
| require.Equal(t, payload, buf) | ||||||||||
| } | ||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This TODO comment seems to be a leftover from development. Moving
releaseBuffers()before the error check would introduce a bug: in case of a partial write with an error, the buffers would be released prematurely, preventing a successful retry on the nextFlush()call. The current placement is correct. I suggest removing this comment to avoid future confusion.