Skip to content

Commit

Permalink
feat-cdc:use multi chann communication between consumer and pika master
Browse files Browse the repository at this point in the history
Signed-off-by: LeeHao <1838249551@qq.com>
  • Loading branch information
ForestLH committed Aug 13, 2024
1 parent 361f86c commit d819522
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 132 deletions.
17 changes: 14 additions & 3 deletions tools/pika_cdc/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
# Pika cdc
**A tool for incremental synchronization of pika command**

By imitating a pika slave

# Build
## generate proto file
**Make sure the system has protoc installed**
```bash
cd pika
protoc --go_out=. *.proto
brew install protobuf
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
```

## Build pika cdc
```bash
make
```
2 changes: 2 additions & 0 deletions tools/pika_cdc/conf/cdc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ retries : 0
# retry interval while send message failed(ms)
retry_interval: 10
parallel_thread_size: 1
# the size of the cached channel in pika cdc
buffer_msg_numbers: 10



1 change: 1 addition & 0 deletions tools/pika_cdc/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type PikaCdcConfig struct {
Retries int `yaml:"retries"`
RetryInterval int `yaml:"retry_interval"`
ParallelThreadSize int `yaml:"parallel_thread_size"`
BufferMsgNumbers int `yaml:"buffer_msg_numbers"`
}

var ConfigInstance = PikaCdcConfig{}
Expand Down
12 changes: 8 additions & 4 deletions tools/pika_cdc/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type Consumer interface {
SendCmdMessage(msg []byte) error
SendCmdMessage(dbName string, msg []byte) error
Name() string
Close() error
Run()
Expand All @@ -14,12 +14,16 @@ type Consumer interface {

type Factory struct{}

func GenerateConsumers(config conf.PikaCdcConfig, msgChan *chan []byte) ([]Consumer, error) {
func GenerateConsumers(config conf.PikaCdcConfig, msgChanns map[string]chan []byte) ([]Consumer, error) {
var consumers []Consumer
kafka, _ := NewKafka(config.KafkaServers, config.Topic, config.Retries)

// kafka
kafka, _ := NewKafka(config.KafkaServers, config.Topic, config.Retries, msgChanns)
consumers = append(consumers, kafka)

// redis
for _, r := range config.RedisServers {
newRedis, _ := NewRedis(r, msgChan)
newRedis, _ := NewRedis(r, msgChanns)
consumers = append(consumers, newRedis)
}
return consumers, nil
Expand Down
129 changes: 61 additions & 68 deletions tools/pika_cdc/consumer/kafka.go
Original file line number Diff line number Diff line change
@@ -1,97 +1,90 @@
package consumer

import (
"context"
"errors"
"github.com/segmentio/kafka-go"
"github.com/sirupsen/logrus"
"log"
"sync"
"time"
)

type Kafka struct {
servers []string
topic string
retries int
conns []*kafka.Conn
wg sync.WaitGroup
messageChan chan kafka.Message
stopChan chan bool
once sync.Once
protocol Protocol
servers []string
topic string
retries int
conns []*kafka.Conn
wg sync.WaitGroup
msgChanns map[string]chan []byte
stopChan chan bool
once sync.Once
protocol Protocol
}

func (k *Kafka) SendCmdMessage(msg []byte) error {
select {
case k.messageChan <- kafka.Message{Value: k.protocol.ToConsumer(msg)}:
return nil
case <-time.After(2 * time.Second):
e := errors.New("send pika cmd timeout")
logrus.Warn("{}", e)
return e
}
}

func (k *Kafka) sendMessage() {
for {
select {
case msg := <-k.messageChan:
for _, conn := range k.conns {
_, _ = conn.WriteMessages(msg)
}
case _ = <-k.stopChan:
return
}
}
func (k *Kafka) SendCmdMessage(dbName string, msg []byte) error {
//retries := k.retries
//select {
//case *k.messageChan <- k.protocol.ToConsumer(msg):
// return nil
//case <-time.After(2 * time.Second):
// e := errors.New("send pika cmd timeout and retry send pika cmd")
// logrus.Warn("{}", e)
// retries--
// if retries <= 0 {
// break
// }
//}
return nil
}

func (k *Kafka) Name() string {
return "Kafka"
}

func NewKafka(servers []string, topic string, retries int) (*Kafka, error) {
func NewKafka(servers []string, topic string, retries int, msgChanns map[string]chan []byte) (*Kafka, error) {
k := &Kafka{}
k.protocol = &KafkaProtocol{}
for _, server := range servers {
conn, err := kafka.DialLeader(context.Background(), "tcp", server, topic, 0)
if err != nil {
return k, err
} else {
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
k.conns = append(k.conns, conn)
}
}
k.messageChan = make(chan kafka.Message)
k.stopChan = make(chan bool)
go k.sendMessage()
//k.protocol = &KafkaProtocol{}
//for _, server := range servers {
// conn, err := kafka.DialLeader(context.Background(), "tcp", server, topic, 0)
// if err != nil {
// return k, err
// } else {
// conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
// k.conns = append(k.conns, conn)
// }
//}
//k.messageChan = msgChanns
//k.stopChan = make(chan bool)
//k.retries = retries
return k, nil
}

func (k *Kafka) close() error {
k.stopChan <- true
close(k.stopChan)
close(k.messageChan)
for _, conn := range k.conns {
err := conn.Close()
if err != nil {
log.Println(err)
return err
}
}
//k.stopChan <- true
//close(k.stopChan)
//close(*k.messageChan)
//for _, conn := range k.conns {
// err := conn.Close()
// if err != nil {
// logrus.Warn(err)
// return err
// }
//}
return nil
}
func (k *Kafka) Close() error {
var err error
err = nil
k.once.Do(func() {
err = k.close()
})
return err
//var err error
//err = nil
//k.once.Do(func() {
// err = k.close()
//})
//return err
return nil
}
func (k *Kafka) Run() {

//select {
//case msg := <-*k.messageChan:
// k.SendCmdMessage(msg)
//case <-k.stopChan:
// return
//}
}
func (k *Kafka) Stop() {

k.stopChan <- true
}
2 changes: 1 addition & 1 deletion tools/pika_cdc/consumer/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ func (rp RedisProtocol) ToConsumer(msg []byte) []byte {
type KafkaProtocol struct{}

func (kp KafkaProtocol) ToConsumer(msg []byte) []byte {
return nil
return msg
}
61 changes: 39 additions & 22 deletions tools/pika_cdc/consumer/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,76 @@ import (
"bufio"
"fmt"
"net"
"sync"
"time"
)

type Redis struct {
protocol Protocol
conn net.Conn
msgChan *chan []byte
protocol Protocol
conns map[string]net.Conn
msgChanns map[string]chan []byte
stopChan chan bool
}

func NewRedis(addr string, msgChan *chan []byte) (*Redis, error) {
r := &Redis{protocol: RedisProtocol{}, msgChan: msgChan}
func NewRedis(addr string, msgChanns map[string]chan []byte) (*Redis, error) {
r := &Redis{protocol: RedisProtocol{}, conns: make(map[string]net.Conn), msgChanns: msgChanns, stopChan: make(chan bool)}
var err error
r.conn, err = net.Dial("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to connect to Redis server: %v", err)
for dbName, _ := range msgChanns {
r.conns[dbName], err = net.Dial("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to connect to Redis server: %v", err)
}
// todo(leehao) send select cmd to redis so that distinguish between different db
}
return r, nil
}

func (r *Redis) SendCmdMessage(msg []byte) error {
_, err := r.sendRedisData(msg)
func (r *Redis) SendCmdMessage(dbName string, msg []byte) error {
_, err := r.sendRedisData(dbName, msg)
return err
}

func (r *Redis) Name() string {
return string("Redis")
}
func (r *Redis) Close() error {
return r.conn.Close()
for _, conn := range r.conns {
conn.Close()
}
return nil
}

func (r *Redis) sendRedisData(data []byte) (string, error) {

r.conn.SetDeadline(time.Now().Add(5 * time.Second))

_, err := r.conn.Write(data)
func (r *Redis) sendRedisData(dbName string, data []byte) (string, error) {
r.conns[dbName].SetDeadline(time.Now().Add(5 * time.Second))
_, err := r.conns[dbName].Write(data)
if err != nil {
return "", fmt.Errorf("failed to send data to Redis server: %v", err)
}

reader := bufio.NewReader(r.conn)
reader := bufio.NewReader(r.conns[dbName])
response, err := reader.ReadString('\n')
if err != nil {
return "", fmt.Errorf("failed to read response from Redis server: %v", err)
}

return response, nil
}
func (r *Redis) Run() {
for msg := range *r.msgChan {
r.sendRedisData(msg)
var wg sync.WaitGroup
for dbName, chann := range r.msgChanns {
wg.Add(1)
go func(dbName string, ch chan []byte) {
defer wg.Done()
for {
select {
case msg := <-ch:
r.sendRedisData(dbName, msg)
case <-r.stopChan:
return
}
}
}(dbName, chann)
}
wg.Wait()
}
func (r *Redis) Stop() {

r.stopChan <- true
}
4 changes: 2 additions & 2 deletions tools/pika_cdc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
)

func main() {
if pikaServer, err := pika.New(conf.ConfigInstance.PikaServer, nil); err != nil {
if pikaServer, err := pika.New(conf.ConfigInstance.PikaServer, conf.ConfigInstance.BufferMsgNumbers); err != nil {
logrus.Fatal("failed to connect pika server, {}", err)
} else {
if consumers, err := consumer.GenerateConsumers(conf.ConfigInstance, pikaServer.MsgChan); err != nil {
if consumers, err := consumer.GenerateConsumers(conf.ConfigInstance, pikaServer.MsgChanns); err != nil {
logrus.Fatal("failed to generate consumers, {}", err)
} else {
for _, c := range consumers {
Expand Down
10 changes: 6 additions & 4 deletions tools/pika_cdc/pika/replprotocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,11 @@ func (repl *ReplProtocol) GetSyncWithPika() error {
return nil
}

func (repl *ReplProtocol) GetBinlogSync() ([]byte, error) {
func (repl *ReplProtocol) GetBinlogSync() (map[string][]byte, error) {

binlogSyncType := inner.Type_kBinlogSync
var binlogByte []byte
// This is a collection of binlogs for all DB's
binlogBytes := make(map[string][]byte)
// todo(leehao): Receive multiple binlog sync responses simultaneously
binlogSyncResp := repl.getResponse()
if binlogSyncResp == nil || *binlogSyncResp.Code != inner.StatusCode_kOk ||
Expand All @@ -142,6 +143,7 @@ func (repl *ReplProtocol) GetBinlogSync() ([]byte, error) {
} else {
for index, item := range binlogSyncResp.BinlogSync {
slotId := uint32(*repl.dbMetaInfo.DbsInfo[index].SlotNum)
dbName := repl.dbMetaInfo.DbsInfo[index].DbName
binlogOffset := repl.binlogSyncInfos[index].binlogOffset
if len(item.Binlog) == 0 {
*binlogOffset.Filenum = 0
Expand All @@ -153,7 +155,7 @@ func (repl *ReplProtocol) GetBinlogSync() ([]byte, error) {
if binlogItem, err := repl.decodeBinlogItem(item.Binlog); err != nil {
logrus.Fatal(err)
} else {
binlogByte = binlogItem.Content
binlogBytes[*dbName] = binlogItem.Content
}
}
err := repl.sendReplReq(&inner.InnerRequest{
Expand Down Expand Up @@ -182,7 +184,7 @@ func (repl *ReplProtocol) GetBinlogSync() ([]byte, error) {
}
}
}
return binlogByte, nil
return binlogBytes, nil
}

func (repl *ReplProtocol) Ping() string {
Expand Down
Loading

0 comments on commit d819522

Please sign in to comment.