feat: pika cdc for incremental synchronization #2855
base: unstable
Conversation
Commits (Signed-off-by: LeeHao <1838249551@qq.com>):
- feat-cdc: add pika repl send proxy test
- Basic incremental synchronization to redis has been completed
- feat-cdc: use Makefile to build pika cdc
Walkthrough

The recent updates enhance the Pika CDC project by introducing new configuration files, consumer implementations for Kafka and Redis, and a robust replication protocol for data synchronization. A structured approach to managing dependencies and build processes is established with the inclusion of Makefiles and module definitions. These changes streamline the development workflow and lay the groundwork for future features.

Changes

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant Main
    participant Server
    participant Consumer
    User->>Main: Start Application
    Main->>Server: Establish Connection
    Server->>Main: Connection Established
    Main->>Consumer: Generate Consumers
    Consumer->>Main: Consumers Ready
    Main->>Server: Run Server
    Server->>Consumer: Send Data
    Consumer->>Server: Acknowledge Data
```
```go
func getPort(addr string) int32 {
	portStr := addr[strings.LastIndex(addr, ":")+1:]
	port, _ := strconv.Atoi(portStr)
	return int32(port)
}
```

Check failure — Code scanning / CodeQL: Incorrect conversion between integer types (High). The result of `strconv.Atoi` is converted to `int32` without an upper bound check.
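One way to resolve this finding is to parse the port with an explicit 16-bit limit, so the later `int32` conversion cannot overflow. The sketch below is an illustration, not the PR's code; the name `getPortSafe` and the error-returning signature are assumptions.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// getPortSafe parses the port suffix of an address. Using ParseUint with
// bitSize=16 gives a built-in range check (TCP ports fit in 16 bits), and
// parse errors are surfaced instead of being discarded.
func getPortSafe(addr string) (int32, error) {
	idx := strings.LastIndex(addr, ":")
	if idx < 0 {
		return 0, fmt.Errorf("no port in address %q", addr)
	}
	port, err := strconv.ParseUint(addr[idx+1:], 10, 16)
	if err != nil {
		return 0, fmt.Errorf("invalid port in %q: %w", addr, err)
	}
	return int32(port), nil
}

func main() {
	p, err := getPortSafe("127.0.0.1:9221")
	fmt.Println(p, err) // 9221 <nil>
}
```

With this shape, an out-of-range value such as `99999` fails at parse time rather than silently truncating.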
Actionable comments posted: 30
Outside diff range, codebase verification and nitpick comments (4)
tools/pika_cdc/consumer/redis.go (1)

Lines 26-29: Consider logging errors in `SendCmdMessage`.

The `SendCmdMessage` method returns an error if sending data fails. Consider logging the error to provide more context for debugging:

```go
func (r *Redis) SendCmdMessage(msg []byte) error {
	_, err := r.sendRedisData(msg)
	if err != nil {
		fmt.Printf("Error sending command message: %v\n", err)
	}
	return err
}
```

tools/pika_cdc/consumer/kafka.go (1)
Lines 71-83: Consider logging errors in the `close` method.

The `close` method logs errors using `log.Println`. Consider using a structured logger like `logrus` for consistency:

```go
if err != nil {
	logrus.Errorf("Error closing Kafka connection: %v", err)
	return err
}
```

tools/pika_cdc/pika/replprotocol_test.go (2)
Lines 33-44: Consider removing unused code.

The `getPort` and `getIP` functions are commented out and not used. If they are not needed, consider removing them to keep the code clean.
Lines 82-94: Implement or remove `receiveReplMsg`.

The `receiveReplMsg` function listens for connections but does not process them. Consider implementing connection handling, or removing the function if it is not needed.
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files ignored due to path filters (1)
tools/pika_cdc/go.sum is excluded by `!**/*.sum`
Files selected for processing (20)
- .gitignore (1 hunks)
- src/pika_inner_message.proto (1 hunks)
- src/rsync_service.proto (1 hunks)
- third/blackwidow (1 hunks)
- third/glog (1 hunks)
- third/pink (1 hunks)
- tools/pika_cdc/Makefile (1 hunks)
- tools/pika_cdc/README.md (1 hunks)
- tools/pika_cdc/conf/cdc.yml (1 hunks)
- tools/pika_cdc/conf/conf.go (1 hunks)
- tools/pika_cdc/consumer/consumer.go (1 hunks)
- tools/pika_cdc/consumer/kafka.go (1 hunks)
- tools/pika_cdc/consumer/protocol.go (1 hunks)
- tools/pika_cdc/consumer/redis.go (1 hunks)
- tools/pika_cdc/go.mod (1 hunks)
- tools/pika_cdc/main.go (1 hunks)
- tools/pika_cdc/pika/cmd.go (1 hunks)
- tools/pika_cdc/pika/replprotocol.go (1 hunks)
- tools/pika_cdc/pika/replprotocol_test.go (1 hunks)
- tools/pika_cdc/pika/server.go (1 hunks)
Files skipped from review due to trivial changes (9)
- .gitignore
- src/pika_inner_message.proto
- src/rsync_service.proto
- third/blackwidow
- third/glog
- third/pink
- tools/pika_cdc/README.md
- tools/pika_cdc/go.mod
- tools/pika_cdc/pika/cmd.go
Additional comments not posted (11)
tools/pika_cdc/consumer/protocol.go (1)

Lines 12-16: Review `KafkaProtocol`'s `ToConsumer` method.

`KafkaProtocol`'s `ToConsumer` method currently returns `nil`. This might be a placeholder or an incomplete implementation. Ensure that this is the intended behavior, or implement the necessary logic to process the message.
tools/pika_cdc/conf/cdc.yml (1)

Lines 12-15: Verify retry configuration parameters.

Ensure that the `retries` and `retry_interval` values are appropriate for your use case. These settings can significantly impact the system's behavior in case of failures.
tools/pika_cdc/Makefile (1)

Lines 1-26: Makefile is well-structured.

The Makefile is well-organized and correctly uses targets and variables to manage the build process. It follows best practices for compiling protocol buffers and Go binaries.
tools/pika_cdc/pika/replprotocol.go (7)
Lines 16-23: Struct definition looks good. The `ReplProtocol` struct is well-defined and appropriate for managing replication protocol operations.

Lines 25-31: Struct definition looks good. The `binlogSyncInfo` struct is well-defined and appropriate for managing binlog synchronization information.

Lines 188-202: Function implementation looks good. The `Ping` function is well-implemented and handles errors appropriately.

Lines 204-217: Function implementation looks good. The `sendMetaSyncRequest` function is well-implemented and performs its task correctly.

Lines 269-274: Function implementation looks good. The `buildInternalTag` function is well-implemented and performs its task correctly.

Lines 276-285: Struct definition looks good. The `binlogItem` struct is well-defined and appropriate for managing binlog data.

Lines 287-328: Function implementation looks good. The `decodeBinlogItem` function is well-implemented and handles errors appropriately.

tools/pika_cdc/pika/replprotocol_test.go (1)
Lines 155-179: Improve error handling in `sendMetaSyncRequest`.

Consider improving error handling by returning errors instead of using `logrus.Fatal`:

```diff
- logrus.Fatal("Failed to sendMetaSyncRequest")
+ return nil, fmt.Errorf("failed to sendMetaSyncRequest")
```

Apply similar changes to other `logrus.Fatal` calls within this function. (Likely invalid or redundant comment.)
tools/pika_cdc/main.go
Outdated
```go
if pikaServer, err := pika.New(conf.ConfigInstance.PikaServer, nil); err != nil {
	logrus.Fatal("failed to connect pika server, {}", err)
```
Improve error logging.
The error message in `logrus.Fatal` should include the actual error using `%v` instead of `{}` for better clarity:

```diff
- logrus.Fatal("failed to connect pika server, {}", err)
+ logrus.Fatalf("failed to connect pika server: %v", err)
```
tools/pika_cdc/main.go
Outdated
```go
if consumers, err := consumer.GenerateConsumers(conf.ConfigInstance, pikaServer.MsgChan); err != nil {
	logrus.Fatal("failed to generate consumers, {}", err)
```
Improve error logging.
The error message in `logrus.Fatal` should include the actual error using `%v` instead of `{}` for better clarity:

```diff
- logrus.Fatal("failed to generate consumers, {}", err)
+ logrus.Fatalf("failed to generate consumers: %v", err)
```
tools/pika_cdc/consumer/consumer.go
Outdated
```go
kafka, _ := NewKafka(config.KafkaServers, config.Topic, config.Retries)
consumers = append(consumers, kafka)
```
Handle errors when creating Kafka consumer.
Currently, the error from `NewKafka` is ignored. Consider handling this error to ensure robustness:

```go
kafka, err := NewKafka(config.KafkaServers, config.Topic, config.Retries)
if err != nil {
	return nil, fmt.Errorf("failed to create Kafka consumer: %v", err)
}
consumers = append(consumers, kafka)
```
tools/pika_cdc/consumer/consumer.go
Outdated
```go
for _, r := range config.RedisServers {
	newRedis, _ := NewRedis(r, msgChan)
	consumers = append(consumers, newRedis)
```
Handle errors when creating Redis consumer.
Currently, the error from `NewRedis` is ignored. Consider handling this error to ensure robustness:

```go
newRedis, err := NewRedis(r, msgChan)
if err != nil {
	return nil, fmt.Errorf("failed to create Redis consumer: %v", err)
}
consumers = append(consumers, newRedis)
```
```go
filename = filepath.Join(filepath.Dir(filename), "cdc.yml")
file, err := ioutil.ReadFile(filename)
if err != nil {
	log.Fatal("fail to read file:", err)
```
Improve error logging for file reading.
The error message should include the filename for better context:

```diff
- log.Fatal("fail to read file:", err)
+ log.Fatalf("fail to read file %s: %v", filename, err)
```
tools/pika_cdc/pika/server.go
Outdated
```go
// Run This method will block execution until an error occurs
func (s *Server) Run() {
	for {
		select {
		case <-s.stop:
			return
		case <-time.After(100 * time.Millisecond):
			bytes, _ := s.pikaReplProtocol.GetBinlogSync()
			if len(bytes) != 0 {
				logrus.Info("get a pika binlog send to msg chan")
				*s.MsgChan <- bytes
			}
		}
	}
}
```
Consider logging errors in the `Run` method.

The `Run` method does not handle errors from `GetBinlogSync`. Consider logging these errors for better debugging:

```go
bytes, err := s.pikaReplProtocol.GetBinlogSync()
if err != nil {
	logrus.Errorf("Error getting binlog sync: %v", err)
	continue
}
```
tools/pika_cdc/pika/server.go
Outdated
```go
func (s *Server) Exit() {
	s.stop <- true
	close(s.stop)
	close(*s.MsgChan)
}
```
Ensure proper closure of channels in the `Exit` method.

The `Exit` method closes the `stop` channel, which could cause a panic if `Run` is still selecting on it. Consider a more controlled shutdown process:

```go
select {
case s.stop <- true:
default:
}
```
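A standalone sketch of a shutdown pattern that sidesteps both hazards (double-close of `stop` and send-on-closed `MsgChan`): signal shutdown by closing a `done` channel exactly once, and let the sending goroutine be the only one that closes the message channel. The `Server` type here is a simplified illustration, not the PR's struct:

```go
package main

import (
	"fmt"
	"sync"
)

type Server struct {
	done     chan struct{}
	msgChan  chan []byte
	stopOnce sync.Once
}

// Run is the sole sender on msgChan, so it is the one that closes it.
func (s *Server) Run() {
	defer close(s.msgChan) // receivers just drain until closed
	for {
		select {
		case <-s.done:
			return
		case s.msgChan <- []byte("binlog"): // stand-in for GetBinlogSync output
		}
	}
}

// Exit is idempotent: closing done via sync.Once cannot panic on a
// second call, and it never closes the channel Run is sending on.
func (s *Server) Exit() {
	s.stopOnce.Do(func() { close(s.done) })
}

func main() {
	s := &Server{done: make(chan struct{}), msgChan: make(chan []byte, 1)}
	go s.Run()
	<-s.msgChan
	s.Exit()
	s.Exit() // second call is a no-op, not a panic
	fmt.Println("stopped cleanly")
}
```

Closing a channel to broadcast shutdown (rather than sending on it) also lets any number of goroutines observe the signal.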
tools/pika_cdc/pika/server.go
Outdated
```go
func (s *Server) CreateSyncWithPika() error {
	//ping := s.pikaReplProtocol.Ping()
	//logrus.Info(ping)
	return s.pikaReplProtocol.GetSyncWithPika()
}
```
Ensure `CreateSyncWithPika` handles errors properly.

The `CreateSyncWithPika` method calls `GetSyncWithPika` but does not handle potential errors. Ensure errors are logged or handled appropriately:

```go
if err := s.pikaReplProtocol.GetSyncWithPika(); err != nil {
	logrus.Errorf("Error creating sync with Pika: %v", err)
	return err
}
```
tools/pika_cdc/pika/replprotocol.go
Outdated
```go
func (repl *ReplProtocol) GetBinlogSync() ([]byte, error) {
	binlogSyncType := inner.Type_kBinlogSync
	var binlogByte []byte
	// todo(leehao): Receive multiple binlog sync responses simultaneously
	binlogSyncResp := repl.getResponse()
	if binlogSyncResp == nil || *binlogSyncResp.Code != inner.StatusCode_kOk ||
		*binlogSyncResp.Type != inner.Type_kBinlogSync || binlogSyncResp.BinlogSync == nil {
		logrus.Fatal("get binlog sync response failed")
	} else {
		for index, item := range binlogSyncResp.BinlogSync {
			slotId := uint32(*repl.dbMetaInfo.DbsInfo[index].SlotNum)
			binlogOffset := repl.binlogSyncInfos[index].binlogOffset
			if len(item.Binlog) == 0 {
				*binlogOffset.Filenum = 0
				*binlogOffset.Offset = 0
				logrus.Println("receive binlog response keep alive")
			} else {
				binlogOffset = item.BinlogOffset
				repl.binlogSyncInfos[index].binlogOffset = binlogOffset
				if binlogItem, err := repl.decodeBinlogItem(item.Binlog); err != nil {
					logrus.Fatal(err)
				} else {
					binlogByte = binlogItem.Content
				}
			}
			err := repl.sendReplReq(&inner.InnerRequest{
				Type:     &binlogSyncType,
				MetaSync: nil,
				TrySync:  nil,
				DbSync:   nil,
				BinlogSync: &inner.InnerRequest_BinlogSync{
					Node: &inner.Node{
						Ip:   &repl.ip,
						Port: &repl.port,
					},
					DbName:        repl.dbMetaInfo.DbsInfo[index].DbName,
					SlotId:        &slotId,
					AckRangeStart: binlogOffset,
					AckRangeEnd:   binlogOffset,
					SessionId:     &repl.binlogSyncInfos[index].sessionId,
					FirstSend:     &repl.binlogSyncInfos[index].isFirst,
				},
				RemoveSlaveNode: nil,
				ConsensusMeta:   nil,
			})
			if err != nil {
				logrus.Warn("Failed to send binlog sync, {}", err)
				return nil, err
			}
		}
	}
	return binlogByte, nil
}
```
Improve error handling in `GetBinlogSync`.

The use of `logrus.Fatal` for error handling will cause the application to exit abruptly. Consider returning errors or using a more graceful logging approach. Additionally, consider addressing the TODO comment about receiving multiple binlog sync responses simultaneously.

```diff
- logrus.Fatal("get binlog sync response failed")
+ return nil, fmt.Errorf("get binlog sync response failed")
```

Apply similar changes to other `logrus.Fatal` calls within this function.
tools/pika_cdc/pika/replprotocol.go
Outdated
```go
func (repl *ReplProtocol) GetSyncWithPika() error {
	if err := repl.sendMetaSyncRequest(); err != nil {
		return err
	}
	metaResp := repl.getResponse()
	if metaResp == nil {
		logrus.Fatal("Failed to get metaResp")
	}
	repl.dbMetaInfo = metaResp.MetaSync

	trySyncType := inner.Type_kTrySync
	binlogSyncType := inner.Type_kBinlogSync

	replDBs := metaResp.MetaSync.DbsInfo
	var a uint64 = 0
	var b uint32 = 0
	for _, dbInfo := range replDBs {
		newMetaInfo := binlogSyncInfo{
			binlogOffset: &inner.BinlogOffset{
				Filenum: nil,
				Offset:  nil,
				Term:    nil,
				Index:   nil,
			},
			fileNum:   0,
			offset:    0,
			sessionId: 0,
		}
		newMetaInfo.binlogOffset.Offset = &a
		newMetaInfo.binlogOffset.Filenum = &b

		slotId := uint32(*dbInfo.SlotNum)
		trySync := &inner.InnerRequest{
			Type: &trySyncType,
			TrySync: &inner.InnerRequest_TrySync{
				Node: &inner.Node{
					Ip:   &repl.ip,
					Port: &repl.port,
				},
				Slot: &inner.Slot{
					DbName: dbInfo.DbName,
					SlotId: &slotId,
				},
				BinlogOffset: newMetaInfo.binlogOffset,
			},
			ConsensusMeta: nil,
		}
		if err := repl.sendReplReq(trySync); err != nil {
			return err
		}

		trySyncResp := repl.getResponse()
		if trySyncResp == nil || *trySyncResp.Code != inner.StatusCode_kOk {
			logrus.Fatal("Failed to get TrySync Response Msg")
		}
		startOffset := trySyncResp.TrySync.GetBinlogOffset()
		trySync.TrySync.BinlogOffset = startOffset
		// send twice to get session id
		if err := repl.sendReplReq(trySync); err != nil {
			return err
		}
		trySyncResp = repl.getResponse()

		newMetaInfo.binlogOffset = startOffset
		newMetaInfo.sessionId = *trySyncResp.TrySync.SessionId
		newMetaInfo.isFirst = true
		repl.binlogSyncInfos = append(repl.binlogSyncInfos, newMetaInfo)
	}

	// todo(leehao): Can find ways to optimize using coroutines here. May be use goroutine
	for index, dbInfo := range repl.binlogSyncInfos {
		slotId := uint32(*repl.dbMetaInfo.DbsInfo[index].SlotNum)
		binlogSyncReq := &inner.InnerRequest{
			Type:     &binlogSyncType,
			MetaSync: nil,
			TrySync:  nil,
			DbSync:   nil,
			BinlogSync: &inner.InnerRequest_BinlogSync{
				Node: &inner.Node{
					Ip:   &repl.ip,
					Port: &repl.port,
				},
				DbName:        repl.dbMetaInfo.DbsInfo[index].DbName,
				SlotId:        &slotId,
				AckRangeStart: dbInfo.binlogOffset,
				AckRangeEnd:   dbInfo.binlogOffset,
				SessionId:     &dbInfo.sessionId,
				FirstSend:     &dbInfo.isFirst,
			},
			RemoveSlaveNode: nil,
			ConsensusMeta:   nil,
		}
		if err := repl.sendReplReq(binlogSyncReq); err != nil {
			return err
		}
		repl.binlogSyncInfos[index].isFirst = false
	}
	return nil
}
```
Improve error handling in `GetSyncWithPika`.

The use of `logrus.Fatal` for error handling will cause the application to exit abruptly. Consider returning errors or using a more graceful logging approach:

```diff
- logrus.Fatal("Failed to get metaResp")
+ return fmt.Errorf("failed to get metaResp")
```

Apply similar changes to other `logrus.Fatal` calls within this function.
Force-pushed from d819522 to 608b44d.
Force-pushed from 608b44d to 152695a.
Actionable comments posted: 18
Outside diff range, codebase verification and nitpick comments (1)
tools/pika_cdc/README.md (1)

Line 19: Fix the heading.

The "Todo" heading has the following issues:
- It is missing a hyphen.
- It has a trailing colon.

Apply this diff to fix the heading:

```diff
-## Todo:
+## To-do
```

Tools

LanguageTool: [grammar] ~19: It appears that a hyphen is missing in the noun "To-do" (= task), or did you mean the verb "to do"? (TO_DO_HYPHEN)

Markdownlint: 19: Trailing punctuation ':' in heading (MD026, no-trailing-punctuation)
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (11)
- tools/pika_cdc/README.md (1 hunks)
- tools/pika_cdc/conf/cdc.yml (1 hunks)
- tools/pika_cdc/conf/conf.go (1 hunks)
- tools/pika_cdc/consumer/consumer.go (1 hunks)
- tools/pika_cdc/consumer/kafka.go (1 hunks)
- tools/pika_cdc/consumer/protocol.go (1 hunks)
- tools/pika_cdc/consumer/redis.go (1 hunks)
- tools/pika_cdc/main.go (1 hunks)
- tools/pika_cdc/pika/replprotocol.go (1 hunks)
- tools/pika_cdc/pika/replprotocol_test.go (1 hunks)
- tools/pika_cdc/pika/server.go (1 hunks)
Additional context used
LanguageTool

tools/pika_cdc/README.md: [grammar] ~19: It appears that a hyphen is missing in the noun "To-do" (= task), or did you mean the verb "to do"? Context: "## Build pika cdc `bash make` ## Todo: Consumer side: - [x] redis - [x] ..." (TO_DO_HYPHEN)

Markdownlint

tools/pika_cdc/README.md: 19: Trailing punctuation ':' in heading (MD026, no-trailing-punctuation); 2: Emphasis used instead of a heading (MD036, no-emphasis-as-heading)
GitHub Check: CodeQL

tools/pika_cdc/pika/server.go, line 30: [failure] Incorrect conversion between integer types — incorrect conversion of an integer with architecture-dependent bit size from `strconv.Atoi` to a lower bit size type `int32` without an upper bound check.
Additional comments not posted (25)
tools/pika_cdc/conf/cdc.yml (1)

Lines 1-20: LGTM! The configuration file is well-structured and covers the essential settings for Pika CDC.

The file is a good addition to the project, providing a centralized configuration for Pika CDC. The options cover the essential components and settings, and the default values seem reasonable.

A few suggestions for improvement:
- Consider adding comments to explain the purpose of each component (Pika server, Kafka servers, Redis servers, Pulsar servers) and how they are used in Pika CDC.
- Consider adding a section for logging configuration to control the verbosity and destination of logs.
- Consider adding a section for monitoring configuration to enable metrics collection and monitoring of Pika CDC.
tools/pika_cdc/README.md (2)
6-17
: LGTM! The build instructions are clear and easy to follow.
21-24
: LGTM! The consumer side to-do list is clear and easy to understand.
tools/pika_cdc/consumer/protocol.go (2)
8-10
: LGTM! The code changes are approved.
25-27
: LGTM! The code changes are approved.
tools/pika_cdc/main.go (2)
12-12
: Duplicate comments: Improve error logging. The existing review comments suggesting the use of `logrus.Fatalf` instead of `logrus.Fatal` to include the actual error message are still applicable.
Also applies to: 15-15
10-23
: LGTM! The code changes in the main function are approved. The function follows a logical flow of connecting to the Pika server, generating consumers, and running them.
tools/pika_cdc/consumer/consumer.go (1)
22-22
: **Handle errors when creating Kafka and Redis consumers.** The comments from the previous reviews are still applicable. The errors returned by `NewKafka` and `NewRedis` are being ignored in the current code. Consider handling these errors to ensure robustness.
Also applies to: 28-28
tools/pika_cdc/conf/conf.go (3)
31-33
: The previous review comment suggesting the update of the deprecated `ioutil.ReadFile` usage to `os.ReadFile` is still valid and applicable to the current code segment. Please refer to the previous comment and apply the suggested changes.
33-33
: The previous review comment suggesting the improvement of error logging by including the filename for better context is still valid and applicable to the current code segment. Please refer to the previous comment and apply the suggested changes.
37-38
: The previous review comment suggesting the improvement of error logging for YAML unmarshalling to be more descriptive is still valid and applicable to the current code segment. Please refer to the previous comment and apply the suggested changes.
tools/pika_cdc/consumer/kafka.go (4)
21-24
: The previous review comment on the `SendCmdMessage` method is still valid. The method should handle errors for better debugging.
30-47
: LGTM! The code changes are approved. The `NewKafka` function handles errors correctly by returning the error.
71-88
: LGTM! The code changes are approved. The `Run` method implements the functionality to consume messages from channels and send them to Kafka correctly.
89-91
: LGTM! The code changes are approved. The `Stop` method implements the functionality to stop the consumer correctly.
tools/pika_cdc/consumer/redis.go (1)
78-80
: LGTM! The `Stop` method implementation looks good.
tools/pika_cdc/pika/server.go (5)
40-60
: Return errors instead of logging fatal errors in `New`. The previous comment is still applicable: Ensure proper error handling and resource management in `New`. The `New` function logs fatal errors, which can terminate the program unexpectedly. Consider returning the error instead.
Apply this diff to return the error instead of logging fatal errors:
func New(s string, bufferMsgNumber int) (Server, error) {
	server := Server{}
	server.MsgChanns = make(map[string]chan []byte)
	conn, err := net.Dial("tcp", s)
	if err != nil {
-		logrus.Fatal("Error connecting to Pika server:", err)
+		return Server{}, fmt.Errorf("Error connecting to Pika server: %v", err)
	}
	server.bufferMsgNumber = bufferMsgNumber
	server.pikaConn = conn
	server.writer = bufio.NewWriter(server.pikaConn)
	server.reader = bufio.NewReader(server.pikaConn)
	server.pikaReplProtocol = ReplProtocol{
		writer: server.writer,
		reader: server.reader,
		ip:     getIP(conn.LocalAddr().String()),
		port:   getPort(conn.LocalAddr().String()),
	}
	err = server.CreateSyncWithPika()
	server.buildMsgChann()
	return server, err
}
69-88
: Log errors in the `Run` method. The previous comment is still applicable: Consider logging errors in the `Run` method. The `Run` method does not handle errors from `GetBinlogSync`. Consider logging these errors for better debugging.
Apply this diff to log errors from `GetBinlogSync`:
func (s *Server) Run() {
	for {
		select {
		case <-s.stop:
			return
		case <-time.After(100 * time.Millisecond):
-			binlogBytes, _ := s.pikaReplProtocol.GetBinlogSync()
+			binlogBytes, err := s.pikaReplProtocol.GetBinlogSync()
+			if err != nil {
+				logrus.Errorf("Error getting binlog sync: %v", err)
+				continue
+			}
			if len(binlogBytes) != 0 {
				for dbName, binlog := range binlogBytes {
					chann, exists := s.MsgChanns[dbName]
					if !exists {
						chann = make(chan []byte, s.bufferMsgNumber)
						s.MsgChanns[dbName] = chann
					}
					chann <- binlog
				}
			}
		}
	}
}
90-96
: Use a controlled shutdown process in `Exit`. The previous comment is still applicable: Ensure proper closure of channels in the `Exit` method. The `Exit` method closes the `stop` channel, which could cause a panic if `Run` is still selecting on it. Consider using a more controlled shutdown process.
Apply this diff to use a non-blocking send on the `stop` channel for a controlled shutdown:
func (s *Server) Exit() {
-	s.stop <- true
-	close(s.stop)
+	select {
+	case s.stop <- true:
+	default:
+	}
	for _, chann := range s.MsgChanns {
		close(chann)
	}
}
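The non-blocking send suggested for `Exit` avoids blocking (or panicking on a closed channel) when the run loop has already returned. The pattern in isolation, as a minimal sketch with names of our choosing rather than the PR's types:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Buffered capacity 1 so the stop signal is never lost even if the
	// loop is not currently selecting on the channel.
	stop := make(chan bool, 1)
	done := make(chan struct{})

	// Run loop: exits when a stop signal arrives.
	go func() {
		defer close(done)
		for {
			select {
			case <-stop:
				return
			case <-time.After(10 * time.Millisecond):
				// ... poll for new binlog entries ...
			}
		}
	}()

	// Non-blocking send: if a signal is already pending, do nothing.
	select {
	case stop <- true:
	default:
	}
	<-done
	fmt.Println("run loop stopped cleanly")
}
```

Buffering the channel is the part that makes the `select`/`default` send safe: without it, a signal sent while the loop is between iterations would be silently dropped.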
98-100
: Handle errors in the `CreateSyncWithPika` method. The previous comment is still applicable: Ensure `CreateSyncWithPika` handles errors properly. The `CreateSyncWithPika` method calls `GetSyncWithPika` but does not handle potential errors. Ensure errors are logged or handled appropriately.
Apply this diff to log errors from `GetSyncWithPika`:
func (s *Server) CreateSyncWithPika() error {
-	return s.pikaReplProtocol.GetSyncWithPika()
+	err := s.pikaReplProtocol.GetSyncWithPika()
+	if err != nil {
+		logrus.Errorf("Error creating sync with Pika: %v", err)
+	}
+	return err
}
27-31
: Handle potential errors and perform a bounds check in `getPort`. The previous comment is still applicable: Handle potential errors in `getPort`. The `getPort` function ignores errors from `strconv.Atoi`. Consider handling the error to avoid potential issues with invalid input.
Additionally, converting the port number to `int32` without an upper bound check can lead to integer overflow.
Apply this diff to handle the error and perform a bounds check before the type conversion:
func getPort(addr string) int32 {
	portStr := addr[strings.LastIndex(addr, ":")+1:]
-	port, _ := strconv.Atoi(portStr)
+	port, err := strconv.Atoi(portStr)
+	if err != nil {
+		logrus.Errorf("Invalid port in address %s: %v", addr, err)
+		return 0
+	}
+	if port < 0 || port > 65535 {
+		logrus.Errorf("Port %d out of range in address %s", port, addr)
+		return 0
+	}
	return int32(port)
}
Tools
GitHub Check: CodeQL
[failure] 30-30: Incorrect conversion between integer types
Incorrect conversion of an integer with architecture-dependent bit size from strconv.Atoi to a lower bit size type int32 without an upper bound check.
tools/pika_cdc/pika/replprotocol_test.go (4)
142-166
: LGTM! The code changes are approved.
275-280
: LGTM! The code changes are approved.
398-419
: LGTM! The code changes are approved.
421-462
: LGTM! The code changes are approved.
@@ -0,0 +1,24 @@
# Pika cdc
**A tool for incremental synchronization of pika command**
Convert the emphasis to a heading.
The tool name "A tool for incremental synchronization of pika command" is emphasized using double asterisks. It should be a heading instead.
Apply this diff to convert the emphasis to a heading:
-**A tool for incremental synchronization of pika command**
+## A tool for incremental synchronization of pika command
Tools
Markdownlint
2-2: null
Emphasis used instead of a heading(MD036, no-emphasis-as-heading)
func (rp RedisProtocol) Select(dbName string) []byte {
	db, _ := strconv.Atoi(dbName[len(dbName)-1:])
	dbStr := strconv.Itoa(db)
	msg := fmt.Sprintf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", len(dbStr), dbStr)
	return []byte(msg)
}
Fix the database name parsing and error handling.
The `Select` method has the following issues:
- The database name is assumed to end with a single digit. This assumption may not always hold true.
- The error returned by `strconv.Atoi` is ignored. This can lead to unexpected behavior if the database name does not end with a valid integer.
Consider applying this diff to fix the issues:
-func (rp RedisProtocol) Select(dbName string) []byte {
- db, _ := strconv.Atoi(dbName[len(dbName)-1:])
+func (rp RedisProtocol) Select(dbName string) ([]byte, error) {
+ db, err := strconv.Atoi(dbName)
+ if err != nil {
+ return nil, fmt.Errorf("invalid database name: %s", dbName)
+ }
dbStr := strconv.Itoa(db)
msg := fmt.Sprintf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", len(dbStr), dbStr)
- return []byte(msg)
+ return []byte(msg), nil
}
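For context, the SELECT frame discussed above follows the RESP array format: an array of two bulk strings. A minimal encoder makes the expected bytes explicit (the `respSelect` helper is ours, for illustration only):

```go
package main

import (
	"fmt"
	"strconv"
)

// respSelect builds a RESP-encoded SELECT command for the given
// database index: "*2" (array of 2), then two length-prefixed
// bulk strings, "SELECT" and the index.
func respSelect(db int) ([]byte, error) {
	if db < 0 {
		return nil, fmt.Errorf("invalid database index: %d", db)
	}
	dbStr := strconv.Itoa(db)
	msg := fmt.Sprintf("*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", len(dbStr), dbStr)
	return []byte(msg), nil
}

func main() {
	b, _ := respSelect(3)
	fmt.Printf("%q\n", b) // "*2\r\n$6\r\nSELECT\r\n$1\r\n3\r\n"
}
```

Taking an `int` directly, as the suggested fix does, sidesteps the fragile last-character parsing entirely.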
func GenerateConsumers(config conf.PikaCdcConfig, msgChanns map[string]chan []byte) ([]Consumer, error) {
	var consumers []Consumer

	// kafka
	for _, k := range config.KafkaServers {
		kafka, _ := NewKafka(k, config.Retries, msgChanns)
		consumers = append(consumers, kafka)
	}

	// redis
	for _, r := range config.RedisServers {
		newRedis, _ := NewRedis(r, msgChanns)
		consumers = append(consumers, newRedis)
	}
	return consumers, nil
}
Refactor the `GenerateConsumers` function to reduce code duplication.
The code for creating Kafka and Redis consumers is similar. Consider refactoring the function to reduce code duplication.
Here's a suggested refactor:
func GenerateConsumers(config conf.PikaCdcConfig, msgChanns map[string]chan []byte) ([]Consumer, error) {
var consumers []Consumer
var err error
// Create Kafka consumers
for _, k := range config.KafkaServers {
var consumer Consumer
consumer, err = NewKafka(k, config.Retries, msgChanns)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka consumer: %v", err)
}
consumers = append(consumers, consumer)
}
// Create Redis consumers
for _, r := range config.RedisServers {
var consumer Consumer
consumer, err = NewRedis(r, msgChanns)
if err != nil {
return nil, fmt.Errorf("failed to create Redis consumer: %v", err)
}
consumers = append(consumers, consumer)
}
return consumers, nil
}
This refactor:
- Handles the errors returned by `NewKafka` and `NewRedis`.
- Reduces code duplication by using a similar structure for creating Kafka and Redis consumers.
- Returns an error if any of the consumer creations fail.
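The deduplication can be pushed one step further by iterating over (server list, constructor) pairs, so adding a new backend is one more table entry. This is a sketch with stand-in types of our own; the PR's actual `Consumer` interface and constructors differ:

```go
package main

import "fmt"

// Consumer is a stand-in for the PR's consumer interface.
type Consumer interface{ Name() string }

type kafkaConsumer struct{ addr string }

func (k kafkaConsumer) Name() string { return "kafka:" + k.addr }

type redisConsumer struct{ addr string }

func (r redisConsumer) Name() string { return "redis:" + r.addr }

// generateConsumers builds one consumer per server from a table of
// constructors, sharing the loop and the error handling.
func generateConsumers(kafkaServers, redisServers []string) ([]Consumer, error) {
	type group struct {
		servers []string
		build   func(addr string) (Consumer, error)
	}
	groups := []group{
		{kafkaServers, func(a string) (Consumer, error) { return kafkaConsumer{a}, nil }},
		{redisServers, func(a string) (Consumer, error) { return redisConsumer{a}, nil }},
	}
	var consumers []Consumer
	for _, g := range groups {
		for _, s := range g.servers {
			c, err := g.build(s)
			if err != nil {
				return nil, fmt.Errorf("failed to create consumer for %s: %w", s, err)
			}
			consumers = append(consumers, c)
		}
	}
	return consumers, nil
}

func main() {
	cs, _ := generateConsumers([]string{"k1:9092"}, []string{"r1:6379"})
	for _, c := range cs {
		fmt.Println(c.Name())
	}
}
```

The constructor closures keep per-backend arguments (such as the PR's `config.Retries`) out of the shared loop.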
func (k *Kafka) Close() error {
	//var err error
	//err = nil
	//k.once.Do(func() {
	//	err = k.close()
	//})
	//return err
	return nil
}
Implement functionality to close the consumer.
The `Close` method is currently empty with all the code commented out. Implement functionality to close the consumer.
Apply this diff to implement the functionality:
func (k *Kafka) Close() error {
- //var err error
- //err = nil
- //k.once.Do(func() {
- // err = k.close()
- //})
- //return err
+ var err error
+ k.once.Do(func() {
+ err = k.close()
+ })
+ return err
- return nil
}
func (k *Kafka) close() error {
	//k.stopChan <- true
	//close(k.stopChan)
	//close(*k.messageChan)
	//for _, conn := range k.kafkaConns {
	//	err := conn.Close()
	//	if err != nil {
	//		logrus.Warn(err)
	//		return err
	//	}
	//}
	return nil
}
Implement functionality to close the connections and channels.
The `close` method is currently empty with all the code commented out. Implement functionality to close the connections and channels.
Apply this diff to implement the functionality:
func (k *Kafka) close() error {
- //k.stopChan <- true
- //close(k.stopChan)
- //close(*k.messageChan)
- //for _, conn := range k.kafkaConns {
- // err := conn.Close()
- // if err != nil {
- // logrus.Warn(err)
- // return err
- // }
- //}
+ k.stopChan <- true
+ close(k.stopChan)
+ for _, conn := range k.kafkaConns {
+ err := conn.Close()
+ if err != nil {
+ logrus.Warn(err)
+ return err
+ }
+ }
return nil
}
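The once-guarded `Close` suggested above is a common Go idiom: `sync.Once` guarantees the shutdown routine runs exactly once even under concurrent calls, and every caller observes the same error. A minimal self-contained illustration (type and function names are ours):

```go
package main

import (
	"fmt"
	"sync"
)

// closer wraps a shutdown routine so that repeated or concurrent
// Close calls run it exactly once and all return the same error.
type closer struct {
	once sync.Once
	err  error
}

func (c *closer) Close(shutdown func() error) error {
	c.once.Do(func() { c.err = shutdown() })
	return c.err
}

func main() {
	calls := 0
	c := &closer{}
	shutdown := func() error { calls++; return nil }
	c.Close(shutdown)
	c.Close(shutdown) // second call is a no-op
	fmt.Println("shutdown ran", calls, "time(s)")
}
```

Caching the error in the struct is what makes late callers see the real shutdown result rather than a spurious `nil`.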
func TestGetOffsetFromMaster(t *testing.T) {
	ip := string("127.0.0.1")
	listener, e := net.Listen("tcp", ":0")
	if e != nil {
		os.Exit(1)
	}
	selfPort := getPort(listener.Addr().String())
	conn, err := sendMetaSyncRequest(nil)
	if err != nil {
		logrus.Fatal("Failed to sendMetaSyncRequest")
	}
	metaResp := getResponse(conn)
	trySyncType := inner.Type_kTrySync
	replDBs := metaResp.MetaSync.DbsInfo
	var fileNum uint32 = 1
	var offset uint64 = 0
	for _, db := range replDBs {
		slotId := uint32(*db.SlotNum)
		trySync := &inner.InnerRequest{
			Type: &trySyncType,
			TrySync: &inner.InnerRequest_TrySync{
				Node: &inner.Node{
					Ip:   &ip,
					Port: &selfPort,
				},
				Slot: &inner.Slot{
					DbName: db.DbName,
					SlotId: &slotId,
				},
				BinlogOffset: &inner.BinlogOffset{
					Filenum: &fileNum,
					Offset:  &offset,
					Term:    nil,
					Index:   nil,
				},
			},
			ConsensusMeta: nil,
		}
		_, err = sendReplReq(conn, trySync)
		if err != nil {
			logrus.Fatal("Failed to send TrySync Msg", err)
		}
		trySyncResp := getResponse(conn)
		if trySyncResp == nil || *trySyncResp.Code != inner.StatusCode_kOk {
			logrus.Fatal("Failed to get TrySync Response Msg", err)
		}
		trySync.TrySync.BinlogOffset = trySyncResp.TrySync.GetBinlogOffset()
		logrus.Println("get offset:", trySync.TrySync.BinlogOffset)
	}
}
Enhance TestGetOffsetFromMaster with Assertions and Error Handling
The `TestGetOffsetFromMaster` function lacks assertions to verify expected outcomes and does not handle errors effectively. Consider adding assertions and handling errors properly.
if metaResp == nil {
t.Fatal("Failed to get metaResp")
}
if trySyncResp == nil || *trySyncResp.Code != inner.StatusCode_kOk {
t.Fatalf("Failed to get TrySync Response Msg: %v", err)
}
func TestSendDbSyncReqMsg(t *testing.T) {
	ip := string("127.0.0.1")
	listener, e := net.Listen("tcp", ":0")
	if e != nil {
		os.Exit(1)
	}

	selfPort := getPort(listener.Addr().String())

	metaSyncType := inner.Type_kMetaSync

	request := &inner.InnerRequest{
		Type: &metaSyncType,
		MetaSync: &inner.InnerRequest_MetaSync{
			Node: &inner.Node{
				Ip:   &ip,
				Port: &selfPort,
			},
		},
	}
	conn, err := sendReplReq(nil, request)
	if err != nil {
		os.Exit(1)
	}
	metaResp := getResponse(conn)

	dbSyncType := inner.Type_kDBSync
	replDBs := metaResp.MetaSync.DbsInfo
	for _, db := range replDBs {
		var fileNum uint32 = 1
		var offset uint64 = 0
		slotId := uint32(*db.SlotNum)
		dbSyncReq := &inner.InnerRequest{
			Type: &dbSyncType,
			DbSync: &inner.InnerRequest_DBSync{
				Node: &inner.Node{
					Ip:   &ip,
					Port: &selfPort,
				},
				Slot: &inner.Slot{
					DbName: db.DbName,
					SlotId: &slotId,
				},
				BinlogOffset: &inner.BinlogOffset{
					Filenum: &fileNum,
					Offset:  &offset,
					Term:    nil,
					Index:   nil,
				},
			},
			ConsensusMeta: nil,
		}
		sendReplReq(conn, dbSyncReq)
	}
Enhance TestSendDbSyncReqMsg with Assertions and Error Handling
The `TestSendDbSyncReqMsg` function lacks assertions to verify expected outcomes and does not handle errors effectively. Consider adding assertions and handling errors properly.
if metaResp == nil {
t.Fatal("Failed to get metaResp")
}
func sendReplReq(conn net.Conn, request *inner.InnerRequest) (net.Conn, error) {
	if conn == nil {
		ip := string("127.0.0.1")
		var masterReplPort int32 = getPort(conf.ConfigInstance.PikaServer) + 2000
		addr := ip + ":" + strconv.Itoa(int(masterReplPort))
		newConn, err := net.Dial("tcp", addr)
		if err != nil {
			return nil, err
		}
		conn = newConn
	}
	msg, err := proto.Marshal(request)
	if err != nil {
		logrus.Fatal("Error Marshal:", err)
	}

	pikaTag := []byte(BuildInternalTag(msg))
	allBytes := append(pikaTag, msg...)
	_, err = conn.Write(allBytes)
	if err != nil {
		logrus.Fatal("Error writing to server:", err)
	}
	return conn, nil
}
Improve Error Handling in sendReplReq
The use of `logrus.Fatal` for error handling will cause the application to exit abruptly. Consider returning errors or using a more graceful logging approach.
- logrus.Fatal("Error Marshal:", err)
+ return nil, fmt.Errorf("error marshal: %v", err)
Apply similar changes to other `logrus.Fatal` calls within this function.
		logrus.Fatal(err)
	}
	metaResp := getResponse(conn)
	if metaResp == nil {
		logrus.Fatal("Failed to get metaResp")
	}
	trySyncType := inner.Type_kTrySync
	binlogSyncType := inner.Type_kBinlogSync
	replDBs := metaResp.MetaSync.DbsInfo
	var fileNum uint32 = 1
	var offset uint64 = 0
	ip := getIP(conn.LocalAddr().String())
	port := getPort(conn.LocalAddr().String())

	for _, db := range replDBs {
		slotId := uint32(*db.SlotNum)
		trySync := &inner.InnerRequest{
			Type: &trySyncType,
			TrySync: &inner.InnerRequest_TrySync{
				Node: &inner.Node{
					Ip:   &ip,
					Port: &port,
				},
				Slot: &inner.Slot{
					DbName: db.DbName,
					SlotId: &slotId,
				},
				BinlogOffset: &inner.BinlogOffset{
					Filenum: &fileNum,
					Offset:  &offset,
					Term:    nil,
					Index:   nil,
				},
			},
			ConsensusMeta: nil,
		}
		_, err = sendReplReq(conn, trySync)
		if err != nil {
			logrus.Fatal("Failed to send TrySync Msg", err)
		}
		trySyncResp := getResponse(conn)
		if trySyncResp == nil || *trySyncResp.Code != inner.StatusCode_kOk {
			logrus.Fatal("Failed to get TrySync Response Msg", err)
		}
		startOffset := trySyncResp.TrySync.GetBinlogOffset()
		trySync.TrySync.BinlogOffset = startOffset
		// send twice to get session id
		sendReplReq(conn, trySync)
		trySyncResp = getResponse(conn)

		isFirst := true
		binlogSyncReq := &inner.InnerRequest{
			Type:     &binlogSyncType,
			MetaSync: nil,
			TrySync:  nil,
			DbSync:   nil,
			BinlogSync: &inner.InnerRequest_BinlogSync{
				Node:          trySync.TrySync.Node,
				DbName:        db.DbName,
				SlotId:        &slotId,
				AckRangeStart: startOffset,
				AckRangeEnd:   startOffset,
				SessionId:     trySyncResp.TrySync.SessionId,
				FirstSend:     &isFirst,
			},
			RemoveSlaveNode: nil,
			ConsensusMeta:   nil,
		}
		sendReplReq(conn, binlogSyncReq)
		go func() {
			for {
				binlogSyncResp := getResponse(conn)
				if binlogSyncResp == nil || *binlogSyncResp.Code != inner.StatusCode_kOk ||
					*binlogSyncResp.Type != inner.Type_kBinlogSync || binlogSyncResp.BinlogSync == nil {
					logrus.Fatal("get binlog sync response failed")
				} else {
					for _, item := range binlogSyncResp.BinlogSync {
						*binlogSyncReq.BinlogSync.FirstSend = false
						if len(item.Binlog) == 0 {
							*binlogSyncReq.BinlogSync.AckRangeStart.Filenum = 0
							*binlogSyncReq.BinlogSync.AckRangeStart.Offset = 0
							logrus.Println("receive binlog response keep alive")
						} else {
							binlogSyncReq.BinlogSync.AckRangeStart = item.BinlogOffset
							binlogSyncReq.BinlogSync.AckRangeEnd = item.BinlogOffset
							if binlogItem, err := DecodeBinlogItem(item.Binlog); err != nil {
								logrus.Fatal(err)
							} else {
								SendRedisData("127.0.0.1:6379", binlogItem.Content)
							}
						}
						sendReplReq(conn, binlogSyncReq)
					}
				}
			}
		}()
	}
	for {
	}
}
Refactor TestGetIncrementalSync and Improve Error Handling
The `TestGetIncrementalSync` function is quite large and complex. Consider breaking it down into smaller, more focused functions for better readability and maintainability.
Also, the use of `logrus.Fatal` for error handling will cause the application to exit abruptly. Consider returning errors or using a more graceful logging approach.
- logrus.Fatal(err)
+ t.Fatalf("Failed to send meta sync request: %v", err)
Apply similar changes to other `logrus.Fatal` calls within this function.
Committable suggestion was skipped due to low confidence.
func getResponse(conn net.Conn) *inner.InnerResponse {
	// Read the header (length)
	header := make([]byte, HeaderLength)
	_, err := io.ReadFull(conn, header)
	if err != nil {
		if err != io.EOF {
			fmt.Println("Error reading header:", err)
		}
		return nil
	}

	// Convert the header to an integer
	var bodyLength uint32
	buffer := bytes.NewBuffer(header)
	err = binary.Read(buffer, binary.BigEndian, &bodyLength)
	if err != nil {
		logrus.Fatal("Error converting header to integer:", err)
		return nil
	}
	// Read the body
	body := make([]byte, bodyLength)
	_, err = io.ReadFull(conn, body)
	if err != nil {
		logrus.Fatal("Error reading body:", err)
		return nil
	}

	res := &inner.InnerResponse{}
	err = proto.Unmarshal(body, res)
	if err != nil {
		logrus.Fatal("Error Deserialization:", err)
	}
	return res
}
Improve Error Handling in getResponse
The use of `logrus.Fatal` for error handling will cause the application to exit abruptly. Consider returning errors or using a more graceful logging approach.
- logrus.Fatal("Error converting header to integer:", err)
+ fmt.Println("Error converting header to integer:", err)
+ return nil
Apply similar changes to other `logrus.Fatal` calls within this function.
Committable suggestion was skipped due to low confidence.
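The header-then-body read in `getResponse` is standard length-prefixed framing: a 4-byte big-endian length, then exactly that many body bytes. A minimal round trip over an in-memory buffer (helper names `writeFrame`/`readFrame`/`roundTrip` are ours, for illustration):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame prefixes the payload with its length as a 4-byte
// big-endian integer, mirroring the read side in getResponse.
func writeFrame(w io.Writer, payload []byte) error {
	if err := binary.Write(w, binary.BigEndian, uint32(len(payload))); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads the 4-byte length header, then exactly that many
// body bytes with io.ReadFull.
func readFrame(r io.Reader) ([]byte, error) {
	var n uint32
	if err := binary.Read(r, binary.BigEndian, &n); err != nil {
		return nil, err
	}
	body := make([]byte, n)
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, err
	}
	return body, nil
}

// roundTrip writes a payload as a frame and reads it back.
func roundTrip(payload []byte) (string, error) {
	var buf bytes.Buffer
	if err := writeFrame(&buf, payload); err != nil {
		return "", err
	}
	body, err := readFrame(&buf)
	return string(body), err
}

func main() {
	s, err := roundTrip([]byte("binlog item"))
	fmt.Println(s, err)
}
```

`io.ReadFull` is the important call on the read side: a plain `Read` may return fewer bytes than requested on a TCP stream, truncating the protobuf body.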
#2820
Summary by CodeRabbit
New Features
- `Consumer` interface and multiple consumer types for enhanced messaging handling.
- `ReplProtocol` for Pika database replication, facilitating data synchronization.
Bug Fixes
Documentation
- `README.md` and test files for better understanding and usage of the new features.
Tests