Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pika cdc for incremental synchronization #2855

Open
wants to merge 9 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ pkg
!codis/cmd/fe/assets/**

tests/tmp
tools/pika_cdc/pika/proto
2 changes: 2 additions & 0 deletions src/pika_inner_message.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
syntax = "proto2";
package InnerMessage;

option go_package = "./proto/inner";

enum Type {
kMetaSync = 1;
kTrySync = 2;
Expand Down
2 changes: 2 additions & 0 deletions src/rsync_service.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
syntax = "proto2";
package RsyncService;

option go_package = "./proto/rsync";

enum Type {
kRsyncMeta = 1;
kRsyncFile = 2;
Expand Down
1 change: 1 addition & 0 deletions third/blackwidow
Submodule blackwidow added at 904475
1 change: 1 addition & 0 deletions third/glog
Submodule glog added at ecdbd7
1 change: 1 addition & 0 deletions third/pink
Submodule pink added at 60ac6c
26 changes: 26 additions & 0 deletions tools/pika_cdc/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
PROTO_OUT = ./pika
PROTO_DIR = ../../src/

GO_FILES = $(shell find . -name '*.go')

PROTO_FILES := $(wildcard $(PROTO_DIR)/*.proto)

OUTPUT_BIN = pika_cdc

PROTOC = protoc
GO_BUILD = go build
GO_CLEAN = go clean

.PHONY: all proto build clean

all: proto build

proto: $(PROTO_FILES)
$(PROTOC) --proto_path=$(PROTO_DIR) --go_out=$(PROTO_OUT) $^

build: $(GO_FILES)
$(GO_BUILD) -o $(OUTPUT_BIN)

clean:
$(GO_CLEAN)
rm -f $(OUTPUT_BIN)
24 changes: 24 additions & 0 deletions tools/pika_cdc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Pika cdc
**A tool for incremental synchronization of pika command**
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Convert the emphasis to a heading.

The tool name "A tool for incremental synchronization of pika command" is emphasized using double asterisks. It should be a heading instead.

Apply this diff to convert the emphasis to a heading:

-**A tool for incremental synchronization of pika command**
+## A tool for incremental synchronization of pika command
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
**A tool for incremental synchronization of pika command**
## A tool for incremental synchronization of pika command
Tools
Markdownlint

2-2: null
Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


By imitating a pika slave

# Build
**Make sure the system has protoc installed**
```bash
brew install protobuf
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
```

## Build pika cdc
```bash
make
```

## Todo:

Consumer side:
- [x] **redis**
- [x] **kafka** Create a topic of the same name for each pika's DB
- [ ] **bifrost**
20 changes: 20 additions & 0 deletions tools/pika_cdc/conf/cdc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# pika_server
pika_server : 127.0.0.1:11221
# For data from one DB of one pika, a separate MQ topic is created,
# and the name of the topic is the dbname of the pika
kafka_servers:
- 127.0.0.1:9092
redis_servers:
- 127.0.0.1:6379
pulsar_servers:
- 127.0.0.1:6650
# retry times while send message failed
retries : 0
# retry interval while send message failed(ms)
retry_interval: 10
parallel_thread_size: 1
# the size of the cached channel in pika cdc
buffer_msg_numbers: 10



50 changes: 50 additions & 0 deletions tools/pika_cdc/conf/conf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package conf

import (
"fmt"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
"io/ioutil"
"log"
"os"
"path"
"path/filepath"
"runtime"
)

type PikaCdcConfig struct {
PikaServer string `yaml:"pika_server"`
KafkaServers []string `yaml:"kafka_servers"`
RedisServers []string `yaml:"redis_servers"`
PulsarServers []string `yaml:"pulsar_servers"`
Retries int `yaml:"retries"`
RetryInterval int `yaml:"retry_interval"`
ParallelThreadSize int `yaml:"parallel_thread_size"`
BufferMsgNumbers int `yaml:"buffer_msg_numbers"`
}

var ConfigInstance = PikaCdcConfig{}

func init() {
_, filename, _, _ := runtime.Caller(0)
filename = filepath.Join(filepath.Dir(filename), "cdc.yml")
file, err := ioutil.ReadFile(filename)
if err != nil {
log.Fatal("fail to read file:", err)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve error logging for file reading.

The error message should include the filename for better context.

- log.Fatal("fail to read file:", err)
+ log.Fatalf("fail to read file %s: %v", filename, err)

Committable suggestion was skipped due to low confidence.

Comment on lines +31 to +33
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update deprecated ioutil.ReadFile usage.

The ioutil.ReadFile function is deprecated. Use os.ReadFile instead for reading files.

- file, err := ioutil.ReadFile(filename)
+ file, err := os.ReadFile(filename)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
file, err := ioutil.ReadFile(filename)
if err != nil {
log.Fatal("fail to read file:", err)
file, err := os.ReadFile(filename)
if err != nil {
log.Fatal("fail to read file:", err)

}

err = yaml.Unmarshal(file, &ConfigInstance)
if err != nil {
log.Fatal("fail to yaml unmarshal:", err)
Comment on lines +37 to +38
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve error logging for YAML unmarshalling.

The error message should be more descriptive to aid in debugging.

- log.Fatal("fail to yaml unmarshal:", err)
+ log.Fatalf("fail to unmarshal YAML: %v", err)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if err != nil {
log.Fatal("fail to yaml unmarshal:", err)
if err != nil {
log.Fatalf("fail to unmarshal YAML: %v", err)

}

logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
return "", fmt.Sprintf("%s:%d", path.Base(f.File), f.Line)
},
})

logrus.SetReportCaller(true)
logrus.SetOutput(os.Stdout)
}
32 changes: 32 additions & 0 deletions tools/pika_cdc/consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package consumer

import (
"pika_cdc/conf"
)

type Consumer interface {
SendCmdMessage(dbName string, msg []byte) error
Name() string
Close() error
Run()
Stop()
}

type Factory struct{}

func GenerateConsumers(config conf.PikaCdcConfig, msgChanns map[string]chan []byte) ([]Consumer, error) {
var consumers []Consumer

// kafka
for _, k := range config.KafkaServers {
kafka, _ := NewKafka(k, config.Retries, msgChanns)
consumers = append(consumers, kafka)
}

// redis
for _, r := range config.RedisServers {
newRedis, _ := NewRedis(r, msgChanns)
consumers = append(consumers, newRedis)
}
return consumers, nil
}
Comment on lines +17 to +32
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor the GenerateConsumers function to reduce code duplication.

The code for creating Kafka and Redis consumers is similar. Consider refactoring the function to reduce code duplication.

Here's a suggested refactor:

func GenerateConsumers(config conf.PikaCdcConfig, msgChanns map[string]chan []byte) ([]Consumer, error) {
    var consumers []Consumer
    var err error

    // Create Kafka consumers
    for _, k := range config.KafkaServers {
        var consumer Consumer
        consumer, err = NewKafka(k, config.Retries, msgChanns)
        if err != nil {
            return nil, fmt.Errorf("failed to create Kafka consumer: %v", err)
        }
        consumers = append(consumers, consumer)
    }

    // Create Redis consumers
    for _, r := range config.RedisServers {
        var consumer Consumer
        consumer, err = NewRedis(r, msgChanns)
        if err != nil {
            return nil, fmt.Errorf("failed to create Redis consumer: %v", err)
        }
        consumers = append(consumers, consumer)
    }

    return consumers, nil
}

This refactor:

  • Handles the errors returned by NewKafka and NewRedis.
  • Reduces code duplication by using a similar structure for creating Kafka and Redis consumers.
  • Returns an error if any of the consumer creations fail.

91 changes: 91 additions & 0 deletions tools/pika_cdc/consumer/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package consumer

import (
"context"
"github.com/segmentio/kafka-go"
"sync"
)

type Kafka struct {
servers string
topics []string
retries int
kafkaConns map[string]*kafka.Conn
wg sync.WaitGroup
msgChanns map[string]chan []byte
stopChan chan bool
once sync.Once
protocol KafkaProtocol
}

func (k *Kafka) SendCmdMessage(dbName string, msg []byte) error {
k.kafkaConns[dbName].Write(k.protocol.ToConsumer(msg))
return nil
}

func (k *Kafka) Name() string {
return "Kafka"
}

func NewKafka(server string, retries int, msgChanns map[string]chan []byte) (*Kafka, error) {
k := &Kafka{}
k.protocol = KafkaProtocol{}
k.kafkaConns = make(map[string]*kafka.Conn)
k.msgChanns = make(map[string]chan []byte)
for dbname, chann := range msgChanns {
conn, err := kafka.DialLeader(context.Background(), "tcp", server, dbname, 0)
if err != nil {
return k, err
} else {
k.kafkaConns[dbname] = conn
}
k.msgChanns[dbname] = chann
}
k.stopChan = make(chan bool)
k.retries = retries
return k, nil
}

func (k *Kafka) close() error {
//k.stopChan <- true
//close(k.stopChan)
//close(*k.messageChan)
//for _, conn := range k.kafkaConns {
// err := conn.Close()
// if err != nil {
// logrus.Warn(err)
// return err
// }
//}
return nil
}
Comment on lines +49 to +61
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement functionality to close the connections and channels.

The close method is currently empty with all the code commented out. Implement functionality to close the connections and channels.

Apply this diff to implement the functionality:

 func (k *Kafka) close() error {
-	//k.stopChan <- true
-	//close(k.stopChan)
-	//close(*k.messageChan)
-	//for _, conn := range k.kafkaConns {
-	//	err := conn.Close()
-	//	if err != nil {
-	//		logrus.Warn(err)
-	//		return err
-	//	}
-	//}
+	k.stopChan <- true
+	close(k.stopChan)
+	for _, conn := range k.kafkaConns {
+		err := conn.Close()
+		if err != nil {
+			logrus.Warn(err)
+			return err
+		}
+	}
 	return nil
 }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (k *Kafka) close() error {
//k.stopChan <- true
//close(k.stopChan)
//close(*k.messageChan)
//for _, conn := range k.kafkaConns {
// err := conn.Close()
// if err != nil {
// logrus.Warn(err)
// return err
// }
//}
return nil
}
func (k *Kafka) close() error {
k.stopChan <- true
close(k.stopChan)
for _, conn := range k.kafkaConns {
err := conn.Close()
if err != nil {
logrus.Warn(err)
return err
}
}
return nil
}

func (k *Kafka) Close() error {
//var err error
//err = nil
//k.once.Do(func() {
// err = k.close()
//})
//return err
return nil
}
Comment on lines +62 to +70
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement functionality to close the consumer.

The Close method is currently empty with all the code commented out. Implement functionality to close the consumer.

Apply this diff to implement the functionality:

 func (k *Kafka) Close() error {
-	//var err error
-	//err = nil
-	//k.once.Do(func() {
-	//	err = k.close()
-	//})
-	//return err
+	var err error
+	k.once.Do(func() {
+		err = k.close()
+	})
+	return err
-	return nil
 }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (k *Kafka) Close() error {
//var err error
//err = nil
//k.once.Do(func() {
// err = k.close()
//})
//return err
return nil
}
func (k *Kafka) Close() error {
var err error
k.once.Do(func() {
err = k.close()
})
return err
}

func (k *Kafka) Run() {
var wg sync.WaitGroup
for dbName, chann := range k.msgChanns {
wg.Add(1)
go func(dbName string, ch chan []byte) {
defer wg.Done()
for {
select {
case msg := <-ch:
k.SendCmdMessage(dbName, msg)
case <-k.stopChan:
return
}
}
}(dbName, chann)
}
wg.Wait()
}
func (k *Kafka) Stop() {
k.stopChan <- true
}
27 changes: 27 additions & 0 deletions tools/pika_cdc/consumer/protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package consumer

import (
"fmt"
"strconv"
)

type Protocol interface {
ToConsumer(msg []byte) []byte
}
type RedisProtocol struct{}

func (rp RedisProtocol) ToConsumer(msg []byte) []byte {
return msg
}
func (rp RedisProtocol) Select(dbName string) []byte {
db, _ := strconv.Atoi(dbName[len(dbName)-1:])
dbStr := strconv.Itoa(db)
msg := fmt.Sprintf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", len(dbStr), dbStr)
return []byte(msg)
}
Comment on lines +16 to +21
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix the database name parsing and error handling.

The Select method has the following issues:

  1. The database name is assumed to end with a single digit. This assumption may not always hold true.
  2. The error returned by strconv.Atoi is ignored. This can lead to unexpected behavior if the database name does not end with a valid integer.

Consider applying this diff to fix the issues:

-func (rp RedisProtocol) Select(dbName string) []byte {
-	db, _ := strconv.Atoi(dbName[len(dbName)-1:])
+func (rp RedisProtocol) Select(dbName string) ([]byte, error) {
+	db, err := strconv.Atoi(dbName)
+	if err != nil {
+		return nil, fmt.Errorf("invalid database name: %s", dbName)
+	}
 	dbStr := strconv.Itoa(db)
 	msg := fmt.Sprintf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", len(dbStr), dbStr)
-	return []byte(msg)
+	return []byte(msg), nil
 }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (rp RedisProtocol) Select(dbName string) []byte {
db, _ := strconv.Atoi(dbName[len(dbName)-1:])
dbStr := strconv.Itoa(db)
msg := fmt.Sprintf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", len(dbStr), dbStr)
return []byte(msg)
}
func (rp RedisProtocol) Select(dbName string) ([]byte, error) {
db, err := strconv.Atoi(dbName)
if err != nil {
return nil, fmt.Errorf("invalid database name: %s", dbName)
}
dbStr := strconv.Itoa(db)
msg := fmt.Sprintf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", len(dbStr), dbStr)
return []byte(msg), nil
}


type KafkaProtocol struct{}

func (kp KafkaProtocol) ToConsumer(msg []byte) []byte {
return msg
}
80 changes: 80 additions & 0 deletions tools/pika_cdc/consumer/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package consumer

import (
"bufio"
"fmt"
"net"
"sync"
"time"
)

type Redis struct {
redisProtocol RedisProtocol
conns map[string]net.Conn
msgChanns map[string]chan []byte
stopChan chan bool
}

func NewRedis(addr string, msgChanns map[string]chan []byte) (*Redis, error) {
r := &Redis{redisProtocol: RedisProtocol{}, conns: make(map[string]net.Conn), msgChanns: msgChanns, stopChan: make(chan bool)}
var err error
for dbName, _ := range msgChanns {
r.conns[dbName], err = net.Dial("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to connect to Redis server: %v", err)
}
selectCmdBytes := r.redisProtocol.Select(dbName)
r.conns[dbName].Write(selectCmdBytes)
}
return r, nil
}
Comment on lines +18 to +30
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close the connections in case of an error.

The NewRedis function establishes connections to multiple Redis databases. If an error occurs after some connections have been established, the open connections are not closed. Close the open connections before returning the error.

Apply this diff to close the connections in case of an error:

 func NewRedis(addr string, msgChanns map[string]chan []byte) (*Redis, error) {
 	r := &Redis{redisProtocol: RedisProtocol{}, conns: make(map[string]net.Conn), msgChanns: msgChanns, stopChan: make(chan bool)}
 	var err error
 	for dbName, _ := range msgChanns {
 		r.conns[dbName], err = net.Dial("tcp", addr)
 		if err != nil {
+			r.Close()
 			return nil, fmt.Errorf("failed to connect to Redis server: %v", err)
 		}
 		selectCmdBytes := r.redisProtocol.Select(dbName)
 		r.conns[dbName].Write(selectCmdBytes)
 	}
 	return r, nil
 }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func NewRedis(addr string, msgChanns map[string]chan []byte) (*Redis, error) {
r := &Redis{redisProtocol: RedisProtocol{}, conns: make(map[string]net.Conn), msgChanns: msgChanns, stopChan: make(chan bool)}
var err error
for dbName, _ := range msgChanns {
r.conns[dbName], err = net.Dial("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to connect to Redis server: %v", err)
}
selectCmdBytes := r.redisProtocol.Select(dbName)
r.conns[dbName].Write(selectCmdBytes)
}
return r, nil
}
func NewRedis(addr string, msgChanns map[string]chan []byte) (*Redis, error) {
r := &Redis{redisProtocol: RedisProtocol{}, conns: make(map[string]net.Conn), msgChanns: msgChanns, stopChan: make(chan bool)}
var err error
for dbName, _ := range msgChanns {
r.conns[dbName], err = net.Dial("tcp", addr)
if err != nil {
r.Close()
return nil, fmt.Errorf("failed to connect to Redis server: %v", err)
}
selectCmdBytes := r.redisProtocol.Select(dbName)
r.conns[dbName].Write(selectCmdBytes)
}
return r, nil
}


func (r *Redis) SendCmdMessage(dbName string, msg []byte) error {
_, err := r.sendRedisData(dbName, msg)
return err
}

func (r *Redis) Name() string {
return string("Redis")
}
func (r *Redis) Close() error {
for _, conn := range r.conns {
conn.Close()
}
return nil
}

func (r *Redis) sendRedisData(dbName string, data []byte) (string, error) {
r.conns[dbName].SetDeadline(time.Now().Add(5 * time.Second))
_, err := r.conns[dbName].Write(data)
if err != nil {
return "", fmt.Errorf("failed to send data to Redis server: %v", err)
}
reader := bufio.NewReader(r.conns[dbName])
response, err := reader.ReadString('\n')
if err != nil {
return "", fmt.Errorf("failed to read response from Redis server: %v", err)
}
return response, nil
}
func (r *Redis) Run() {
var wg sync.WaitGroup
for dbName, chann := range r.msgChanns {
wg.Add(1)
go func(dbName string, ch chan []byte) {
defer wg.Done()
for {
select {
case msg := <-ch:
r.sendRedisData(dbName, msg)
case <-r.stopChan:
return
}
}
}(dbName, chann)
}
wg.Wait()
}
Comment on lines +60 to +77
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle errors from sendRedisData.

The Run method calls sendRedisData but does not handle potential errors. Consider logging or handling these errors to ensure robustness.

Apply this diff to handle errors from sendRedisData:

 func (r *Redis) Run() {
 	var wg sync.WaitGroup
 	for dbName, chann := range r.msgChanns {
 		wg.Add(1)
 		go func(dbName string, ch chan []byte) {
 			defer wg.Done()
 			for {
 				select {
 				case msg := <-ch:
-					r.sendRedisData(dbName, msg)
+					if _, err := r.sendRedisData(dbName, msg); err != nil {
+						fmt.Printf("Error sending data to Redis: %v\n", err)
+					}
 				case <-r.stopChan:
 					return
 				}
 			}
 		}(dbName, chann)
 	}
 	wg.Wait()
 }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (r *Redis) Run() {
var wg sync.WaitGroup
for dbName, chann := range r.msgChanns {
wg.Add(1)
go func(dbName string, ch chan []byte) {
defer wg.Done()
for {
select {
case msg := <-ch:
r.sendRedisData(dbName, msg)
case <-r.stopChan:
return
}
}
}(dbName, chann)
}
wg.Wait()
}
func (r *Redis) Run() {
var wg sync.WaitGroup
for dbName, chann := range r.msgChanns {
wg.Add(1)
go func(dbName string, ch chan []byte) {
defer wg.Done()
for {
select {
case msg := <-ch:
if _, err := r.sendRedisData(dbName, msg); err != nil {
fmt.Printf("Error sending data to Redis: %v\n", err)
}
case <-r.stopChan:
return
}
}
}(dbName, chann)
}
wg.Wait()
}

func (r *Redis) Stop() {
r.stopChan <- true
}
Loading
Loading