Skip to content

Commit

Permalink
use json-format config arg
Browse files Browse the repository at this point in the history
  • Loading branch information
childe committed Jul 11, 2024
1 parent 0a98821 commit 1bb402b
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 64 deletions.
41 changes: 25 additions & 16 deletions command/cmd/console-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"math"
"strings"

"github.com/childe/healer"
"github.com/spf13/cobra"
Expand All @@ -17,31 +16,43 @@ var consoleConsumerCmd = &cobra.Command{

RunE: func(cmd *cobra.Command, args []string) error {
brokers, err := cmd.Flags().GetString("brokers")
if err != nil {
return err
}
consumerConfig := map[string]interface{}{"bootstrap.servers": brokers}
client, err := cmd.Flags().GetString("client")
if err != nil {
return err
}
if client != "" {
consumerConfig["client.id"] = client
}
partitions, err := cmd.Flags().GetIntSlice("partitions")
if err != nil {
return err
}
topic, err := cmd.Flags().GetString("topic")
if err != nil {
return err
}
if topic == "" || err != nil {
return errors.New("topic must be specified")
}
maxMessages, err := cmd.Flags().GetInt("max-messages")
if err != nil {
return err
}
ifJson, err := cmd.Flags().GetBool("json")
if err != nil {
return err
}
config, err := cmd.Flags().GetString("config")

for _, kv := range strings.Split(config, ",") {
if strings.Trim(kv, " ") == "" {
continue
}
t := strings.SplitN(kv, "=", 2)
if len(t) != 2 {
return fmt.Errorf("invalid config : %s", kv)
}
consumerConfig[t[0]] = t[1]
if err != nil {
return err
}

json.Unmarshal([]byte(config), &consumerConfig)

var (
consumer *healer.Consumer
)
Expand All @@ -51,10 +62,8 @@ var consoleConsumerCmd = &cobra.Command{
return err
}
} else {
assign := make(map[string][]int)
assign[topic] = make([]int, 0)
for _, pid := range partitions {
assign[topic] = append(assign[topic], pid)
assign := map[string][]int{
topic: partitions,
}
if consumer, err = healer.NewConsumer(consumerConfig); err != nil {
return err
Expand All @@ -81,7 +90,7 @@ var consoleConsumerCmd = &cobra.Command{
}

func init() {
consoleConsumerCmd.Flags().String("config", "", "XX=YY,AA=ZZ. refer to https://github.com/childe/healer/blob/master/config.go")
consoleConsumerCmd.Flags().String("config", "", `{"xx"="yy","aa"="zz"} refer to https://github.com/childe/healer/blob/master/config.go`)
consoleConsumerCmd.Flags().IntSlice("partitions", nil, "partition ids, comma-separated")
consoleConsumerCmd.Flags().Int("max-messages", math.MaxInt, "the number of messages to output")
consoleConsumerCmd.Flags().Bool("printoffset", true, "if print offset of each message")
Expand Down
26 changes: 14 additions & 12 deletions command/cmd/console-producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package cmd

import (
"bufio"
"encoding/json"
"errors"
"fmt"
"os"
"strings"

"github.com/childe/healer"
"github.com/spf13/cobra"
Expand All @@ -17,28 +17,30 @@ var consoleProducerCmd = &cobra.Command{

RunE: func(cmd *cobra.Command, args []string) error {
brokers, err := cmd.Flags().GetString("brokers")
if err != nil {
return err
}
producerConfig := map[string]interface{}{"bootstrap.servers": brokers}
client, err := cmd.Flags().GetString("client")
if err != nil {
return err
}
if client != "" {
producerConfig["client.id"] = client
}
topic, err := cmd.Flags().GetString("topic")
if err != nil {
return err
}
if topic == "" || err != nil {
return errors.New("topic must be specified")
}

config, err := cmd.Flags().GetString("config")

for _, kv := range strings.Split(config, ",") {
if strings.Trim(kv, " ") == "" {
continue
}
t := strings.SplitN(kv, "=", 2)
if len(t) != 2 {
return fmt.Errorf("invalid config : %s", kv)
}
producerConfig[t[0]] = t[1]
if err != nil {
return err
}
json.Unmarshal([]byte(config), &producerConfig)

consoleProducer, err := healer.NewProducer(topic, producerConfig)
if err != nil {
Expand All @@ -64,6 +66,6 @@ var consoleProducerCmd = &cobra.Command{
}

func init() {
consoleProducerCmd.Flags().String("config", "", "XX=YY,AA=ZZ. refer to https://github.com/childe/healer/blob/master/config.go")
consoleProducerCmd.Flags().String("config", "", `{"xx"="yy","aa"="zz"} refer to https://github.com/childe/healer/blob/master/config.go`)
consoleProducerCmd.Flags().StringP("topic", "t", "", "topic name")
}
32 changes: 20 additions & 12 deletions command/cmd/group-consumer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package cmd

import (
"encoding/json"
"errors"
"fmt"
"math"
"strings"

"github.com/childe/healer"
"github.com/spf13/cobra"
Expand All @@ -16,36 +16,44 @@ var groupConsumerCmd = &cobra.Command{

RunE: func(cmd *cobra.Command, args []string) error {
brokers, err := cmd.Flags().GetString("brokers")
if err != nil {
return err
}
consumerConfig := map[string]interface{}{"bootstrap.servers": brokers}
client, err := cmd.Flags().GetString("client")
if err != nil {
return err
}
if client != "" {
consumerConfig["client.id"] = client
}
topic, err := cmd.Flags().GetString("topic")
if err != nil {
return err
}
if topic == "" || err != nil {
return errors.New("topic must be specified")
}
group, err := cmd.Flags().GetString("group")
if err != nil {
return err
}
if group == "" || err != nil {
return errors.New("group must be specified")
}
consumerConfig["group.id"] = group

maxMessages, err := cmd.Flags().GetInt("max-messages")
if err != nil {
return err
}
// printOffset, err := cmd.Flags().GetBool("printoffset")
// jsonFormat, err := cmd.Flags().GetBool("json")
config, err := cmd.Flags().GetString("config")

for _, kv := range strings.Split(config, ",") {
if strings.Trim(kv, " ") == "" {
continue
}
t := strings.SplitN(kv, "=", 2)
if len(t) != 2 {
return fmt.Errorf("invalid config : %s", kv)
}
consumerConfig[t[0]] = t[1]
if err != nil {
return err
}
json.Unmarshal([]byte(config), &consumerConfig)

consumer, err := healer.NewGroupConsumer(topic, consumerConfig)
if err != nil {
Expand Down Expand Up @@ -76,7 +84,7 @@ var groupConsumerCmd = &cobra.Command{
func init() {
groupConsumerCmd.Flags().StringP("topic", "t", "", "topic name")
groupConsumerCmd.Flags().StringP("group", "g", "", "group id")
groupConsumerCmd.Flags().String("config", "", "XX=YY,AA=ZZ. refer to https://github.com/childe/healer/blob/master/config.go")
groupConsumerCmd.Flags().String("config", "", `{"xx"="yy","aa"="zz"} refer to https://github.com/childe/healer/blob/master/config.go`)
groupConsumerCmd.Flags().Int("max-messages", math.MaxInt, "the number of messages to output")
groupConsumerCmd.Flags().Bool("printoffset", true, "if print offset of each message")
groupConsumerCmd.Flags().Bool("json", false, "print message in json format")
Expand Down
43 changes: 31 additions & 12 deletions command/cmd/simple-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"math"
"strings"

"github.com/childe/healer"
"github.com/spf13/cobra"
Expand All @@ -18,33 +17,53 @@ var simpleConsumerCmd = &cobra.Command{

RunE: func(cmd *cobra.Command, args []string) error {
brokers, err := cmd.Flags().GetString("brokers")
if err != nil {
return err
}
consumerConfig := map[string]interface{}{"bootstrap.servers": brokers}
client, err := cmd.Flags().GetString("client")
if err != nil {
return err
}
if client != "" {
consumerConfig["client.id"] = client
}
partition, err := cmd.Flags().GetInt32("partition")
if err != nil {
return err
}
topic, err := cmd.Flags().GetString("topic")
if err != nil {
return err
}
if topic == "" || err != nil {
return errors.New("topic must be specified")
}
offset, err := cmd.Flags().GetInt64("offset")
if err != nil {
return err
}
maxMessages, err := cmd.Flags().GetInt32("max-messages")
if err != nil {
return err
}
stopOffset, err := cmd.Flags().GetInt64("stopoffset")
if err != nil {
return err
}
printOffset, err := cmd.Flags().GetBool("printoffset")
if err != nil {
return err
}
jsonFormat, err := cmd.Flags().GetBool("json")
if err != nil {
return err
}
config, err := cmd.Flags().GetString("config")

for _, kv := range strings.Split(config, ",") {
if strings.Trim(kv, " ") == "" {
continue
}
t := strings.SplitN(kv, "=", 2)
if len(t) != 2 {
return fmt.Errorf("invalid config : %s", kv)
}
consumerConfig[t[0]] = t[1]
if err != nil {
return err
}
json.Unmarshal([]byte(config), &consumerConfig)

simpleConsumer, err := healer.NewSimpleConsumer(topic, partition, consumerConfig)
if err != nil {
Expand Down Expand Up @@ -88,7 +107,7 @@ var simpleConsumerCmd = &cobra.Command{
}

func init() {
simpleConsumerCmd.Flags().String("config", "", "XX=YY,AA=ZZ. refer to https://github.com/childe/healer/blob/master/config.go")
simpleConsumerCmd.Flags().String("config", "", `{"xx"="yy","aa"="zz"} refer to https://github.com/childe/healer/blob/master/config.go`)
simpleConsumerCmd.Flags().Int32("partition", 0, "partition id")
simpleConsumerCmd.Flags().Int64("offset", -1, "the offset to consume from, -2 which means from beginning; while value -1 means from end")
simpleConsumerCmd.Flags().Int32("max-messages", math.MaxInt32, "the number of messages to output")
Expand Down
29 changes: 17 additions & 12 deletions command/cmd/simple-producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package cmd
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"

"github.com/childe/healer"
"github.com/spf13/cobra"
Expand All @@ -18,29 +18,34 @@ var simpleProducerCmd = &cobra.Command{

RunE: func(cmd *cobra.Command, args []string) error {
brokers, err := cmd.Flags().GetString("brokers")
if err != nil {
return err
}
producerConfig := map[string]interface{}{"bootstrap.servers": brokers}
client, err := cmd.Flags().GetString("client")
if err != nil {
return err
}
if client != "" {
producerConfig["client.id"] = client
}
partition, err := cmd.Flags().GetInt32("partition")
if err != nil {
return err
}
topic, err := cmd.Flags().GetString("topic")
if err != nil {
return err
}
if topic == "" || err != nil {
return errors.New("topic must be specified")
}

config, err := cmd.Flags().GetString("config")

for _, kv := range strings.Split(config, ",") {
if strings.Trim(kv, " ") == "" {
continue
}
t := strings.SplitN(kv, "=", 2)
if len(t) != 2 {
return fmt.Errorf("invalid config : %s", kv)
}
producerConfig[t[0]] = t[1]
if err != nil {
return err
}
json.Unmarshal([]byte(config), &producerConfig)

simpleProducer, err := healer.NewSimpleProducer(context.Background(), topic, partition, producerConfig)
if err != nil {
Expand All @@ -65,7 +70,7 @@ var simpleProducerCmd = &cobra.Command{
}

func init() {
simpleProducerCmd.Flags().String("config", "", "XX=YY,AA=ZZ. refer to https://github.com/childe/healer/blob/master/config.go")
simpleProducerCmd.Flags().String("config", "", `{"xx"="yy","aa"="zz"} refer to https://github.com/childe/healer/blob/master/config.go`)
simpleProducerCmd.Flags().Int32("partition", 0, "partition id")
simpleProducerCmd.Flags().StringP("topic", "t", "", "topic name")
}
1 change: 1 addition & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Consumer struct {
// NewConsumer creates a new consumer instance
func NewConsumer(config interface{}, topics ...string) (*Consumer, error) {
cfg, err := createConsumerConfig(config)
logger.Info("create group consumer", "origin_config", config, "final_config", cfg)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions simple_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func NewSimpleConsumerWithBrokers(topic string, partitionID int32, config Consum
// NewSimpleConsumer create a simple consumer
func NewSimpleConsumer(topic string, partitionID int32, config interface{}) (*SimpleConsumer, error) {
cfg, err := createConsumerConfig(config)
logger.Info("create group consumer", "origin_config", config, "final_config", cfg)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 1bb402b

Please sign in to comment.