Skip to content

Commit

Permalink
feat: Support multiple MOFED DS
Browse files Browse the repository at this point in the history
Mofed driver precompiled container images
are compiled using a specific Kernel.

As a result, the Mofed Driver DaemonSet should
have the Kernel as part of the Node Selector.

In addition, since there can be Nodes with different
Kernel versions, a DaemonSet for each existing Kernel
in the cluster is created.

A PodAntiAffinity is added to Mofed pod to make sure,
that only one Mofed pod can run on the node.

Signed-off-by: Fred Rolland <frolland@nvidia.com>
  • Loading branch information
rollandf committed Dec 18, 2023
1 parent 10ec312 commit e765759
Show file tree
Hide file tree
Showing 14 changed files with 672 additions and 126 deletions.
10 changes: 10 additions & 0 deletions controllers/nicclusterpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type NicClusterPolicyReconciler struct {
Scheme *runtime.Scheme
ClusterTypeProvider clustertype.Provider
StaticConfigProvider staticconfig.Provider
MigrationCh chan struct{}

stateManager state.Manager
}
Expand Down Expand Up @@ -86,6 +87,11 @@ type NicClusterPolicyReconciler struct {
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *NicClusterPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.V(consts.LogLevelInfo).Info("Reconciling NicClusterPolicy")

Expand Down Expand Up @@ -179,6 +185,10 @@ func (r *NicClusterPolicyReconciler) handleMOFEDWaitLabels(
_ = r.Client.List(ctx, pods, client.MatchingLabels{"driver-pod": podLabel})
for i := range pods.Items {
pod := pods.Items[i]
if pod.Spec.NodeName == "" {
// In case that Pod is in Pending state
continue
}
labelValue := "true"
// We assume that OFED pod contains only one container to simplify the logic.
// We can revisit this logic in the future if needed
Expand Down
4 changes: 4 additions & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,15 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())
staticConfigProvider := staticconfig.NewProvider(staticconfig.StaticConfig{CniBinDirectory: "/opt/cni/bin"})

migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)

err = (&NicClusterPolicyReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
ClusterTypeProvider: clusterTypeProvider,
StaticConfigProvider: staticConfigProvider,
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

Expand Down
7 changes: 7 additions & 0 deletions controllers/upgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"fmt"
"time"

"github.com/NVIDIA/k8s-operator-libs/pkg/upgrade"
Expand Down Expand Up @@ -47,6 +48,7 @@ type UpgradeReconciler struct {
client.Client
Scheme *runtime.Scheme
StateManager upgrade.ClusterUpgradeStateManager
MigrationCh chan struct{}
}

const plannedRequeueInterval = time.Minute * 2
Expand All @@ -64,6 +66,11 @@ const UpgradeStateAnnotation = "nvidia.com/ofed-upgrade-state"
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *UpgradeReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.V(consts.LogLevelInfo).Info("Reconciling Upgrade")

Expand Down
14 changes: 10 additions & 4 deletions controllers/upgrade_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ var _ = Describe("Upgrade Controller", func() {
})
Context("When NicClusterPolicy CR is created", func() {
It("Upgrade policy is disabled", func() {
migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)
upgradeReconciler := &UpgradeReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Client: k8sClient,
Scheme: k8sClient.Scheme(),
MigrationCh: migrationCompletionChan,
}

req := ctrl.Request{NamespacedName: types.NamespacedName{Name: consts.NicClusterPolicyResourceName}}
Expand All @@ -76,10 +79,13 @@ var _ = Describe("Upgrade Controller", func() {
err := k8sClient.Create(goctx.TODO(), node)
Expect(err).NotTo(HaveOccurred())
}
migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)

upgradeReconciler := &UpgradeReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Client: k8sClient,
Scheme: k8sClient.Scheme(),
MigrationCh: migrationCompletionChan,
}
// Call removeNodeUpgradeStateLabels function
err := upgradeReconciler.removeNodeUpgradeStateLabels(goctx.TODO())
Expand Down
59 changes: 38 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func setupWebhookControllers(mgr ctrl.Manager) error {
return nil
}

func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager) error {
func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager, migrationChan chan struct{}) error {
ctrLog := setupLog.WithName("controller")
clusterTypeProvider, err := clustertype.NewProvider(ctx, c)

Expand All @@ -93,6 +93,7 @@ func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager)
Scheme: mgr.GetScheme(),
ClusterTypeProvider: clusterTypeProvider, // we want to cache information about the cluster type
StaticConfigProvider: staticInfoProvider,
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("NicClusterPolicy")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NicClusterPolicy")
return err
Expand Down Expand Up @@ -161,35 +162,26 @@ func main() {
os.Exit(1)
}

// run migration logic before controllers start
if err := migrate.Migrate(stopCtx, setupLog.WithName("migrate"), directClient); err != nil {
setupLog.Error(err, "failed to run migration logic")
os.Exit(1)
migrationCompletionChan := make(chan struct{})
m := migrate.Migrator{
K8sClient: directClient,
MigrationCh: migrationCompletionChan,
LeaderElection: enableLeaderElection,
Logger: ctrl.Log.WithName("Migrator"),
}

err = setupCRDControllers(stopCtx, directClient, mgr)
err = mgr.Add(&m)
if err != nil {
setupLog.Error(err, "failed to add Migrator to the Manager")
os.Exit(1)
}

upgrade.SetDriverName("ofed")

upgradeLogger := ctrl.Log.WithName("controllers").WithName("Upgrade")

clusterUpdateStateManager, err := upgrade.NewClusterUpgradeStateManager(
upgradeLogger.WithName("clusterUpgradeManager"), config.GetConfigOrDie(), nil)

err = setupCRDControllers(stopCtx, directClient, mgr, migrationCompletionChan)
if err != nil {
setupLog.Error(err, "unable to create new ClusterUpdateStateManager", "controller", "Upgrade")
os.Exit(1)
}

if err = (&controllers.UpgradeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
StateManager: clusterUpdateStateManager,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Upgrade")
err = setupUpgradeController(mgr, migrationCompletionChan)
if err != nil {
os.Exit(1)
}

Expand All @@ -216,3 +208,28 @@ func main() {
os.Exit(1)
}
}

func setupUpgradeController(mgr ctrl.Manager, migrationChan chan struct{}) error {
upgrade.SetDriverName("ofed")

upgradeLogger := ctrl.Log.WithName("controllers").WithName("Upgrade")

clusterUpdateStateManager, err := upgrade.NewClusterUpgradeStateManager(
upgradeLogger.WithName("clusterUpgradeManager"), config.GetConfigOrDie(), nil)

if err != nil {
setupLog.Error(err, "unable to create new ClusterUpdateStateManager", "controller", "Upgrade")
return err
}

if err = (&controllers.UpgradeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
StateManager: clusterUpdateStateManager,
MigrationCh: migrationChan,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Upgrade")
return err
}
return nil
}
19 changes: 15 additions & 4 deletions manifests/state-ofed-driver/0050_ofed-driver-ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,21 @@ apiVersion: apps/v1
kind: DaemonSet
metadata:
labels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelMajor }}.{{ .RuntimeSpec.KernelMinor }}
nvidia.com/ofed-driver: ""
name: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-ds
mofed-ds-format-version: "1"
name: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelMajor }}.{{ .RuntimeSpec.KernelMinor }}-ds
namespace: {{ .RuntimeSpec.Namespace }}
spec:
updateStrategy:
type: OnDelete
selector:
matchLabels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelMajor }}.{{ .RuntimeSpec.KernelMinor }}
template:
metadata:
labels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelMajor }}.{{ .RuntimeSpec.KernelMinor }}
driver-pod: mofed-{{ .CrSpec.Version }}
nvidia.com/ofed-driver: ""
spec:
Expand All @@ -40,6 +41,14 @@ spec:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: nvidia.com/ofed-driver
operator: Exists
topologyKey: kubernetes.io/hostname
serviceAccountName: ofed-driver
hostNetwork: true
{{- if .CrSpec.ImagePullSecrets }}
Expand Down Expand Up @@ -161,6 +170,8 @@ spec:
feature.node.kubernetes.io/pci-15b3.present: "true"
feature.node.kubernetes.io/system-os_release.ID: {{ .RuntimeSpec.OSName }}
feature.node.kubernetes.io/system-os_release.VERSION_ID: "{{ .RuntimeSpec.OSVer }}"
feature.node.kubernetes.io/kernel-version.major: "{{ .RuntimeSpec.KernelMajor }}"
feature.node.kubernetes.io/kernel-version.minor: "{{ .RuntimeSpec.KernelMinor }}"
{{- if .NodeAffinity }}
affinity:
nodeAffinity:
Expand Down
76 changes: 76 additions & 0 deletions pkg/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -30,8 +31,34 @@ import (

"github.com/Mellanox/network-operator/pkg/config"
"github.com/Mellanox/network-operator/pkg/consts"

mellanoxv1alpha1 "github.com/Mellanox/network-operator/api/v1alpha1"
)

// Migrator migrates from previous versions
type Migrator struct {
K8sClient client.Client
MigrationCh chan struct{}
LeaderElection bool
Logger logr.Logger
}

// Implements manager.NeedLeaderElection
func (m *Migrator) NeedLeaderElection() bool {
return m.LeaderElection
}

// Implements manager.Runnable
func (m *Migrator) Start(ctx context.Context) error {
err := Migrate(ctx, m.Logger, m.K8sClient)
if err != nil {
m.Logger.Error(err, "failed to migrate Network Operator")
return err
}
close(m.MigrationCh)
return nil
}

// Migrate contains logic which should run once during controller start.
// The main use case for this handler is network-operator upgrade
// for example, the handler can contain logic to change old data format to a new one or
Expand All @@ -54,6 +81,11 @@ func migrate(ctx context.Context, log logr.Logger, c client.Client) error {
log.V(consts.LogLevelError).Error(err, "error trying to remove state label on NV IPAM configmap")
return err
}
if err := handleSingleMofedDS(ctx, log, c); err != nil {
// critical for the operator operation, will fail Mofed migration
log.V(consts.LogLevelError).Error(err, "error trying to remove state label on NV IPAM configmap")
return err
}
return nil
}

Expand Down Expand Up @@ -114,3 +146,47 @@ func removeStateLabelFromNVIpamConfigMap(ctx context.Context, log logr.Logger, c
}
return nil
}

func handleSingleMofedDS(ctx context.Context, log logr.Logger, c client.Client) error {
ncp := &mellanoxv1alpha1.NicClusterPolicy{}
key := types.NamespacedName{
Name: consts.NicClusterPolicyResourceName,
}
err := c.Get(ctx, key, ncp)
if apiErrors.IsNotFound(err) {
log.V(consts.LogLevelDebug).Info("NIC ClusterPolicy not found, skip handling single MOFED DS")
return nil
} else if err != nil {
log.V(consts.LogLevelError).Error(err, "fail to get NIC ClusterPolicy")
return err
}
if ncp.Spec.OFEDDriver == nil {
return nil
}
log.V(consts.LogLevelDebug).Info("Searching for single MOFED DS")
dsList := &appsv1.DaemonSetList{}
err = c.List(ctx, dsList, client.MatchingLabels{"nvidia.com/ofed-driver": ""})
if err != nil {
log.V(consts.LogLevelError).Error(err, "fail to list MOFED DS")
return err
}
for i := range dsList.Items {
mofedDs := &dsList.Items[i]
// The single MOFED DS does not contain the label "mofed-ds-format-version"
_, ok := mofedDs.Labels["mofed-ds-format-version"]
if ok {
continue
}
log.V(consts.LogLevelDebug).Info("Found single MOFED DS", "name", mofedDs.Name)
policy := metav1.DeletePropagationOrphan
err = c.Delete(ctx, mofedDs, &client.DeleteOptions{PropagationPolicy: &policy})
if err != nil {
log.V(consts.LogLevelError).Error(err, "fail delete single MOFED DS")
return err
}
log.V(consts.LogLevelDebug).Info("Deleted single MOFED DS with orphaned", "name", mofedDs.Name)
return nil
}
log.V(consts.LogLevelDebug).Info("Single MOFED DS not found")
return nil
}
17 changes: 16 additions & 1 deletion pkg/migrate/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package migrate

import (
"context"
"os"
"testing"
"time"

Expand All @@ -30,6 +31,8 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

mellanoxcomv1alpha1 "github.com/Mellanox/network-operator/api/v1alpha1"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -58,9 +61,21 @@ var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
testLog = logf.Log.WithName("test-log").WithName("setup")

err := mellanoxcomv1alpha1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

// +kubebuilder:scaffold:scheme

By("bootstrapping test environment")
// Go to project root directory
err = os.Chdir("../..")
Expect(err).NotTo(HaveOccurred())

testEnv = &envtest.Environment{}
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{"config/crd/bases"},
CRDInstallOptions: envtest.CRDInstallOptions{ErrorIfPathMissing: true},
ErrorIfCRDPathMissing: true,
}

cfg, err := testEnv.Start()
Expect(err).NotTo(HaveOccurred())
Expand Down
Loading

0 comments on commit e765759

Please sign in to comment.