From 6af0038da34751158582d15815ba3e5fee1bf11d Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Fri, 7 Jun 2024 20:08:53 +0100 Subject: [PATCH] [v16] add `services.AcquireSemaphoreLockWithRetry` (#42631) * transform `services.SemaphoreLock` to satisfy `context.Context` This PR makes `services.SemaphoreLock` satisfy the `context.Context` interface. Extending the `services.SemaphoreLock` makes it possible to use the lease as a context in context propagation. ```go lease, err:=services.AcquireSemaphore(ctx,...) if err!=nil{ ... } ctx, cancel:=context.WithCancel(lease) ``` `lease` is released when the semaphore lock is lost or when the parent context is cancelled so we can use it as an argument to other context functions without having to carry the parent context. Signed-off-by: Tiago Silva * use ctx.Done channel from stdlib context to avoid spinning a goroutine * add `services.AcquireSemaphoreLockWithRetry` This PR introduces `services.AcquireSemaphoreLockWithRetry` function that only returns if it successfully acquired the lock or if the parent context is cancelled. This function will retry until it successfully acquires the lock before returning. Signed-off-by: Tiago Silva * Update lib/services/semaphore.go Co-authored-by: Edward Dowling --------- Signed-off-by: Tiago Silva Co-authored-by: Edward Dowling --- lib/services/semaphore.go | 28 +++++++++++++++++++++++++++ lib/srv/discovery/access_graph.go | 32 ++++++++++++++++++++----------- 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/lib/services/semaphore.go b/lib/services/semaphore.go index 5b07b961d994d..2cebd19a14aba 100644 --- a/lib/services/semaphore.go +++ b/lib/services/semaphore.go @@ -323,6 +323,34 @@ func AcquireSemaphoreLock(ctx context.Context, cfg SemaphoreLockConfig) (*Semaph return lock, nil } +// SemaphoreLockConfigWithRetry contains parameters for acquiring a semaphore lock +// until it succeeds or context expires. 
+type SemaphoreLockConfigWithRetry struct { + SemaphoreLockConfig + // Retry configures the linear retry (built with retryutils.NewLinear) that re-runs AcquireSemaphoreLock. + Retry retryutils.LinearConfig +} + +// AcquireSemaphoreLockWithRetry attempts to acquire and hold a semaphore lease. If successfully acquired, +// background keepalive processes are started and an associated lock handle is returned. +// If the lease cannot be acquired, the operation is retried according to the retry schedule until +// it succeeds or the context expires. Canceling the supplied context releases the semaphore. An error is returned if cfg.Retry is rejected by retryutils.NewLinear or if retrying ends without a lease. +func AcquireSemaphoreLockWithRetry(ctx context.Context, cfg SemaphoreLockConfigWithRetry) (*SemaphoreLock, error) { + retry, err := retryutils.NewLinear(cfg.Retry) + if err != nil { + return nil, trace.Wrap(err) + } + var lease *SemaphoreLock + err = retry.For(ctx, func() (err error) { + lease, err = AcquireSemaphoreLock(ctx, cfg.SemaphoreLockConfig) + return trace.Wrap(err) + }) + if err != nil { + return nil, trace.Wrap(err) + } + return lease, nil +} + // UnmarshalSemaphore unmarshals the Semaphore resource from JSON. 
func UnmarshalSemaphore(bytes []byte, opts ...MarshalOption) (types.Semaphore, error) { var semaphore types.SemaphoreV3 diff --git a/lib/srv/discovery/access_graph.go b/lib/srv/discovery/access_graph.go index 7a787c8be253a..ff8b038d867a1 100644 --- a/lib/srv/discovery/access_graph.go +++ b/lib/srv/discovery/access_graph.go @@ -35,6 +35,7 @@ import ( "github.com/gravitational/teleport/api/metadata" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/discoveryconfig" + "github.com/gravitational/teleport/api/utils/retryutils" accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha" "github.com/gravitational/teleport/lib/services" aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync" @@ -250,19 +251,28 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c // AcquireSemaphoreLock will retry until the semaphore is acquired. // This prevents multiple discovery services to push AWS resources in parallel. // lease must be released to cleanup the resource in auth server. 
- lease, err := services.AcquireSemaphoreLock( + lease, err := services.AcquireSemaphoreLockWithRetry( ctx, - services.SemaphoreLockConfig{ - Service: s.AccessPoint, - Params: types.AcquireSemaphoreRequest{ - SemaphoreKind: types.KindAccessGraph, - SemaphoreName: semaphoreName, - MaxLeases: 1, - Expires: s.clock.Now().Add(semaphoreExpiration), - Holder: s.Config.ServerID, + services.SemaphoreLockConfigWithRetry{ + SemaphoreLockConfig: services.SemaphoreLockConfig{ + Service: s.AccessPoint, + Params: types.AcquireSemaphoreRequest{ + SemaphoreKind: types.KindAccessGraph, + SemaphoreName: semaphoreName, + MaxLeases: 1, + Expires: s.clock.Now().Add(semaphoreExpiration), + Holder: s.Config.ServerID, + }, + Expiry: semaphoreExpiration, + Clock: s.clock, + }, + Retry: retryutils.LinearConfig{ + Clock: s.clock, + First: time.Second, + Step: semaphoreExpiration / 2, + Max: semaphoreExpiration, + Jitter: retryutils.NewJitter(), }, - Expiry: semaphoreExpiration, - Clock: s.clock, }, ) if err != nil {