From 475416520cfdbea4c2debf9546e5df4ec6415f86 Mon Sep 17 00:00:00 2001 From: Ben Luddy Date: Thu, 30 Jan 2025 16:44:27 -0500 Subject: [PATCH] UPSTREAM: : Don't retry storage calls with side effects. The existing patch retried any etcd error returned from storage with the code "Unavailable". Writes can only be safely retried if the client can be absolutely sure that the initial attempt ended before persisting any changes. The "Unavailable" code includes errors like "timed out" that can't be safely retried for writes. --- .../etcd3/etcd3retry/retry_etcdclient.go | 82 +++++++++++++++---- .../etcd3/etcd3retry/retry_etcdclient_test.go | 82 +++++++++++++++++-- 2 files changed, 139 insertions(+), 25 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient.go index 12bd733f3d70f..7644563fff78b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient.go @@ -2,6 +2,9 @@ package etcd3retry import ( "context" + "fmt" + "regexp" + "strings" "time" etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" @@ -36,7 +39,7 @@ func NewRetryingEtcdStorage(delegate storage.Interface) storage.Interface { // in seconds (0 means forever). If no error is returned and out is not nil, out will be // set to the read value from database. func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { - return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return onError(ctx, defaultRetry, isRetriableErrorOnWrite, func() error { return c.Interface.Create(ctx, key, obj, out, ttl) }) } @@ -44,7 +47,7 @@ func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.O // Delete removes the specified key and returns the value that existed at that spot. // If key didn't exist, it will return NotFound storage error. func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { - return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return onError(ctx, defaultRetry, isRetriableErrorOnWrite, func() error { return c.Interface.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject) }) } @@ -58,7 +61,7 @@ func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object // and send it in an "ADDED" event, before watch starts. func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { var ret watch.Interface - err := onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + err := onError(ctx, defaultRetry, isRetriableErrorOnRead, func() error { var innerErr error ret, innerErr = c.Interface.Watch(ctx, key, opts) return innerErr @@ -72,7 +75,7 @@ func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOp // The returned contents may be delayed, but it is guaranteed that they will // match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'. func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { - return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return onError(ctx, defaultRetry, isRetriableErrorOnRead, func() error { return c.Interface.Get(ctx, key, opts, objPtr) }) } @@ -84,7 +87,7 @@ func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptio // The returned contents may be delayed, but it is guaranteed that they will // match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'. func (c *retryClient) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { - return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return onError(ctx, defaultRetry, isRetriableErrorOnRead, func() error { return c.Interface.GetList(ctx, key, opts, listObj) }) } @@ -125,23 +128,65 @@ func (c *retryClient) GetList(ctx context.Context, key string, opts storage.List // ) func (c *retryClient) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error { - return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return onError(ctx, defaultRetry, isRetriableErrorOnWrite, func() error { return c.Interface.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject) }) } -// isRetriableEtcdError returns true if a retry should be attempted, otherwise false. -// errorLabel is set to a non-empty value that reflects the type of error encountered. -func isRetriableEtcdError(err error) (errorLabel string, retry bool) { - if err != nil { - if etcdError, ok := etcdrpc.Error(err).(etcdrpc.EtcdError); ok { - if etcdError.Code() == codes.Unavailable { - errorLabel = "Unavailable" - retry = true - } - } +// These errors are coming back from the k8s.io/apiserver storage.Interface, not directly from an +// etcd client. Classifying them can be fragile since the storage methods may not return etcd client +// errors directly. +var errorLabelsBySuffix = map[string]string{ + "etcdserver: leader changed": "LeaderChanged", + "etcdserver: no leader": "NoLeader", + "raft proposal dropped": "ProposalDropped", + + "etcdserver: request timed out": "Timeout", + "etcdserver: request timed out, possibly due to previous leader failure": "Timeout", + "etcdserver: request timed out, possible due to connection lost": "Timeout", + "etcdserver: request timed out, waiting for the applied index took too long": "Timeout", + "etcdserver: server stopped": "Stopped", +} + +var retriableWriteErrorSuffixes = func() *regexp.Regexp { + // This list should include only errors the caller is certain have no side effects. + suffixes := []string{ + "etcdserver: leader changed", + "etcdserver: no leader", + "raft proposal dropped", + } + return regexp.MustCompile(fmt.Sprintf(`(%s)$`, strings.Join(suffixes, `|`))) +}() + +// isRetriableErrorOnWrite returns true if and only if a retry should be attempted when the provided +// error is returned from a write attempt. If the error is retriable, a non-empty string classifying +// the error is also returned. +func isRetriableErrorOnWrite(err error) (string, bool) { + if suffix := retriableWriteErrorSuffixes.FindString(err.Error()); suffix != "" { + return errorLabelsBySuffix[suffix], true + } + return "", false +} + +var retriableReadErrorSuffixes = func() *regexp.Regexp { + var suffixes []string + for suffix := range errorLabelsBySuffix { + suffixes = append(suffixes, suffix) + } + return regexp.MustCompile(fmt.Sprintf(`(%s)$`, strings.Join(suffixes, `|`))) +}() + +// isRetriableErrorOnRead returns true if and only if a retry should be attempted when the provided +// error is returned from a read attempt. If the error is retriable, a non-empty string classifying +// the error is also returned. +func isRetriableErrorOnRead(err error) (string, bool) { + if suffix := retriableReadErrorSuffixes.FindString(err.Error()); suffix != "" { + return errorLabelsBySuffix[suffix], true } - return + if etcdError, ok := etcdrpc.Error(err).(etcdrpc.EtcdError); ok && etcdError.Code() == codes.Unavailable { + return "Unavailable", true + } + return "", false } // onError allows the caller to retry fn in case the error returned by fn is retriable @@ -163,6 +208,9 @@ func onError(ctx context.Context, backoff wait.Backoff, retriable func(error) (s } lastErrLabel, retry = retriable(err) + if klog.V(6).Enabled() { + klog.V(6).InfoS("observed storage error", "err", err, "retriable", retry) + } if retry { lastErr = err retryCounter++ diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient_test.go index 3e9e9177e7eb6..577c27e594587 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient_test.go @@ -16,18 +16,21 @@ import ( func TestOnError(t *testing.T) { tests := []struct { name string + retriableFn func(error) (string, bool) returnedFnError func(retryCounter int) error expectedRetries int expectedFinalError error }{ { name: "retry ErrLeaderChanged", + retriableFn: isRetriableErrorOnRead, returnedFnError: func(_ int) error { return etcdrpc.ErrLeaderChanged }, expectedRetries: 5, expectedFinalError: etcdrpc.ErrLeaderChanged, }, { - name: "retry ErrLeaderChanged a few times", + name: "retry ErrLeaderChanged a few times", + retriableFn: isRetriableErrorOnRead, returnedFnError: func(retryCounter int) error { if retryCounter == 3 { return nil @@ -38,10 +41,12 @@ func TestOnError(t *testing.T) { }, { name: "no retries", + retriableFn: isRetriableErrorOnRead, returnedFnError: func(_ int) error { return nil }, }, { name: "no retries for a random error", + retriableFn: isRetriableErrorOnRead, returnedFnError: func(_ int) error { return fmt.Errorf("random error") }, expectedFinalError: fmt.Errorf("random error"), }, @@ -53,7 +58,7 @@ func TestOnError(t *testing.T) { // we set it to -1 to indicate that the first // execution is not a retry actualRetries := -1 - err := onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + err := onError(ctx, defaultRetry, scenario.retriableFn, func() error { actualRetries++ return scenario.returnedFnError(actualRetries) }) @@ -71,7 +76,7 @@ func TestOnError(t *testing.T) { } } -func TestIsRetriableEtcdError(t *testing.T) { +func TestIsRetriableErrorOnRead(t *testing.T) { tests := []struct { name string etcdErr error @@ -79,10 +84,59 @@ func TestIsRetriableEtcdError(t *testing.T) { retryExpected bool }{ { - name: "error is nil", + name: "generic storage error", + etcdErr: storage.NewKeyNotFoundError("key", 0), + errorLabelExpected: "", + retryExpected: false, + }, + { + name: "connection refused error", + etcdErr: &url.Error{Err: &net.OpError{Err: syscall.ECONNREFUSED}}, errorLabelExpected: "", retryExpected: false, }, + { + name: "etcd unavailable error", + etcdErr: etcdrpc.ErrLeaderChanged, + errorLabelExpected: "LeaderChanged", + retryExpected: true, + }, + { + name: "should also inspect error message", + etcdErr: fmt.Errorf("etcdserver: no leader"), + errorLabelExpected: "NoLeader", + retryExpected: true, + }, + { + name: "unavailable code with unrecognized suffix", + etcdErr: etcdrpc.ErrGRPCUnhealthy, + errorLabelExpected: "Unavailable", + retryExpected: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + errorCodeGot, retryGot := isRetriableErrorOnRead(test.etcdErr) + + if test.errorLabelExpected != errorCodeGot { + t.Errorf("expected error code: %s but got: %s", test.errorLabelExpected, errorCodeGot) + } + + if test.retryExpected != retryGot { + t.Errorf("expected retry: %s but got: %s", strconv.FormatBool(test.retryExpected), strconv.FormatBool(retryGot)) + } + }) + } +} + +func TestIsRetriableErrorOnWrite(t *testing.T) { + tests := []struct { + name string + etcdErr error + errorLabelExpected string + retryExpected bool + }{ { name: "generic storage error", etcdErr: storage.NewKeyNotFoundError("key", 0), @@ -98,20 +152,32 @@ func TestIsRetriableEtcdError(t *testing.T) { { name: "etcd unavailable error", etcdErr: etcdrpc.ErrLeaderChanged, - errorLabelExpected: "Unavailable", + errorLabelExpected: "LeaderChanged", retryExpected: true, }, { name: "should also inspect error message", - etcdErr: fmt.Errorf("etcdserver: leader changed"), - errorLabelExpected: "Unavailable", + etcdErr: fmt.Errorf("etcdserver: no leader"), + errorLabelExpected: "NoLeader", retryExpected: true, }, + { + name: "unavailable code with unrecognized suffix", + etcdErr: etcdrpc.ErrGRPCUnhealthy, + errorLabelExpected: "", + retryExpected: false, + }, + { + name: "timeout not retried for writes", + etcdErr: etcdrpc.ErrGRPCTimeout, + errorLabelExpected: "", + retryExpected: false, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - errorCodeGot, retryGot := isRetriableEtcdError(test.etcdErr) + errorCodeGot, retryGot := isRetriableErrorOnWrite(test.etcdErr) if test.errorLabelExpected != errorCodeGot { t.Errorf("expected error code: %s but got: %s", test.errorLabelExpected, errorCodeGot)