Skip to content

Commit 78c798b

Browse files
authored
api(cdc): fix create changefeed after scale-in pd (#12003) (#12031)
close #12004
1 parent 0e4c06d commit 78c798b

File tree

2 files changed

+123
-87
lines changed

2 files changed

+123
-87
lines changed

cdc/api/v2/changefeed.go

Lines changed: 90 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"github.com/gin-gonic/gin"
2525
"github.com/pingcap/errors"
2626
"github.com/pingcap/log"
27-
tidbkv "github.com/pingcap/tidb/kv"
27+
"github.com/pingcap/tidb/kv"
2828
"github.com/pingcap/tiflow/cdc/api"
2929
"github.com/pingcap/tiflow/cdc/capture"
3030
"github.com/pingcap/tiflow/cdc/model"
@@ -68,30 +68,34 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
6868
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
6969
return
7070
}
71+
var pdClient pd.Client
72+
var kvStorage kv.Storage
73+
// if PDAddrs is empty, use the default pdClient
7174
if len(cfg.PDAddrs) == 0 {
7275
up, err := getCaptureDefaultUpstream(h.capture)
7376
if err != nil {
7477
_ = c.Error(err)
7578
return
7679
}
77-
cfg.PDConfig = getUpstreamPDConfig(up)
78-
}
79-
credential := cfg.PDConfig.toCredential()
80-
81-
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
82-
defer cancel()
83-
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
84-
if err != nil {
85-
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
86-
return
87-
}
88-
defer pdClient.Close()
89-
90-
// verify tables todo: del kvstore
91-
kvStorage, err := h.helpers.createTiStore(cfg.PDAddrs, credential)
92-
if err != nil {
93-
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
94-
return
80+
pdClient = up.PDClient
81+
kvStorage = up.KVStorage
82+
} else {
83+
credential := cfg.PDConfig.toCredential()
84+
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
85+
defer cancel()
86+
var err error
87+
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
88+
if err != nil {
89+
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
90+
return
91+
}
92+
defer pdClient.Close()
93+
// verify tables todo: del kvstore
94+
kvStorage, err = h.helpers.createTiStore(cfg.PDAddrs, credential)
95+
if err != nil {
96+
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
97+
return
98+
}
9599
}
96100
// We should not close kvStorage since all kvStorage in cdc is the same one.
97101
// defer kvStorage.Close()
@@ -132,19 +136,24 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
132136
CertAllowedCN: cfg.CertAllowedCN,
133137
}
134138

135-
// cannot create changefeed if there are running lightning/restore tasks
136-
tlsCfg, err := credential.ToTLSConfig()
137-
if err != nil {
138-
_ = c.Error(err)
139-
return
140-
}
141-
142-
cli, err := h.helpers.getEtcdClient(cfg.PDAddrs, tlsCfg)
143-
if err != nil {
144-
_ = c.Error(err)
145-
return
139+
var etcdCli *clientv3.Client
140+
if len(cfg.PDAddrs) == 0 {
141+
etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap()
142+
} else {
143+
credential := cfg.PDConfig.toCredential()
144+
// cannot create changefeed if there are running lightning/restore tasks
145+
tlsCfg, err := credential.ToTLSConfig()
146+
if err != nil {
147+
_ = c.Error(err)
148+
return
149+
}
150+
etcdCli, err = h.helpers.getEtcdClient(cfg.PDAddrs, tlsCfg)
151+
if err != nil {
152+
_ = c.Error(err)
153+
return
154+
}
146155
}
147-
err = hasRunningImport(ctx, cli)
156+
err = hasRunningImport(ctx, etcdCli)
148157
if err != nil {
149158
log.Error("failed to create changefeed", zap.Error(err))
150159
_ = c.Error(
@@ -299,20 +308,23 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
299308
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
300309
return
301310
}
311+
var kvStore kv.Storage
312+
// if PDAddrs is empty, use the default upstream
302313
if len(cfg.PDAddrs) == 0 {
303314
up, err := getCaptureDefaultUpstream(h.capture)
304315
if err != nil {
305316
_ = c.Error(err)
306317
return
307318
}
308-
cfg.PDConfig = getUpstreamPDConfig(up)
309-
}
310-
credential := cfg.PDConfig.toCredential()
311-
312-
kvStore, err := h.helpers.createTiStore(cfg.PDAddrs, credential)
313-
if err != nil {
314-
_ = c.Error(err)
315-
return
319+
kvStore = up.KVStorage
320+
} else {
321+
credential := cfg.PDConfig.toCredential()
322+
var err error
323+
kvStore, err = h.helpers.createTiStore(cfg.PDAddrs, credential)
324+
if err != nil {
325+
_ = c.Error(errors.Trace(err))
326+
return
327+
}
316328
}
317329
replicaCfg := cfg.ReplicaConfig.ToInternalReplicaConfig()
318330
ineligibleTables, eligibleTables, err := h.helpers.
@@ -419,7 +431,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
419431
return
420432
}
421433

422-
var storage tidbkv.Storage
434+
var storage kv.Storage
423435
// if PDAddrs is not empty, use it to create a new kvstore
424436
// Note: upManager is nil in some unit test cases
425437
if len(updateCfConfig.PDAddrs) != 0 || upManager == nil {
@@ -866,48 +878,55 @@ func (h *OpenAPIV2) synced(c *gin.Context) {
866878
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
867879
cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
868880
}
881+
if c.Request.Body != nil && c.Request.ContentLength > 0 {
882+
if err := c.BindJSON(cfg); err != nil {
883+
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
884+
return
885+
}
886+
}
869887

870888
// try to get pd client to get pd time, and determine synced status based on the pd time
889+
var pdClient pd.Client
871890
if len(cfg.PDAddrs) == 0 {
872891
up, err := getCaptureDefaultUpstream(h.capture)
873892
if err != nil {
874893
_ = c.Error(err)
875894
return
876895
}
877-
cfg.PDConfig = getUpstreamPDConfig(up)
878-
}
879-
credential := cfg.PDConfig.toCredential()
880-
881-
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
882-
defer cancel()
896+
pdClient = up.PDClient
897+
} else {
898+
credential := cfg.PDConfig.toCredential()
899+
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
900+
defer cancel()
883901

884-
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
885-
if err != nil {
886-
// case 1. we can't get pd client, pd may be unavailable.
887-
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
888-
// otherwise, if pd is unavailable, we decide data whether is synced based on
889-
// the time difference between current time and lastSyncedTs.
890-
var message string
891-
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
892-
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
893-
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
894-
} else {
895-
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
896-
"If pd is offline, please check whether we satisfy the condition that "+
897-
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
898-
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
902+
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
903+
if err != nil {
904+
// case 1. we can't get pd client, pd may be unavailable.
905+
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
906+
// otherwise, if pd is unavailable, we decide data whether is synced based on
907+
// the time difference between current time and lastSyncedTs.
908+
var message string
909+
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
910+
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
911+
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
912+
} else {
913+
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
914+
"If pd is offline, please check whether we satisfy the condition that "+
915+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
916+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
917+
}
918+
c.JSON(http.StatusOK, SyncedStatus{
919+
Synced: false,
920+
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
921+
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
922+
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
923+
NowTs: model.JSONTime(time.Unix(0, 0)),
924+
Info: message,
925+
})
926+
return
899927
}
900-
c.JSON(http.StatusOK, SyncedStatus{
901-
Synced: false,
902-
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
903-
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
904-
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
905-
NowTs: model.JSONTime(time.Unix(0, 0)),
906-
Info: message,
907-
})
908-
return
928+
defer pdClient.Close()
909929
}
910-
defer pdClient.Close()
911930
// get time from pd
912931
physicalNow, _, _ := pdClient.GetTS(ctx)
913932

@@ -1027,12 +1046,3 @@ func getCaptureDefaultUpstream(cp capture.Capture) (*upstream.Upstream, error) {
10271046
}
10281047
return up, nil
10291048
}
1030-
1031-
func getUpstreamPDConfig(up *upstream.Upstream) PDConfig {
1032-
return PDConfig{
1033-
PDAddrs: up.PdEndpoints,
1034-
KeyPath: up.SecurityConfig.KeyPath,
1035-
CAPath: up.SecurityConfig.CAPath,
1036-
CertPath: up.SecurityConfig.CertPath,
1037-
}
1038-
}

cdc/api/v2/changefeed_test.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func TestCreateChangefeed(t *testing.T) {
104104
}{
105105
ID: changeFeedID.ID,
106106
SinkURI: blackholeSink,
107-
PDAddrs: []string{},
107+
PDAddrs: []string{"http://127.0.0.1:2379"}, // arbitrary pd address to trigger create new pd client
108108
}
109109
body, err := json.Marshal(&cfConfig)
110110
require.Nil(t, err)
@@ -633,6 +633,8 @@ func TestVerifyTable(t *testing.T) {
633633

634634
// case 2: kv create failed
635635
updateCfg := getDefaultVerifyTableConfig()
636+
// arbitrary pd address to trigger create new pd client
637+
updateCfg.PDAddrs = []string{"http://127.0.0.1:2379"}
636638
body, err := json.Marshal(&updateCfg)
637639
require.Nil(t, err)
638640
helpers.EXPECT().
@@ -1007,6 +1009,10 @@ func TestChangefeedSynced(t *testing.T) {
10071009
statusProvider.err = nil
10081010
statusProvider.changefeedInfo = cfInfo
10091011
{
1012+
cfg := getDefaultVerifyTableConfig()
1013+
// arbitrary pd address to trigger create new pd client
1014+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1015+
body, _ := json.Marshal(&cfg)
10101016
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
10111017
// case3: pd is offline,resolvedTs - checkpointTs > 15s
10121018
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1019,7 +1025,7 @@ func TestChangefeedSynced(t *testing.T) {
10191025
context.Background(),
10201026
syncedInfo.method,
10211027
fmt.Sprintf(syncedInfo.url, validID),
1022-
nil,
1028+
bytes.NewReader(body),
10231029
)
10241030
router.ServeHTTP(w, req)
10251031
require.Equal(t, http.StatusOK, w.Code)
@@ -1032,6 +1038,10 @@ func TestChangefeedSynced(t *testing.T) {
10321038
}
10331039

10341040
{
1041+
cfg := getDefaultVerifyTableConfig()
1042+
// arbitrary pd address to trigger create new pd client
1043+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1044+
body, _ := json.Marshal(&cfg)
10351045
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
10361046
// case4: pd is offline,resolvedTs - checkpointTs < 15s
10371047
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1044,7 +1054,7 @@ func TestChangefeedSynced(t *testing.T) {
10441054
context.Background(),
10451055
syncedInfo.method,
10461056
fmt.Sprintf(syncedInfo.url, validID),
1047-
nil,
1057+
bytes.NewReader(body),
10481058
)
10491059
router.ServeHTTP(w, req)
10501060
require.Equal(t, http.StatusOK, w.Code)
@@ -1063,6 +1073,10 @@ func TestChangefeedSynced(t *testing.T) {
10631073
pdClient.logicTime = 1000
10641074
pdClient.timestamp = 1701153217279
10651075
{
1076+
cfg := getDefaultVerifyTableConfig()
1077+
// arbitrary pd address to trigger create new pd client
1078+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1079+
body, _ := json.Marshal(&cfg)
10661080
// case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s
10671081
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
10681082
CheckpointTs: 1701153217209 << 18,
@@ -1074,7 +1088,7 @@ func TestChangefeedSynced(t *testing.T) {
10741088
context.Background(),
10751089
syncedInfo.method,
10761090
fmt.Sprintf(syncedInfo.url, validID),
1077-
nil,
1091+
bytes.NewReader(body),
10781092
)
10791093
router.ServeHTTP(w, req)
10801094
require.Equal(t, http.StatusOK, w.Code)
@@ -1086,6 +1100,10 @@ func TestChangefeedSynced(t *testing.T) {
10861100
}
10871101

10881102
{
1103+
cfg := getDefaultVerifyTableConfig()
1104+
// arbitrary pd address to trigger create new pd client
1105+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1106+
body, _ := json.Marshal(&cfg)
10891107
// case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s
10901108
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
10911109
CheckpointTs: 1701153201279 << 18,
@@ -1097,7 +1115,7 @@ func TestChangefeedSynced(t *testing.T) {
10971115
context.Background(),
10981116
syncedInfo.method,
10991117
fmt.Sprintf(syncedInfo.url, validID),
1100-
nil,
1118+
bytes.NewReader(body),
11011119
)
11021120
router.ServeHTTP(w, req)
11031121
require.Equal(t, http.StatusOK, w.Code)
@@ -1114,6 +1132,10 @@ func TestChangefeedSynced(t *testing.T) {
11141132
}
11151133

11161134
{
1135+
cfg := getDefaultVerifyTableConfig()
1136+
// arbitrary pd address to trigger create new pd client
1137+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1138+
body, _ := json.Marshal(&cfg)
11171139
// case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s
11181140
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
11191141
CheckpointTs: 1701153201279 << 18,
@@ -1125,7 +1147,7 @@ func TestChangefeedSynced(t *testing.T) {
11251147
context.Background(),
11261148
syncedInfo.method,
11271149
fmt.Sprintf(syncedInfo.url, validID),
1128-
nil,
1150+
bytes.NewReader(body),
11291151
)
11301152
router.ServeHTTP(w, req)
11311153
require.Equal(t, http.StatusOK, w.Code)
@@ -1137,6 +1159,10 @@ func TestChangefeedSynced(t *testing.T) {
11371159
}
11381160

11391161
{
1162+
cfg := getDefaultVerifyTableConfig()
1163+
// arbitrary pd address to trigger create new pd client
1164+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1165+
body, _ := json.Marshal(&cfg)
11401166
// case8: pdTs - lastSyncedTs < 5min
11411167
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
11421168
CheckpointTs: 1701153217279 << 18,
@@ -1148,7 +1174,7 @@ func TestChangefeedSynced(t *testing.T) {
11481174
context.Background(),
11491175
syncedInfo.method,
11501176
fmt.Sprintf(syncedInfo.url, validID),
1151-
nil,
1177+
bytes.NewReader(body),
11521178
)
11531179
router.ServeHTTP(w, req)
11541180
require.Equal(t, http.StatusOK, w.Code)

0 commit comments

Comments
 (0)