diff --git a/lib/services/semaphore.go b/lib/services/semaphore.go
index 5b07b961d994d..2cebd19a14aba 100644
--- a/lib/services/semaphore.go
+++ b/lib/services/semaphore.go
@@ -323,6 +323,42 @@ func AcquireSemaphoreLock(ctx context.Context, cfg SemaphoreLockConfig) (*Semaph
 	return lock, nil
 }
 
+// SemaphoreLockConfigWithRetry contains parameters for acquiring a semaphore lock,
+// retrying until the acquisition succeeds or the context expires.
+type SemaphoreLockConfigWithRetry struct {
+	// SemaphoreLockConfig is the lock configuration passed to every acquisition attempt.
+	SemaphoreLockConfig
+	// Retry is the retry configuration.
+	Retry retryutils.LinearConfig
+}
+
+// AcquireSemaphoreLockWithRetry attempts to acquire and hold a semaphore lease. If successfully acquired,
+// background keepalive processes are started and an associated lock handle is returned.
+// If the lease cannot be acquired, the operation is retried according to the retry schedule until
+// it succeeds or the context expires. Canceling the supplied context releases the semaphore.
+//
+// The same SemaphoreLockConfig value is passed to every attempt, so an absolute
+// Params.Expires supplied by the caller is not refreshed between attempts and can
+// lapse while retrying. Prefer leaving Params.Expires unset and configuring Expiry.
+// NOTE(review): this assumes AcquireSemaphoreLock derives Params.Expires from Expiry
+// when it is unset - confirm against SemaphoreLockConfig.CheckAndSetDefaults.
+func AcquireSemaphoreLockWithRetry(ctx context.Context, cfg SemaphoreLockConfigWithRetry) (*SemaphoreLock, error) {
+	retry, err := retryutils.NewLinear(cfg.Retry)
+	if err != nil {
+		return nil, trace.Wrap(err)
+	}
+	var lock *SemaphoreLock
+	err = retry.For(ctx, func() error {
+		var acquireErr error
+		lock, acquireErr = AcquireSemaphoreLock(ctx, cfg.SemaphoreLockConfig)
+		return trace.Wrap(acquireErr)
+	})
+	if err != nil {
+		return nil, trace.Wrap(err)
+	}
+	return lock, nil
+}
+
 // UnmarshalSemaphore unmarshals the Semaphore resource from JSON.
 func UnmarshalSemaphore(bytes []byte, opts ...MarshalOption) (types.Semaphore, error) {
 	var semaphore types.SemaphoreV3
diff --git a/lib/srv/discovery/access_graph.go b/lib/srv/discovery/access_graph.go
index 7a787c8be253a..ff8b038d867a1 100644
--- a/lib/srv/discovery/access_graph.go
+++ b/lib/srv/discovery/access_graph.go
@@ -35,6 +35,7 @@ import (
 	"github.com/gravitational/teleport/api/metadata"
 	"github.com/gravitational/teleport/api/types"
 	"github.com/gravitational/teleport/api/types/discoveryconfig"
+	"github.com/gravitational/teleport/api/utils/retryutils"
 	accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
 	"github.com/gravitational/teleport/lib/services"
 	aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync"
@@ -250,19 +251,29 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c
-	// AcquireSemaphoreLock will retry until the semaphore is acquired.
-	// This prevents multiple discovery services to push AWS resources in parallel.
-	// lease must be released to cleanup the resource in auth server.
-	lease, err := services.AcquireSemaphoreLock(
+	// AcquireSemaphoreLockWithRetry retries until the semaphore is acquired or ctx is done.
+	// This prevents multiple discovery services from pushing AWS resources in parallel.
+	// The lease must be released to clean up the resource in the auth server.
+	// Params.Expires is deliberately left unset: an absolute expiry computed here would be
+	// reused unchanged by every retry attempt and could already lie in the past.
+	lease, err := services.AcquireSemaphoreLockWithRetry(
 		ctx,
-		services.SemaphoreLockConfig{
-			Service: s.AccessPoint,
-			Params: types.AcquireSemaphoreRequest{
-				SemaphoreKind: types.KindAccessGraph,
-				SemaphoreName: semaphoreName,
-				MaxLeases:     1,
-				Expires:       s.clock.Now().Add(semaphoreExpiration),
-				Holder:        s.Config.ServerID,
+		services.SemaphoreLockConfigWithRetry{
+			SemaphoreLockConfig: services.SemaphoreLockConfig{
+				Service: s.AccessPoint,
+				Params: types.AcquireSemaphoreRequest{
+					SemaphoreKind: types.KindAccessGraph,
+					SemaphoreName: semaphoreName,
+					MaxLeases:     1,
+					Holder:        s.Config.ServerID,
+				},
+				Expiry: semaphoreExpiration,
+				Clock:  s.clock,
+			},
+			Retry: retryutils.LinearConfig{
+				Clock:  s.clock,
+				First:  time.Second,
+				Step:   semaphoreExpiration / 2,
+				Max:    semaphoreExpiration,
+				Jitter: retryutils.NewJitter(),
 			},
-			Expiry: semaphoreExpiration,
-			Clock:  s.clock,
 		},
 	)
 	if err != nil {