New proxy server counting logic for agent and server #635

Closed · wants to merge 21 commits
6 changes: 5 additions & 1 deletion Makefile
@@ -24,7 +24,7 @@ ALL_ARCH ?= amd64 arm arm64 ppc64le s390x
# The output type could either be docker (local), or registry.
OUTPUT_TYPE ?= docker
GO_TOOLCHAIN ?= golang
GO_VERSION ?= 1.22.2
GO_VERSION ?= 1.22.3
BASEIMAGE ?= gcr.io/distroless/static-debian11:nonroot

ifeq ($(GOPATH),)
@@ -80,6 +80,10 @@ test:
test-integration: build
go test -mod=vendor -race ./tests -agent-path $(PWD)/bin/proxy-agent

.PHONY: test-e2e
test-e2e: docker-build
go test -mod=vendor ./e2e -agent-image ${AGENT_FULL_IMAGE}-$(TARGETARCH):${TAG} -server-image ${SERVER_FULL_IMAGE}-$(TARGETARCH):${TAG}

## --------------------------------------
## Binaries
## --------------------------------------
30 changes: 30 additions & 0 deletions cmd/agent/app/options/options.go
@@ -27,6 +27,7 @@ import (
"github.com/google/uuid"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"

"sigs.k8s.io/apiserver-network-proxy/pkg/agent"
@@ -80,6 +81,16 @@ type GrpcProxyAgentOptions struct {

SyncForever bool
XfrChannelSize int

// Providing a label selector enables updating the server count by counting the
// number of valid leases matching the selector.
ServerCountLeaseSelector string
// Initial fallback server count to use if proxy server lease listing fails.
ServerCount uint
// Lease informer resync period.
InformerResync time.Duration
// Path to kubeconfig (used by kubernetes client for lease listing)
KubeconfigPath string
}

func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig {
@@ -122,6 +133,10 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.")
flags.UintVar(&o.ServerCount, "server-count", o.ServerCount, "Static server count, also used as fallback server count if proxy server lease listing fails.")
flags.StringVar(&o.ServerCountLeaseSelector, "server-count-lease-selector", o.ServerCountLeaseSelector, "Providing a label selector enables updating the server count by counting the number of valid leases matching the selector.")
flags.DurationVar(&o.InformerResync, "informer-resync", o.InformerResync, "Lease informer resync period")
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).")
return flags
}

@@ -198,6 +213,17 @@ func (o *GrpcProxyAgentOptions) Validate() error {
if err := validateAgentIdentifiers(o.AgentIdentifiers); err != nil {
return fmt.Errorf("agent address is invalid: %v", err)
}
if o.ServerCountLeaseSelector != "" {
if _, err := labels.Parse(o.ServerCountLeaseSelector); err != nil {
return fmt.Errorf("invalid server count lease selector: %w", err)
}
}
if o.KubeconfigPath != "" {
if _, err := os.Stat(o.KubeconfigPath); os.IsNotExist(err) {
return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err)
}
}

return nil
}

@@ -243,6 +269,10 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
WarnOnChannelLimit: false,
SyncForever: false,
XfrChannelSize: 150,
ServerCount: 1,
ServerCountLeaseSelector: "",
InformerResync: 10 * time.Second,
KubeconfigPath: "",
}
return &o
}
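For context, the value passed to --server-count-lease-selector is an ordinary Kubernetes label selector, validated with labels.Parse exactly as in Validate() above. Below is a minimal self-contained sketch of how such a selector matches a Lease object; the selector string and lease labels are illustrative, not taken from this PR.

package main

import (
	"fmt"

	coordinationv1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Hypothetical selector value; the real value is whatever the operator
	// passes on the command line.
	sel, err := labels.Parse("k8s-app=konnectivity-server")
	if err != nil {
		panic(err) // Validate() rejects unparsable selectors in the same way.
	}

	// A Lease roughly as a proxy server instance might publish it
	// (name and labels are illustrative).
	lease := coordinationv1.Lease{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "konnectivity-server-0",
			Labels: map[string]string{"k8s-app": "konnectivity-server"},
		},
	}

	fmt.Println(sel.Matches(labels.Set(lease.Labels))) // true
}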
33 changes: 32 additions & 1 deletion cmd/agent/app/server.go
@@ -38,7 +38,11 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/pkg/servercounter"

"sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options"
"sigs.k8s.io/apiserver-network-proxy/pkg/agent"
@@ -133,7 +137,34 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
}),
}
cc := o.ClientSetConfig(dialOptions...)
cs := cc.NewAgentClientSet(drainCh, stopCh)

var serverCounter servercounter.ServerCounter
if o.ServerCountLeaseSelector != "" {
var k8sClient *kubernetes.Clientset
config, err := clientcmd.BuildConfigFromFlags("", o.KubeconfigPath)
if err != nil {
return nil, fmt.Errorf("failed to load kubernetes client config: %v", err)
}
k8sClient, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err)
}
sharedInformerFactory := informers.NewSharedInformerFactory(k8sClient, o.InformerResync)
serverLeaseCounter, err := servercounter.NewServerLeaseCounter(
sharedInformerFactory.Coordination().V1().Leases().Lister(),
o.ServerCountLeaseSelector,
int(o.ServerCount),
)
if err != nil {
return nil, fmt.Errorf("failed to create server lease counter: %w", err)
}
serverCounter = serverLeaseCounter
sharedInformerFactory.Start(context.Background().Done())
} else {
serverCounter = servercounter.StaticServerCounter(o.ServerCount)
}

cs := cc.NewAgentClientSet(drainCh, stopCh, serverCounter)
cs.Serve()

return cs, nil
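The pkg/servercounter package itself is not included in this diff excerpt. Inferred only from the call sites above (StaticServerCounter(o.ServerCount) and NewServerLeaseCounter(lister, selector, fallback)), its surface presumably looks roughly like the sketch below; the type and method names are guesses, not confirmed by the PR.

// Hypothetical sketch of pkg/servercounter, inferred from its usage in this
// diff; the actual definitions are not part of this excerpt.
package servercounter

// ServerCounter reports how many proxy server instances the caller should
// currently assume exist.
type ServerCounter interface {
	CountServers() int
}

// StaticServerCounter is a fixed count, used when no lease selector is given.
type StaticServerCounter uint

func (c StaticServerCounter) CountServers() int { return int(c) }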
20 changes: 19 additions & 1 deletion cmd/server/app/options/options.go
@@ -23,6 +23,7 @@ import (

"github.com/google/uuid"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"

"sigs.k8s.io/apiserver-network-proxy/pkg/server"
@@ -73,7 +74,13 @@ type ProxyRunOptions struct {
// ID of this proxy server.
ServerID string
// Number of proxy server instances, should be 1 unless it is a HA proxy server.
// Serves as an initial fallback count when server counts are performed using leases.
ServerCount uint
// Providing a label selector enables updating the server count by counting the
// number of valid leases matching the selector.
ServerCountLeaseSelector string
// Lease informer resync period.
InformerResync time.Duration
// Agent pod's namespace for token-based agent authentication
AgentNamespace string
// Agent pod's service account for token-based agent authentication
@@ -128,7 +135,9 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
flags.BoolVar(&o.EnableProfiling, "enable-profiling", o.EnableProfiling, "enable pprof at host:admin-port/debug/pprof")
flags.BoolVar(&o.EnableContentionProfiling, "enable-contention-profiling", o.EnableContentionProfiling, "enable contention profiling at host:admin-port/debug/pprof/block. \"--enable-profiling\" must also be set.")
flags.StringVar(&o.ServerID, "server-id", o.ServerID, "The unique ID of this server. Can also be set by the 'PROXY_SERVER_ID' environment variable.")
flags.UintVar(&o.ServerCount, "server-count", o.ServerCount, "The number of proxy server instances, should be 1 unless it is an HA server.")
flags.UintVar(&o.ServerCount, "server-count", o.ServerCount, "The number of proxy server instances, should be 1 unless it is an HA server. Used as an initial fallback count when lease-based server counting is enabled.")
flags.StringVar(&o.ServerCountLeaseSelector, "server-count-lease-selector", o.ServerCountLeaseSelector, "Providing a label selector enables updating the server count by counting the number of valid leases matching the selector.")
flags.DurationVar(&o.InformerResync, "informer-resync", o.InformerResync, "Lease informer resync period")
flags.StringVar(&o.AgentNamespace, "agent-namespace", o.AgentNamespace, "Expected agent's namespace during agent authentication (used with agent-service-account, authentication-audience, kubeconfig).")
flags.StringVar(&o.AgentServiceAccount, "agent-service-account", o.AgentServiceAccount, "Expected agent's service account during agent authentication (used with agent-namespace, authentication-audience, kubeconfig).")
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).")
@@ -314,6 +323,13 @@ func (o *ProxyRunOptions) Validate() error {
}
}

// validate server count lease selector
if o.ServerCountLeaseSelector != "" {
if _, err := labels.Parse(o.ServerCountLeaseSelector); err != nil {
return fmt.Errorf("invalid server count lease selector: %w", err)
}
}

return nil
}

@@ -342,6 +358,8 @@ func NewProxyRunOptions() *ProxyRunOptions {
EnableContentionProfiling: false,
ServerID: defaultServerID(),
ServerCount: 1,
ServerCountLeaseSelector: "",
InformerResync: time.Second * 10,
AgentNamespace: "",
AgentServiceAccount: "",
KubeconfigPath: "",
21 changes: 20 additions & 1 deletion cmd/server/app/server.go
@@ -39,9 +39,11 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/pkg/servercounter"

"sigs.k8s.io/apiserver-network-proxy/cmd/server/app/options"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
@@ -132,7 +134,24 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
if err != nil {
return err
}
p.server = server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt, o.XfrChannelSize)

var serverCounter servercounter.ServerCounter
if o.ServerCountLeaseSelector != "" {
sharedInformerFactory := informers.NewSharedInformerFactory(k8sClient, o.InformerResync)
serverLeaseCounter, err := servercounter.NewServerLeaseCounter(
sharedInformerFactory.Coordination().V1().Leases().Lister(),
o.ServerCountLeaseSelector,
int(o.ServerCount),
)
if err != nil {
return fmt.Errorf("failed to create server lease counter: %w", err)
}
serverCounter = serverLeaseCounter
} else {
serverCounter = servercounter.StaticServerCounter(o.ServerCount)
}

p.server = server.NewProxyServer(o.ServerID, ps, serverCounter, authOpt, o.XfrChannelSize)

frontendStop, err := p.runFrontendServer(ctx, o, p.server)
if err != nil {
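Both the agent and the server hand NewServerLeaseCounter a Lease lister from the shared informer factory, the configured selector, and the static --server-count value as a fallback. The following is an illustrative sketch of the counting behaviour that wiring implies; the actual NewServerLeaseCounter implementation is not shown in this excerpt.

// Illustrative only: one plausible way a lease-based counter could derive the
// server count, falling back to the static count when listing fails or
// returns nothing. Not code from this PR.
package servercounter

import (
	"k8s.io/apimachinery/pkg/labels"

	coordlisters "k8s.io/client-go/listers/coordination/v1"
)

func countFromLeases(lister coordlisters.LeaseLister, selector labels.Selector, fallback int) int {
	leases, err := lister.List(selector)
	if err != nil || len(leases) == 0 {
		return fallback
	}
	// A real implementation would also filter out expired leases ("valid
	// leases" in the flag help text); that check is omitted here.
	return len(leases)
}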
104 changes: 104 additions & 0 deletions e2e/main_test.go
@@ -0,0 +1,104 @@
package e2e

import (
"context"
"flag"
"log"
"os"
"testing"

"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/e2e-framework/pkg/env"
"sigs.k8s.io/e2e-framework/pkg/envconf"
"sigs.k8s.io/e2e-framework/pkg/envfuncs"
"sigs.k8s.io/e2e-framework/support/kind"
)

var (
testenv env.Environment
agentImage = flag.String("agent-image", "", "The proxy agent's docker image.")
serverImage = flag.String("server-image", "", "The proxy server's docker image.")
)

func TestMain(m *testing.M) {
flag.Parse()
if *agentImage == "" {
log.Fatalf("must provide agent image with -agent-image")
}
if *serverImage == "" {
log.Fatalf("must provide server image with -server-image")
}

scheme.AddToScheme(scheme.Scheme)

testenv = env.New()
kindClusterName := "kind-test"
kindCluster := kind.NewCluster(kindClusterName)

testenv.Setup(
envfuncs.CreateCluster(kindCluster, kindClusterName),
envfuncs.LoadImageToCluster(kindClusterName, *agentImage),
envfuncs.LoadImageToCluster(kindClusterName, *serverImage),
func(ctx context.Context, cfg *envconf.Config) (context.Context, error) {
client := cfg.Client()

// Render agent RBAC templates.
agentServiceAccount, _, err := renderAgentTemplate("serviceaccount.yaml", struct{}{})
if err != nil {
return nil, err
}
agentClusterRole, _, err := renderAgentTemplate("clusterrole.yaml", struct{}{})
if err != nil {
return nil, err
}
agentClusterRoleBinding, _, err := renderAgentTemplate("clusterrolebinding.yaml", struct{}{})
if err != nil {
return ctx, err
}

// Submit agent RBAC templates to k8s.
err = client.Resources().Create(ctx, agentServiceAccount)
if err != nil {
return ctx, err
}
err = client.Resources().Create(ctx, agentClusterRole)
if err != nil {
return ctx, err
}
err = client.Resources().Create(ctx, agentClusterRoleBinding)
if err != nil {
return ctx, err
}

// Render server RBAC and Service templates.
serverClusterRoleBinding, _, err := renderServerTemplate("clusterrolebinding.yaml", struct{}{})
if err != nil {
return ctx, err
}
serverService, _, err := renderServerTemplate("service.yaml", struct{}{})
if err != nil {
return ctx, err
}

// Submit server templates to k8s.
err = client.Resources().Create(ctx, serverClusterRoleBinding)
if err != nil {
return ctx, err
}
err = client.Resources().Create(ctx, serverService)
if err != nil {
return ctx, err
}

return ctx, nil
},
)

testenv.Finish(envfuncs.DestroyCluster(kindClusterName))

os.Exit(testenv.Run(m))
}

func TestSingleAgentAndServer(t *testing.T) {

}
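
TestSingleAgentAndServer is left empty in this commit. For orientation only, a test against this harness would typically be written as an e2e-framework feature, roughly as in the placeholder below; the feature name, assessment, and any deployment helpers are hypothetical, and the features package would need to be imported.

// Placeholder sketch, not part of this PR: how the empty test body might
// eventually use the environment set up in TestMain. Requires importing
// "sigs.k8s.io/e2e-framework/pkg/features".
func TestSingleAgentAndServer_sketch(t *testing.T) {
	f := features.New("single agent and server").
		Assess("agent connects to the proxy server", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
			// Deploy one proxy server and one agent (helpers omitted), then
			// poll until the agent reports an established connection.
			return ctx
		}).
		Feature()
	testenv.Test(t, f)
}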