Skip to content

Commit

Permalink
[v16] transform services.SemaphoreLock to satisfy context.Context (
Browse files Browse the repository at this point in the history
…#42624)

* transform `services.SemaphoreLock` to satisfy `context.Context`

This PR makes `services.SemaphoreLock` statisfy the `context.Context`
interface. Extending the `services.SemaphoreLock` makes it possible to
use the lease as a context in context propgation.

```go
 lease, err:=services.AcquireSemaphore(ctx,...)
 if err!=nil{
 ...
 }

 ctx, cancel:=context.WithCancel(lease)

```

`lease` is released when the semaphore lock is lost or when the parent
context is cancelled so we can use it as argument to other context
functions without having to carry the parent context.

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>

* use ctx.Done channel from stdlib context to avoid spining a goroutine

* add godoc

---------

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>
  • Loading branch information
tigrato authored Jun 7, 2024
1 parent b366c59 commit 8032b8c
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 25 deletions.
67 changes: 55 additions & 12 deletions lib/services/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,35 @@ func (l *SemaphoreLockConfig) CheckAndSetDefaults() error {

// SemaphoreLock provides a convenient interface for managing
// semaphore lease keepalive operations.
// SemaphoreLock implements the [context.Context] interface
// and can be used to propagate cancellation when the parent
// context is canceled or when the lease expires.
//
// lease,err := AcquireSemaphoreLock(ctx, cfg)
// if err != nil {
// ... handle error ...
// }
// defer func(){
// lease.Stop()
// err := lease.Wait()
// if err != nil {
// ... handle error ...
// }
// }()
//
// newCtx,cancel := context.WithCancel(ctx)
// defer cancel()
// ... do work with newCtx ...
type SemaphoreLock struct {
// ctx is the parent context for the lease keepalive operation.
// it's used to propagate deadline cancellations from the parent
// context and to carry values for the context interface.
ctx context.Context
cancelCtx context.CancelFunc
cfg SemaphoreLockConfig
lease0 types.SemaphoreLease
retry retryutils.Retry
ticker clockwork.Ticker
doneC chan struct{}
closeOnce sync.Once
renewalC chan struct{}
cond *sync.Cond
Expand All @@ -107,8 +130,27 @@ func (l *SemaphoreLock) finish(err error) {

// Done signals that lease keepalive operations
// have stopped.
// If the parent context is canceled, the lease
// will be released and done will be closed.
func (l *SemaphoreLock) Done() <-chan struct{} {
return l.doneC
return l.ctx.Done()
}

// Deadline returns the deadline of the parent context if it exists.
func (l *SemaphoreLock) Deadline() (time.Time, bool) {
return l.ctx.Deadline()
}

// Value returns the value associated with the key in the parent context.
func (l *SemaphoreLock) Value(key interface{}) interface{} {
return l.ctx.Value(key)
}

// Error returns the final error value.
func (l *SemaphoreLock) Err() error {
l.cond.L.Lock()
defer l.cond.L.Unlock()
return l.err
}

// Wait blocks until the final result is available. Note that
Expand All @@ -127,7 +169,7 @@ func (l *SemaphoreLock) Wait() error {
func (l *SemaphoreLock) Stop() {
l.closeOnce.Do(func() {
l.ticker.Stop()
close(l.doneC)
l.cancelCtx()
})
}

Expand All @@ -141,9 +183,8 @@ func (l *SemaphoreLock) keepAlive(ctx context.Context) {
var nodrop bool
var err error
lease := l.lease0
ctx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
l.cancelCtx()
l.Stop()
defer l.finish(err)
if nodrop {
Expand Down Expand Up @@ -267,14 +308,16 @@ func AcquireSemaphoreLock(ctx context.Context, cfg SemaphoreLockConfig) (*Semaph
if err != nil {
return nil, trace.Wrap(err)
}
ctx, cancel := context.WithCancel(ctx)
lock := &SemaphoreLock{
cfg: cfg,
lease0: *lease,
retry: retry,
ticker: cfg.Clock.NewTicker(cfg.TickRate),
doneC: make(chan struct{}),
renewalC: make(chan struct{}),
cond: sync.NewCond(&sync.Mutex{}),
ctx: ctx,
cancelCtx: cancel,
cfg: cfg,
lease0: *lease,
retry: retry,
ticker: cfg.Clock.NewTicker(cfg.TickRate),
renewalC: make(chan struct{}),
cond: sync.NewCond(&sync.Mutex{}),
}
go lock.keepAlive(ctx)
return lock, nil
Expand Down
19 changes: 6 additions & 13 deletions lib/srv/discovery/access_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,21 +268,14 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c
if err != nil {
return trace.Wrap(err)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-ctx.Done():
return
case <-lease.Done():
cancel()
}
}()

// once the lease parent context is canceled, the lease will be released.
// this will stop the access graph sync.
ctx, cancel := context.WithCancel(lease)
defer cancel()

defer func() {
lease.Stop()
if err := lease.Wait(); err != nil {
Expand Down

0 comments on commit 8032b8c

Please sign in to comment.