diff --git a/cmd/keeper/cmd/keeper.go b/cmd/keeper/cmd/keeper.go index dd9b2aafa..b26a1133b 100644 --- a/cmd/keeper/cmd/keeper.go +++ b/cmd/keeper/cmd/keeper.go @@ -585,7 +585,7 @@ func (p *PostgresKeeper) usePgrewind(db *cluster.DB) bool { return p.pgSUUsername != "" && p.pgSUPassword != "" && db.Spec.UsePgrewind } -func (p *PostgresKeeper) updateKeeperInfo() error { +func (p *PostgresKeeper) updateKeeperInfo(shuttingDown bool) error { p.localStateMutex.Lock() keeperUID := p.keeperLocalState.UID clusterUID := p.keeperLocalState.ClusterUID @@ -601,6 +601,13 @@ func (p *PostgresKeeper) updateKeeperInfo() error { log.Warnf("failed to get postgres binary version: %v", err) } + lastPGState := p.getLastPGState() + if shuttingDown { + // Tell the sentinels we are shutting this down. + // We don't need the mutex since p.getLastPGState() + // does a deep copy. + lastPGState.Healthy = false + } keeperInfo := &cluster.KeeperInfo{ InfoUID: common.UID(), UID: keeperUID, @@ -610,7 +617,7 @@ func (p *PostgresKeeper) updateKeeperInfo() error { Maj: maj, Min: min, }, - PostgresState: p.getLastPGState(), + PostgresState: lastPGState, CanBeMaster: p.canBeMaster, CanBeSynchronousReplica: p.canBeSynchronousReplica, @@ -801,9 +808,9 @@ func (p *PostgresKeeper) getLastPGState() *cluster.PostgresState { } func (p *PostgresKeeper) Start(ctx context.Context) { - endSMCh := make(chan struct{}) - endPgStatecheckerCh := make(chan struct{}) - endUpdateKeeperInfo := make(chan struct{}) + endPostgresKeeperSM := make(chan struct{}, 1) + endUpdatePGState := make(chan struct{}, 1) + endUpdateKeeperInfo := make(chan struct{}, 1) var err error var cd *cluster.ClusterData @@ -828,9 +835,29 @@ func (p *PostgresKeeper) Start(ctx context.Context) { _ = p.pgm.StopIfStarted(true) - smTimerCh := time.NewTimer(0).C - updatePGStateTimerCh := time.NewTimer(0).C - updateKeeperInfoTimerCh := time.NewTimer(0).C + postgresKeeperSMTimer := time.NewTimer(0) + updatePGStateTimer := time.NewTimer(0) + updateKeeperInfoTimer := time.NewTimer(0) + + // Make sure there is a single goroutine sending KeeperInfo updates. + // We want to force an order in calls to updateKeeperInfo, so the + // KeeperInfo sent when the context is cancelled is guaranteed to be + // the last one. + doUpdateKeeperInfo := make(chan struct{}, 1) + go func() { + defer close(endUpdateKeeperInfo) + for range doUpdateKeeperInfo { + if err = p.updateKeeperInfo(false); err != nil { + log.Errorw("failed to update keeper info", zap.Error(err)) + } + endUpdateKeeperInfo <- struct{}{} + } + // Once the channel is closed, send a dying gasp + if err = p.updateKeeperInfo(true); err != nil { + log.Errorw("failed to update keeper info", zap.Error(err)) + } + }() + for { // The sleepInterval can be updated during normal execution. Ensure we regularly // refresh the metric to account for those changes. @@ -842,39 +869,37 @@ func (p *PostgresKeeper) Start(ctx context.Context) { if err = p.pgm.StopIfStarted(true); err != nil { log.Errorw("failed to stop pg instance", zap.Error(err)) } + close(doUpdateKeeperInfo) // This will notify sentinels we are shutting down + for range endUpdateKeeperInfo { // Wait until the dying gasp is sent + } p.end <- nil return - case <-smTimerCh: + case <-postgresKeeperSMTimer.C: go func() { p.postgresKeeperSM(ctx) - endSMCh <- struct{}{} + endPostgresKeeperSM <- struct{}{} }() - case <-endSMCh: - smTimerCh = time.NewTimer(p.sleepInterval).C + case <-endPostgresKeeperSM: + postgresKeeperSMTimer.Reset(p.sleepInterval) - case <-updatePGStateTimerCh: - // updateKeeperInfo two times faster than the sleep interval + case <-updatePGStateTimer.C: + // updatePGState two times faster than the sleep interval go func() { p.updatePGState(ctx) - endPgStatecheckerCh <- struct{}{} + endUpdatePGState <- struct{}{} }() - case <-endPgStatecheckerCh: - // updateKeeperInfo two times faster than the sleep interval - updatePGStateTimerCh = time.NewTimer(p.sleepInterval / 2).C + case <-endUpdatePGState: + // updatePGState two times faster than the sleep interval + updatePGStateTimer.Reset(p.sleepInterval / 2) - case <-updateKeeperInfoTimerCh: - go func() { - if err := p.updateKeeperInfo(); err != nil { - log.Errorw("failed to update keeper info", zap.Error(err)) - } - endUpdateKeeperInfo <- struct{}{} - }() + case <-updateKeeperInfoTimer.C: + doUpdateKeeperInfo <- struct{}{} case <-endUpdateKeeperInfo: - updateKeeperInfoTimerCh = time.NewTimer(p.sleepInterval).C + updateKeeperInfoTimer.Reset(p.sleepInterval) } } }