diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go
index 541cb8ad4..efb6221f3 100644
--- a/cmd/agent/app/options/options.go
+++ b/cmd/agent/app/options/options.go
@@ -27,6 +27,7 @@ import (
 	"github.com/google/uuid"
 	"github.com/spf13/pflag"
 	"google.golang.org/grpc"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/klog/v2"
 
 	"sigs.k8s.io/apiserver-network-proxy/pkg/agent"
@@ -80,6 +81,16 @@ type GrpcProxyAgentOptions struct {
 	SyncForever bool
 	XfrChannelSize int
+
+	// Providing a label selector enables updating the server count by counting the
+	// number of valid leases matching the selector.
+	ServerCountLeaseSelector string
+	// Initial fallback server count to use if proxy server lease listing fails.
+	FallbackServerCount uint
+	// Time in seconds for which the cached server count is valid.
+	ServerCountCacheValiditySecs uint
+	// Path to kubeconfig (used by the kubernetes client for lease listing).
+	KubeconfigPath string
 }
 
 func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig {
@@ -122,6 +133,10 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
 	flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
 	flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
 	flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.")
+	flags.UintVar(&o.FallbackServerCount, "fallback-server-count", o.FallbackServerCount, "Initial fallback server count to use if proxy server lease listing fails.")
+	flags.StringVar(&o.ServerCountLeaseSelector, "server-count-lease-selector", o.ServerCountLeaseSelector, "Providing a label selector enables updating the server count by counting the number of valid leases matching the selector.")
+	flags.UintVar(&o.ServerCountCacheValiditySecs, "server-count-cache-validity-secs", o.ServerCountCacheValiditySecs, "Time in seconds for which the cached server count is valid.")
+	flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "absolute path to the kubeconfig file (used by the kubernetes client for proxy server lease listing).")
 	return flags
 }
 
@@ -198,6 +213,17 @@ func (o *GrpcProxyAgentOptions) Validate() error {
 	if err := validateAgentIdentifiers(o.AgentIdentifiers); err != nil {
 		return fmt.Errorf("agent address is invalid: %v", err)
 	}
+	if o.ServerCountLeaseSelector != "" {
+		if _, err := labels.Parse(o.ServerCountLeaseSelector); err != nil {
+			return fmt.Errorf("invalid server count lease selector: %w", err)
+		}
+	}
+	if o.KubeconfigPath != "" {
+		if _, err := os.Stat(o.KubeconfigPath); os.IsNotExist(err) {
+			return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err)
+		}
+	}
+
 	return nil
 }
 
@@ -222,27 +248,31 @@ func validateAgentIdentifiers(agentIdentifiers string) error {
 
 func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
 	o := GrpcProxyAgentOptions{
-		AgentCert:                 "",
-		AgentKey:                  "",
-		CaCert:                    "",
-		ProxyServerHost:           "127.0.0.1",
-		ProxyServerPort:           8091,
-		HealthServerHost:          "",
-		HealthServerPort:          8093,
-		AdminBindAddress:          "127.0.0.1",
-		AdminServerPort:           8094,
-		EnableProfiling:           false,
-		EnableContentionProfiling: false,
-		AgentID:                   defaultAgentID(),
-		AgentIdentifiers:          "",
-		SyncInterval:              1 * time.Second,
-		ProbeInterval:             1 * time.Second,
-		SyncIntervalCap:           10 * time.Second,
-		KeepaliveTime:             1 * time.Hour,
-		ServiceAccountTokenPath:   "",
-		WarnOnChannelLimit:        false,
-		SyncForever:               false,
-		XfrChannelSize:            150,
+		AgentCert:                    "",
+		AgentKey:                     "",
+		CaCert:                       "",
+		ProxyServerHost:              "127.0.0.1",
+		ProxyServerPort:              8091,
+		HealthServerHost:             "",
+		HealthServerPort:             8093,
+		AdminBindAddress:             "127.0.0.1",
+		AdminServerPort:              8094,
+		EnableProfiling:              false,
+		EnableContentionProfiling:    false,
+		AgentID:                      defaultAgentID(),
+		AgentIdentifiers:             "",
+		SyncInterval:                 1 * time.Second,
+		ProbeInterval:                1 * time.Second,
+		SyncIntervalCap:              10 * time.Second,
+		KeepaliveTime:                1 * time.Hour,
+		ServiceAccountTokenPath:      "",
+		WarnOnChannelLimit:           false,
+		SyncForever:                  false,
+		XfrChannelSize:               150,
+		FallbackServerCount:          1,
+		ServerCountLeaseSelector:     "",
+		ServerCountCacheValiditySecs: 10,
+		KubeconfigPath:               "",
 	}
 	return &o
 }
diff --git a/cmd/agent/app/server.go b/cmd/agent/app/server.go
index 8c087de53..fb1f4daf4 100644
--- a/cmd/agent/app/server.go
+++ b/cmd/agent/app/server.go
@@ -38,7 +38,11 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/keepalive"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/klog/v2"
+	"sigs.k8s.io/apiserver-network-proxy/pkg/servercounter"
 
 	"sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options"
 	"sigs.k8s.io/apiserver-network-proxy/pkg/agent"
@@ -133,7 +137,34 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
 		}),
 	}
 	cc := o.ClientSetConfig(dialOptions...)
-	cs := cc.NewAgentClientSet(drainCh, stopCh)
+
+	var serverCounter servercounter.ServerCounter
+	if o.ServerCountLeaseSelector != "" {
+		var k8sClient *kubernetes.Clientset
+		config, err := clientcmd.BuildConfigFromFlags("", o.KubeconfigPath)
+		if err != nil {
+			return nil, fmt.Errorf("failed to load kubernetes client config: %v", err)
+		}
+		k8sClient, err = kubernetes.NewForConfig(config)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err)
+		}
+		sharedInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*time.Duration(o.ServerCountCacheValiditySecs))
+		serverLeaseCounter, err := servercounter.NewServerLeaseCounter(
+			sharedInformerFactory.Coordination().V1().Leases().Lister(),
+			o.ServerCountLeaseSelector,
+			int(o.FallbackServerCount),
+		)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create server lease counter: %w", err)
+		}
+		serverCounter = servercounter.NewCachedServerCounter(serverLeaseCounter, time.Second*time.Duration(o.ServerCountCacheValiditySecs))
+		sharedInformerFactory.Start(context.Background().Done())
+	} else {
+		serverCounter = servercounter.StaticServerCounter(o.FallbackServerCount)
+	}
+
+	cs := cc.NewAgentClientSet(drainCh, stopCh, serverCounter)
 	cs.Serve()
 
 	return cs, nil
diff --git a/cmd/server/app/options/options.go b/cmd/server/app/options/options.go
index fd6d468c2..78b73f637 100644
--- a/cmd/server/app/options/options.go
+++ b/cmd/server/app/options/options.go
@@ -23,6 +23,7 @@ import (
 	"github.com/google/uuid"
 	"github.com/spf13/pflag"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/klog/v2"
 
 	"sigs.k8s.io/apiserver-network-proxy/pkg/server"
@@ -73,7 +74,13 @@ type ProxyRunOptions struct {
 	// ID of this proxy server.
 	ServerID string
 	// Number of proxy server instances, should be 1 unless it is a HA proxy server.
+	// Serves as the initial fallback count when lease-based server counting is enabled.
 	ServerCount uint
+	// Providing a label selector enables updating the server count by counting the
+	// number of valid leases matching the selector.
+	ServerCountLeaseSelector string
+	// Time in seconds for which the cached server count is valid.
+	ServerCountCacheValiditySecs uint
 	// Agent pod's namespace for token-based agent authentication
 	AgentNamespace string
 	// Agent pod's service account for token-based agent authentication
@@ -128,7 +135,9 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
 	flags.BoolVar(&o.EnableProfiling, "enable-profiling", o.EnableProfiling, "enable pprof at host:admin-port/debug/pprof")
 	flags.BoolVar(&o.EnableContentionProfiling, "enable-contention-profiling", o.EnableContentionProfiling, "enable contention profiling at host:admin-port/debug/pprof/block. \"--enable-profiling\" must also be set.")
 	flags.StringVar(&o.ServerID, "server-id", o.ServerID, "The unique ID of this server. Can also be set by the 'PROXY_SERVER_ID' environment variable.")
-	flags.UintVar(&o.ServerCount, "server-count", o.ServerCount, "The number of proxy server instances, should be 1 unless it is an HA server.")
+	flags.UintVar(&o.ServerCount, "server-count", o.ServerCount, "The number of proxy server instances, should be 1 unless it is an HA server. Used as an initial fallback count when lease-based server counting is enabled.")
+	flags.StringVar(&o.ServerCountLeaseSelector, "server-count-lease-selector", o.ServerCountLeaseSelector, "Providing a label selector enables updating the server count by counting the number of valid leases matching the selector.")
+	flags.UintVar(&o.ServerCountCacheValiditySecs, "server-count-cache-validity-secs", o.ServerCountCacheValiditySecs, "Time in seconds for which the cached server count is valid.")
 	flags.StringVar(&o.AgentNamespace, "agent-namespace", o.AgentNamespace, "Expected agent's namespace during agent authentication (used with agent-service-account, authentication-audience, kubeconfig).")
 	flags.StringVar(&o.AgentServiceAccount, "agent-service-account", o.AgentServiceAccount, "Expected agent's service account during agent authentication (used with agent-namespace, authentication-audience, kubeconfig).")
 	flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).")
@@ -314,43 +323,52 @@ func (o *ProxyRunOptions) Validate() error {
 		}
 	}
 
+	// validate server count lease selector
+	if o.ServerCountLeaseSelector != "" {
+		if _, err := labels.Parse(o.ServerCountLeaseSelector); err != nil {
+			return fmt.Errorf("invalid server count lease selector: %w", err)
+		}
+	}
+
 	return nil
 }
 
 func NewProxyRunOptions() *ProxyRunOptions {
 	o := ProxyRunOptions{
-		ServerCert:                "",
-		ServerKey:                 "",
-		ServerCaCert:              "",
-		ClusterCert:               "",
-		ClusterKey:                "",
-		ClusterCaCert:             "",
-		Mode:                      "grpc",
-		UdsName:                   "",
-		DeleteUDSFile:             true,
-		ServerPort:                8090,
-		ServerBindAddress:         "",
-		AgentPort:                 8091,
-		AgentBindAddress:          "",
-		HealthPort:                8092,
-		HealthBindAddress:         "",
-		AdminPort:                 8095,
-		AdminBindAddress:          "127.0.0.1",
-		KeepaliveTime:             1 * time.Hour,
-		FrontendKeepaliveTime:     1 * time.Hour,
-		EnableProfiling:           false,
-		EnableContentionProfiling: false,
-		ServerID:                  defaultServerID(),
-		ServerCount:               1,
-		AgentNamespace:            "",
-		AgentServiceAccount:       "",
-		KubeconfigPath:            "",
-		KubeconfigQPS:             0,
-		KubeconfigBurst:           0,
-		AuthenticationAudience:    "",
-		ProxyStrategies:           "default",
-		CipherSuites:              make([]string, 0),
-		XfrChannelSize:            10,
+		ServerCert:                   "",
+		ServerKey:                    "",
+		ServerCaCert:                 "",
+		ClusterCert:                  "",
+		ClusterKey:                   "",
+		ClusterCaCert:                "",
+		Mode:                         "grpc",
+		UdsName:                      "",
+		DeleteUDSFile:                true,
+		ServerPort:                   8090,
+		ServerBindAddress:            "",
+		AgentPort:                    8091,
+		AgentBindAddress:             "",
+		HealthPort:                   8092,
+		HealthBindAddress:            "",
+		AdminPort:                    8095,
+		AdminBindAddress:             "127.0.0.1",
+		KeepaliveTime:                1 * time.Hour,
+		FrontendKeepaliveTime:        1 * time.Hour,
+		EnableProfiling:              false,
+		EnableContentionProfiling:    false,
+		ServerID:                     defaultServerID(),
+		ServerCount:                  1,
+		ServerCountLeaseSelector:     "",
+		ServerCountCacheValiditySecs: 10,
+		AgentNamespace:               "",
+		AgentServiceAccount:          "",
+		KubeconfigPath:               "",
+		KubeconfigQPS:                0,
+		KubeconfigBurst:              0,
+		AuthenticationAudience:       "",
+		ProxyStrategies:              "default",
+		CipherSuites:                 make([]string, 0),
+		XfrChannelSize:               10,
 	}
 	return &o
 }
diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go
index d684ff999..8c6e5d1c4 100644
--- a/cmd/server/app/server.go
+++ b/cmd/server/app/server.go
@@ -39,9 +39,11 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/keepalive"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/klog/v2"
+	"sigs.k8s.io/apiserver-network-proxy/pkg/servercounter"
 
 	"sigs.k8s.io/apiserver-network-proxy/cmd/server/app/options"
 	"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
@@ -132,7 +134,26 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
 	if err != nil {
 		return err
 	}
-	p.server = server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt, o.XfrChannelSize)
+
+	var serverCounter servercounter.ServerCounter
+	if o.ServerCountLeaseSelector != "" {
+		sharedInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*time.Duration(o.ServerCountCacheValiditySecs))
+		serverLeaseCounter, err := servercounter.NewServerLeaseCounter(
+			sharedInformerFactory.Coordination().V1().Leases().Lister(),
+			o.ServerCountLeaseSelector,
+			int(o.ServerCount),
+		)
+		if err != nil {
+			return fmt.Errorf("failed to create server lease counter: %w", err)
+		}
+		serverCounter = servercounter.NewCachedServerCounter(serverLeaseCounter, time.Second*time.Duration(o.ServerCountCacheValiditySecs))
+		// Start the informers so the lease lister is populated (mirrors the agent path).
+		sharedInformerFactory.Start(ctx.Done())
+	} else {
+		serverCounter = servercounter.StaticServerCounter(o.ServerCount)
+	}
+
+	p.server = server.NewProxyServer(o.ServerID, ps, serverCounter, authOpt, o.XfrChannelSize)
 
 	frontendStop, err := p.runFrontendServer(ctx, o, p.server)
 	if err != nil {
diff --git a/examples/kind-multinode/templates/k8s/konnectivity-agent-ds.yaml b/examples/kind-multinode/templates/k8s/konnectivity-agent-ds.yaml
index 779fdeb85..5030e5e03 100644
--- a/examples/kind-multinode/templates/k8s/konnectivity-agent-ds.yaml
+++ b/examples/kind-multinode/templates/k8s/konnectivity-agent-ds.yaml
@@ -1,3 +1,28 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: system:konnectivity-agent
+  labels:
+    kubernetes.io/cluster-service: "true"
+rules:
+- apiGroups: ["coordination.k8s.io"]
+  resources: ["leases"]
+  verbs: ["get", "watch", "list"]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: system:konnectivity-agent
+  labels:
+    kubernetes.io/cluster-service: "true"
+subjects:
+- kind: ServiceAccount
+  name: konnectivity-agent
+  namespace: kube-system
+roleRef:
+  kind: ClusterRole
+  name: system:konnectivity-agent
+  apiGroup: rbac.authorization.k8s.io
 ---
 apiVersion: v1
 kind: ServiceAccount
@@ -53,7 +78,8 @@ spec:
         "--sync-forever",
"--probe-interval=5s", "--service-account-token-path=/var/run/secrets/tokens/konnectivity-agent-token", - "--agent-identifiers=ipv4=${HOST_IP}" + "--agent-identifiers=ipv4=${HOST_IP}", + "--server-count-lease-selector=apiserver.kubernetes.io/identity=kube-apiserver", ] env: - name: POD_NAME diff --git a/pkg/agent/clientset.go b/pkg/agent/clientset.go index 28c1f479c..c6457f1bf 100644 --- a/pkg/agent/clientset.go +++ b/pkg/agent/clientset.go @@ -148,7 +148,7 @@ type ClientSetConfig struct { XfrChannelSize int } -func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet { +func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}, serverCounter servercounter.ServerCounter) *ClientSet { return &ClientSet{ clients: make(map[string]*Client), agentID: cc.AgentID, @@ -164,8 +164,8 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C drainCh: drainCh, xfrChannelSize: cc.XfrChannelSize, stopCh: stopCh, - respectReceivedServerCount: true, - serverCounter: servercounter.StaticServerCounter(0), + respectReceivedServerCount: false, + serverCounter: serverCounter, } } @@ -233,7 +233,7 @@ func (cs *ClientSet) connectOnce() error { cs.serverCounter = servercounter.StaticServerCounter(newServerCount) klog.V(2).Infof("respecting server count change suggestion, new count: %v", newServerCount) } else { - klog.V(2).Infof("ignoring server count change suggestion") + klog.V(2).Infof("ignoring server count change suggestion of %v", newServerCount) } } if err := cs.AddClient(c.serverID, c); err != nil { diff --git a/pkg/server/server.go b/pkg/server/server.go index 14f51292a..412ef0314 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -375,7 +375,7 @@ func (s *ProxyServer) removeEstablishedForStream(streamUID string) []*ProxyClien } // NewProxyServer creates a new ProxyServer instance -func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions, channelSize int) *ProxyServer { +func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCounter servercounter.ServerCounter, agentAuthenticationOptions *AgentTokenAuthenticationOptions, channelSize int) *ProxyServer { var bms []BackendManager for _, ps := range proxyStrategies { switch ps { @@ -394,7 +394,7 @@ func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCoun established: make(map[string](map[int64]*ProxyClientConnection)), PendingDial: NewPendingDialManager(), serverID: serverID, - serverCounter: servercounter.StaticServerCounter(serverCount), + serverCounter: serverCounter, BackendManagers: bms, AgentAuthenticationOptions: agentAuthenticationOptions, // use the first backend-manager as the Readiness Manager diff --git a/pkg/servercounter/cache.go b/pkg/servercounter/cache.go index 9ab0f9f18..9f8885e4f 100644 --- a/pkg/servercounter/cache.go +++ b/pkg/servercounter/cache.go @@ -24,6 +24,7 @@ func (csc *CachedServerCounter) CountServers() int { newCount := csc.inner.CountServers() if newCount != csc.cachedCount { klog.Infof("updated cached server count from %v to %v", csc.cachedCount, newCount) + csc.cachedCount = newCount } csc.lastRefresh = time.Now() } diff --git a/pkg/servercounter/lease_counter.go b/pkg/servercounter/lease_counter.go index e25afeb51..434d71d2a 100644 --- a/pkg/servercounter/lease_counter.go +++ b/pkg/servercounter/lease_counter.go @@ -44,7 +44,16 @@ func (lc *ServerLeaseCounter) CountServers() int { } } - lc.fallbackCount 
+	if count == 0 {
+		klog.Infof("no valid leases found, using fallback count (%v)", lc.fallbackCount)
+		return lc.fallbackCount
+	}
+
+	if count != lc.fallbackCount {
+		klog.Infof("found %v valid leases, updating fallback count", count)
+		lc.fallbackCount = count
+	}
+
 	return count
 }
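
For readers who have not seen pkg/servercounter before, the relationship between the pieces this change wires together (StaticServerCounter, ServerLeaseCounter, and the caching wrapper) can be illustrated with a short, self-contained sketch. The interface shape and caching behaviour below are inferred from the hunks above; the field names, the NewCachedServerCounter body, and the refreshPeriod parameter are illustrative assumptions, not the package's actual code.

package main

import (
	"fmt"
	"time"
)

// ServerCounter reports how many proxy servers an agent should try to connect to.
type ServerCounter interface {
	CountServers() int
}

// StaticServerCounter always reports a fixed count (the non-lease fallback path).
type StaticServerCounter int

func (s StaticServerCounter) CountServers() int { return int(s) }

// CachedServerCounter wraps another counter and re-queries it at most once per
// refresh period, mirroring --server-count-cache-validity-secs.
type CachedServerCounter struct {
	inner         ServerCounter
	refreshPeriod time.Duration
	cachedCount   int
	lastRefresh   time.Time
}

func NewCachedServerCounter(inner ServerCounter, refreshPeriod time.Duration) *CachedServerCounter {
	return &CachedServerCounter{
		inner:         inner,
		refreshPeriod: refreshPeriod,
		cachedCount:   inner.CountServers(),
		lastRefresh:   time.Now(),
	}
}

func (csc *CachedServerCounter) CountServers() int {
	if time.Since(csc.lastRefresh) >= csc.refreshPeriod {
		newCount := csc.inner.CountServers()
		if newCount != csc.cachedCount {
			// Keep the cached value in sync with the inner counter.
			csc.cachedCount = newCount
		}
		csc.lastRefresh = time.Now()
	}
	return csc.cachedCount
}

func main() {
	// Without a lease selector the agent effectively uses a static fallback count;
	// with one, the inner counter would be the lease-based counter instead.
	var inner ServerCounter = StaticServerCounter(3)
	cached := NewCachedServerCounter(inner, 10*time.Second)
	fmt.Println("server count:", cached.CountServers())
}

The cache is what keeps lease traffic bounded: however often the agent's sync loop asks for the server count, the underlying lister is consulted at most once per validity window.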