Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package etcd3retry

import (
"context"
"fmt"
"regexp"
"strings"
"time"

etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
Expand Down Expand Up @@ -36,15 +39,15 @@ func NewRetryingEtcdStorage(delegate storage.Interface) storage.Interface {
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from database.
func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return onError(ctx, defaultRetry, isRetriableEtcdError, func() error {
return onError(ctx, defaultRetry, isRetriableErrorOnWrite, func() error {
return c.Interface.Create(ctx, key, obj, out, ttl)
})
}

// Delete removes the specified key and returns the value that existed at that spot.
// If key didn't exist, it will return NotFound storage error.
func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
return onError(ctx, defaultRetry, isRetriableEtcdError, func() error {
return onError(ctx, defaultRetry, isRetriableErrorOnWrite, func() error {
return c.Interface.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject)
})
}
Expand All @@ -58,7 +61,7 @@ func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object
// and send it in an "ADDED" event, before watch starts.
func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
var ret watch.Interface
err := onError(ctx, defaultRetry, isRetriableEtcdError, func() error {
err := onError(ctx, defaultRetry, isRetriableErrorOnRead, func() error {
var innerErr error
ret, innerErr = c.Interface.Watch(ctx, key, opts)
return innerErr
Expand All @@ -72,7 +75,7 @@ func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOp
// The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
return onError(ctx, defaultRetry, isRetriableEtcdError, func() error {
return onError(ctx, defaultRetry, isRetriableErrorOnRead, func() error {
return c.Interface.Get(ctx, key, opts, objPtr)
})
}
Expand All @@ -84,7 +87,7 @@ func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptio
// The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (c *retryClient) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
return onError(ctx, defaultRetry, isRetriableEtcdError, func() error {
return onError(ctx, defaultRetry, isRetriableErrorOnRead, func() error {
return c.Interface.GetList(ctx, key, opts, listObj)
})
}
Expand Down Expand Up @@ -125,23 +128,65 @@ func (c *retryClient) GetList(ctx context.Context, key string, opts storage.List
// )
func (c *retryClient) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
return onError(ctx, defaultRetry, isRetriableEtcdError, func() error {
return onError(ctx, defaultRetry, isRetriableErrorOnWrite, func() error {
return c.Interface.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
})
}

// isRetriableEtcdError returns true if a retry should be attempted, otherwise false.
// errorLabel is set to a non-empty value that reflects the type of error encountered.
func isRetriableEtcdError(err error) (errorLabel string, retry bool) {
if err != nil {
if etcdError, ok := etcdrpc.Error(err).(etcdrpc.EtcdError); ok {
if etcdError.Code() == codes.Unavailable {
errorLabel = "Unavailable"
retry = true
}
}
// These errors are coming back from the k8s.io/apiserver storage.Interface, not directly from an
// etcd client. Classifying them can be fragile since the storage methods may not return etcd client
// errors directly.
var errorLabelsBySuffix = map[string]string{
"etcdserver: leader changed": "LeaderChanged",
"etcdserver: no leader": "NoLeader",
"raft proposal dropped": "ProposalDropped",

"etcdserver: request timed out": "Timeout",
"etcdserver: request timed out, possibly due to previous leader failure": "Timeout",
"etcdserver: request timed out, possible due to connection lost": "Timeout",
"etcdserver: request timed out, waiting for the applied index took too long": "Timeout",
"etcdserver: server stopped": "Stopped",
}

var retriableWriteErrorSuffixes = func() *regexp.Regexp {
// This list should include only errors the caller is certain have no side effects.
suffixes := []string{
"etcdserver: leader changed",
"etcdserver: no leader",
"raft proposal dropped",
}
return regexp.MustCompile(fmt.Sprintf(`(%s)$`, strings.Join(suffixes, `|`)))
}()

// isRetriableErrorOnWrite returns true if and only if a retry should be attempted when the provided
// error is returned from a write attempt. If the error is retriable, a non-empty string classifying
// the error is also returned.
func isRetriableErrorOnWrite(err error) (string, bool) {
if suffix := retriableWriteErrorSuffixes.FindString(err.Error()); suffix != "" {
return errorLabelsBySuffix[suffix], true
}
return "", false
}

var retriableReadErrorSuffixes = func() *regexp.Regexp {
var suffixes []string
for suffix := range errorLabelsBySuffix {
suffixes = append(suffixes, suffix)
}
return regexp.MustCompile(fmt.Sprintf(`(%s)$`, strings.Join(suffixes, `|`)))
}()

// isRetriableErrorOnRead returns true if and only if a retry should be attempted when the provided
// error is returned from a read attempt. If the error is retriable, a non-empty string classifying
// the error is also returned.
func isRetriableErrorOnRead(err error) (string, bool) {
if suffix := retriableReadErrorSuffixes.FindString(err.Error()); suffix != "" {
return errorLabelsBySuffix[suffix], true
}
return
if etcdError, ok := etcdrpc.Error(err).(etcdrpc.EtcdError); ok && etcdError.Code() == codes.Unavailable {
return "Unavailable", true
}
return "", false
}

// onError allows the caller to retry fn in case the error returned by fn is retriable
Expand All @@ -163,6 +208,9 @@ func onError(ctx context.Context, backoff wait.Backoff, retriable func(error) (s
}

lastErrLabel, retry = retriable(err)
if klog.V(6).Enabled() {
klog.V(6).InfoS("observed storage error", "err", err, "retriable", retry)
}
if retry {
lastErr = err
retryCounter++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ import (
func TestOnError(t *testing.T) {
tests := []struct {
name string
retriableFn func(error) (string, bool)
returnedFnError func(retryCounter int) error
expectedRetries int
expectedFinalError error
}{
{
name: "retry ErrLeaderChanged",
retriableFn: isRetriableErrorOnRead,
returnedFnError: func(_ int) error { return etcdrpc.ErrLeaderChanged },
expectedRetries: 5,
expectedFinalError: etcdrpc.ErrLeaderChanged,
},
{
name: "retry ErrLeaderChanged a few times",
name: "retry ErrLeaderChanged a few times",
retriableFn: isRetriableErrorOnRead,
returnedFnError: func(retryCounter int) error {
if retryCounter == 3 {
return nil
Expand All @@ -38,10 +41,12 @@ func TestOnError(t *testing.T) {
},
{
name: "no retries",
retriableFn: isRetriableErrorOnRead,
returnedFnError: func(_ int) error { return nil },
},
{
name: "no retries for a random error",
retriableFn: isRetriableErrorOnRead,
returnedFnError: func(_ int) error { return fmt.Errorf("random error") },
expectedFinalError: fmt.Errorf("random error"),
},
Expand All @@ -53,7 +58,7 @@ func TestOnError(t *testing.T) {
// we set it to -1 to indicate that the first
// execution is not a retry
actualRetries := -1
err := onError(ctx, defaultRetry, isRetriableEtcdError, func() error {
err := onError(ctx, defaultRetry, scenario.retriableFn, func() error {
actualRetries++
return scenario.returnedFnError(actualRetries)
})
Expand All @@ -71,18 +76,67 @@ func TestOnError(t *testing.T) {
}
}

func TestIsRetriableEtcdError(t *testing.T) {
func TestIsRetriableErrorOnRead(t *testing.T) {
tests := []struct {
name string
etcdErr error
errorLabelExpected string
retryExpected bool
}{
{
name: "error is nil",
name: "generic storage error",
etcdErr: storage.NewKeyNotFoundError("key", 0),
errorLabelExpected: "",
retryExpected: false,
},
{
name: "connection refused error",
etcdErr: &url.Error{Err: &net.OpError{Err: syscall.ECONNREFUSED}},
errorLabelExpected: "",
retryExpected: false,
},
{
name: "etcd unavailable error",
etcdErr: etcdrpc.ErrLeaderChanged,
errorLabelExpected: "LeaderChanged",
retryExpected: true,
},
{
name: "should also inspect error message",
etcdErr: fmt.Errorf("etcdserver: no leader"),
errorLabelExpected: "NoLeader",
retryExpected: true,
},
{
name: "unavailable code with unrecognized suffix",
etcdErr: etcdrpc.ErrGRPCUnhealthy,
errorLabelExpected: "Unavailable",
retryExpected: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
errorCodeGot, retryGot := isRetriableErrorOnRead(test.etcdErr)

if test.errorLabelExpected != errorCodeGot {
t.Errorf("expected error code: %s but got: %s", test.errorLabelExpected, errorCodeGot)
}

if test.retryExpected != retryGot {
t.Errorf("expected retry: %s but got: %s", strconv.FormatBool(test.retryExpected), strconv.FormatBool(retryGot))
}
})
}
}

func TestIsRetriableErrorOnWrite(t *testing.T) {
tests := []struct {
name string
etcdErr error
errorLabelExpected string
retryExpected bool
}{
{
name: "generic storage error",
etcdErr: storage.NewKeyNotFoundError("key", 0),
Expand All @@ -98,20 +152,32 @@ func TestIsRetriableEtcdError(t *testing.T) {
{
name: "etcd unavailable error",
etcdErr: etcdrpc.ErrLeaderChanged,
errorLabelExpected: "Unavailable",
errorLabelExpected: "LeaderChanged",
retryExpected: true,
},
{
name: "should also inspect error message",
etcdErr: fmt.Errorf("etcdserver: leader changed"),
errorLabelExpected: "Unavailable",
etcdErr: fmt.Errorf("etcdserver: no leader"),
errorLabelExpected: "NoLeader",
retryExpected: true,
},
{
name: "unavailable code with unrecognized suffix",
etcdErr: etcdrpc.ErrGRPCUnhealthy,
errorLabelExpected: "",
retryExpected: false,
},
{
name: "timeout not retried for writes",
etcdErr: etcdrpc.ErrGRPCTimeout,
errorLabelExpected: "",
retryExpected: false,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
errorCodeGot, retryGot := isRetriableEtcdError(test.etcdErr)
errorCodeGot, retryGot := isRetriableErrorOnWrite(test.etcdErr)

if test.errorLabelExpected != errorCodeGot {
t.Errorf("expected error code: %s but got: %s", test.errorLabelExpected, errorCodeGot)
Expand Down