From 5e0868b84aca3cc601ad00cc0e37f8bf01d3efd6 Mon Sep 17 00:00:00 2001 From: Derek Su Date: Mon, 23 Oct 2023 16:23:56 +0800 Subject: [PATCH] Eliminate multiple client creations Longhorn 6866 Longhorn 6936 Signed-off-by: Derek Su --- app/daemon.go | 20 ++++-- app/recovery_backend.go | 26 +------ app/webhook.go | 15 +--- controller/controller_manager.go | 78 +++------------------ util/client/client.go | 114 ++++++++++++++++++++----------- webhook/server/mutation.go | 38 +++++------ webhook/server/server.go | 48 +++++-------- webhook/server/validation.go | 30 ++++---- 8 files changed, 155 insertions(+), 214 deletions(-) diff --git a/app/daemon.go b/app/daemon.go index 743616a22b..83297c04e1 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -25,6 +25,7 @@ import ( "github.com/longhorn/longhorn-manager/types" "github.com/longhorn/longhorn-manager/upgrade" "github.com/longhorn/longhorn-manager/util" + "github.com/longhorn/longhorn-manager/util/client" metricscollector "github.com/longhorn/longhorn-manager/metrics_collector" ) @@ -138,14 +139,19 @@ func startManager(c *cli.Context) error { logger := logrus.StandardLogger().WithField("node", currentNodeID) + clients, err := client.NewClients(kubeconfigPath, ctx.Done()) + if err != nil { + return err + } + webhookTypes := []string{types.WebhookTypeConversion, types.WebhookTypeAdmission} for _, webhookType := range webhookTypes { - if err := startWebhook(ctx, serviceAccount, kubeconfigPath, webhookType); err != nil { + if err := startWebhook(ctx, webhookType, clients); err != nil { return err } } - if err := startRecoveryBackend(ctx, serviceAccount, kubeconfigPath); err != nil { + if err := startRecoveryBackend(clients); err != nil { return err } @@ -155,16 +161,16 @@ func startManager(c *cli.Context) error { proxyConnCounter := util.NewAtomicCounter() - ds, wsc, err := controller.StartControllers(logger, ctx.Done(), + wsc, err := controller.StartControllers(logger, clients, currentNodeID, serviceAccount, managerImage, backingImageManagerImage, shareManagerImage, kubeconfigPath, meta.Version, proxyConnCounter) if err != nil { return err } - m := manager.NewVolumeManager(currentNodeID, ds, proxyConnCounter) + m := manager.NewVolumeManager(currentNodeID, clients.Datastore, proxyConnCounter) - metricscollector.InitMetricsCollectorSystem(logger, currentNodeID, ds, kubeconfigPath, proxyConnCounter) + metricscollector.InitMetricsCollectorSystem(logger, currentNodeID, clients.Datastore, kubeconfigPath, proxyConnCounter) defaultImageSettings := map[types.SettingName]string{ types.SettingNameDefaultEngineImage: engineImage, @@ -172,7 +178,7 @@ func startManager(c *cli.Context) error { types.SettingNameDefaultBackingImageManagerImage: backingImageManagerImage, types.SettingNameSupportBundleManagerImage: supportBundleManagerImage, } - if err := ds.UpdateCustomizedSettings(defaultImageSettings); err != nil { + if err := clients.Datastore.UpdateCustomizedSettings(defaultImageSettings); err != nil { return err } @@ -180,7 +186,7 @@ func startManager(c *cli.Context) error { return err } - if err := initDaemonNode(ds); err != nil { + if err := initDaemonNode(clients.Datastore); err != nil { return err } diff --git a/app/recovery_backend.go b/app/recovery_backend.go index 7344e79286..a66d4ed00d 100644 --- a/app/recovery_backend.go +++ b/app/recovery_backend.go @@ -1,40 +1,18 @@ package app import ( - "context" - "fmt" "net/http" "github.com/sirupsen/logrus" - "k8s.io/client-go/tools/clientcmd" - "github.com/longhorn/longhorn-manager/recovery_backend/server" - "github.com/longhorn/longhorn-manager/types" - "github.com/longhorn/longhorn-manager/util" "github.com/longhorn/longhorn-manager/util/client" ) -func startRecoveryBackend(ctx context.Context, serviceAccount, kubeconfigPath string) error { +func startRecoveryBackend(clients *client.Clients) error { logrus.Info("Starting longhorn recovery-backend server") - namespace := util.GetNamespace(types.EnvPodNamespace) - - cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) - if err != nil { - return fmt.Errorf("failed to get client config: %v", err) - } - - client, err := client.NewClient(ctx, cfg, namespace, true) - if err != nil { - return err - } - - if err := client.Start(ctx); err != nil { - return err - } - - s := server.New(namespace, client.Datastore) + s := server.New(clients.Namespace, clients.Datastore) router := http.Handler(server.NewRouter(s)) go func() { if err := s.ListenAndServe(router); err != nil { diff --git a/app/webhook.go b/app/webhook.go index 29e886b839..69c79aa863 100644 --- a/app/webhook.go +++ b/app/webhook.go @@ -10,10 +10,8 @@ import ( "github.com/sirupsen/logrus" - "k8s.io/client-go/tools/clientcmd" - "github.com/longhorn/longhorn-manager/types" - "github.com/longhorn/longhorn-manager/util" + "github.com/longhorn/longhorn-manager/util/client" "github.com/longhorn/longhorn-manager/webhook/server" ) @@ -21,7 +19,7 @@ var ( defaultStartTimeout = 60 * time.Second ) -func startWebhook(ctx context.Context, serviceAccount, kubeconfigPath, webhookType string) error { +func startWebhook(ctx context.Context, webhookType string, clients *client.Clients) error { logrus.Infof("Starting longhorn %s webhook server", webhookType) var webhookPort int @@ -34,14 +32,7 @@ func startWebhook(ctx context.Context, serviceAccount, kubeconfigPath, webhookTy return fmt.Errorf("unexpected webhook server type %v", webhookType) } - namespace := util.GetNamespace(types.EnvPodNamespace) - - cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) - if err != nil { - return fmt.Errorf("failed to get client config: %v", err) - } - - s := server.New(ctx, cfg, namespace, webhookType) + s := server.New(ctx, clients.Namespace, webhookType, clients) go func() { if err := s.ListenAndServe(); err != nil { logrus.Fatalf("Error %v webhook server failed: %v", webhookType, err) diff --git a/controller/controller_manager.go b/controller/controller_manager.go index d78aac816e..b3aff5ca26 100644 --- a/controller/controller_manager.go +++ b/controller/controller_manager.go @@ -3,7 +3,6 @@ package controller import ( "fmt" "math" - "os" "strconv" "time" @@ -12,24 +11,17 @@ import ( "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/informers" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/workqueue" corev1 "k8s.io/api/core/v1" - apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - clientset "k8s.io/client-go/kubernetes" - metricsclientset "k8s.io/metrics/pkg/client/clientset/versioned" "github.com/longhorn/longhorn-manager/datastore" "github.com/longhorn/longhorn-manager/engineapi" "github.com/longhorn/longhorn-manager/types" "github.com/longhorn/longhorn-manager/util" + "github.com/longhorn/longhorn-manager/util/client" longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" - lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned" - lhinformers "github.com/longhorn/longhorn-manager/k8s/pkg/client/informers/externalversions" ) var ( @@ -38,59 +30,15 @@ var ( ) // StartControllers initiates all Longhorn component controllers and monitors to manage the creating, updating, and deletion of Longhorn resources -func StartControllers(logger logrus.FieldLogger, stopCh <-chan struct{}, +func StartControllers(logger logrus.FieldLogger, clients *client.Clients, controllerID, serviceAccount, managerImage, backingImageManagerImage, shareManagerImage, - kubeconfigPath, version string, proxyConnCounter util.Counter) (*datastore.DataStore, *WebsocketController, error) { - namespace := os.Getenv(types.EnvPodNamespace) - if namespace == "" { - logrus.Warnf("Cannot detect pod namespace, environment variable %v is missing, "+ - "using default namespace", types.EnvPodNamespace) - namespace = corev1.NamespaceDefault - } - - config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) - if err != nil { - return nil, nil, errors.Wrap(err, "unable to get client config") - } - - config.Burst = 100 - config.QPS = 50 - - kubeClient, err := clientset.NewForConfig(config) - if err != nil { - return nil, nil, errors.Wrap(err, "unable to get k8s client") - } - - lhClient, err := lhclientset.NewForConfig(config) - if err != nil { - return nil, nil, errors.Wrap(err, "unable to get clientset") - } - - extensionsClient, err := apiextensionsclientset.NewForConfig(config) - if err != nil { - return nil, nil, errors.Wrap(err, "unable to get k8s extension client") - } - - metricsClient, err := metricsclientset.NewForConfig(config) - if err != nil { - return nil, nil, errors.Wrap(err, "unable to get metrics client") - } - - scheme := runtime.NewScheme() - if err := longhorn.SchemeBuilder.AddToScheme(scheme); err != nil { - return nil, nil, errors.Wrap(err, "unable to create scheme") - } - - // TODO: there shouldn't be a need for a 30s resync period unless our code is buggy and our controllers aren't really - // level based. What we are effectively doing with this is hiding faulty logic in production. - // Another reason for increasing this substantially, is that it introduces a lot of unnecessary work and will - // lead to scalability problems, since we dump the whole cache of each object back in to the reconciler every 30 seconds. - // if a specific controller requires a periodic resync, one enable it only for that informer, add a resync to the event handler, go routine, etc. - // some refs to look at: https://github.com/kubernetes-sigs/controller-runtime/issues/521 - kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30) - lhInformerFactory := lhinformers.NewSharedInformerFactory(lhClient, time.Second*30) - - ds := datastore.NewDataStore(lhInformerFactory, lhClient, kubeInformerFactory, kubeClient, extensionsClient, namespace) + kubeconfigPath, version string, proxyConnCounter util.Counter) (*WebsocketController, error) { + namespace := clients.Namespace + kubeClient := clients.Clients.K8s + metricsClient := clients.MetricsClient + ds := clients.Datastore + scheme := clients.Scheme + stopCh := clients.StopCh // Longhorn controllers replicaController := NewReplicaController(logger, ds, scheme, kubeClient, namespace, controllerID) @@ -129,12 +77,6 @@ func StartControllers(logger logrus.FieldLogger, stopCh <-chan struct{}, kubernetesSecretController := NewKubernetesSecretController(logger, ds, scheme, kubeClient, controllerID, namespace) kubernetesPDBController := NewKubernetesPDBController(logger, ds, kubeClient, controllerID, namespace) - go kubeInformerFactory.Start(stopCh) - go lhInformerFactory.Start(stopCh) - if !ds.Sync(stopCh) { - return nil, nil, fmt.Errorf("datastore cache sync up failed") - } - // Start goroutines for Longhorn controllers go replicaController.Run(Workers, stopCh) go engineController.Run(Workers, stopCh) @@ -172,7 +114,7 @@ func StartControllers(logger logrus.FieldLogger, stopCh <-chan struct{}, go kubernetesSecretController.Run(Workers, stopCh) go kubernetesPDBController.Run(Workers, stopCh) - return ds, websocketController, nil + return websocketController, nil } func ParseResourceRequirement(val string) (*corev1.ResourceRequirements, error) { diff --git a/util/client/client.go b/util/client/client.go index a7fb7f3517..3f527022da 100644 --- a/util/client/client.go +++ b/util/client/client.go @@ -1,75 +1,111 @@ package client import ( - "context" "fmt" + "os" "time" "github.com/pkg/errors" - "github.com/rancher/wrangler/pkg/clients" - "github.com/rancher/wrangler/pkg/schemes" + wranglerClients "github.com/rancher/wrangler/pkg/clients" + wranglerSchemes "github.com/rancher/wrangler/pkg/schemes" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" - "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - clientset "k8s.io/client-go/kubernetes" + metricsclientset "k8s.io/metrics/pkg/client/clientset/versioned" "github.com/longhorn/longhorn-manager/datastore" - + longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned" lhinformers "github.com/longhorn/longhorn-manager/k8s/pkg/client/informers/externalversions" + "github.com/longhorn/longhorn-manager/types" ) -type Client struct { - clients.Clients - Datastore *datastore.DataStore +type Clients struct { + wranglerClients.Clients + MetricsClient *metricsclientset.Clientset + Scheme *runtime.Scheme + Namespace string + Datastore *datastore.DataStore + StopCh <-chan struct{} } -func NewClient(ctx context.Context, config *rest.Config, namespace string, needDataStore bool) (*Client, error) { - if err := schemes.Register(appsv1.AddToScheme); err != nil { - return nil, err +func NewClients(kubeconfigPath string, stopCh <-chan struct{}) (*Clients, error) { + namespace := os.Getenv(types.EnvPodNamespace) + if namespace == "" { + logrus.Warnf("Cannot detect pod namespace, environment variable %v is missing, using default namespace", types.EnvPodNamespace) + namespace = corev1.NamespaceDefault } - clients, err := clients.NewFromConfig(config, nil) + config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) if err != nil { - return nil, err + return nil, errors.Wrap(err, "unable to get client config") } - var ds *datastore.DataStore + config.Burst = 100 + config.QPS = 50 - if needDataStore { - kubeClient, err := clientset.NewForConfig(config) - if err != nil { - return nil, errors.Wrap(err, "unable to get k8s client") - } + if err := wranglerSchemes.Register(appsv1.AddToScheme); err != nil { + return nil, err + } - extensionsClient, err := apiextensionsclientset.NewForConfig(config) - if err != nil { - return nil, errors.Wrap(err, "unable to get k8s extension client") - } + // Create k8s client + clients, err := wranglerClients.NewFromConfig(config, nil) + if err != nil { + return nil, errors.Wrap(err, "unable to get k8s client") + } - lhClient, err := lhclientset.NewForConfig(config) - if err != nil { - return nil, errors.Wrap(err, "unable to get lh client") - } + // Create Longhorn client + lhClient, err := lhclientset.NewForConfig(config) + if err != nil { + return nil, errors.Wrap(err, "unable to get clientset") + } - kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30) - lhInformerFactory := lhinformers.NewSharedInformerFactory(lhClient, time.Second*30) + scheme := runtime.NewScheme() + if err := longhorn.SchemeBuilder.AddToScheme(scheme); err != nil { + return nil, errors.Wrap(err, "unable to create scheme") + } - ds = datastore.NewDataStore(lhInformerFactory, lhClient, kubeInformerFactory, kubeClient, extensionsClient, namespace) + // Create API extension client + extensionsClient, err := apiextensionsclientset.NewForConfig(config) + if err != nil { + return nil, errors.Wrap(err, "unable to get k8s extension client") + } - go kubeInformerFactory.Start(ctx.Done()) - go lhInformerFactory.Start(ctx.Done()) + // Create metrics client + metricsClient, err := metricsclientset.NewForConfig(config) + if err != nil { + return nil, errors.Wrap(err, "unable to get metrics client") + } - if !ds.Sync(ctx.Done()) { - return nil, fmt.Errorf("datastore cache sync up failed") - } + // TODO: there shouldn't be a need for a 30s resync period unless our code is buggy and our controllers aren't really + // level based. What we are effectively doing with this is hiding faulty logic in production. + // Another reason for increasing this substantially, is that it introduces a lot of unnecessary work and will + // lead to scalability problems, since we dump the whole cache of each object back in to the reconciler every 30 seconds. + // if a specific controller requires a periodic resync, one enable it only for that informer, add a resync to the event handler, go routine, etc. + // some refs to look at: https://github.com/kubernetes-sigs/controller-runtime/issues/521 + kubeInformerFactory := informers.NewSharedInformerFactory(clients.K8s, time.Second*30) + lhInformerFactory := lhinformers.NewSharedInformerFactory(lhClient, time.Second*30) + + ds := datastore.NewDataStore(lhInformerFactory, lhClient, kubeInformerFactory, clients.K8s, extensionsClient, namespace) + + go kubeInformerFactory.Start(stopCh) + go lhInformerFactory.Start(stopCh) + if !ds.Sync(stopCh) { + return nil, fmt.Errorf("datastore cache sync up failed") } - return &Client{ - Clients: *clients, - Datastore: ds, + return &Clients{ + Clients: *clients, + MetricsClient: metricsClient, + Scheme: scheme, + Namespace: namespace, + Datastore: ds, + StopCh: stopCh, }, nil } diff --git a/webhook/server/mutation.go b/webhook/server/mutation.go index c77b327657..bd14cf15eb 100644 --- a/webhook/server/mutation.go +++ b/webhook/server/mutation.go @@ -5,7 +5,7 @@ import ( "github.com/rancher/wrangler/pkg/webhook" - "github.com/longhorn/longhorn-manager/util/client" + "github.com/longhorn/longhorn-manager/datastore" "github.com/longhorn/longhorn-manager/webhook/admission" "github.com/longhorn/longhorn-manager/webhook/resources/backingimage" "github.com/longhorn/longhorn-manager/webhook/resources/backingimagedatasource" @@ -26,26 +26,26 @@ import ( "github.com/longhorn/longhorn-manager/webhook/resources/volumeattachment" ) -func Mutation(client *client.Client) (http.Handler, []admission.Resource, error) { +func Mutation(ds *datastore.DataStore) (http.Handler, []admission.Resource, error) { resources := []admission.Resource{} mutators := []admission.Mutator{ - backup.NewMutator(client.Datastore), - backingimage.NewMutator(client.Datastore), - backingimagemanager.NewMutator(client.Datastore), - backingimagedatasource.NewMutator(client.Datastore), - node.NewMutator(client.Datastore), - volume.NewMutator(client.Datastore), - engine.NewMutator(client.Datastore), - recurringjob.NewMutator(client.Datastore), - engineimage.NewMutator(client.Datastore), - orphan.NewMutator(client.Datastore), - sharemanager.NewMutator(client.Datastore), - backupvolume.NewMutator(client.Datastore), - snapshot.NewMutator(client.Datastore), - replica.NewMutator(client.Datastore), - supportbundle.NewMutator(client.Datastore), - systembackup.NewMutator(client.Datastore), - volumeattachment.NewMutator(client.Datastore), + backup.NewMutator(ds), + backingimage.NewMutator(ds), + backingimagemanager.NewMutator(ds), + backingimagedatasource.NewMutator(ds), + node.NewMutator(ds), + volume.NewMutator(ds), + engine.NewMutator(ds), + recurringjob.NewMutator(ds), + engineimage.NewMutator(ds), + orphan.NewMutator(ds), + sharemanager.NewMutator(ds), + backupvolume.NewMutator(ds), + snapshot.NewMutator(ds), + replica.NewMutator(ds), + supportbundle.NewMutator(ds), + systembackup.NewMutator(ds), + volumeattachment.NewMutator(ds), } router := webhook.NewRouter() diff --git a/webhook/server/server.go b/webhook/server/server.go index e40cde7977..bff770424f 100644 --- a/webhook/server/server.go +++ b/webhook/server/server.go @@ -11,8 +11,6 @@ import ( "github.com/rancher/dynamiclistener/server" "github.com/sirupsen/logrus" - "k8s.io/client-go/rest" - admissionregv1 "k8s.io/api/admissionregistration/v1" corev1 "k8s.io/api/core/v1" apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" @@ -47,31 +45,26 @@ var ( type WebhookServer struct { context context.Context - cfg *rest.Config namespace string webhookType string + clients *client.Clients } -func New(ctx context.Context, cfg *rest.Config, namespace, webhookType string) *WebhookServer { +func New(ctx context.Context, namespace, webhookType string, clients *client.Clients) *WebhookServer { return &WebhookServer{ context: ctx, - cfg: cfg, namespace: namespace, webhookType: webhookType, + clients: clients, } } func (s *WebhookServer) admissionWebhookListenAndServe() error { - client, err := client.NewClient(s.context, s.cfg, s.namespace, true) - if err != nil { - return err - } - - validationHandler, validationResources, err := Validation(client) + validationHandler, validationResources, err := Validation(s.clients.Datastore) if err != nil { return err } - mutationHandler, mutationResources, err := Mutation(client) + mutationHandler, mutationResources, err := Mutation(s.clients.Datastore) if err != nil { return err } @@ -81,19 +74,14 @@ func (s *WebhookServer) admissionWebhookListenAndServe() error { router.Handle("/v1/healthz", newhealthzHandler()) router.Handle(validationPath, validationHandler) router.Handle(mutationPath, mutationHandler) - if err := s.runAdmissionWebhookListenAndServe(client, router, validationResources, mutationResources); err != nil { + if err := s.runAdmissionWebhookListenAndServe(router, validationResources, mutationResources); err != nil { return err } - return client.Start(s.context) + return s.clients.Start(s.context) } func (s *WebhookServer) conversionWebhookListenAndServe() error { - client, err := client.NewClient(s.context, s.cfg, s.namespace, false) - if err != nil { - return err - } - conversionHandler, conversionResources, err := Conversion() if err != nil { return err @@ -103,11 +91,11 @@ func (s *WebhookServer) conversionWebhookListenAndServe() error { router.Handle("/v1/healthz", newhealthzHandler()) router.Handle(conversionPath, conversionHandler) - if err := s.runConversionWebhookListenAndServe(client, router, conversionResources); err != nil { + if err := s.runConversionWebhookListenAndServe(router, conversionResources); err != nil { return err } - return client.Start(s.context) + return s.clients.Start(s.context) } func (s *WebhookServer) ListenAndServe() error { @@ -121,9 +109,9 @@ func (s *WebhookServer) ListenAndServe() error { } } -func (s *WebhookServer) runAdmissionWebhookListenAndServe(client *client.Client, handler http.Handler, validationResources []admission.Resource, mutationResources []admission.Resource) error { - apply := client.Apply.WithDynamicLookup() - client.Core.Secret().OnChange(s.context, "secrets", func(key string, secret *corev1.Secret) (*corev1.Secret, error) { +func (s *WebhookServer) runAdmissionWebhookListenAndServe(handler http.Handler, validationResources []admission.Resource, mutationResources []admission.Resource) error { + apply := s.clients.Apply.WithDynamicLookup() + s.clients.Core.Secret().OnChange(s.context, "secrets", func(key string, secret *corev1.Secret) (*corev1.Secret, error) { if secret == nil || secret.Name != caName || secret.Namespace != s.namespace || len(secret.Data[corev1.TLSCertKey]) == 0 { return nil, nil } @@ -191,7 +179,7 @@ func (s *WebhookServer) runAdmissionWebhookListenAndServe(client *client.Client, tlsName := fmt.Sprintf("%s.%s.svc", admissionWebhookServiceName, s.namespace) return server.ListenAndServe(s.context, types.DefaultAdmissionWebhookPort, 0, handler, &server.ListenOpts{ - Secrets: client.Core.Secret(), + Secrets: s.clients.Core.Secret(), CertNamespace: s.namespace, CertName: certName, CAName: caName, @@ -204,8 +192,8 @@ func (s *WebhookServer) runAdmissionWebhookListenAndServe(client *client.Client, }) } -func (s *WebhookServer) runConversionWebhookListenAndServe(client *client.Client, handler http.Handler, conversionResources []string) error { - client.Core.Secret().OnChange(s.context, "secrets", func(key string, secret *corev1.Secret) (*corev1.Secret, error) { +func (s *WebhookServer) runConversionWebhookListenAndServe(handler http.Handler, conversionResources []string) error { + s.clients.Core.Secret().OnChange(s.context, "secrets", func(key string, secret *corev1.Secret) (*corev1.Secret, error) { if secret == nil || secret.Name != caName || secret.Namespace != s.namespace || len(secret.Data[corev1.TLSCertKey]) == 0 { return nil, nil } @@ -214,7 +202,7 @@ func (s *WebhookServer) runConversionWebhookListenAndServe(client *client.Client logrus.Infof("Building conversion rules...") for _, name := range conversionResources { - crd, err := client.CRD.CustomResourceDefinition().Get(name, metav1.GetOptions{}) + crd, err := s.clients.CRD.CustomResourceDefinition().Get(name, metav1.GetOptions{}) if err != nil { return secret, err } @@ -238,7 +226,7 @@ func (s *WebhookServer) runConversionWebhookListenAndServe(client *client.Client if !reflect.DeepEqual(existingCRD, crd) { logrus.Infof("Update CRD for %+v", name) - if _, err = client.CRD.CustomResourceDefinition().Update(crd); err != nil { + if _, err = s.clients.CRD.CustomResourceDefinition().Update(crd); err != nil { return secret, err } } @@ -250,7 +238,7 @@ func (s *WebhookServer) runConversionWebhookListenAndServe(client *client.Client tlsName := fmt.Sprintf("%s.%s.svc", conversionWebhookServiceName, s.namespace) return server.ListenAndServe(s.context, types.DefaultConversionWebhookPort, 0, handler, &server.ListenOpts{ - Secrets: client.Core.Secret(), + Secrets: s.clients.Core.Secret(), CertNamespace: s.namespace, CertName: certName, CAName: caName, diff --git a/webhook/server/validation.go b/webhook/server/validation.go index 53c0ee3d46..6230bd747f 100644 --- a/webhook/server/validation.go +++ b/webhook/server/validation.go @@ -5,9 +5,9 @@ import ( "github.com/rancher/wrangler/pkg/webhook" + "github.com/longhorn/longhorn-manager/datastore" "github.com/longhorn/longhorn-manager/types" "github.com/longhorn/longhorn-manager/util" - "github.com/longhorn/longhorn-manager/util/client" "github.com/longhorn/longhorn-manager/webhook/admission" "github.com/longhorn/longhorn-manager/webhook/resources/backingimage" "github.com/longhorn/longhorn-manager/webhook/resources/engine" @@ -24,7 +24,7 @@ import ( "github.com/longhorn/longhorn-manager/webhook/resources/volumeattachment" ) -func Validation(client *client.Client) (http.Handler, []admission.Resource, error) { +func Validation(ds *datastore.DataStore) (http.Handler, []admission.Resource, error) { currentNodeID, err := util.GetRequiredEnv(types.EnvNodeName) if err != nil { return nil, nil, err @@ -32,19 +32,19 @@ func Validation(client *client.Client) (http.Handler, []admission.Resource, erro resources := []admission.Resource{} validators := []admission.Validator{ - node.NewValidator(client.Datastore), - setting.NewValidator(client.Datastore), - recurringjob.NewValidator(client.Datastore), - backingimage.NewValidator(client.Datastore), - volume.NewValidator(client.Datastore, currentNodeID), - orphan.NewValidator(client.Datastore), - snapshot.NewValidator(client.Datastore), - supportbundle.NewValidator(client.Datastore), - systembackup.NewValidator(client.Datastore), - systemrestore.NewValidator(client.Datastore), - volumeattachment.NewValidator(client.Datastore), - engine.NewValidator(client.Datastore), - replica.NewValidator(client.Datastore), + node.NewValidator(ds), + setting.NewValidator(ds), + recurringjob.NewValidator(ds), + backingimage.NewValidator(ds), + volume.NewValidator(ds, currentNodeID), + orphan.NewValidator(ds), + snapshot.NewValidator(ds), + supportbundle.NewValidator(ds), + systembackup.NewValidator(ds), + systemrestore.NewValidator(ds), + volumeattachment.NewValidator(ds), + engine.NewValidator(ds), + replica.NewValidator(ds), } router := webhook.NewRouter()