From d39a77b2cba92798bbb89805db97ac8e5737ad4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Mon, 13 Feb 2023 19:23:58 +0100 Subject: [PATCH 1/8] Reorder bodyReader fields to separate constants and state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Purely aesthetic, should not change behavior. Signed-off-by: Miloslav Trmač --- docker/body_reader.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/docker/body_reader.go b/docker/body_reader.go index 0af73a880..e5efc42a3 100644 --- a/docker/body_reader.go +++ b/docker/body_reader.go @@ -23,16 +23,16 @@ const bodyReaderMinimumProgress = 1 * 1024 * 1024 // bodyReader is an io.ReadCloser returned by dockerImageSource.GetBlob, // which can transparently resume some (very limited) kinds of aborted connections. type bodyReader struct { - ctx context.Context - c *dockerClient - - path string // path to pass to makeRequest to retry - logURL *url.URL // a string to use in error messages - body io.ReadCloser // The currently open connection we use to read data, or nil if there is nothing to read from / close. - lastRetryOffset int64 - offset int64 // Current offset within the blob + ctx context.Context + c *dockerClient + path string // path to pass to makeRequest to retry + logURL *url.URL // a string to use in error messages firstConnectionTime time.Time - lastSuccessTime time.Time // time.Time{} if N/A + + body io.ReadCloser // The currently open connection we use to read data, or nil if there is nothing to read from / close. + lastRetryOffset int64 + offset int64 // Current offset within the blob + lastSuccessTime time.Time // time.Time{} if N/A } // newBodyReader creates a bodyReader for request path in c. @@ -44,15 +44,15 @@ func newBodyReader(ctx context.Context, c *dockerClient, path string, firstBody return nil, err } res := &bodyReader{ - ctx: ctx, - c: c, - + ctx: ctx, + c: c, path: path, logURL: logURL, - body: firstBody, - lastRetryOffset: 0, - offset: 0, firstConnectionTime: time.Now(), + + body: firstBody, + lastRetryOffset: 0, + offset: 0, } return res, nil } From 5a2fb408a0891fb2898fb77ec914b12c33151efb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Mon, 13 Feb 2023 19:31:51 +0100 Subject: [PATCH 2/8] Turn millisecondsSince into millisecondsSinceOptional MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ... because we will have more cases of the NaN case, so share that code path. Signed-off-by: Miloslav Trmač --- docker/body_reader.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/docker/body_reader.go b/docker/body_reader.go index e5efc42a3..2abb1c4ce 100644 --- a/docker/body_reader.go +++ b/docker/body_reader.go @@ -198,19 +198,20 @@ func (br *bodyReader) Read(p []byte) (int, error) { } } -// millisecondsSince is like time.Since(tm).Milliseconds, but it returns a floating-point value -func millisecondsSince(tm time.Time) float64 { +// millisecondsSinceOptional is like time.Since(tm).Milliseconds, but it returns a floating-point value. +// If the input time is time.Time{}, it returns math.NaN() +func millisecondsSinceOptional(tm time.Time) float64 { + if tm == (time.Time{}) { + return math.NaN() + } return float64(time.Since(tm).Nanoseconds()) / 1_000_000.0 } // errorIfNotReconnecting makes a heuristic decision whether we should reconnect after err at redactedURL; if so, it returns nil, // otherwise it returns an appropriate error to return to the caller (possibly augmented with data about the heuristic) func (br *bodyReader) errorIfNotReconnecting(originalErr error, redactedURL string) error { - totalTime := millisecondsSince(br.firstConnectionTime) - failureTime := math.NaN() - if (br.lastSuccessTime != time.Time{}) { - failureTime = millisecondsSince(br.lastSuccessTime) - } + totalTime := millisecondsSinceOptional(br.firstConnectionTime) + failureTime := millisecondsSinceOptional(br.lastSuccessTime) logrus.Debugf("Reading blob body from %s failed (%#v), decision inputs: lastRetryOffset %d, offset %d, %.3f ms since first connection, %.3f ms since last progress", redactedURL, originalErr, br.lastRetryOffset, br.offset, totalTime, failureTime) progress := br.offset - br.lastRetryOffset From cf65b0bed18e4a85b510bd75a667dd403d96ac41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Mon, 13 Feb 2023 19:36:19 +0100 Subject: [PATCH 3/8] Only get current time once in errorIfNotReconnecting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avoid redundant work. Signed-off-by: Miloslav Trmač --- docker/body_reader.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/docker/body_reader.go b/docker/body_reader.go index 2abb1c4ce..b802952cf 100644 --- a/docker/body_reader.go +++ b/docker/body_reader.go @@ -198,20 +198,21 @@ func (br *bodyReader) Read(p []byte) (int, error) { } } -// millisecondsSinceOptional is like time.Since(tm).Milliseconds, but it returns a floating-point value. -// If the input time is time.Time{}, it returns math.NaN() -func millisecondsSinceOptional(tm time.Time) float64 { +// millisecondsSinceOptional is like currentTime.Sub(tm).Milliseconds, but it returns a floating-point value. +// If tm is time.Time{}, it returns math.NaN() +func millisecondsSinceOptional(currentTime time.Time, tm time.Time) float64 { if tm == (time.Time{}) { return math.NaN() } - return float64(time.Since(tm).Nanoseconds()) / 1_000_000.0 + return float64(currentTime.Sub(tm).Nanoseconds()) / 1_000_000.0 } // errorIfNotReconnecting makes a heuristic decision whether we should reconnect after err at redactedURL; if so, it returns nil, // otherwise it returns an appropriate error to return to the caller (possibly augmented with data about the heuristic) func (br *bodyReader) errorIfNotReconnecting(originalErr error, redactedURL string) error { - totalTime := millisecondsSinceOptional(br.firstConnectionTime) - failureTime := millisecondsSinceOptional(br.lastSuccessTime) + currentTime := time.Now() + totalTime := millisecondsSinceOptional(currentTime, br.firstConnectionTime) + failureTime := millisecondsSinceOptional(currentTime, br.lastSuccessTime) logrus.Debugf("Reading blob body from %s failed (%#v), decision inputs: lastRetryOffset %d, offset %d, %.3f ms since first connection, %.3f ms since last progress", redactedURL, originalErr, br.lastRetryOffset, br.offset, totalTime, failureTime) progress := br.offset - br.lastRetryOffset From 3f60ff9a3b09c6e616eeed6ef3c6f3583cd52685 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Mon, 13 Feb 2023 19:39:08 +0100 Subject: [PATCH 4/8] Rename some variables MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consistently use ms... for milliseconds (durations), and ...Time for timestamps. Should not change behavior. Signed-off-by: Miloslav Trmač --- docker/body_reader.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docker/body_reader.go b/docker/body_reader.go index b802952cf..35bf49a51 100644 --- a/docker/body_reader.go +++ b/docker/body_reader.go @@ -211,15 +211,15 @@ func millisecondsSinceOptional(currentTime time.Time, tm time.Time) float64 { // otherwise it returns an appropriate error to return to the caller (possibly augmented with data about the heuristic) func (br *bodyReader) errorIfNotReconnecting(originalErr error, redactedURL string) error { currentTime := time.Now() - totalTime := millisecondsSinceOptional(currentTime, br.firstConnectionTime) - failureTime := millisecondsSinceOptional(currentTime, br.lastSuccessTime) + msSinceFirstConnection := millisecondsSinceOptional(currentTime, br.firstConnectionTime) + msSinceLastSuccess := millisecondsSinceOptional(currentTime, br.lastSuccessTime) logrus.Debugf("Reading blob body from %s failed (%#v), decision inputs: lastRetryOffset %d, offset %d, %.3f ms since first connection, %.3f ms since last progress", - redactedURL, originalErr, br.lastRetryOffset, br.offset, totalTime, failureTime) + redactedURL, originalErr, br.lastRetryOffset, br.offset, msSinceFirstConnection, msSinceLastSuccess) progress := br.offset - br.lastRetryOffset if progress < bodyReaderMinimumProgress { logrus.Debugf("Not reconnecting to %s because only %d bytes progress made", redactedURL, progress) return fmt.Errorf("(heuristic tuning data: last retry %d, current offset %d; %.3f ms total, %.3f ms since progress): %w", - br.lastRetryOffset, br.offset, totalTime, failureTime, originalErr) + br.lastRetryOffset, br.offset, msSinceFirstConnection, msSinceLastSuccess, originalErr) } logrus.Infof("Reading blob body from %s failed (%v), reconnecting…", redactedURL, originalErr) return nil From 213718ca17594b43d15e581200dee2efd013c5b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Mon, 13 Feb 2023 19:44:21 +0100 Subject: [PATCH 5/8] Also record, and output, the time of last retry, if any MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This will be used for more heuristics. Signed-off-by: Miloslav Trmač --- docker/body_reader.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/docker/body_reader.go b/docker/body_reader.go index 35bf49a51..34c5884cf 100644 --- a/docker/body_reader.go +++ b/docker/body_reader.go @@ -31,6 +31,7 @@ type bodyReader struct { body io.ReadCloser // The currently open connection we use to read data, or nil if there is nothing to read from / close. lastRetryOffset int64 + lastRetryTime time.Time // time.Time{} if N/A offset int64 // Current offset within the blob lastSuccessTime time.Time // time.Time{} if N/A } @@ -52,7 +53,9 @@ func newBodyReader(ctx context.Context, c *dockerClient, path string, firstBody body: firstBody, lastRetryOffset: 0, + lastRetryTime: time.Time{}, offset: 0, + lastSuccessTime: time.Time{}, } return res, nil } @@ -190,6 +193,7 @@ func (br *bodyReader) Read(p []byte) (int, error) { consumedBody = true br.body = res.Body br.lastRetryOffset = br.offset + br.lastRetryTime = time.Time{} return n, nil default: @@ -212,14 +216,15 @@ func millisecondsSinceOptional(currentTime time.Time, tm time.Time) float64 { func (br *bodyReader) errorIfNotReconnecting(originalErr error, redactedURL string) error { currentTime := time.Now() msSinceFirstConnection := millisecondsSinceOptional(currentTime, br.firstConnectionTime) + msSinceLastRetry := millisecondsSinceOptional(currentTime, br.lastRetryTime) msSinceLastSuccess := millisecondsSinceOptional(currentTime, br.lastSuccessTime) - logrus.Debugf("Reading blob body from %s failed (%#v), decision inputs: lastRetryOffset %d, offset %d, %.3f ms since first connection, %.3f ms since last progress", - redactedURL, originalErr, br.lastRetryOffset, br.offset, msSinceFirstConnection, msSinceLastSuccess) + logrus.Debugf("Reading blob body from %s failed (%#v), decision inputs: total %d @%.3f ms, last retry %d @%.3f ms, last progress @%.3f ms", + redactedURL, originalErr, br.offset, msSinceFirstConnection, br.lastRetryOffset, msSinceLastRetry, msSinceLastSuccess) progress := br.offset - br.lastRetryOffset if progress < bodyReaderMinimumProgress { logrus.Debugf("Not reconnecting to %s because only %d bytes progress made", redactedURL, progress) - return fmt.Errorf("(heuristic tuning data: last retry %d, current offset %d; %.3f ms total, %.3f ms since progress): %w", - br.lastRetryOffset, br.offset, msSinceFirstConnection, msSinceLastSuccess, originalErr) + return fmt.Errorf("(heuristic tuning data: total %d @%.3f ms, last retry %d @%.3f ms, last progress @ %.3f ms): %w", + br.offset, msSinceFirstConnection, br.lastRetryOffset, msSinceLastRetry, msSinceLastSuccess, originalErr) } logrus.Infof("Reading blob body from %s failed (%v), reconnecting…", redactedURL, originalErr) return nil From f864ed7b18b6bffbef6928e28517f3f181e0ebfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Mon, 13 Feb 2023 19:50:28 +0100 Subject: [PATCH 6/8] Relax retry heuristics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Even if we didn't make much progress, allow one retry per minute anyway. Notably this allows one retry ~immediately, we see failures a few dozen milliseconds after the connection is set up. Signed-off-by: Miloslav Trmač --- docker/body_reader.go | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/docker/body_reader.go b/docker/body_reader.go index 34c5884cf..4e30743f4 100644 --- a/docker/body_reader.go +++ b/docker/body_reader.go @@ -17,8 +17,12 @@ import ( "github.com/sirupsen/logrus" ) -// bodyReaderMinimumProgress is the minimum progress we want to see before we retry -const bodyReaderMinimumProgress = 1 * 1024 * 1024 +const ( + // bodyReaderMinimumProgress is the minimum progress we consider a good reason to retry + bodyReaderMinimumProgress = 1 * 1024 * 1024 + // bodyReaderMSSinceLastRetry is the minimum time since a last retry we consider a good reason to retry + bodyReaderMSSinceLastRetry = 60 * 1_000 +) // bodyReader is an io.ReadCloser returned by dockerImageSource.GetBlob, // which can transparently resume some (very limited) kinds of aborted connections. @@ -221,13 +225,21 @@ func (br *bodyReader) errorIfNotReconnecting(originalErr error, redactedURL stri logrus.Debugf("Reading blob body from %s failed (%#v), decision inputs: total %d @%.3f ms, last retry %d @%.3f ms, last progress @%.3f ms", redactedURL, originalErr, br.offset, msSinceFirstConnection, br.lastRetryOffset, msSinceLastRetry, msSinceLastSuccess) progress := br.offset - br.lastRetryOffset - if progress < bodyReaderMinimumProgress { - logrus.Debugf("Not reconnecting to %s because only %d bytes progress made", redactedURL, progress) - return fmt.Errorf("(heuristic tuning data: total %d @%.3f ms, last retry %d @%.3f ms, last progress @ %.3f ms): %w", - br.offset, msSinceFirstConnection, br.lastRetryOffset, msSinceLastRetry, msSinceLastSuccess, originalErr) + if progress >= bodyReaderMinimumProgress { + logrus.Infof("Reading blob body from %s failed (%v), reconnecting after %d bytes…", redactedURL, originalErr, progress) + return nil + } + if br.lastRetryTime == (time.Time{}) || msSinceLastRetry >= bodyReaderMSSinceLastRetry { + if br.lastRetryTime == (time.Time{}) { + logrus.Infof("Reading blob body from %s failed (%v), reconnecting (first reconnection)…", redactedURL, originalErr) + } else { + logrus.Infof("Reading blob body from %s failed (%v), reconnecting after %.3f ms…", redactedURL, originalErr, msSinceLastRetry) + } + return nil } - logrus.Infof("Reading blob body from %s failed (%v), reconnecting…", redactedURL, originalErr) - return nil + logrus.Debugf("Not reconnecting to %s: insufficient progress %d / time since last retry %.3f ms", redactedURL, progress, msSinceLastRetry) + return fmt.Errorf("(heuristic tuning data: total %d @%.3f ms, last retry %d @%.3f ms, last progress @ %.3f ms): %w", + br.offset, msSinceFirstConnection, br.lastRetryOffset, msSinceLastRetry, msSinceLastSuccess, originalErr) } // Close implements io.ReadCloser From 3bf5378a99f1c9ba7aec7339a3701a29543d009e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Mon, 13 Feb 2023 22:23:36 +0100 Subject: [PATCH 7/8] Differentiate between no retry and immediate retry also in the printed offset MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miloslav Trmač --- docker/body_reader.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docker/body_reader.go b/docker/body_reader.go index 4e30743f4..01e926ab0 100644 --- a/docker/body_reader.go +++ b/docker/body_reader.go @@ -34,10 +34,10 @@ type bodyReader struct { firstConnectionTime time.Time body io.ReadCloser // The currently open connection we use to read data, or nil if there is nothing to read from / close. - lastRetryOffset int64 - lastRetryTime time.Time // time.Time{} if N/A - offset int64 // Current offset within the blob - lastSuccessTime time.Time // time.Time{} if N/A + lastRetryOffset int64 // -1 if N/A + lastRetryTime time.Time // time.Time{} if N/A + offset int64 // Current offset within the blob + lastSuccessTime time.Time // time.Time{} if N/A } // newBodyReader creates a bodyReader for request path in c. @@ -56,7 +56,7 @@ func newBodyReader(ctx context.Context, c *dockerClient, path string, firstBody firstConnectionTime: time.Now(), body: firstBody, - lastRetryOffset: 0, + lastRetryOffset: -1, lastRetryTime: time.Time{}, offset: 0, lastSuccessTime: time.Time{}, From 02d5fc5fd0de7934243f155c016a046f827b0fad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Mon, 13 Feb 2023 22:24:04 +0100 Subject: [PATCH 8/8] Add unit tests for the retry heuristics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miloslav Trmač --- docker/body_reader_test.go | 86 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/docker/body_reader_test.go b/docker/body_reader_test.go index ef702edc7..0011582b7 100644 --- a/docker/body_reader_test.go +++ b/docker/body_reader_test.go @@ -1,9 +1,13 @@ package docker import ( + "errors" + "math" "net/http" "testing" + "time" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -108,3 +112,85 @@ func TestParseContentRange(t *testing.T) { assert.Error(t, err, c, c) } } + +func TestMillisecondsSinceOptional(t *testing.T) { + current := time.Date(2023, 2, 9, 8, 7, 6, 5, time.UTC) + res := millisecondsSinceOptional(current, time.Time{}) + assert.True(t, math.IsNaN(res)) + tm := current.Add(-60 * time.Second) // 60 seconds _before_ current + res = millisecondsSinceOptional(current, tm) + assert.Equal(t, res, 60_000.0) +} + +func TestBodyReaderErrorIfNotReconnecting(t *testing.T) { + // Silence logrus.Info logs in the tested method + prevLevel := logrus.StandardLogger().Level + logrus.StandardLogger().SetLevel(logrus.WarnLevel) + t.Cleanup(func() { + logrus.StandardLogger().SetLevel(prevLevel) + }) + + for _, c := range []struct { + name string + previousRetry bool + currentOffset int64 + currentTime int // milliseconds + expectReconnect bool + }{ + { + name: "A lot of progress, after a long time, second retry", + previousRetry: true, + currentOffset: 2 * bodyReaderMinimumProgress, + currentTime: 2 * bodyReaderMSSinceLastRetry, + expectReconnect: true, + }, + { + name: "A lot of progress, after little time, second retry", + previousRetry: true, + currentOffset: 2 * bodyReaderMinimumProgress, + currentTime: 1, + expectReconnect: true, + }, + { + name: "Little progress, after a long time, second retry", + previousRetry: true, + currentOffset: 1, + currentTime: 2 * bodyReaderMSSinceLastRetry, + expectReconnect: true, + }, + { + name: "Little progress, after little time, second retry", + previousRetry: true, + currentOffset: 1, + currentTime: 1, + expectReconnect: false, + }, + { + name: "Little progress, after little time, first retry", + previousRetry: false, + currentOffset: 1, + currentTime: bodyReaderMSSinceLastRetry / 2, + expectReconnect: true, + }, + } { + tm := time.Now() + br := bodyReader{} + if c.previousRetry { + br.lastRetryOffset = 2 * bodyReaderMinimumProgress + br.offset = br.lastRetryOffset + c.currentOffset + br.firstConnectionTime = tm.Add(-time.Duration(c.currentTime+2*bodyReaderMSSinceLastRetry) * time.Millisecond) + br.lastRetryTime = tm.Add(-time.Duration(c.currentTime) * time.Millisecond) + } else { + br.lastRetryOffset = -1 + br.lastRetryTime = time.Time{} + br.offset = c.currentOffset + br.firstConnectionTime = tm.Add(-time.Duration(c.currentTime) * time.Millisecond) + } + err := br.errorIfNotReconnecting(errors.New("some error for error text only"), "URL for error text only") + if c.expectReconnect { + assert.NoError(t, err, c.name, br) + } else { + assert.Error(t, err, c.name, br) + } + } +}