Skip to content

Commit

Permalink
Merge pull request #1847 from mtrmac/eof2
Browse files Browse the repository at this point in the history
Relax retry heuristics
  • Loading branch information
rhatdan committed Feb 14, 2023
2 parents bdc5f13 + 02d5fc5 commit ae0edb7
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 33 deletions.
85 changes: 52 additions & 33 deletions docker/body_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@ import (
"github.com/sirupsen/logrus"
)

// bodyReaderMinimumProgress is the minimum progress we want to see before we retry
const bodyReaderMinimumProgress = 1 * 1024 * 1024
const (
// bodyReaderMinimumProgress is the minimum progress we consider a good reason to retry
bodyReaderMinimumProgress = 1 * 1024 * 1024
// bodyReaderMSSinceLastRetry is the minimum time since a last retry we consider a good reason to retry
bodyReaderMSSinceLastRetry = 60 * 1_000
)

// bodyReader is an io.ReadCloser returned by dockerImageSource.GetBlob,
// which can transparently resume some (very limited) kinds of aborted connections.
type bodyReader struct {
ctx context.Context
c *dockerClient

path string // path to pass to makeRequest to retry
logURL *url.URL // a string to use in error messages
body io.ReadCloser // The currently open connection we use to read data, or nil if there is nothing to read from / close.
lastRetryOffset int64
offset int64 // Current offset within the blob
ctx context.Context
c *dockerClient
path string // path to pass to makeRequest to retry
logURL *url.URL // a string to use in error messages
firstConnectionTime time.Time
lastSuccessTime time.Time // time.Time{} if N/A

body io.ReadCloser // The currently open connection we use to read data, or nil if there is nothing to read from / close.
lastRetryOffset int64 // -1 if N/A
lastRetryTime time.Time // time.Time{} if N/A
offset int64 // Current offset within the blob
lastSuccessTime time.Time // time.Time{} if N/A
}

// newBodyReader creates a bodyReader for request path in c.
Expand All @@ -44,15 +49,17 @@ func newBodyReader(ctx context.Context, c *dockerClient, path string, firstBody
return nil, err
}
res := &bodyReader{
ctx: ctx,
c: c,

ctx: ctx,
c: c,
path: path,
logURL: logURL,
body: firstBody,
lastRetryOffset: 0,
offset: 0,
firstConnectionTime: time.Now(),

body: firstBody,
lastRetryOffset: -1,
lastRetryTime: time.Time{},
offset: 0,
lastSuccessTime: time.Time{},
}
return res, nil
}
Expand Down Expand Up @@ -190,6 +197,7 @@ func (br *bodyReader) Read(p []byte) (int, error) {
consumedBody = true
br.body = res.Body
br.lastRetryOffset = br.offset
br.lastRetryTime = time.Time{}
return n, nil

default:
Expand All @@ -198,29 +206,40 @@ func (br *bodyReader) Read(p []byte) (int, error) {
}
}

// millisecondsSince is like time.Since(tm).Milliseconds, but it returns a floating-point value
func millisecondsSince(tm time.Time) float64 {
return float64(time.Since(tm).Nanoseconds()) / 1_000_000.0
// millisecondsSinceOptional is like currentTime.Sub(tm).Milliseconds, but it returns a floating-point value.
// If tm is time.Time{}, it returns math.NaN()
func millisecondsSinceOptional(currentTime time.Time, tm time.Time) float64 {
if tm == (time.Time{}) {
return math.NaN()
}
return float64(currentTime.Sub(tm).Nanoseconds()) / 1_000_000.0
}

// errorIfNotReconnecting makes a heuristic decision whether we should reconnect after err at redactedURL; if so, it returns nil,
// otherwise it returns an appropriate error to return to the caller (possibly augmented with data about the heuristic)
func (br *bodyReader) errorIfNotReconnecting(originalErr error, redactedURL string) error {
totalTime := millisecondsSince(br.firstConnectionTime)
failureTime := math.NaN()
if (br.lastSuccessTime != time.Time{}) {
failureTime = millisecondsSince(br.lastSuccessTime)
}
logrus.Debugf("Reading blob body from %s failed (%#v), decision inputs: lastRetryOffset %d, offset %d, %.3f ms since first connection, %.3f ms since last progress",
redactedURL, originalErr, br.lastRetryOffset, br.offset, totalTime, failureTime)
currentTime := time.Now()
msSinceFirstConnection := millisecondsSinceOptional(currentTime, br.firstConnectionTime)
msSinceLastRetry := millisecondsSinceOptional(currentTime, br.lastRetryTime)
msSinceLastSuccess := millisecondsSinceOptional(currentTime, br.lastSuccessTime)
logrus.Debugf("Reading blob body from %s failed (%#v), decision inputs: total %d @%.3f ms, last retry %d @%.3f ms, last progress @%.3f ms",
redactedURL, originalErr, br.offset, msSinceFirstConnection, br.lastRetryOffset, msSinceLastRetry, msSinceLastSuccess)
progress := br.offset - br.lastRetryOffset
if progress < bodyReaderMinimumProgress {
logrus.Debugf("Not reconnecting to %s because only %d bytes progress made", redactedURL, progress)
return fmt.Errorf("(heuristic tuning data: last retry %d, current offset %d; %.3f ms total, %.3f ms since progress): %w",
br.lastRetryOffset, br.offset, totalTime, failureTime, originalErr)
if progress >= bodyReaderMinimumProgress {
logrus.Infof("Reading blob body from %s failed (%v), reconnecting after %d bytes…", redactedURL, originalErr, progress)
return nil
}
if br.lastRetryTime == (time.Time{}) || msSinceLastRetry >= bodyReaderMSSinceLastRetry {
if br.lastRetryTime == (time.Time{}) {
logrus.Infof("Reading blob body from %s failed (%v), reconnecting (first reconnection)…", redactedURL, originalErr)
} else {
logrus.Infof("Reading blob body from %s failed (%v), reconnecting after %.3f ms…", redactedURL, originalErr, msSinceLastRetry)
}
return nil
}
logrus.Infof("Reading blob body from %s failed (%v), reconnecting…", redactedURL, originalErr)
return nil
logrus.Debugf("Not reconnecting to %s: insufficient progress %d / time since last retry %.3f ms", redactedURL, progress, msSinceLastRetry)
return fmt.Errorf("(heuristic tuning data: total %d @%.3f ms, last retry %d @%.3f ms, last progress @ %.3f ms): %w",
br.offset, msSinceFirstConnection, br.lastRetryOffset, msSinceLastRetry, msSinceLastSuccess, originalErr)
}

// Close implements io.ReadCloser
Expand Down
86 changes: 86 additions & 0 deletions docker/body_reader_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package docker

import (
"errors"
"math"
"net/http"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -108,3 +112,85 @@ func TestParseContentRange(t *testing.T) {
assert.Error(t, err, c, c)
}
}

func TestMillisecondsSinceOptional(t *testing.T) {
current := time.Date(2023, 2, 9, 8, 7, 6, 5, time.UTC)
res := millisecondsSinceOptional(current, time.Time{})
assert.True(t, math.IsNaN(res))
tm := current.Add(-60 * time.Second) // 60 seconds _before_ current
res = millisecondsSinceOptional(current, tm)
assert.Equal(t, res, 60_000.0)
}

func TestBodyReaderErrorIfNotReconnecting(t *testing.T) {
// Silence logrus.Info logs in the tested method
prevLevel := logrus.StandardLogger().Level
logrus.StandardLogger().SetLevel(logrus.WarnLevel)
t.Cleanup(func() {
logrus.StandardLogger().SetLevel(prevLevel)
})

for _, c := range []struct {
name string
previousRetry bool
currentOffset int64
currentTime int // milliseconds
expectReconnect bool
}{
{
name: "A lot of progress, after a long time, second retry",
previousRetry: true,
currentOffset: 2 * bodyReaderMinimumProgress,
currentTime: 2 * bodyReaderMSSinceLastRetry,
expectReconnect: true,
},
{
name: "A lot of progress, after little time, second retry",
previousRetry: true,
currentOffset: 2 * bodyReaderMinimumProgress,
currentTime: 1,
expectReconnect: true,
},
{
name: "Little progress, after a long time, second retry",
previousRetry: true,
currentOffset: 1,
currentTime: 2 * bodyReaderMSSinceLastRetry,
expectReconnect: true,
},
{
name: "Little progress, after little time, second retry",
previousRetry: true,
currentOffset: 1,
currentTime: 1,
expectReconnect: false,
},
{
name: "Little progress, after little time, first retry",
previousRetry: false,
currentOffset: 1,
currentTime: bodyReaderMSSinceLastRetry / 2,
expectReconnect: true,
},
} {
tm := time.Now()
br := bodyReader{}
if c.previousRetry {
br.lastRetryOffset = 2 * bodyReaderMinimumProgress
br.offset = br.lastRetryOffset + c.currentOffset
br.firstConnectionTime = tm.Add(-time.Duration(c.currentTime+2*bodyReaderMSSinceLastRetry) * time.Millisecond)
br.lastRetryTime = tm.Add(-time.Duration(c.currentTime) * time.Millisecond)
} else {
br.lastRetryOffset = -1
br.lastRetryTime = time.Time{}
br.offset = c.currentOffset
br.firstConnectionTime = tm.Add(-time.Duration(c.currentTime) * time.Millisecond)
}
err := br.errorIfNotReconnecting(errors.New("some error for error text only"), "URL for error text only")
if c.expectReconnect {
assert.NoError(t, err, c.name, br)
} else {
assert.Error(t, err, c.name, br)
}
}
}

0 comments on commit ae0edb7

Please sign in to comment.