Skip to content

Commit

Permalink
[v16] add services.AcquireSemaphoreLockWithRetry (#42631)
Browse files Browse the repository at this point in the history
* transform `services.SemaphoreLock` to satisfy `context.Context`

This PR makes `services.SemaphoreLock` statisfy the `context.Context`
interface. Extending the `services.SemaphoreLock` makes it possible to
use the lease as a context in context propgation.

```go
 lease, err:=services.AcquireSemaphore(ctx,...)
 if err!=nil{
 ...
 }

 ctx, cancel:=context.WithCancel(lease)

```

`lease` is released when the semaphore lock is lost or when the parent
context is cancelled so we can use it as argument to other context
functions without having to carry the parent context.

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>

* use ctx.Done channel from stdlib context to avoid spining a goroutine

* add `services.AcquireSemaphoreLockWithRetry`

This PR introduces `services.AcquireSemaphoreLockWithRetry` function
that only returns if it sucessfully acquired the lock or if the parent
context is cancelled. This function will retry until it sucessfully
acquires the lock before returning.

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>

* Update lib/services/semaphore.go

Co-authored-by: Edward Dowling <EdwardDowling@users.noreply.github.com>

---------

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>
Co-authored-by: Edward Dowling <EdwardDowling@users.noreply.github.com>
  • Loading branch information
tigrato and EdwardDowling authored Jun 7, 2024
1 parent 8032b8c commit 6af0038
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
28 changes: 28 additions & 0 deletions lib/services/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,34 @@ func AcquireSemaphoreLock(ctx context.Context, cfg SemaphoreLockConfig) (*Semaph
return lock, nil
}

// SemaphoreLockConfigWithRetry contains parameters for acquiring a semaphore lock
// until it succeeds or context expires.
type SemaphoreLockConfigWithRetry struct {
SemaphoreLockConfig
// Retry is the retry configuration.
Retry retryutils.LinearConfig
}

// AcquireSemaphoreLockWithRetry attempts to acquire and hold a semaphore lease. If successfully acquired,
// background keepalive processes are started and an associated lock handle is returned.
// If the lease cannot be acquired, the operation is retried according to the retry schedule until
// it succeeds or the context expires. Canceling the supplied context releases the semaphore.
func AcquireSemaphoreLockWithRetry(ctx context.Context, cfg SemaphoreLockConfigWithRetry) (*SemaphoreLock, error) {
retry, err := retryutils.NewLinear(cfg.Retry)
if err != nil {
return nil, trace.Wrap(err)
}
var lease *SemaphoreLock
err = retry.For(ctx, func() (err error) {
lease, err = AcquireSemaphoreLock(ctx, cfg.SemaphoreLockConfig)
return trace.Wrap(err)
})
if err != nil {
return nil, trace.Wrap(err)
}
return lease, nil
}

// UnmarshalSemaphore unmarshals the Semaphore resource from JSON.
func UnmarshalSemaphore(bytes []byte, opts ...MarshalOption) (types.Semaphore, error) {
var semaphore types.SemaphoreV3
Expand Down
32 changes: 21 additions & 11 deletions lib/srv/discovery/access_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/gravitational/teleport/api/metadata"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/discoveryconfig"
"github.com/gravitational/teleport/api/utils/retryutils"
accessgraphv1alpha "github.com/gravitational/teleport/gen/proto/go/accessgraph/v1alpha"
"github.com/gravitational/teleport/lib/services"
aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync"
Expand Down Expand Up @@ -250,19 +251,28 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c
// AcquireSemaphoreLock will retry until the semaphore is acquired.
// This prevents multiple discovery services to push AWS resources in parallel.
// lease must be released to cleanup the resource in auth server.
lease, err := services.AcquireSemaphoreLock(
lease, err := services.AcquireSemaphoreLockWithRetry(
ctx,
services.SemaphoreLockConfig{
Service: s.AccessPoint,
Params: types.AcquireSemaphoreRequest{
SemaphoreKind: types.KindAccessGraph,
SemaphoreName: semaphoreName,
MaxLeases: 1,
Expires: s.clock.Now().Add(semaphoreExpiration),
Holder: s.Config.ServerID,
services.SemaphoreLockConfigWithRetry{
SemaphoreLockConfig: services.SemaphoreLockConfig{
Service: s.AccessPoint,
Params: types.AcquireSemaphoreRequest{
SemaphoreKind: types.KindAccessGraph,
SemaphoreName: semaphoreName,
MaxLeases: 1,
Expires: s.clock.Now().Add(semaphoreExpiration),
Holder: s.Config.ServerID,
},
Expiry: semaphoreExpiration,
Clock: s.clock,
},
Retry: retryutils.LinearConfig{
Clock: s.clock,
First: time.Second,
Step: semaphoreExpiration / 2,
Max: semaphoreExpiration,
Jitter: retryutils.NewJitter(),
},
Expiry: semaphoreExpiration,
Clock: s.clock,
},
)
if err != nil {
Expand Down

0 comments on commit 6af0038

Please sign in to comment.