Skip to content

Commit

Permalink
change HoldToken method
Browse files Browse the repository at this point in the history
  • Loading branch information
hsldymq committed Apr 10, 2024
1 parent f97cdd0 commit 506cbe3
Showing 1 changed file with 25 additions and 18 deletions.
43 changes: 25 additions & 18 deletions keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ type HoldToken interface {
// ListenShutdown will block the current goroutine until the shutdown stage is triggered.
ListenShutdown()

// HoldChan returns a channel that will be closed when the shutdown stage is triggered.
HoldChan() <-chan struct{}

Release()

Context() context.Context
}

type TokenAllocator interface {
Expand Down Expand Up @@ -75,10 +74,13 @@ type ShutdownKeeper struct {
shutdownHoldChan chan struct{}
closeShutdownHoldChan func()

shuttingNotifyChan chan struct{}
shuttingChan chan struct{}

status int32
shutdownChan chan struct{}

tokenCtx context.Context
tokenCtxCancel func()
}

func NewKeeper(opts KeeperOpts) *ShutdownKeeper {
Expand All @@ -87,6 +89,7 @@ func NewKeeper(opts KeeperOpts) *ShutdownKeeper {
maxHoldTime = 30 * time.Second
}

ctx, cancel := context.WithCancel(context.Background())
keeper := &ShutdownKeeper{
signals: opts.Signals,
signalHandler: opts.OnSignalShutdown,
Expand All @@ -100,10 +103,13 @@ func NewKeeper(opts KeeperOpts) *ShutdownKeeper {
holdingTokenNum: 0,
shutdownHoldChan: make(chan struct{}),

shuttingNotifyChan: make(chan struct{}),
shuttingChan: make(chan struct{}),

status: statusReady,
shutdownChan: make(chan struct{}),

tokenCtx: ctx,
tokenCtxCancel: cancel,
}
keeper.closeShutdownHoldChan = sync.OnceFunc(func() {
close(keeper.shutdownHoldChan)
Expand All @@ -127,7 +133,8 @@ func (k *ShutdownKeeper) Wait() {
go k.listenSignals()
go k.listenContext()
}
<-k.shuttingNotifyChan
<-k.shuttingChan
k.tokenCtxCancel()

if k.forceHold {
<-time.After(k.maxHoldTime)
Expand All @@ -154,7 +161,7 @@ func (k *ShutdownKeeper) AllocHoldToken() HoldToken {
k.startShutdown(nil)
}
}
}, k.shuttingNotifyChan)
}, k.tokenCtx)
}

// OnShuttingDown registers a function to be called when the shutdown process is triggered.
Expand Down Expand Up @@ -208,7 +215,7 @@ func (k *ShutdownKeeper) listenContext() {

func (k *ShutdownKeeper) startShutdown(eventFunc func()) bool {
if atomic.CompareAndSwapInt32(&k.status, statusWaiting, statusShutting) || atomic.CompareAndSwapInt32(&k.status, statusReady, statusShutting) {
defer close(k.shuttingNotifyChan)
defer close(k.shuttingChan)
if eventFunc != nil {
eventFunc()
}
Expand All @@ -223,25 +230,25 @@ func (k *ShutdownKeeper) getHoldingTokenNum() int32 {
}

type holdTokenImpl struct {
releasingFunc func()
shutdownNotifyChan <-chan struct{}
releasingFunc func()
ctx context.Context
}

func newHoldTokenImpl(releasingFunc func(), shutdownNotifyChan <-chan struct{}) *holdTokenImpl {
func newHoldTokenImpl(releasingFunc func(), ctx context.Context) *holdTokenImpl {
return &holdTokenImpl{
releasingFunc: sync.OnceFunc(releasingFunc),
shutdownNotifyChan: shutdownNotifyChan,
releasingFunc: sync.OnceFunc(releasingFunc),
ctx: ctx,
}
}

func (kt *holdTokenImpl) ListenShutdown() {
<-kt.shutdownNotifyChan
}

func (kt *holdTokenImpl) HoldChan() <-chan struct{} {
return kt.shutdownNotifyChan
<-kt.Context().Done()
}

func (kt *holdTokenImpl) Release() {
kt.releasingFunc()
}

func (kt *holdTokenImpl) Context() context.Context {
return kt.ctx
}

0 comments on commit 506cbe3

Please sign in to comment.