Skip to content

Commit 9435f00

Browse files
authored
[v15] transform services.SemaphoreLock to satisfy context.Context (#42623)
* transform `services.SemaphoreLock` to satisfy `context.Context` This PR makes `services.SemaphoreLock` statisfy the `context.Context` interface. Extending the `services.SemaphoreLock` makes it possible to use the lease as a context in context propgation. ```go lease, err:=services.AcquireSemaphore(ctx,...) if err!=nil{ ... } ctx, cancel:=context.WithCancel(lease) ``` `lease` is released when the semaphore lock is lost or when the parent context is cancelled so we can use it as argument to other context functions without having to carry the parent context. Signed-off-by: Tiago Silva <tiago.silva@goteleport.com> * use ctx.Done channel from stdlib context to avoid spining a goroutine * add godoc --------- Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>
1 parent d8c98b4 commit 9435f00

File tree

2 files changed

+61
-25
lines changed

2 files changed

+61
-25
lines changed

lib/services/semaphore.go

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,35 @@ func (l *SemaphoreLockConfig) CheckAndSetDefaults() error {
8080

8181
// SemaphoreLock provides a convenient interface for managing
8282
// semaphore lease keepalive operations.
83+
// SemaphoreLock implements the [context.Context] interface
84+
// and can be used to propagate cancellation when the parent
85+
// context is canceled or when the lease expires.
86+
//
87+
// lease,err := AcquireSemaphoreLock(ctx, cfg)
88+
// if err != nil {
89+
// ... handle error ...
90+
// }
91+
// defer func(){
92+
// lease.Stop()
93+
// err := lease.Wait()
94+
// if err != nil {
95+
// ... handle error ...
96+
// }
97+
// }()
98+
//
99+
// newCtx,cancel := context.WithCancel(ctx)
100+
// defer cancel()
101+
// ... do work with newCtx ...
83102
type SemaphoreLock struct {
103+
// ctx is the parent context for the lease keepalive operation.
104+
// it's used to propagate deadline cancellations from the parent
105+
// context and to carry values for the context interface.
106+
ctx context.Context
107+
cancelCtx context.CancelFunc
84108
cfg SemaphoreLockConfig
85109
lease0 types.SemaphoreLease
86110
retry retryutils.Retry
87111
ticker clockwork.Ticker
88-
doneC chan struct{}
89112
closeOnce sync.Once
90113
renewalC chan struct{}
91114
cond *sync.Cond
@@ -107,8 +130,27 @@ func (l *SemaphoreLock) finish(err error) {
107130

108131
// Done signals that lease keepalive operations
109132
// have stopped.
133+
// If the parent context is canceled, the lease
134+
// will be released and done will be closed.
110135
func (l *SemaphoreLock) Done() <-chan struct{} {
111-
return l.doneC
136+
return l.ctx.Done()
137+
}
138+
139+
// Deadline returns the deadline of the parent context if it exists.
140+
func (l *SemaphoreLock) Deadline() (time.Time, bool) {
141+
return l.ctx.Deadline()
142+
}
143+
144+
// Value returns the value associated with the key in the parent context.
145+
func (l *SemaphoreLock) Value(key interface{}) interface{} {
146+
return l.ctx.Value(key)
147+
}
148+
149+
// Error returns the final error value.
150+
func (l *SemaphoreLock) Err() error {
151+
l.cond.L.Lock()
152+
defer l.cond.L.Unlock()
153+
return l.err
112154
}
113155

114156
// Wait blocks until the final result is available. Note that
@@ -127,7 +169,7 @@ func (l *SemaphoreLock) Wait() error {
127169
func (l *SemaphoreLock) Stop() {
128170
l.closeOnce.Do(func() {
129171
l.ticker.Stop()
130-
close(l.doneC)
172+
l.cancelCtx()
131173
})
132174
}
133175

@@ -141,9 +183,8 @@ func (l *SemaphoreLock) keepAlive(ctx context.Context) {
141183
var nodrop bool
142184
var err error
143185
lease := l.lease0
144-
ctx, cancel := context.WithCancel(ctx)
145186
defer func() {
146-
cancel()
187+
l.cancelCtx()
147188
l.Stop()
148189
defer l.finish(err)
149190
if nodrop {
@@ -267,14 +308,16 @@ func AcquireSemaphoreLock(ctx context.Context, cfg SemaphoreLockConfig) (*Semaph
267308
if err != nil {
268309
return nil, trace.Wrap(err)
269310
}
311+
ctx, cancel := context.WithCancel(ctx)
270312
lock := &SemaphoreLock{
271-
cfg: cfg,
272-
lease0: *lease,
273-
retry: retry,
274-
ticker: cfg.Clock.NewTicker(cfg.TickRate),
275-
doneC: make(chan struct{}),
276-
renewalC: make(chan struct{}),
277-
cond: sync.NewCond(&sync.Mutex{}),
313+
ctx: ctx,
314+
cancelCtx: cancel,
315+
cfg: cfg,
316+
lease0: *lease,
317+
retry: retry,
318+
ticker: cfg.Clock.NewTicker(cfg.TickRate),
319+
renewalC: make(chan struct{}),
320+
cond: sync.NewCond(&sync.Mutex{}),
278321
}
279322
go lock.keepAlive(ctx)
280323
return lock, nil

lib/srv/discovery/access_graph.go

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -268,21 +268,14 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c
268268
if err != nil {
269269
return trace.Wrap(err)
270270
}
271-
272-
ctx, cancel := context.WithCancel(ctx)
273-
defer cancel()
274271
var wg sync.WaitGroup
275272
defer wg.Wait()
276-
wg.Add(1)
277-
go func() {
278-
defer wg.Done()
279-
select {
280-
case <-ctx.Done():
281-
return
282-
case <-lease.Done():
283-
cancel()
284-
}
285-
}()
273+
274+
// once the lease parent context is canceled, the lease will be released.
275+
// this will stop the access graph sync.
276+
ctx, cancel := context.WithCancel(lease)
277+
defer cancel()
278+
286279
defer func() {
287280
lease.Stop()
288281
if err := lease.Wait(); err != nil {

0 commit comments

Comments
 (0)