Skip to content

Commit 54ad97f

Browse files
authored
api(cdc): fix create changefeed after scale-in pd (#12003) (#12030)
close #12004
1 parent d3aee73 commit 54ad97f

File tree

2 files changed: +106 -74 lines changed

cdc/api/v2/changefeed.go

Lines changed: 73 additions & 67 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ import (
2424
"github.com/gin-gonic/gin"
2525
"github.com/pingcap/errors"
2626
"github.com/pingcap/log"
27-
tidbkv "github.com/pingcap/tidb/kv"
27+
"github.com/pingcap/tidb/kv"
2828
"github.com/pingcap/tiflow/cdc/api"
2929
"github.com/pingcap/tiflow/cdc/capture"
3030
"github.com/pingcap/tiflow/cdc/model"
@@ -57,30 +57,34 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
5757
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
5858
return
5959
}
60+
var pdClient pd.Client
61+
var kvStorage kv.Storage
62+
// if PDAddrs is empty, use the default pdClient
6063
if len(cfg.PDAddrs) == 0 {
6164
up, err := getCaptureDefaultUpstream(h.capture)
6265
if err != nil {
6366
_ = c.Error(err)
6467
return
6568
}
66-
cfg.PDConfig = getUpstreamPDConfig(up)
67-
}
68-
credential := cfg.PDConfig.toCredential()
69-
70-
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
71-
defer cancel()
72-
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
73-
if err != nil {
74-
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
75-
return
76-
}
77-
defer pdClient.Close()
78-
79-
// verify tables todo: del kvstore
80-
kvStorage, err := h.helpers.createTiStore(cfg.PDAddrs, credential)
81-
if err != nil {
82-
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
83-
return
69+
pdClient = up.PDClient
70+
kvStorage = up.KVStorage
71+
} else {
72+
credential := cfg.PDConfig.toCredential()
73+
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
74+
defer cancel()
75+
var err error
76+
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
77+
if err != nil {
78+
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
79+
return
80+
}
81+
defer pdClient.Close()
82+
// verify tables todo: del kvstore
83+
kvStorage, err = h.helpers.createTiStore(cfg.PDAddrs, credential)
84+
if err != nil {
85+
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
86+
return
87+
}
8488
}
8589
// We should not close kvStorage since all kvStorage in cdc is the same one.
8690
// defer kvStorage.Close()
@@ -245,21 +249,25 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
245249
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
246250
return
247251
}
252+
var kvStore kv.Storage
253+
// if PDAddrs is empty, use the default upstream
248254
if len(cfg.PDAddrs) == 0 {
249255
up, err := getCaptureDefaultUpstream(h.capture)
250256
if err != nil {
251257
_ = c.Error(err)
252258
return
253259
}
254-
cfg.PDConfig = getUpstreamPDConfig(up)
260+
kvStore = up.KVStorage
261+
} else {
262+
credential := cfg.PDConfig.toCredential()
263+
var err error
264+
kvStore, err = h.helpers.createTiStore(cfg.PDAddrs, credential)
265+
if err != nil {
266+
_ = c.Error(errors.Trace(err))
267+
return
268+
}
255269
}
256-
credential := cfg.PDConfig.toCredential()
257270

258-
kvStore, err := h.helpers.createTiStore(cfg.PDAddrs, credential)
259-
if err != nil {
260-
_ = c.Error(err)
261-
return
262-
}
263271
replicaCfg := cfg.ReplicaConfig.ToInternalReplicaConfig()
264272
ineligibleTables, eligibleTables, err := h.helpers.
265273
getVerfiedTables(replicaCfg, kvStore, cfg.StartTs)
@@ -353,7 +361,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
353361
return
354362
}
355363

356-
var storage tidbkv.Storage
364+
var storage kv.Storage
357365
// if PDAddrs is not empty, use it to create a new kvstore
358366
// Note: upManager is nil in some unit test cases
359367
if len(updateCfConfig.PDAddrs) != 0 || upManager == nil {
@@ -789,48 +797,55 @@ func (h *OpenAPIV2) synced(c *gin.Context) {
789797
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
790798
cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
791799
}
800+
if c.Request.Body != nil && c.Request.ContentLength > 0 {
801+
if err := c.BindJSON(cfg); err != nil {
802+
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
803+
return
804+
}
805+
}
792806

793807
// try to get pd client to get pd time, and determine synced status based on the pd time
808+
var pdClient pd.Client
794809
if len(cfg.PDAddrs) == 0 {
795810
up, err := getCaptureDefaultUpstream(h.capture)
796811
if err != nil {
797812
_ = c.Error(err)
798813
return
799814
}
800-
cfg.PDConfig = getUpstreamPDConfig(up)
801-
}
802-
credential := cfg.PDConfig.toCredential()
803-
804-
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
805-
defer cancel()
815+
pdClient = up.PDClient
816+
} else {
817+
credential := cfg.PDConfig.toCredential()
818+
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
819+
defer cancel()
806820

807-
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
808-
if err != nil {
809-
// case 1. we can't get pd client, pd may be unavailable.
810-
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
811-
// otherwise, if pd is unavailable, we decide data whether is synced based on
812-
// the time difference between current time and lastSyncedTs.
813-
var message string
814-
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
815-
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
816-
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
817-
} else {
818-
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
819-
"If pd is offline, please check whether we satisfy the condition that "+
820-
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
821-
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
821+
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
822+
if err != nil {
823+
// case 1. we can't get pd client, pd may be unavailable.
824+
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
825+
// otherwise, if pd is unavailable, we decide data whether is synced based on
826+
// the time difference between current time and lastSyncedTs.
827+
var message string
828+
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
829+
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
830+
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
831+
} else {
832+
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
833+
"If pd is offline, please check whether we satisfy the condition that "+
834+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
835+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
836+
}
837+
c.JSON(http.StatusOK, SyncedStatus{
838+
Synced: false,
839+
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
840+
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
841+
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
842+
NowTs: model.JSONTime(time.Unix(0, 0)),
843+
Info: message,
844+
})
845+
return
822846
}
823-
c.JSON(http.StatusOK, SyncedStatus{
824-
Synced: false,
825-
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
826-
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
827-
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
828-
NowTs: model.JSONTime(time.Unix(0, 0)),
829-
Info: message,
830-
})
831-
return
847+
defer pdClient.Close()
832848
}
833-
defer pdClient.Close()
834849
// get time from pd
835850
physicalNow, _, _ := pdClient.GetTS(ctx)
836851

@@ -950,12 +965,3 @@ func getCaptureDefaultUpstream(cp capture.Capture) (*upstream.Upstream, error) {
950965
}
951966
return up, nil
952967
}
953-
954-
func getUpstreamPDConfig(up *upstream.Upstream) PDConfig {
955-
return PDConfig{
956-
PDAddrs: up.PdEndpoints,
957-
KeyPath: up.SecurityConfig.KeyPath,
958-
CAPath: up.SecurityConfig.CAPath,
959-
CertPath: up.SecurityConfig.CertPath,
960-
}
961-
}

cdc/api/v2/changefeed_test.go

Lines changed: 33 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -99,7 +99,7 @@ func TestCreateChangefeed(t *testing.T) {
9999
}{
100100
ID: changeFeedID.ID,
101101
SinkURI: blackholeSink,
102-
PDAddrs: []string{},
102+
PDAddrs: []string{"http://127.0.0.1:2379"}, // arbitrary pd address to trigger create new pd client
103103
}
104104
body, err := json.Marshal(&cfConfig)
105105
require.Nil(t, err)
@@ -624,6 +624,8 @@ func TestVerifyTable(t *testing.T) {
624624

625625
// case 2: kv create failed
626626
updateCfg := getDefaultVerifyTableConfig()
627+
// arbitrary pd address to trigger create new pd client
628+
updateCfg.PDAddrs = []string{"http://127.0.0.1:2379"}
627629
body, err := json.Marshal(&updateCfg)
628630
require.Nil(t, err)
629631
helpers.EXPECT().
@@ -998,6 +1000,10 @@ func TestChangefeedSynced(t *testing.T) {
9981000
statusProvider.err = nil
9991001
statusProvider.changefeedInfo = cfInfo
10001002
{
1003+
cfg := getDefaultVerifyTableConfig()
1004+
// arbitrary pd address to trigger create new pd client
1005+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1006+
body, _ := json.Marshal(&cfg)
10011007
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
10021008
// case3: pd is offline,resolvedTs - checkpointTs > 15s
10031009
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1010,7 +1016,7 @@ func TestChangefeedSynced(t *testing.T) {
10101016
context.Background(),
10111017
syncedInfo.method,
10121018
fmt.Sprintf(syncedInfo.url, validID),
1013-
nil,
1019+
bytes.NewReader(body),
10141020
)
10151021
router.ServeHTTP(w, req)
10161022
require.Equal(t, http.StatusOK, w.Code)
@@ -1023,6 +1029,10 @@ func TestChangefeedSynced(t *testing.T) {
10231029
}
10241030

10251031
{
1032+
cfg := getDefaultVerifyTableConfig()
1033+
// arbitrary pd address to trigger create new pd client
1034+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1035+
body, _ := json.Marshal(&cfg)
10261036
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
10271037
// case4: pd is offline,resolvedTs - checkpointTs < 15s
10281038
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1035,7 +1045,7 @@ func TestChangefeedSynced(t *testing.T) {
10351045
context.Background(),
10361046
syncedInfo.method,
10371047
fmt.Sprintf(syncedInfo.url, validID),
1038-
nil,
1048+
bytes.NewReader(body),
10391049
)
10401050
router.ServeHTTP(w, req)
10411051
require.Equal(t, http.StatusOK, w.Code)
@@ -1054,6 +1064,10 @@ func TestChangefeedSynced(t *testing.T) {
10541064
pdClient.logicTime = 1000
10551065
pdClient.timestamp = 1701153217279
10561066
{
1067+
cfg := getDefaultVerifyTableConfig()
1068+
// arbitrary pd address to trigger create new pd client
1069+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1070+
body, _ := json.Marshal(&cfg)
10571071
// case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s
10581072
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
10591073
CheckpointTs: 1701153217209 << 18,
@@ -1065,7 +1079,7 @@ func TestChangefeedSynced(t *testing.T) {
10651079
context.Background(),
10661080
syncedInfo.method,
10671081
fmt.Sprintf(syncedInfo.url, validID),
1068-
nil,
1082+
bytes.NewReader(body),
10691083
)
10701084
router.ServeHTTP(w, req)
10711085
require.Equal(t, http.StatusOK, w.Code)
@@ -1077,6 +1091,10 @@ func TestChangefeedSynced(t *testing.T) {
10771091
}
10781092

10791093
{
1094+
cfg := getDefaultVerifyTableConfig()
1095+
// arbitrary pd address to trigger create new pd client
1096+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1097+
body, _ := json.Marshal(&cfg)
10801098
// case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s
10811099
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
10821100
CheckpointTs: 1701153201279 << 18,
@@ -1088,7 +1106,7 @@ func TestChangefeedSynced(t *testing.T) {
10881106
context.Background(),
10891107
syncedInfo.method,
10901108
fmt.Sprintf(syncedInfo.url, validID),
1091-
nil,
1109+
bytes.NewReader(body),
10921110
)
10931111
router.ServeHTTP(w, req)
10941112
require.Equal(t, http.StatusOK, w.Code)
@@ -1105,6 +1123,10 @@ func TestChangefeedSynced(t *testing.T) {
11051123
}
11061124

11071125
{
1126+
cfg := getDefaultVerifyTableConfig()
1127+
// arbitrary pd address to trigger create new pd client
1128+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1129+
body, _ := json.Marshal(&cfg)
11081130
// case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s
11091131
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
11101132
CheckpointTs: 1701153201279 << 18,
@@ -1116,7 +1138,7 @@ func TestChangefeedSynced(t *testing.T) {
11161138
context.Background(),
11171139
syncedInfo.method,
11181140
fmt.Sprintf(syncedInfo.url, validID),
1119-
nil,
1141+
bytes.NewReader(body),
11201142
)
11211143
router.ServeHTTP(w, req)
11221144
require.Equal(t, http.StatusOK, w.Code)
@@ -1128,6 +1150,10 @@ func TestChangefeedSynced(t *testing.T) {
11281150
}
11291151

11301152
{
1153+
cfg := getDefaultVerifyTableConfig()
1154+
// arbitrary pd address to trigger create new pd client
1155+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1156+
body, _ := json.Marshal(&cfg)
11311157
// case8: pdTs - lastSyncedTs < 5min
11321158
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
11331159
CheckpointTs: 1701153217279 << 18,
@@ -1139,7 +1165,7 @@ func TestChangefeedSynced(t *testing.T) {
11391165
context.Background(),
11401166
syncedInfo.method,
11411167
fmt.Sprintf(syncedInfo.url, validID),
1142-
nil,
1168+
bytes.NewReader(body),
11431169
)
11441170
router.ServeHTTP(w, req)
11451171
require.Equal(t, http.StatusOK, w.Code)

0 commit comments

Comments (0)