Skip to content

Commit

Permalink
add message struct (#803)
Browse files Browse the repository at this point in the history
* add message struct

* use message provided method

* codec use local message

* adjust open protocol

* add some new

* fix build

* add more copyright headers

* fix make check

* remove useless field from the message

* run make check

* add copy right
  • Loading branch information
3AceShowHand authored Jan 8, 2025
1 parent 42c3e06 commit 564ded1
Show file tree
Hide file tree
Showing 240 changed files with 3,225 additions and 1,635 deletions.
1 change: 0 additions & 1 deletion api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func RegisterRoutes(
server server.Server,
registry prometheus.Gatherer,
) {

// Open API V2
v2.RegisterOpenAPIV2Routes(router, v2.NewOpenAPIV2(server))

Expand Down
13 changes: 13 additions & 0 deletions api/middleware/middleware_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package middleware

import (
Expand Down
13 changes: 13 additions & 0 deletions cmd/cli/cli_changefeed_move_table.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cli

import (
Expand Down
1 change: 0 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func addNewArchCommandTo(cmd *cobra.Command) {
}

func isNewArchEnabledByConfig(serverConfigFilePath string) bool {

cfg := config.GetDefaultServerConfig()
if len(serverConfigFilePath) > 0 {
// strict decode config file, but ignore debug item
Expand Down
5 changes: 2 additions & 3 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ import (
"os"
"strings"

"github.com/fatih/color"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/logger"

"github.com/fatih/color"
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/server"
"github.com/pingcap/ticdc/version"
"github.com/pingcap/tiflow/pkg/cmd/util"
Expand Down
2 changes: 1 addition & 1 deletion coordinator/changefeed/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (m *Backoff) HandleError(errs []*heartbeatpb.RunningError) (bool, *heartbea
}
}

var lastError = errs[len(errs)-1]
lastError := errs[len(errs)-1]

if !m.retrying.Load() {
// errBackoff may be stopped, reset it before the first retry.
Expand Down
20 changes: 13 additions & 7 deletions coordinator/changefeed/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func TestRetry(t *testing.T) {
changefeed, state, err := backoff.CheckStatus(&heartbeatpb.MaintainerStatus{
CheckpointTs: 1,
Err: []*heartbeatpb.RunningError{
{Message: "test"}},
{Message: "test"},
},
})
require.True(t, changefeed)
require.Equal(t, model.StateWarning, state)
Expand All @@ -56,12 +57,13 @@ func TestRetry(t *testing.T) {
require.True(t, backoff.ShouldRun())
require.False(t, backoff.retrying.Load())

//fail
// fail
changefeed, state, err = backoff.CheckStatus(&heartbeatpb.MaintainerStatus{
CheckpointTs: 1,
Err: []*heartbeatpb.RunningError{
{Message: "test"},
{Code: "CDC:ErrSnapshotLostByGC", Message: "snapshot lost by gc"}},
{Code: "CDC:ErrSnapshotLostByGC", Message: "snapshot lost by gc"},
},
})

require.True(t, changefeed)
Expand Down Expand Up @@ -101,7 +103,8 @@ func TestErrorReportedWhenRetrying(t *testing.T) {
changefeed, state, err := backoff.CheckStatus(&heartbeatpb.MaintainerStatus{
CheckpointTs: 1,
Err: []*heartbeatpb.RunningError{
{Message: "test"}},
{Message: "test"},
},
})
require.NotNil(t, err)
require.True(t, changefeed)
Expand All @@ -114,7 +117,8 @@ func TestErrorReportedWhenRetrying(t *testing.T) {
changefeed, state, err = backoff.CheckStatus(&heartbeatpb.MaintainerStatus{
CheckpointTs: 1,
Err: []*heartbeatpb.RunningError{
{Message: "test"}},
{Message: "test"},
},
})
require.NotNil(t, err)
require.True(t, changefeed)
Expand All @@ -136,7 +140,8 @@ func TestFailedWhenRetry(t *testing.T) {
changefeed, state, err := backoff.CheckStatus(&heartbeatpb.MaintainerStatus{
CheckpointTs: 1,
Err: []*heartbeatpb.RunningError{
{Message: "test"}},
{Message: "test"},
},
})
require.NotNil(t, err)
require.True(t, changefeed)
Expand All @@ -149,7 +154,8 @@ func TestFailedWhenRetry(t *testing.T) {
changefeed, state, err = backoff.CheckStatus(&heartbeatpb.MaintainerStatus{
CheckpointTs: 1,
Err: []*heartbeatpb.RunningError{
{Message: "test"}},
{Message: "test"},
},
})
require.NotNil(t, err)
require.True(t, changefeed)
Expand Down
3 changes: 2 additions & 1 deletion coordinator/changefeed/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ type Changefeed struct {
// NewChangefeed creates a new changefeed instance
func NewChangefeed(cfID common.ChangeFeedID,
info *config.ChangeFeedInfo,
checkpointTs uint64) *Changefeed {
checkpointTs uint64,
) *Changefeed {
uri, err := url.Parse(info.SinkURI)
if err != nil {
log.Panic("unable to marshal changefeed config",
Expand Down
67 changes: 40 additions & 27 deletions coordinator/changefeed/changefeed_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,26 +210,32 @@ func TestReplaceStoppedChangefeed(t *testing.T) {
cfID := common.NewChangeFeedIDWithName("test")
cf := &Changefeed{
ID: cfID,
info: atomic.NewPointer(&config.ChangeFeedInfo{ChangefeedID: cfID,
Config: config.GetDefaultReplicaConfig(),
SinkURI: "mysql://127.0.0.1:3306"}),
info: atomic.NewPointer(&config.ChangeFeedInfo{
ChangefeedID: cfID,
Config: config.GetDefaultReplicaConfig(),
SinkURI: "mysql://127.0.0.1:3306",
}),
status: atomic.NewPointer(&heartbeatpb.MaintainerStatus{}),
}
db.AddStoppedChangefeed(cf)
require.Contains(t, db.stopped, cf.ID)

cf2 := &config.ChangeFeedInfo{ChangefeedID: cfID,
Config: config.GetDefaultReplicaConfig(),
SinkURI: "kafka://127.0.0.1:9092"}
cf2 := &config.ChangeFeedInfo{
ChangefeedID: cfID,
Config: config.GetDefaultReplicaConfig(),
SinkURI: "kafka://127.0.0.1:9092",
}
db.ReplaceStoppedChangefeed(cf2)
require.Contains(t, db.stopped, cf.ID)

cf3 := db.GetByID(cf.ID)
require.Equal(t, true, cf3.isMQSink)

cf4ID := common.NewChangeFeedIDWithName("test4")
cf4 := &config.ChangeFeedInfo{ChangefeedID: cf4ID,
SinkURI: "kafka://127.0.0.1:9092"}
cf4 := &config.ChangeFeedInfo{
ChangefeedID: cf4ID,
SinkURI: "kafka://127.0.0.1:9092",
}
db.ReplaceStoppedChangefeed(cf4)

require.NotContains(t, db.changefeeds, cf4ID)
Expand All @@ -238,9 +244,11 @@ func TestReplaceStoppedChangefeed(t *testing.T) {
func TestScheduleChangefeed(t *testing.T) {
db := NewChangefeedDB(1216)
cfID := common.NewChangeFeedIDWithName("test")
cf := NewChangefeed(cfID, &config.ChangeFeedInfo{ChangefeedID: cfID,
Config: config.GetDefaultReplicaConfig(),
SinkURI: "mysql://127.0.0.1:3306"},
cf := NewChangefeed(cfID, &config.ChangeFeedInfo{
ChangefeedID: cfID,
Config: config.GetDefaultReplicaConfig(),
SinkURI: "mysql://127.0.0.1:3306",
},
10)
db.AddAbsentChangefeed(cf)
db.MarkMaintainerScheduling(cf)
Expand All @@ -267,36 +275,40 @@ func TestCalculateGCSafepoint(t *testing.T) {

cfID := common.NewChangeFeedIDWithName("test")
cf1 := NewChangefeed(cfID,
&config.ChangeFeedInfo{ChangefeedID: cfID,
Config: config.GetDefaultReplicaConfig(),
State: model.StateStopped,
&config.ChangeFeedInfo{
ChangefeedID: cfID,
Config: config.GetDefaultReplicaConfig(),
State: model.StateStopped,
}, 11)
db.AddStoppedChangefeed(cf1)
require.Equal(t, uint64(11), db.CalculateGCSafepoint())

cf2ID := common.NewChangeFeedIDWithName("test")
cf2 := NewChangefeed(cf2ID,
&config.ChangeFeedInfo{ChangefeedID: cf2ID,
Config: config.GetDefaultReplicaConfig(),
State: model.StateFinished,
&config.ChangeFeedInfo{
ChangefeedID: cf2ID,
Config: config.GetDefaultReplicaConfig(),
State: model.StateFinished,
}, 9)
db.AddStoppedChangefeed(cf2)
require.Equal(t, uint64(11), db.CalculateGCSafepoint())

cf3ID := common.NewChangeFeedIDWithName("test")
cf3 := NewChangefeed(cf3ID,
&config.ChangeFeedInfo{ChangefeedID: cf3ID,
Config: config.GetDefaultReplicaConfig(),
State: model.StateNormal,
&config.ChangeFeedInfo{
ChangefeedID: cf3ID,
Config: config.GetDefaultReplicaConfig(),
State: model.StateNormal,
}, 10)
db.AddStoppedChangefeed(cf3)
require.Equal(t, uint64(10), db.CalculateGCSafepoint())

cf4ID := common.NewChangeFeedIDWithName("test")
cf4 := NewChangefeed(cf4ID,
&config.ChangeFeedInfo{ChangefeedID: cf4ID,
Config: config.GetDefaultReplicaConfig(),
State: model.StateFailed,
&config.ChangeFeedInfo{
ChangefeedID: cf4ID,
Config: config.GetDefaultReplicaConfig(),
State: model.StateFailed,
Error: &model.RunningError{
Code: string(errors.ErrGCTTLExceeded.ID()),
},
Expand All @@ -306,10 +318,11 @@ func TestCalculateGCSafepoint(t *testing.T) {

cf5ID := common.NewChangeFeedIDWithName("test")
cf5 := NewChangefeed(cf5ID,
&config.ChangeFeedInfo{ChangefeedID: cf5ID,
Config: config.GetDefaultReplicaConfig(),
State: model.StateFailed,
Error: &model.RunningError{},
&config.ChangeFeedInfo{
ChangefeedID: cf5ID,
Config: config.GetDefaultReplicaConfig(),
State: model.StateFailed,
Error: &model.RunningError{},
}, 7)
db.AddStoppedChangefeed(cf5)
require.Equal(t, uint64(7), db.CalculateGCSafepoint())
Expand Down
9 changes: 6 additions & 3 deletions coordinator/changefeed/etcd_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ func (b *EtcdBackend) GetAllChangefeeds(ctx context.Context) (map[common.ChangeF
}

func (b *EtcdBackend) CreateChangefeed(ctx context.Context,
info *config.ChangeFeedInfo) error {
info *config.ChangeFeedInfo,
) error {
infoKey := etcd.GetEtcdKeyChangeFeedInfo(b.etcdClient.GetClusterID(), info.ChangefeedID.DisplayName)
infoValue, err := info.Marshal()
if err != nil {
Expand Down Expand Up @@ -229,7 +230,8 @@ func (b *EtcdBackend) PauseChangefeed(ctx context.Context, id common.ChangeFeedI
}

func (b *EtcdBackend) DeleteChangefeed(ctx context.Context,
changefeedID common.ChangeFeedID) error {
changefeedID common.ChangeFeedID,
) error {
infoKey := etcd.GetEtcdKeyChangeFeedInfo(b.etcdClient.GetClusterID(), changefeedID.DisplayName)
jobKey := etcd.GetEtcdKeyJob(b.etcdClient.GetClusterID(), changefeedID.DisplayName)
opsThen := []clientv3.Op{}
Expand All @@ -247,7 +249,8 @@ func (b *EtcdBackend) DeleteChangefeed(ctx context.Context,
}

func (b *EtcdBackend) ResumeChangefeed(ctx context.Context,
id common.ChangeFeedID, newCheckpointTs uint64) error {
id common.ChangeFeedID, newCheckpointTs uint64,
) error {
info, err := b.etcdClient.GetChangeFeedInfo(ctx, id.DisplayName)
if err != nil {
return errors.Trace(err)
Expand Down
9 changes: 5 additions & 4 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ func NewController(
backend changefeed.Backend,
stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler],
taskScheduler threadpool.ThreadPool,
batchSize int, balanceInterval time.Duration) *Controller {
batchSize int, balanceInterval time.Duration,
) *Controller {
mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
changefeedDB := changefeed.NewChangefeedDB(version)

Expand Down Expand Up @@ -123,7 +124,7 @@ func NewController(
})
log.Info("changefeed bootstrap initial nodes",
zap.Int("nodes", len(nodes)))
var newNodes = make([]*node.Info, 0, len(nodes))
newNodes := make([]*node.Info, 0, len(nodes))
for _, n := range nodes {
newNodes = append(newNodes, n)
}
Expand Down Expand Up @@ -191,14 +192,14 @@ func (c *Controller) onNodeChanged() {
currentNodes := c.bootstrapper.GetAllNodes()

activeNodes := c.nodeManager.GetAliveNodes()
var newNodes = make([]*node.Info, 0, len(activeNodes))
newNodes := make([]*node.Info, 0, len(activeNodes))
for id, n := range activeNodes {
if _, ok := currentNodes[id]; !ok {
newNodes = append(newNodes, n)
}
}
var removedNodes []node.ID
for id, _ := range currentNodes {
for id := range currentNodes {
if _, ok := activeNodes[id]; !ok {
removedNodes = append(removedNodes, id)
c.RemoveNode(id)
Expand Down
Loading

0 comments on commit 564ded1

Please sign in to comment.