Skip to content

Commit

Permalink
Eliminate multiple client creations
Browse files Browse the repository at this point in the history
Longhorn 6866
Longhorn 6936

Signed-off-by: Derek Su <derek.su@suse.com>
  • Loading branch information
derekbit authored and David Ko committed Oct 23, 2023
1 parent 5c67a2b commit 5e0868b
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 214 deletions.
20 changes: 13 additions & 7 deletions app/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/longhorn/longhorn-manager/types"
"github.com/longhorn/longhorn-manager/upgrade"
"github.com/longhorn/longhorn-manager/util"
"github.com/longhorn/longhorn-manager/util/client"

metricscollector "github.com/longhorn/longhorn-manager/metrics_collector"
)
Expand Down Expand Up @@ -138,14 +139,19 @@ func startManager(c *cli.Context) error {

logger := logrus.StandardLogger().WithField("node", currentNodeID)

clients, err := client.NewClients(kubeconfigPath, ctx.Done())
if err != nil {
return err
}

webhookTypes := []string{types.WebhookTypeConversion, types.WebhookTypeAdmission}
for _, webhookType := range webhookTypes {
if err := startWebhook(ctx, serviceAccount, kubeconfigPath, webhookType); err != nil {
if err := startWebhook(ctx, webhookType, clients); err != nil {
return err
}
}

if err := startRecoveryBackend(ctx, serviceAccount, kubeconfigPath); err != nil {
if err := startRecoveryBackend(clients); err != nil {
return err
}

Expand All @@ -155,32 +161,32 @@ func startManager(c *cli.Context) error {

proxyConnCounter := util.NewAtomicCounter()

ds, wsc, err := controller.StartControllers(logger, ctx.Done(),
wsc, err := controller.StartControllers(logger, clients,
currentNodeID, serviceAccount, managerImage, backingImageManagerImage, shareManagerImage,
kubeconfigPath, meta.Version, proxyConnCounter)
if err != nil {
return err
}

m := manager.NewVolumeManager(currentNodeID, ds, proxyConnCounter)
m := manager.NewVolumeManager(currentNodeID, clients.Datastore, proxyConnCounter)

metricscollector.InitMetricsCollectorSystem(logger, currentNodeID, ds, kubeconfigPath, proxyConnCounter)
metricscollector.InitMetricsCollectorSystem(logger, currentNodeID, clients.Datastore, kubeconfigPath, proxyConnCounter)

defaultImageSettings := map[types.SettingName]string{
types.SettingNameDefaultEngineImage: engineImage,
types.SettingNameDefaultInstanceManagerImage: instanceManagerImage,
types.SettingNameDefaultBackingImageManagerImage: backingImageManagerImage,
types.SettingNameSupportBundleManagerImage: supportBundleManagerImage,
}
if err := ds.UpdateCustomizedSettings(defaultImageSettings); err != nil {
if err := clients.Datastore.UpdateCustomizedSettings(defaultImageSettings); err != nil {
return err
}

if err := updateRegistrySecretName(m); err != nil {
return err
}

if err := initDaemonNode(ds); err != nil {
if err := initDaemonNode(clients.Datastore); err != nil {
return err
}

Expand Down
26 changes: 2 additions & 24 deletions app/recovery_backend.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,18 @@
package app

import (
"context"
"fmt"
"net/http"

"github.com/sirupsen/logrus"

"k8s.io/client-go/tools/clientcmd"

"github.com/longhorn/longhorn-manager/recovery_backend/server"
"github.com/longhorn/longhorn-manager/types"
"github.com/longhorn/longhorn-manager/util"
"github.com/longhorn/longhorn-manager/util/client"
)

func startRecoveryBackend(ctx context.Context, serviceAccount, kubeconfigPath string) error {
func startRecoveryBackend(clients *client.Clients) error {
logrus.Info("Starting longhorn recovery-backend server")

namespace := util.GetNamespace(types.EnvPodNamespace)

cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return fmt.Errorf("failed to get client config: %v", err)
}

client, err := client.NewClient(ctx, cfg, namespace, true)
if err != nil {
return err
}

if err := client.Start(ctx); err != nil {
return err
}

s := server.New(namespace, client.Datastore)
s := server.New(clients.Namespace, clients.Datastore)
router := http.Handler(server.NewRouter(s))
go func() {
if err := s.ListenAndServe(router); err != nil {
Expand Down
15 changes: 3 additions & 12 deletions app/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@ import (

"github.com/sirupsen/logrus"

"k8s.io/client-go/tools/clientcmd"

"github.com/longhorn/longhorn-manager/types"
"github.com/longhorn/longhorn-manager/util"
"github.com/longhorn/longhorn-manager/util/client"
"github.com/longhorn/longhorn-manager/webhook/server"
)

var (
defaultStartTimeout = 60 * time.Second
)

func startWebhook(ctx context.Context, serviceAccount, kubeconfigPath, webhookType string) error {
func startWebhook(ctx context.Context, webhookType string, clients *client.Clients) error {
logrus.Infof("Starting longhorn %s webhook server", webhookType)

var webhookPort int
Expand All @@ -34,14 +32,7 @@ func startWebhook(ctx context.Context, serviceAccount, kubeconfigPath, webhookTy
return fmt.Errorf("unexpected webhook server type %v", webhookType)
}

namespace := util.GetNamespace(types.EnvPodNamespace)

cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return fmt.Errorf("failed to get client config: %v", err)
}

s := server.New(ctx, cfg, namespace, webhookType)
s := server.New(ctx, clients.Namespace, webhookType, clients)
go func() {
if err := s.ListenAndServe(); err != nil {
logrus.Fatalf("Error %v webhook server failed: %v", webhookType, err)
Expand Down
78 changes: 10 additions & 68 deletions controller/controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package controller
import (
"fmt"
"math"
"os"
"strconv"
"time"

Expand All @@ -12,24 +11,17 @@ import (
"golang.org/x/time/rate"

"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"

corev1 "k8s.io/api/core/v1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
clientset "k8s.io/client-go/kubernetes"
metricsclientset "k8s.io/metrics/pkg/client/clientset/versioned"

"github.com/longhorn/longhorn-manager/datastore"
"github.com/longhorn/longhorn-manager/engineapi"
"github.com/longhorn/longhorn-manager/types"
"github.com/longhorn/longhorn-manager/util"
"github.com/longhorn/longhorn-manager/util/client"

longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned"
lhinformers "github.com/longhorn/longhorn-manager/k8s/pkg/client/informers/externalversions"
)

var (
Expand All @@ -38,59 +30,15 @@ var (
)

// StartControllers initiates all Longhorn component controllers and monitors to manage the creating, updating, and deletion of Longhorn resources
func StartControllers(logger logrus.FieldLogger, stopCh <-chan struct{},
func StartControllers(logger logrus.FieldLogger, clients *client.Clients,
controllerID, serviceAccount, managerImage, backingImageManagerImage, shareManagerImage,
kubeconfigPath, version string, proxyConnCounter util.Counter) (*datastore.DataStore, *WebsocketController, error) {
namespace := os.Getenv(types.EnvPodNamespace)
if namespace == "" {
logrus.Warnf("Cannot detect pod namespace, environment variable %v is missing, "+
"using default namespace", types.EnvPodNamespace)
namespace = corev1.NamespaceDefault
}

config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return nil, nil, errors.Wrap(err, "unable to get client config")
}

config.Burst = 100
config.QPS = 50

kubeClient, err := clientset.NewForConfig(config)
if err != nil {
return nil, nil, errors.Wrap(err, "unable to get k8s client")
}

lhClient, err := lhclientset.NewForConfig(config)
if err != nil {
return nil, nil, errors.Wrap(err, "unable to get clientset")
}

extensionsClient, err := apiextensionsclientset.NewForConfig(config)
if err != nil {
return nil, nil, errors.Wrap(err, "unable to get k8s extension client")
}

metricsClient, err := metricsclientset.NewForConfig(config)
if err != nil {
return nil, nil, errors.Wrap(err, "unable to get metrics client")
}

scheme := runtime.NewScheme()
if err := longhorn.SchemeBuilder.AddToScheme(scheme); err != nil {
return nil, nil, errors.Wrap(err, "unable to create scheme")
}

// TODO: there shouldn't be a need for a 30s resync period unless our code is buggy and our controllers aren't really
// level based. What we are effectively doing with this is hiding faulty logic in production.
// Another reason for increasing this substantially, is that it introduces a lot of unnecessary work and will
// lead to scalability problems, since we dump the whole cache of each object back in to the reconciler every 30 seconds.
// if a specific controller requires a periodic resync, one enable it only for that informer, add a resync to the event handler, go routine, etc.
// some refs to look at: https://github.com/kubernetes-sigs/controller-runtime/issues/521
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30)
lhInformerFactory := lhinformers.NewSharedInformerFactory(lhClient, time.Second*30)

ds := datastore.NewDataStore(lhInformerFactory, lhClient, kubeInformerFactory, kubeClient, extensionsClient, namespace)
kubeconfigPath, version string, proxyConnCounter util.Counter) (*WebsocketController, error) {
namespace := clients.Namespace
kubeClient := clients.Clients.K8s
metricsClient := clients.MetricsClient
ds := clients.Datastore
scheme := clients.Scheme
stopCh := clients.StopCh

// Longhorn controllers
replicaController := NewReplicaController(logger, ds, scheme, kubeClient, namespace, controllerID)
Expand Down Expand Up @@ -129,12 +77,6 @@ func StartControllers(logger logrus.FieldLogger, stopCh <-chan struct{},
kubernetesSecretController := NewKubernetesSecretController(logger, ds, scheme, kubeClient, controllerID, namespace)
kubernetesPDBController := NewKubernetesPDBController(logger, ds, kubeClient, controllerID, namespace)

go kubeInformerFactory.Start(stopCh)
go lhInformerFactory.Start(stopCh)
if !ds.Sync(stopCh) {
return nil, nil, fmt.Errorf("datastore cache sync up failed")
}

// Start goroutines for Longhorn controllers
go replicaController.Run(Workers, stopCh)
go engineController.Run(Workers, stopCh)
Expand Down Expand Up @@ -172,7 +114,7 @@ func StartControllers(logger logrus.FieldLogger, stopCh <-chan struct{},
go kubernetesSecretController.Run(Workers, stopCh)
go kubernetesPDBController.Run(Workers, stopCh)

return ds, websocketController, nil
return websocketController, nil
}

func ParseResourceRequirement(val string) (*corev1.ResourceRequirements, error) {
Expand Down
Loading

0 comments on commit 5e0868b

Please sign in to comment.