Skip to content

Commit

Permalink
DS, mysql: enhance dry run mode and refactor DS memory control (#996)
Browse files Browse the repository at this point in the history
ref #994
  • Loading branch information
asddongmen authored Feb 17, 2025
1 parent 6210d2c commit 21a6f5f
Show file tree
Hide file tree
Showing 20 changed files with 436 additions and 100 deletions.
23 changes: 23 additions & 0 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,34 @@ func (e *EventDispatcherManager) TryClose(removeChangefeed bool) bool {
if e.closing.Load() {
return e.closed.Load()
}
e.cleanMetrics()
e.closing.Store(true)
go e.close(removeChangefeed)
return false
}

func (e *EventDispatcherManager) cleanMetrics() {
metrics.DynamicStreamMemoryUsage.DeleteLabelValues(
"event-collector",
"max",
e.changefeedID.String(),
)

metrics.DynamicStreamMemoryUsage.DeleteLabelValues(
"event-collector",
"used",
e.changefeedID.String(),
)

metrics.TableTriggerEventDispatcherGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
metrics.EventDispatcherGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
metrics.CreateDispatcherDuration.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
metrics.EventDispatcherManagerCheckpointTsGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
metrics.EventDispatcherManagerResolvedTsGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
metrics.EventDispatcherManagerCheckpointTsLagGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
metrics.EventDispatcherManagerResolvedTsLagGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
}

func (e *EventDispatcherManager) close(removeChangefeed bool) {
log.Info("closing event dispatcher manager",
zap.Stringer("changefeedID", e.changefeedID))
Expand Down
47 changes: 40 additions & 7 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ var (
metricsHandleEventDuration = metrics.EventCollectorHandleEventDuration
metricsDSInputChanLen = metrics.DynamicStreamEventChanSize.WithLabelValues("event-collector")
metricsDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("event-collector")
metricsDSMaxMemoryUsage = metrics.DynamicStreamMemoryUsage.WithLabelValues("event-collector", "max")
metricsDSUsedMemoryUsage = metrics.DynamicStreamMemoryUsage.WithLabelValues("event-collector", "used")
)

type DispatcherRequest struct {
Expand Down Expand Up @@ -76,9 +74,11 @@ EventCollector is the relay between EventService and DispatcherManager, responsi
EventCollector is an instance-level component.
*/
type EventCollector struct {
serverId node.ID
dispatcherMap sync.Map
mc messaging.MessageCenter
serverId node.ID
dispatcherMap sync.Map // key: dispatcherID, value: dispatcherStat
changefeedIDMap sync.Map // key: changefeedID.GID, value: changefeedID

mc messaging.MessageCenter

// dispatcherRequestChan cached dispatcher request when some error occurs.
dispatcherRequestChan *chann.DrainableChann[DispatcherRequestWithTarget]
Expand Down Expand Up @@ -164,6 +164,23 @@ func (c *EventCollector) Run(ctx context.Context) {
func (c *EventCollector) Close() {
c.cancel()
c.ds.Close()

c.changefeedIDMap.Range(func(key, value any) bool {
cfID := value.(common.ChangeFeedID)
// Remove metrics for the changefeed.
metrics.DynamicStreamMemoryUsage.DeleteLabelValues(
"event-collector",
"max",
cfID.String(),
)
metrics.DynamicStreamMemoryUsage.DeleteLabelValues(
"event-collector",
"used",
cfID.String(),
)
return true
})

c.wg.Wait()
log.Info("event collector is closed")
}
Expand All @@ -180,6 +197,7 @@ func (c *EventCollector) AddDispatcher(target dispatcher.EventDispatcher, memory
stat.reset()
stat.sentCommitTs.Store(target.GetStartTs())
c.dispatcherMap.Store(target.GetId(), stat)
c.changefeedIDMap.Store(target.GetChangefeedID().ID(), target.GetChangefeedID())
metrics.EventCollectorRegisteredDispatcherCount.Inc()

areaSetting := dynstream.NewAreaSettingsWithMaxPendingSize(memoryQuota)
Expand Down Expand Up @@ -438,8 +456,23 @@ func (c *EventCollector) updateMetrics(ctx context.Context) {
dsMetrics := c.ds.GetMetrics()
metricsDSInputChanLen.Set(float64(dsMetrics.EventChanSize))
metricsDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen))
metricsDSUsedMemoryUsage.Set(float64(dsMetrics.MemoryControl.UsedMemory))
metricsDSMaxMemoryUsage.Set(float64(dsMetrics.MemoryControl.MaxMemory))
for _, areaMetric := range dsMetrics.MemoryControl.AreaMemoryMetrics {
cfID, ok := c.changefeedIDMap.Load(areaMetric.Area())
if !ok {
continue
}
changefeedID := cfID.(common.ChangeFeedID)
metrics.DynamicStreamMemoryUsage.WithLabelValues(
"event-collector",
"max",
changefeedID.String(),
).Set(float64(areaMetric.MaxMemory()))
metrics.DynamicStreamMemoryUsage.WithLabelValues(
"event-collector",
"used",
changefeedID.String(),
).Set(float64(areaMetric.MemoryUsage()))
}
}
}
}
Expand Down
10 changes: 1 addition & 9 deletions metrics/grafana/ticdc_new_arch.json
Original file line number Diff line number Diff line change
Expand Up @@ -1025,14 +1025,6 @@
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "ticdc_log_puller_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namesapce=~\"$namespace\", changefeed=~\"$changefeed\"}",
"hide": false,
"interval": "",
"legendFormat": "Log-Puller-{{type}}-resolvedTs-Lag-{{instance}}",
"refId": "A"
},
{
"exemplar": true,
"expr": "ticdc_event_store_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}",
Expand Down Expand Up @@ -11009,7 +11001,7 @@
"exemplar": true,
"expr": "ticdc_dynamic_stream_memory_usage{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\",changefeed=~\"$changefeed\", component!=\"\", instance=~\"$ticdc_instance\"}",
"interval": "",
"legendFormat": "{{namespace}}-{{changefeed}}-{{instance}}-memory-{{component}}-{{type}}",
"legendFormat": "{{area}}-{{instance}}-memory-{{component}}-{{type}}",
"refId": "A"
}
],
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func NewGID() GID {
}
}

func (g *GID) PString() string {
return g.String()
}

func (g GID) String() string {
var buf bytes.Buffer
buf.WriteString(strconv.FormatUint(g.Low, 10))
Expand Down
17 changes: 16 additions & 1 deletion pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,14 +888,29 @@ func (c *eventBroker) removeDispatcher(dispatcherInfo DispatcherInfo) {
id := dispatcherInfo.GetID()
stat, ok := c.dispatchers.Load(id)
if !ok {
stat, ok = c.tableTriggerDispatchers.Load(id)
if !ok {
return
}
c.tableTriggerDispatchers.Delete(id)
return
}

stat.(*dispatcherStat).changefeedStat.removeDispatcher()
// FIXME: this is a workaround to remove the changefeed status when all dispatchers are removed.
// But some remove dispatcher events missing during the changefeed pausing process, the changefeed status will not be removed expectedly.
// So, we need to find a permanent solution to fix this problem.
if stat.(*dispatcherStat).changefeedStat.dispatcherCount.Load() == 0 {
log.Info("All dispatchers for the changefeed are removed, remove the changefeed status",
zap.Stringer("changefeedID", dispatcherInfo.GetChangefeedID()),
)
c.changefeedMap.Delete(dispatcherInfo.GetChangefeedID())
}

stat.(*dispatcherStat).isRemoved.Store(true)
c.eventStore.UnregisterDispatcher(id)
c.schemaStore.UnregisterTable(dispatcherInfo.GetTableSpan().TableID)
c.dispatchers.Delete(id)

log.Info("remove dispatcher",
zap.Uint64("clusterID", c.tidbClusterID),
zap.Stringer("changefeedID", dispatcherInfo.GetChangefeedID()),
Expand Down
2 changes: 1 addition & 1 deletion pkg/metrics/ds.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var (
Namespace: "ticdc",
Subsystem: "dynamic_stream",
Name: "memory_usage",
}, []string{"component", "type"})
}, []string{"component", "type", "area"})
DynamicStreamEventChanSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand Down
9 changes: 7 additions & 2 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ type MysqlConfig struct {
BatchDMLEnable bool
MultiStmtEnable bool
CachePrepStmts bool
// DryRun is used to enable dry-run mode. In dry-run mode, the writer will not write data to the downstream.
DryRun bool

// sync point
SyncPointRetention time.Duration
Expand All @@ -125,6 +123,13 @@ type MysqlConfig struct {
MaxAllowedPacket int64

HasVectorType bool // HasVectorType is true if the column is vector type

// DryRun is used to enable dry-run mode. In dry-run mode, the writer will not write data to the downstream.
DryRun bool
// DryRunDelay is the delay time for dry-run mode, it is used to simulate the delay time of real write.
DryRunDelay time.Duration
// DryRunBlockInterval is the interval time for blocking in dry-run mode.
DryRunBlockInterval time.Duration
}

// NewConfig returns the default mysql backend config.
Expand Down
43 changes: 37 additions & 6 deletions pkg/sink/mysql/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"strconv"
"time"

"github.com/coreos/go-semver/semver"
dmysql "github.com/go-sql-driver/mysql"
Expand Down Expand Up @@ -101,12 +102,6 @@ func GenBasicDSN(cfg *MysqlConfig) (*dmysql.Config, error) {
port = "4000"
}

dryRun := cfg.sinkURI.Query().Get("dry-run")
if dryRun == "true" {
log.Info("dry-run mode is enabled, will not write data to downstream")
cfg.DryRun = true
}

// This will handle the IPv6 address format.
var dsn *dmysql.Config
var err error
Expand All @@ -127,9 +122,45 @@ func GenBasicDSN(cfg *MysqlConfig) (*dmysql.Config, error) {
dsn.Params["readTimeout"] = cfg.ReadTimeout
dsn.Params["writeTimeout"] = cfg.WriteTimeout
dsn.Params["timeout"] = cfg.DialTimeout

setDryRunConfig(cfg)

return dsn, nil
}

func setDryRunConfig(cfg *MysqlConfig) {
dryRun := cfg.sinkURI.Query().Get("dry-run")
if dryRun == "true" {
log.Info("dry-run mode is enabled, will not write data to downstream")
cfg.DryRun = true
}
if !cfg.DryRun {
return
}

dryRunDelay := cfg.sinkURI.Query().Get("dry-run-delay")
if dryRunDelay != "" {
dryRunDelayInt, err := strconv.Atoi(dryRunDelay)
if err != nil {
log.Error("invalid dry-run-delay", zap.Error(err))
return
}
log.Info("set dry-run-delay", zap.Duration("duration", time.Duration(dryRunDelayInt)*time.Millisecond))
cfg.DryRunDelay = time.Duration(dryRunDelayInt) * time.Millisecond
}

dryRunBlockInterval := cfg.sinkURI.Query().Get("dry-run-block-interval")
if dryRunBlockInterval != "" {
dryRunBlockIntervalInt, err := strconv.Atoi(dryRunBlockInterval)
if err != nil {
log.Error("invalid dry-run-block-interval", zap.Error(err))
return
}
log.Info("set dry-run-block-interval", zap.Duration("duration", time.Duration(dryRunBlockIntervalInt)*time.Second))
cfg.DryRunBlockInterval = time.Duration(dryRunBlockIntervalInt) * time.Second
}
}

// GetTestDB checks and adjusts the password of the given DSN,
// it will return a DB instance opened with the adjusted password.
func GetTestDB(dbConfig *dmysql.Config) (*sql.DB, error) {
Expand Down
42 changes: 40 additions & 2 deletions pkg/sink/mysql/mysql_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (

lru "github.com/hashicorp/golang-lru"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/sink/util"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -61,6 +63,9 @@ type MysqlWriter struct {

statistics *metrics.Statistics
needFormat bool

// for dry-run mode
blockerTicker *time.Ticker
}

func NewMysqlWriter(
Expand All @@ -71,7 +76,7 @@ func NewMysqlWriter(
statistics *metrics.Statistics,
needFormatVectorType bool,
) *MysqlWriter {
return &MysqlWriter{
res := &MysqlWriter{
ctx: ctx,
db: db,
cfg: cfg,
Expand All @@ -86,13 +91,26 @@ func NewMysqlWriter(
statistics: statistics,
needFormat: needFormatVectorType,
}

if cfg.DryRun && cfg.DryRunBlockInterval > 0 {
res.blockerTicker = time.NewTicker(cfg.DryRunBlockInterval)
}

return res
}

func (w *MysqlWriter) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) {
w.tableSchemaStore = tableSchemaStore
}

func (w *MysqlWriter) FlushDDLEvent(event *commonEvent.DDLEvent) error {
if w.cfg.DryRun {
for _, callback := range event.PostTxnFlushed {
callback()
}
return nil
}

if w.cfg.IsTiDB {
// first we check whether there is some async ddl executed now.
w.waitAsyncDDLDone(event)
Expand Down Expand Up @@ -137,6 +155,13 @@ func (w *MysqlWriter) FlushDDLEvent(event *commonEvent.DDLEvent) error {
}

func (w *MysqlWriter) FlushSyncPointEvent(event *commonEvent.SyncPointEvent) error {
if w.cfg.DryRun {
for _, callback := range event.PostTxnFlushed {
callback()
}
return nil
}

if !w.syncPointTableInit {
// create sync point table if not exist
err := w.createSyncTable()
Expand Down Expand Up @@ -190,13 +215,13 @@ func (w *MysqlWriter) Flush(events []*commonEvent.DMLEvent) error {
return errors.Trace(err)
}
} else {
w.tryDryRunBlock()
if err = w.statistics.RecordBatchExecution(func() (int, int64, error) {
return dmls.rowCount, dmls.approximateSize, nil
}); err != nil {
return errors.Trace(err)
}
}

for _, event := range events {
for _, callback := range event.PostTxnFlushed {
callback()
Expand All @@ -205,6 +230,19 @@ func (w *MysqlWriter) Flush(events []*commonEvent.DMLEvent) error {
return nil
}

func (w *MysqlWriter) tryDryRunBlock() {
time.Sleep(w.cfg.DryRunDelay)
if w.blockerTicker != nil {
select {
case <-w.blockerTicker.C:
log.Info("dry-run mode, blocker ticker triggered, block for a while",
zap.Duration("duration", w.cfg.DryRunBlockInterval))
time.Sleep(w.cfg.DryRunBlockInterval)
default:
}
}
}

func (w *MysqlWriter) Close() {
if w.stmtCache != nil {
w.stmtCache.Purge()
Expand Down
1 change: 1 addition & 0 deletions pkg/sink/mysql/mysql_writer_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func needTimeoutCheck(ddlType timodel.ActionType) bool {
func (w *MysqlWriter) execDDL(event *commonEvent.DDLEvent) error {
if w.cfg.DryRun {
log.Info("Dry run DDL", zap.String("sql", event.GetDDLQuery()))
time.Sleep(w.cfg.DryRunDelay)
return nil
}

Expand Down
Loading

0 comments on commit 21a6f5f

Please sign in to comment.