diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient.go index 12bd733f3d70f..7644563fff78b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient.go @@ -2,6 +2,9 @@ package etcd3retry import ( "context" + "fmt" + "regexp" + "strings" "time" etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" @@ -36,7 +39,7 @@ func NewRetryingEtcdStorage(delegate storage.Interface) storage.Interface { // in seconds (0 means forever). If no error is returned and out is not nil, out will be // set to the read value from database. func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { - return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return onError(ctx, defaultRetry, isRetriableErrorOnWrite, func() error { return c.Interface.Create(ctx, key, obj, out, ttl) }) } @@ -44,7 +47,7 @@ func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.O // Delete removes the specified key and returns the value that existed at that spot. // If key didn't exist, it will return NotFound storage error. func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { - return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return onError(ctx, defaultRetry, isRetriableErrorOnWrite, func() error { return c.Interface.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject) }) } @@ -58,7 +61,7 @@ func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object // and send it in an "ADDED" event, before watch starts. 
func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { var ret watch.Interface - err := onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + err := onError(ctx, defaultRetry, isRetriableErrorOnRead, func() error { var innerErr error ret, innerErr = c.Interface.Watch(ctx, key, opts) return innerErr @@ -72,7 +75,7 @@ func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOp // The returned contents may be delayed, but it is guaranteed that they will // match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'. func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { - return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return onError(ctx, defaultRetry, isRetriableErrorOnRead, func() error { return c.Interface.Get(ctx, key, opts, objPtr) }) } @@ -84,7 +87,7 @@ func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptio // The returned contents may be delayed, but it is guaranteed that they will // match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'. 
func (c *retryClient) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { - return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return onError(ctx, defaultRetry, isRetriableErrorOnRead, func() error { return c.Interface.GetList(ctx, key, opts, listObj) }) } @@ -125,23 +128,65 @@ func (c *retryClient) GetList(ctx context.Context, key string, opts storage.List // ) func (c *retryClient) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error { - return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return onError(ctx, defaultRetry, isRetriableErrorOnWrite, func() error { return c.Interface.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject) }) } -// isRetriableEtcdError returns true if a retry should be attempted, otherwise false. -// errorLabel is set to a non-empty value that reflects the type of error encountered. -func isRetriableEtcdError(err error) (errorLabel string, retry bool) { - if err != nil { - if etcdError, ok := etcdrpc.Error(err).(etcdrpc.EtcdError); ok { - if etcdError.Code() == codes.Unavailable { - errorLabel = "Unavailable" - retry = true - } - } +// These errors are coming back from the k8s.io/apiserver storage.Interface, not directly from an +// etcd client. Classifying them can be fragile since the storage methods may not return etcd client +// errors directly. 
+var errorLabelsBySuffix = map[string]string{
+	// Suffixes classified as retriable for both reads and writes.
+	"etcdserver: leader changed": "LeaderChanged",
+	"etcdserver: no leader":      "NoLeader",
+	"raft proposal dropped":      "ProposalDropped",
+
+	// Suffixes classified as retriable for reads only.
+	// NOTE(review): "possible due to connection lost" is presumably etcd's own wording;
+	// confirm against the etcd error definitions before changing the spelling.
+	"etcdserver: request timed out": "Timeout",
+	"etcdserver: request timed out, possibly due to previous leader failure":     "Timeout",
+	"etcdserver: request timed out, possible due to connection lost":             "Timeout",
+	"etcdserver: request timed out, waiting for the applied index took too long": "Timeout",
+	"etcdserver: server stopped": "Stopped",
+}
+
+// retriableWriteErrorSuffixes matches an error message that ends with one of the suffixes
+// considered retriable for writes. The matched text is the suffix itself, so it can be used
+// as a key into errorLabelsBySuffix.
+var retriableWriteErrorSuffixes = func() *regexp.Regexp {
+	// This list should include only errors the caller is certain have no side effects.
+	// Every entry must also be a key of errorLabelsBySuffix; otherwise
+	// isRetriableErrorOnWrite would report the error as retriable with an empty label.
+	suffixes := []string{
+		"etcdserver: leader changed",
+		"etcdserver: no leader",
+		"raft proposal dropped",
+	}
+	for i, suffix := range suffixes {
+		// Escape each literal so it is always matched verbatim, even if a suffix
+		// containing a regular expression metacharacter is added later.
+		suffixes[i] = regexp.QuoteMeta(suffix)
+	}
+	return regexp.MustCompile(fmt.Sprintf(`(%s)$`, strings.Join(suffixes, `|`)))
+}()
+
+// isRetriableErrorOnWrite returns true if and only if a retry should be attempted when the provided
+// error is returned from a write attempt. If the error is retriable, a non-empty string classifying
+// the error is also returned. The decision is based solely on the trailing text of err.Error().
+// A nil error is never retriable and yields ("", false).
+func isRetriableErrorOnWrite(err error) (string, bool) {
+	if err == nil {
+		// Guard explicitly: calling Error() on a nil error would panic.
+		return "", false
+	}
+	if suffix := retriableWriteErrorSuffixes.FindString(err.Error()); suffix != "" {
+		return errorLabelsBySuffix[suffix], true
+	}
+	return "", false
+}
+
+// retriableReadErrorSuffixes matches an error message that ends with any key of
+// errorLabelsBySuffix. The matched text is the key itself.
+var retriableReadErrorSuffixes = func() *regexp.Regexp {
+	// The order of the alternatives follows map iteration and is therefore unspecified.
+	// Every alternative is a literal anchored at the end of the input, so the order does
+	// not change what is matched.
+	suffixes := make([]string, 0, len(errorLabelsBySuffix))
+	for suffix := range errorLabelsBySuffix {
+		// Escape each key so it is always matched verbatim.
+		suffixes = append(suffixes, regexp.QuoteMeta(suffix))
+	}
+	return regexp.MustCompile(fmt.Sprintf(`(%s)$`, strings.Join(suffixes, `|`)))
+}()
+
+// isRetriableErrorOnRead returns true if and only if a retry should be attempted when the provided
+// error is returned from a read attempt. If the error is retriable, a non-empty string classifying
+// the error is also returned.
+func isRetriableErrorOnRead(err error) (string, bool) { + if suffix := retriableReadErrorSuffixes.FindString(err.Error()); suffix != "" { + return errorLabelsBySuffix[suffix], true } - return + if etcdError, ok := etcdrpc.Error(err).(etcdrpc.EtcdError); ok && etcdError.Code() == codes.Unavailable { + return "Unavailable", true + } + return "", false } // onError allows the caller to retry fn in case the error returned by fn is retriable @@ -163,6 +208,9 @@ func onError(ctx context.Context, backoff wait.Backoff, retriable func(error) (s } lastErrLabel, retry = retriable(err) + if klog.V(6).Enabled() { + klog.V(6).InfoS("observed storage error", "err", err, "retriable", retry) + } if retry { lastErr = err retryCounter++ diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient_test.go index 3e9e9177e7eb6..577c27e594587 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient_test.go @@ -16,18 +16,21 @@ import ( func TestOnError(t *testing.T) { tests := []struct { name string + retriableFn func(error) (string, bool) returnedFnError func(retryCounter int) error expectedRetries int expectedFinalError error }{ { name: "retry ErrLeaderChanged", + retriableFn: isRetriableErrorOnRead, returnedFnError: func(_ int) error { return etcdrpc.ErrLeaderChanged }, expectedRetries: 5, expectedFinalError: etcdrpc.ErrLeaderChanged, }, { - name: "retry ErrLeaderChanged a few times", + name: "retry ErrLeaderChanged a few times", + retriableFn: isRetriableErrorOnRead, returnedFnError: func(retryCounter int) error { if retryCounter == 3 { return nil @@ -38,10 +41,12 @@ func TestOnError(t *testing.T) { }, { name: "no retries", + retriableFn: isRetriableErrorOnRead, returnedFnError: func(_ int) error { return nil }, }, { name: "no retries for a random error", 
+ retriableFn: isRetriableErrorOnRead, returnedFnError: func(_ int) error { return fmt.Errorf("random error") }, expectedFinalError: fmt.Errorf("random error"), }, @@ -53,7 +58,7 @@ func TestOnError(t *testing.T) { // we set it to -1 to indicate that the first // execution is not a retry actualRetries := -1 - err := onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + err := onError(ctx, defaultRetry, scenario.retriableFn, func() error { actualRetries++ return scenario.returnedFnError(actualRetries) }) @@ -71,7 +76,7 @@ func TestOnError(t *testing.T) { } } -func TestIsRetriableEtcdError(t *testing.T) { +func TestIsRetriableErrorOnRead(t *testing.T) { tests := []struct { name string etcdErr error @@ -79,10 +84,59 @@ func TestIsRetriableEtcdError(t *testing.T) { retryExpected bool }{ { - name: "error is nil", + name: "generic storage error", + etcdErr: storage.NewKeyNotFoundError("key", 0), + errorLabelExpected: "", + retryExpected: false, + }, + { + name: "connection refused error", + etcdErr: &url.Error{Err: &net.OpError{Err: syscall.ECONNREFUSED}}, errorLabelExpected: "", retryExpected: false, }, + { + name: "etcd unavailable error", + etcdErr: etcdrpc.ErrLeaderChanged, + errorLabelExpected: "LeaderChanged", + retryExpected: true, + }, + { + name: "should also inspect error message", + etcdErr: fmt.Errorf("etcdserver: no leader"), + errorLabelExpected: "NoLeader", + retryExpected: true, + }, + { + name: "unavailable code with unrecognized suffix", + etcdErr: etcdrpc.ErrGRPCUnhealthy, + errorLabelExpected: "Unavailable", + retryExpected: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + errorCodeGot, retryGot := isRetriableErrorOnRead(test.etcdErr) + + if test.errorLabelExpected != errorCodeGot { + t.Errorf("expected error code: %s but got: %s", test.errorLabelExpected, errorCodeGot) + } + + if test.retryExpected != retryGot { + t.Errorf("expected retry: %s but got: %s", strconv.FormatBool(test.retryExpected), 
strconv.FormatBool(retryGot)) + } + }) + } +} + +func TestIsRetriableErrorOnWrite(t *testing.T) { + tests := []struct { + name string + etcdErr error + errorLabelExpected string + retryExpected bool + }{ { name: "generic storage error", etcdErr: storage.NewKeyNotFoundError("key", 0), @@ -98,20 +152,32 @@ func TestIsRetriableEtcdError(t *testing.T) { { name: "etcd unavailable error", etcdErr: etcdrpc.ErrLeaderChanged, - errorLabelExpected: "Unavailable", + errorLabelExpected: "LeaderChanged", retryExpected: true, }, { name: "should also inspect error message", - etcdErr: fmt.Errorf("etcdserver: leader changed"), - errorLabelExpected: "Unavailable", + etcdErr: fmt.Errorf("etcdserver: no leader"), + errorLabelExpected: "NoLeader", retryExpected: true, }, + { + name: "unavailable code with unrecognized suffix", + etcdErr: etcdrpc.ErrGRPCUnhealthy, + errorLabelExpected: "", + retryExpected: false, + }, + { + name: "timeout not retried for writes", + etcdErr: etcdrpc.ErrGRPCTimeout, + errorLabelExpected: "", + retryExpected: false, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - errorCodeGot, retryGot := isRetriableEtcdError(test.etcdErr) + errorCodeGot, retryGot := isRetriableErrorOnWrite(test.etcdErr) if test.errorLabelExpected != errorCodeGot { t.Errorf("expected error code: %s but got: %s", test.errorLabelExpected, errorCodeGot)