Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions internal/daemon/controller/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,22 +178,35 @@ func (m *Manager) runWorker(ctx context.Context, resourceType string, workerID i

// processItem handles a single work item.
func (m *Manager) processItem(ctx context.Context, resourceType string, ctrl Controller, queue *WorkQueue, key string) {
defer queue.Done(key)

m.logger.Debug("reconciling",
"resourceType", resourceType,
"key", key)

err := ctrl.Reconcile(ctx, key)
if err != nil {
m.logger.Error("reconcile failed, requeuing",
requeue := queue.RequeueWithBackoff(key)
if requeue.Terminal {
m.logger.Error("reconcile failed permanently, dropping item",
"resourceType", resourceType,
"key", key,
"maxRetries", queue.MaxRetries(),
"error", err)
queue.Forget(key)
return
}

m.logger.Warn("reconcile failed, scheduling retry",
"resourceType", resourceType,
"key", key,
"retryCount", requeue.Attempt,
"nextDelay", requeue.Delay,
"error", err)
queue.Requeue(key)
return
}

queue.Done(key)
queue.Forget(key)

m.logger.Debug("reconcile complete",
"resourceType", resourceType,
"key", key)
Expand Down
31 changes: 30 additions & 1 deletion internal/daemon/controller/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func TestManager_RequeueOnError(t *testing.T) {
ctrl.reconcileErr = errors.New("temporary error")

m.Register("devnets", ctrl)
m.GetQueue("devnets").SetBackoffConfig(time.Millisecond, 5*time.Millisecond, 0, 10)

ctx, cancel := context.WithCancel(context.Background())

Expand All @@ -99,7 +100,7 @@ func TestManager_RequeueOnError(t *testing.T) {
m.Enqueue("devnets", "failing-devnet")

// Wait for multiple retries
time.Sleep(100 * time.Millisecond)
time.Sleep(60 * time.Millisecond)
cancel()

// Should have been called multiple times due to requeue
Expand All @@ -109,6 +110,34 @@ func TestManager_RequeueOnError(t *testing.T) {
}
}

func TestManager_RequeueStopsAtMaxRetries(t *testing.T) {
m := NewManager()

ctrl := &mockController{reconcileErr: errors.New("persistent failure")}
m.Register("devnets", ctrl)

maxRetries := 3
m.GetQueue("devnets").SetBackoffConfig(time.Millisecond, 5*time.Millisecond, 0, maxRetries)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go m.Start(ctx, 1)
time.Sleep(10 * time.Millisecond)

m.Enqueue("devnets", "failing-devnet")

// Give enough time for initial reconcile + max retries.
time.Sleep(120 * time.Millisecond)
cancel()

calls := ctrl.getCalls()
expected := maxRetries + 1 // initial attempt + retries
if len(calls) != expected {
t.Fatalf("expected %d reconcile calls, got %d", expected, len(calls))
}
}

func TestManager_MultipleControllers(t *testing.T) {
m := NewManager()

Expand Down
196 changes: 196 additions & 0 deletions internal/daemon/controller/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,26 @@
package controller

import (
"math"
"math/rand"
"sync"
"time"
)

// Retry policy installed by NewWorkQueue; SetBackoffConfig can override it.
const (
	// defaultBackoffBaseDelay is the un-jittered delay before the first retry.
	defaultBackoffBaseDelay = time.Second
	// defaultBackoffMaxDelay is the ceiling applied to every retry delay.
	defaultBackoffMaxDelay = time.Minute
	// defaultBackoffJitter is the fraction of the delay used as the +/- jitter range.
	defaultBackoffJitter = 0.2
	// defaultBackoffRetries is how many retries an item gets before it is dropped.
	defaultBackoffRetries = 8
)

// RequeueResult describes the outcome of scheduling a retry.
// It is the value returned by WorkQueue.RequeueWithBackoff.
type RequeueResult struct {
// Attempt is the retry number recorded for the item (1 for the first retry).
// When Terminal is set because the retry limit was reached, it holds the
// number of retries already used; it is zero when the queue is shutting down.
Attempt int
// Delay is the backoff delay chosen for this retry. It is zero when Terminal is set.
Delay time.Duration
// Terminal reports that no retry was scheduled.
Terminal bool
}

// WorkQueue is a rate-limited deduplicating work queue.
// Items added while an item is being processed will be re-queued
// when Done is called. This is inspired by the Kubernetes workqueue.
Expand All @@ -24,6 +41,17 @@ type WorkQueue struct {
// shuttingDown indicates the queue is shutting down
shuttingDown bool

// retries tracks how many times an item has been retried.
retries map[interface{}]int

// backoff config
baseDelay time.Duration
maxDelay time.Duration
jitterFactor float64
maxRetries int
randFloat64 func() float64
afterFunc func(time.Duration, func()) *time.Timer

mu sync.Mutex
}

Expand All @@ -33,6 +61,14 @@ func NewWorkQueue() *WorkQueue {
queue: make([]interface{}, 0),
dirty: make(map[interface{}]struct{}),
processing: make(map[interface{}]struct{}),
retries: make(map[interface{}]int),
baseDelay: defaultBackoffBaseDelay,
maxDelay: defaultBackoffMaxDelay,
// Keep jitter modest to spread retries without making behavior erratic.
jitterFactor: defaultBackoffJitter,
maxRetries: defaultBackoffRetries,
randFloat64: rand.Float64,
afterFunc: time.AfterFunc,
}
q.cond = sync.NewCond(&q.mu)
return q
Expand Down Expand Up @@ -124,6 +160,98 @@ func (q *WorkQueue) Requeue(item interface{}) {
q.cond.Signal()
}

// RequeueWithBackoff records a failed attempt for item and schedules it to
// re-enter the queue after an exponential backoff delay with jitter.
//
// The item is always removed from the processing set. The result is Terminal,
// and nothing is scheduled, in two cases:
//   - the queue is shutting down (Attempt is zero);
//   - the item has already used maxRetries retries (Attempt is the number
//     used). The dirty mark is cleared but the retry counter is kept, so the
//     caller should call Forget to release it.
//
// Otherwise the retry counter is incremented, the item is marked dirty, and a
// timer is armed that calls enqueueIfDirty once the delay elapses.
//
// afterFunc is invoked while q.mu is held and the callback re-acquires q.mu,
// so an injected afterFunc must not run the callback synchronously.
func (q *WorkQueue) RequeueWithBackoff(item interface{}) RequeueResult {
	q.mu.Lock()
	defer q.mu.Unlock()

	// This attempt is over whatever the outcome below.
	delete(q.processing, item)

	next := q.retries[item] + 1
	switch {
	case q.shuttingDown:
		return RequeueResult{Terminal: true}
	case next > q.maxRetries:
		delete(q.dirty, item)
		return RequeueResult{Attempt: next - 1, Terminal: true}
	}

	var scheduled RequeueResult
	scheduled.Attempt = next
	q.retries[item] = next
	// The dirty mark is what lets the timer callback enqueue the item.
	q.dirty[item] = struct{}{}

	scheduled.Delay = q.computeBackoffDelayLocked(next)
	q.afterFunc(scheduled.Delay, func() {
		q.enqueueIfDirty(item)
	})
	return scheduled
}

// Forget drops the retry counter kept for item, so a later failure of the
// same item starts again from the first backoff step. Calling it for an item
// with no recorded retries is a no-op.
func (q *WorkQueue) Forget(item interface{}) {
	q.mu.Lock()
	defer q.mu.Unlock()

	delete(q.retries, item)
}

// NumRequeues reports how many retries are currently recorded for item.
// Items with no recorded retries report zero.
func (q *WorkQueue) NumRequeues(item interface{}) int {
	q.mu.Lock()
	defer q.mu.Unlock()

	if count, tracked := q.retries[item]; tracked {
		return count
	}
	return 0
}

// MaxRetries reports the retry limit currently configured on the queue.
func (q *WorkQueue) MaxRetries() int {
	q.mu.Lock()
	defer q.mu.Unlock()

	limit := q.maxRetries
	return limit
}

// SetBackoffConfig overrides the retry policy. Intended for tests.
//
// Each argument is applied independently and only when it is usable:
// baseDelay, maxDelay and maxRetries must be positive, and jitterFactor must
// be non-negative (zero turns jitter off). An argument outside its range
// leaves the corresponding setting unchanged. baseDelay is not checked
// against maxDelay.
func (q *WorkQueue) SetBackoffConfig(baseDelay, maxDelay time.Duration, jitterFactor float64, maxRetries int) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if maxRetries > 0 {
		q.maxRetries = maxRetries
	}
	if jitterFactor >= 0 {
		q.jitterFactor = jitterFactor
	}
	if maxDelay > 0 {
		q.maxDelay = maxDelay
	}
	if baseDelay > 0 {
		q.baseDelay = baseDelay
	}
}

// SetRandomSource replaces the function that supplies randomness for retry
// jitter. Intended for tests. A nil argument is ignored and the current
// source stays in place.
func (q *WorkQueue) SetRandomSource(randFloat64 func() float64) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if randFloat64 == nil {
		return
	}
	q.randFloat64 = randFloat64
}

// SetAfterFunc replaces the function used to schedule delayed retries.
// Intended for tests. A nil argument is ignored and the current scheduler
// stays in place.
func (q *WorkQueue) SetAfterFunc(afterFunc func(time.Duration, func()) *time.Timer) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if afterFunc == nil {
		return
	}
	q.afterFunc = afterFunc
}

// Len returns the number of items in the queue.
func (q *WorkQueue) Len() int {
q.mu.Lock()
Expand All @@ -146,3 +274,71 @@ func (q *WorkQueue) ShuttingDown() bool {
defer q.mu.Unlock()
return q.shuttingDown
}

// computeBackoffDelayLocked returns the delay to wait before retry number
// attempt (1-based; values below 1 are treated as 1). It reads the backoff
// settings without locking, so the caller must hold q.mu.
//
// The un-jittered delay is baseDelay * 2^(attempt-1), capped at maxDelay.
// When jitterFactor is positive, an offset of (2r-1) * delay * jitterFactor is
// added, where r is the next value from randFloat64, and the result is
// clamped to [0, maxDelay].
func (q *WorkQueue) computeBackoffDelayLocked(attempt int) time.Duration {
	exp := attempt - 1
	if exp < 0 {
		exp = 0
	}
	// Growth stops after 30 doublings; this also keeps the shift count small.
	if exp > 30 {
		exp = 30
	}

	// Decide whether the cap applies BEFORE scaling. Multiplying first can
	// overflow int64 for a large baseDelay (e.g. 10s * 2^30), wrapping to a
	// negative duration that would slip past a "delay > maxDelay" check.
	// baseDelay <= maxDelay>>exp holds exactly when baseDelay<<exp <= maxDelay.
	delay := q.maxDelay
	if q.baseDelay <= q.maxDelay>>uint(exp) {
		delay = q.baseDelay << uint(exp)
	}

	if q.jitterFactor <= 0 {
		return delay
	}

	jitterRange := float64(delay) * q.jitterFactor
	if jitterRange <= 0 {
		return delay
	}

	// Map the random value from [0, 1) onto [-jitterRange, +jitterRange).
	jitter := (q.randFloat64()*2 - 1) * jitterRange
	jittered := float64(delay) + jitter
	switch {
	case jittered < 0:
		return 0
	case jittered > float64(math.MaxInt64):
		// Not representable as a Duration; fall back to the ceiling.
		return q.maxDelay
	}

	if d := time.Duration(jittered); d <= q.maxDelay {
		return d
	}
	return q.maxDelay
}

// enqueueIfDirty is the timer callback scheduled by RequeueWithBackoff. It
// appends item to the queue and wakes one waiter, unless any of the following
// holds, in which case it does nothing:
//   - the queue is shutting down;
//   - item is currently being processed;
//   - item is no longer marked dirty;
//   - item is already waiting in the queue.
//
// It acquires q.mu itself, so it must not be called with the lock held.
func (q *WorkQueue) enqueueIfDirty(item interface{}) {
	q.mu.Lock()
	defer q.mu.Unlock()

	member := func(set map[interface{}]struct{}) bool {
		_, ok := set[item]
		return ok
	}

	// Checks run cheapest first; the linear queue scan is only reached last.
	if q.shuttingDown || member(q.processing) || !member(q.dirty) || q.inQueueLocked(item) {
		return
	}

	q.queue = append(q.queue, item)
	q.cond.Signal()
}

// inQueueLocked reports whether item is already waiting in q.queue. It is a
// linear scan and does no locking of its own; the caller must hold q.mu.
func (q *WorkQueue) inQueueLocked(item interface{}) bool {
	for i := range q.queue {
		if q.queue[i] == item {
			return true
		}
	}
	return false
}
Loading
Loading