Skip to content

Commit

Permalink
Abort HA Realization Logic After Timeout
Browse files Browse the repository at this point in the history
A strange HA behavior was reported in [icingadb-787], resulting in both
instances being active.

The logs contained an entry of the previous active instance exiting the
HA.realize() method successfully after 1m9s. This, however, should not
be possible as the method's context is deadlined to a minute after the
heartbeat was received.

With introducing Settings.QuickContextExit for retry.WithBackoff in
[icinga-go-library-69] and using it here, the function directly returns
a context.DeadlineExceeded error the moment the context has expired.
Doing so allows directly handing over, while the other instance can now
take over due to the expired heartbeat in the database.

As a related change, the HA.insertEnvironment() method was inlined into
the retryable function to use the deadlined context. Otherwise, this
might block afterwards, as it was used within HA.realize(), but without
the passed context.

In addition, the main loop select cases for hactx.Done() and ctx.Done()
were unified, as hactx is a derived ctx. A closed ctx case may be lost
as the hactx case could have been chosen.

[icinga-go-library-69]: Icinga/icinga-go-library#69
[icingadb-787]: #787
  • Loading branch information
oxzi committed Sep 5, 2024
1 parent 77ccab2 commit 46a9b12
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 25 deletions.
7 changes: 4 additions & 3 deletions cmd/icingadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,10 @@ func run() int {

cancelHactx()
case <-hactx.Done():
// Nothing to do here, surrounding loop will terminate now.
if ctx.Err() != nil {
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
}
// Otherwise, there is nothing to do here, surrounding loop will terminate now.
case <-ha.Done():
if err := ha.Err(); err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error"))
Expand All @@ -337,8 +340,6 @@ func run() int {
cancelHactx()

return ExitFailure
case <-ctx.Done():
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
case s := <-sig:
logger.Infow("Exiting due to signal", zap.String("signal", s.String()))
cancelHactx()
Expand Down
36 changes: 14 additions & 22 deletions pkg/icingadb/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (h *HA) controller() {
}
tt := t.Time()
if tt.After(now.Add(1 * time.Second)) {
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
h.logger.Warnw("Received heartbeat from the future", zap.Time("time", tt))
}
if tt.Before(now.Add(-1 * peerTimeout)) {
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
Expand Down Expand Up @@ -264,6 +264,9 @@ func (h *HA) controller() {

// realize a HA cycle triggered by a heartbeat event.
//
// The given context MUST have a deadline, as the method will otherwise panic. This deadline is hardly enforced to
// cancel the logic if it has expired.
//
// shouldLogRoutineEvents indicates if recurrent events should be logged.
func (h *HA) realize(
ctx context.Context,
Expand Down Expand Up @@ -300,6 +303,7 @@ func (h *HA) realize(
if errBegin != nil {
return errors.Wrap(errBegin, "can't start transaction")
}
defer func() { _ = tx.Rollback() }()

query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+
"WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock
Expand Down Expand Up @@ -370,9 +374,14 @@ func (h *HA) realize(

if takeover != "" {
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?")
_, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId)
if _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId); err != nil {
return database.CantPerformQuery(err, stmt)
}

if err != nil {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
stmt, _ = h.db.BuildInsertIgnoreStmt(h.environment)
if _, err := h.db.NamedExecContext(ctx, stmt, h.environment); err != nil {
return database.CantPerformQuery(err, stmt)
}
}
Expand All @@ -386,7 +395,7 @@ func (h *HA) realize(
retry.Retryable,
backoff.NewExponentialWithJitter(256*time.Millisecond, 3*time.Second),
retry.Settings{
// Intentionally no timeout is set, as we use a context with a deadline.
// Intentionally, no timeout is set because a context with a deadline is used and QuickContextExit is set.
OnRetryableError: func(_ time.Duration, attempt uint64, err, lastErr error) {
if lastErr == nil || err.Error() != lastErr.Error() {
log := h.logger.Debugw
Expand All @@ -413,19 +422,14 @@ func (h *HA) realize(
zap.NamedError("recovered_error", lastErr))
}
},
QuickContextExit: true,

Check failure on line 425 in pkg/icingadb/ha.go

View workflow job for this annotation

GitHub Actions / MySQL

unknown field QuickContextExit in struct literal of type retry.Settings

Check failure on line 425 in pkg/icingadb/ha.go

View workflow job for this annotation

GitHub Actions / PostgreSQL

unknown field QuickContextExit in struct literal of type retry.Settings

Check failure on line 425 in pkg/icingadb/ha.go

View workflow job for this annotation

GitHub Actions / vet

unknown field QuickContextExit in struct literal of type retry.Settings

Check failure on line 425 in pkg/icingadb/ha.go

View workflow job for this annotation

GitHub Actions / lint

unknown field QuickContextExit in struct literal of type retry.Settings (compile)

Check failure on line 425 in pkg/icingadb/ha.go

View workflow job for this annotation

GitHub Actions / lint

unknown field QuickContextExit in struct literal of type retry.Settings (compile)
},
)
if err != nil {
return err
}

if takeover != "" {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
if err := h.insertEnvironment(); err != nil {
return errors.Wrap(err, "can't insert environment")
}

h.signalTakeover(takeover)
} else if otherResponsible {
if state, _ := h.state.Load(); !state.otherResponsible {
Expand All @@ -445,18 +449,6 @@ func (h *HA) realizeLostHeartbeat() {
}
}

// insertEnvironment inserts the environment from the specified state into the database if it does not already exist.
func (h *HA) insertEnvironment() error {
// Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does.
stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment)

if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil {
return database.CantPerformQuery(err, stmt)
}

return nil
}

func (h *HA) removeInstance(ctx context.Context) {
h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId)))
// Intentionally not using h.ctx here as it's already cancelled.
Expand Down

0 comments on commit 46a9b12

Please sign in to comment.