From 81b3840d980631d9d21e42e638ad62291cf26c1a Mon Sep 17 00:00:00 2001 From: Nithin Katta Date: Fri, 20 Jun 2025 11:15:37 +0100 Subject: [PATCH 1/2] CLI - support authentication SASL_PLAINTEXT --- .env | 6 ++ Makefile | 9 +- README.md | 4 +- docker-compose.dev.secure.yml | 45 ++++++++ docker-compose.dev.yml | 14 +-- go.mod | 13 +-- go.sum | 37 +------ internal/core/commands/status.go | 4 +- pkg/cluster/cluster.go | 4 +- pkg/constants/constants.go | 7 +- pkg/session/session.go | 172 +++++++++++++++---------------- 11 files changed, 159 insertions(+), 156 deletions(-) create mode 100644 .env create mode 100644 docker-compose.dev.secure.yml diff --git a/.env b/.env new file mode 100644 index 0000000..f9f9f91 --- /dev/null +++ b/.env @@ -0,0 +1,6 @@ +OK_BROKERS="" +OK_IS_AUTHENTICATED="false" +OK_IS_SECURE_KAFKA="false" +OK_PASSWORD="" +OK_USERNAME="" +OK_VERSION="3.9.0" diff --git a/Makefile b/Makefile index 1345b9a..f631adc 100644 --- a/Makefile +++ b/Makefile @@ -4,10 +4,17 @@ BINARY_NAME=ok dev: $(CONTAINER_CMD) -f docker-compose.dev.yml up --build -d +secure-dev: + $(CONTAINER_CMD) -f docker-compose.dev.secure.yml up --build -d + dev-run: build install clean: - $(CONTAINER_CMD) -f docker-compose.dev.yml down + $(CONTAINER_CMD) -f docker-compose.dev.yml down -v + rm -rf bin/* + +secure-clean: + $(CONTAINER_CMD) -f docker-compose.dev.secure.yml down -v rm -rf bin/* setup: diff --git a/README.md b/README.md index b971a70..1adcfd4 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ OpenKommander is a command line utility and admin UI for Apache Kafka compatible 3. **Start the development environment** ```bash make setup - make dev + make dev #For secure use `make secure-dev` ``` 4. **Execute into the container** @@ -50,7 +50,7 @@ kafka: You can modify `config/config.yaml` to connect to different Kafka clusters: ```yaml -# Development environment (default) +# Development environment (default) [use kafka:9094 for secure - user bob/bobpassword] kafka: broker: kafka:9093 diff --git a/docker-compose.dev.secure.yml b/docker-compose.dev.secure.yml new file mode 100644 index 0000000..472b387 --- /dev/null +++ b/docker-compose.dev.secure.yml @@ -0,0 +1,45 @@ +services: + kafka: + image: 'bitnami/kafka:latest' + ports: + - '9094:9094' + environment: + - KAFKA_ENABLE_KRAFT=yes + - KAFKA_CFG_PROCESS_ROLES=broker,controller + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + - KAFKA_CFG_LISTENERS=SECURE://:9094,CONTROLLER://:9093 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SECURE:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT + - KAFKA_CFG_ADVERTISED_LISTENERS=SECURE://127.0.0.1:9094 + - KAFKA_NODE_ID=1 + - KAFKA_CLIENT_USERS=bob + - KAFKA_CLIENT_PASSWORDS=bobpassword + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 + - BITNAMI_DEBUG=true + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SECURE + - KAFKA_CLIENT_LISTENER_NAME=SECURE + - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN + volumes: + - kafka_data:/bitnami/kafka + networks: + - kafka-net + + app: + build: + context: . + dockerfile: docker/dev/Dockerfile.dev + ports: + - "8080:8080" + - "8081:8081" + depends_on: + - kafka + volumes: + - .:/app + networks: + - kafka-net + +networks: + kafka-net: + driver: bridge + +volumes: + kafka_data: diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 35a1e76..68e6ac1 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -18,13 +18,15 @@ services: - "9092:9092" - "9093:9093" environment: - - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - - KAFKA_CFG_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092 - - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9093,EXTERNAL://localhost:9092 - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT - - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL - - BITNAMI_DEBUG=yes + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 + - KAFKA_CLIENT_LISTENER_NAME=PLAINTEXT volumes: - kafka_data:/bitnami/kafka networks: diff --git a/go.mod b/go.mod index c4fc57b..721a797 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/IBM/sarama v1.45.2 github.com/gorilla/mux v1.8.1 github.com/jedib0t/go-pretty/v6 v6.6.7 + github.com/joho/godotenv v1.5.1 github.com/spf13/cobra v1.9.1 - github.com/spf13/viper v1.20.1 ) require ( @@ -15,8 +15,6 @@ require ( github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/fsnotify/fsnotify v1.8.0 // indirect - github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect @@ -29,21 +27,12 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect - github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/rivo/uniseg v0.4.7 // indirect - github.com/sagikazarmark/locafero v0.7.0 // indirect - github.com/sourcegraph/conc v0.3.0 // indirect - github.com/spf13/afero v1.12.0 // indirect - github.com/spf13/cast v1.7.1 // indirect github.com/spf13/pflag v1.0.6 // indirect - github.com/subosito/gotenv v1.6.0 // indirect - go.uber.org/atomic v1.9.0 // indirect - go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.38.0 // indirect golang.org/x/net v0.40.0 // indirect golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.25.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 245e2bc..f1e1f1e 100644 --- a/go.sum +++ b/go.sum @@ -13,16 +13,8 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= -github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= -github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= -github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= -github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= -github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= @@ -51,16 +43,12 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jedib0t/go-pretty/v6 v6.6.7 h1:m+LbHpm0aIAPLzLbMfn8dc3Ht8MW7lsSO4MPItz/Uuo= github.com/jedib0t/go-pretty/v6 v6.6.7/go.mod h1:YwC5CE4fJ1HFUDeivSV1r//AmANFHyqczZk+U6BDALU= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= -github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= -github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -70,40 +58,21 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqn github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo= -github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k= -github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= -github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= -github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs= -github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4= -github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= -github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo= github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0= github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4= -github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= -github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= -go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= -go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= @@ -144,8 +113,6 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/core/commands/status.go b/internal/core/commands/status.go index 55de8f4..3801b0b 100644 --- a/internal/core/commands/status.go +++ b/internal/core/commands/status.go @@ -24,7 +24,7 @@ func NewFailure(err string, httpCode int) *Failure { func GetAdminClient() (sarama.ClusterAdmin, *Failure) { currentSession := session.GetCurrentSession() if !currentSession.IsAuthenticated() { - return nil, NewFailure("No active session found", http.StatusUnauthorized) + return nil, NewFailure("No active session found \n", http.StatusUnauthorized) } client, err := currentSession.GetAdminClient() @@ -38,7 +38,7 @@ func GetAdminClient() (sarama.ClusterAdmin, *Failure) { func GetClient() (sarama.Client, *Failure) { currentSession := session.GetCurrentSession() if !currentSession.IsAuthenticated() { - return nil, NewFailure("No active session found", http.StatusUnauthorized) + return nil, NewFailure("No active session found \n", http.StatusUnauthorized) } client, err := currentSession.GetClient() diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 03ca7af..91610f4 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -11,9 +11,7 @@ type Cluster struct { Config *sarama.Config } -func NewCluster(brokers []string, version sarama.KafkaVersion) *Cluster { - config := sarama.NewConfig() - config.Version = version +func NewCluster(config *sarama.Config, brokers []string) *Cluster { return &Cluster{ Brokers: brokers, Config: config, diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index a482f57..2d34dd4 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -5,10 +5,9 @@ import ( ) var ( - OpenKommanderConfigFilename = ".openkommander_config" - KafkaVersion = "3.9.0" - SaramaKafkaVersion sarama.KafkaVersion = sarama.V3_9_0_0 - KafkaBroker = "kafka:9093" + KafkaVersion = "4.0.0" + SaramaKafkaVersion sarama.KafkaVersion = sarama.V4_0_0_0 + KafkaBroker = "kafka:9093" ) func init() { diff --git a/pkg/session/session.go b/pkg/session/session.go index 93d372f..480b3b5 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3,9 +3,10 @@ package session import ( "bufio" "context" - "encoding/json" "fmt" + "github.com/joho/godotenv" "os" + "strconv" "strings" "time" @@ -29,27 +30,24 @@ type session struct { adminClient sarama.ClusterAdmin isAuthenticated bool version sarama.KafkaVersion -} - -type SessionData struct { - Brokers []string `json:"brokers"` - IsAuthenticated bool `json:"isAuthenticated"` - Version string `json:"version"` + isSecureKafka bool + username string + password string } func (s *session) Info() string { - return fmt.Sprintf("Brokers: %v, Authenticated: %v, Version: %v", s.brokers, s.isAuthenticated, s.version) + return fmt.Sprintf("Brokers: %v, Authenticated: %v, Version: %v, Secure: %v", s.brokers, s.isAuthenticated, s.version, s.isSecureKafka) } func (s *session) Connect(ctx context.Context) (sarama.Client, error) { if s.client != nil { return s.client, nil } - client, err := cluster.NewCluster(s.brokers, s.version).Connect(ctx) + client, err := cluster.NewCluster(getSaramaConfig(s), s.brokers).Connect(ctx) if err != nil { return nil, fmt.Errorf("error connecting to cluster: %w", err) } - adminClient, err := cluster.NewCluster(s.brokers, s.version).ConnectAdmin(ctx) + adminClient, err := cluster.NewCluster(getSaramaConfig(s), s.brokers).ConnectAdmin(ctx) if err != nil { return nil, fmt.Errorf("error connecting to cluster as admin: %w", err) } @@ -69,6 +67,10 @@ func (s *session) Disconnect() { s.adminClient = nil s.isAuthenticated = false s.version = constants.SaramaKafkaVersion + s.isSecureKafka = false + s.username = "" + s.password = "" + s.brokers = []string{} fmt.Println("Logged out successfully!") } @@ -96,7 +98,7 @@ func (s *session) GetAdminClient() (sarama.ClusterAdmin, error) { } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - adminClient, err := cluster.NewCluster(s.brokers, s.version).ConnectAdmin(ctx) + adminClient, err := cluster.NewCluster(getSaramaConfig(s), s.brokers).ConnectAdmin(ctx) if err != nil { return nil, err } @@ -114,36 +116,16 @@ func GetCurrentSession() *session { var currentSession *session -func createDefaultSession() error { - file, err := os.Create(constants.OpenKommanderConfigFilename) - if err != nil { - fmt.Println("Error creating session file:", err) - return err - } - defer func() { - if err := file.Close(); err != nil { - fmt.Printf("Error closing file: %v\n", err) - } - }() - - sessionData := SessionData{Brokers: []string{}, IsAuthenticated: false, Version: constants.SaramaKafkaVersion.String()} - return json.NewEncoder(file).Encode(sessionData) -} - func saveSession() error { - file, err := os.Create(constants.OpenKommanderConfigFilename) - if err != nil { - fmt.Println("Error creating session file:", err) - return err - } - defer func() { - if err := file.Close(); err != nil { - fmt.Printf("Error closing file: %v\n", err) - } - }() - - sessionData := SessionData{Brokers: currentSession.brokers, IsAuthenticated: currentSession.isAuthenticated, Version: currentSession.version.String()} - err = json.NewEncoder(file).Encode(sessionData) + envMap := make(map[string]string) + envMap["OK_BROKERS"] = strings.Join(currentSession.brokers, ",") + envMap["OK_IS_AUTHENTICATED"] = strconv.FormatBool(currentSession.isAuthenticated) + envMap["OK_VERSION"] = currentSession.version.String() + envMap["OK_IS_SECURE_KAFKA"] = strconv.FormatBool(currentSession.isSecureKafka) + envMap["OK_USERNAME"] = currentSession.username + envMap["OK_PASSWORD"] = currentSession.password + + err := godotenv.Write(envMap, ".env") if err != nil { fmt.Println("Error encoding session data:", err) return err @@ -152,37 +134,12 @@ func saveSession() error { } func loadSession() error { - file, err := os.Open(constants.OpenKommanderConfigFilename) - if err != nil { - err = createDefaultSession() - if err != nil { - fmt.Println("Error creating session file:", err) - return err - } - - file, err = os.Open(constants.OpenKommanderConfigFilename) - if err != nil { - fmt.Println("Error opening session file:", err) - return err - } - } - defer func() { - if err := file.Close(); err != nil { - fmt.Printf("Error closing file: %v\n", err) - } - }() - - var data SessionData - decoder := json.NewDecoder(file) - err = decoder.Decode(&data) - if err != nil { - fmt.Println("Error decoding session data:", err) - return err - } - - currentSession.brokers = data.Brokers - currentSession.isAuthenticated = data.IsAuthenticated - currentSession.version, _ = sarama.ParseKafkaVersion(data.Version) + currentSession.brokers = strings.Split(os.Getenv("OK_BROKERS"), ",") + currentSession.isAuthenticated, _ = strconv.ParseBool(os.Getenv("OK_IS_AUTHENTICATED")) + currentSession.version, _ = sarama.ParseKafkaVersion(os.Getenv("OK_VERSION")) + currentSession.isSecureKafka, _ = strconv.ParseBool(os.Getenv("OK_IS_SECURE_KAFKA")) + currentSession.username = os.Getenv("OK_USERNAME") + currentSession.password = os.Getenv("OK_PASSWORD") return nil } @@ -193,10 +150,13 @@ func init() { client: nil, adminClient: nil, version: constants.SaramaKafkaVersion, + isSecureKafka: false, + username: "", + password: "", } - + envReadError := godotenv.Load() err := loadSession() - if err != nil { + if err != nil || envReadError != nil { fmt.Println("Error loading session:", err) } } @@ -206,29 +166,17 @@ func Login() { fmt.Println("Already logged in.") return } - - versionReader := bufio.NewReader(os.Stdin) - fmt.Printf("Enter kafka version [%s]: ", constants.KafkaVersion) - - version, _ := versionReader.ReadString('\n') - version = strings.TrimSpace(version) - if version == "" { - version = constants.KafkaVersion - } + version := readUserInput("Enter kafka version [" + constants.KafkaVersion + "]: ") currentSession.version, _ = sarama.ParseKafkaVersion(version) - reader := bufio.NewReader(os.Stdin) - - fmt.Printf("Enter broker address [%s]: ", constants.KafkaBroker) - - broker, _ := reader.ReadString('\n') - broker = strings.TrimSpace(broker) - if broker == "" { - broker = constants.KafkaBroker + currentSession.isSecureKafka = readUserClosedInput("Is your kafka configured with SASL_PLAINTEXT security? (y/n): ") + if currentSession.isSecureKafka { + currentSession.username = readUserInput("Enter configured username: ") + currentSession.password = readUserInput("Enter configured password: ") } + broker := readUserInput("Enter broker address [" + constants.KafkaBroker + "]: ") currentSession.brokers = []string{broker} - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() client, err := currentSession.Connect(ctx) @@ -270,3 +218,45 @@ func DisplaySession() { fmt.Println("No active session.") } } + +func readUserInput(inputMessage string) string { + reader := bufio.NewReader(os.Stdin) + fmt.Printf(inputMessage) + + input, _ := reader.ReadString('\n') + input = strings.TrimSpace(input) + if input == "" { + fmt.Println("Please enter a valid Input") + readUserInput(inputMessage) + } + return input +} + +func readUserClosedInput(inputMessage string) bool { + reader := bufio.NewReader(os.Stdin) + fmt.Printf(inputMessage) + + input, _ := reader.ReadString('\n') + input = strings.TrimSpace(input) + if input == "y" { + return true + } else if input == "n" { + return false + } else { + fmt.Println("Please enter a valid Input") + readUserClosedInput(inputMessage) + } + return false +} + +func getSaramaConfig(s *session) *sarama.Config { + config := sarama.NewConfig() + config.Version = s.version + if s.isSecureKafka { + config.Net.SASL.Enable = true + config.Net.SASL.User = s.username + config.Net.SASL.Password = s.password + config.Net.SASL.Mechanism = sarama.SASLTypePlaintext + } + return config +} From 9e071e2b5f6e040449a047485534883d966856d7 Mon Sep 17 00:00:00 2001 From: Nithin Katta Date: Fri, 20 Jun 2025 11:52:09 +0100 Subject: [PATCH 2/2] CLI - support authentication SASL_PLAINTEXT --- .env | 2 +- pkg/session/session.go | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/.env b/.env index f9f9f91..d7d60bf 100644 --- a/.env +++ b/.env @@ -3,4 +3,4 @@ OK_IS_AUTHENTICATED="false" OK_IS_SECURE_KAFKA="false" OK_PASSWORD="" OK_USERNAME="" -OK_VERSION="3.9.0" +OK_VERSION="4.0.0" diff --git a/pkg/session/session.go b/pkg/session/session.go index 480b3b5..f60417a 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -221,28 +221,29 @@ func DisplaySession() { func readUserInput(inputMessage string) string { reader := bufio.NewReader(os.Stdin) - fmt.Printf(inputMessage) + fmt.Print(inputMessage) input, _ := reader.ReadString('\n') input = strings.TrimSpace(input) if input == "" { fmt.Println("Please enter a valid Input") - readUserInput(inputMessage) + return readUserInput(inputMessage) } return input } func readUserClosedInput(inputMessage string) bool { reader := bufio.NewReader(os.Stdin) - fmt.Printf(inputMessage) + fmt.Print(inputMessage) input, _ := reader.ReadString('\n') input = strings.TrimSpace(input) - if input == "y" { + switch input { + case "y": return true - } else if input == "n" { + case "n": return false - } else { + default: fmt.Println("Please enter a valid Input") readUserClosedInput(inputMessage) }