From 4a1dc0a6a244882fe007dee820e746023b8491b1 Mon Sep 17 00:00:00 2001 From: Michal Budzyn Date: Sun, 16 Feb 2025 13:36:13 +0100 Subject: [PATCH] Template dynamicAdvertisedListener, allow {{.brokerId}} for a dynamic advertited hostname based on brokerId. Use fixed port if provided. --- Makefile | 2 +- README.md | 10 +-- cmd/kafka-proxy/server.go | 2 +- go.mod | 4 +- go.sum | 28 -------- proxy/proxy.go | 52 +++++++++++++-- proxy/proxy_test.go | 132 ++++++++++++++++++++++++++++++++++++++ 7 files changed, 188 insertions(+), 42 deletions(-) diff --git a/Makefile b/Makefile index 1201eec7..fd85a1b4 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ VERSION ?= $(shell git describe --tags --always --dirty) GOPKGS = $(shell go list ./... | grep -v /vendor/) BUILD_FLAGS ?= LDFLAGS ?= -X github.com/grepplabs/kafka-proxy/config.Version=$(VERSION) -w -s -TAG ?= "v0.4.0" +TAG ?= "v0.4.1" GOOS ?= $(if $(TARGETOS),$(TARGETOS),linux) GOARCH ?= $(if $(TARGETARCH),$(TARGETARCH),amd64) GOARM ?= $(TARGETVARIANT) diff --git a/README.md b/README.md index 1565e904..68f385b9 100644 --- a/README.md +++ b/README.md @@ -48,11 +48,11 @@ As not every Kafka release adds new messages/versions which are relevant to the Linux - curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.0/kafka-proxy-v0.4.0-linux-amd64.tar.gz | tar xz + curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.1/kafka-proxy-v0.4.1-linux-amd64.tar.gz | tar xz macOS - curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.0/kafka-proxy-v0.4.0-darwin-amd64.tar.gz | tar xz + curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.1/kafka-proxy-v0.4.1-darwin-amd64.tar.gz | tar xz 2. Move the binary in to your PATH. @@ -70,7 +70,7 @@ Docker images are available on [Docker Hub](https://hub.docker.com/r/grepplabs/k You can launch a kafka-proxy container for trying it out with - docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.0 \ + docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.1 \ server \ --bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \ --bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \ @@ -89,7 +89,7 @@ Docker images with precompiled plugins located in `/opt/kafka-proxy/bin/` are ta You can launch a kafka-proxy container with auth-ldap plugin for trying it out with - docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.0-all \ + docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.1-all \ server \ --bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \ --bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \ @@ -142,7 +142,7 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w --default-listener-ip string Default listener IP (default "0.0.0.0") --deterministic-listeners Enable deterministic listeners (listener port = min port + broker id). --dial-address-mapping stringArray Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment - --dynamic-advertised-listener string Advertised address for dynamic listeners. If empty, default-listener-ip is used + --dynamic-advertised-listener string Advertised address for dynamic listeners. If left empty, default-listener-ip is used. Supports templating with {{.brokerId}} for dynamic hostnames and a fixed port if provided. --dynamic-listeners-disable Disable dynamic listeners. --dynamic-sequential-min-port int If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time. --external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started diff --git a/cmd/kafka-proxy/server.go b/cmd/kafka-proxy/server.go index 4311ef3e..6c23cd94 100644 --- a/cmd/kafka-proxy/server.go +++ b/cmd/kafka-proxy/server.go @@ -87,7 +87,7 @@ func init() { func initFlags() { // proxy Server.Flags().StringVar(&c.Proxy.DefaultListenerIP, "default-listener-ip", "0.0.0.0", "Default listener IP") - Server.Flags().StringVar(&c.Proxy.DynamicAdvertisedListener, "dynamic-advertised-listener", "", "Advertised address for dynamic listeners. If empty, default-listener-ip is used") + Server.Flags().StringVar(&c.Proxy.DynamicAdvertisedListener, "dynamic-advertised-listener", "", "Advertised address for dynamic listeners. If left empty, default-listener-ip is used. Supports templating with {{.brokerId}} for dynamic hostnames and a fixed port if provided.") Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))") Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started") Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment") diff --git a/go.mod b/go.mod index 6ff5b688..4116cdb4 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,8 @@ require ( github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 github.com/aws/aws-sdk-go-v2 v1.36.1 github.com/aws/aws-sdk-go-v2/config v1.29.6 + github.com/aws/aws-sdk-go-v2/credentials v1.17.59 + github.com/aws/aws-sdk-go-v2/service/sts v1.33.14 github.com/cenkalti/backoff v1.1.0 github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2 @@ -37,7 +39,6 @@ require ( require ( cloud.google.com/go/compute/metadata v0.5.2 // indirect github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.17.59 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32 // indirect @@ -46,7 +47,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.13 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.24.15 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.33.14 // indirect github.com/aws/smithy-go v1.22.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/go.sum b/go.sum index c2b2f983..2249c5a9 100644 --- a/go.sum +++ b/go.sum @@ -14,54 +14,30 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= -github.com/aws/aws-sdk-go-v2 v1.17.2 h1:r0yRZInwiPBNpQ4aDy/Ssh3ROWsGtKDwar2JS8Lm+N8= -github.com/aws/aws-sdk-go-v2 v1.17.2/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= github.com/aws/aws-sdk-go-v2 v1.36.1 h1:iTDl5U6oAhkNPba0e1t1hrwAo02ZMqbrGq4k5JBWM5E= github.com/aws/aws-sdk-go-v2 v1.36.1/go.mod h1:5PMILGVKiW32oDzjj6RU52yrNrDPUHcbZQYr1sM7qmM= -github.com/aws/aws-sdk-go-v2/config v1.18.4 h1:VZKhr3uAADXHStS/Gf9xSYVmmaluTUfkc0dcbPiDsKE= -github.com/aws/aws-sdk-go-v2/config v1.18.4/go.mod h1:EZxMPLSdGAZ3eAmkqXfYbRppZJTzFTkv8VyEzJhKko4= github.com/aws/aws-sdk-go-v2/config v1.29.6 h1:fqgqEKK5HaZVWLQoLiC9Q+xDlSp+1LYidp6ybGE2OGg= github.com/aws/aws-sdk-go-v2/config v1.29.6/go.mod h1:Ft+WLODzDQmCTHDvqAH1JfC2xxbZ0MxpZAcJqmE1LTQ= -github.com/aws/aws-sdk-go-v2/credentials v1.13.4 h1:nEbHIyJy7mCvQ/kzGG7VWHSBpRB4H6sJy3bWierWUtg= -github.com/aws/aws-sdk-go-v2/credentials v1.13.4/go.mod h1:/Cj5w9LRsNTLSwexsohwDME32OzJ6U81Zs33zr2ZWOM= github.com/aws/aws-sdk-go-v2/credentials v1.17.59 h1:9btwmrt//Q6JcSdgJOLI98sdr5p7tssS9yAsGe8aKP4= github.com/aws/aws-sdk-go-v2/credentials v1.17.59/go.mod h1:NM8fM6ovI3zak23UISdWidyZuI1ghNe2xjzUZAyT+08= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 h1:tpNOglTZ8kg9T38NpcGBxudqfUAwUzyUnLQ4XSd0CHE= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20/go.mod h1:d9xFpWd3qYwdIXM0fvu7deD08vvdRXyc/ueV+0SqaWE= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28 h1:KwsodFKVQTlI5EyhRSugALzsV6mG/SGrdjlMXSZSdso= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.28/go.mod h1:EY3APf9MzygVhKuPXAc5H+MkGb8k/DOSQjWS0LgkKqI= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26 h1:5WU31cY7m0tG+AiaXuXGoMzo2GBQ1IixtWa8Yywsgco= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26/go.mod h1:2E0LdbJW6lbeU4uxjum99GZzI0ZjDpAb0CoSCM0oeEY= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32 h1:BjUcr3X3K0wZPGFg2bxOWW3VPN8rkE3/61zhP+IHviA= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.32/go.mod h1:80+OGC/bgzzFFTUmcuwD0lb4YutwQeKLFpmt6hoWapU= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20 h1:WW0qSzDWoiWU2FS5DbKpxGilFVlCEJPwx4YtjdfI0Jw= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20/go.mod h1:/+6lSiby8TBFpTVXZgKiN/rCfkYXEGvhlM4zCgPpt7w= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32 h1:m1GeXHVMJsRsUAqG6HjZWx9dj7F5TR+cF1bjyfYyBd4= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.32/go.mod h1:IitoQxGfaKdVLNg0hD8/DXmAqNy0H4K2H2Sf91ti8sI= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27 h1:N2eKFw2S+JWRCtTt0IhIX7uoGGQciD4p6ba+SJv4WEU= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27/go.mod h1:RdwFVc7PBYWY33fa2+8T1mSqQ7ZEK4ILpM0wfioDC3w= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 h1:Pg9URiobXy85kgFev3og2CuOZ8JZUBENF+dcgWBaYNk= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 h1:D4oz8/CzT9bAEYtVhSBmFj2dNOtaHOtMKc2vHBwYizA= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2/go.mod h1:Za3IHqTQ+yNcRHxu1OFucBh0ACZT4j4VQFF0BqpZcLY= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 h1:jlgyHbkZQAgAc7VIxJDmtouH8eNjOk2REVAQfVhdaiQ= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20/go.mod h1:Xs52xaLBqDEKRcAfX/hgjmD3YQ7c/W+BEyfamlO/W2E= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.13 h1:SYVGSFQHlchIcy6e7x12bsrxClCXSP5et8cqVhL8cuw= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.13/go.mod h1:kizuDaLX37bG5WZaoxGPQR/LNFXpxp0vsUnqfkWXfNE= -github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 h1:ActQgdTNQej/RuUJjB9uxYVLDOvRGtUreXF8L3c8wyg= -github.com/aws/aws-sdk-go-v2/service/sso v1.11.26/go.mod h1:uB9tV79ULEZUXc6Ob18A46KSQ0JDlrplPni9XW6Ot60= github.com/aws/aws-sdk-go-v2/service/sso v1.24.15 h1:/eE3DogBjYlvlbhd2ssWyeuovWunHLxfgw3s/OJa4GQ= github.com/aws/aws-sdk-go-v2/service/sso v1.24.15/go.mod h1:2PCJYpi7EKeA5SkStAmZlF6fi0uUABuhtF8ILHjGc3Y= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 h1:wihKuqYUlA2T/Rx+yu2s6NDAns8B9DgnRooB1PVhY+Q= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9/go.mod h1:2E/3D/mB8/r2J7nK42daoKP/ooCwbf0q1PznNc+DZTU= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14 h1:M/zwXiL2iXUrHputuXgmO94TVNmcenPHxgLXLutodKE= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14/go.mod h1:RVwIw3y/IqxC2YEXSIkAzRDdEU1iRabDPaYjpGCbCGQ= -github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 h1:VQFOLQVL3BrKM/NLO/7FiS4vcp5bqK0mGMyk09xLoAY= -github.com/aws/aws-sdk-go-v2/service/sts v1.17.6/go.mod h1:Az3OXXYGyfNwQNsK/31L4R75qFYnO641RZGAoV3uH1c= github.com/aws/aws-sdk-go-v2/service/sts v1.33.14 h1:TzeR06UCMUq+KA3bDkujxK1GVGy+G8qQN/QVYzGLkQE= github.com/aws/aws-sdk-go-v2/service/sts v1.33.14/go.mod h1:dspXf/oYWGWo6DEvj98wpaTeqt5+DMidZD0A9BYTizc= -github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= -github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -148,7 +124,6 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -197,8 +172,6 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo= -github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= -github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -471,7 +444,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/proxy/proxy.go b/proxy/proxy.go index e9af3bf7..6fbf4190 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -1,11 +1,13 @@ package proxy import ( + "bytes" "crypto/tls" "fmt" "net" "strconv" "sync" + "text/template" "github.com/grepplabs/kafka-proxy/config" "github.com/grepplabs/kafka-proxy/pkg/libs/util" @@ -190,17 +192,57 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) port := l.Addr().(*net.TCPAddr).Port address := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(port)) - dynamicAdvertisedListener := p.dynamicAdvertisedListener - if dynamicAdvertisedListener == "" { - dynamicAdvertisedListener = p.defaultListenerIP + dynamicAdvertisedHost, dynamicAdvertisedPort, err := p.getDynamicAdvertisedAddress(cfg.BrokerID, port) + if err != nil { + return "", 0, err } - cfg.AdvertisedAddress = net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port)) + cfg.AdvertisedAddress = net.JoinHostPort(dynamicAdvertisedHost, fmt.Sprint(dynamicAdvertisedPort)) cfg.ListenerAddress = address p.brokerToListenerConfig[brokerAddress] = cfg logrus.Infof("Dynamic listener %s for broker %s brokerId %d advertised as %s", cfg.ListenerAddress, cfg.GetBrokerAddress(), cfg.BrokerID, cfg.AdvertisedAddress) - return dynamicAdvertisedListener, int32(port), nil + return dynamicAdvertisedHost, int32(dynamicAdvertisedPort), nil +} + +func (p *Listeners) getDynamicAdvertisedAddress(brokerID int32, port int) (string, int, error) { + dynamicAdvertisedListener := p.dynamicAdvertisedListener + if dynamicAdvertisedListener == "" { + return p.defaultListenerIP, port, nil + } + dynamicAdvertisedListener, err := p.templateDynamicAdvertisedAddress(brokerID) + if err != nil { + return "", 0, err + } + var ( + dynamicAdvertisedHost = dynamicAdvertisedListener + dynamicAdvertisedPort = port + ) + advHost, advPortStr, err := net.SplitHostPort(dynamicAdvertisedListener) + if err == nil { + if advPort, err := strconv.Atoi(advPortStr); err == nil { + dynamicAdvertisedHost = advHost + dynamicAdvertisedPort = advPort + } + } + return dynamicAdvertisedHost, dynamicAdvertisedPort, nil +} + +func (p *Listeners) templateDynamicAdvertisedAddress(brokerID int32) (string, error) { + tmpl, err := template.New("dynamicAdvertisedHost").Option("missingkey=error").Parse(p.dynamicAdvertisedListener) + if err != nil { + return "", fmt.Errorf("failed to parse host template '%s': %w", p.dynamicAdvertisedListener, err) + } + var buf bytes.Buffer + data := map[string]any{ + "brokerId": brokerID, + "brokerID": brokerID, + } + err = tmpl.Execute(&buf, data) + if err != nil { + return "", fmt.Errorf("failed to execute host template '%s': %w", p.dynamicAdvertisedListener, err) + } + return buf.String(), nil } func (p *Listeners) ListenInstances(cfgs []config.ListenerConfig) (<-chan Conn, error) { diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 30242cfd..9006b681 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -283,3 +283,135 @@ func TestGetBrokerToListenerConfig(t *testing.T) { assert.ObjectsAreEqual(tt.mapping, mapping) } } + +func TestGetDynamicAdvertisedAddress(t *testing.T) { + tests := []struct { + name string + dynamicAdvertisedListener string + defaultListenerIP string + brokerID int32 + port int + expectedHost string + expectedPort int + expectError bool + }{ + { + name: "Default listener IP is 127.0.0.1", + dynamicAdvertisedListener: "", + defaultListenerIP: "127.0.0.1", + brokerID: 1, + port: 9092, + expectedHost: "127.0.0.1", + expectedPort: 9092, + expectError: false, + }, + { + name: "Default listener IP is 0.0.0.0", + dynamicAdvertisedListener: "", + defaultListenerIP: "0.0.0.0", + brokerID: 1, + port: 9092, + expectedHost: "0.0.0.0", + expectedPort: 9092, + expectError: false, + }, + { + name: "Default listener IP is localhost", + dynamicAdvertisedListener: "", + defaultListenerIP: "localhost", + brokerID: 1, + port: 9092, + expectedHost: "localhost", + expectedPort: 9092, + expectError: false, + }, + { + name: "Dynamic listener no template, host is IP", + dynamicAdvertisedListener: "0.0.0.0", + defaultListenerIP: "127.0.0.1", + brokerID: 2, + port: 9093, + expectedHost: "0.0.0.0", + expectedPort: 9093, + expectError: false, + }, + { + name: "Dynamic listener no template, host is IP and port is provided", + dynamicAdvertisedListener: "0.0.0.0:30000", + defaultListenerIP: "127.0.0.1", + brokerID: 2, + port: 9093, + expectedHost: "0.0.0.0", + expectedPort: 30000, + expectError: false, + }, + { + name: "Dynamic listener no template, host is dns name", + dynamicAdvertisedListener: "kafka-proxy.provisionedmskclust.zgjvgc.c2.kafka.eu-central-1.amazonaws.com", + defaultListenerIP: "127.0.0.1", + brokerID: 2, + port: 9093, + expectedHost: "kafka-proxy.provisionedmskclust.zgjvgc.c2.kafka.eu-central-1.amazonaws.com", + expectedPort: 9093, + expectError: false, + }, + { + name: "Dynamic listener no template, host is dns name and port is provided", + dynamicAdvertisedListener: "kafka-proxy.grepplabs.com:30000", + defaultListenerIP: "127.0.0.1", + brokerID: 2, + port: 9093, + expectedHost: "kafka-proxy.grepplabs.com", + expectedPort: 30000, + expectError: false, + }, + { + name: "Dynamic listener with template", + dynamicAdvertisedListener: "b-{{.brokerId}}.provisionedmskclust.zgjvgc.c2.kafka.eu-central-1.amazonaws.com", + defaultListenerIP: "127.0.0.1", + brokerID: 2, + port: 9093, + expectedHost: "b-2.provisionedmskclust.zgjvgc.c2.kafka.eu-central-1.amazonaws.com", + expectedPort: 9093, + expectError: false, + }, + { + name: "Dynamic listener with template and port is provided", + dynamicAdvertisedListener: "b-{{.brokerId}}.provisionedmskclust.zgjvgc.c2.kafka.eu-central-1.amazonaws.com:30000", + defaultListenerIP: "127.0.0.1", + brokerID: 2, + port: 9093, + expectedHost: "b-2.provisionedmskclust.zgjvgc.c2.kafka.eu-central-1.amazonaws.com", + expectedPort: 30000, + expectError: false, + }, + { + name: "Invalid dynamic listener template", + dynamicAdvertisedListener: "broker-{{.invalid}}", + defaultListenerIP: "127.0.0.1", + brokerID: 3, + port: 9094, + expectedHost: "", + expectedPort: 0, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + listeners := &Listeners{ + dynamicAdvertisedListener: tt.dynamicAdvertisedListener, + defaultListenerIP: tt.defaultListenerIP, + } + + host, port, err := listeners.getDynamicAdvertisedAddress(tt.brokerID, tt.port) + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectedHost, host) + assert.Equal(t, tt.expectedPort, port) + } + }) + } +}