From 506cbe3412d3a10a3ea54781b16e1d256df22572 Mon Sep 17 00:00:00 2001 From: hsldymq Date: Wed, 10 Apr 2024 21:13:59 +0800 Subject: [PATCH] change HoldToken method --- keeper.go | 43 +++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/keeper.go b/keeper.go index 7760c17..f326dbf 100644 --- a/keeper.go +++ b/keeper.go @@ -16,10 +16,9 @@ type HoldToken interface { // ListenShutdown will block the current goroutine until the shutdown stage is triggered. ListenShutdown() - // HoldChan returns a channel that will be closed when the shutdown stage is triggered. - HoldChan() <-chan struct{} - Release() + + Context() context.Context } type TokenAllocator interface { @@ -75,10 +74,13 @@ type ShutdownKeeper struct { shutdownHoldChan chan struct{} closeShutdownHoldChan func() - shuttingNotifyChan chan struct{} + shuttingChan chan struct{} status int32 shutdownChan chan struct{} + + tokenCtx context.Context + tokenCtxCancel func() } func NewKeeper(opts KeeperOpts) *ShutdownKeeper { @@ -87,6 +89,7 @@ func NewKeeper(opts KeeperOpts) *ShutdownKeeper { maxHoldTime = 30 * time.Second } + ctx, cancel := context.WithCancel(context.Background()) keeper := &ShutdownKeeper{ signals: opts.Signals, signalHandler: opts.OnSignalShutdown, @@ -100,10 +103,13 @@ func NewKeeper(opts KeeperOpts) *ShutdownKeeper { holdingTokenNum: 0, shutdownHoldChan: make(chan struct{}), - shuttingNotifyChan: make(chan struct{}), + shuttingChan: make(chan struct{}), status: statusReady, shutdownChan: make(chan struct{}), + + tokenCtx: ctx, + tokenCtxCancel: cancel, } keeper.closeShutdownHoldChan = sync.OnceFunc(func() { close(keeper.shutdownHoldChan) @@ -127,7 +133,8 @@ func (k *ShutdownKeeper) Wait() { go k.listenSignals() go k.listenContext() } - <-k.shuttingNotifyChan + <-k.shuttingChan + k.tokenCtxCancel() if k.forceHold { <-time.After(k.maxHoldTime) @@ -154,7 +161,7 @@ func (k *ShutdownKeeper) AllocHoldToken() HoldToken { k.startShutdown(nil) } } - }, k.shuttingNotifyChan) + }, k.tokenCtx) } // OnShuttingDown registers a function to be called when the shutdown process is triggered. @@ -208,7 +215,7 @@ func (k *ShutdownKeeper) listenContext() { func (k *ShutdownKeeper) startShutdown(eventFunc func()) bool { if atomic.CompareAndSwapInt32(&k.status, statusWaiting, statusShutting) || atomic.CompareAndSwapInt32(&k.status, statusReady, statusShutting) { - defer close(k.shuttingNotifyChan) + defer close(k.shuttingChan) if eventFunc != nil { eventFunc() } @@ -223,25 +230,25 @@ func (k *ShutdownKeeper) getHoldingTokenNum() int32 { } type holdTokenImpl struct { - releasingFunc func() - shutdownNotifyChan <-chan struct{} + releasingFunc func() + ctx context.Context } -func newHoldTokenImpl(releasingFunc func(), shutdownNotifyChan <-chan struct{}) *holdTokenImpl { +func newHoldTokenImpl(releasingFunc func(), ctx context.Context) *holdTokenImpl { return &holdTokenImpl{ - releasingFunc: sync.OnceFunc(releasingFunc), - shutdownNotifyChan: shutdownNotifyChan, + releasingFunc: sync.OnceFunc(releasingFunc), + ctx: ctx, } } func (kt *holdTokenImpl) ListenShutdown() { - <-kt.shutdownNotifyChan -} - -func (kt *holdTokenImpl) HoldChan() <-chan struct{} { - return kt.shutdownNotifyChan + <-kt.Context().Done() } func (kt *holdTokenImpl) Release() { kt.releasingFunc() } + +func (kt *holdTokenImpl) Context() context.Context { + return kt.ctx +}