From d819522acaa6f26d1f6875967d5a5e281f1305f9 Mon Sep 17 00:00:00 2001 From: LeeHao <1838249551@qq.com> Date: Thu, 8 Aug 2024 23:12:34 +0800 Subject: [PATCH] feat-cdc:use multi chann communication between consumer and pika master Signed-off-by: LeeHao <1838249551@qq.com> --- tools/pika_cdc/README.md | 17 ++- tools/pika_cdc/conf/cdc.yml | 2 + tools/pika_cdc/conf/conf.go | 1 + tools/pika_cdc/consumer/consumer.go | 12 ++- tools/pika_cdc/consumer/kafka.go | 129 +++++++++++------------ tools/pika_cdc/consumer/protocol.go | 2 +- tools/pika_cdc/consumer/redis.go | 61 +++++++---- tools/pika_cdc/main.go | 4 +- tools/pika_cdc/pika/replprotocol.go | 10 +- tools/pika_cdc/pika/replprotocol_test.go | 13 --- tools/pika_cdc/pika/server.go | 42 +++++--- 11 files changed, 161 insertions(+), 132 deletions(-) diff --git a/tools/pika_cdc/README.md b/tools/pika_cdc/README.md index db06c9357b..b265fbc16e 100644 --- a/tools/pika_cdc/README.md +++ b/tools/pika_cdc/README.md @@ -1,6 +1,17 @@ +# Pika cdc +**A tool for incremental synchronization of pika command** + +By imitating a pika slave + # Build -## generate proto file +**Make sure the system has protoc installed** ```bash -cd pika -protoc --go_out=. *.proto +brew install protobuf +go install google.golang.org/protobuf/cmd/protoc-gen-go@latest +go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest +``` + +## Build pika cdc +```bash +make ``` \ No newline at end of file diff --git a/tools/pika_cdc/conf/cdc.yml b/tools/pika_cdc/conf/cdc.yml index 68d12c8375..9d2c4f8d70 100644 --- a/tools/pika_cdc/conf/cdc.yml +++ b/tools/pika_cdc/conf/cdc.yml @@ -14,6 +14,8 @@ retries : 0 # retry interval while send message failed(ms) retry_interval: 10 parallel_thread_size: 1 +# the size of the cached channel in pika cdc +buffer_msg_numbers: 10 diff --git a/tools/pika_cdc/conf/conf.go b/tools/pika_cdc/conf/conf.go index 1f3fb58b8f..70b48f9b5c 100644 --- a/tools/pika_cdc/conf/conf.go +++ b/tools/pika_cdc/conf/conf.go @@ -21,6 +21,7 @@ type PikaCdcConfig struct { Retries int `yaml:"retries"` RetryInterval int `yaml:"retry_interval"` ParallelThreadSize int `yaml:"parallel_thread_size"` + BufferMsgNumbers int `yaml:"buffer_msg_numbers"` } var ConfigInstance = PikaCdcConfig{} diff --git a/tools/pika_cdc/consumer/consumer.go b/tools/pika_cdc/consumer/consumer.go index a00e0aab09..f944a51370 100644 --- a/tools/pika_cdc/consumer/consumer.go +++ b/tools/pika_cdc/consumer/consumer.go @@ -5,7 +5,7 @@ import ( ) type Consumer interface { - SendCmdMessage(msg []byte) error + SendCmdMessage(dbName string, msg []byte) error Name() string Close() error Run() @@ -14,12 +14,16 @@ type Consumer interface { type Factory struct{} -func GenerateConsumers(config conf.PikaCdcConfig, msgChan *chan []byte) ([]Consumer, error) { +func GenerateConsumers(config conf.PikaCdcConfig, msgChanns map[string]chan []byte) ([]Consumer, error) { var consumers []Consumer - kafka, _ := NewKafka(config.KafkaServers, config.Topic, config.Retries) + + // kafka + kafka, _ := NewKafka(config.KafkaServers, config.Topic, config.Retries, msgChanns) consumers = append(consumers, kafka) + + // redis for _, r := range config.RedisServers { - newRedis, _ := NewRedis(r, msgChan) + newRedis, _ := NewRedis(r, msgChanns) consumers = append(consumers, newRedis) } return consumers, nil diff --git a/tools/pika_cdc/consumer/kafka.go b/tools/pika_cdc/consumer/kafka.go index 68eb408e1d..24fa4741cf 100644 --- a/tools/pika_cdc/consumer/kafka.go +++ b/tools/pika_cdc/consumer/kafka.go @@ -1,97 +1,90 @@ package consumer import ( - "context" - "errors" "github.com/segmentio/kafka-go" - "github.com/sirupsen/logrus" - "log" "sync" - "time" ) type Kafka struct { - servers []string - topic string - retries int - conns []*kafka.Conn - wg sync.WaitGroup - messageChan chan kafka.Message - stopChan chan bool - once sync.Once - protocol Protocol + servers []string + topic string + retries int + conns []*kafka.Conn + wg sync.WaitGroup + msgChanns map[string]chan []byte + stopChan chan bool + once sync.Once + protocol Protocol } -func (k *Kafka) SendCmdMessage(msg []byte) error { - select { - case k.messageChan <- kafka.Message{Value: k.protocol.ToConsumer(msg)}: - return nil - case <-time.After(2 * time.Second): - e := errors.New("send pika cmd timeout") - logrus.Warn("{}", e) - return e - } -} - -func (k *Kafka) sendMessage() { - for { - select { - case msg := <-k.messageChan: - for _, conn := range k.conns { - _, _ = conn.WriteMessages(msg) - } - case _ = <-k.stopChan: - return - } - } +func (k *Kafka) SendCmdMessage(dbName string, msg []byte) error { + //retries := k.retries + //select { + //case *k.messageChan <- k.protocol.ToConsumer(msg): + // return nil + //case <-time.After(2 * time.Second): + // e := errors.New("send pika cmd timeout and retry send pika cmd") + // logrus.Warn("{}", e) + // retries-- + // if retries <= 0 { + // break + // } + //} + return nil } func (k *Kafka) Name() string { return "Kafka" } -func NewKafka(servers []string, topic string, retries int) (*Kafka, error) { +func NewKafka(servers []string, topic string, retries int, msgChanns map[string]chan []byte) (*Kafka, error) { k := &Kafka{} - k.protocol = &KafkaProtocol{} - for _, server := range servers { - conn, err := kafka.DialLeader(context.Background(), "tcp", server, topic, 0) - if err != nil { - return k, err - } else { - conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) - k.conns = append(k.conns, conn) - } - } - k.messageChan = make(chan kafka.Message) - k.stopChan = make(chan bool) - go k.sendMessage() + //k.protocol = &KafkaProtocol{} + //for _, server := range servers { + // conn, err := kafka.DialLeader(context.Background(), "tcp", server, topic, 0) + // if err != nil { + // return k, err + // } else { + // conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + // k.conns = append(k.conns, conn) + // } + //} + //k.messageChan = msgChanns + //k.stopChan = make(chan bool) + //k.retries = retries return k, nil } func (k *Kafka) close() error { - k.stopChan <- true - close(k.stopChan) - close(k.messageChan) - for _, conn := range k.conns { - err := conn.Close() - if err != nil { - log.Println(err) - return err - } - } + //k.stopChan <- true + //close(k.stopChan) + //close(*k.messageChan) + //for _, conn := range k.conns { + // err := conn.Close() + // if err != nil { + // logrus.Warn(err) + // return err + // } + //} return nil } func (k *Kafka) Close() error { - var err error - err = nil - k.once.Do(func() { - err = k.close() - }) - return err + //var err error + //err = nil + //k.once.Do(func() { + // err = k.close() + //}) + //return err + return nil } func (k *Kafka) Run() { - + //select { + //case msg := <-*k.messageChan: + // k.SendCmdMessage(msg) + //case <-k.stopChan: + // return + //} } func (k *Kafka) Stop() { - + k.stopChan <- true } diff --git a/tools/pika_cdc/consumer/protocol.go b/tools/pika_cdc/consumer/protocol.go index 7ce8da1aad..d0e1832866 100644 --- a/tools/pika_cdc/consumer/protocol.go +++ b/tools/pika_cdc/consumer/protocol.go @@ -12,5 +12,5 @@ func (rp RedisProtocol) ToConsumer(msg []byte) []byte { type KafkaProtocol struct{} func (kp KafkaProtocol) ToConsumer(msg []byte) []byte { - return nil + return msg } diff --git a/tools/pika_cdc/consumer/redis.go b/tools/pika_cdc/consumer/redis.go index 7951c83028..6d2dafa46e 100644 --- a/tools/pika_cdc/consumer/redis.go +++ b/tools/pika_cdc/consumer/redis.go @@ -4,27 +4,32 @@ import ( "bufio" "fmt" "net" + "sync" "time" ) type Redis struct { - protocol Protocol - conn net.Conn - msgChan *chan []byte + protocol Protocol + conns map[string]net.Conn + msgChanns map[string]chan []byte + stopChan chan bool } -func NewRedis(addr string, msgChan *chan []byte) (*Redis, error) { - r := &Redis{protocol: RedisProtocol{}, msgChan: msgChan} +func NewRedis(addr string, msgChanns map[string]chan []byte) (*Redis, error) { + r := &Redis{protocol: RedisProtocol{}, conns: make(map[string]net.Conn), msgChanns: msgChanns, stopChan: make(chan bool)} var err error - r.conn, err = net.Dial("tcp", addr) - if err != nil { - return nil, fmt.Errorf("failed to connect to Redis server: %v", err) + for dbName, _ := range msgChanns { + r.conns[dbName], err = net.Dial("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to connect to Redis server: %v", err) + } + // todo(leehao) send select cmd to redis so that distinguish between different db } return r, nil } -func (r *Redis) SendCmdMessage(msg []byte) error { - _, err := r.sendRedisData(msg) +func (r *Redis) SendCmdMessage(dbName string, msg []byte) error { + _, err := r.sendRedisData(dbName, msg) return err } @@ -32,31 +37,43 @@ func (r *Redis) Name() string { return string("Redis") } func (r *Redis) Close() error { - return r.conn.Close() + for _, conn := range r.conns { + conn.Close() + } + return nil } -func (r *Redis) sendRedisData(data []byte) (string, error) { - - r.conn.SetDeadline(time.Now().Add(5 * time.Second)) - - _, err := r.conn.Write(data) +func (r *Redis) sendRedisData(dbName string, data []byte) (string, error) { + r.conns[dbName].SetDeadline(time.Now().Add(5 * time.Second)) + _, err := r.conns[dbName].Write(data) if err != nil { return "", fmt.Errorf("failed to send data to Redis server: %v", err) } - - reader := bufio.NewReader(r.conn) + reader := bufio.NewReader(r.conns[dbName]) response, err := reader.ReadString('\n') if err != nil { return "", fmt.Errorf("failed to read response from Redis server: %v", err) } - return response, nil } func (r *Redis) Run() { - for msg := range *r.msgChan { - r.sendRedisData(msg) + var wg sync.WaitGroup + for dbName, chann := range r.msgChanns { + wg.Add(1) + go func(dbName string, ch chan []byte) { + defer wg.Done() + for { + select { + case msg := <-ch: + r.sendRedisData(dbName, msg) + case <-r.stopChan: + return + } + } + }(dbName, chann) } + wg.Wait() } func (r *Redis) Stop() { - + r.stopChan <- true } diff --git a/tools/pika_cdc/main.go b/tools/pika_cdc/main.go index 2f8dac15fb..74705436a9 100644 --- a/tools/pika_cdc/main.go +++ b/tools/pika_cdc/main.go @@ -8,10 +8,10 @@ import ( ) func main() { - if pikaServer, err := pika.New(conf.ConfigInstance.PikaServer, nil); err != nil { + if pikaServer, err := pika.New(conf.ConfigInstance.PikaServer, conf.ConfigInstance.BufferMsgNumbers); err != nil { logrus.Fatal("failed to connect pika server, {}", err) } else { - if consumers, err := consumer.GenerateConsumers(conf.ConfigInstance, pikaServer.MsgChan); err != nil { + if consumers, err := consumer.GenerateConsumers(conf.ConfigInstance, pikaServer.MsgChanns); err != nil { logrus.Fatal("failed to generate consumers, {}", err) } else { for _, c := range consumers { diff --git a/tools/pika_cdc/pika/replprotocol.go b/tools/pika_cdc/pika/replprotocol.go index 6674b77228..f4285a46a1 100644 --- a/tools/pika_cdc/pika/replprotocol.go +++ b/tools/pika_cdc/pika/replprotocol.go @@ -130,10 +130,11 @@ func (repl *ReplProtocol) GetSyncWithPika() error { return nil } -func (repl *ReplProtocol) GetBinlogSync() ([]byte, error) { +func (repl *ReplProtocol) GetBinlogSync() (map[string][]byte, error) { binlogSyncType := inner.Type_kBinlogSync - var binlogByte []byte + // This is a collection of binlogs for all DB's + binlogBytes := make(map[string][]byte) // todo(leehao): Receive multiple binlog sync responses simultaneously binlogSyncResp := repl.getResponse() if binlogSyncResp == nil || *binlogSyncResp.Code != inner.StatusCode_kOk || @@ -142,6 +143,7 @@ func (repl *ReplProtocol) GetBinlogSync() ([]byte, error) { } else { for index, item := range binlogSyncResp.BinlogSync { slotId := uint32(*repl.dbMetaInfo.DbsInfo[index].SlotNum) + dbName := repl.dbMetaInfo.DbsInfo[index].DbName binlogOffset := repl.binlogSyncInfos[index].binlogOffset if len(item.Binlog) == 0 { *binlogOffset.Filenum = 0 @@ -153,7 +155,7 @@ func (repl *ReplProtocol) GetBinlogSync() ([]byte, error) { if binlogItem, err := repl.decodeBinlogItem(item.Binlog); err != nil { logrus.Fatal(err) } else { - binlogByte = binlogItem.Content + binlogBytes[*dbName] = binlogItem.Content } } err := repl.sendReplReq(&inner.InnerRequest{ @@ -182,7 +184,7 @@ func (repl *ReplProtocol) GetBinlogSync() ([]byte, error) { } } } - return binlogByte, nil + return binlogBytes, nil } func (repl *ReplProtocol) Ping() string { diff --git a/tools/pika_cdc/pika/replprotocol_test.go b/tools/pika_cdc/pika/replprotocol_test.go index 13b7e01787..a163ecca82 100644 --- a/tools/pika_cdc/pika/replprotocol_test.go +++ b/tools/pika_cdc/pika/replprotocol_test.go @@ -30,19 +30,6 @@ func TestConnect(t *testing.T) { fmt.Println(client.Get(cxt, "key")) } -//func getPort(addr string) int32 { -// portStr := addr[strings.LastIndex(addr, ":")+1:] -// port, _ := strconv.Atoi(portStr) -// return int32(port) -//} -//func getIP(addr string) string { -// index := strings.LastIndex(addr, ":") -// if index == -1 { -// return addr -// } -// return addr[:index] -//} - func TestSendMetaSync(t *testing.T) { ip := string("127.0.0.1") listener, e := net.Listen("tcp", ":0") diff --git a/tools/pika_cdc/pika/server.go b/tools/pika_cdc/pika/server.go index dc1bcde4ae..67ffe5dfea 100644 --- a/tools/pika_cdc/pika/server.go +++ b/tools/pika_cdc/pika/server.go @@ -3,7 +3,6 @@ package pika import ( "bufio" "github.com/sirupsen/logrus" - "log" "net" "strconv" "strings" @@ -14,13 +13,14 @@ type Server struct { stop chan bool pikaConn net.Conn pikaAddr string - MsgChan *chan []byte + bufferMsgNumber int + MsgChanns map[string]chan []byte pikaReplProtocol ReplProtocol writer *bufio.Writer reader *bufio.Reader } -// Use todo(leehao): add middleware here +// Use todo(leehao): middleware can be added here in the future func Use() { } @@ -37,16 +37,14 @@ func getIP(addr string) string { return addr[:index] } -func New(s string, msgChan *chan []byte) (Server, error) { +func New(s string, bufferMsgNumber int) (Server, error) { server := Server{} - if msgChan == nil { - ch := make(chan []byte, 10) - server.MsgChan = &ch - } + server.MsgChanns = make(map[string]chan []byte) conn, err := net.Dial("tcp", s) if err != nil { - log.Fatal("Error connecting to Pika server:", err) + logrus.Fatal("Error connecting to Pika server:", err) } + server.bufferMsgNumber = bufferMsgNumber server.pikaConn = conn server.writer = bufio.NewWriter(server.pikaConn) server.reader = bufio.NewReader(server.pikaConn) @@ -57,8 +55,15 @@ func New(s string, msgChan *chan []byte) (Server, error) { port: getPort(conn.LocalAddr().String()), } err = server.CreateSyncWithPika() + server.buildMsgChann() return server, err } +func (s *Server) buildMsgChann() { + dbMetaInfo := s.pikaReplProtocol.dbMetaInfo + for _, dbInfo := range dbMetaInfo.DbsInfo { + s.MsgChanns[*dbInfo.DbName] = make(chan []byte, s.bufferMsgNumber) + } +} // Run This method will block execution until an error occurs func (s *Server) Run() { @@ -67,10 +72,17 @@ func (s *Server) Run() { case <-s.stop: return case <-time.After(100 * time.Millisecond): - bytes, _ := s.pikaReplProtocol.GetBinlogSync() - if len(bytes) != 0 { + binlogBytes, _ := s.pikaReplProtocol.GetBinlogSync() + if len(binlogBytes) != 0 { logrus.Info("get a pika binlog send to msg chan") - *s.MsgChan <- bytes + for dbName, binlog := range binlogBytes { + chann, exists := s.MsgChanns[dbName] + if !exists { + chann = make(chan []byte, s.bufferMsgNumber) + s.MsgChanns[dbName] = chann + } + chann <- binlog + } } } } @@ -79,11 +91,11 @@ func (s *Server) Run() { func (s *Server) Exit() { s.stop <- true close(s.stop) - close(*s.MsgChan) + for _, chann := range s.MsgChanns { + close(chann) + } } func (s *Server) CreateSyncWithPika() error { - //ping := s.pikaReplProtocol.Ping() - //logrus.Info(ping) return s.pikaReplProtocol.GetSyncWithPika() }