Skip to content

Commit ef96454

Browse files
authored
api(cdc): fix create changefeed after scale-in pd (#12003)
close #12004
1 parent 3caa329 commit ef96454

File tree

3 files changed

+125
-86
lines changed

3 files changed

+125
-86
lines changed

cdc/api/v2/changefeed.go

Lines changed: 91 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/gin-gonic/gin"
2626
"github.com/pingcap/errors"
2727
"github.com/pingcap/log"
28+
"github.com/pingcap/tidb/pkg/kv"
2829
tidbkv "github.com/pingcap/tidb/pkg/kv"
2930
"github.com/pingcap/tiflow/cdc/api"
3031
"github.com/pingcap/tiflow/cdc/capture"
@@ -66,30 +67,34 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
6667
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
6768
return
6869
}
70+
var pdClient pd.Client
71+
var kvStorage kv.Storage
72+
// if PDAddrs is empty, use the default pdClient
6973
if len(cfg.PDAddrs) == 0 {
7074
up, err := getCaptureDefaultUpstream(h.capture)
7175
if err != nil {
7276
_ = c.Error(err)
7377
return
7478
}
75-
cfg.PDConfig = getUpstreamPDConfig(up)
76-
}
77-
credential := cfg.PDConfig.toCredential()
78-
79-
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
80-
defer cancel()
81-
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
82-
if err != nil {
83-
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
84-
return
85-
}
86-
defer pdClient.Close()
87-
88-
// verify tables todo: del kvstore
89-
kvStorage, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
90-
if err != nil {
91-
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
92-
return
79+
pdClient = up.PDClient
80+
kvStorage = up.KVStorage
81+
} else {
82+
credential := cfg.PDConfig.toCredential()
83+
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
84+
defer cancel()
85+
var err error
86+
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
87+
if err != nil {
88+
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
89+
return
90+
}
91+
defer pdClient.Close()
92+
// verify tables. TODO: delete kvstore
93+
kvStorage, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
94+
if err != nil {
95+
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
96+
return
97+
}
9398
}
9499
provider := h.capture.StatusProvider()
95100
owner, err := h.capture.GetOwner()
@@ -136,19 +141,24 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
136141
CertAllowedCN: cfg.CertAllowedCN,
137142
}
138143

139-
// cannot create changefeed if there are running lightning/restore tasks
140-
tlsCfg, err := credential.ToTLSConfig()
141-
if err != nil {
142-
_ = c.Error(err)
143-
return
144-
}
145-
146-
cli, err := h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg)
147-
if err != nil {
148-
_ = c.Error(err)
149-
return
144+
var etcdCli *clientv3.Client
145+
if len(cfg.PDAddrs) == 0 {
146+
etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap()
147+
} else {
148+
credential := cfg.PDConfig.toCredential()
149+
// cannot create changefeed if there are running lightning/restore tasks
150+
tlsCfg, err := credential.ToTLSConfig()
151+
if err != nil {
152+
_ = c.Error(err)
153+
return
154+
}
155+
etcdCli, err = h.helpers.getEtcdClient(ctx, cfg.PDAddrs, tlsCfg)
156+
if err != nil {
157+
_ = c.Error(err)
158+
return
159+
}
150160
}
151-
err = hasRunningImport(ctx, cli)
161+
err = hasRunningImport(ctx, etcdCli)
152162
if err != nil {
153163
log.Error("failed to create changefeed", zap.Error(err))
154164
_ = c.Error(
@@ -319,21 +329,26 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
319329
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
320330
return
321331
}
332+
ctx := c.Request.Context()
333+
var kvStore tidbkv.Storage
334+
// if PDAddrs is empty, use the default upstream
322335
if len(cfg.PDAddrs) == 0 {
323336
up, err := getCaptureDefaultUpstream(h.capture)
324337
if err != nil {
325338
_ = c.Error(err)
326339
return
327340
}
328-
cfg.PDConfig = getUpstreamPDConfig(up)
329-
}
330-
credential := cfg.PDConfig.toCredential()
331-
ctx := c.Request.Context()
332-
kvStore, err := h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
333-
if err != nil {
334-
_ = c.Error(err)
335-
return
341+
kvStore = up.KVStorage
342+
} else {
343+
credential := cfg.PDConfig.toCredential()
344+
var err error
345+
kvStore, err = h.helpers.createTiStore(ctx, cfg.PDAddrs, credential)
346+
if err != nil {
347+
_ = c.Error(errors.Trace(err))
348+
return
349+
}
336350
}
351+
337352
uri, err := url.Parse(cfg.SinkURI)
338353
if err != nil {
339354
_ = c.Error(err)
@@ -924,48 +939,55 @@ func (h *OpenAPIV2) synced(c *gin.Context) {
924939
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
925940
cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
926941
}
942+
if c.Request.Body != nil && c.Request.ContentLength > 0 {
943+
if err := c.BindJSON(cfg); err != nil {
944+
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
945+
return
946+
}
947+
}
927948

928949
// try to get pd client to get pd time, and determine synced status based on the pd time
950+
var pdClient pd.Client
929951
if len(cfg.PDAddrs) == 0 {
930952
up, err := getCaptureDefaultUpstream(h.capture)
931953
if err != nil {
932954
_ = c.Error(err)
933955
return
934956
}
935-
cfg.PDConfig = getUpstreamPDConfig(up)
936-
}
937-
credential := cfg.PDConfig.toCredential()
938-
939-
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
940-
defer cancel()
957+
pdClient = up.PDClient
958+
} else {
959+
credential := cfg.PDConfig.toCredential()
960+
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
961+
defer cancel()
941962

942-
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
943-
if err != nil {
944-
// case 1. we can't get pd client, pd may be unavailable.
945-
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
946-
// otherwise, if pd is unavailable, we decide data whether is synced based on
947-
// the time difference between current time and lastSyncedTs.
948-
var message string
949-
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
950-
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
951-
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
952-
} else {
953-
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
954-
"If pd is offline, please check whether we satisfy the condition that "+
955-
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
956-
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
963+
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
964+
if err != nil {
965+
// case 1. we can't get pd client, pd may be unavailable.
966+
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
967+
// otherwise, if pd is unavailable, we decide whether data is synced based on
968+
// the time difference between current time and lastSyncedTs.
969+
var message string
970+
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
971+
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
972+
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
973+
} else {
974+
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
975+
"If pd is offline, please check whether we satisfy the condition that "+
976+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
977+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
978+
}
979+
c.JSON(http.StatusOK, SyncedStatus{
980+
Synced: false,
981+
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
982+
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
983+
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
984+
NowTs: model.JSONTime(time.Unix(0, 0)),
985+
Info: message,
986+
})
987+
return
957988
}
958-
c.JSON(http.StatusOK, SyncedStatus{
959-
Synced: false,
960-
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
961-
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
962-
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
963-
NowTs: model.JSONTime(time.Unix(0, 0)),
964-
Info: message,
965-
})
966-
return
989+
defer pdClient.Close()
967990
}
968-
defer pdClient.Close()
969991
// get time from pd
970992
physicalNow, _, _ := pdClient.GetTS(ctx)
971993

@@ -1094,12 +1116,3 @@ func getCaptureDefaultUpstream(cp capture.Capture) (*upstream.Upstream, error) {
10941116
}
10951117
return up, nil
10961118
}
1097-
1098-
func getUpstreamPDConfig(up *upstream.Upstream) PDConfig {
1099-
return PDConfig{
1100-
PDAddrs: up.PdEndpoints,
1101-
KeyPath: up.SecurityConfig.KeyPath,
1102-
CAPath: up.SecurityConfig.CAPath,
1103-
CertPath: up.SecurityConfig.CertPath,
1104-
}
1105-
}

cdc/api/v2/changefeed_test.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func TestCreateChangefeed(t *testing.T) {
110110
ID: changeFeedID.ID,
111111
Namespace: changeFeedID.Namespace,
112112
SinkURI: blackholeSink,
113-
PDAddrs: []string{},
113+
PDAddrs: []string{"http://127.0.0.1:2379"}, // arbitrary pd address to trigger create new pd client
114114
}
115115
body, err := json.Marshal(&cfConfig)
116116
require.Nil(t, err)
@@ -647,6 +647,8 @@ func TestVerifyTable(t *testing.T) {
647647

648648
// case 2: kv create failed
649649
updateCfg := getDefaultVerifyTableConfig()
650+
// arbitrary pd address, so that a new kv storage is created instead of reusing the default upstream's
651+
updateCfg.PDAddrs = []string{"http://127.0.0.1:2379"}
650652
body, err := json.Marshal(&updateCfg)
651653
require.Nil(t, err)
652654
helpers.EXPECT().
@@ -1035,6 +1037,10 @@ func TestChangefeedSynced(t *testing.T) {
10351037
statusProvider.err = nil
10361038
statusProvider.changefeedInfo = cfInfo
10371039
{
1040+
cfg := getDefaultVerifyTableConfig()
1041+
// arbitrary pd address to trigger creation of a new pd client
1042+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1043+
body, _ := json.Marshal(&cfg)
10381044
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
10391045
// case3: pd is offline, resolvedTs - checkpointTs > 15s
10401046
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1047,7 +1053,7 @@ func TestChangefeedSynced(t *testing.T) {
10471053
context.Background(),
10481054
syncedInfo.method,
10491055
fmt.Sprintf(syncedInfo.url, validID),
1050-
nil,
1056+
bytes.NewReader(body),
10511057
)
10521058
router.ServeHTTP(w, req)
10531059
require.Equal(t, http.StatusOK, w.Code)
@@ -1060,6 +1066,10 @@ func TestChangefeedSynced(t *testing.T) {
10601066
}
10611067

10621068
{
1069+
cfg := getDefaultVerifyTableConfig()
1070+
// arbitrary pd address to trigger creation of a new pd client
1071+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1072+
body, _ := json.Marshal(&cfg)
10631073
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1)
10641074
// case4: pd is offline, resolvedTs - checkpointTs < 15s
10651075
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
@@ -1072,7 +1082,7 @@ func TestChangefeedSynced(t *testing.T) {
10721082
context.Background(),
10731083
syncedInfo.method,
10741084
fmt.Sprintf(syncedInfo.url, validID),
1075-
nil,
1085+
bytes.NewReader(body),
10761086
)
10771087
router.ServeHTTP(w, req)
10781088
require.Equal(t, http.StatusOK, w.Code)
@@ -1091,6 +1101,10 @@ func TestChangefeedSynced(t *testing.T) {
10911101
pdClient.logicTime = 1000
10921102
pdClient.timestamp = 1701153217279
10931103
{
1104+
cfg := getDefaultVerifyTableConfig()
1105+
// arbitrary pd address to trigger creation of a new pd client
1106+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1107+
body, _ := json.Marshal(&cfg)
10941108
// case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s
10951109
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
10961110
CheckpointTs: 1701153217209 << 18,
@@ -1102,7 +1116,7 @@ func TestChangefeedSynced(t *testing.T) {
11021116
context.Background(),
11031117
syncedInfo.method,
11041118
fmt.Sprintf(syncedInfo.url, validID),
1105-
nil,
1119+
bytes.NewReader(body),
11061120
)
11071121
router.ServeHTTP(w, req)
11081122
require.Equal(t, http.StatusOK, w.Code)
@@ -1114,6 +1128,10 @@ func TestChangefeedSynced(t *testing.T) {
11141128
}
11151129

11161130
{
1131+
cfg := getDefaultVerifyTableConfig()
1132+
// arbitrary pd address to trigger creation of a new pd client
1133+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1134+
body, _ := json.Marshal(&cfg)
11171135
// case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s
11181136
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
11191137
CheckpointTs: 1701153201279 << 18,
@@ -1125,7 +1143,7 @@ func TestChangefeedSynced(t *testing.T) {
11251143
context.Background(),
11261144
syncedInfo.method,
11271145
fmt.Sprintf(syncedInfo.url, validID),
1128-
nil,
1146+
bytes.NewReader(body),
11291147
)
11301148
router.ServeHTTP(w, req)
11311149
require.Equal(t, http.StatusOK, w.Code)
@@ -1142,6 +1160,10 @@ func TestChangefeedSynced(t *testing.T) {
11421160
}
11431161

11441162
{
1163+
cfg := getDefaultVerifyTableConfig()
1164+
// arbitrary pd address to trigger creation of a new pd client
1165+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1166+
body, _ := json.Marshal(&cfg)
11451167
// case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s
11461168
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
11471169
CheckpointTs: 1701153201279 << 18,
@@ -1153,7 +1175,7 @@ func TestChangefeedSynced(t *testing.T) {
11531175
context.Background(),
11541176
syncedInfo.method,
11551177
fmt.Sprintf(syncedInfo.url, validID),
1156-
nil,
1178+
bytes.NewReader(body),
11571179
)
11581180
router.ServeHTTP(w, req)
11591181
require.Equal(t, http.StatusOK, w.Code)
@@ -1165,6 +1187,10 @@ func TestChangefeedSynced(t *testing.T) {
11651187
}
11661188

11671189
{
1190+
cfg := getDefaultVerifyTableConfig()
1191+
// arbitrary pd address to trigger creation of a new pd client
1192+
cfg.PDAddrs = []string{"http://127.0.0.1:2379"}
1193+
body, _ := json.Marshal(&cfg)
11681194
// case8: pdTs - lastSyncedTs < 5min
11691195
statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{
11701196
CheckpointTs: 1701153217279 << 18,
@@ -1176,7 +1202,7 @@ func TestChangefeedSynced(t *testing.T) {
11761202
context.Background(),
11771203
syncedInfo.method,
11781204
fmt.Sprintf(syncedInfo.url, validID),
1179-
nil,
1205+
bytes.NewReader(body),
11801206
)
11811207
router.ServeHTTP(w, req)
11821208
require.Equal(t, http.StatusOK, w.Code)

tests/integration_tests/synced_status_with_redo/run.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/bin/bash
22

3-
## test the same logic as `sync_status``, but with redo mode
3+
## test the same logic as `sync_status`, but with redo mode
44

55
#!/bin/bash
66

0 commit comments

Comments
 (0)