Skip to content

Commit

Permalink
*: support for lag calculation based on PD TSO (#892)
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <414110582@qq.com>
  • Loading branch information
asddongmen authored Jan 16, 2025
1 parent e1fcd8c commit ceb4feb
Show file tree
Hide file tree
Showing 53 changed files with 1,668 additions and 369 deletions.
2 changes: 1 addition & 1 deletion coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ import (
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/pkg/server"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/ticdc/utils/chann"
"github.com/pingcap/ticdc/utils/threadpool"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
Expand Down
2 changes: 1 addition & 1 deletion coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import (
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/messaging/proto"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down
10 changes: 8 additions & 2 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand All @@ -56,6 +57,8 @@ type EventDispatcherManager struct {
changefeedID common.ChangeFeedID
maintainerID node.ID

pdClock pdutil.Clock

config *config.ChangefeedConfig
filterConfig *eventpb.FilterConfig
// only not nil when enable sync point
Expand Down Expand Up @@ -122,10 +125,12 @@ func NewEventDispatcherManager(
) (*EventDispatcherManager, uint64, error) {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock)
manager := &EventDispatcherManager{
dispatcherMap: newDispatcherMap(),
changefeedID: changefeedID,
maintainerID: maintainerID,
pdClock: pdClock,
statusesChan: make(chan TableSpanStatusWithSeq, 8192),
blockStatusesChan: make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024),
errCh: make(chan error, 1),
Expand Down Expand Up @@ -654,8 +659,9 @@ func (e *EventDispatcherManager) aggregateDispatcherHeartbeats(needCompleteStatu
phyCheckpointTs := oracle.ExtractPhysical(message.Watermark.CheckpointTs)
phyResolvedTs := oracle.ExtractPhysical(message.Watermark.ResolvedTs)

e.metricCheckpointTsLag.Set(float64(oracle.GetPhysical(time.Now())-phyCheckpointTs) / 1e3)
e.metricResolvedTsLag.Set(float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3)
pdTime := e.pdClock.CurrentTime()
e.metricCheckpointTsLag.Set(float64(oracle.GetPhysical(pdTime)-phyCheckpointTs) / 1e3)
e.metricResolvedTsLag.Set(float64(oracle.GetPhysical(pdTime)-phyResolvedTs) / 1e3)
return &message
}

Expand Down
21 changes: 0 additions & 21 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -428,30 +427,10 @@ func (c *EventCollector) updateMetrics(ctx context.Context) {
metricsDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen))
metricsDSUsedMemoryUsage.Set(float64(dsMetrics.MemoryControl.UsedMemory))
metricsDSMaxMemoryUsage.Set(float64(dsMetrics.MemoryControl.MaxMemory))
c.updateResolvedTsMetric()
}
}
}

func (c *EventCollector) updateResolvedTsMetric() {
var minResolvedTs uint64
c.dispatcherMap.Range(func(key, value interface{}) bool {
if stat, ok := value.(*dispatcherStat); ok {
d := stat.target
if minResolvedTs == 0 || d.GetResolvedTs() < minResolvedTs {
minResolvedTs = d.GetResolvedTs()
}
}
return true
})

if minResolvedTs > 0 {
phyResolvedTs := oracle.ExtractPhysical(minResolvedTs)
lagMs := float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3
metrics.EventCollectorResolvedTsLagGauge.Set(lagMs)
}
}

// dispatcherStat is a helper struct to manage the state of a dispatcher.
type dispatcherStat struct {
dispatcherID common.DispatcherID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/sink/kafka"
"github.com/pingcap/tiflow/pkg/retry"
"go.uber.org/zap"
)

Expand Down
12 changes: 6 additions & 6 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/utils/chann"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/util"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand Down Expand Up @@ -696,23 +696,23 @@ func (e *eventStore) updateMetrics(ctx context.Context) error {
}

func (e *eventStore) updateMetricsOnce() {
currentTime := e.pdClock.CurrentTime()
currentPhyTs := oracle.GetPhysical(currentTime)
pdTime := e.pdClock.CurrentTime()
pdPhyTs := oracle.GetPhysical(pdTime)
minResolvedTs := uint64(0)
e.dispatcherMeta.RLock()
for _, subscriptionStat := range e.dispatcherMeta.subscriptionStats {
// resolved ts lag
resolvedTs := subscriptionStat.resolvedTs.Load()
resolvedPhyTs := oracle.ExtractPhysical(resolvedTs)
resolvedLag := float64(currentPhyTs-resolvedPhyTs) / 1e3
resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3
metrics.EventStoreDispatcherResolvedTsLagHist.Observe(float64(resolvedLag))
if minResolvedTs == 0 || resolvedTs < minResolvedTs {
minResolvedTs = resolvedTs
}
// checkpoint ts lag
checkpointTs := subscriptionStat.checkpointTs.Load()
watermarkPhyTs := oracle.ExtractPhysical(checkpointTs)
watermarkLag := float64(currentPhyTs-watermarkPhyTs) / 1e3
watermarkLag := float64(pdPhyTs-watermarkPhyTs) / 1e3
metrics.EventStoreDispatcherWatermarkLagHist.Observe(float64(watermarkLag))
}
e.dispatcherMeta.RUnlock()
Expand All @@ -721,7 +721,7 @@ func (e *eventStore) updateMetricsOnce() {
return
}
minResolvedPhyTs := oracle.ExtractPhysical(minResolvedTs)
eventStoreResolvedTsLag := float64(currentPhyTs-minResolvedPhyTs) / 1e3
eventStoreResolvedTsLag := float64(pdPhyTs-minResolvedPhyTs) / 1e3
metrics.EventStoreResolvedTsLagGauge.Set(eventStoreResolvedTsLag)
}

Expand Down
5 changes: 3 additions & 2 deletions logservice/logpuller/subscription_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
appcontext "github.com/pingcap/ticdc/pkg/common/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/util"
Expand Down Expand Up @@ -867,8 +867,9 @@ func (s *SubscriptionClient) GetResolvedTsLag() float64 {
if pullerMinResolvedTs == 0 {
return 0
}
pdTime := s.pdClock.CurrentTime()
phyResolvedTs := oracle.ExtractPhysical(pullerMinResolvedTs)
lag := float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3
lag := float64(oracle.GetPhysical(pdTime)-phyResolvedTs) / 1e3
return lag
}

Expand Down
2 changes: 1 addition & 1 deletion logservice/logpuller/subscription_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/pingcap/ticdc/logservice/logpuller/regionlock"
"github.com/pingcap/ticdc/logservice/txnutil"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/tidb/pkg/store/mockstore/mockcopr"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/security"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
Expand Down
6 changes: 3 additions & 3 deletions logservice/schemastore/schema_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -160,9 +160,9 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
tryUpdateResolvedTs := func() {
pendingTs := s.pendingResolvedTs.Load()
defer func() {
currentPhyTs := oracle.GetPhysical(s.pdClock.CurrentTime())
pdPhyTs := oracle.GetPhysical(s.pdClock.CurrentTime())
resolvedPhyTs := oracle.ExtractPhysical(pendingTs)
resolvedLag := float64(currentPhyTs-resolvedPhyTs) / 1e3
resolvedLag := float64(pdPhyTs-resolvedPhyTs) / 1e3
metrics.SchemaStoreResolvedTsLagGauge.Set(float64(resolvedLag))
}()

Expand Down
Loading

0 comments on commit ceb4feb

Please sign in to comment.