diff --git a/docker/body_reader.go b/docker/body_reader.go index 0af73a880..01e926ab0 100644 --- a/docker/body_reader.go +++ b/docker/body_reader.go @@ -17,22 +17,27 @@ import ( "github.com/sirupsen/logrus" ) -// bodyReaderMinimumProgress is the minimum progress we want to see before we retry -const bodyReaderMinimumProgress = 1 * 1024 * 1024 +const ( + // bodyReaderMinimumProgress is the minimum progress we consider a good reason to retry + bodyReaderMinimumProgress = 1 * 1024 * 1024 + // bodyReaderMSSinceLastRetry is the minimum time since a last retry we consider a good reason to retry + bodyReaderMSSinceLastRetry = 60 * 1_000 +) // bodyReader is an io.ReadCloser returned by dockerImageSource.GetBlob, // which can transparently resume some (very limited) kinds of aborted connections. type bodyReader struct { - ctx context.Context - c *dockerClient - - path string // path to pass to makeRequest to retry - logURL *url.URL // a string to use in error messages - body io.ReadCloser // The currently open connection we use to read data, or nil if there is nothing to read from / close. - lastRetryOffset int64 - offset int64 // Current offset within the blob + ctx context.Context + c *dockerClient + path string // path to pass to makeRequest to retry + logURL *url.URL // a string to use in error messages firstConnectionTime time.Time - lastSuccessTime time.Time // time.Time{} if N/A + + body io.ReadCloser // The currently open connection we use to read data, or nil if there is nothing to read from / close. + lastRetryOffset int64 // -1 if N/A + lastRetryTime time.Time // time.Time{} if N/A + offset int64 // Current offset within the blob + lastSuccessTime time.Time // time.Time{} if N/A } // newBodyReader creates a bodyReader for request path in c. @@ -44,15 +49,17 @@ func newBodyReader(ctx context.Context, c *dockerClient, path string, firstBody return nil, err } res := &bodyReader{ - ctx: ctx, - c: c, - + ctx: ctx, + c: c, path: path, logURL: logURL, - body: firstBody, - lastRetryOffset: 0, - offset: 0, firstConnectionTime: time.Now(), + + body: firstBody, + lastRetryOffset: -1, + lastRetryTime: time.Time{}, + offset: 0, + lastSuccessTime: time.Time{}, } return res, nil } @@ -190,6 +197,7 @@ func (br *bodyReader) Read(p []byte) (int, error) { consumedBody = true br.body = res.Body br.lastRetryOffset = br.offset + br.lastRetryTime = time.Time{} return n, nil default: @@ -198,29 +206,40 @@ func (br *bodyReader) Read(p []byte) (int, error) { } } -// millisecondsSince is like time.Since(tm).Milliseconds, but it returns a floating-point value -func millisecondsSince(tm time.Time) float64 { - return float64(time.Since(tm).Nanoseconds()) / 1_000_000.0 +// millisecondsSinceOptional is like currentTime.Sub(tm).Milliseconds, but it returns a floating-point value. +// If tm is time.Time{}, it returns math.NaN() +func millisecondsSinceOptional(currentTime time.Time, tm time.Time) float64 { + if tm == (time.Time{}) { + return math.NaN() + } + return float64(currentTime.Sub(tm).Nanoseconds()) / 1_000_000.0 } // errorIfNotReconnecting makes a heuristic decision whether we should reconnect after err at redactedURL; if so, it returns nil, // otherwise it returns an appropriate error to return to the caller (possibly augmented with data about the heuristic) func (br *bodyReader) errorIfNotReconnecting(originalErr error, redactedURL string) error { - totalTime := millisecondsSince(br.firstConnectionTime) - failureTime := math.NaN() - if (br.lastSuccessTime != time.Time{}) { - failureTime = millisecondsSince(br.lastSuccessTime) - } - logrus.Debugf("Reading blob body from %s failed (%#v), decision inputs: lastRetryOffset %d, offset %d, %.3f ms since first connection, %.3f ms since last progress", - redactedURL, originalErr, br.lastRetryOffset, br.offset, totalTime, failureTime) + currentTime := time.Now() + msSinceFirstConnection := millisecondsSinceOptional(currentTime, br.firstConnectionTime) + msSinceLastRetry := millisecondsSinceOptional(currentTime, br.lastRetryTime) + msSinceLastSuccess := millisecondsSinceOptional(currentTime, br.lastSuccessTime) + logrus.Debugf("Reading blob body from %s failed (%#v), decision inputs: total %d @%.3f ms, last retry %d @%.3f ms, last progress @%.3f ms", + redactedURL, originalErr, br.offset, msSinceFirstConnection, br.lastRetryOffset, msSinceLastRetry, msSinceLastSuccess) progress := br.offset - br.lastRetryOffset - if progress < bodyReaderMinimumProgress { - logrus.Debugf("Not reconnecting to %s because only %d bytes progress made", redactedURL, progress) - return fmt.Errorf("(heuristic tuning data: last retry %d, current offset %d; %.3f ms total, %.3f ms since progress): %w", - br.lastRetryOffset, br.offset, totalTime, failureTime, originalErr) + if progress >= bodyReaderMinimumProgress { + logrus.Infof("Reading blob body from %s failed (%v), reconnecting after %d bytes…", redactedURL, originalErr, progress) + return nil + } + if br.lastRetryTime == (time.Time{}) || msSinceLastRetry >= bodyReaderMSSinceLastRetry { + if br.lastRetryTime == (time.Time{}) { + logrus.Infof("Reading blob body from %s failed (%v), reconnecting (first reconnection)…", redactedURL, originalErr) + } else { + logrus.Infof("Reading blob body from %s failed (%v), reconnecting after %.3f ms…", redactedURL, originalErr, msSinceLastRetry) + } + return nil } - logrus.Infof("Reading blob body from %s failed (%v), reconnecting…", redactedURL, originalErr) - return nil + logrus.Debugf("Not reconnecting to %s: insufficient progress %d / time since last retry %.3f ms", redactedURL, progress, msSinceLastRetry) + return fmt.Errorf("(heuristic tuning data: total %d @%.3f ms, last retry %d @%.3f ms, last progress @ %.3f ms): %w", + br.offset, msSinceFirstConnection, br.lastRetryOffset, msSinceLastRetry, msSinceLastSuccess, originalErr) } // Close implements io.ReadCloser diff --git a/docker/body_reader_test.go b/docker/body_reader_test.go index ef702edc7..0011582b7 100644 --- a/docker/body_reader_test.go +++ b/docker/body_reader_test.go @@ -1,9 +1,13 @@ package docker import ( + "errors" + "math" "net/http" "testing" + "time" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -108,3 +112,85 @@ func TestParseContentRange(t *testing.T) { assert.Error(t, err, c, c) } } + +func TestMillisecondsSinceOptional(t *testing.T) { + current := time.Date(2023, 2, 9, 8, 7, 6, 5, time.UTC) + res := millisecondsSinceOptional(current, time.Time{}) + assert.True(t, math.IsNaN(res)) + tm := current.Add(-60 * time.Second) // 60 seconds _before_ current + res = millisecondsSinceOptional(current, tm) + assert.Equal(t, res, 60_000.0) +} + +func TestBodyReaderErrorIfNotReconnecting(t *testing.T) { + // Silence logrus.Info logs in the tested method + prevLevel := logrus.StandardLogger().Level + logrus.StandardLogger().SetLevel(logrus.WarnLevel) + t.Cleanup(func() { + logrus.StandardLogger().SetLevel(prevLevel) + }) + + for _, c := range []struct { + name string + previousRetry bool + currentOffset int64 + currentTime int // milliseconds + expectReconnect bool + }{ + { + name: "A lot of progress, after a long time, second retry", + previousRetry: true, + currentOffset: 2 * bodyReaderMinimumProgress, + currentTime: 2 * bodyReaderMSSinceLastRetry, + expectReconnect: true, + }, + { + name: "A lot of progress, after little time, second retry", + previousRetry: true, + currentOffset: 2 * bodyReaderMinimumProgress, + currentTime: 1, + expectReconnect: true, + }, + { + name: "Little progress, after a long time, second retry", + previousRetry: true, + currentOffset: 1, + currentTime: 2 * bodyReaderMSSinceLastRetry, + expectReconnect: true, + }, + { + name: "Little progress, after little time, second retry", + previousRetry: true, + currentOffset: 1, + currentTime: 1, + expectReconnect: false, + }, + { + name: "Little progress, after little time, first retry", + previousRetry: false, + currentOffset: 1, + currentTime: bodyReaderMSSinceLastRetry / 2, + expectReconnect: true, + }, + } { + tm := time.Now() + br := bodyReader{} + if c.previousRetry { + br.lastRetryOffset = 2 * bodyReaderMinimumProgress + br.offset = br.lastRetryOffset + c.currentOffset + br.firstConnectionTime = tm.Add(-time.Duration(c.currentTime+2*bodyReaderMSSinceLastRetry) * time.Millisecond) + br.lastRetryTime = tm.Add(-time.Duration(c.currentTime) * time.Millisecond) + } else { + br.lastRetryOffset = -1 + br.lastRetryTime = time.Time{} + br.offset = c.currentOffset + br.firstConnectionTime = tm.Add(-time.Duration(c.currentTime) * time.Millisecond) + } + err := br.errorIfNotReconnecting(errors.New("some error for error text only"), "URL for error text only") + if c.expectReconnect { + assert.NoError(t, err, c.name, br) + } else { + assert.Error(t, err, c.name, br) + } + } +}