Skip to content

Commit

Permalink
Refactor controllers / managers and cleanup context propagation
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Klues <kklues@nvidia.com>
  • Loading branch information
klueska committed Jan 14, 2025
1 parent 3e51cd8 commit 6b4d5e1
Show file tree
Hide file tree
Showing 5 changed files with 504 additions and 318 deletions.
64 changes: 57 additions & 7 deletions cmd/nvidia-dra-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,28 @@ package main
import (
"context"
"fmt"
"sync"
"time"

"k8s.io/client-go/informers"

"github.com/NVIDIA/k8s-dra-driver/pkg/flags"
nvinformers "github.com/NVIDIA/k8s-dra-driver/pkg/nvidia.com/resource/informers/externalversions"
"github.com/NVIDIA/k8s-dra-driver/pkg/workqueue"
)

type ManagerConfig struct {
clientsets flags.ClientSets
nvInformerFactory nvinformers.SharedInformerFactory
coreInformerFactory informers.SharedInformerFactory
workQueue *workqueue.WorkQueue
}

type OwnerExistsFunc func(ctx context.Context, uid string) (bool, error)

type Controller struct {
waitGroup sync.WaitGroup

ImexManager *ImexManager
MultiNodeEnvironmentManager *MultiNodeEnvironmentManager
}
Expand All @@ -32,33 +51,64 @@ func StartController(ctx context.Context, config *Config) (*Controller, error) {
return nil, nil
}

workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter())
nvInformerFactory := nvinformers.NewSharedInformerFactory(config.clientsets.Nvidia, 30*time.Second)
coreInformerFactory := informers.NewSharedInformerFactory(config.clientsets.Core, 30*time.Second)

managerConfig := &ManagerConfig{
clientsets: config.clientsets,
nvInformerFactory: nvInformerFactory,
coreInformerFactory: coreInformerFactory,
workQueue: workQueue,
}

imexManager, err := StartImexManager(ctx, config)
if err != nil {
return nil, fmt.Errorf("error starting IMEX manager: %w", err)
}

mneManager, err := StartMultiNodeEnvironmentManager(ctx, config)
mneManager, err := NewMultiNodeEnvironmentManager(ctx, managerConfig)
if err != nil {
return nil, fmt.Errorf("error starting MultiNodeEnvironment manager: %w", err)
}

m := &Controller{
c := &Controller{
ImexManager: imexManager,
MultiNodeEnvironmentManager: mneManager,
}

return m, nil
c.waitGroup.Add(3)
go func() {
defer c.waitGroup.Done()
nvInformerFactory.Start(ctx.Done())
}()
go func() {
defer c.waitGroup.Done()
coreInformerFactory.Start(ctx.Done())
}()
go func() {
defer c.waitGroup.Done()
workQueue.Run(ctx)
}()

if err := c.MultiNodeEnvironmentManager.WaitForCacheSync(ctx); err != nil {
return nil, fmt.Errorf("error syncing cache: %w", err)
}

return c, nil
}

// Stop stops a running Controller.
func (m *Controller) Stop() error {
if m == nil {
return nil
}
imErr := m.ImexManager.Stop()
mnErr := m.MultiNodeEnvironmentManager.Stop()
if imErr != nil || mnErr != nil {
return fmt.Errorf("IMEX manager error: %w, MultiNodeEnvironment manager error: %w", imErr, mnErr)

m.waitGroup.Wait()

if err := m.ImexManager.Stop(); err != nil {
return fmt.Errorf("error stopping IMEX manager: %w", err)
}

return nil
}
191 changes: 191 additions & 0 deletions cmd/nvidia-dra-controller/deviceclass.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Copyright (c) 2025 NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"fmt"
"time"

resourceapi "k8s.io/api/resource/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
resourcelisters "k8s.io/client-go/listers/resource/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

nvapi "github.com/NVIDIA/k8s-dra-driver/api/nvidia.com/resource/gpu/v1alpha1"
"github.com/NVIDIA/k8s-dra-driver/pkg/flags"
)

type DeviceClassManager struct {
clientsets flags.ClientSets
ownerExists func(ctx context.Context, uid string) (bool, error)

informer cache.SharedIndexInformer
lister resourcelisters.DeviceClassLister
}

func NewDeviceClassManager(ctx context.Context, config *ManagerConfig, ownerExists OwnerExistsFunc) (*DeviceClassManager, error) {
informer := config.coreInformerFactory.Resource().V1beta1().DeviceClasses().Informer()
lister := resourcelisters.NewDeviceClassLister(informer.GetIndexer())

m := &DeviceClassManager{
clientsets: config.clientsets,
ownerExists: ownerExists,
informer: informer,
lister: lister,
}

_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
config.workQueue.Enqueue(obj, m.onAddOrUpdate)
},
UpdateFunc: func(objOld, objNew any) {
config.workQueue.Enqueue(objNew, m.onAddOrUpdate)
},
})
if err != nil {
return nil, fmt.Errorf("error adding event handlers for DeviceClass informer: %w", err)
}

return m, nil
}

func (m *DeviceClassManager) WaitForCacheSync(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

if !cache.WaitForCacheSync(ctx.Done(), m.informer.HasSynced) {
return fmt.Errorf("informer cache sync for DeviceClasses failed")
}

return nil
}

func (m *DeviceClassManager) Create(ctx context.Context, name string, ownerReference metav1.OwnerReference) (*resourceapi.DeviceClass, error) {
if name != "" {
dc, err := m.lister.Get(name)
if err == nil {
if len(dc.OwnerReferences) != 1 && dc.OwnerReferences[0] != ownerReference {
return nil, fmt.Errorf("DeviceClass '%s' exists without expected OwnerReference: %v", name, ownerReference)
}
return dc, nil
}
if !errors.IsNotFound(err) {
return nil, fmt.Errorf("error retrieving DeviceClass: %w", err)
}
}

deviceClass := &resourceapi.DeviceClass{
ObjectMeta: metav1.ObjectMeta{
OwnerReferences: []metav1.OwnerReference{ownerReference},
Finalizers: []string{multiNodeEnvironmentFinalizer},
},
Spec: resourceapi.DeviceClassSpec{
Selectors: []resourceapi.DeviceSelector{
{
CEL: &resourceapi.CELDeviceSelector{
Expression: "device.driver == 'gpu.nvidia.com' && device.attributes['gpu.nvidia.com'].type == 'imex-channel'",
},
},
},
},
}

if name == "" {
deviceClass.GenerateName = ownerReference.Name
} else {
deviceClass.Name = name
}

dc, err := m.clientsets.Core.ResourceV1beta1().DeviceClasses().Create(ctx, deviceClass, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("error creating DeviceClass: %w", err)
}

return dc, nil
}

func (m *DeviceClassManager) Delete(ctx context.Context, name string) error {
err := m.clientsets.Core.ResourceV1beta1().DeviceClasses().Delete(ctx, name, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("erroring deleting DeviceClass: %w", err)
}
return nil
}

func (m *DeviceClassManager) RemoveFinalizer(ctx context.Context, name string) error {
dc, err := m.lister.Get(name)
if err != nil && errors.IsNotFound(err) {
return nil
}
if err != nil {
return fmt.Errorf("error retrieving DeviceClass: %w", err)
}

newDC := dc.DeepCopy()

newDC.Finalizers = []string{}
for _, f := range dc.Finalizers {
if f != multiNodeEnvironmentFinalizer {
newDC.Finalizers = append(newDC.Finalizers, f)
}
}

_, err = m.clientsets.Core.ResourceV1beta1().DeviceClasses().Update(ctx, newDC, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating DeviceClass: %w", err)
}

return nil
}

func (m *DeviceClassManager) onAddOrUpdate(ctx context.Context, obj any) error {
dc, ok := obj.(*resourceapi.DeviceClass)
if !ok {
return fmt.Errorf("failed to cast to DeviceClass")
}

klog.Infof("Processing added or updated DeviceClass: %s", dc.Name)

if len(dc.OwnerReferences) != 1 {
return nil
}

if dc.OwnerReferences[0].Kind != nvapi.MultiNodeEnvironmentKind {
return nil
}

exists, err := m.ownerExists(ctx, string(dc.OwnerReferences[0].UID))
if err != nil {
return fmt.Errorf("error checking if owner exists: %w", err)
}
if exists {
return nil
}

if err := m.RemoveFinalizer(ctx, dc.Name); err != nil {
return fmt.Errorf("error removing finalizer on DeviceClass '%s': %w", dc.Name, err)
}

if err := m.Delete(ctx, dc.Name); err != nil {
return fmt.Errorf("error deleting DeviceClass '%s': %w", dc.Name, err)
}

return nil
}
Loading

0 comments on commit 6b4d5e1

Please sign in to comment.