From d29237512dd86dabff06bd94a5410e2c42b498c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=97=A0=E8=A8=80=E7=8B=AC=E4=B8=8A=E6=9C=BA=E6=88=BF?= <88866917+sjmshsh@users.noreply.github.com> Date: Wed, 19 Jul 2023 17:36:19 +0800 Subject: [PATCH] Differentiate between kubuclient and certclient (#1205) --- app/dubbo-cp/cmd/console.go | 91 ------------------- app/dubbo-cp/cmd/root.go | 1 - pkg/authority/server/authority.go | 6 +- pkg/authority/server/authority_test.go | 6 +- pkg/authority/setup.go | 4 +- pkg/config/app/dubbo-cp/config.go | 1 + pkg/config/app/dubbo-cp/dubbo-cp.default.yaml | 1 + pkg/config/kube/config.go | 2 + pkg/core/bootstrap/bootstrap.go | 48 +++++----- pkg/core/cert/provider/client.go | 75 +-------------- pkg/core/cert/provider/storage.go | 24 ++--- pkg/core/election/kube/leaderelection.go | 18 ++-- pkg/core/election/kube/leaderelection_test.go | 2 +- pkg/core/election/universe/leaderelection.go | 36 ++++++++ pkg/core/kubeclient/client/kube.go | 89 ++++++++++++++++++ pkg/core/runtime/builder.go | 30 +++--- pkg/core/runtime/runtime.go | 6 -- .../crd/servicemapping/definition_test.go | 2 +- pkg/rule/server/server.go | 6 +- pkg/rule/setup.go | 49 +++++++++- 20 files changed, 259 insertions(+), 238 deletions(-) delete mode 100644 app/dubbo-cp/cmd/console.go create mode 100644 pkg/core/election/universe/leaderelection.go create mode 100644 pkg/core/kubeclient/client/kube.go diff --git a/app/dubbo-cp/cmd/console.go b/app/dubbo-cp/cmd/console.go deleted file mode 100644 index 83b6d1ea5..000000000 --- a/app/dubbo-cp/cmd/console.go +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "fmt" - "time" - - "github.com/apache/dubbo-admin/pkg/admin" - "github.com/apache/dubbo-admin/pkg/config" - dubbo_cp "github.com/apache/dubbo-admin/pkg/config/app/dubbo-cp" - "github.com/apache/dubbo-admin/pkg/core/bootstrap" - "github.com/apache/dubbo-admin/pkg/core/cmd" - "github.com/apache/dubbo-admin/pkg/core/logger" - "github.com/spf13/cobra" -) - -func newConsoleCmdWithOpts(opts cmd.RunCmdOpts) *cobra.Command { - args := struct { - configPath string - }{} - - cmd := &cobra.Command{ - Use: "console", - Short: "Launch Dubbo Admin console server.", - Long: `Launch Dubbo Admin console server.`, - RunE: func(cmd *cobra.Command, _ []string) error { - cfg := dubbo_cp.DefaultConfig() - err := config.Load(args.configPath, &cfg) - if err != nil { - logger.Sugar().Error(err, "could not load the configuration") - return err - } - gracefulCtx, ctx := opts.SetupSignalHandler() - - rt, err := bootstrap.Bootstrap(gracefulCtx, &cfg) - if err != nil { - logger.Sugar().Error(err, "unable to set up Control Plane runtime") - return err - } - cfgForDisplay, err := config.ConfigForDisplay(&cfg) - if err != nil { - logger.Sugar().Error(err, "unable to prepare config for display") - return err - } - cfgBytes, err := config.ToJson(cfgForDisplay) - if err != nil { - logger.Sugar().Error(err, "unable to convert config to json") - return err - } - logger.Sugar().Info(fmt.Sprintf("Current config %s", cfgBytes)) - - if err := admin.Setup(rt); err != nil { - logger.Sugar().Error(err, "unable to set up Metrics") - } - logger.Sugar().Info("starting Control Plane") - if err := rt.Start(gracefulCtx.Done()); err != nil { - logger.Sugar().Error(err, "problem running Control Plane") - return err - } - - logger.Sugar().Info("Stop signal received. Waiting 3 seconds for components to stop gracefully...") - select { - case <-ctx.Done(): - case <-time.After(gracefullyShutdownDuration): - } - logger.Sugar().Info("Stopping Control Plane") - return nil - }, - } - - // flags - cmd.PersistentFlags().StringVarP(&args.configPath, "config-file", "c", "", "configuration file") - - return cmd -} diff --git a/app/dubbo-cp/cmd/root.go b/app/dubbo-cp/cmd/root.go index ef0fad396..d1c073d11 100644 --- a/app/dubbo-cp/cmd/root.go +++ b/app/dubbo-cp/cmd/root.go @@ -55,7 +55,6 @@ func GetRootCmd(args []string) *cobra.Command { // sub-commands cmd.AddCommand(newRunCmdWithOpts(cmd2.DefaultRunCmdOpts)) - cmd.AddCommand(newConsoleCmdWithOpts(cmd2.DefaultRunCmdOpts)) cmd.AddCommand(version.NewVersionCmd()) return cmd diff --git a/pkg/authority/server/authority.go b/pkg/authority/server/authority.go index 9ac42d9c3..fe158e33d 100644 --- a/pkg/authority/server/authority.go +++ b/pkg/authority/server/authority.go @@ -34,7 +34,7 @@ import ( type AuthorityService struct { mesh.UnimplementedAuthorityServiceServer Options *dubbo_cp.Config - KubuClient cert.Client + CertClient cert.Client CertStorage cert.Storage WebhookServer *webhook.Webhook @@ -60,7 +60,7 @@ func (s *AuthorityService) Start(stop <-chan struct{}) error { } } }() - s.KubuClient.UpdateWebhookConfig(s.Options, s.CertStorage) + s.CertClient.UpdateWebhookConfig(s.Options, s.CertStorage) select { case <-stop: logger.Sugar().Info("stopping admin") @@ -100,7 +100,7 @@ func (s *AuthorityService) CreateIdentity( } p, _ := peer.FromContext(c) - endpoint, err := endpoint.ExactEndpoint(c, s.CertStorage, s.Options, s.KubuClient) + endpoint, err := endpoint.ExactEndpoint(c, s.CertStorage, s.Options, s.CertClient) if err != nil { logger.Sugar().Warnf("Failed to exact endpoint from context: %v. RemoteAddr: %s", err, p.Addr.String()) diff --git a/pkg/authority/server/authority_test.go b/pkg/authority/server/authority_test.go index d165e3581..24843933c 100644 --- a/pkg/authority/server/authority_test.go +++ b/pkg/authority/server/authority_test.go @@ -77,7 +77,7 @@ func TestCSRFailed(t *testing.T) { impl := &AuthorityService{ Options: options, CertStorage: storage, - KubuClient: kubeClient.Client, + CertClient: kubeClient.Client, } certificate, err := impl.CreateIdentity(c, &mesh.IdentityRequest{ @@ -148,7 +148,7 @@ func TestTokenFailed(t *testing.T) { impl := &AuthorityService{ Options: options, CertStorage: storage, - KubuClient: kubeClient, + CertClient: kubeClient, } csr, privateKey, err := provider.GenerateCSR() @@ -264,7 +264,7 @@ func TestSuccess(t *testing.T) { impl := &AuthorityService{ Options: options, CertStorage: storage, - KubuClient: kubeClient, + CertClient: kubeClient, } csr, privateKey, err := provider.GenerateCSR() diff --git a/pkg/authority/setup.go b/pkg/authority/setup.go index 8c91fac24..74e40d09b 100644 --- a/pkg/authority/setup.go +++ b/pkg/authority/setup.go @@ -34,9 +34,9 @@ func Setup(rt core_runtime.Runtime) error { return rt.CertStorage().GetServerCert(info.ServerName), nil }) server.WebhookServer.Init(rt.Config()) - server.JavaInjector = patch.NewJavaSdk(rt.Config(), rt.KubuClient()) + server.JavaInjector = patch.NewJavaSdk(rt.Config(), rt.CertStorage().GetCertClient()) server.WebhookServer.Patches = append(server.WebhookServer.Patches, server.JavaInjector.NewPod) - server.KubuClient = rt.KubuClient() + server.CertClient = rt.CertStorage().GetCertClient() server.CertStorage = rt.CertStorage() } if err := RegisterCertificateService(rt, server); err != nil { diff --git a/pkg/config/app/dubbo-cp/config.go b/pkg/config/app/dubbo-cp/config.go index 651439f4b..2621bcaba 100644 --- a/pkg/config/app/dubbo-cp/config.go +++ b/pkg/config/app/dubbo-cp/config.go @@ -100,6 +100,7 @@ var DefaultConfig = func() Config { IsKubernetesConnected: false, RestConfigQps: 50, RestConfigBurst: 100, + KubeFileConfig: "", }, } } diff --git a/pkg/config/app/dubbo-cp/dubbo-cp.default.yaml b/pkg/config/app/dubbo-cp/dubbo-cp.default.yaml index c3f922560..e2947caf5 100644 --- a/pkg/config/app/dubbo-cp/dubbo-cp.default.yaml +++ b/pkg/config/app/dubbo-cp/dubbo-cp.default.yaml @@ -36,6 +36,7 @@ kube-config: in-pod-env: false rest-config-qps: 50 rest-config-burst: 100 + kube-file-config: "" grpc-server: plain-server-port: 30060 secure-server-port: 30062 diff --git a/pkg/config/kube/config.go b/pkg/config/kube/config.go index 9cd4a6d75..ca8100919 100644 --- a/pkg/config/kube/config.go +++ b/pkg/config/kube/config.go @@ -27,6 +27,8 @@ type KubeConfig struct { RestConfigQps int `yaml:"rest-config-qps"` // Burst for rest config RestConfigBurst int `yaml:"rest-config-burst"` + + KubeFileConfig string `yaml:"kube-file-config"` } func (o *KubeConfig) Sanitize() {} diff --git a/pkg/core/bootstrap/bootstrap.go b/pkg/core/bootstrap/bootstrap.go index 4d735beb1..0cc4fc223 100644 --- a/pkg/core/bootstrap/bootstrap.go +++ b/pkg/core/bootstrap/bootstrap.go @@ -19,6 +19,8 @@ package bootstrap import ( "context" + "github.com/apache/dubbo-admin/pkg/core/election/universe" + "github.com/apache/dubbo-admin/pkg/core/kubeclient/client" dubbo_cp "github.com/apache/dubbo-admin/pkg/config/app/dubbo-cp" "github.com/apache/dubbo-admin/pkg/core/cert/provider" @@ -35,16 +37,11 @@ func buildRuntime(appCtx context.Context, cfg *dubbo_cp.Config) (core_runtime.Ru return nil, err } - if err := initKubuClient(cfg, builder); err != nil { - return nil, err - } + kubeenv := true - if cfg.KubeConfig.IsKubernetesConnected == false { - rt, err := builder.Build() - if err != nil { - return nil, err - } - return rt, nil + if !initKubeClient(cfg, builder) { + // Non-k8s environment + kubeenv = false } if err := initCertStorage(cfg, builder); err != nil { @@ -55,10 +52,14 @@ func buildRuntime(appCtx context.Context, cfg *dubbo_cp.Config) (core_runtime.Ru return nil, err } - builder.WithComponentManager(component.NewManager(kube.NewLeaderElection(builder.Config().KubeConfig.Namespace, - builder.Config().KubeConfig.ServiceName, - "dubbo-cp-lock", - builder.KubuClient().GetKubClient()))) + if kubeenv == true { + builder.WithComponentManager(component.NewManager(kube.NewLeaderElection(builder.Config().KubeConfig.Namespace, + builder.Config().KubeConfig.ServiceName, + "dubbo-cp-lock", + builder.CertStorage().GetCertClient().GetKubClient()))) + } else { + builder.WithComponentManager(component.NewManager(universe.NewLeaderElection())) + } rt, err := builder.Build() if err != nil { return nil, err @@ -74,20 +75,21 @@ func Bootstrap(appCtx context.Context, cfg *dubbo_cp.Config) (core_runtime.Runti return runtime, nil } -func initKubuClient(cfg *dubbo_cp.Config, builder *core_runtime.Builder) error { - client := provider.NewClient() - if !client.Init(cfg) { +func initKubeClient(cfg *dubbo_cp.Config, builder *core_runtime.Builder) bool { + kubeClient := client.NewKubeClient() + if !kubeClient.Init(cfg) { logger.Sugar().Warnf("Failed to connect to Kubernetes cluster. Will ignore OpenID Connect check.") cfg.KubeConfig.IsKubernetesConnected = false } else { cfg.KubeConfig.IsKubernetesConnected = true } - builder.WithKubuClient(client) - return nil + builder.WithKubeClient(kubeClient) + return cfg.KubeConfig.IsKubernetesConnected } func initCertStorage(cfg *dubbo_cp.Config, builder *core_runtime.Builder) error { - storage := provider.NewStorage(cfg, builder.KubuClient()) + client := provider.NewClient(builder.KubeClient().GetKubernetesClientSet()) + storage := provider.NewStorage(cfg, client) loadRootCert() loadAuthorityCert(storage, cfg, builder) @@ -99,12 +101,12 @@ func initCertStorage(cfg *dubbo_cp.Config, builder *core_runtime.Builder) error } func loadRootCert() { - // TODO + // TODO loadRootCert } func loadAuthorityCert(storage provider.Storage, cfg *dubbo_cp.Config, builder *core_runtime.Builder) { if cfg.KubeConfig.IsKubernetesConnected { - certStr, priStr := builder.KubuClient().GetAuthorityCert(cfg.KubeConfig.Namespace) + certStr, priStr := storage.GetCertClient().GetAuthorityCert(cfg.KubeConfig.Namespace) if certStr != "" && priStr != "" { storage.GetAuthorityCert().Cert = provider.DecodeCert(certStr) storage.GetAuthorityCert().CertPem = certStr @@ -123,14 +125,14 @@ func refreshAuthorityCert(storage provider.Storage, cfg *dubbo_cp.Config) { // TODO lock if multi cp-server if storage.GetConfig().KubeConfig.IsKubernetesConnected { - storage.GetKubuClient().UpdateAuthorityCert(storage.GetAuthorityCert().CertPem, + storage.GetCertClient().UpdateAuthorityCert(storage.GetAuthorityCert().CertPem, provider.EncodePrivateKey(storage.GetAuthorityCert().PrivateKey), storage.GetConfig().KubeConfig.Namespace) } } if storage.GetConfig().KubeConfig.IsKubernetesConnected { logger.Sugar().Info("Writing ca to config maps.") - if storage.GetKubuClient().UpdateAuthorityPublicKey(storage.GetAuthorityCert().CertPem) { + if storage.GetCertClient().UpdateAuthorityPublicKey(storage.GetAuthorityCert().CertPem) { logger.Sugar().Info("Write ca to config maps success.") } else { logger.Sugar().Warnf("Write ca to config maps failed.") diff --git a/pkg/core/cert/provider/client.go b/pkg/core/cert/provider/client.go index b948ddb08..fe02a47a8 100644 --- a/pkg/core/cert/provider/client.go +++ b/pkg/core/cert/provider/client.go @@ -17,37 +17,22 @@ package provider import ( "context" - "flag" - "os" - "path/filepath" "reflect" "strings" + dubbo_cp "github.com/apache/dubbo-admin/pkg/config/app/dubbo-cp" "github.com/apache/dubbo-admin/pkg/core/endpoint" "github.com/apache/dubbo-admin/pkg/core/logger" - dubbo_cp "github.com/apache/dubbo-admin/pkg/config/app/dubbo-cp" - informerclient "github.com/apache/dubbo-admin/pkg/rule/clientgen/clientset/versioned" admissionregistrationV1 "k8s.io/api/admissionregistration/v1" k8sauth "k8s.io/api/authentication/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/util/homedir" ) -var kubeconfig string - -func init() { - flag.StringVar(&kubeconfig, "kubeconfig", "", - "Paths to a kubeconfig. Only required if out-of-cluster.") -} - type Client interface { - Init(options *dubbo_cp.Config) bool GetAuthorityCert(namespace string) (string, string) UpdateAuthorityCert(cert string, pri string, namespace string) UpdateAuthorityPublicKey(cert string) bool @@ -55,62 +40,16 @@ type Client interface { UpdateWebhookConfig(options *dubbo_cp.Config, storage Storage) GetNamespaceLabels(namespace string) map[string]string GetKubClient() *kubernetes.Clientset - GetInformerClient() *informerclient.Clientset } type ClientImpl struct { - options *dubbo_cp.Config - kubeClient *kubernetes.Clientset - informerClient *informerclient.Clientset -} - -func NewClient() Client { - return &ClientImpl{} + kubeClient *kubernetes.Clientset } -func (c *ClientImpl) Init(options *dubbo_cp.Config) bool { - c.options = options - config, err := rest.InClusterConfig() - options.KubeConfig.InPodEnv = err == nil - if err != nil { - logger.Sugar().Infof("Failed to load config from Pod. Will fall back to kube config file.") - // Read kubeconfig from command line - if len(kubeconfig) <= 0 { - // Read kubeconfig from env - kubeconfig = os.Getenv(clientcmd.RecommendedConfigPathEnvVar) - if len(kubeconfig) <= 0 { - // Read kubeconfig from home dir - if home := homedir.HomeDir(); home != "" { - kubeconfig = filepath.Join(home, ".kube", "config") - } - } - } - // use the current context in kubeconfig - logger.Sugar().Infof("Read kubeconfig from %s", kubeconfig) - config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - logger.Sugar().Warnf("Failed to load config from kube config file.") - return false - } - } - - // set qps and burst for rest config - config.QPS = float32(c.options.KubeConfig.RestConfigQps) - config.Burst = c.options.KubeConfig.RestConfigBurst - // creates the clientset - clientSet, err := kubernetes.NewForConfig(config) - if err != nil { - logger.Sugar().Warnf("Failed to create clientgen to kubernetes. " + err.Error()) - return false +func NewClient(kubeClient *kubernetes.Clientset) Client { + return &ClientImpl{ + kubeClient: kubeClient, } - informerClient, err := informerclient.NewForConfig(config) - if err != nil { - logger.Sugar().Warnf("Failed to create clientgen to kubernetes. " + err.Error()) - return false - } - c.kubeClient = clientSet - c.informerClient = informerClient - return true } func (c *ClientImpl) GetAuthorityCert(namespace string) (string, string) { @@ -372,7 +311,3 @@ func (c *ClientImpl) UpdateWebhookConfig(options *dubbo_cp.Config, storage Stora func (c *ClientImpl) GetKubClient() *kubernetes.Clientset { return c.kubeClient } - -func (c *ClientImpl) GetInformerClient() *informerclient.Clientset { - return c.informerClient -} diff --git a/pkg/core/cert/provider/storage.go b/pkg/core/cert/provider/storage.go index 488e61ef1..b11b57285 100644 --- a/pkg/core/cert/provider/storage.go +++ b/pkg/core/cert/provider/storage.go @@ -31,7 +31,7 @@ import ( type storageImpl struct { config *dubbo_cp.Config - kubuClient Client + certClient Client mutex *sync.Mutex @@ -57,13 +57,13 @@ type Storage interface { GetTrustedCerts() []*Cert GetConfig() *dubbo_cp.Config - GetKubuClient() Client + GetCertClient() Client Start(stop <-chan struct{}) error NeedLeaderElection() bool } -func (s storageImpl) Start(stop <-chan struct{}) error { +func (s *storageImpl) Start(stop <-chan struct{}) error { go s.RefreshServerCert(stop) go func(stop <-chan struct{}) { interval := math.Min(math.Floor(float64(s.config.Security.CaValidity)/100), 10_000) @@ -74,11 +74,11 @@ func (s storageImpl) Start(stop <-chan struct{}) error { // TODO lock if multi cp-server // TODO refresh signed cert - NewleaderElection().Election(&s, s.config, s.kubuClient.GetKubClient()) + NewleaderElection().Election(s, s.config, s.certClient.GetKubClient()) if s.config.KubeConfig.IsKubernetesConnected { - s.kubuClient.UpdateAuthorityCert(s.GetAuthorityCert().CertPem, EncodePrivateKey(s.GetAuthorityCert().PrivateKey), s.config.KubeConfig.Namespace) - s.kubuClient.UpdateWebhookConfig(s.config, &s) - if s.kubuClient.UpdateAuthorityPublicKey(s.GetAuthorityCert().CertPem) { + s.certClient.UpdateAuthorityCert(s.GetAuthorityCert().CertPem, EncodePrivateKey(s.GetAuthorityCert().PrivateKey), s.config.KubeConfig.Namespace) + s.certClient.UpdateWebhookConfig(s.config, s) + if s.certClient.UpdateAuthorityPublicKey(s.GetAuthorityCert().CertPem) { logger.Sugar().Infof("Write ca to config maps success.") } else { logger.Sugar().Warnf("Write ca to config maps failed.") @@ -97,7 +97,7 @@ func (s storageImpl) Start(stop <-chan struct{}) error { return nil } -func (s storageImpl) NeedLeaderElection() bool { +func (s *storageImpl) NeedLeaderElection() bool { return false } @@ -109,14 +109,14 @@ type Cert struct { tlsCert *tls.Certificate } -func NewStorage(options *dubbo_cp.Config, kubuClient Client) *storageImpl { +func NewStorage(options *dubbo_cp.Config, certClient Client) *storageImpl { return &storageImpl{ mutex: &sync.Mutex{}, authorityCert: &Cert{}, trustedCerts: []*Cert{}, config: options, - kubuClient: kubuClient, + certClient: certClient, } } @@ -243,6 +243,6 @@ func (s *storageImpl) GetConfig() *dubbo_cp.Config { return s.config } -func (s *storageImpl) GetKubuClient() Client { - return s.kubuClient +func (s *storageImpl) GetCertClient() Client { + return s.certClient } diff --git a/pkg/core/election/kube/leaderelection.go b/pkg/core/election/kube/leaderelection.go index 44f82f0bc..aceb2a3e9 100644 --- a/pkg/core/election/kube/leaderelection.go +++ b/pkg/core/election/kube/leaderelection.go @@ -30,7 +30,7 @@ import ( "k8s.io/client-go/tools/leaderelection/resourcelock" ) -type LeaderElection struct { +type KubeLeaderElection struct { leader int32 namespace string name string @@ -45,13 +45,13 @@ type LeaderElection struct { } // Start will start leader election, calling all runFns when we become the leader. -func (l *LeaderElection) Start(stop <-chan struct{}) { +func (l *KubeLeaderElection) Start(stop <-chan struct{}) { logger.Sugar().Info("starting Leader Elector") for { le, err := l.create() if err != nil { // This should never happen; errors are only from invalid input and the input is not user modifiable - panic("LeaderElection creation failed: " + err.Error()) + panic("KubeLeaderElection creation failed: " + err.Error()) } l.cycle.Inc() ctx, cancel := context.WithCancel(context.Background()) @@ -74,7 +74,7 @@ func (l *LeaderElection) Start(stop <-chan struct{}) { } } -func (l *LeaderElection) create() (*leaderelection.LeaderElector, error) { +func (l *KubeLeaderElection) create() (*leaderelection.LeaderElector, error) { callbacks := leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { l.setLeader(true) @@ -119,15 +119,15 @@ func (l *LeaderElection) create() (*leaderelection.LeaderElector, error) { }) } -func (p *LeaderElection) AddCallbacks(callbacks component.LeaderCallbacks) { +func (p *KubeLeaderElection) AddCallbacks(callbacks component.LeaderCallbacks) { p.callbacks = append(p.callbacks, callbacks) } -func (p *LeaderElection) IsLeader() bool { +func (p *KubeLeaderElection) IsLeader() bool { return syncatomic.LoadInt32(&(p.leader)) == 1 } -func (p *LeaderElection) setLeader(leader bool) { +func (p *KubeLeaderElection) setLeader(leader bool) { var value int32 = 0 if leader { value = 1 @@ -135,11 +135,11 @@ func (p *LeaderElection) setLeader(leader bool) { syncatomic.StoreInt32(&p.leader, value) } -func NewLeaderElection(namespace, name, electionID string, client kubernetes.Interface) *LeaderElection { +func NewLeaderElection(namespace, name, electionID string, client kubernetes.Interface) *KubeLeaderElection { if name == "" { name = "unknown" } - return &LeaderElection{ + return &KubeLeaderElection{ namespace: namespace, name: name, electionID: electionID, diff --git a/pkg/core/election/kube/leaderelection_test.go b/pkg/core/election/kube/leaderelection_test.go index 5ff3327b5..a3cff391b 100644 --- a/pkg/core/election/kube/leaderelection_test.go +++ b/pkg/core/election/kube/leaderelection_test.go @@ -38,7 +38,7 @@ const testLock = "test-lock" func createElection(t *testing.T, name string, expectLeader bool, client kubernetes.Interface, fns ...component.LeaderCallbacks, -) (*LeaderElection, chan struct{}) { +) (*KubeLeaderElection, chan struct{}) { t.Helper() l := NewLeaderElection("ns", name, testLock, client) l.ttl = time.Second diff --git a/pkg/core/election/universe/leaderelection.go b/pkg/core/election/universe/leaderelection.go new file mode 100644 index 000000000..a34acc682 --- /dev/null +++ b/pkg/core/election/universe/leaderelection.go @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package universe + +import "github.com/apache/dubbo-admin/pkg/core/runtime/component" + +type UniverseLeaderElection struct { + // TODO Implement a leader election mechanism that does not depend on k8s +} + +func (l *UniverseLeaderElection) Start(stop <-chan struct{}) {} + +func (l *UniverseLeaderElection) AddCallbacks(callbacks component.LeaderCallbacks) {} + +func (l *UniverseLeaderElection) IsLeader() bool { + return false +} + +func NewLeaderElection() *UniverseLeaderElection { + return &UniverseLeaderElection{} +} diff --git a/pkg/core/kubeclient/client/kube.go b/pkg/core/kubeclient/client/kube.go new file mode 100644 index 000000000..74f6426d3 --- /dev/null +++ b/pkg/core/kubeclient/client/kube.go @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client + +import ( + "os" + "path/filepath" + + dubbo_cp "github.com/apache/dubbo-admin/pkg/config/app/dubbo-cp" + "github.com/apache/dubbo-admin/pkg/core/logger" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" +) + +type KubeClient interface { + GetKubernetesClientSet() *kubernetes.Clientset +} + +type kubeClientImpl struct { + kubernetesClientSet *kubernetes.Clientset +} + +func NewKubeClient() *kubeClientImpl { + return &kubeClientImpl{} +} + +func (k *kubeClientImpl) GetKubernetesClientSet() *kubernetes.Clientset { + return k.kubernetesClientSet +} + +func (k *kubeClientImpl) Init(options *dubbo_cp.Config) bool { + config, err := rest.InClusterConfig() + options.KubeConfig.InPodEnv = err == nil + kubeconfig := options.KubeConfig.KubeFileConfig + if err != nil { + logger.Sugar().Infof("Failed to load config from Pod. Will fall back to kube config file.") + // Read kubeconfig from command line + if len(kubeconfig) <= 0 { + // Read kubeconfig from env + kubeconfig = os.Getenv(clientcmd.RecommendedConfigPathEnvVar) + if len(kubeconfig) <= 0 { + // Read kubeconfig from home dir + if home := homedir.HomeDir(); home != "" { + kubeconfig = filepath.Join(home, ".kube", "config") + } + } + } + // use the current context in kubeconfig + logger.Sugar().Infof("Read kubeconfig from %s", kubeconfig) + config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + logger.Sugar().Warnf("Failed to load config from kube config file.") + return false + } + } + + // set qps and burst for rest config + config.QPS = float32(options.KubeConfig.RestConfigQps) + config.Burst = options.KubeConfig.RestConfigBurst + // creates the clientset + clientSet, err := kubernetes.NewForConfig(config) + if err != nil { + logger.Sugar().Warnf("Failed to create clientgen to kubernetes. " + err.Error()) + return false + } + if err != nil { + logger.Sugar().Warnf("Failed to create clientgen to kubernetes. " + err.Error()) + return false + } + k.kubernetesClientSet = clientSet + return true +} diff --git a/pkg/core/runtime/builder.go b/pkg/core/runtime/builder.go index 5faabbd9f..58dabb6aa 100644 --- a/pkg/core/runtime/builder.go +++ b/pkg/core/runtime/builder.go @@ -20,6 +20,7 @@ package runtime import ( "context" "fmt" + "github.com/apache/dubbo-admin/pkg/core/kubeclient/client" "os" "time" @@ -35,7 +36,7 @@ type BuilderContext interface { ComponentManager() component.Manager Config() *dubbo_cp.Config CertStorage() provider.Storage - KubuClient() provider.Client + KubeClient() client.KubeClient } var _ BuilderContext = &Builder{} @@ -45,14 +46,14 @@ type Builder struct { cm component.Manager appCtx context.Context + kubeClient client.KubeClient grpcServer *server.GrpcServer - kubuClient provider.Client certStorage provider.Storage *runtimeInfo } -func (b *Builder) KubuClient() provider.Client { - return b.kubuClient +func (b *Builder) KubeClient() client.KubeClient { + return b.kubeClient } func (b *Builder) CertStorage() provider.Storage { @@ -84,34 +85,39 @@ func BuilderFor(appCtx context.Context, cfg *dubbo_cp.Config) (*Builder, error) } func (b *Builder) Build() (Runtime, error) { + if !b.cfg.KubeConfig.IsKubernetesConnected { + return &runtime{ + RuntimeInfo: b.runtimeInfo, + RuntimeContext: &runtimeContext{ + cfg: b.cfg, + }, + Manager: b.cm, + }, nil + } if b.grpcServer == nil { return nil, errors.Errorf("grpcServer has not been configured") } if b.certStorage == nil { return nil, errors.Errorf("certStorage has not been configured") } - if b.kubuClient == nil { - return nil, errors.Errorf("kubuClient has not been configured") - } return &runtime{ RuntimeInfo: b.runtimeInfo, RuntimeContext: &runtimeContext{ cfg: b.cfg, grpcServer: b.grpcServer, certStorage: b.certStorage, - kubuClient: b.kubuClient, }, Manager: b.cm, }, nil } -func (b *Builder) WithCertStorage(storage provider.Storage) *Builder { - b.certStorage = storage +func (b *Builder) WithKubeClient(kubeClient client.KubeClient) *Builder { + b.kubeClient = kubeClient return b } -func (b *Builder) WithKubuClient(client provider.Client) *Builder { - b.kubuClient = client +func (b *Builder) WithCertStorage(storage provider.Storage) *Builder { + b.certStorage = storage return b } diff --git a/pkg/core/runtime/runtime.go b/pkg/core/runtime/runtime.go index e970e226c..5b009abaf 100644 --- a/pkg/core/runtime/runtime.go +++ b/pkg/core/runtime/runtime.go @@ -72,7 +72,6 @@ type RuntimeContext interface { Config() *dubbo_cp.Config GrpcServer() *server.GrpcServer CertStorage() provider.Storage - KubuClient() provider.Client } type runtime struct { @@ -89,11 +88,6 @@ type runtimeContext struct { cfg *dubbo_cp.Config grpcServer *server.GrpcServer certStorage provider.Storage - kubuClient provider.Client -} - -func (rc *runtimeContext) KubuClient() provider.Client { - return rc.kubuClient } func (rc *runtimeContext) CertStorage() provider.Storage { diff --git a/pkg/rule/crd/servicemapping/definition_test.go b/pkg/rule/crd/servicemapping/definition_test.go index 92e7502f3..966c1cdf4 100644 --- a/pkg/rule/crd/servicemapping/definition_test.go +++ b/pkg/rule/crd/servicemapping/definition_test.go @@ -26,7 +26,7 @@ func TestCopy(t *testing.T) { Name: "test-policy", Spec: &PolicySpec{ InterfaceName: "test-interface", - ApplicationNames: []string{"test-aplication"}, + ApplicationNames: []string{"test-application"}, }, } diff --git a/pkg/rule/server/server.go b/pkg/rule/server/server.go index 30e246332..5a41846be 100644 --- a/pkg/rule/server/server.go +++ b/pkg/rule/server/server.go @@ -17,6 +17,7 @@ package server import ( "fmt" + informerclient "github.com/apache/dubbo-admin/pkg/rule/clientgen/clientset/versioned" "github.com/apache/dubbo-admin/api/mesh" dubbo_cp "github.com/apache/dubbo-admin/pkg/config/app/dubbo-cp" @@ -35,7 +36,8 @@ type RuleServer struct { Options *dubbo_cp.Config CertStorage provider.Storage - KubeClient provider.Client + CertClient provider.Client + InformerClient *informerclient.Clientset Storage *storage.Storage Controller *crd.Controller InformerFactory informFactory.SharedInformerFactory @@ -71,7 +73,7 @@ func (s *RuleServer) Observe(stream mesh.RuleService_ObserveServer) error { return fmt.Errorf("failed to get peer from context") } - endpoint, err := endpoint.ExactEndpoint(stream.Context(), s.CertStorage, s.Options, s.KubeClient) + endpoint, err := endpoint.ExactEndpoint(stream.Context(), s.CertStorage, s.Options, s.CertClient) if err != nil { logger.Sugar().Errorf("failed to get endpoint from context: %v. RemoteAddr: %s", err, p.Addr) diff --git a/pkg/rule/setup.go b/pkg/rule/setup.go index e180de1cb..7fff0e809 100644 --- a/pkg/rule/setup.go +++ b/pkg/rule/setup.go @@ -16,6 +16,12 @@ package rule import ( + informerclient "github.com/apache/dubbo-admin/pkg/rule/clientgen/clientset/versioned" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" + "os" + "path/filepath" "time" "github.com/apache/dubbo-admin/api/mesh" @@ -38,8 +44,11 @@ import ( func Setup(rt core_runtime.Runtime) error { ruleServer := server.NewRuleServer(rt.Config()) ruleServer.CertStorage = rt.CertStorage() - ruleServer.KubeClient = rt.KubuClient() + ruleServer.CertClient = rt.CertStorage().GetCertClient() ruleServer.Storage = storage.NewStorage() + if err := initInformerClient(rt, ruleServer); err != nil { + return errors.Wrap(err, "InformerClient Register failed") + } if err := RegisterController(ruleServer); err != nil { return errors.Wrap(err, "Controller Register failed") } @@ -52,9 +61,45 @@ func Setup(rt core_runtime.Runtime) error { return nil } +func initInformerClient(rt core_runtime.Runtime, server *server.RuleServer) error { + config, err := rest.InClusterConfig() + rt.Config().KubeConfig.InPodEnv = err == nil + kubeconfig := rt.Config().KubeConfig.KubeFileConfig + if err != nil { + logger.Sugar().Infof("Failed to load config from Pod. Will fall back to kube config file.") + if len(kubeconfig) <= 0 { + // Read kubeconfig from env + kubeconfig = os.Getenv(clientcmd.RecommendedConfigPathEnvVar) + if len(kubeconfig) <= 0 { + // Read kubeconfig from home dir + if home := homedir.HomeDir(); home != "" { + kubeconfig = filepath.Join(home, ".kube", "config") + } + } + } + // use the current context in kubeconfig + logger.Sugar().Infof("Read kubeconfig from %s", kubeconfig) + config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + logger.Sugar().Warnf("Failed to load config from kube config file.") + return err + } + } + // set qps and burst for rest config + config.QPS = float32(rt.Config().KubeConfig.RestConfigQps) + config.Burst = rt.Config().KubeConfig.RestConfigBurst + informerClient, err := informerclient.NewForConfig(config) + if err != nil { + logger.Sugar().Warnf("Failed to create informerclient to kubernetes. " + err.Error()) + return err + } + server.InformerClient = informerClient + return nil +} + func RegisterController(s *server.RuleServer) error { logger.Sugar().Info("Init rule controller...") - informerFactory := informers.NewSharedInformerFactory(s.KubeClient.GetInformerClient(), time.Second*30) + informerFactory := informers.NewSharedInformerFactory(s.InformerClient, time.Second*30) s.InformerFactory = informerFactory authenticationHandler := authentication.NewHandler(s.Storage) authorizationHandler := authorization.NewHandler(s.Storage)