diff --git a/internal/daemon/controller/manager.go b/internal/daemon/controller/manager.go index 32330313..7186fb1d 100644 --- a/internal/daemon/controller/manager.go +++ b/internal/daemon/controller/manager.go @@ -178,22 +178,35 @@ func (m *Manager) runWorker(ctx context.Context, resourceType string, workerID i // processItem handles a single work item. func (m *Manager) processItem(ctx context.Context, resourceType string, ctrl Controller, queue *WorkQueue, key string) { - defer queue.Done(key) - m.logger.Debug("reconciling", "resourceType", resourceType, "key", key) err := ctrl.Reconcile(ctx, key) if err != nil { - m.logger.Error("reconcile failed, requeuing", + requeue := queue.RequeueWithBackoff(key) + if requeue.Terminal { + m.logger.Error("reconcile failed permanently, dropping item", + "resourceType", resourceType, + "key", key, + "maxRetries", queue.MaxRetries(), + "error", err) + queue.Forget(key) + return + } + + m.logger.Warn("reconcile failed, scheduling retry", "resourceType", resourceType, "key", key, + "retryCount", requeue.Attempt, + "nextDelay", requeue.Delay, "error", err) - queue.Requeue(key) return } + queue.Done(key) + queue.Forget(key) + m.logger.Debug("reconcile complete", "resourceType", resourceType, "key", key) diff --git a/internal/daemon/controller/manager_test.go b/internal/daemon/controller/manager_test.go index 01811e95..d0ba55c0 100644 --- a/internal/daemon/controller/manager_test.go +++ b/internal/daemon/controller/manager_test.go @@ -88,6 +88,7 @@ func TestManager_RequeueOnError(t *testing.T) { ctrl.reconcileErr = errors.New("temporary error") m.Register("devnets", ctrl) + m.GetQueue("devnets").SetBackoffConfig(time.Millisecond, 5*time.Millisecond, 0, 10) ctx, cancel := context.WithCancel(context.Background()) @@ -99,7 +100,7 @@ func TestManager_RequeueOnError(t *testing.T) { m.Enqueue("devnets", "failing-devnet") // Wait for multiple retries - time.Sleep(100 * time.Millisecond) + time.Sleep(60 * time.Millisecond) cancel() // 
Should have been called multiple times due to requeue @@ -109,6 +110,34 @@ func TestManager_RequeueOnError(t *testing.T) { } } +func TestManager_RequeueStopsAtMaxRetries(t *testing.T) { + m := NewManager() + + ctrl := &mockController{reconcileErr: errors.New("persistent failure")} + m.Register("devnets", ctrl) + + maxRetries := 3 + m.GetQueue("devnets").SetBackoffConfig(time.Millisecond, 5*time.Millisecond, 0, maxRetries) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go m.Start(ctx, 1) + time.Sleep(10 * time.Millisecond) + + m.Enqueue("devnets", "failing-devnet") + + // Give enough time for initial reconcile + max retries. + time.Sleep(120 * time.Millisecond) + cancel() + + calls := ctrl.getCalls() + expected := maxRetries + 1 // initial attempt + retries + if len(calls) != expected { + t.Fatalf("expected %d reconcile calls, got %d", expected, len(calls)) + } +} + func TestManager_MultipleControllers(t *testing.T) { m := NewManager() diff --git a/internal/daemon/controller/queue.go b/internal/daemon/controller/queue.go index d8b307bc..9ddf8eb2 100644 --- a/internal/daemon/controller/queue.go +++ b/internal/daemon/controller/queue.go @@ -2,9 +2,26 @@ package controller import ( + "math" + "math/rand" "sync" + "time" ) +const ( + defaultBackoffBaseDelay = time.Second + defaultBackoffMaxDelay = 60 * time.Second + defaultBackoffJitter = 0.20 + defaultBackoffRetries = 8 +) + +// RequeueResult describes the outcome of scheduling a retry. +type RequeueResult struct { + Attempt int + Delay time.Duration + Terminal bool +} + // WorkQueue is a rate-limited deduplicating work queue. // Items added while an item is being processed will be re-queued // when Done is called. This is inspired by the Kubernetes workqueue. @@ -24,6 +41,17 @@ type WorkQueue struct { // shuttingDown indicates the queue is shutting down shuttingDown bool + // retries tracks how many times an item has been retried. 
+ retries map[interface{}]int + + // backoff config + baseDelay time.Duration + maxDelay time.Duration + jitterFactor float64 + maxRetries int + randFloat64 func() float64 + afterFunc func(time.Duration, func()) *time.Timer + mu sync.Mutex } @@ -33,6 +61,14 @@ func NewWorkQueue() *WorkQueue { queue: make([]interface{}, 0), dirty: make(map[interface{}]struct{}), processing: make(map[interface{}]struct{}), + retries: make(map[interface{}]int), + baseDelay: defaultBackoffBaseDelay, + maxDelay: defaultBackoffMaxDelay, + // Keep jitter modest to spread retries without making behavior erratic. + jitterFactor: defaultBackoffJitter, + maxRetries: defaultBackoffRetries, + randFloat64: rand.Float64, + afterFunc: time.AfterFunc, } q.cond = sync.NewCond(&q.mu) return q @@ -124,6 +160,98 @@ func (q *WorkQueue) Requeue(item interface{}) { q.cond.Signal() } +// RequeueWithBackoff schedules a delayed retry using exponential backoff + jitter. +// Returns terminal=true when max retries is exceeded and no further retry is scheduled. +func (q *WorkQueue) RequeueWithBackoff(item interface{}) RequeueResult { + q.mu.Lock() + defer q.mu.Unlock() + + delete(q.processing, item) + + if q.shuttingDown { + return RequeueResult{Terminal: true} + } + + attempt := q.retries[item] + 1 + if attempt > q.maxRetries { + delete(q.dirty, item) + return RequeueResult{ + Attempt: attempt - 1, + Terminal: true, + } + } + + q.retries[item] = attempt + q.dirty[item] = struct{}{} + + delay := q.computeBackoffDelayLocked(attempt) + q.afterFunc(delay, func() { + q.enqueueIfDirty(item) + }) + + return RequeueResult{ + Attempt: attempt, + Delay: delay, + } +} + +// Forget clears retry bookkeeping for an item after successful processing. +func (q *WorkQueue) Forget(item interface{}) { + q.mu.Lock() + defer q.mu.Unlock() + delete(q.retries, item) +} + +// NumRequeues returns the number of recorded retries for an item. 
+func (q *WorkQueue) NumRequeues(item interface{}) int { + q.mu.Lock() + defer q.mu.Unlock() + return q.retries[item] +} + +// MaxRetries returns the configured max retry count. +func (q *WorkQueue) MaxRetries() int { + q.mu.Lock() + defer q.mu.Unlock() + return q.maxRetries +} + +// SetBackoffConfig configures delay/jitter/retry policy. Intended for tests. +func (q *WorkQueue) SetBackoffConfig(baseDelay, maxDelay time.Duration, jitterFactor float64, maxRetries int) { + q.mu.Lock() + defer q.mu.Unlock() + if baseDelay > 0 { + q.baseDelay = baseDelay + } + if maxDelay > 0 { + q.maxDelay = maxDelay + } + if jitterFactor >= 0 { + q.jitterFactor = jitterFactor + } + if maxRetries > 0 { + q.maxRetries = maxRetries + } +} + +// SetRandomSource overrides jitter source. Intended for tests. +func (q *WorkQueue) SetRandomSource(randFloat64 func() float64) { + q.mu.Lock() + defer q.mu.Unlock() + if randFloat64 != nil { + q.randFloat64 = randFloat64 + } +} + +// SetAfterFunc overrides timer scheduling. Intended for tests. +func (q *WorkQueue) SetAfterFunc(afterFunc func(time.Duration, func()) *time.Timer) { + q.mu.Lock() + defer q.mu.Unlock() + if afterFunc != nil { + q.afterFunc = afterFunc + } +} + // Len returns the number of items in the queue. func (q *WorkQueue) Len() int { q.mu.Lock() @@ -146,3 +274,71 @@ func (q *WorkQueue) ShuttingDown() bool { defer q.mu.Unlock() return q.shuttingDown } + +func (q *WorkQueue) computeBackoffDelayLocked(attempt int) time.Duration { + exp := attempt - 1 + if exp < 0 { + exp = 0 + } + // Cap exponent to avoid overflow; delay is capped to maxDelay anyway. 
+ if exp > 30 { + exp = 30 + } + delay := q.baseDelay * time.Duration(1<<uint(exp)) + if delay > q.maxDelay { + delay = q.maxDelay + } + + if q.jitterFactor <= 0 { + return delay + } + + jitterRange := float64(delay) * q.jitterFactor + if jitterRange <= 0 { + return delay + } + + jitter := (q.randFloat64()*2 - 1) * jitterRange + jittered := float64(delay) + jitter + if jittered < 0 { + return 0 + } + if jittered > float64(math.MaxInt64) { + return q.maxDelay + } + jitteredDelay := time.Duration(jittered) + if jitteredDelay > q.maxDelay { + return q.maxDelay + } + return jitteredDelay +} + +func (q *WorkQueue) enqueueIfDirty(item interface{}) { + q.mu.Lock() + defer q.mu.Unlock() + + if q.shuttingDown { + return + } + if _, processing := q.processing[item]; processing { + return + } + if _, dirty := q.dirty[item]; !dirty { + return + } + if q.inQueueLocked(item) { + return + } + + q.queue = append(q.queue, item) + q.cond.Signal() +} + +func (q *WorkQueue) inQueueLocked(item interface{}) bool { + for _, queued := range q.queue { + if queued == item { + return true + } + } + return false +} diff --git a/internal/daemon/controller/queue_test.go b/internal/daemon/controller/queue_test.go index 37884931..4dde7d34 100644 --- a/internal/daemon/controller/queue_test.go +++ b/internal/daemon/controller/queue_test.go @@ -6,6 +6,30 @@ import ( "time" ) +func getWithTimeout(t *testing.T, q *WorkQueue, timeout time.Duration) (interface{}, bool) { + t.Helper() + done := make(chan struct { + item interface{} + shutdown bool + }, 1) + + go func() { + item, shutdown := q.Get() + done <- struct { + item interface{} + shutdown bool + }{item: item, shutdown: shutdown} + }() + + select { + case res := <-done: + return res.item, res.shutdown + case <-time.After(timeout): + t.Fatalf("timed out waiting for queue item") + return nil, true + } +} + func TestWorkQueue_AddAndGet(t *testing.T) { q := NewWorkQueue() defer q.ShutDown() @@ -191,3 +215,89 @@ func TestWorkQueue_ConcurrentAddGet(t *testing.T) { 
t.Errorf("expected %d unique items, got %d", numItems, len(received)) } } + +func TestWorkQueue_RequeueWithBackoff(t *testing.T) { + q := NewWorkQueue() + q.SetBackoffConfig(time.Millisecond, time.Millisecond, 0, 3) + defer q.ShutDown() + + q.Add("item1") + item, shutdown := q.Get() + if shutdown { + t.Fatal("unexpected shutdown") + } + + result := q.RequeueWithBackoff(item) + if result.Terminal { + t.Fatal("expected non-terminal retry") + } + if result.Attempt != 1 { + t.Fatalf("expected attempt 1, got %d", result.Attempt) + } + if result.Delay != time.Millisecond { + t.Fatalf("expected 1ms delay, got %v", result.Delay) + } + + requeued, shutdown := getWithTimeout(t, q, 100*time.Millisecond) + if shutdown { + t.Fatal("unexpected shutdown while waiting for requeued item") + } + if requeued != "item1" { + t.Fatalf("expected item1, got %v", requeued) + } +} + +func TestWorkQueue_RequeueWithBackoff_MaxRetries(t *testing.T) { + q := NewWorkQueue() + q.SetBackoffConfig(time.Millisecond, time.Millisecond, 0, 2) + defer q.ShutDown() + + q.Add("item1") + item, _ := q.Get() + + // Attempt 1 + res1 := q.RequeueWithBackoff(item) + if res1.Terminal || res1.Attempt != 1 { + t.Fatalf("unexpected attempt 1 result: %+v", res1) + } + item, _ = getWithTimeout(t, q, 100*time.Millisecond) + + // Attempt 2 + res2 := q.RequeueWithBackoff(item) + if res2.Terminal || res2.Attempt != 2 { + t.Fatalf("unexpected attempt 2 result: %+v", res2) + } + item, _ = getWithTimeout(t, q, 100*time.Millisecond) + + // Attempt 3 should be terminal (maxRetries=2) + res3 := q.RequeueWithBackoff(item) + if !res3.Terminal { + t.Fatalf("expected terminal retry result") + } + if q.NumRequeues("item1") != 2 { + t.Fatalf("expected retry count to stay at 2, got %d", q.NumRequeues("item1")) + } +} + +func TestWorkQueue_BackoffJitterBounds(t *testing.T) { + q := NewWorkQueue() + q.SetBackoffConfig(time.Second, 60*time.Second, 0.2, 10) + + // Lowest jitter branch. 
+ q.SetRandomSource(func() float64 { return 0.0 }) + q.mu.Lock() + low := q.computeBackoffDelayLocked(3) // 4s base, -20% => 3.2s + q.mu.Unlock() + if low < 3200*time.Millisecond || low > 4*time.Second { + t.Fatalf("unexpected low-jitter delay: %v", low) + } + + // Highest jitter branch should still respect max delay cap. + q.SetRandomSource(func() float64 { return 1.0 }) + q.mu.Lock() + high := q.computeBackoffDelayLocked(10) + q.mu.Unlock() + if high > 60*time.Second { + t.Fatalf("expected capped max delay <= 60s, got %v", high) + } +}