Skip to content

Commit 54a2ec5

Browse files
authored
api(cdc): fix create changefeed after scale-in pd (#12003) (#12029)
close #12004
1 parent 9ef86ba commit 54a2ec5

File tree

2 files changed

+124
-85
lines changed

2 files changed

+124
-85
lines changed

cdc/api/v2/changefeed.go

Lines changed: 91 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/gin-gonic/gin"
2626
"github.com/pingcap/errors"
2727
"github.com/pingcap/log"
28+
"github.com/pingcap/tidb/pkg/kv"
2829
tidbkv "github.com/pingcap/tidb/pkg/kv"
2930
"github.com/pingcap/tiflow/cdc/api"
3031
"github.com/pingcap/tiflow/cdc/capture"
@@ -72,30 +73,34 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
7273
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
7374
return
7475
}
76+
var pdClient pd.Client
77+
var kvStorage kv.Storage
78+
// if PDAddrs is empty, use the default pdClient
7579
if len(cfg.PDAddrs) == 0 {
7680
up, err := getCaptureDefaultUpstream(h.capture)
7781
if err != nil {
7882
_ = c.Error(err)
7983
return
8084
}
81-
cfg.PDConfig = getUpstreamPDConfig(up)
82-
}
83-
credential := cfg.PDConfig.toCredential()
84-
85-
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
86-
defer cancel()
87-
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
88-
if err != nil {
89-
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
90-
return
91-
}
92-
defer pdClient.Close()
93-
94-
// verify tables todo: del kvstore
95-
kvStorage, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
96-
if err != nil {
97-
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
98-
return
85+
pdClient = up.PDClient
86+
kvStorage = up.KVStorage
87+
} else {
88+
credential := cfg.PDConfig.toCredential()
89+
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
90+
defer cancel()
91+
var err error
92+
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
93+
if err != nil {
94+
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
95+
return
96+
}
97+
defer pdClient.Close()
98+
// verify tables todo: del kvstore
99+
kvStorage, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
100+
if err != nil {
101+
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
102+
return
103+
}
99104
}
100105
ctrl, err := h.capture.GetController()
101106
if err != nil {
@@ -142,19 +147,24 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
142147
CertAllowedCN: cfg.CertAllowedCN,
143148
}
144149

145-
// cannot create changefeed if there are running lightning/restore tasks
146-
tlsCfg, err := credential.ToTLSConfig()
147-
if err != nil {
148-
_ = c.Error(err)
149-
return
150-
}
151-
152-
cli, err := h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg)
153-
if err != nil {
154-
_ = c.Error(err)
155-
return
150+
var etcdCli *clientv3.Client
151+
if len(cfg.PDAddrs) == 0 {
152+
etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap()
153+
} else {
154+
credential := cfg.PDConfig.toCredential()
155+
// cannot create changefeed if there are running lightning/restore tasks
156+
tlsCfg, err := credential.ToTLSConfig()
157+
if err != nil {
158+
_ = c.Error(err)
159+
return
160+
}
161+
etcdCli, err = h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg)
162+
if err != nil {
163+
_ = c.Error(err)
164+
return
165+
}
156166
}
157-
err = hasRunningImport(ctx, cli)
167+
err = hasRunningImport(ctx, etcdCli)
158168
if err != nil {
159169
log.Error("failed to create changefeed", zap.Error(err))
160170
_ = c.Error(
@@ -329,21 +339,26 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
329339
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
330340
return
331341
}
342+
ctx := c.Request.Context()
343+
var kvStore tidbkv.Storage
344+
// if PDAddrs is empty, use the default upstream
332345
if len(cfg.PDAddrs) == 0 {
333346
up, err := getCaptureDefaultUpstream(h.capture)
334347
if err != nil {
335348
_ = c.Error(err)
336349
return
337350
}
338-
cfg.PDConfig = getUpstreamPDConfig(up)
339-
}
340-
credential := cfg.PDConfig.toCredential()
341-
ctx := c.Request.Context()
342-
kvStore, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
343-
if err != nil {
344-
_ = c.Error(err)
345-
return
351+
kvStore = up.KVStorage
352+
} else {
353+
credential := cfg.PDConfig.toCredential()
354+
var err error
355+
kvStore, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
356+
if err != nil {
357+
_ = c.Error(errors.Trace(err))
358+
return
359+
}
346360
}
361+
347362
uri, err := url.Parse(cfg.SinkURI)
348363
if err != nil {
349364
_ = c.Error(err)
@@ -926,48 +941,55 @@ func (h *OpenAPIV2) synced(c *gin.Context) {
926941
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
927942
cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
928943
}
944+
if c.Request.Body != nil && c.Request.ContentLength > 0 {
945+
if err := c.BindJSON(cfg); err != nil {
946+
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
947+
return
948+
}
949+
}
929950

930951
// try to get pd client to get pd time, and determine synced status based on the pd time
952+
var pdClient pd.Client
931953
if len(cfg.PDAddrs) == 0 {
932954
up, err := getCaptureDefaultUpstream(h.capture)
933955
if err != nil {
934956
_ = c.Error(err)
935957
return
936958
}
937-
cfg.PDConfig = getUpstreamPDConfig(up)
938-
}
939-
credential := cfg.PDConfig.toCredential()
940-
941-
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
942-
defer cancel()
959+
pdClient = up.PDClient
960+
} else {
961+
credential := cfg.PDConfig.toCredential()
962+
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
963+
defer cancel()
943964

944-
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
945-
if err != nil {
946-
// case 1. we can't get pd client, pd may be unavailable.
947-
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
948-
// otherwise, if pd is unavailable, we decide data whether is synced based on
949-
// the time difference between current time and lastSyncedTs.
950-
var message string
951-
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
952-
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
953-
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
954-
} else {
955-
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
956-
"If pd is offline, please check whether we satisfy the condition that "+
957-
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
958-
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
965+
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
966+
if err != nil {
967+
// case 1. we can't get pd client, pd may be unavailable.
968+
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
969+
// otherwise, if pd is unavailable, we decide data whether is synced based on
970+
// the time difference between current time and lastSyncedTs.
971+
var message string
972+
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
973+
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
974+
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
975+
} else {
976+
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
977+
"If pd is offline, please check whether we satisfy the condition that "+
978+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
979+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
980+
}
981+
c.JSON(http.StatusOK, SyncedStatus{
982+
Synced: false,
983+
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
984+
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
985+
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
986+
NowTs: model.JSONTime(time.Unix(0, 0)),
987+
Info: message,
988+
})
989+
return
959990
}
960-
c.JSON(http.StatusOK, SyncedStatus{
961-
Synced: false,
962-
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
963-
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
964-
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
965-
NowTs: model.JSONTime(time.Unix(0, 0)),
966-
Info: message,
967-
})
968-
return
991+
defer pdClient.Close()
969992
}
970-
defer pdClient.Close()
971993
// get time from pd
972994
physicalNow, _, _ := pdClient.GetTS(ctx)
973995

@@ -1087,12 +1109,3 @@ func getCaptureDefaultUpstream(cp capture.Capture) (*upstream.Upstream, error) {
10871109
}
10881110
return up, nil
10891111
}
1090-
1091-
func getUpstreamPDConfig(up *upstream.Upstream) PDConfig {
1092-
return PDConfig{
1093-
PDAddrs: up.PdEndpoints,
1094-
KeyPath: up.SecurityConfig.KeyPath,
1095-
CAPath: up.SecurityConfig.CAPath,
1096-
CertPath: up.SecurityConfig.CertPath,
1097-
}
1098-
}

cdc/api/v2/changefeed_test.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func TestCreateChangefeed(t *testing.T) {
109109
ID: changeFeedID.ID,
110110
Namespace: changeFeedID.Namespace,
111111
SinkURI: blackholeSink,
112-
PDAddrs: []string{},
112+
PDAddrs: []string{"http://127.0.0.1:2379"}, // arbitrary pd address to trigger create new pd client
113113
}
114114
body, err := json.Marshal(&cfConfig)
115115
require.Nil(t, err)
@@ -647,6 +647,8 @@ func TestVerifyTable(t *testing.T) {
647647

648648
// case 2: kv create failed
649649
updateCfg := getDefaultVerifyTableConfig()
650+
// arbitrary pd address to trigger create new pd client
651+
updateCfg.PDAddrs = []string{"http://127.0.0.1:2379"}
650652
body, err := json.Marshal(&updateCfg)
651653
require.Nil(t, err)
652654
helpers.EXPECT().
@@ -1033,6 +1035,10 @@ func TestChangefeedSynced(t *testing.T) {
10331035
statusProvider.err = nil
10341036
statusProvider.changefeedInfo = cfInfo
10351037
{
1038+
cfg := getDefaultVerifyTableConfig()
1039+
// arbitrary pd address to trigger create new pd client
1040+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1041+
body, _ := json.Marshal(&cfg)
10361042
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
10371043
// case3: pd is offline,resolvedTs - checkpointTs > 15s
10381044
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1045,7 +1051,7 @@ func TestChangefeedSynced(t *testing.T) {
10451051
context.Background(),
10461052
syncedInfo.method,
10471053
fmt.Sprintf(syncedInfo.url, validID),
1048-
nil,
1054+
bytes.NewReader(body),
10491055
)
10501056
router.ServeHTTP(w, req)
10511057
require.Equal(t, http.StatusOK, w.Code)
@@ -1058,6 +1064,10 @@ func TestChangefeedSynced(t *testing.T) {
10581064
}
10591065

10601066
{
1067+
cfg := getDefaultVerifyTableConfig()
1068+
// arbitrary pd address to trigger create new pd client
1069+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1070+
body, _ := json.Marshal(&cfg)
10611071
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
10621072
// case4: pd is offline,resolvedTs - checkpointTs < 15s
10631073
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1070,7 +1080,7 @@ func TestChangefeedSynced(t *testing.T) {
10701080
context.Background(),
10711081
syncedInfo.method,
10721082
fmt.Sprintf(syncedInfo.url, validID),
1073-
nil,
1083+
bytes.NewReader(body),
10741084
)
10751085
router.ServeHTTP(w, req)
10761086
require.Equal(t, http.StatusOK, w.Code)
@@ -1089,6 +1099,10 @@ func TestChangefeedSynced(t *testing.T) {
10891099
pdClient.logicTime = 1000
10901100
pdClient.timestamp = 1701153217279
10911101
{
1102+
cfg := getDefaultVerifyTableConfig()
1103+
// arbitrary pd address to trigger create new pd client
1104+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1105+
body, _ := json.Marshal(&cfg)
10921106
// case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s
10931107
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
10941108
CheckpointTs: 1701153217209 << 18,
@@ -1100,7 +1114,7 @@ func TestChangefeedSynced(t *testing.T) {
11001114
context.Background(),
11011115
syncedInfo.method,
11021116
fmt.Sprintf(syncedInfo.url, validID),
1103-
nil,
1117+
bytes.NewReader(body),
11041118
)
11051119
router.ServeHTTP(w, req)
11061120
require.Equal(t, http.StatusOK, w.Code)
@@ -1112,6 +1126,10 @@ func TestChangefeedSynced(t *testing.T) {
11121126
}
11131127

11141128
{
1129+
cfg := getDefaultVerifyTableConfig()
1130+
// arbitrary pd address to trigger create new pd client
1131+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1132+
body, _ := json.Marshal(&cfg)
11151133
// case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s
11161134
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
11171135
CheckpointTs: 1701153201279 << 18,
@@ -1123,7 +1141,7 @@ func TestChangefeedSynced(t *testing.T) {
11231141
context.Background(),
11241142
syncedInfo.method,
11251143
fmt.Sprintf(syncedInfo.url, validID),
1126-
nil,
1144+
bytes.NewReader(body),
11271145
)
11281146
router.ServeHTTP(w, req)
11291147
require.Equal(t, http.StatusOK, w.Code)
@@ -1140,6 +1158,10 @@ func TestChangefeedSynced(t *testing.T) {
11401158
}
11411159

11421160
{
1161+
cfg := getDefaultVerifyTableConfig()
1162+
// arbitrary pd address to trigger create new pd client
1163+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1164+
body, _ := json.Marshal(&cfg)
11431165
// case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s
11441166
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
11451167
CheckpointTs: 1701153201279 << 18,
@@ -1151,7 +1173,7 @@ func TestChangefeedSynced(t *testing.T) {
11511173
context.Background(),
11521174
syncedInfo.method,
11531175
fmt.Sprintf(syncedInfo.url, validID),
1154-
nil,
1176+
bytes.NewReader(body),
11551177
)
11561178
router.ServeHTTP(w, req)
11571179
require.Equal(t, http.StatusOK, w.Code)
@@ -1163,6 +1185,10 @@ func TestChangefeedSynced(t *testing.T) {
11631185
}
11641186

11651187
{
1188+
cfg := getDefaultVerifyTableConfig()
1189+
// arbitrary pd address to trigger create new pd client
1190+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1191+
body, _ := json.Marshal(&cfg)
11661192
// case8: pdTs - lastSyncedTs < 5min
11671193
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
11681194
CheckpointTs: 1701153217279 << 18,
@@ -1174,7 +1200,7 @@ func TestChangefeedSynced(t *testing.T) {
11741200
context.Background(),
11751201
syncedInfo.method,
11761202
fmt.Sprintf(syncedInfo.url, validID),
1177-
nil,
1203+
bytes.NewReader(body),
11781204
)
11791205
router.ServeHTTP(w, req)
11801206
require.Equal(t, http.StatusOK, w.Code)

0 commit comments

Comments
 (0)