Skip to content

Commit

Permalink
Make number of workers configurable (#27)
Browse files Browse the repository at this point in the history
Signed-off-by: Luis Davim <luis.davim@jet.com>
  • Loading branch information
luisdavim authored and Amit Kumar Das committed Oct 8, 2019
1 parent b8565d1 commit 5091d52
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 15 deletions.
10 changes: 7 additions & 3 deletions controller/composite/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (pc *parentController) String() string {

// Start triggers the reconciliation process of this controller
// instance
func (pc *parentController) Start() {
func (pc *parentController) Start(workerCount int) {
// init the channels to signal cancellation
// a single stop channel for all workers
// done channel is triggered when all workers are stopped
Expand Down Expand Up @@ -203,6 +203,10 @@ func (pc *parentController) Start() {
})
}

if workerCount <= 0 {
workerCount = 5
}

go func() {
defer close(pc.doneCh)
defer utilruntime.HandleCrash()
Expand Down Expand Up @@ -241,9 +245,9 @@ func (pc *parentController) Start() {
return
}

// 5 workers ought to be enough for anyone.
glog.Infof("Starting %d workers for %s", workerCount, pc)
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
5 changes: 4 additions & 1 deletion controller/composite/metacontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Metacontroller struct {
revisionLister metalisters.ControllerRevisionLister
revisionInformer cache.SharedIndexInformer

workerCount int
queue workqueue.RateLimitingInterface
parentControllers map[string]*parentController

Expand All @@ -62,13 +63,15 @@ func NewMetacontroller(
dynamicInformerFactory *dynamicinformer.SharedInformerFactory,
metaInformerFactory metainformers.SharedInformerFactory,
metaClientset metaclientset.Interface,
workerCount int,
) *Metacontroller {

mc := &Metacontroller{
resourceManager: resourceMgr,
metaClientset: metaClientset,
dynamicClientset: dynamicClientset,
dynamicInformerFactory: dynamicInformerFactory,
workerCount: workerCount,

lister: metaInformerFactory.Metacontroller().V1alpha1().CompositeControllers().Lister(),
informer: metaInformerFactory.Metacontroller().V1alpha1().CompositeControllers().Informer(),
Expand Down Expand Up @@ -186,7 +189,7 @@ func (mc *Metacontroller) syncCompositeController(cc *v1alpha1.CompositeControll
if err != nil {
return err
}
pc.Start()
pc.Start(mc.workerCount)
mc.parentControllers[cc.Name] = pc
return nil
}
Expand Down
10 changes: 7 additions & 3 deletions controller/decorator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func newDecoratorController(

// Start starts the decorator controller based on its fields
// that were initialised earlier (mostly via its constructor)
func (c *decoratorController) Start() {
func (c *decoratorController) Start(workerCount int) {
// init the channels with empty structs
c.stopCh = make(chan struct{})
c.doneCh = make(chan struct{})
Expand Down Expand Up @@ -230,6 +230,10 @@ func (c *decoratorController) Start() {
})
}

if workerCount <= 0 {
workerCount = 5
}

go func() {
// close done channel i.e. mark closure of this start invocation
defer close(c.doneCh)
Expand Down Expand Up @@ -260,9 +264,9 @@ func (c *decoratorController) Start() {
return
}

// 5 workers ought to be enough for anyone.
glog.Infof("Starting %d workers for %v", workerCount, c.schema.Name)
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
5 changes: 4 additions & 1 deletion controller/decorator/metacontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Metacontroller struct {
lister mclisters.DecoratorControllerLister
informer cache.SharedIndexInformer

workerCount int
queue workqueue.RateLimitingInterface
decoratorControllers map[string]*decoratorController

Expand All @@ -58,6 +59,7 @@ func NewMetacontroller(
clientset *dynamicclientset.Clientset,
dynInformers *dynamicinformer.SharedInformerFactory,
mcInformerFactory mcinformers.SharedInformerFactory,
workerCount int,
) *Metacontroller {

mc := &Metacontroller{
Expand All @@ -70,6 +72,7 @@ func NewMetacontroller(

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DecoratorController"),
decoratorControllers: make(map[string]*decoratorController),
workerCount: workerCount,
}

mc.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -181,7 +184,7 @@ func (mc *Metacontroller) syncDecoratorController(dc *v1alpha1.DecoratorControll
if err != nil {
return err
}
c.Start()
c.Start(mc.workerCount)
mc.decoratorControllers[dc.Name] = c
return nil
}
Expand Down
11 changes: 7 additions & 4 deletions controller/generic/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func newGenericControllerManager(

// Start starts the decorator controller based on its fields
// that were initialised earlier (mostly via its constructor)
func (mgr *genericControllerManager) Start() {
func (mgr *genericControllerManager) Start(workerCount int) {
// init the channels with empty structs
mgr.stopCh = make(chan struct{})
mgr.doneCh = make(chan struct{})
Expand Down Expand Up @@ -252,6 +252,10 @@ func (mgr *genericControllerManager) Start() {
}
}

if workerCount <= 0 {
workerCount = 5
}

go func() {
// close done channel i.e. mark closure of this start invocation
defer close(mgr.doneCh)
Expand Down Expand Up @@ -280,10 +284,9 @@ func (mgr *genericControllerManager) Start() {
return
}

// ensure sufficient workers to reconcile good number of
// GenericController resources simultaneously
glog.Infof("Starting %d workers for %s", workerCount, mgr)
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
5 changes: 4 additions & 1 deletion controller/generic/metacontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type MetaController struct {

queue workqueue.RateLimitingInterface
genericControllers map[string]*genericControllerManager
workerCount int

stopCh, doneCh chan struct{}
}
Expand All @@ -61,12 +62,14 @@ func NewMetacontroller(
dynClientset *dynamicclientset.Clientset,
dynInformerFactory *dynamicinformer.SharedInformerFactory,
metaInformerFactory metainformers.SharedInformerFactory,
workerCount int,
) *MetaController {

mc := &MetaController{
resourceManager: resourceMgr,
dynClientset: dynClientset,
dynInformerFactory: dynInformerFactory,
workerCount: workerCount,

lister: metaInformerFactory.Metacontroller().V1alpha1().GenericControllers().Lister(),
informer: metaInformerFactory.Metacontroller().V1alpha1().GenericControllers().Informer(),
Expand Down Expand Up @@ -213,7 +216,7 @@ func (mc *MetaController) syncGenericController(ctrl *v1alpha1.GenericController
return err
}

wc.Start()
wc.Start(mc.workerCount)
mc.genericControllers[ctrl.Key()] = wc
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
informerRelist = flag.Duration("cache-flush-interval", 30*time.Minute, "How often to flush local caches and relist objects from the API server")
debugAddr = flag.String("debug-addr", ":9999", "The address to bind the debug http endpoints")
clientConfigPath = flag.String("client-config-path", "", "Path to kubeconfig file (same format as used by kubectl); if not specified, use in-cluster config")
workerCount = flag.Int("workers-count", 5, "How many workers to start per controller to process queued events")
)

func main() {
Expand All @@ -63,7 +64,7 @@ func main() {
glog.Fatal(err)
}

stopServer, err := server.Start(config, *discoveryInterval, *informerRelist)
stopServer, err := server.Start(config, *discoveryInterval, *informerRelist, *workerCount)
if err != nil {
glog.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func Start(
config *rest.Config,
discoveryInterval time.Duration,
informerRelist time.Duration,
workerCount int,
) (stop func(), err error) {

// Periodically refresh discovery to pick up newly-installed resources.
Expand Down Expand Up @@ -86,18 +87,21 @@ func Start(
dynamicInformerFactory,
metaInformerFactory,
metaClientset,
workerCount,
),
decorator.NewMetacontroller(
resourceMgr,
dynamicClientset,
dynamicInformerFactory,
metaInformerFactory,
workerCount,
),
generic.NewMetacontroller(
resourceMgr,
dynamicClientset,
dynamicInformerFactory,
metaInformerFactory,
workerCount,
),
}

Expand Down
2 changes: 1 addition & 1 deletion test/integration/framework/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func testMain(tests func() int) error {
// locally inside the test binary, since that's part of the
// code under test.
stopServer, err := server.Start(
ApiserverConfig(), 500*time.Millisecond, 30*time.Minute,
ApiserverConfig(), 500*time.Millisecond, 30*time.Minute, 5,
)
if err != nil {
return errors.Wrapf(err, "Can't start metacontroller server")
Expand Down

0 comments on commit 5091d52

Please sign in to comment.