Skip to content

Commit

Permalink
maintainer: fix remove changefeed and some data race (#743)
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <414110582@qq.com>
  • Loading branch information
asddongmen authored Jan 2, 2025
1 parent 05c03b4 commit 11ba4e3
Show file tree
Hide file tree
Showing 13 changed files with 281 additions and 265 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ jobs:
if: ${{ always() }}
run: |
DIR=$(sudo find /tmp/tidb_cdc_test/ -type d -name 'cdc_data' -exec dirname {} \;)
CASE=$(basename $DIR)
[ -z "$DIR" ] && exit 0
CASE=$(basename $DIR)
mkdir -p ./logs/$CASE
cat $DIR/stdout.log
tail -n 10 $DIR/cdc.log
Expand Down
8 changes: 8 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package main

import (
"os"
"slices"
"strings"

"github.com/pingcap/log"
Expand Down Expand Up @@ -48,6 +49,7 @@ func addNewArchCommandTo(cmd *cobra.Command) {
}

func isNewArchEnabledByConfig(serverConfigFilePath string) bool {

cfg := config.GetDefaultServerConfig()
if len(serverConfigFilePath) > 0 {
// strict decode config file, but ignore debug item
Expand Down Expand Up @@ -80,6 +82,12 @@ func parseConfigFlagFromOSArgs() string {
serverConfigFilePath = os.Args[i+2]
}
}

// If the command is `cdc cli changefeed`, means it's not a server config file.
if slices.Contains(os.Args, "cli") && slices.Contains(os.Args, "changefeed") {
serverConfigFilePath = ""
}

return serverConfigFilePath
}

Expand Down
2 changes: 1 addition & 1 deletion docs/design/2024-12-20-ticdc-flow-control.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ TiCDC processes data in two main parts:

The following diagram illustrates the relationship between the **data puller** and **data sinker**:

![Data Flow](./medias/flow-control-1.png)
![Data Flow](../media/flow-control-1.png)
<!-- The source file for this diagram: docs/design/medias/flow-control-1.puml -->

In this architecture, **EventService** and **EventCollector** facilitate communication between the two parts:
Expand Down
8 changes: 6 additions & 2 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,10 @@ func (c *EventCollector) mustSendDispatcherRequest(target node.ID, topic string,

if err != nil {
log.Info("failed to send dispatcher request message to event service, try again later",
zap.Stringer("target", target),
zap.String("changefeedID", req.Dispatcher.GetChangefeedID().ID().String()),
zap.Stringer("dispatcher", req.Dispatcher.GetId()),
zap.Any("target", target.String()),
zap.Any("request", req),
zap.Error(err))
// Put the request back to the channel for later retry.
c.dispatcherRequestChan.In() <- DispatcherRequestWithTarget{
Expand Down Expand Up @@ -570,6 +573,7 @@ func (d *dispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent,
func (d *dispatcherStat) handleReadyEvent(event dispatcher.DispatcherEvent, eventCollector *EventCollector) {
d.eventServiceInfo.Lock()
defer d.eventServiceInfo.Unlock()

if event.GetType() != commonEvent.TypeReadyEvent {
log.Panic("should not happen")
}
Expand Down Expand Up @@ -658,7 +662,7 @@ func (d *dispatcherStat) unregisterDispatcher(eventCollector *EventCollector) {
ActionType: eventpb.ActionType_ACTION_TYPE_REMOVE,
})
// unregister from remote event service if have
if d.eventServiceInfo.serverID != eventCollector.serverId {
if d.eventServiceInfo.serverID != "" && d.eventServiceInfo.serverID != eventCollector.serverId {
eventCollector.mustSendDispatcherRequest(d.eventServiceInfo.serverID, eventServiceTopic, DispatcherRequest{
Dispatcher: d.target,
ActionType: eventpb.ActionType_ACTION_TYPE_REMOVE,
Expand Down
Loading

0 comments on commit 11ba4e3

Please sign in to comment.