diff --git a/interface.go b/interface.go index 387c584..1de6cdb 100644 --- a/interface.go +++ b/interface.go @@ -1,6 +1,7 @@ package yacht import ( + "context" "time" "k8s.io/client-go/tools/cache" @@ -10,13 +11,18 @@ import ( type Interface interface { Enqueue(obj interface{}) WithEnqueueFunc(EnqueueFunc) *Controller + // Deprecated: Use WithHandlerContextFunc instead. WithHandlerFunc(HandlerFunc) *Controller + WithHandlerContextFunc(HandlerContextFunc) *Controller WithLeaderElection(leaseLock rl.Interface, leaseDuration, renewDeadline, retryPeriod time.Duration) *Controller WithCacheSynced(...cache.InformerSynced) *Controller } +// Deprecated: Use HandlerContextFunc instead. type HandlerFunc func(key interface{}) (requeueAfter *time.Duration, err error) +type HandlerContextFunc func(ctx context.Context, key interface{}) (requeueAfter *time.Duration, err error) + type EnqueueFunc func(obj interface{}) (interface{}, error) type EnqueueFilterFunc func(oldObj, newObj interface{}) (bool, error) diff --git a/yacht.go b/yacht.go index 00b8673..bbcfdd9 100644 --- a/yacht.go +++ b/yacht.go @@ -33,8 +33,8 @@ type Controller struct { // informersSynced records a group of cacheSyncs // The workers will not start working before all the caches are synced successfully informersSynced []cache.InformerSynced - // handlerFunc defines the handler to process the work item - handlerFunc HandlerFunc + // handlerContextFunc defines the handler to process the work item + handlerContextFunc HandlerContextFunc // le specifies the LeaderElector to use le *leaderelection.LeaderElector @@ -165,13 +165,33 @@ func (c *Controller) applyEnqueueFilterFunc(oldObj, newObj interface{}, operatio } // WithHandlerFunc sets a handler function to process the work item off the work queue +// Deprecated: Use WithHandlerContextFunc instead. func (c *Controller) WithHandlerFunc(handlerFunc HandlerFunc) *Controller { if c.runFlag { - panic(fmt.Errorf("can not mutate handlerFunc when controller %s is running", c.name)) + panic(fmt.Errorf("can not mutate handlerContextFunc when controller %s is running", c.name)) } if handlerFunc != nil { - c.handlerFunc = handlerFunc + c.handlerContextFunc = func(ctx context.Context, key interface{}) (requeueAfter *time.Duration, err error) { + select { + case <-ctx.Done(): + return + default: + return handlerFunc(key) + } + } + } + return c +} + +// WithHandlerContextFunc sets a handler function to process the work item off the work queue +func (c *Controller) WithHandlerContextFunc(handlerContextFunc HandlerContextFunc) *Controller { + if c.runFlag { + panic(fmt.Errorf("can not mutate handlerContextFunc when controller %s is running", c.name)) + } + + if handlerContextFunc != nil { + c.handlerContextFunc = handlerContextFunc } return c } @@ -239,8 +259,8 @@ func (c *Controller) Run(ctx context.Context) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() - if c.handlerFunc == nil { - panic(fmt.Errorf("empty handlerFunc for controller %s", c.name)) + if c.handlerContextFunc == nil { + panic(fmt.Errorf("please set handlerContextFunc for controller %s", c.name)) } c.once.Do(func() { @@ -265,7 +285,7 @@ func (c *Controller) run(ctx context.Context) { klog.V(4).Infof("starting %d workers for controller %s", *c.workers, c.name) // Launch workers to process work items from queue for i := 0; i < *c.workers; i++ { - go wait.Until(c.runWorker, time.Second, ctx.Done()) + go wait.UntilWithContext(ctx, c.runWorker, time.Second) } <-ctx.Done() @@ -273,20 +293,20 @@ func (c *Controller) run(ctx context.Context) { } // runWorker starts an infinite loop on processing the work item until the work queue is shut down. -func (c *Controller) runWorker() { - for c.processNextWorkItem() { +func (c *Controller) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { } } // processNextWorkItem reads a single work item from the work queue -func (c *Controller) processNextWorkItem() bool { +func (c *Controller) processNextWorkItem(ctx context.Context) bool { item, quit := c.queue.Get() if quit { return false } defer c.queue.Done(item) - requeueAfter, err := c.handlerFunc(item) + requeueAfter, err := c.handlerContextFunc(ctx, item) if err == nil { c.queue.Forget(item) if requeueAfter != nil {