Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix simple refactor] Differentiate between kubuclient and certclient #1205

Merged
merged 6 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 0 additions & 91 deletions app/dubbo-cp/cmd/console.go

This file was deleted.

1 change: 0 additions & 1 deletion app/dubbo-cp/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func GetRootCmd(args []string) *cobra.Command {

// sub-commands
cmd.AddCommand(newRunCmdWithOpts(cmd2.DefaultRunCmdOpts))
cmd.AddCommand(newConsoleCmdWithOpts(cmd2.DefaultRunCmdOpts))
cmd.AddCommand(version.NewVersionCmd())

return cmd
Expand Down
6 changes: 3 additions & 3 deletions pkg/authority/server/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
type AuthorityService struct {
mesh.UnimplementedAuthorityServiceServer
Options *dubbo_cp.Config
KubuClient cert.Client
CertClient cert.Client
CertStorage cert.Storage

WebhookServer *webhook.Webhook
Expand All @@ -60,7 +60,7 @@ func (s *AuthorityService) Start(stop <-chan struct{}) error {
}
}
}()
s.KubuClient.UpdateWebhookConfig(s.Options, s.CertStorage)
s.CertClient.UpdateWebhookConfig(s.Options, s.CertStorage)
select {
case <-stop:
logger.Sugar().Info("stopping admin")
Expand Down Expand Up @@ -100,7 +100,7 @@ func (s *AuthorityService) CreateIdentity(
}

p, _ := peer.FromContext(c)
endpoint, err := endpoint.ExactEndpoint(c, s.CertStorage, s.Options, s.KubuClient)
endpoint, err := endpoint.ExactEndpoint(c, s.CertStorage, s.Options, s.CertClient)
if err != nil {
logger.Sugar().Warnf("Failed to exact endpoint from context: %v. RemoteAddr: %s", err, p.Addr.String())

Expand Down
6 changes: 3 additions & 3 deletions pkg/authority/server/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestCSRFailed(t *testing.T) {
impl := &AuthorityService{
Options: options,
CertStorage: storage,
KubuClient: kubeClient.Client,
CertClient: kubeClient.Client,
}

certificate, err := impl.CreateIdentity(c, &mesh.IdentityRequest{
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestTokenFailed(t *testing.T) {
impl := &AuthorityService{
Options: options,
CertStorage: storage,
KubuClient: kubeClient,
CertClient: kubeClient,
}

csr, privateKey, err := provider.GenerateCSR()
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestSuccess(t *testing.T) {
impl := &AuthorityService{
Options: options,
CertStorage: storage,
KubuClient: kubeClient,
CertClient: kubeClient,
}

csr, privateKey, err := provider.GenerateCSR()
Expand Down
4 changes: 2 additions & 2 deletions pkg/authority/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func Setup(rt core_runtime.Runtime) error {
return rt.CertStorage().GetServerCert(info.ServerName), nil
})
server.WebhookServer.Init(rt.Config())
server.JavaInjector = patch.NewJavaSdk(rt.Config(), rt.KubuClient())
server.JavaInjector = patch.NewJavaSdk(rt.Config(), rt.CertStorage().GetCertClient())
server.WebhookServer.Patches = append(server.WebhookServer.Patches, server.JavaInjector.NewPod)
server.KubuClient = rt.KubuClient()
server.CertClient = rt.CertStorage().GetCertClient()
server.CertStorage = rt.CertStorage()
}
if err := RegisterCertificateService(rt, server); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/config/app/dubbo-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ var DefaultConfig = func() Config {
IsKubernetesConnected: false,
RestConfigQps: 50,
RestConfigBurst: 100,
KubeFileConfig: "",
},
}
}
1 change: 1 addition & 0 deletions pkg/config/app/dubbo-cp/dubbo-cp.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ kube-config:
in-pod-env: false
rest-config-qps: 50
rest-config-burst: 100
kube-file-config: ""
grpc-server:
plain-server-port: 30060
secure-server-port: 30062
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/kube/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type KubeConfig struct {
RestConfigQps int `yaml:"rest-config-qps"`
// Burst for rest config
RestConfigBurst int `yaml:"rest-config-burst"`

KubeFileConfig string `yaml:"kube-file-config"`
}

func (o *KubeConfig) Sanitize() {}
Expand Down
48 changes: 25 additions & 23 deletions pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package bootstrap

import (
"context"
"github.com/apache/dubbo-admin/pkg/core/election/universe"
"github.com/apache/dubbo-admin/pkg/core/kubeclient/client"

dubbo_cp "github.com/apache/dubbo-admin/pkg/config/app/dubbo-cp"
"github.com/apache/dubbo-admin/pkg/core/cert/provider"
Expand All @@ -35,16 +37,11 @@ func buildRuntime(appCtx context.Context, cfg *dubbo_cp.Config) (core_runtime.Ru
return nil, err
}

if err := initKubuClient(cfg, builder); err != nil {
return nil, err
}
kubeenv := true

if cfg.KubeConfig.IsKubernetesConnected == false {
rt, err := builder.Build()
if err != nil {
return nil, err
}
return rt, nil
if !initKubeClient(cfg, builder) {
// Non-k8s environment
kubeenv = false
}

if err := initCertStorage(cfg, builder); err != nil {
Expand All @@ -55,10 +52,14 @@ func buildRuntime(appCtx context.Context, cfg *dubbo_cp.Config) (core_runtime.Ru
return nil, err
}

builder.WithComponentManager(component.NewManager(kube.NewLeaderElection(builder.Config().KubeConfig.Namespace,
builder.Config().KubeConfig.ServiceName,
"dubbo-cp-lock",
builder.KubuClient().GetKubClient())))
if kubeenv == true {
builder.WithComponentManager(component.NewManager(kube.NewLeaderElection(builder.Config().KubeConfig.Namespace,
builder.Config().KubeConfig.ServiceName,
"dubbo-cp-lock",
builder.CertStorage().GetCertClient().GetKubClient())))
} else {
builder.WithComponentManager(component.NewManager(universe.NewLeaderElection()))
}
rt, err := builder.Build()
if err != nil {
return nil, err
Expand All @@ -74,20 +75,21 @@ func Bootstrap(appCtx context.Context, cfg *dubbo_cp.Config) (core_runtime.Runti
return runtime, nil
}

func initKubuClient(cfg *dubbo_cp.Config, builder *core_runtime.Builder) error {
client := provider.NewClient()
if !client.Init(cfg) {
func initKubeClient(cfg *dubbo_cp.Config, builder *core_runtime.Builder) bool {
kubeClient := client.NewKubeClient()
if !kubeClient.Init(cfg) {
logger.Sugar().Warnf("Failed to connect to Kubernetes cluster. Will ignore OpenID Connect check.")
cfg.KubeConfig.IsKubernetesConnected = false
} else {
cfg.KubeConfig.IsKubernetesConnected = true
}
builder.WithKubuClient(client)
return nil
builder.WithKubeClient(kubeClient)
return cfg.KubeConfig.IsKubernetesConnected
}

func initCertStorage(cfg *dubbo_cp.Config, builder *core_runtime.Builder) error {
storage := provider.NewStorage(cfg, builder.KubuClient())
client := provider.NewClient(builder.KubeClient().GetKubernetesClientSet())
storage := provider.NewStorage(cfg, client)
loadRootCert()
loadAuthorityCert(storage, cfg, builder)

Expand All @@ -99,12 +101,12 @@ func initCertStorage(cfg *dubbo_cp.Config, builder *core_runtime.Builder) error
}

func loadRootCert() {
// TODO
// TODO loadRootCert
}

func loadAuthorityCert(storage provider.Storage, cfg *dubbo_cp.Config, builder *core_runtime.Builder) {
if cfg.KubeConfig.IsKubernetesConnected {
certStr, priStr := builder.KubuClient().GetAuthorityCert(cfg.KubeConfig.Namespace)
certStr, priStr := storage.GetCertClient().GetAuthorityCert(cfg.KubeConfig.Namespace)
if certStr != "" && priStr != "" {
storage.GetAuthorityCert().Cert = provider.DecodeCert(certStr)
storage.GetAuthorityCert().CertPem = certStr
Expand All @@ -123,14 +125,14 @@ func refreshAuthorityCert(storage provider.Storage, cfg *dubbo_cp.Config) {

// TODO lock if multi cp-server
if storage.GetConfig().KubeConfig.IsKubernetesConnected {
storage.GetKubuClient().UpdateAuthorityCert(storage.GetAuthorityCert().CertPem,
storage.GetCertClient().UpdateAuthorityCert(storage.GetAuthorityCert().CertPem,
provider.EncodePrivateKey(storage.GetAuthorityCert().PrivateKey), storage.GetConfig().KubeConfig.Namespace)
}
}

if storage.GetConfig().KubeConfig.IsKubernetesConnected {
logger.Sugar().Info("Writing ca to config maps.")
if storage.GetKubuClient().UpdateAuthorityPublicKey(storage.GetAuthorityCert().CertPem) {
if storage.GetCertClient().UpdateAuthorityPublicKey(storage.GetAuthorityCert().CertPem) {
logger.Sugar().Info("Write ca to config maps success.")
} else {
logger.Sugar().Warnf("Write ca to config maps failed.")
Expand Down
Loading
Loading