Skip to content

Commit

Permalink
integrate cert-source lib, add listener CLR file and cert rotation
Browse files Browse the repository at this point in the history
  • Loading branch information
everesio committed Feb 9, 2025
1 parent adfee6a commit f5a8ba6
Show file tree
Hide file tree
Showing 33 changed files with 1,949 additions and 137 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
--debug-enable Enable Debug endpoint
--debug-listen-address string Debug listen address (default "0.0.0.0:6060")
--default-listener-ip string Default listener IP (default "0.0.0.0")
--deterministic-listeners Enable deterministic listeners (listener port = min port + broker id).
--dial-address-mapping stringArray Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment
--dynamic-advertised-listener string Advertised address for dynamic listeners. If empty, default-listener-ip is used
--dynamic-listeners-disable Disable dynamic listeners.
Expand Down Expand Up @@ -178,12 +179,14 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
--proxy-listener-ca-chain-cert-file string PEM encoded CA's certificate file. If provided, client certificate is required and verified
--proxy-listener-cert-file string PEM encoded file with server certificate
--proxy-listener-cipher-suites strings List of supported cipher suites
--proxy-listener-crl-file string PEM encoded X509 CRLs file
--proxy-listener-curve-preferences strings List of curve preferences
--proxy-listener-keep-alive duration Keep alive period for an active network connection. If zero, keep-alives are disabled (default 1m0s)
--proxy-listener-key-file string PEM encoded file with private key for the server certificate
--proxy-listener-key-password string Password to decrypt rsa private key
--proxy-listener-read-buffer-size int Size of the operating system's receive buffer associated with the connection. If zero, system default is used
--proxy-listener-tls-enable Whether or not to use TLS listener
--proxy-listener-tls-refresh duration Interval for refreshing server TLS certificates. If set to zero, the refresh watch is disabled
--proxy-listener-tls-required-client-subject strings Required client certificate subject common name; example; s:/CN=[value]/C=[state]/C=[DE,PL] or r:/CN=[^val.{2}$]/C=[state]/C=[DE,PL]; check manual for more details
--proxy-listener-write-buffer-size int Sets the size of the operating system's transmit buffer associated with the connection. If zero, system default is used
--proxy-request-buffer-size int Request buffer size pro tcp connection (default 4096)
Expand All @@ -207,7 +210,9 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
--tls-client-key-password string Password to decrypt rsa private key
--tls-enable Whether or not to use TLS when connecting to the broker
--tls-insecure-skip-verify It controls whether a client verifies the server's certificate chain and host name
--tls-refresh duration Interval for refreshing client TLS certificates. If set to zero, the refresh watch is disabled
--tls-same-client-cert-enable Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate exactly matches brokers client cert (tls-client-cert-file)
--tls-system-cert-pool Use system pool for root CAs
### Usage example
Expand Down
6 changes: 5 additions & 1 deletion cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,12 @@ func initFlags() {
Server.Flags().DurationVar(&c.Proxy.ListenerKeepAlive, "proxy-listener-keep-alive", 60*time.Second, "Keep alive period for an active network connection. If zero, keep-alives are disabled")

Server.Flags().BoolVar(&c.Proxy.TLS.Enable, "proxy-listener-tls-enable", false, "Whether or not to use TLS listener")
Server.Flags().DurationVar(&c.Proxy.TLS.Refresh, "proxy-listener-tls-refresh", 0*time.Second, "Interval for refreshing server TLS certificates. If set to zero, the refresh watch is disabled")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCertFile, "proxy-listener-cert-file", "", "PEM encoded file with server certificate")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyFile, "proxy-listener-key-file", "", "PEM encoded file with private key for the server certificate")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyPassword, "proxy-listener-key-password", os.Getenv("PROXY_LISTENER_KEY_PASSWORD"), "Password to decrypt rsa private key")
Server.Flags().StringVar(&c.Proxy.TLS.CAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file. If provided, client certificate is required and verified")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file. If provided, client certificate is required and verified")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCRLFile, "proxy-listener-crl-file", "", "PEM encoded X509 CRLs file")
Server.Flags().StringSliceVar(&c.Proxy.TLS.ListenerCipherSuites, "proxy-listener-cipher-suites", []string{}, "List of supported cipher suites")
Server.Flags().StringSliceVar(&c.Proxy.TLS.ListenerCurvePreferences, "proxy-listener-curve-preferences", []string{}, "List of curve preferences")

Expand Down Expand Up @@ -153,11 +155,13 @@ func initFlags() {

// TLS
Server.Flags().BoolVar(&c.Kafka.TLS.Enable, "tls-enable", false, "Whether or not to use TLS when connecting to the broker")
Server.Flags().DurationVar(&c.Kafka.TLS.Refresh, "tls-refresh", 0*time.Second, "Interval for refreshing client TLS certificates. If set to zero, the refresh watch is disabled")
Server.Flags().BoolVar(&c.Kafka.TLS.InsecureSkipVerify, "tls-insecure-skip-verify", false, "It controls whether a client verifies the server's certificate chain and host name")
Server.Flags().StringVar(&c.Kafka.TLS.ClientCertFile, "tls-client-cert-file", "", "PEM encoded file with client certificate")
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyFile, "tls-client-key-file", "", "PEM encoded file with private key for the client certificate")
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", os.Getenv("TLS_CLIENT_KEY_PASSWORD"), "Password to decrypt rsa private key")
Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file")
Server.Flags().BoolVar(&c.Kafka.TLS.SystemCertPool, "tls-system-cert-pool", false, "Use system pool for root CAs")

//Same TLS client cert tls-same-client-cert-enable
Server.Flags().BoolVar(&c.Kafka.TLS.SameClientCertEnable, "tls-same-client-cert-enable", false, "Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate exactly matches brokers client cert (tls-client-cert-file)")
Expand Down
6 changes: 5 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,12 @@ type Config struct {

TLS struct {
Enable bool
Refresh time.Duration
ListenerCertFile string
ListenerKeyFile string
ListenerKeyPassword string
CAChainCertFile string
ListenerCAChainCertFile string
ListenerCRLFile string
ListenerCipherSuites []string
ListenerCurvePreferences []string
ClientCert struct {
Expand Down Expand Up @@ -145,11 +147,13 @@ type Config struct {

TLS struct {
Enable bool
Refresh time.Duration
InsecureSkipVerify bool
ClientCertFile string
ClientKeyFile string
ClientKeyPassword string
CAChainCertFile string
SystemCertPool bool
SameClientCertEnable bool
}

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/fsnotify/fsnotify v1.4.9
github.com/go-ldap/ldap/v3 v3.2.3
github.com/google/uuid v1.6.0
github.com/grepplabs/cert-source v0.0.8
github.com/hashicorp/go-hclog v1.6.3
github.com/hashicorp/go-multierror v0.0.0-20171204182908-b7773ae21874
github.com/hashicorp/go-plugin v1.6.3
Expand All @@ -26,7 +27,7 @@ require (
github.com/spf13/viper v1.0.2
github.com/stretchr/testify v1.10.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78
golang.org/x/net v0.34.0
golang.org/x/oauth2 v0.24.0
google.golang.org/api v0.126.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ github.com/googleapis/gax-go/v2 v2.11.0 h1:9V9PWXEsWnPpQhu/PeQIkS4eGzMlTLGgt80cU
github.com/googleapis/gax-go/v2 v2.11.0/go.mod h1:DxmR61SGKkGLa2xigwuZIQpkCI2S5iydzRfb3peWZJI=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/grepplabs/cert-source v0.0.8 h1:rcZeipbbljq46mMvw9yVF4FX/1zzLVfyenV3C07XS8g=
github.com/grepplabs/cert-source v0.0.8/go.mod h1:gs3IoykME1cFfZ6/h6hch8yg8ktUInsR9OY2xSHA2r4=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce h1:prjrVgOk2Yg6w+PflHoszQNLTUh4kaByUcEWM/9uin4=
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -281,8 +283,8 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76 h1:tBiBTKHnIjovYoLX/TPkcf+OjqqKGQrPtGT3Foz+Pgo=
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76/go.mod h1:SQliXeA7Dhkt//vS29v3zpbEwoa+zb2Cn5xj5uO4K5U=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
Expand Down
17 changes: 8 additions & 9 deletions proxy/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package proxy

import (
"crypto/tls"
"crypto/x509"
"fmt"
"net"
Expand Down Expand Up @@ -46,7 +45,7 @@ type Client struct {
}

func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, saslTokenProvider apis.TokenProvider, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) {
tlsConfig, err := newTLSClientConfig(c)
tlsConfigFunc, err := newTLSClientConfig(c)
if err != nil {
return nil, err
}
Expand All @@ -59,7 +58,7 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
}
}

dialer, err := newDialer(c, tlsConfig)
dialer, err := newDialer(c, tlsConfigFunc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -195,7 +194,7 @@ func getAddressToDialAddressMapping(cfg *config.Config) (map[string]config.DialA
return addressToDialAddressMapping, nil
}

func newDialer(c *config.Config, tlsConfig *tls.Config) (Dialer, error) {
func newDialer(c *config.Config, tlsConfigFunc TLSConfigFunc) (Dialer, error) {
directDialer := directDialer{
dialTimeout: c.Kafka.DialTimeout,
keepAlive: c.Kafka.KeepAlive,
Expand Down Expand Up @@ -230,13 +229,13 @@ func newDialer(c *config.Config, tlsConfig *tls.Config) (Dialer, error) {
rawDialer = directDialer
}
if c.Kafka.TLS.Enable {
if tlsConfig == nil {
return nil, errors.New("tlsConfig must not be nil")
if tlsConfigFunc == nil || tlsConfigFunc() == nil {
return nil, errors.New("tlsConfigFunc must not be nil")
}
tlsDialer := tlsDialer{
timeout: c.Kafka.DialTimeout,
rawDialer: rawDialer,
config: tlsConfig,
timeout: c.Kafka.DialTimeout,
rawDialer: rawDialer,
configFunc: tlsConfigFunc,
}
return tlsDialer, nil
}
Expand Down
11 changes: 5 additions & 6 deletions proxy/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ func (d socks5Dialer) Dial(network, addr string) (net.Conn, error) {
}

type tlsDialer struct {
timeout time.Duration
rawDialer Dialer
config *tls.Config
timeout time.Duration
rawDialer Dialer
configFunc TLSConfigFunc
}

// see tls.DialWithDialer
func (d tlsDialer) Dial(network, addr string) (net.Conn, error) {
if d.config == nil {
config := d.configFunc()
if config == nil {
return nil, errors.New("tlsConfig must not be nil")
}
if d.rawDialer == nil {
Expand Down Expand Up @@ -106,8 +107,6 @@ func (d tlsDialer) Dial(network, addr string) (net.Conn, error) {
}
hostname := addr[:colonPos]

config := d.config

// If no ServerName is set, infer the ServerName
// from the hostname we're connecting to.
if config.ServerName == "" {
Expand Down
122 changes: 42 additions & 80 deletions proxy/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
"log/slog"
"net"
"os"
"reflect"
"strings"
"time"

tlsconfig "github.com/grepplabs/cert-source/config"
tlsclientconfig "github.com/grepplabs/cert-source/tls/client/config"
tlsserver "github.com/grepplabs/cert-source/tls/server"
tlsserverconfig "github.com/grepplabs/cert-source/tls/server/config"
"github.com/grepplabs/kafka-proxy/config"
"github.com/pkg/errors"
"github.com/youmark/pkcs8"
Expand Down Expand Up @@ -59,25 +64,6 @@ var (
func newTLSListenerConfig(conf *config.Config) (*tls.Config, error) {
opts := conf.Proxy.TLS

if opts.ListenerKeyFile == "" || opts.ListenerCertFile == "" {
return nil, errors.New("Listener key and cert files must not be empty")
}
certPEMBlock, err := os.ReadFile(opts.ListenerCertFile)
if err != nil {
return nil, err
}
keyPEMBlock, err := os.ReadFile(opts.ListenerKeyFile)
if err != nil {
return nil, err
}
keyPEMBlock, err = decryptPEM(keyPEMBlock, opts.ListenerKeyPassword)
if err != nil {
return nil, err
}
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
if err != nil {
return nil, err
}
cipherSuites, err := getCipherSuites(opts.ListenerCipherSuites)
if err != nil {
return nil, err
Expand All @@ -86,35 +72,27 @@ func newTLSListenerConfig(conf *config.Config) (*tls.Config, error) {
if err != nil {
return nil, err
}

cfg := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.NoClientCert,
PreferServerCipherSuites: true,
MinVersion: tls.VersionTLS12,
CurvePreferences: curvePreferences,
CipherSuites: cipherSuites,
}
if opts.CAChainCertFile != "" {
caCertPEMBlock, err := os.ReadFile(opts.CAChainCertFile)
if err != nil {
return nil, err
}
clientCAs := x509.NewCertPool()
if ok := clientCAs.AppendCertsFromPEM(caCertPEMBlock); !ok {
return nil, errors.New("Failed to parse listener root certificate")
}
cfg.ClientCAs = clientCAs
cfg.ClientAuth = tls.RequireAndVerifyClientCert
}

tlsValidateFunc, err := tlsClientCertVerificationFunc(conf)
if err != nil {
return nil, err
}
cfg.VerifyPeerCertificate = tlsValidateFunc

return cfg, nil
tlsConfig, err := tlsserverconfig.GetServerTLSConfig(slog.Default(),
&tlsconfig.TLSServerConfig{
Enable: true,
Refresh: opts.Refresh,
KeyPassword: opts.ListenerKeyPassword,
File: tlsconfig.TLSServerFiles{
Key: opts.ListenerKeyFile,
Cert: opts.ListenerCertFile,
ClientCAs: opts.ListenerCAChainCertFile,
ClientCRL: opts.ListenerCRLFile,
},
},
tlsserver.WithTLSServerVerifyPeerCertificate(tlsValidateFunc),
tlsserver.WithTLSServerCipherSuites(cipherSuites),
tlsserver.WithTLSServerCurvePreferences(curvePreferences),
)
return tlsConfig, nil
}

func getCipherSuites(enabledCipherSuites []string) ([]uint16, error) {
Expand Down Expand Up @@ -147,45 +125,29 @@ func getCurvePreferences(enabledCurvePreferences []string) ([]tls.CurveID, error
return curvePreferences, nil
}

func newTLSClientConfig(conf *config.Config) (*tls.Config, error) {
type TLSConfigFunc func() *tls.Config

func newTLSClientConfig(conf *config.Config) (TLSConfigFunc, error) {
// https://blog.cloudflare.com/exposing-go-on-the-internet/
opts := conf.Kafka.TLS

cfg := &tls.Config{InsecureSkipVerify: opts.InsecureSkipVerify}

if opts.ClientCertFile != "" && opts.ClientKeyFile != "" {
certPEMBlock, err := os.ReadFile(opts.ClientCertFile)
if err != nil {
return nil, err
}
keyPEMBlock, err := os.ReadFile(opts.ClientKeyFile)
if err != nil {
return nil, err
}
keyPEMBlock, err = decryptPEM(keyPEMBlock, opts.ClientKeyPassword)
if err != nil {
return nil, err
}
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
if err != nil {
return nil, err
}
cfg.Certificates = []tls.Certificate{cert}
}

if opts.CAChainCertFile != "" {
caCertPEMBlock, err := os.ReadFile(opts.CAChainCertFile)
if err != nil {
return nil, err
}
rootCAs := x509.NewCertPool()
if ok := rootCAs.AppendCertsFromPEM(caCertPEMBlock); !ok {
return nil, errors.New("Failed to parse client root certificate")
}

cfg.RootCAs = rootCAs
tlsConfigFunc, err := tlsclientconfig.GetTLSClientConfigFunc(slog.Default(), &tlsconfig.TLSClientConfig{
Enable: true,
Refresh: opts.Refresh,
InsecureSkipVerify: opts.InsecureSkipVerify,
KeyPassword: opts.ClientKeyPassword,
UseSystemPool: opts.SystemCertPool,
File: tlsconfig.TLSClientFiles{
Key: opts.ClientKeyFile,
Cert: opts.ClientCertFile,
RootCAs: opts.CAChainCertFile,
},
})
if err != nil {
return nil, err
}
return cfg, nil
return func() *tls.Config {
return tlsConfigFunc()
}, err
}

func decryptPEM(pemData []byte, password string) ([]byte, error) {
Expand Down
Loading

0 comments on commit f5a8ba6

Please sign in to comment.