Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Made connection management extensible. #493

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 85 additions & 11 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,50 @@ import (

"github.com/google/uuid"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"k8s.io/klog/v2"

"sigs.k8s.io/apiserver-network-proxy/pkg/agent"
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
"sigs.k8s.io/apiserver-network-proxy/proto/header"
"strings"
"net"
"k8s.io/utils/strings/slices"
)

type ProxyConnectionManager struct {
val string
}

var validConnectionManagers = []string{agent.HAConnectionManager, agent.IPListConnectionManager}

func (p ProxyConnectionManager) String() string {
return p.val
}

func (p ProxyConnectionManager) Set(s string) error {
if !slices.Contains(validConnectionManagers, s) {
return fmt.Errorf("%s is not one of the valid values [%s]", p, strings.Join(validConnectionManagers, ","))
}
p.val = s
return nil
}

func (p ProxyConnectionManager) Type() string {
return "string"
}

type GrpcProxyAgentOptions struct {
// Configuration for authenticating with the proxy-server
AgentCert string
AgentKey string
CaCert string

// Configuration for connecting to the proxy-server
ProxyServerHost string
ProxyServerPort int
AlpnProtos []string
ProxyConnectionManager ProxyConnectionManager
ProxyServerHost string
ProxyServerPort int
ProxyServerList string
AlpnProtos []string

// Bind address for the health connections.
HealthServerHost string
Expand Down Expand Up @@ -70,7 +96,7 @@ type GrpcProxyAgentOptions struct {
ServiceAccountTokenPath string

// This warns if we attempt to push onto a "full" transfer channel.
// However checking that the transfer channel is full is not safe.
// However, checking that the transfer channel is full is not safe.
// It violates our race condition checking. Adding locks around a potentially
// blocking call has its own problems, so it cannot easily be made race condition safe.
// The check is an "unlocked" read but is still use at your own peril.
Expand All @@ -79,15 +105,21 @@ type GrpcProxyAgentOptions struct {
SyncForever bool
}

func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig {
func (o *GrpcProxyAgentOptions) ClientSetConfig() *agent.ClientSetConfig {
return &agent.ClientSetConfig{
ConnectionManager: o.ProxyConnectionManager.val,
Address: fmt.Sprintf("%s:%d", o.ProxyServerHost, o.ProxyServerPort),
Addresses: parseAddresses(o.ProxyServerList),
AgentID: o.AgentID,
AgentIdentifiers: o.AgentIdentifiers,
CaCert: o.CaCert,
AgentCert: o.AgentCert,
AgentKey: o.AgentKey,
AlpnProtos: o.AlpnProtos,
SyncInterval: o.SyncInterval,
ProbeInterval: o.ProbeInterval,
SyncIntervalCap: o.SyncIntervalCap,
DialOptions: dialOptions,
KeepaliveTime: o.KeepaliveTime,
ServiceAccountTokenPath: o.ServiceAccountTokenPath,
WarnOnChannelLimit: o.WarnOnChannelLimit,
SyncForever: o.SyncForever,
Expand All @@ -99,8 +131,10 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.StringVar(&o.AgentCert, "agent-cert", o.AgentCert, "If non-empty secure communication with this cert.")
flags.StringVar(&o.AgentKey, "agent-key", o.AgentKey, "If non-empty secure communication with this key.")
flags.StringVar(&o.CaCert, "ca-cert", o.CaCert, "If non-empty the CAs we use to validate clients.")
flags.StringVar(&o.ProxyServerHost, "proxy-server-host", o.ProxyServerHost, "The hostname to use to connect to the proxy-server.")
flags.IntVar(&o.ProxyServerPort, "proxy-server-port", o.ProxyServerPort, "The port the proxy server is listening on.")
flags.Var(&o.ProxyConnectionManager, "proxy-connection-manager", fmt.Sprintf("Which connection manager should handle proxy connections [%s]", strings.Join(validConnectionManagers, ",")))
flags.StringVar(&o.ProxyServerHost, "proxy-server-host", o.ProxyServerHost, "The hostname to use to connect to the proxy-server. Useful for single server or load-balancer cases.")
flags.IntVar(&o.ProxyServerPort, "proxy-server-port", o.ProxyServerPort, "The port the proxy server is listening on. Useful for single server or load-balancer cases.")
flags.StringVar(&o.ProxyServerList, "proxy-server-list", o.ProxyServerList, "The comma-separated list of host:port addresses to connect to multiple proxy-servers.")
flags.StringSliceVar(&o.AlpnProtos, "alpn-proto", o.AlpnProtos, "Additional ALPN protocols to be presented when connecting to the server. Useful to distinguish between network proxy and apiserver connections that share the same destination address.")
flags.StringVar(&o.HealthServerHost, "health-server-host", o.HealthServerHost, "The host address to listen on, without port.")
flags.IntVar(&o.HealthServerPort, "health-server-port", o.HealthServerPort, "The port the health server is listening on.")
Expand All @@ -124,8 +158,10 @@ func (o *GrpcProxyAgentOptions) Print() {
klog.V(1).Infof("AgentCert set to %q.\n", o.AgentCert)
klog.V(1).Infof("AgentKey set to %q.\n", o.AgentKey)
klog.V(1).Infof("CACert set to %q.\n", o.CaCert)
klog.V(1).Infof("ProxyConnectionManager set to %q.\n", o.ProxyConnectionManager)
klog.V(1).Infof("ProxyServerHost set to %q.\n", o.ProxyServerHost)
klog.V(1).Infof("ProxyServerPort set to %d.\n", o.ProxyServerPort)
klog.V(1).Infof("ProxyServerList set to %q.\n", o.ProxyServerList)
klog.V(1).Infof("ALPNProtos set to %+s.\n", o.AlpnProtos)
klog.V(1).Infof("HealthServerHost set to %s\n", o.HealthServerHost)
klog.V(1).Infof("HealthServerPort set to %d.\n", o.HealthServerPort)
Expand Down Expand Up @@ -166,8 +202,23 @@ func (o *GrpcProxyAgentOptions) Validate() error {
return fmt.Errorf("error checking agent CA cert %s, got %v", o.CaCert, err)
}
}
if o.ProxyServerPort <= 0 {
return fmt.Errorf("proxy server port %d must be greater than 0", o.ProxyServerPort)
switch o.ProxyConnectionManager.val {
case agent.HAConnectionManager:
if o.ProxyServerPort <= 0 {
return fmt.Errorf("proxy server port %d must be greater than 0", o.ProxyServerPort)
}
if len(o.ProxyServerList) > 0 {
return fmt.Errorf("proxy server list cannot be defined with the HA connection manager")
}
case agent.IPListConnectionManager:
if err := validateAddressList(o.ProxyServerList); err != nil {
return fmt.Errorf("proxy server address list is invalid: #{err}")
}
if len(o.ProxyServerHost) > 0 || o.ProxyServerPort != 0 {
return fmt.Errorf("cannot define proxyServerList with either proxyServerHost or proxyServerPort")
}
default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't defaulting to HA be better (backward compatible)?

return fmt.Errorf("require a valid value for ProxyConnectionManager %s", o.ProxyConnectionManager.val)
}
if o.HealthServerPort <= 0 {
return fmt.Errorf("health server port %d must be greater than 0", o.HealthServerPort)
Expand Down Expand Up @@ -211,11 +262,34 @@ func validateAgentIdentifiers(agentIdentifiers string) error {
return nil
}

func validateAddressList(addressList string) error {
addresses := parseAddresses(addressList)
for _, address := range addresses {
if err := validateAddress(address); err != nil {
return err
}
}
return nil
}

func validateAddress(address string) error {
if net.ParseIP(address) == nil {
return fmt.Errorf("invalid proxy host address #{address}")
}
return nil
}

func parseAddresses(addressList string) []string {
res := strings.Split(addressList, ",")
return res
}

func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
o := GrpcProxyAgentOptions{
AgentCert: "",
AgentKey: "",
CaCert: "",
ProxyConnectionManager: ProxyConnectionManager{val: agent.HAConnectionManager},
ProxyServerHost: "127.0.0.1",
ProxyServerPort: 8091,
HealthServerHost: "",
Expand Down
10 changes: 3 additions & 7 deletions cmd/agent/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package app

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
Expand All @@ -30,9 +29,6 @@ import (

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"k8s.io/klog/v2"

"sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options"
Expand Down Expand Up @@ -81,7 +77,7 @@ func (a *Agent) run(o *options.GrpcProxyAgentOptions) error {
}

func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) error {
var tlsConfig *tls.Config
/*var tlsConfig *tls.Config
var err error
if tlsConfig, err = util.GetClientTLSConfig(o.CaCert, o.AgentCert, o.AgentKey, o.ProxyServerHost, o.AlpnProtos); err != nil {
return err
Expand All @@ -92,8 +88,8 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-ch
Time: o.KeepaliveTime,
PermitWithoutStream: true,
}),
}
cc := o.ClientSetConfig(dialOptions...)
} */
cc := o.ClientSetConfig()
cs := cc.NewAgentClientSet(stopCh)
cs.Serve()

Expand Down
3 changes: 1 addition & 2 deletions konnectivity-client/proto/client/client.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion konnectivity-client/proto/client/client_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading