Skip to content

Commit cec6704

Browse files
add services.AcquireSemaphoreLockWithRetry (#42609)
* transform `services.SemaphoreLock` to satisfy `context.Context` This PR makes `services.SemaphoreLock` statisfy the `context.Context` interface. Extending the `services.SemaphoreLock` makes it possible to use the lease as a context in context propgation. ```go lease, err:=services.AcquireSemaphore(ctx,...) if err!=nil{ ... } ctx, cancel:=context.WithCancel(lease) ``` `lease` is released when the semaphore lock is lost or when the parent context is cancelled so we can use it as argument to other context functions without having to carry the parent context. Signed-off-by: Tiago Silva <tiago.silva@goteleport.com> * use ctx.Done channel from stdlib context to avoid spining a goroutine * add `services.AcquireSemaphoreLockWithRetry` This PR introduces `services.AcquireSemaphoreLockWithRetry` function that only returns if it sucessfully acquired the lock or if the parent context is cancelled. This function will retry until it sucessfully acquires the lock before returning. Signed-off-by: Tiago Silva <tiago.silva@goteleport.com> * Update lib/services/semaphore.go Co-authored-by: Edward Dowling <EdwardDowling@users.noreply.github.com> --------- Signed-off-by: Tiago Silva <tiago.silva@goteleport.com> Co-authored-by: Edward Dowling <EdwardDowling@users.noreply.github.com>
1 parent 4797eec commit cec6704

File tree

2 files changed

+49
-11
lines changed

2 files changed

+49
-11
lines changed

lib/services/semaphore.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,34 @@ func AcquireSemaphoreLock(ctx context.Context, cfg SemaphoreLockConfig) (*Semaph
323323
return lock, nil
324324
}
325325

326+
// SemaphoreLockConfigWithRetry contains parameters for acquiring a semaphore lock
327+
// until it succeeds or context expires.
328+
type SemaphoreLockConfigWithRetry struct {
329+
SemaphoreLockConfig
330+
// Retry is the retry configuration.
331+
Retry retryutils.LinearConfig
332+
}
333+
334+
// AcquireSemaphoreLockWithRetry attempts to acquire and hold a semaphore lease. If successfully acquired,
335+
// background keepalive processes are started and an associated lock handle is returned.
336+
// If the lease cannot be acquired, the operation is retried according to the retry schedule until
337+
// it succeeds or the context expires. Canceling the supplied context releases the semaphore.
338+
func AcquireSemaphoreLockWithRetry(ctx context.Context, cfg SemaphoreLockConfigWithRetry) (*SemaphoreLock, error) {
339+
retry, err := retryutils.NewLinear(cfg.Retry)
340+
if err != nil {
341+
return nil, trace.Wrap(err)
342+
}
343+
var lease *SemaphoreLock
344+
err = retry.For(ctx, func() (err error) {
345+
lease, err = AcquireSemaphoreLock(ctx, cfg.SemaphoreLockConfig)
346+
return trace.Wrap(err)
347+
})
348+
if err != nil {
349+
return nil, trace.Wrap(err)
350+
}
351+
return lease, nil
352+
}
353+
326354
// UnmarshalSemaphore unmarshals the Semaphore resource from JSON.
327355
func UnmarshalSemaphore(bytes []byte, opts ...MarshalOption) (types.Semaphore, error) {
328356
var semaphore types.SemaphoreV3

lib/srv/discovery/access_graph.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/gravitational/teleport/api/metadata"
3636
"github.com/gravitational/teleport/api/types"
3737
"github.com/gravitational/teleport/api/types/discoveryconfig"
38+
"github.com/gravitational/teleport/api/utils/retryutils"
3839
accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
3940
"github.com/gravitational/teleport/lib/services"
4041
aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync"
@@ -250,19 +251,28 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c
250251
// AcquireSemaphoreLock will retry until the semaphore is acquired.
251252
// This prevents multiple discovery services to push AWS resources in parallel.
252253
// lease must be released to cleanup the resource in auth server.
253-
lease, err := services.AcquireSemaphoreLock(
254+
lease, err := services.AcquireSemaphoreLockWithRetry(
254255
ctx,
255-
services.SemaphoreLockConfig{
256-
Service: s.AccessPoint,
257-
Params: types.AcquireSemaphoreRequest{
258-
SemaphoreKind: types.KindAccessGraph,
259-
SemaphoreName: semaphoreName,
260-
MaxLeases: 1,
261-
Expires: s.clock.Now().Add(semaphoreExpiration),
262-
Holder: s.Config.ServerID,
256+
services.SemaphoreLockConfigWithRetry{
257+
SemaphoreLockConfig: services.SemaphoreLockConfig{
258+
Service: s.AccessPoint,
259+
Params: types.AcquireSemaphoreRequest{
260+
SemaphoreKind: types.KindAccessGraph,
261+
SemaphoreName: semaphoreName,
262+
MaxLeases: 1,
263+
Expires: s.clock.Now().Add(semaphoreExpiration),
264+
Holder: s.Config.ServerID,
265+
},
266+
Expiry: semaphoreExpiration,
267+
Clock: s.clock,
268+
},
269+
Retry: retryutils.LinearConfig{
270+
Clock: s.clock,
271+
First: time.Second,
272+
Step: semaphoreExpiration / 2,
273+
Max: semaphoreExpiration,
274+
Jitter: retryutils.NewJitter(),
263275
},
264-
Expiry: semaphoreExpiration,
265-
Clock: s.clock,
266276
},
267277
)
268278
if err != nil {

0 commit comments

Comments
 (0)