Skip to content

Commit

Permalink
Enable lease-based counting
Browse files Browse the repository at this point in the history
  • Loading branch information
carreter committed Jun 24, 2024
1 parent c07ff4e commit d3351b0
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 64 deletions.
72 changes: 51 additions & 21 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/google/uuid"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"

"sigs.k8s.io/apiserver-network-proxy/pkg/agent"
Expand Down Expand Up @@ -80,6 +81,16 @@ type GrpcProxyAgentOptions struct {

SyncForever bool
XfrChannelSize int

// Providing a label selector enables updating the server count by counting the
// number of valid leases matching the selector.
ServerCountLeaseSelector string
// Initial fallback server count to use if proxy server lease listing fails.
FallbackServerCount uint
// Time in seconds for which the cached server count is valid.
ServerCountCacheValiditySecs uint
// Path to kubeconfig (used by kubernetes client for lease listing)
KubeconfigPath string
}

func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig {
Expand Down Expand Up @@ -122,6 +133,10 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.")
flags.UintVar(&o.FallbackServerCount, "fallback-server-count", o.FallbackServerCount, "Initial fallback server count to use if proxy server lease listing fails.")
flags.StringVar(&o.ServerCountLeaseSelector, "server-count-lease-selector", o.ServerCountLeaseSelector, "Providing a label selector enables updating the server count by counting the number of valid leases matching the selector.")
flags.UintVar(&o.ServerCountCacheValiditySecs, "server-count-cache-validity-secs", o.ServerCountCacheValiditySecs, "Time in seconds for which the cached server count is valid.")
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).")
return flags
}

Expand Down Expand Up @@ -198,6 +213,17 @@ func (o *GrpcProxyAgentOptions) Validate() error {
if err := validateAgentIdentifiers(o.AgentIdentifiers); err != nil {
return fmt.Errorf("agent address is invalid: %v", err)
}
if o.ServerCountLeaseSelector != "" {
if _, err := labels.Parse(o.ServerCountLeaseSelector); err != nil {
return fmt.Errorf("invalid server count lease selector: %w", err)
}
}
if o.KubeconfigPath != "" {
if _, err := os.Stat(o.KubeconfigPath); os.IsNotExist(err) {
return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err)
}
}

return nil
}

Expand All @@ -222,27 +248,31 @@ func validateAgentIdentifiers(agentIdentifiers string) error {

// NewGrpcProxyAgentOptions returns a pointer to a GrpcProxyAgentOptions value
// populated with the default settings listed in the literal below.
//
// NOTE(review): this text is rendered from a diff hunk without +/- markers, so
// the field list appears twice: the first run of entries (AgentCert through
// XfrChannelSize) looks like the pre-change side, and the second run is the
// post-change side, which adds the four lease-counting defaults. Confirm
// against the repository before treating this as compilable Go.
func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
o := GrpcProxyAgentOptions{
// Presumably the pre-change field list (removed side of the hunk) — TODO confirm.
AgentCert: "",
AgentKey: "",
CaCert: "",
ProxyServerHost: "127.0.0.1",
ProxyServerPort: 8091,
HealthServerHost: "",
HealthServerPort: 8093,
AdminBindAddress: "127.0.0.1",
AdminServerPort: 8094,
EnableProfiling: false,
EnableContentionProfiling: false,
AgentID: defaultAgentID(),
AgentIdentifiers: "",
SyncInterval: 1 * time.Second,
ProbeInterval: 1 * time.Second,
SyncIntervalCap: 10 * time.Second,
KeepaliveTime: 1 * time.Hour,
ServiceAccountTokenPath: "",
WarnOnChannelLimit: false,
SyncForever: false,
XfrChannelSize: 150,
// Presumably the post-change field list (added side of the hunk) — TODO confirm.
AgentCert: "",
AgentKey: "",
CaCert: "",
ProxyServerHost: "127.0.0.1",
ProxyServerPort: 8091,
HealthServerHost: "",
HealthServerPort: 8093,
AdminBindAddress: "127.0.0.1",
AdminServerPort: 8094,
EnableProfiling: false,
EnableContentionProfiling: false,
AgentID: defaultAgentID(),
AgentIdentifiers: "",
SyncInterval: 1 * time.Second,
ProbeInterval: 1 * time.Second,
SyncIntervalCap: 10 * time.Second,
KeepaliveTime: 1 * time.Hour,
ServiceAccountTokenPath: "",
WarnOnChannelLimit: false,
SyncForever: false,
XfrChannelSize: 150,
// Lease-based server counting defaults: a fallback count of 1, an empty
// selector, a 10 second cache validity, and no kubeconfig path.
FallbackServerCount: 1,
ServerCountLeaseSelector: "",
ServerCountCacheValiditySecs: 10,
KubeconfigPath: "",
}
return &o
}
Expand Down
33 changes: 32 additions & 1 deletion cmd/agent/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/pkg/servercounter"

"sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options"
"sigs.k8s.io/apiserver-network-proxy/pkg/agent"
Expand Down Expand Up @@ -133,7 +137,34 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
}),
}
cc := o.ClientSetConfig(dialOptions...)
cs := cc.NewAgentClientSet(drainCh, stopCh)

var serverCounter servercounter.ServerCounter
if o.ServerCountLeaseSelector != "" {
var k8sClient *kubernetes.Clientset
config, err := clientcmd.BuildConfigFromFlags("", o.KubeconfigPath)
if err != nil {
return nil, fmt.Errorf("failed to load kubernetes client config: %v", err)
}
k8sClient, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err)
}
sharedInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*time.Duration(o.ServerCountCacheValiditySecs))
serverLeaseCounter, err := servercounter.NewServerLeaseCounter(
sharedInformerFactory.Coordination().V1().Leases().Lister(),
o.ServerCountLeaseSelector,
int(o.FallbackServerCount),
)
if err != nil {
return nil, fmt.Errorf("failed to create server lease counter: %w", err)
}
serverCounter = servercounter.NewCachedServerCounter(serverLeaseCounter, time.Second*time.Duration(o.ServerCountCacheValiditySecs))
sharedInformerFactory.Start(context.Background().Done())
} else {
serverCounter = servercounter.StaticServerCounter(o.FallbackServerCount)
}

cs := cc.NewAgentClientSet(drainCh, stopCh, serverCounter)
cs.Serve()

return cs, nil
Expand Down
84 changes: 51 additions & 33 deletions cmd/server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/google/uuid"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"

"sigs.k8s.io/apiserver-network-proxy/pkg/server"
Expand Down Expand Up @@ -73,7 +74,13 @@ type ProxyRunOptions struct {
// ID of this proxy server.
ServerID string
// Number of proxy server instances, should be 1 unless it is a HA proxy server.
// Serves as an initial fallback count when server counts are performed using leases.
ServerCount uint
// Providing a label selector enables updating the server count by counting the
// number of valid leases matching the selector.
ServerCountLeaseSelector string
// Time in seconds for which the cached server count is valid.
ServerCountCacheValiditySecs uint
// Agent pod's namespace for token-based agent authentication
AgentNamespace string
// Agent pod's service account for token-based agent authentication
Expand Down Expand Up @@ -128,7 +135,9 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
flags.BoolVar(&o.EnableProfiling, "enable-profiling", o.EnableProfiling, "enable pprof at host:admin-port/debug/pprof")
flags.BoolVar(&o.EnableContentionProfiling, "enable-contention-profiling", o.EnableContentionProfiling, "enable contention profiling at host:admin-port/debug/pprof/block. \"--enable-profiling\" must also be set.")
flags.StringVar(&o.ServerID, "server-id", o.ServerID, "The unique ID of this server. Can also be set by the 'PROXY_SERVER_ID' environment variable.")
flags.UintVar(&o.ServerCount, "server-count", o.ServerCount, "The number of proxy server instances, should be 1 unless it is an HA server.")
flags.UintVar(&o.ServerCount, "server-count", o.ServerCount, "The number of proxy server instances, should be 1 unless it is an HA server. Used as an initial fallback count when lease-based server counting is enabled.")
flags.StringVar(&o.ServerCountLeaseSelector, "server-count-lease-selector", o.ServerCountLeaseSelector, "Providing a label selector enables updating the server count by counting the number of valid leases matching the selector.")
flags.UintVar(&o.ServerCountCacheValiditySecs, "server-count-cache-validity-secs", o.ServerCountCacheValiditySecs, "Time in seconds for which the cached server count is valid.")
flags.StringVar(&o.AgentNamespace, "agent-namespace", o.AgentNamespace, "Expected agent's namespace during agent authentication (used with agent-service-account, authentication-audience, kubeconfig).")
flags.StringVar(&o.AgentServiceAccount, "agent-service-account", o.AgentServiceAccount, "Expected agent's service account during agent authentication (used with agent-namespace, authentication-audience, kubeconfig).")
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).")
Expand Down Expand Up @@ -314,43 +323,52 @@ func (o *ProxyRunOptions) Validate() error {
}
}

// validate server count lease selector
if o.ServerCountLeaseSelector != "" {
if _, err := labels.Parse(o.ServerCountLeaseSelector); err != nil {
return fmt.Errorf("invalid server count lease selector: %w", err)
}
}

return nil
}

// NewProxyRunOptions returns a pointer to a ProxyRunOptions value populated
// with the default settings listed in the literal below.
//
// NOTE(review): this text is rendered from a diff hunk without +/- markers, so
// the field list appears twice: the first run of entries (ServerCert through
// XfrChannelSize) looks like the pre-change side, and the second run is the
// post-change side, which adds ServerCountLeaseSelector and
// ServerCountCacheValiditySecs. Confirm against the repository before treating
// this as compilable Go.
func NewProxyRunOptions() *ProxyRunOptions {
o := ProxyRunOptions{
// Presumably the pre-change field list (removed side of the hunk) — TODO confirm.
ServerCert: "",
ServerKey: "",
ServerCaCert: "",
ClusterCert: "",
ClusterKey: "",
ClusterCaCert: "",
Mode: "grpc",
UdsName: "",
DeleteUDSFile: true,
ServerPort: 8090,
ServerBindAddress: "",
AgentPort: 8091,
AgentBindAddress: "",
HealthPort: 8092,
HealthBindAddress: "",
AdminPort: 8095,
AdminBindAddress: "127.0.0.1",
KeepaliveTime: 1 * time.Hour,
FrontendKeepaliveTime: 1 * time.Hour,
EnableProfiling: false,
EnableContentionProfiling: false,
ServerID: defaultServerID(),
ServerCount: 1,
AgentNamespace: "",
AgentServiceAccount: "",
KubeconfigPath: "",
KubeconfigQPS: 0,
KubeconfigBurst: 0,
AuthenticationAudience: "",
ProxyStrategies: "default",
CipherSuites: make([]string, 0),
XfrChannelSize: 10,
// Presumably the post-change field list (added side of the hunk) — TODO confirm.
ServerCert: "",
ServerKey: "",
ServerCaCert: "",
ClusterCert: "",
ClusterKey: "",
ClusterCaCert: "",
Mode: "grpc",
UdsName: "",
DeleteUDSFile: true,
ServerPort: 8090,
ServerBindAddress: "",
AgentPort: 8091,
AgentBindAddress: "",
HealthPort: 8092,
HealthBindAddress: "",
AdminPort: 8095,
AdminBindAddress: "127.0.0.1",
KeepaliveTime: 1 * time.Hour,
FrontendKeepaliveTime: 1 * time.Hour,
EnableProfiling: false,
EnableContentionProfiling: false,
ServerID: defaultServerID(),
ServerCount: 1,
// Lease-based server counting defaults: an empty selector and a 10 second
// cache validity.
ServerCountLeaseSelector: "",
ServerCountCacheValiditySecs: 10,
AgentNamespace: "",
AgentServiceAccount: "",
KubeconfigPath: "",
KubeconfigQPS: 0,
KubeconfigBurst: 0,
AuthenticationAudience: "",
ProxyStrategies: "default",
CipherSuites: make([]string, 0),
XfrChannelSize: 10,
}
return &o
}
Expand Down
21 changes: 20 additions & 1 deletion cmd/server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/pkg/servercounter"

"sigs.k8s.io/apiserver-network-proxy/cmd/server/app/options"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
Expand Down Expand Up @@ -132,7 +134,24 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return err
}
p.server = server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt, o.XfrChannelSize)

var serverCounter servercounter.ServerCounter
if o.ServerCountLeaseSelector != "" {
sharedInformerFactory := informers.NewSharedInformerFactory(k8sClient, time.Second*time.Duration(o.ServerCountCacheValiditySecs))
serverLeaseCounter, err := servercounter.NewServerLeaseCounter(
sharedInformerFactory.Coordination().V1().Leases().Lister(),
o.ServerCountLeaseSelector,
int(o.ServerCount),
)
if err != nil {
return fmt.Errorf("failed to create server lease counter: %w", err)
}
serverCounter = servercounter.NewCachedServerCounter(serverLeaseCounter, time.Second*time.Duration(o.ServerCountCacheValiditySecs))
} else {
serverCounter = servercounter.StaticServerCounter(o.ServerCount)
}

p.server = server.NewProxyServer(o.ServerID, ps, serverCounter, authOpt, o.XfrChannelSize)

frontendStop, err := p.runFrontendServer(ctx, o, p.server)
if err != nil {
Expand Down
28 changes: 27 additions & 1 deletion examples/kind-multinode/templates/k8s/konnectivity-agent-ds.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,28 @@
# ClusterRole granting read-only access (get/watch/list) to Lease objects in
# the coordination.k8s.io API group.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: system:konnectivity-agent
  labels:
    kubernetes.io/cluster-service: "true"
rules:
  - apiGroups: ["coordination.k8s.io"]
    resources: ["leases"]
    verbs: ["get", "watch", "list"]
---
# ClusterRoleBinding that grants the system:konnectivity-agent ClusterRole to
# the konnectivity-agent ServiceAccount in the kube-system namespace.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: system:konnectivity-agent
  labels:
    kubernetes.io/cluster-service: "true"
subjects:
  - kind: ServiceAccount
    name: konnectivity-agent
    namespace: kube-system
roleRef:
  kind: ClusterRole
  name: system:konnectivity-agent
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: v1
kind: ServiceAccount
Expand Down Expand Up @@ -53,7 +78,8 @@ spec:
"--sync-forever",
"--probe-interval=5s",
"--service-account-token-path=/var/run/secrets/tokens/konnectivity-agent-token",
"--agent-identifiers=ipv4=${HOST_IP}"
"--agent-identifiers=ipv4=${HOST_IP}",
"--server-count-lease-selector=apiserver.kubernetes.io/identity=kube-apiserver",
]
env:
- name: POD_NAME
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ type ClientSetConfig struct {
XfrChannelSize int
}

func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet {
func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}, serverCounter servercounter.ServerCounter) *ClientSet {
return &ClientSet{
clients: make(map[string]*Client),
agentID: cc.AgentID,
Expand All @@ -164,8 +164,8 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
drainCh: drainCh,
xfrChannelSize: cc.XfrChannelSize,
stopCh: stopCh,
respectReceivedServerCount: true,
serverCounter: servercounter.StaticServerCounter(0),
respectReceivedServerCount: false,
serverCounter: serverCounter,
}
}

Expand Down Expand Up @@ -233,7 +233,7 @@ func (cs *ClientSet) connectOnce() error {
cs.serverCounter = servercounter.StaticServerCounter(newServerCount)
klog.V(2).Infof("respecting server count change suggestion, new count: %v", newServerCount)
} else {
klog.V(2).Infof("ignoring server count change suggestion")
klog.V(2).Infof("ignoring server count change suggestion of %v", newServerCount)
}
}
if err := cs.AddClient(c.serverID, c); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (s *ProxyServer) removeEstablishedForStream(streamUID string) []*ProxyClien
}

// NewProxyServer creates a new ProxyServer instance
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions, channelSize int) *ProxyServer {
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCounter servercounter.ServerCounter, agentAuthenticationOptions *AgentTokenAuthenticationOptions, channelSize int) *ProxyServer {
var bms []BackendManager
for _, ps := range proxyStrategies {
switch ps {
Expand All @@ -394,7 +394,7 @@ func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCoun
established: make(map[string](map[int64]*ProxyClientConnection)),
PendingDial: NewPendingDialManager(),
serverID: serverID,
serverCounter: servercounter.StaticServerCounter(serverCount),
serverCounter: serverCounter,
BackendManagers: bms,
AgentAuthenticationOptions: agentAuthenticationOptions,
// use the first backend-manager as the Readiness Manager
Expand Down
Loading

0 comments on commit d3351b0

Please sign in to comment.