Skip to content

Commit

Permalink
Differentiate between kubuclient and certclient (#1205)
Browse files Browse the repository at this point in the history
  • Loading branch information
sjmshsh authored Jul 19, 2023
1 parent 887c510 commit d292375
Show file tree
Hide file tree
Showing 20 changed files with 259 additions and 238 deletions.
91 changes: 0 additions & 91 deletions app/dubbo-cp/cmd/console.go

This file was deleted.

1 change: 0 additions & 1 deletion app/dubbo-cp/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func GetRootCmd(args []string) *cobra.Command {

// sub-commands
cmd.AddCommand(newRunCmdWithOpts(cmd2.DefaultRunCmdOpts))
cmd.AddCommand(newConsoleCmdWithOpts(cmd2.DefaultRunCmdOpts))
cmd.AddCommand(version.NewVersionCmd())

return cmd
Expand Down
6 changes: 3 additions & 3 deletions pkg/authority/server/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
type AuthorityService struct {
mesh.UnimplementedAuthorityServiceServer
Options *dubbo_cp.Config
KubuClient cert.Client
CertClient cert.Client
CertStorage cert.Storage

WebhookServer *webhook.Webhook
Expand All @@ -60,7 +60,7 @@ func (s *AuthorityService) Start(stop <-chan struct{}) error {
}
}
}()
s.KubuClient.UpdateWebhookConfig(s.Options, s.CertStorage)
s.CertClient.UpdateWebhookConfig(s.Options, s.CertStorage)
select {
case <-stop:
logger.Sugar().Info("stopping admin")
Expand Down Expand Up @@ -100,7 +100,7 @@ func (s *AuthorityService) CreateIdentity(
}

p, _ := peer.FromContext(c)
endpoint, err := endpoint.ExactEndpoint(c, s.CertStorage, s.Options, s.KubuClient)
endpoint, err := endpoint.ExactEndpoint(c, s.CertStorage, s.Options, s.CertClient)
if err != nil {
logger.Sugar().Warnf("Failed to exact endpoint from context: %v. RemoteAddr: %s", err, p.Addr.String())

Expand Down
6 changes: 3 additions & 3 deletions pkg/authority/server/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestCSRFailed(t *testing.T) {
impl := &AuthorityService{
Options: options,
CertStorage: storage,
KubuClient: kubeClient.Client,
CertClient: kubeClient.Client,
}

certificate, err := impl.CreateIdentity(c, &mesh.IdentityRequest{
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestTokenFailed(t *testing.T) {
impl := &AuthorityService{
Options: options,
CertStorage: storage,
KubuClient: kubeClient,
CertClient: kubeClient,
}

csr, privateKey, err := provider.GenerateCSR()
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestSuccess(t *testing.T) {
impl := &AuthorityService{
Options: options,
CertStorage: storage,
KubuClient: kubeClient,
CertClient: kubeClient,
}

csr, privateKey, err := provider.GenerateCSR()
Expand Down
4 changes: 2 additions & 2 deletions pkg/authority/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func Setup(rt core_runtime.Runtime) error {
return rt.CertStorage().GetServerCert(info.ServerName), nil
})
server.WebhookServer.Init(rt.Config())
server.JavaInjector = patch.NewJavaSdk(rt.Config(), rt.KubuClient())
server.JavaInjector = patch.NewJavaSdk(rt.Config(), rt.CertStorage().GetCertClient())
server.WebhookServer.Patches = append(server.WebhookServer.Patches, server.JavaInjector.NewPod)
server.KubuClient = rt.KubuClient()
server.CertClient = rt.CertStorage().GetCertClient()
server.CertStorage = rt.CertStorage()
}
if err := RegisterCertificateService(rt, server); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/config/app/dubbo-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ var DefaultConfig = func() Config {
IsKubernetesConnected: false,
RestConfigQps: 50,
RestConfigBurst: 100,
KubeFileConfig: "",
},
}
}
1 change: 1 addition & 0 deletions pkg/config/app/dubbo-cp/dubbo-cp.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ kube-config:
in-pod-env: false
rest-config-qps: 50
rest-config-burst: 100
kube-file-config: ""
grpc-server:
plain-server-port: 30060
secure-server-port: 30062
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/kube/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type KubeConfig struct {
RestConfigQps int `yaml:"rest-config-qps"`
// Burst for rest config
RestConfigBurst int `yaml:"rest-config-burst"`

KubeFileConfig string `yaml:"kube-file-config"`
}

func (o *KubeConfig) Sanitize() {}
Expand Down
48 changes: 25 additions & 23 deletions pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package bootstrap

import (
"context"
"github.com/apache/dubbo-admin/pkg/core/election/universe"
"github.com/apache/dubbo-admin/pkg/core/kubeclient/client"

dubbo_cp "github.com/apache/dubbo-admin/pkg/config/app/dubbo-cp"
"github.com/apache/dubbo-admin/pkg/core/cert/provider"
Expand All @@ -35,16 +37,11 @@ func buildRuntime(appCtx context.Context, cfg *dubbo_cp.Config) (core_runtime.Ru
return nil, err
}

if err := initKubuClient(cfg, builder); err != nil {
return nil, err
}
kubeenv := true

if cfg.KubeConfig.IsKubernetesConnected == false {
rt, err := builder.Build()
if err != nil {
return nil, err
}
return rt, nil
if !initKubeClient(cfg, builder) {
// Non-k8s environment
kubeenv = false
}

if err := initCertStorage(cfg, builder); err != nil {
Expand All @@ -55,10 +52,14 @@ func buildRuntime(appCtx context.Context, cfg *dubbo_cp.Config) (core_runtime.Ru
return nil, err
}

builder.WithComponentManager(component.NewManager(kube.NewLeaderElection(builder.Config().KubeConfig.Namespace,
builder.Config().KubeConfig.ServiceName,
"dubbo-cp-lock",
builder.KubuClient().GetKubClient())))
if kubeenv == true {
builder.WithComponentManager(component.NewManager(kube.NewLeaderElection(builder.Config().KubeConfig.Namespace,
builder.Config().KubeConfig.ServiceName,
"dubbo-cp-lock",
builder.CertStorage().GetCertClient().GetKubClient())))
} else {
builder.WithComponentManager(component.NewManager(universe.NewLeaderElection()))
}
rt, err := builder.Build()
if err != nil {
return nil, err
Expand All @@ -74,20 +75,21 @@ func Bootstrap(appCtx context.Context, cfg *dubbo_cp.Config) (core_runtime.Runti
return runtime, nil
}

func initKubuClient(cfg *dubbo_cp.Config, builder *core_runtime.Builder) error {
client := provider.NewClient()
if !client.Init(cfg) {
func initKubeClient(cfg *dubbo_cp.Config, builder *core_runtime.Builder) bool {
kubeClient := client.NewKubeClient()
if !kubeClient.Init(cfg) {
logger.Sugar().Warnf("Failed to connect to Kubernetes cluster. Will ignore OpenID Connect check.")
cfg.KubeConfig.IsKubernetesConnected = false
} else {
cfg.KubeConfig.IsKubernetesConnected = true
}
builder.WithKubuClient(client)
return nil
builder.WithKubeClient(kubeClient)
return cfg.KubeConfig.IsKubernetesConnected
}

func initCertStorage(cfg *dubbo_cp.Config, builder *core_runtime.Builder) error {
storage := provider.NewStorage(cfg, builder.KubuClient())
client := provider.NewClient(builder.KubeClient().GetKubernetesClientSet())
storage := provider.NewStorage(cfg, client)
loadRootCert()
loadAuthorityCert(storage, cfg, builder)

Expand All @@ -99,12 +101,12 @@ func initCertStorage(cfg *dubbo_cp.Config, builder *core_runtime.Builder) error
}

func loadRootCert() {
// TODO
// TODO loadRootCert
}

func loadAuthorityCert(storage provider.Storage, cfg *dubbo_cp.Config, builder *core_runtime.Builder) {
if cfg.KubeConfig.IsKubernetesConnected {
certStr, priStr := builder.KubuClient().GetAuthorityCert(cfg.KubeConfig.Namespace)
certStr, priStr := storage.GetCertClient().GetAuthorityCert(cfg.KubeConfig.Namespace)
if certStr != "" && priStr != "" {
storage.GetAuthorityCert().Cert = provider.DecodeCert(certStr)
storage.GetAuthorityCert().CertPem = certStr
Expand All @@ -123,14 +125,14 @@ func refreshAuthorityCert(storage provider.Storage, cfg *dubbo_cp.Config) {

// TODO lock if multi cp-server
if storage.GetConfig().KubeConfig.IsKubernetesConnected {
storage.GetKubuClient().UpdateAuthorityCert(storage.GetAuthorityCert().CertPem,
storage.GetCertClient().UpdateAuthorityCert(storage.GetAuthorityCert().CertPem,
provider.EncodePrivateKey(storage.GetAuthorityCert().PrivateKey), storage.GetConfig().KubeConfig.Namespace)
}
}

if storage.GetConfig().KubeConfig.IsKubernetesConnected {
logger.Sugar().Info("Writing ca to config maps.")
if storage.GetKubuClient().UpdateAuthorityPublicKey(storage.GetAuthorityCert().CertPem) {
if storage.GetCertClient().UpdateAuthorityPublicKey(storage.GetAuthorityCert().CertPem) {
logger.Sugar().Info("Write ca to config maps success.")
} else {
logger.Sugar().Warnf("Write ca to config maps failed.")
Expand Down
Loading

0 comments on commit d292375

Please sign in to comment.