From 1bb402b61caa90de5c6d0bddbfc1abcd460462f3 Mon Sep 17 00:00:00 2001 From: childe Date: Thu, 11 Jul 2024 15:28:50 +0800 Subject: [PATCH] use json-format config arg --- command/cmd/console-consumer.go | 41 +++++++++++++++++++------------ command/cmd/console-producer.go | 26 +++++++++++--------- command/cmd/group-consumer.go | 32 +++++++++++++++--------- command/cmd/simple-consumer.go | 43 ++++++++++++++++++++++++--------- command/cmd/simple-producer.go | 29 +++++++++++++--------- consumer.go | 1 + simple_consumer.go | 1 + 7 files changed, 109 insertions(+), 64 deletions(-) diff --git a/command/cmd/console-consumer.go b/command/cmd/console-consumer.go index 9ab4a36..72bd844 100644 --- a/command/cmd/console-consumer.go +++ b/command/cmd/console-consumer.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math" - "strings" "github.com/childe/healer" "github.com/spf13/cobra" @@ -17,31 +16,43 @@ var consoleConsumerCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { brokers, err := cmd.Flags().GetString("brokers") + if err != nil { + return err + } consumerConfig := map[string]interface{}{"bootstrap.servers": brokers} client, err := cmd.Flags().GetString("client") + if err != nil { + return err + } if client != "" { consumerConfig["client.id"] = client } partitions, err := cmd.Flags().GetIntSlice("partitions") + if err != nil { + return err + } topic, err := cmd.Flags().GetString("topic") + if err != nil { + return err + } if topic == "" || err != nil { return errors.New("topic must be specified") } maxMessages, err := cmd.Flags().GetInt("max-messages") + if err != nil { + return err + } ifJson, err := cmd.Flags().GetBool("json") + if err != nil { + return err + } config, err := cmd.Flags().GetString("config") - - for _, kv := range strings.Split(config, ",") { - if strings.Trim(kv, " ") == "" { - continue - } - t := strings.SplitN(kv, "=", 2) - if len(t) != 2 { - return fmt.Errorf("invalid config : %s", kv) - } - consumerConfig[t[0]] = t[1] + if err != nil { + return err } + json.Unmarshal([]byte(config), &consumerConfig) + var ( consumer *healer.Consumer ) @@ -51,10 +62,8 @@ var consoleConsumerCmd = &cobra.Command{ return err } } else { - assign := make(map[string][]int) - assign[topic] = make([]int, 0) - for _, pid := range partitions { - assign[topic] = append(assign[topic], pid) + assign := map[string][]int{ + topic: partitions, } if consumer, err = healer.NewConsumer(consumerConfig); err != nil { return err @@ -81,7 +90,7 @@ var consoleConsumerCmd = &cobra.Command{ } func init() { - consoleConsumerCmd.Flags().String("config", "", "XX=YY,AA=ZZ. refer to https://github.com/childe/healer/blob/master/config.go") + consoleConsumerCmd.Flags().String("config", "", `{"xx"="yy","aa"="zz"} refer to https://github.com/childe/healer/blob/master/config.go`) consoleConsumerCmd.Flags().IntSlice("partitions", nil, "partition ids, comma-separated") consoleConsumerCmd.Flags().Int("max-messages", math.MaxInt, "the number of messages to output") consoleConsumerCmd.Flags().Bool("printoffset", true, "if print offset of each message") diff --git a/command/cmd/console-producer.go b/command/cmd/console-producer.go index ffac93a..7e4e634 100644 --- a/command/cmd/console-producer.go +++ b/command/cmd/console-producer.go @@ -2,10 +2,10 @@ package cmd import ( "bufio" + "encoding/json" "errors" "fmt" "os" - "strings" "github.com/childe/healer" "github.com/spf13/cobra" @@ -17,28 +17,30 @@ var consoleProducerCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { brokers, err := cmd.Flags().GetString("brokers") + if err != nil { + return err + } producerConfig := map[string]interface{}{"bootstrap.servers": brokers} client, err := cmd.Flags().GetString("client") + if err != nil { + return err + } if client != "" { producerConfig["client.id"] = client } topic, err := cmd.Flags().GetString("topic") + if err != nil { + return err + } if topic == "" || err != nil { return errors.New("topic must be specified") } config, err := cmd.Flags().GetString("config") - - for _, kv := range strings.Split(config, ",") { - if strings.Trim(kv, " ") == "" { - continue - } - t := strings.SplitN(kv, "=", 2) - if len(t) != 2 { - return fmt.Errorf("invalid config : %s", kv) - } - producerConfig[t[0]] = t[1] + if err != nil { + return err } + json.Unmarshal([]byte(config), &producerConfig) consoleProducer, err := healer.NewProducer(topic, producerConfig) if err != nil { @@ -64,6 +66,6 @@ var consoleProducerCmd = &cobra.Command{ } func init() { - consoleProducerCmd.Flags().String("config", "", "XX=YY,AA=ZZ. refer to https://github.com/childe/healer/blob/master/config.go") + consoleProducerCmd.Flags().String("config", "", `{"xx"="yy","aa"="zz"} refer to https://github.com/childe/healer/blob/master/config.go`) consoleProducerCmd.Flags().StringP("topic", "t", "", "topic name") } diff --git a/command/cmd/group-consumer.go b/command/cmd/group-consumer.go index ad9f468..ad09659 100644 --- a/command/cmd/group-consumer.go +++ b/command/cmd/group-consumer.go @@ -1,10 +1,10 @@ package cmd import ( + "encoding/json" "errors" "fmt" "math" - "strings" "github.com/childe/healer" "github.com/spf13/cobra" @@ -16,36 +16,44 @@ var groupConsumerCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { brokers, err := cmd.Flags().GetString("brokers") + if err != nil { + return err + } consumerConfig := map[string]interface{}{"bootstrap.servers": brokers} client, err := cmd.Flags().GetString("client") + if err != nil { + return err + } if client != "" { consumerConfig["client.id"] = client } topic, err := cmd.Flags().GetString("topic") + if err != nil { + return err + } if topic == "" || err != nil { return errors.New("topic must be specified") } group, err := cmd.Flags().GetString("group") + if err != nil { + return err + } if group == "" || err != nil { return errors.New("group must be specified") } consumerConfig["group.id"] = group maxMessages, err := cmd.Flags().GetInt("max-messages") + if err != nil { + return err + } // printOffset, err := cmd.Flags().GetBool("printoffset") // jsonFormat, err := cmd.Flags().GetBool("json") config, err := cmd.Flags().GetString("config") - - for _, kv := range strings.Split(config, ",") { - if strings.Trim(kv, " ") == "" { - continue - } - t := strings.SplitN(kv, "=", 2) - if len(t) != 2 { - return fmt.Errorf("invalid config : %s", kv) - } - consumerConfig[t[0]] = t[1] + if err != nil { + return err } + json.Unmarshal([]byte(config), &consumerConfig) consumer, err := healer.NewGroupConsumer(topic, consumerConfig) if err != nil { @@ -76,7 +84,7 @@ var groupConsumerCmd = &cobra.Command{ func init() { groupConsumerCmd.Flags().StringP("topic", "t", "", "topic name") groupConsumerCmd.Flags().StringP("group", "g", "", "group id") - groupConsumerCmd.Flags().String("config", "", "XX=YY,AA=ZZ. refer to https://github.com/childe/healer/blob/master/config.go") + groupConsumerCmd.Flags().String("config", "", `{"xx"="yy","aa"="zz"} refer to https://github.com/childe/healer/blob/master/config.go`) groupConsumerCmd.Flags().Int("max-messages", math.MaxInt, "the number of messages to output") groupConsumerCmd.Flags().Bool("printoffset", true, "if print offset of each message") groupConsumerCmd.Flags().Bool("json", false, "print message in json format") diff --git a/command/cmd/simple-consumer.go b/command/cmd/simple-consumer.go index 7ed128c..6e3b864 100644 --- a/command/cmd/simple-consumer.go +++ b/command/cmd/simple-consumer.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math" - "strings" "github.com/childe/healer" "github.com/spf13/cobra" @@ -18,33 +17,53 @@ var simpleConsumerCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { brokers, err := cmd.Flags().GetString("brokers") + if err != nil { + return err + } consumerConfig := map[string]interface{}{"bootstrap.servers": brokers} client, err := cmd.Flags().GetString("client") + if err != nil { + return err + } if client != "" { consumerConfig["client.id"] = client } partition, err := cmd.Flags().GetInt32("partition") + if err != nil { + return err + } topic, err := cmd.Flags().GetString("topic") + if err != nil { + return err + } if topic == "" || err != nil { return errors.New("topic must be specified") } offset, err := cmd.Flags().GetInt64("offset") + if err != nil { + return err + } maxMessages, err := cmd.Flags().GetInt32("max-messages") + if err != nil { + return err + } stopOffset, err := cmd.Flags().GetInt64("stopoffset") + if err != nil { + return err + } printOffset, err := cmd.Flags().GetBool("printoffset") + if err != nil { + return err + } jsonFormat, err := cmd.Flags().GetBool("json") + if err != nil { + return err + } config, err := cmd.Flags().GetString("config") - - for _, kv := range strings.Split(config, ",") { - if strings.Trim(kv, " ") == "" { - continue - } - t := strings.SplitN(kv, "=", 2) - if len(t) != 2 { - return fmt.Errorf("invalid config : %s", kv) - } - consumerConfig[t[0]] = t[1] + if err != nil { + return err } + json.Unmarshal([]byte(config), &consumerConfig) simpleConsumer, err := healer.NewSimpleConsumer(topic, partition, consumerConfig) if err != nil { @@ -88,7 +107,7 @@ var simpleConsumerCmd = &cobra.Command{ } func init() { - simpleConsumerCmd.Flags().String("config", "", "XX=YY,AA=ZZ. refer to https://github.com/childe/healer/blob/master/config.go") + simpleConsumerCmd.Flags().String("config", "", `{"xx"="yy","aa"="zz"} refer to https://github.com/childe/healer/blob/master/config.go`) simpleConsumerCmd.Flags().Int32("partition", 0, "partition id") simpleConsumerCmd.Flags().Int64("offset", -1, "the offset to consume from, -2 which means from beginning; while value -1 means from end") simpleConsumerCmd.Flags().Int32("max-messages", math.MaxInt32, "the number of messages to output") diff --git a/command/cmd/simple-producer.go b/command/cmd/simple-producer.go index 79f22a5..1fbb6ce 100644 --- a/command/cmd/simple-producer.go +++ b/command/cmd/simple-producer.go @@ -3,10 +3,10 @@ package cmd import ( "bufio" "context" + "encoding/json" "errors" "fmt" "os" - "strings" "github.com/childe/healer" "github.com/spf13/cobra" @@ -18,29 +18,34 @@ var simpleProducerCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { brokers, err := cmd.Flags().GetString("brokers") + if err != nil { + return err + } producerConfig := map[string]interface{}{"bootstrap.servers": brokers} client, err := cmd.Flags().GetString("client") + if err != nil { + return err + } if client != "" { producerConfig["client.id"] = client } partition, err := cmd.Flags().GetInt32("partition") + if err != nil { + return err + } topic, err := cmd.Flags().GetString("topic") + if err != nil { + return err + } if topic == "" || err != nil { return errors.New("topic must be specified") } config, err := cmd.Flags().GetString("config") - - for _, kv := range strings.Split(config, ",") { - if strings.Trim(kv, " ") == "" { - continue - } - t := strings.SplitN(kv, "=", 2) - if len(t) != 2 { - return fmt.Errorf("invalid config : %s", kv) - } - producerConfig[t[0]] = t[1] + if err != nil { + return err } + json.Unmarshal([]byte(config), &producerConfig) simpleProducer, err := healer.NewSimpleProducer(context.Background(), topic, partition, producerConfig) if err != nil { @@ -65,7 +70,7 @@ var simpleProducerCmd = &cobra.Command{ } func init() { - simpleProducerCmd.Flags().String("config", "", "XX=YY,AA=ZZ. refer to https://github.com/childe/healer/blob/master/config.go") + simpleProducerCmd.Flags().String("config", "", `{"xx"="yy","aa"="zz"} refer to https://github.com/childe/healer/blob/master/config.go`) simpleProducerCmd.Flags().Int32("partition", 0, "partition id") simpleProducerCmd.Flags().StringP("topic", "t", "", "topic name") } diff --git a/consumer.go b/consumer.go index 82559ef..6a22d05 100644 --- a/consumer.go +++ b/consumer.go @@ -20,6 +20,7 @@ type Consumer struct { // NewConsumer creates a new consumer instance func NewConsumer(config interface{}, topics ...string) (*Consumer, error) { cfg, err := createConsumerConfig(config) + logger.Info("create group consumer", "origin_config", config, "final_config", cfg) if err != nil { return nil, err } diff --git a/simple_consumer.go b/simple_consumer.go index b40719f..1465f2f 100644 --- a/simple_consumer.go +++ b/simple_consumer.go @@ -90,6 +90,7 @@ func NewSimpleConsumerWithBrokers(topic string, partitionID int32, config Consum // NewSimpleConsumer create a simple consumer func NewSimpleConsumer(topic string, partitionID int32, config interface{}) (*SimpleConsumer, error) { cfg, err := createConsumerConfig(config) + logger.Info("create group consumer", "origin_config", config, "final_config", cfg) if err != nil { return nil, err }