diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 0aa72a897..7bc558529 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -28,12 +28,12 @@ import ( "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/server" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/ticdc/utils/threadpool" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index 7777435fd..f4f594bca 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -37,10 +37,10 @@ import ( "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/messaging/proto" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/orchestrator" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" "go.uber.org/zap" diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index 305260198..f81f34528 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -56,6 +57,8 @@ type EventDispatcherManager struct { changefeedID common.ChangeFeedID maintainerID node.ID + pdClock pdutil.Clock + config *config.ChangefeedConfig filterConfig *eventpb.FilterConfig // only not nil when enable sync point @@ -122,10 +125,12 @@ func NewEventDispatcherManager( ) (*EventDispatcherManager, uint64, error) { ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup + pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock) manager := &EventDispatcherManager{ dispatcherMap: newDispatcherMap(), changefeedID: changefeedID, maintainerID: maintainerID, + pdClock: pdClock, statusesChan: make(chan TableSpanStatusWithSeq, 8192), blockStatusesChan: make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024), errCh: make(chan error, 1), @@ -654,8 +659,9 @@ func (e *EventDispatcherManager) aggregateDispatcherHeartbeats(needCompleteStatu phyCheckpointTs := oracle.ExtractPhysical(message.Watermark.CheckpointTs) phyResolvedTs := oracle.ExtractPhysical(message.Watermark.ResolvedTs) - e.metricCheckpointTsLag.Set(float64(oracle.GetPhysical(time.Now())-phyCheckpointTs) / 1e3) - e.metricResolvedTsLag.Set(float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3) + pdTime := e.pdClock.CurrentTime() + e.metricCheckpointTsLag.Set(float64(oracle.GetPhysical(pdTime)-phyCheckpointTs) / 1e3) + e.metricResolvedTsLag.Set(float64(oracle.GetPhysical(pdTime)-phyResolvedTs) / 1e3) return &message } diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index 110b35cc7..68e2d4631 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/ticdc/utils/dynstream" "github.com/pingcap/tiflow/pkg/chann" "github.com/prometheus/client_golang/prometheus" - "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) @@ -428,30 +427,10 @@ func (c *EventCollector) updateMetrics(ctx context.Context) { metricsDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen)) metricsDSUsedMemoryUsage.Set(float64(dsMetrics.MemoryControl.UsedMemory)) metricsDSMaxMemoryUsage.Set(float64(dsMetrics.MemoryControl.MaxMemory)) - c.updateResolvedTsMetric() } } } -func (c *EventCollector) updateResolvedTsMetric() { - var minResolvedTs uint64 - c.dispatcherMap.Range(func(key, value interface{}) bool { - if stat, ok := value.(*dispatcherStat); ok { - d := stat.target - if minResolvedTs == 0 || d.GetResolvedTs() < minResolvedTs { - minResolvedTs = d.GetResolvedTs() - } - } - return true - }) - - if minResolvedTs > 0 { - phyResolvedTs := oracle.ExtractPhysical(minResolvedTs) - lagMs := float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3 - metrics.EventCollectorResolvedTsLagGauge.Set(lagMs) - } -} - // dispatcherStat is a helper struct to manage the state of a dispatcher. type dispatcherStat struct { dispatcherID common.DispatcherID diff --git a/downstreamadapter/sink/helper/topicmanager/kafka_topic_manager.go b/downstreamadapter/sink/helper/topicmanager/kafka_topic_manager.go index e624ab9e9..993938ead 100644 --- a/downstreamadapter/sink/helper/topicmanager/kafka_topic_manager.go +++ b/downstreamadapter/sink/helper/topicmanager/kafka_topic_manager.go @@ -23,8 +23,8 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/retry" "github.com/pingcap/ticdc/pkg/sink/kafka" - "github.com/pingcap/tiflow/pkg/retry" "go.uber.org/zap" ) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 4e97f692c..2ecb75660 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -35,8 +35,8 @@ import ( "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/utils/chann" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -696,15 +696,15 @@ func (e *eventStore) updateMetrics(ctx context.Context) error { } func (e *eventStore) updateMetricsOnce() { - currentTime := e.pdClock.CurrentTime() - currentPhyTs := oracle.GetPhysical(currentTime) + pdTime := e.pdClock.CurrentTime() + pdPhyTs := oracle.GetPhysical(pdTime) minResolvedTs := uint64(0) e.dispatcherMeta.RLock() for _, subscriptionStat := range e.dispatcherMeta.subscriptionStats { // resolved ts lag resolvedTs := subscriptionStat.resolvedTs.Load() resolvedPhyTs := oracle.ExtractPhysical(resolvedTs) - resolvedLag := float64(currentPhyTs-resolvedPhyTs) / 1e3 + resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3 metrics.EventStoreDispatcherResolvedTsLagHist.Observe(float64(resolvedLag)) if minResolvedTs == 0 || resolvedTs < minResolvedTs { minResolvedTs = resolvedTs @@ -712,7 +712,7 @@ func (e *eventStore) updateMetricsOnce() { // checkpoint ts lag checkpointTs := subscriptionStat.checkpointTs.Load() watermarkPhyTs := oracle.ExtractPhysical(checkpointTs) - watermarkLag := float64(currentPhyTs-watermarkPhyTs) / 1e3 + watermarkLag := float64(pdPhyTs-watermarkPhyTs) / 1e3 metrics.EventStoreDispatcherWatermarkLagHist.Observe(float64(watermarkLag)) } e.dispatcherMeta.RUnlock() @@ -721,7 +721,7 @@ func (e *eventStore) updateMetricsOnce() { return } minResolvedPhyTs := oracle.ExtractPhysical(minResolvedTs) - eventStoreResolvedTsLag := float64(currentPhyTs-minResolvedPhyTs) / 1e3 + eventStoreResolvedTsLag := float64(pdPhyTs-minResolvedPhyTs) / 1e3 metrics.EventStoreResolvedTsLagGauge.Set(eventStoreResolvedTsLag) } diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 678b631e3..1898b81b6 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -29,8 +29,8 @@ import ( appcontext "github.com/pingcap/ticdc/pkg/common/context" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/metrics" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/utils/dynstream" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/util" @@ -867,8 +867,9 @@ func (s *SubscriptionClient) GetResolvedTsLag() float64 { if pullerMinResolvedTs == 0 { return 0 } + pdTime := s.pdClock.CurrentTime() phyResolvedTs := oracle.ExtractPhysical(pullerMinResolvedTs) - lag := float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3 + lag := float64(oracle.GetPhysical(pdTime)-phyResolvedTs) / 1e3 return lag } diff --git a/logservice/logpuller/subscription_client_test.go b/logservice/logpuller/subscription_client_test.go index 74dc68564..75a524749 100644 --- a/logservice/logpuller/subscription_client_test.go +++ b/logservice/logpuller/subscription_client_test.go @@ -25,8 +25,8 @@ import ( "github.com/pingcap/ticdc/logservice/logpuller/regionlock" "github.com/pingcap/ticdc/logservice/txnutil" "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/tidb/pkg/store/mockstore/mockcopr" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/security" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index 388277be3..a9ad12459 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -25,8 +25,8 @@ import ( commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/ticdc/pkg/metrics" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -160,9 +160,9 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error { tryUpdateResolvedTs := func() { pendingTs := s.pendingResolvedTs.Load() defer func() { - currentPhyTs := oracle.GetPhysical(s.pdClock.CurrentTime()) + pdPhyTs := oracle.GetPhysical(s.pdClock.CurrentTime()) resolvedPhyTs := oracle.ExtractPhysical(pendingTs) - resolvedLag := float64(currentPhyTs-resolvedPhyTs) / 1e3 + resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3 metrics.SchemaStoreResolvedTsLagGauge.Set(float64(resolvedLag)) }() diff --git a/logservice/upstream/etcd.go b/logservice/upstream/etcd.go deleted file mode 100644 index 373a1d0ad..000000000 --- a/logservice/upstream/etcd.go +++ /dev/null @@ -1,289 +0,0 @@ -// Copyright 2025 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package upstream - -import ( - "context" - "crypto/tls" - "fmt" - "sync" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tiflow/pkg/security" - "github.com/pingcap/tiflow/pkg/util" - "github.com/tikv/pd/pkg/errs" - "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" - "go.etcd.io/etcd/client/pkg/v3/logutil" - clientV3 "go.etcd.io/etcd/client/v3" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "google.golang.org/grpc" - "google.golang.org/grpc/backoff" - "google.golang.org/grpc/keepalive" -) - -// The following code is mainly copied from: -// https://github.com/tikv/pd/blob/master/pkg/utils/etcdutil/etcdutil.go -const ( - // defaultEtcdClientTimeout is the default timeout for etcd client. - defaultEtcdClientTimeout = 5 * time.Second - // defaultDialKeepAliveTime is the time after which client pings the server to see if transport is alive. - defaultDialKeepAliveTime = 10 * time.Second - // defaultDialKeepAliveTimeout is the time that the client waits for a response for the - // keep-alive probe. If the response is not received in this time, the connection is closed. - defaultDialKeepAliveTimeout = 3 * time.Second - // etcdServerOfflineTimeout is the timeout for an unhealthy etcd endpoint to be offline from healthy checker. - etcdServerOfflineTimeout = 30 * time.Minute - // etcdServerDisconnectedTimeout is the timeout for an unhealthy etcd endpoint to be disconnected from healthy checker. - etcdServerDisconnectedTimeout = 1 * time.Minute - - etcdClientTimeoutDuration = 30 * time.Second - // healthyPath is the path to check etcd health. - healthyPath = "health" -) - -func newClient(tlsConfig *tls.Config, grpcDialOption grpc.DialOption, endpoints ...string) (*clientV3.Client, error) { - if len(endpoints) == 0 { - return nil, errors.New("empty endpoints") - } - logConfig := logutil.DefaultZapLoggerConfig - logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) - - lgc := zap.NewProductionConfig() - lgc.Encoding = log.ZapEncodingName - client, err := clientV3.New(clientV3.Config{ - Endpoints: endpoints, - TLS: tlsConfig, - LogConfig: &logConfig, - DialTimeout: defaultEtcdClientTimeout, - DialKeepAliveTime: defaultDialKeepAliveTime, - DialKeepAliveTimeout: defaultDialKeepAliveTimeout, - DialOptions: []grpc.DialOption{ - grpcDialOption, - grpc.WithBlock(), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ - BaseDelay: time.Second, - Multiplier: 1.1, - Jitter: 0.1, - MaxDelay: 3 * time.Second, - }, - MinConnectTimeout: 3 * time.Second, - }), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 10 * time.Second, - Timeout: 20 * time.Second, - }), - }, - }) - if err != nil { - return nil, errors.Trace(err) - } - return client, nil -} - -// CreateRawEtcdClient creates etcd v3 client with detecting endpoints. -// It will check the health of endpoints periodically, and update endpoints if needed. -func CreateRawEtcdClient(securityConf *security.Credential, grpcDialOption grpc.DialOption, endpoints ...string) (*clientV3.Client, error) { - log.Info("create etcdCli", zap.Strings("endpoints", endpoints)) - - tlsConfig, err := securityConf.ToTLSConfig() - if err != nil { - return nil, err - } - - client, err := newClient(tlsConfig, grpcDialOption, endpoints...) - if err != nil { - return nil, err - } - - tickerInterval := defaultDialKeepAliveTime - - checker := &healthyChecker{ - tlsConfig: tlsConfig, - grpcDialOption: grpcDialOption, - } - eps := syncUrls(client) - checker.update(eps) - - // Create a goroutine to check the health of etcd endpoints periodically. - go func(client *clientV3.Client) { - ticker := time.NewTicker(tickerInterval) - defer ticker.Stop() - lastAvailable := time.Now() - for { - select { - case <-client.Ctx().Done(): - log.Info("etcd client is closed, exit health check goroutine") - checker.Range(func(key, value interface{}) bool { - client := value.(*healthyClient) - client.Close() - return true - }) - return - case <-ticker.C: - usedEps := client.Endpoints() - healthyEps := checker.patrol(client.Ctx()) - if len(healthyEps) == 0 { - // when all endpoints are unhealthy, try to reset endpoints to update connect - // rather than delete them to avoid there is no any endpoint in client. - // Note: reset endpoints will trigger subconn closed, and then trigger reconnect. - // otherwise, the subconn will be retrying in grpc layer and use exponential backoff, - // and it cannot recover as soon as possible. - if time.Since(lastAvailable) > etcdServerDisconnectedTimeout { - log.Info("no available endpoint, try to reset endpoints", zap.Strings("lastEndpoints", usedEps)) - client.SetEndpoints([]string{}...) - client.SetEndpoints(usedEps...) - } - } else { - if !util.AreStringSlicesEquivalent(healthyEps, usedEps) { - client.SetEndpoints(healthyEps...) - change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps)) - log.Info("update endpoints", zap.String("numChange", change), - zap.Strings("lastEndpoints", usedEps), zap.Strings("endpoints", client.Endpoints())) - } - lastAvailable = time.Now() - } - } - } - }(client) - - // Notes: use another goroutine to update endpoints to avoid blocking health check in the first goroutine. - go func(client *clientV3.Client) { - ticker := time.NewTicker(tickerInterval) - defer ticker.Stop() - for { - select { - case <-client.Ctx().Done(): - log.Info("etcd client is closed, exit update endpoint goroutine") - return - case <-ticker.C: - eps := syncUrls(client) - checker.update(eps) - } - } - }(client) - - return client, nil -} - -type healthyClient struct { - *clientV3.Client - lastHealth time.Time -} - -type healthyChecker struct { - sync.Map // map[string]*healthyClient - tlsConfig *tls.Config - grpcDialOption grpc.DialOption -} - -func (checker *healthyChecker) patrol(ctx context.Context) []string { - // See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105-L145 - var wg sync.WaitGroup - count := 0 - checker.Range(func(key, value interface{}) bool { - count++ - return true - }) - hch := make(chan string, count) - healthyList := make([]string, 0, count) - checker.Range(func(key, value interface{}) bool { - wg.Add(1) - go func(key, value interface{}) { - defer wg.Done() - ep := key.(string) - client := value.(*healthyClient) - if IsHealthy(ctx, client.Client) { - hch <- ep - checker.Store(ep, &healthyClient{ - Client: client.Client, - lastHealth: time.Now(), - }) - return - } - }(key, value) - return true - }) - wg.Wait() - close(hch) - for h := range hch { - healthyList = append(healthyList, h) - } - return healthyList -} - -func (checker *healthyChecker) update(eps []string) { - for _, ep := range eps { - // check if client exists, if not, create one, if exists, check if it's offline or disconnected. - if client, ok := checker.Load(ep); ok { - lastHealthy := client.(*healthyClient).lastHealth - if time.Since(lastHealthy) > etcdServerOfflineTimeout { - log.Info("some etcd server maybe offline", zap.String("endpoint", ep)) - checker.Delete(ep) - } - if time.Since(lastHealthy) > etcdServerDisconnectedTimeout { - // try to reset client endpoint to trigger reconnect - client.(*healthyClient).Client.SetEndpoints([]string{}...) - client.(*healthyClient).Client.SetEndpoints(ep) - } - continue - } - checker.addClient(ep, time.Now()) - } -} - -func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) { - client, err := newClient(checker.tlsConfig, checker.grpcDialOption, ep) - if err != nil { - log.Error("failed to create etcd healthy client", zap.Error(err)) - return - } - checker.Store(ep, &healthyClient{ - Client: client, - lastHealth: lastHealth, - }) -} - -func syncUrls(client *clientV3.Client) []string { - // See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170-L183 - ctx, cancel := context.WithTimeout(clientV3.WithRequireLeader(client.Ctx()), - etcdClientTimeoutDuration) - defer cancel() - mresp, err := client.MemberList(ctx) - if err != nil { - log.Error("failed to list members", errs.ZapError(err)) - return []string{} - } - var eps []string - for _, m := range mresp.Members { - if len(m.Name) != 0 && !m.IsLearner { - eps = append(eps, m.ClientURLs...) - } - } - return eps -} - -// IsHealthy checks if the etcd is healthy. -func IsHealthy(ctx context.Context, client *clientV3.Client) bool { - timeout := etcdClientTimeoutDuration - ctx, cancel := context.WithTimeout(clientV3.WithRequireLeader(ctx), timeout) - defer cancel() - _, err := client.Get(ctx, healthyPath) - // permission denied is OK since proposal goes through consensus to get it - // See: https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L124 - return err == nil || err == rpctypes.ErrPermissionDenied -} diff --git a/maintainer/barrier_event_test.go b/maintainer/barrier_event_test.go index 06e86934d..9772f294c 100644 --- a/maintainer/barrier_event_test.go +++ b/maintainer/barrier_event_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/server/watcher" "github.com/stretchr/testify/require" ) @@ -228,6 +229,8 @@ func TestUpdateSchemaID(t *testing.T) { func setNodeManagerAndMessageCenter() *watcher.NodeManager { n := node.NewInfo("", "") + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) appcontext.SetService(appcontext.MessageCenter, messaging.NewMessageCenter(context.Background(), n.ID, 100, config.NewDefaultMessageCenterConfig(), nil)) nodeManager := watcher.NewNodeManager(nil, nil) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index d283ea064..220263837 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -32,12 +32,12 @@ import ( "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/sink/util" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/ticdc/utils/threadpool" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/oracle" "go.uber.org/atomic" @@ -67,6 +67,8 @@ type Maintainer struct { controller *Controller barrier *Barrier + pdClock pdutil.Clock + eventCh *chann.DrainableChann[*Event] taskScheduler threadpool.ThreadPool @@ -158,8 +160,13 @@ func NewMaintainer(cfID common.ChangeFeedID, ComponentStatus: heartbeatpb.ComponentState_Working, CheckpointTs: checkpointTs, }, selfNode.ID) + + // TODO: Retrieve the correct pdClock from the context once multiple upstreams are supported. + // For now, since there is only one upstream, using the default pdClock is sufficient. + pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock) m := &Maintainer{ id: cfID, + pdClock: pdClock, selfNode: selfNode, eventCh: chann.NewAutoDrainChann[*Event](), taskScheduler: taskScheduler, @@ -491,14 +498,16 @@ func (m *Maintainer) calCheckpointTs() { func (m *Maintainer) updateMetrics() { watermark := m.getWatermark() + + pdTime := m.pdClock.CurrentTime() phyCkpTs := oracle.ExtractPhysical(watermark.CheckpointTs) m.changefeedCheckpointTsGauge.Set(float64(phyCkpTs)) - lag := float64(oracle.GetPhysical(time.Now())-phyCkpTs) / 1e3 + lag := float64(oracle.GetPhysical(pdTime)-phyCkpTs) / 1e3 m.changefeedCheckpointTsLagGauge.Set(lag) phyResolvedTs := oracle.ExtractPhysical(watermark.ResolvedTs) m.changefeedResolvedTsGauge.Set(float64(phyResolvedTs)) - lag = float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3 + lag = float64(oracle.GetPhysical(pdTime)-phyResolvedTs) / 1e3 m.changefeedResolvedTsLagGauge.Set(lag) m.changefeedStatusGauge.Set(float64(m.state.Load())) diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index c9830841f..b18092538 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -32,11 +32,11 @@ import ( "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/scheduler" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/ticdc/utils" "github.com/pingcap/ticdc/utils/threadpool" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/spanz" "go.uber.org/zap" ) diff --git a/maintainer/maintainer_controller_test.go b/maintainer/maintainer_controller_test.go index 834b24ff9..7d95f60eb 100644 --- a/maintainer/maintainer_controller_test.go +++ b/maintainer/maintainer_controller_test.go @@ -29,12 +29,12 @@ import ( commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/scheduler" pkgOpearator "github.com/pingcap/ticdc/pkg/scheduler/operator" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/ticdc/utils/threadpool" "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" ) diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 122e2bdde..1dba3cbe6 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -27,8 +27,8 @@ import ( "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/utils/threadpool" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" ) diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index 7fbb7d51b..2475d4f9d 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/messaging/proto" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/tiflow/cdc/model" config2 "github.com/pingcap/tiflow/pkg/config" @@ -61,6 +62,9 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { {SchemaID: 1, TableID: 4, SchemaTableName: &commonEvent.SchemaTableName{SchemaName: "test", TableName: "t4"}}, }, } + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) + appcontext.SetService(appcontext.SchemaStore, store) mc := messaging.NewMessageCenter(ctx, selfNode.ID, 0, config.NewDefaultMessageCenterConfig(), nil) appcontext.SetService(appcontext.MessageCenter, mc) @@ -260,6 +264,8 @@ func TestMaintainerBootstrapWithTablesReported(t *testing.T) { {SchemaID: 1, TableID: 4, SchemaTableName: &commonEvent.SchemaTableName{SchemaName: "test", TableName: "t4"}}, }, } + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) appcontext.SetService(appcontext.SchemaStore, store) mc := messaging.NewMessageCenter(ctx, selfNode.ID, 0, config.NewDefaultMessageCenterConfig(), nil) appcontext.SetService(appcontext.MessageCenter, mc) @@ -375,6 +381,8 @@ func TestStopNotExistsMaintainer(t *testing.T) { {SchemaID: 1, TableID: 4, SchemaTableName: &commonEvent.SchemaTableName{SchemaName: "test", TableName: "t4"}}, }, } + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) appcontext.SetService(appcontext.SchemaStore, store) mc := messaging.NewMessageCenter(ctx, selfNode.ID, 0, config.NewDefaultMessageCenterConfig(), nil) appcontext.SetService(appcontext.MessageCenter, mc) diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index 09d580ca6..028c683dc 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/ticdc/utils/threadpool" "github.com/prometheus/client_golang/prometheus" @@ -290,6 +291,10 @@ func TestMaintainerSchedule(t *testing.T) { SchemaTableName: &commonEvent.SchemaTableName{}, }) } + + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) + schemaStore := &mockSchemaStore{tables: tables} appcontext.SetService(appcontext.SchemaStore, schemaStore) diff --git a/maintainer/replica/replication_span.go b/maintainer/replica/replication_span.go index 2fab9286f..73301910b 100644 --- a/maintainer/replica/replication_span.go +++ b/maintainer/replica/replication_span.go @@ -25,9 +25,9 @@ import ( "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/retry" "github.com/pingcap/ticdc/pkg/scheduler/replica" "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/spanz" "github.com/tikv/client-go/v2/oracle" "go.uber.org/atomic" diff --git a/maintainer/split/splitter.go b/maintainer/split/splitter.go index b6ffc7a5a..94922cd77 100644 --- a/maintainer/split/splitter.go +++ b/maintainer/split/splitter.go @@ -22,8 +22,8 @@ import ( "github.com/pingcap/ticdc/maintainer/replica" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/utils" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" ) diff --git a/maintainer/split/write_bytes_splitter.go b/maintainer/split/write_bytes_splitter.go index c55cd05db..11671fe0f 100644 --- a/maintainer/split/write_bytes_splitter.go +++ b/maintainer/split/write_bytes_splitter.go @@ -20,8 +20,8 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/pkg/pdutil" "go.uber.org/zap" ) diff --git a/maintainer/split/write_bytes_splitter_test.go b/maintainer/split/write_bytes_splitter_test.go index cfd645a0c..b089edb1a 100644 --- a/maintainer/split/write_bytes_splitter_test.go +++ b/maintainer/split/write_bytes_splitter_test.go @@ -23,8 +23,8 @@ import ( "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/stretchr/testify/require" ) diff --git a/metrics/grafana/ticdc_new_arch.json b/metrics/grafana/ticdc_new_arch.json index e436f89a8..63d410ba8 100644 --- a/metrics/grafana/ticdc_new_arch.json +++ b/metrics/grafana/ticdc_new_arch.json @@ -2640,7 +2640,7 @@ "y": 4 }, "hiddenSeries": false, - "id": 27, + "id": 275, "legend": { "alignAsTable": true, "avg": false, @@ -8148,7 +8148,7 @@ "heatmap": {}, "hideZeroBuckets": true, "highlightCards": true, - "id": 274, + "id": 22225, "legend": { "alignAsTable": true, "avg": false, @@ -10703,7 +10703,7 @@ "heatmap": {}, "hideZeroBuckets": true, "highlightCards": true, - "id": 20200, + "id": 22223, "legend": { "alignAsTable": true, "avg": false, @@ -10787,7 +10787,7 @@ "heatmap": {}, "hideZeroBuckets": true, "highlightCards": true, - "id": 20201, + "id": 22224, "legend": { "alignAsTable": true, "avg": false, @@ -22227,7 +22227,7 @@ "y": 25 }, "hiddenSeries": false, - "id": 20199, + "id": 20201, "legend": { "alignAsTable": false, "avg": false, @@ -22540,7 +22540,7 @@ "y": 32 }, "hiddenSeries": false, - "id": 20201, + "id": 20209, "legend": { "alignAsTable": false, "avg": false, @@ -23455,7 +23455,7 @@ "y": 25 }, "hiddenSeries": false, - "id": 20209, + "id": 22222, "legend": { "alignAsTable": false, "avg": false, diff --git a/pkg/api/internal/rest/request.go b/pkg/api/internal/rest/request.go index 3e35dec63..be0e9b6ed 100644 --- a/pkg/api/internal/rest/request.go +++ b/pkg/api/internal/rest/request.go @@ -26,10 +26,10 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/retry" "github.com/pingcap/tiflow/cdc/api/middleware" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/httputil" - "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/version" "go.uber.org/zap" ) diff --git a/pkg/common/context/app_context.go b/pkg/common/context/app_context.go index 98adcb77d..872ee5bba 100644 --- a/pkg/common/context/app_context.go +++ b/pkg/common/context/app_context.go @@ -22,6 +22,7 @@ var ( once sync.Once ) +// ServiceName is the name of the service. const ( MessageCenter = "MessageCenter" EventCollector = "EventCollector" @@ -33,6 +34,7 @@ const ( DispatcherDynamicStream = "DispatcherDynamicStream" MaintainerManager = "MaintainerManager" DispatcherOrchestrator = "DispatcherOrchestrator" + DefaultPDClock = "PDClock-0" ) // Put all the global instances here. diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index d2e237416..28552b999 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -24,8 +24,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/retry" "github.com/pingcap/tiflow/pkg/errorutil" - "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 1041c0511..8d3dcd6d7 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -26,11 +26,13 @@ import ( "github.com/pingcap/ticdc/logservice/schemastore" "github.com/pingcap/ticdc/pkg/apperror" "github.com/pingcap/ticdc/pkg/common" + appcontext "github.com/pingcap/ticdc/pkg/common/context" pevent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -70,6 +72,7 @@ type eventBroker struct { mounter pevent.Mounter // msgSender is used to send the events to the dispatchers. msgSender messaging.MessageSender + pdClock pdutil.Clock // changefeedMap is used to track the changefeed status. changefeedMap sync.Map @@ -119,9 +122,15 @@ func newEventBroker( g, ctx := errgroup.WithContext(ctx) ctx, cancel := context.WithCancel(ctx) + + // TODO: Retrieve the correct pdClock from the context once multiple upstreams are supported. + // For now, since there is only one upstream, using the default pdClock is sufficient. + pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock) + c := &eventBroker{ tidbClusterID: id, eventStore: eventStore, + pdClock: pdClock, mounter: pevent.NewMounter(tz), schemaStore: schemaStore, changefeedMap: sync.Map{}, @@ -727,11 +736,12 @@ func (c *eventBroker) updateMetrics(ctx context.Context) { if receivedMinResolvedTs == 0 { continue } + pdTime := c.pdClock.CurrentTime() phyResolvedTs := oracle.ExtractPhysical(receivedMinResolvedTs) - lag := float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3 + lag := float64(oracle.GetPhysical(pdTime)-phyResolvedTs) / 1e3 c.metricEventServiceReceivedResolvedTs.Set(float64(phyResolvedTs)) c.metricEventServiceResolvedTsLag.Set(lag) - lag = float64(oracle.GetPhysical(time.Now())-oracle.ExtractPhysical(sentMinWaterMark)) / 1e3 + lag = float64(oracle.GetPhysical(pdTime)-oracle.ExtractPhysical(sentMinWaterMark)) / 1e3 c.metricEventServiceSentResolvedTs.Set(lag) metricEventBrokerPendingScanTaskCount.Set(float64(len(c.taskChan))) } diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index 0ec665789..911b09508 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -22,9 +22,11 @@ import ( "github.com/pingcap/ticdc/eventpb" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" + appcontext "github.com/pingcap/ticdc/pkg/common/context" "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/stretchr/testify/require" ) @@ -37,6 +39,8 @@ func newTableSpan(tableID int64, start, end string) *heartbeatpb.TableSpan { } func newEventBrokerForTest() (*eventBroker, *mockEventStore, *mockSchemaStore) { + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) es := newMockEventStore(100) ss := newMockSchemaStore() mc := newMockMessageCenter() diff --git a/pkg/eventservice/event_service_test.go b/pkg/eventservice/event_service_test.go index 879ca33b5..78df2159e 100644 --- a/pkg/eventservice/event_service_test.go +++ b/pkg/eventservice/event_service_test.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" tconfig "github.com/pingcap/tiflow/pkg/config" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -44,6 +45,8 @@ func initEventService( mc messaging.MessageCenter, mockStore eventstore.EventStore, ) *eventService { mockSchemaStore := newMockSchemaStore() + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) appcontext.SetService(appcontext.MessageCenter, mc) appcontext.SetService(appcontext.EventStore, mockStore) appcontext.SetService(appcontext.SchemaStore, mockSchemaStore) diff --git a/pkg/metrics/dispatcher.go b/pkg/metrics/dispatcher.go index bc370dd69..76c0e7eaa 100644 --- a/pkg/metrics/dispatcher.go +++ b/pkg/metrics/dispatcher.go @@ -112,13 +112,6 @@ var ( Buckets: LagBucket(), }, []string{"type"}) - EventCollectorResolvedTsLagGauge = prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "event_collector", - Name: "resolved_ts_lag", - Help: "Resolved ts lag of event collector in seconds", - }) EventCollectorHandleEventDuration = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: "ticdc", @@ -141,6 +134,5 @@ func InitDispatcherMetrics(registry *prometheus.Registry) { registry.MustRegister(DispatcherReceivedEventCount) registry.MustRegister(EventCollectorRegisteredDispatcherCount) registry.MustRegister(EventCollectorReceivedEventLagDuration) - registry.MustRegister(EventCollectorResolvedTsLagGauge) registry.MustRegister(EventCollectorHandleEventDuration) } diff --git a/pkg/pdutil/api_client.go b/pkg/pdutil/api_client.go new file mode 100644 index 000000000..f68ed8ba6 --- /dev/null +++ b/pkg/pdutil/api_client.go @@ -0,0 +1,369 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdutil + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/retry" + "github.com/pingcap/tiflow/cdc/processor/tablepb" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/httputil" + "github.com/pingcap/tiflow/pkg/security" + "github.com/pingcap/tiflow/pkg/spanz" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" +) + +const ( + regionLabelPrefix = "/pd/api/v1/config/region-label/rules" + gcServiceSafePointURL = "/pd/api/v1/gc/safepoint" + healthyAPI = "/pd/api/v1/health" + scanRegionAPI = "/pd/api/v1/regions/key" + + // Split the default rule by following keys to keep metadata region isolated + // from the normal data area. + // + // * `6e000000000000000000f8`, keys starts with "m". + // * `748000fffffffffffffe00000000000000f8`, the table prefix of + // `tidb_ddl_job` table, which has the table ID 281474976710654, + // see "github.com/pingcap/tidb/pkg/ddl.JobTableID" + addMetaJSON = `{ + "sets": [ + { + "id": "ticdc/meta", + "labels": [ + { + "key": "data-type", + "value": "meta" + } + ], + "rule_type": "key-range", + "data": [ + { + "start_key": "6d00000000000000f8", + "end_key": "6e00000000000000f8" + } + ] + }, + { + "id": "ticdc/meta_tidb_ddl_job", + "labels": [ + { + "key": "data-type", + "value": "meta" + } + ], + "rule_type": "key-range", + "data": [ + { + "start_key": "748000fffffffffffffe00000000000000f8", + "end_key": "748000ffffffffffffff00000000000000f8" + } + ] + } + ] + }` +) + +const ( + defaultMaxRetry = 3 + defaultRequestTimeout = 5 * time.Second +) + +// PDAPIClient is client for PD http API. +type PDAPIClient interface { + UpdateMetaLabel(ctx context.Context) error + ListGcServiceSafePoint(ctx context.Context) (*ListServiceGCSafepoint, error) + CollectMemberEndpoints(ctx context.Context) ([]string, error) + Healthy(ctx context.Context, endpoint string) error + ScanRegions(ctx context.Context, span tablepb.Span) ([]RegionInfo, error) + Close() +} + +// pdAPIClient is the api client of Placement Driver, include grpc client and http client. +type pdAPIClient struct { + grpcClient pd.Client + httpClient *httputil.Client +} + +// NewPDAPIClient create a new pdAPIClient. +func NewPDAPIClient(pdClient pd.Client, conf *security.Credential) (PDAPIClient, error) { + dialClient, err := httputil.NewClient(conf) + if err != nil { + return nil, errors.Trace(err) + } + return &pdAPIClient{ + grpcClient: pdClient, + httpClient: dialClient, + }, nil +} + +// Close the pd api client, at the moment only close idle http connections if there is any. +func (pc *pdAPIClient) Close() { + pc.httpClient.CloseIdleConnections() +} + +// UpdateMetaLabel is a reentrant function that updates the meta-region label of upstream cluster. +func (pc *pdAPIClient) UpdateMetaLabel(ctx context.Context) error { + err := retry.Do(ctx, func() error { + ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) + defer cancel() + + err := pc.patchMetaLabel(ctx) + if err != nil { + log.Error("Fail to add meta region label to PD", zap.Error(err)) + return err + } + + log.Info("Succeed to add meta region label to PD") + return nil + }, retry.WithMaxTries(defaultMaxRetry), retry.WithIsRetryableErr(func(err error) bool { + switch errors.Cause(err) { + case context.Canceled: + return false + } + return true + })) + return err +} + +// NewTestRegionInfo creates a new RegionInfo for test purpose. +func NewTestRegionInfo(regionID uint64, start, end []byte, writtenKeys uint64) RegionInfo { + return RegionInfo{ + ID: regionID, + StartKey: hex.EncodeToString(start), + EndKey: hex.EncodeToString(end), + WrittenKeys: writtenKeys, + } +} + +// RegionInfo records detail region info for api usage. +// NOTE: This type is a copy of github.com/tikv/pd/server/api.RegionInfo. +// To reduce dependency tree, we do not import the api package directly. +type RegionInfo struct { + ID uint64 `json:"id"` + StartKey string `json:"start_key"` + EndKey string `json:"end_key"` + WrittenKeys uint64 `json:"written_keys"` +} + +// RegionsInfo contains some regions with the detailed region info. +// NOTE: This type is a copy of github.com/tikv/pd/server/api.RegionInfo. +// To reduce dependency tree, we do not import the api package directly. +type RegionsInfo struct { + Count int `json:"count"` + Regions []RegionInfo `json:"regions"` +} + +// ScanRegions is a reentrant function that updates the meta-region label of upstream cluster. +func (pc *pdAPIClient) ScanRegions(ctx context.Context, span tablepb.Span) ([]RegionInfo, error) { + scanLimit := 1024 + endpoints, err := pc.CollectMemberEndpoints(ctx) + if err != nil { + log.Warn("fail to collec pd member endpoints") + return nil, errors.Trace(err) + } + return pc.scanRegions(ctx, span, endpoints, scanLimit) +} + +func (pc *pdAPIClient) scanRegions( + ctx context.Context, span tablepb.Span, endpoints []string, scanLimit int, +) ([]RegionInfo, error) { + scan := func(endpoint string, startKey, endKey []byte) ([]RegionInfo, error) { + query := url.Values{} + query.Add("key", string(startKey)) + query.Add("end_key", string(endKey)) + query.Add("limit", strconv.Itoa(scanLimit)) + u, _ := url.Parse(endpoint + scanRegionAPI) + u.RawQuery = query.Encode() + resp, err := pc.httpClient.Get(ctx, u.String()) + if err != nil { + log.Warn("fail to scan regions", + zap.String("endpoint", endpoint), zap.Any("span", span)) + return nil, errors.Trace(err) + } + defer resp.Body.Close() + data, err := io.ReadAll(resp.Body) + if err != nil { + log.Warn("fail to scan regions", + zap.String("endpoint", endpoint), zap.Any("span", span)) + return nil, errors.Trace(err) + } + regions := &RegionsInfo{} + err = json.Unmarshal(data, regions) + if err != nil { + log.Warn("fail to scan regions", + zap.String("endpoint", endpoint), zap.Any("span", span)) + return nil, errors.Trace(err) + } + return regions.Regions, nil + } + + regions := []RegionInfo{} + startKey := span.StartKey + startKeyHex := strings.ToUpper(hex.EncodeToString(startKey)) + isFirstStartKey := true + for spanz.EndCompare(startKey, span.EndKey) < 0 || (len(startKey) == 0 && isFirstStartKey) { + for i, endpoint := range endpoints { + r, err := scan(endpoint, startKey, span.EndKey) + if err != nil && i+1 == len(endpoints) { + return nil, errors.Trace(err) + } + + if len(r) == 0 { + // Because start key is less than end key, there must be some regions. + log.Error("fail to scan region, missing region", + zap.String("endpoint", endpoint)) + return nil, cerror.WrapError(cerror.ErrInternalServerError, + fmt.Errorf("fail to scan region, missing region")) + } + if r[0].StartKey != startKeyHex { + r[0].StartKey = strings.ToUpper(hex.EncodeToString(startKey)) + log.Info("start key mismatch, adjust start key", + zap.String("startKey", startKeyHex), + zap.String("regionStartKey", r[0].StartKey), + zap.Uint64("regionID", r[0].ID)) + } + regions = append(regions, r...) + key, err := hex.DecodeString(regions[len(regions)-1].EndKey) + if err != nil { + log.Info("fail to decode region end key", + zap.String("endKey", regions[len(regions)-1].EndKey), + zap.Uint64("regionID", r[len(regions)-1].ID)) + return nil, errors.Trace(err) + } + startKey = tablepb.Key(key) + startKeyHex = strings.ToUpper(hex.EncodeToString(startKey)) + isFirstStartKey = false + break + } + } + if regions[len(regions)-1].EndKey != string(span.EndKey) { + regions[len(regions)-1].EndKey = strings.ToUpper(hex.EncodeToString(span.EndKey)) + log.Info("end key mismatch, adjust end key", + zap.String("endKey", strings.ToUpper(hex.EncodeToString(span.EndKey))), + zap.String("regionEndKey", regions[len(regions)-1].EndKey), + zap.Uint64("regionID", regions[len(regions)-1].ID)) + } + + return regions, nil +} + +// ServiceSafePoint contains gc service safe point +type ServiceSafePoint struct { + ServiceID string `json:"service_id"` + ExpiredAt int64 `json:"expired_at"` + SafePoint uint64 `json:"safe_point"` +} + +// ListServiceGCSafepoint is the response of pd list gc service safe point API +type ListServiceGCSafepoint struct { + ServiceGCSafepoints []*ServiceSafePoint `json:"service_gc_safe_points"` + GCSafePoint uint64 `json:"gc_safe_point"` +} + +// ListGcServiceSafePoint list gc service safepoint from PD +func (pc *pdAPIClient) ListGcServiceSafePoint( + ctx context.Context, +) (*ListServiceGCSafepoint, error) { + var ( + resp *ListServiceGCSafepoint + err error + ) + err = retry.Do(ctx, func() error { + ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout) + defer cancel() + + resp, err = pc.listGcServiceSafePoint(ctx) + if err != nil { + return err + } + return nil + }, retry.WithMaxTries(defaultMaxRetry), retry.WithIsRetryableErr(func(err error) bool { + switch errors.Cause(err) { + case context.Canceled: + return false + } + return true + })) + return resp, err +} + +func (pc *pdAPIClient) patchMetaLabel(ctx context.Context) error { + url := pc.grpcClient.GetLeaderURL() + regionLabelPrefix + header := http.Header{"Content-Type": {"application/json"}} + content := []byte(addMetaJSON) + + _, err := pc.httpClient.DoRequest(ctx, url, http.MethodPatch, + header, bytes.NewReader(content)) + return errors.Trace(err) +} + +func (pc *pdAPIClient) listGcServiceSafePoint( + ctx context.Context, +) (*ListServiceGCSafepoint, error) { + url := pc.grpcClient.GetLeaderURL() + gcServiceSafePointURL + + respData, err := pc.httpClient.DoRequest(ctx, url, http.MethodGet, + nil, nil) + if err != nil { + return nil, errors.Trace(err) + } + resp := ListServiceGCSafepoint{} + err = json.Unmarshal(respData, &resp) + if err != nil { + return nil, errors.Trace(err) + } + return &resp, nil +} + +// CollectMemberEndpoints return all members' endpoint +func (pc *pdAPIClient) CollectMemberEndpoints(ctx context.Context) ([]string, error) { + members, err := pc.grpcClient.GetAllMembers(ctx) + if err != nil { + return nil, errors.Trace(err) + } + result := make([]string, 0, len(members)) + for _, m := range members { + clientUrls := m.GetClientUrls() + if len(clientUrls) > 0 { + result = append(result, clientUrls[0]) + } + } + return result, nil +} + +// Healthy return error if the member corresponding to the endpoint is unhealthy +func (pc *pdAPIClient) Healthy(ctx context.Context, endpoint string) error { + url := endpoint + healthyAPI + resp, err := pc.httpClient.Get(ctx, fmt.Sprintf("%s/", url)) + if err != nil { + return errors.Trace(err) + } + _ = resp.Body.Close() + return nil +} diff --git a/pkg/pdutil/api_client_test.go b/pkg/pdutil/api_client_test.go new file mode 100644 index 000000000..2331d9b4e --- /dev/null +++ b/pkg/pdutil/api_client_test.go @@ -0,0 +1,259 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdutil + +import ( + "context" + "encoding/hex" + "encoding/json" + "net/http" + "net/http/httptest" + "strconv" + "testing" + + "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tidb/pkg/util/codec" + "github.com/pingcap/tiflow/cdc/processor/tablepb" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/httputil" + "github.com/pingcap/tiflow/pkg/spanz" + "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client" +) + +type mockPDClient struct { + pd.Client + testServer *httptest.Server + url string +} + +func (m *mockPDClient) GetLeaderURL() string { + return m.url +} + +func newMockPDClient(normal bool) *mockPDClient { + mock := &mockPDClient{} + status := http.StatusOK + if !normal { + status = http.StatusNotFound + } + mock.testServer = httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(status) + _, _ = w.Write([]byte("{}")) + }, + )) + mock.url = mock.testServer.URL + + return mock +} + +func TestMetaLabelNormal(t *testing.T) { + t.Parallel() + + mockClient := newMockPDClient(true) + + pc, err := NewPDAPIClient(mockClient, nil) + require.NoError(t, err) + defer pc.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = pc.UpdateMetaLabel(ctx) + require.NoError(t, err) + mockClient.testServer.Close() +} + +func TestMetaLabelFail(t *testing.T) { + t.Parallel() + + mockClient := newMockPDClient(false) + pc, err := NewPDAPIClient(mockClient, nil) + require.NoError(t, err) + defer pc.Close() + mockClient.url = "http://127.0.1.1:2345" + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // test url error + err = pc.(*pdAPIClient).patchMetaLabel(ctx) + require.Error(t, err) + + // test 404 + mockClient.url = mockClient.testServer.URL + err = pc.(*pdAPIClient).patchMetaLabel(ctx) + require.Regexp(t, ".*404.*", err) + + err = pc.UpdateMetaLabel(ctx) + require.ErrorIs(t, err, cerror.ErrReachMaxTry) + mockClient.testServer.Close() +} + +func TestListGcServiceSafePoint(t *testing.T) { + t.Parallel() + + mockClient := newMockPDClient(true) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pc, err := NewPDAPIClient(mockClient, nil) + require.NoError(t, err) + defer pc.Close() + _, err = pc.ListGcServiceSafePoint(ctx) + require.NoError(t, err) + mockClient.testServer.Close() +} + +// LabelRulePatch is the patch to update the label rules. +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +// Copied from github.com/tikv/pd/server/schedule/labeler +type LabelRulePatch struct { + SetRules []*LabelRule `json:"sets"` + DeleteRules []string `json:"deletes"` +} + +// LabelRule is the rule to assign labels to a region. +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +// Copied from github.com/tikv/pd/server/schedule/labeler +type LabelRule struct { + ID string `json:"id"` + Index int `json:"index"` + Labels []RegionLabel `json:"labels"` + RuleType string `json:"rule_type"` + Data interface{} `json:"data"` +} + +// RegionLabel is the label of a region. +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +// Copied from github.com/tikv/pd/server/schedule/labeler +type RegionLabel struct { + Key string `json:"key"` + Value string `json:"value"` +} + +func TestMetaLabelDecodeJSON(t *testing.T) { + t.Parallel() + + meta := LabelRulePatch{} + require.Nil(t, json.Unmarshal([]byte(addMetaJSON), &meta)) + require.Len(t, meta.SetRules, 2) + keys := meta.SetRules[1].Data.([]interface{})[0].(map[string]interface{}) + startKey, err := hex.DecodeString(keys["start_key"].(string)) + require.NoError(t, err) + endKey, err := hex.DecodeString(keys["end_key"].(string)) + require.NoError(t, err) + + _, startKey, err = codec.DecodeBytes(startKey, nil) + require.NoError(t, err) + require.EqualValues( + t, spanz.JobTableID, tablecodec.DecodeTableID(startKey), keys["start_key"].(string)) + + _, endKey, err = codec.DecodeBytes(endKey, nil) + require.NoError(t, err) + require.EqualValues( + t, spanz.JobTableID+1, tablecodec.DecodeTableID(endKey), keys["end_key"].(string)) +} + +func TestScanRegions(t *testing.T) { + t.Parallel() + + regions := []RegionInfo{ + NewTestRegionInfo(2, []byte(""), []byte{0, 1}, 0), + NewTestRegionInfo(3, []byte{0, 1}, []byte{0, 2}, 1), + NewTestRegionInfo(4, []byte{0, 2}, []byte{0, 3}, 2), + NewTestRegionInfo(5, []byte{0, 2}, []byte{0, 4}, 3), // a merged region. + NewTestRegionInfo(6, []byte{0, 4}, []byte{1, 0}, 4), + NewTestRegionInfo(7, []byte{1, 0}, []byte{1, 1}, 5), + NewTestRegionInfo(8, []byte{1, 1}, []byte(""), 6), + } + var handler func() RegionsInfo + mockPDServer := httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + startKey, _ := hex.DecodeString(r.URL.Query()["key"][0]) + endKey, _ := hex.DecodeString(r.URL.Query()["end_key"][0]) + limit, _ := strconv.Atoi(r.URL.Query()["limit"][0]) + t.Log(startKey, endKey, limit) + info := handler() + info.Count = len(info.Regions) + data, _ := json.Marshal(info) + t.Logf("%s", string(data)) + _, _ = w.Write(data) + }, + )) + defer mockPDServer.Close() + + httpcli, _ := httputil.NewClient(nil) + pc := pdAPIClient{httpClient: httpcli} + + i := 0 + handler = func() RegionsInfo { + start := i + end := i + 1 + i++ + if end > len(regions) { + return RegionsInfo{Regions: regions[start:]} + } + return RegionsInfo{Regions: regions[start:end]} + } + rs, err := pc.scanRegions(context.Background(), tablepb.Span{}, []string{mockPDServer.URL}, 1) + require.NoError(t, err) + require.Equal(t, 7, len(rs)) + + handler = func() RegionsInfo { + return RegionsInfo{Regions: regions} + } + rs, err = pc.scanRegions(context.Background(), tablepb.Span{}, []string{mockPDServer.URL}, 1024) + require.NoError(t, err) + require.Equal(t, 7, len(rs)) + + i = 0 + handler = func() RegionsInfo { + if i != 0 { + require.FailNow(t, "must only request once") + } + i++ + return RegionsInfo{Regions: regions[2:3]} + } + rs, err = pc.scanRegions( + context.Background(), + tablepb.Span{StartKey: []byte{0, 2, 0}, EndKey: []byte{0, 3}}, + []string{mockPDServer.URL}, 1) + require.NoError(t, err) + require.Equal(t, 1, len(rs)) + + i = 0 + handler = func() RegionsInfo { + if i == 0 { + i++ + return RegionsInfo{Regions: regions[2:3]} + } else if i == 1 { + i++ + return RegionsInfo{Regions: regions[3:4]} + } else if i == 2 { + i++ + return RegionsInfo{Regions: regions[4:5]} + } + + require.FailNow(t, "must only request once") + return RegionsInfo{} + } + rs, err = pc.scanRegions( + context.Background(), + tablepb.Span{StartKey: []byte{0, 2, 0}, EndKey: []byte{0, 4, 0}}, + []string{mockPDServer.URL}, 1) + require.NoError(t, err) + require.Equal(t, 3, len(rs)) +} diff --git a/pkg/pdutil/clock.go b/pkg/pdutil/clock.go new file mode 100644 index 000000000..e2eef67a7 --- /dev/null +++ b/pkg/pdutil/clock.go @@ -0,0 +1,160 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdutil + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/retry" + pclock "github.com/pingcap/tiflow/engine/pkg/clock" + "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" +) + +const pdTimeUpdateInterval = 10 * time.Millisecond + +// Clock is a time source of PD cluster. +type Clock interface { + // CurrentTime returns approximate current time from pd. + CurrentTime() time.Time + Run(ctx context.Context) + Stop() +} + +// clock cache time get from PD periodically and cache it +type clock struct { + pdClient pd.Client + mu struct { + sync.RWMutex + // The time encoded in PD ts. + tsEventTime time.Time + // The time we receive PD ts. + tsProcessingTime time.Time + } + updateInterval time.Duration + cancel context.CancelFunc + stopCh chan struct{} +} + +// NewClock return a new clock +func NewClock(ctx context.Context, pdClient pd.Client) (*clock, error) { + ret := &clock{ + pdClient: pdClient, + stopCh: make(chan struct{}, 1), + updateInterval: pdTimeUpdateInterval, + } + physical, _, err := pdClient.GetTS(ctx) + if err != nil { + return nil, errors.Trace(err) + } + ret.mu.tsEventTime = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0)) + ret.mu.tsProcessingTime = time.Now() + return ret, nil +} + +// Run gets time from pd periodically. +func (c *clock) Run(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + c.mu.Lock() + c.cancel = cancel + c.mu.Unlock() + ticker := time.NewTicker(c.updateInterval) + defer func() { c.stopCh <- struct{}{} }() + for { + select { + // c.Stop() was called or parent ctx was canceled + case <-ctx.Done(): + return + case <-ticker.C: + err := retry.Do(ctx, func() error { + physical, _, err := c.pdClient.GetTS(ctx) + if err != nil { + log.Info("get time from pd failed, retry later", zap.Error(err)) + return err + } + c.mu.Lock() + c.mu.tsEventTime = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0)) + c.mu.tsProcessingTime = time.Now() + c.mu.Unlock() + return nil + }, retry.WithBackoffBaseDelay(200), retry.WithMaxTries(10)) + if err != nil { + log.Warn("get time from pd failed, do not update time cache", + zap.Time("cachedTime", c.mu.tsEventTime), + zap.Time("processingTime", c.mu.tsProcessingTime), + zap.Error(err)) + } + } + } +} + +// CurrentTime returns approximate current time from pd. +func (c *clock) CurrentTime() time.Time { + c.mu.RLock() + defer c.mu.RUnlock() + tsEventTime := c.mu.tsEventTime + current := tsEventTime.Add(time.Since(c.mu.tsProcessingTime)) + return current +} + +// Stop clock. +func (c *clock) Stop() { + c.mu.Lock() + c.cancel() + c.mu.Unlock() + <-c.stopCh +} + +type clock4Test struct{} + +// NewClock4Test return a new clock for test. +func NewClock4Test() Clock { + return &clock4Test{} +} + +func (c *clock4Test) CurrentTime() time.Time { + return time.Now() +} + +func (c *clock4Test) Run(ctx context.Context) { +} + +func (c *clock4Test) Stop() { +} + +type monotonicClock struct { + clock pclock.Clock +} + +// NewMonotonicClock return a new monotonic clock. +func NewMonotonicClock(pClock pclock.Clock) Clock { + return &monotonicClock{ + clock: pClock, + } +} + +func (c *monotonicClock) CurrentTime() time.Time { + return c.clock.Now() +} + +func (c *monotonicClock) Run(ctx context.Context) { +} + +func (c *monotonicClock) Stop() { +} diff --git a/pkg/pdutil/clock_test.go b/pkg/pdutil/clock_test.go new file mode 100644 index 000000000..57f7c5636 --- /dev/null +++ b/pkg/pdutil/clock_test.go @@ -0,0 +1,75 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdutil + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" +) + +// MockPDClient mocks pd.Client to facilitate unit testing. +type MockPDClient struct { + pd.Client +} + +// GetTS implements pd.Client.GetTS. +func (m *MockPDClient) GetTS(ctx context.Context) (int64, int64, error) { + return oracle.GetPhysical(time.Now()), 0, nil +} + +func TestTimeFromPD(t *testing.T) { + t.Parallel() + mockPDClient := &MockPDClient{} + clock, err := NewClock(context.Background(), mockPDClient) + require.NoError(t, err) + + go clock.Run(context.Background()) + defer clock.Stop() + time.Sleep(1 * time.Second) + + t1 := clock.CurrentTime() + + time.Sleep(400 * time.Millisecond) + // assume that the gc safe point updated one hour ago + t2 := clock.CurrentTime() + // should return new time + require.NotEqual(t, t1, t2) +} + +func TestEventTimeAndProcessingTime(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockPDClient := &MockPDClient{} + clock, err := NewClock(ctx, mockPDClient) + require.NoError(t, err) + + // Disable update in test by setting a very long update interval. + clock.updateInterval = time.Hour + go clock.Run(ctx) + defer clock.Stop() + + sleep := time.Second + time.Sleep(sleep) + t1 := clock.CurrentTime() + now := time.Now() + require.Nil(t, err) + require.Less(t, now.Sub(t1), sleep/2) +} diff --git a/pkg/pdutil/main_test.go b/pkg/pdutil/main_test.go new file mode 100644 index 000000000..141c4e4c5 --- /dev/null +++ b/pkg/pdutil/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdutil + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/pkg/pdutil/utils.go b/pkg/pdutil/utils.go new file mode 100644 index 000000000..4edfe4dbc --- /dev/null +++ b/pkg/pdutil/utils.go @@ -0,0 +1,52 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdutil + +import ( + "context" + "strconv" + + "github.com/pingcap/log" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" +) + +const sourceIDName = "source_id" + +// GetSourceID returns the source ID of the TiDB cluster that PD is belonged to. +func GetSourceID(ctx context.Context, pdClient pd.Client) (uint64, error) { + // only nil in test case + if pdClient == nil { + return config.DefaultTiDBSourceID, nil + } + // The default value of sourceID is 1, + // which means the sourceID is not changed by user. + sourceID := uint64(1) + sourceIDConfig, _, err := pdClient.LoadGlobalConfig(ctx, []string{sourceIDName}, "") + if err != nil { + return 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + if len(sourceIDConfig) != 0 && sourceIDConfig[0].Value != "" { + sourceID, err = strconv.ParseUint(sourceIDConfig[0].Value, 10, 64) + if err != nil { + log.Error("fail to parse sourceID from PD", + zap.String("sourceID", sourceIDConfig[0].Value), + zap.Error(err)) + return 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + } + return sourceID, nil +} diff --git a/pkg/pdutil/utils_test.go b/pkg/pdutil/utils_test.go new file mode 100644 index 000000000..235466bd0 --- /dev/null +++ b/pkg/pdutil/utils_test.go @@ -0,0 +1,47 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdutil + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/session" + "github.com/pingcap/tidb/pkg/store/mockstore" + "github.com/stretchr/testify/require" +) + +func TestGetSourceID(t *testing.T) { + store, err := mockstore.NewMockStore() + require.NoError(t, err) + defer func() { + err := store.Close() + require.NoError(t, err) + }() + domain, err := session.BootstrapSession(store) + require.NoError(t, err) + defer domain.Close() + se, err := session.CreateSession4Test(store) + require.NoError(t, err) + _, err = se.Execute(context.Background(), "set @@global.tidb_source_id=2;") + require.NoError(t, err) + require.Eventually(t, func() bool { + client := store.(kv.StorageWithPD).GetPDClient() + sourceID, err := GetSourceID(context.Background(), client) + require.NoError(t, err) + return sourceID == 2 + }, 5*time.Second, 100*time.Millisecond) +} diff --git a/pkg/retry/error_retry.go b/pkg/retry/error_retry.go new file mode 100644 index 000000000..3e0128e58 --- /dev/null +++ b/pkg/retry/error_retry.go @@ -0,0 +1,105 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "math" + "math/rand" + "time" + + "github.com/pingcap/log" + cerror "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/zap" +) + +const ( + defaultErrorMaxRetryDuration = 30 * time.Minute + defaultErrGCInterval = 10 * time.Minute + defaultBackoffBaseInS = 5 + defaultBackoffMaxInS = 30 +) + +// ErrorRetry is used to control the error retry logic. +type ErrorRetry struct { + // To control the error retry. + lastInternalError error + firstRetryTime time.Time + lastErrorRetryTime time.Time + maxRetryDuration time.Duration + errGCInterval time.Duration + backoffBase int64 + backoffMax int64 +} + +// NewDefaultErrorRetry creates a new ErrorRetry with default values. +func NewDefaultErrorRetry() *ErrorRetry { + return NewErrorRetry(defaultErrorMaxRetryDuration, + defaultErrGCInterval, + defaultBackoffBaseInS, + defaultBackoffMaxInS) +} + +// NewInfiniteErrorRetry creates a new ErrorRetry with infinite duration. +func NewInfiniteErrorRetry() *ErrorRetry { + return NewErrorRetry(time.Duration(math.MaxInt64), + defaultErrGCInterval, + defaultBackoffBaseInS, + defaultBackoffMaxInS) +} + +// NewErrorRetry creates a new ErrorRetry. +func NewErrorRetry( + maxRetryDuration time.Duration, + errGCInterval time.Duration, + backoffBase int64, + backoffMax int64, +) *ErrorRetry { + return &ErrorRetry{ + maxRetryDuration: maxRetryDuration, + errGCInterval: errGCInterval, + backoffBase: backoffBase, + backoffMax: backoffMax, + } +} + +// GetRetryBackoff returns the backoff duration for retrying the last error. +// If the retry time is exhausted, it returns the an ChangefeedUnRetryableError. +func (r *ErrorRetry) GetRetryBackoff(err error) (time.Duration, error) { + // reset firstRetryTime when the last error is too long ago + // it means the last error is retry success, and the sink is running well for some time + if r.lastInternalError == nil || + time.Since(r.lastErrorRetryTime) >= r.errGCInterval { + log.Debug("reset firstRetryTime", + zap.Time("lastErrorRetryTime", r.lastErrorRetryTime), + zap.Time("now", time.Now())) + r.firstRetryTime = time.Now() + } + + // return an unretryable error if retry time is exhausted + if time.Since(r.firstRetryTime) >= r.maxRetryDuration { + log.Debug("error retry exhausted", + zap.Time("firstRetryTime", r.firstRetryTime), + zap.Time("lastErrorRetryTime", r.lastErrorRetryTime), + zap.Time("now", time.Now())) + return 0, cerror.WrapChangefeedUnretryableErr(err) + } + + r.lastInternalError = err + r.lastErrorRetryTime = time.Now() + + // interval is in range [defaultBackoffBaseInS, defaultBackoffMaxInS) + interval := time.Second * time.Duration( + rand.Int63n(defaultBackoffMaxInS-defaultBackoffBaseInS)+defaultBackoffBaseInS) + return interval, nil +} diff --git a/pkg/retry/error_retry_test.go b/pkg/retry/error_retry_test.go new file mode 100644 index 000000000..a3f8a83c7 --- /dev/null +++ b/pkg/retry/error_retry_test.go @@ -0,0 +1,39 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package retry + +import ( + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/stretchr/testify/require" +) + +func TestGetRetryBackoff(t *testing.T) { + t.Parallel() + + r := NewDefaultErrorRetry() + // test retry backoff + backoff, err := r.GetRetryBackoff(errors.New("test")) + require.NoError(t, err) + require.Less(t, backoff, 30*time.Second) + time.Sleep(500 * time.Millisecond) + elapsedTime := time.Since(r.firstRetryTime) + + // mock time to test reset error backoff + r.lastErrorRetryTime = time.Unix(0, 0) + _, err = r.GetRetryBackoff(errors.New("test")) + require.NoError(t, err) + require.Less(t, time.Since(r.firstRetryTime), elapsedTime) +} diff --git a/pkg/retry/main_test.go b/pkg/retry/main_test.go new file mode 100644 index 000000000..e9f7b820f --- /dev/null +++ b/pkg/retry/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/pkg/retry/options.go b/pkg/retry/options.go new file mode 100644 index 000000000..942f04025 --- /dev/null +++ b/pkg/retry/options.go @@ -0,0 +1,96 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "math" + "time" +) + +const ( + // defaultBackoffBaseInMs is the initial duration, in Millisecond + defaultBackoffBaseInMs = 10.0 + // defaultBackoffCapInMs is the max amount of duration, in Millisecond + defaultBackoffCapInMs = 100.0 + defaultMaxTries = math.MaxUint64 + defaultMaxRetryDuration = time.Duration(0) +) + +// Option ... +type Option func(*retryOptions) + +// IsRetryable checks the error is safe or worth to retry, eg. "context.Canceled" better not retry +type IsRetryable func(error) bool + +type retryOptions struct { + totalRetryDuration time.Duration + maxTries uint64 + backoffBaseInMs float64 + backoffCapInMs float64 + isRetryable IsRetryable +} + +func newRetryOptions() *retryOptions { + return &retryOptions{ + totalRetryDuration: defaultMaxRetryDuration, + maxTries: defaultMaxTries, + backoffBaseInMs: defaultBackoffBaseInMs, + backoffCapInMs: defaultBackoffCapInMs, + isRetryable: func(err error) bool { return true }, + } +} + +// WithBackoffBaseDelay configures the initial delay, if delayInMs <= 0 "defaultBackoffBaseInMs" will be used +func WithBackoffBaseDelay(delayInMs int64) Option { + return func(o *retryOptions) { + if delayInMs > 0 { + o.backoffBaseInMs = float64(delayInMs) + } + } +} + +// WithBackoffMaxDelay configures the maximum delay, if delayInMs <= 0 "defaultBackoffCapInMs" will be used +func WithBackoffMaxDelay(delayInMs int64) Option { + return func(o *retryOptions) { + if delayInMs > 0 { + o.backoffCapInMs = float64(delayInMs) + } + } +} + +// WithMaxTries configures maximum tries, if tries is 0, 1 will be used +func WithMaxTries(tries uint64) Option { + return func(o *retryOptions) { + if tries == 0 { + tries = 1 + } + o.maxTries = tries + } +} + +// WithTotalRetryDuratoin configures the total retry duration. +func WithTotalRetryDuratoin(retryDuration time.Duration) Option { + return func(o *retryOptions) { + o.totalRetryDuration = retryDuration + } +} + +// WithIsRetryableErr configures the error should retry or not, if not set, retry by default +func WithIsRetryableErr(f IsRetryable) Option { + return func(o *retryOptions) { + if f != nil { + o.isRetryable = f + } + } +} diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go new file mode 100644 index 000000000..2080e7cb3 --- /dev/null +++ b/pkg/retry/retry_test.go @@ -0,0 +1,204 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "context" + "math" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/stretchr/testify/require" +) + +func TestDoShouldRetryAtMostSpecifiedTimes(t *testing.T) { + t.Parallel() + + var callCount int + f := func() error { + callCount++ + return errors.New("test") + } + + err := Do(context.Background(), f, WithMaxTries(3)) + require.Regexp(t, "test", errors.Cause(err)) + require.Equal(t, callCount, 3) +} + +func TestDoShouldStopOnSuccess(t *testing.T) { + t.Parallel() + + var callCount int + f := func() error { + callCount++ + if callCount == 2 { + return nil + } + return errors.New("test") + } + + err := Do(context.Background(), f, WithMaxTries(3)) + require.Nil(t, err) + require.Equal(t, callCount, 2) +} + +func TestIsRetryable(t *testing.T) { + t.Parallel() + + var callCount int + f := func() error { + callCount++ + return errors.Annotate(context.Canceled, "test") + } + + err := Do(context.Background(), f, WithMaxTries(3), WithIsRetryableErr(func(err error) bool { + switch errors.Cause(err) { + case context.Canceled: + return false + } + return true + })) + + require.Equal(t, errors.Cause(err), context.Canceled) + require.Equal(t, callCount, 1) + + callCount = 0 + err = Do(context.Background(), f, WithMaxTries(3)) + + require.Equal(t, errors.Cause(err), context.Canceled) + require.Equal(t, callCount, 3) +} + +func TestDoCancelInfiniteRetry(t *testing.T) { + t.Parallel() + + callCount := 0 + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20) + defer cancel() + f := func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + callCount++ + return errors.New("test") + } + + err := Do(ctx, f, WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) + require.Equal(t, errors.Cause(err), context.DeadlineExceeded) + require.GreaterOrEqual(t, callCount, 1, "tries: %d", callCount) + require.Less(t, callCount, math.MaxInt64) +} + +func TestDoCancelAtBeginning(t *testing.T) { + t.Parallel() + + callCount := 0 + ctx, cancel := context.WithCancel(context.Background()) + cancel() + f := func() error { + callCount++ + return errors.New("test") + } + + err := Do(ctx, f, WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) + require.Equal(t, errors.Cause(err), context.Canceled) + require.Equal(t, callCount, 0, "tries:%d", callCount) +} + +func TestDoCornerCases(t *testing.T) { + t.Parallel() + + var callCount int + f := func() error { + callCount++ + return errors.New("test") + } + + err := Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2)) + require.Regexp(t, "test", errors.Cause(err)) + require.Equal(t, callCount, 2) + + callCount = 0 + err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2)) + require.Regexp(t, "test", errors.Cause(err)) + require.Equal(t, callCount, 2) + + callCount = 0 + err = Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2)) + require.Regexp(t, "test", errors.Cause(err)) + require.Equal(t, callCount, 2) + + callCount = 0 + err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2)) + require.Regexp(t, "test", errors.Cause(err)) + require.Equal(t, callCount, 2) + + var i uint64 + for i = 0; i < 10; i++ { + callCount = 0 + err = Do(context.Background(), f, + WithBackoffBaseDelay(int64(i)), WithBackoffMaxDelay(int64(i)), WithMaxTries(i)) + require.Regexp(t, "test", errors.Cause(err)) + require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err) + if i == 0 { + require.Equal(t, 1, callCount) + } else { + require.Equal(t, int(i), callCount) + } + } +} + +func TestTotalRetryDuration(t *testing.T) { + t.Parallel() + + f := func() error { + return errors.New("test") + } + + start := time.Now() + err := Do( + context.Background(), f, + WithBackoffBaseDelay(math.MinInt64), + WithTotalRetryDuratoin(time.Second), + ) + require.Regexp(t, "test", errors.Cause(err)) + require.LessOrEqual(t, 1, int(math.Round(time.Since(start).Seconds()))) + + start = time.Now() + err = Do( + context.Background(), f, + WithBackoffBaseDelay(math.MinInt64), + WithTotalRetryDuratoin(2*time.Second), + ) + require.Regexp(t, "test", errors.Cause(err)) + require.LessOrEqual(t, 2, int(math.Round(time.Since(start).Seconds()))) +} + +func TestRetryError(t *testing.T) { + t.Parallel() + + f := func() error { + return errors.New("some error info") + } + + err := Do( + context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithMaxTries(2), + ) + require.Regexp(t, "some error info", errors.Cause(err)) + require.Regexp(t, ".*some error info.*", err.Error()) + require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err.Error()) +} diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go new file mode 100644 index 000000000..8f6bc1250 --- /dev/null +++ b/pkg/retry/retry_with_opt.go @@ -0,0 +1,109 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "context" + "math" + "math/rand" + "strconv" + "time" + + "github.com/pingcap/errors" + cerror "github.com/pingcap/tiflow/pkg/errors" +) + +// Operation is the action need to retry +type Operation func() error + +// Do execute the specified function. +// By default, it retries infinitely until it succeeds or got canceled. +func Do(ctx context.Context, operation Operation, opts ...Option) error { + retryOption := setOptions(opts...) + return run(ctx, operation, retryOption) +} + +func setOptions(opts ...Option) *retryOptions { + retryOption := newRetryOptions() + for _, opt := range opts { + opt(retryOption) + } + return retryOption +} + +func run(ctx context.Context, op Operation, retryOption *retryOptions) error { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + default: + } + + var t *time.Timer + var start time.Time + try := uint64(0) + backOff := time.Duration(0) + for { + err := op() + if err == nil { + return nil + } + + if !retryOption.isRetryable(err) { + return err + } + + try++ + if try >= retryOption.maxTries { + return cerror.ErrReachMaxTry. + Wrap(err).GenWithStackByArgs(strconv.Itoa(int(retryOption.maxTries)), err) + } + if retryOption.totalRetryDuration > 0 { + if start.IsZero() { + start = time.Now() + } else if time.Since(start) > retryOption.totalRetryDuration { + return cerror.ErrReachMaxTry. + Wrap(err).GenWithStackByArgs(retryOption.totalRetryDuration, err) + } + } + + backOff = getBackoffInMs(retryOption.backoffBaseInMs, retryOption.backoffCapInMs, float64(try)) + if t == nil { + t = time.NewTimer(backOff) + defer t.Stop() + } else { + t.Reset(backOff) + } + + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case <-t.C: + } + } +} + +// getBackoffInMs returns the duration to wait before next try +// See https://www.awsarchitectureblog.com/2015/03/backoff.html +func getBackoffInMs(backoffBaseInMs, backoffCapInMs, try float64) time.Duration { + temp := int64(math.Min(backoffCapInMs, backoffBaseInMs*math.Exp2(try)) / 2) + if temp <= 0 { + temp = 1 + } + sleep := (temp + rand.Int63n(temp)) * 3 + if sleep <= 0 { + sleep = math.MaxInt64 + } + backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep))+backoffBaseInMs) + return time.Duration(backOff) * time.Millisecond +} diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index 4131ba122..ecb604c7b 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -24,8 +24,8 @@ import ( "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/retry" timodel "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tiflow/pkg/retry" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" "go.uber.org/zap" ) diff --git a/pkg/sink/mysql/mysql_writer_dml.go b/pkg/sink/mysql/mysql_writer_dml.go index cb1276be1..9212f17f3 100644 --- a/pkg/sink/mysql/mysql_writer_dml.go +++ b/pkg/sink/mysql/mysql_writer_dml.go @@ -24,7 +24,7 @@ import ( "github.com/pingcap/log" commonEvent "github.com/pingcap/ticdc/pkg/common/event" cerror "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/tiflow/pkg/retry" + "github.com/pingcap/ticdc/pkg/retry" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" "go.uber.org/zap" "go.uber.org/zap/zapcore" diff --git a/pkg/txnutil/gc/gc_service.go b/pkg/txnutil/gc/gc_service.go index 4168c64ab..44ba6cd9c 100644 --- a/pkg/txnutil/gc/gc_service.go +++ b/pkg/txnutil/gc/gc_service.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/tiflow/pkg/retry" + "github.com/pingcap/ticdc/pkg/retry" pd "github.com/tikv/pd/client" "go.uber.org/zap" ) diff --git a/logservice/upstream/manager.go b/pkg/upstream/manager.go similarity index 100% rename from logservice/upstream/manager.go rename to pkg/upstream/manager.go diff --git a/logservice/upstream/upstream.go b/pkg/upstream/upstream.go similarity index 98% rename from logservice/upstream/upstream.go rename to pkg/upstream/upstream.go index d70a4d085..6ab591593 100644 --- a/logservice/upstream/upstream.go +++ b/pkg/upstream/upstream.go @@ -25,9 +25,10 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/pdutil" tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/store/driver" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/security" tikvconfig "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/tikv" @@ -154,7 +155,7 @@ func initUpstream(ctx context.Context, up *Upstream) error { return errors.Trace(err) } - etcdCli, err := CreateRawEtcdClient(up.SecurityConfig, grpcTLSOption, up.PdEndpoints...) + etcdCli, err := etcd.CreateRawEtcdClient(up.SecurityConfig, grpcTLSOption, up.PdEndpoints...) if err != nil { return errors.Trace(err) } diff --git a/scripts/check-ticdc-dashboard.sh b/scripts/check-ticdc-dashboard.sh index e63c61431..efca7bf1f 100755 --- a/scripts/check-ticdc-dashboard.sh +++ b/scripts/check-ticdc-dashboard.sh @@ -12,11 +12,11 @@ # limitations under the License. if $(which jq &>/dev/null); then - dup=$(jq '[.panels[] | .panels[]]| group_by(.id) | .[] | select(length>1) | .[] | { id: .id, title: .title}' metrics/grafana/ticdc.json) + dup=$(jq '[.panels[] | .panels[]]| group_by(.id) | .[] | select(length>1) | .[] | { id: .id, title: .title}' metrics/grafana/ticdc_new_arch.json) [[ -n $dup ]] || exit 0 - echo "Find panels with duplicated ID in metrics/grafana/ticdc.json" + echo "Find panels with duplicated ID in metrics/grafana/ticdc_new_arch.json" echo "$dup" echo "Please choose a new ID that is larger than the max ID:" jq '[.panels[] | .panels[] | .id] | max' \ - metrics/grafana/ticdc.json + metrics/grafana/ticdc_new_arch.json fi diff --git a/server/server.go b/server/server.go index 943d1e3ca..538d61b58 100644 --- a/server/server.go +++ b/server/server.go @@ -37,11 +37,11 @@ import ( "github.com/pingcap/ticdc/pkg/eventservice" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" tiserver "github.com/pingcap/ticdc/pkg/server" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/tcpserver" "github.com/tikv/client-go/v2/tikv" @@ -116,6 +116,8 @@ func (c *server) initialize(ctx context.Context) error { return errors.Trace(err) } + appcontext.SetService(appcontext.DefaultPDClock, c.PDClock) + appcontext.SetID(c.info.ID.String()) messageCenter := messaging.NewMessageCenter(ctx, c.info.ID, c.info.Epoch, config.NewDefaultMessageCenterConfig(), c.security) appcontext.SetService(appcontext.MessageCenter, messageCenter) diff --git a/server/server_prepare.go b/server/server_prepare.go index 90c98b4cc..1c1749083 100644 --- a/server/server_prepare.go +++ b/server/server_prepare.go @@ -27,11 +27,11 @@ import ( "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/tidb/pkg/util/gctuner" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/fsutil" - "github.com/pingcap/tiflow/pkg/pdutil" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" "go.etcd.io/etcd/client/v3/concurrency" diff --git a/tests/integration_tests/api_v2/request.go b/tests/integration_tests/api_v2/request.go index e3cc88da2..2283f59be 100644 --- a/tests/integration_tests/api_v2/request.go +++ b/tests/integration_tests/api_v2/request.go @@ -27,10 +27,10 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/retry" "github.com/pingcap/tiflow/cdc/api/middleware" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/httputil" - "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/version" "go.uber.org/zap" ) diff --git a/tests/integration_tests/bank/case.go b/tests/integration_tests/bank/case.go index 27fb6b87d..8cd5ac33a 100644 --- a/tests/integration_tests/bank/case.go +++ b/tests/integration_tests/bank/case.go @@ -29,7 +29,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" cerror "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/tiflow/pkg/retry" + "github.com/pingcap/ticdc/pkg/retry" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) diff --git a/tests/integration_tests/move_table/main.go b/tests/integration_tests/move_table/main.go index ef169d979..3352d663b 100644 --- a/tests/integration_tests/move_table/main.go +++ b/tests/integration_tests/move_table/main.go @@ -28,9 +28,9 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/retry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/httputil" - "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/security" "go.etcd.io/etcd/client/pkg/v3/logutil" clientv3 "go.etcd.io/etcd/client/v3"