diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index 6770b56c8..7ff57d33a 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -248,11 +248,34 @@ func (e *EventDispatcherManager) TryClose(removeChangefeed bool) bool { if e.closing.Load() { return e.closed.Load() } + e.cleanMetrics() e.closing.Store(true) go e.close(removeChangefeed) return false } +func (e *EventDispatcherManager) cleanMetrics() { + metrics.DynamicStreamMemoryUsage.DeleteLabelValues( + "event-collector", + "max", + e.changefeedID.String(), + ) + + metrics.DynamicStreamMemoryUsage.DeleteLabelValues( + "event-collector", + "used", + e.changefeedID.String(), + ) + + metrics.TableTriggerEventDispatcherGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name()) + metrics.EventDispatcherGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name()) + metrics.CreateDispatcherDuration.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name()) + metrics.EventDispatcherManagerCheckpointTsGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name()) + metrics.EventDispatcherManagerResolvedTsGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name()) + metrics.EventDispatcherManagerCheckpointTsLagGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name()) + metrics.EventDispatcherManagerResolvedTsLagGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name()) +} + func (e *EventDispatcherManager) close(removeChangefeed bool) { log.Info("closing event dispatcher manager", zap.Stringer("changefeedID", e.changefeedID)) diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index a4a90165c..c7545cdb6 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -45,8 +45,6 @@ var ( metricsHandleEventDuration = metrics.EventCollectorHandleEventDuration metricsDSInputChanLen = metrics.DynamicStreamEventChanSize.WithLabelValues("event-collector") metricsDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("event-collector") - metricsDSMaxMemoryUsage = metrics.DynamicStreamMemoryUsage.WithLabelValues("event-collector", "max") - metricsDSUsedMemoryUsage = metrics.DynamicStreamMemoryUsage.WithLabelValues("event-collector", "used") ) type DispatcherRequest struct { @@ -76,9 +74,11 @@ EventCollector is the relay between EventService and DispatcherManager, responsi EventCollector is an instance-level component. */ type EventCollector struct { - serverId node.ID - dispatcherMap sync.Map - mc messaging.MessageCenter + serverId node.ID + dispatcherMap sync.Map // key: dispatcherID, value: dispatcherStat + changefeedIDMap sync.Map // key: changefeedID.GID, value: changefeedID + + mc messaging.MessageCenter // dispatcherRequestChan cached dispatcher request when some error occurs. dispatcherRequestChan *chann.DrainableChann[DispatcherRequestWithTarget] @@ -164,6 +164,23 @@ func (c *EventCollector) Run(ctx context.Context) { func (c *EventCollector) Close() { c.cancel() c.ds.Close() + + c.changefeedIDMap.Range(func(key, value any) bool { + cfID := value.(common.ChangeFeedID) + // Remove metrics for the changefeed. + metrics.DynamicStreamMemoryUsage.DeleteLabelValues( + "event-collector", + "max", + cfID.String(), + ) + metrics.DynamicStreamMemoryUsage.DeleteLabelValues( + "event-collector", + "used", + cfID.String(), + ) + return true + }) + c.wg.Wait() log.Info("event collector is closed") } @@ -180,6 +197,7 @@ func (c *EventCollector) AddDispatcher(target dispatcher.EventDispatcher, memory stat.reset() stat.sentCommitTs.Store(target.GetStartTs()) c.dispatcherMap.Store(target.GetId(), stat) + c.changefeedIDMap.Store(target.GetChangefeedID().ID(), target.GetChangefeedID()) metrics.EventCollectorRegisteredDispatcherCount.Inc() areaSetting := dynstream.NewAreaSettingsWithMaxPendingSize(memoryQuota) @@ -438,8 +456,23 @@ func (c *EventCollector) updateMetrics(ctx context.Context) { dsMetrics := c.ds.GetMetrics() metricsDSInputChanLen.Set(float64(dsMetrics.EventChanSize)) metricsDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen)) - metricsDSUsedMemoryUsage.Set(float64(dsMetrics.MemoryControl.UsedMemory)) - metricsDSMaxMemoryUsage.Set(float64(dsMetrics.MemoryControl.MaxMemory)) + for _, areaMetric := range dsMetrics.MemoryControl.AreaMemoryMetrics { + cfID, ok := c.changefeedIDMap.Load(areaMetric.Area()) + if !ok { + continue + } + changefeedID := cfID.(common.ChangeFeedID) + metrics.DynamicStreamMemoryUsage.WithLabelValues( + "event-collector", + "max", + changefeedID.String(), + ).Set(float64(areaMetric.MaxMemory())) + metrics.DynamicStreamMemoryUsage.WithLabelValues( + "event-collector", + "used", + changefeedID.String(), + ).Set(float64(areaMetric.MemoryUsage())) + } } } } diff --git a/metrics/grafana/ticdc_new_arch.json b/metrics/grafana/ticdc_new_arch.json index 77e7a25ee..d4d3027e5 100644 --- a/metrics/grafana/ticdc_new_arch.json +++ b/metrics/grafana/ticdc_new_arch.json @@ -1025,14 +1025,6 @@ "stack": false, "steppedLine": false, "targets": [ - { - "exemplar": true, - "expr": "ticdc_log_puller_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namesapce=~\"$namespace\", changefeed=~\"$changefeed\"}", - "hide": false, - "interval": "", - "legendFormat": "Log-Puller-{{type}}-resolvedTs-Lag-{{instance}}", - "refId": "A" - }, { "exemplar": true, "expr": "ticdc_event_store_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}", @@ -11009,7 +11001,7 @@ "exemplar": true, "expr": "ticdc_dynamic_stream_memory_usage{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\",changefeed=~\"$changefeed\", component!=\"\", instance=~\"$ticdc_instance\"}", "interval": "", - "legendFormat": "{{namespace}}-{{changefeed}}-{{instance}}-memory-{{component}}-{{type}}", + "legendFormat": "{{area}}-{{instance}}-memory-{{component}}-{{type}}", "refId": "A" } ], diff --git a/pkg/common/types.go b/pkg/common/types.go index 323f17fb8..0f93f0c45 100644 --- a/pkg/common/types.go +++ b/pkg/common/types.go @@ -134,6 +134,10 @@ func NewGID() GID { } } +func (g *GID) PString() string { + return g.String() +} + func (g GID) String() string { var buf bytes.Buffer buf.WriteString(strconv.FormatUint(g.Low, 10)) diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index ac87fd41a..74fbdb159 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -888,14 +888,29 @@ func (c *eventBroker) removeDispatcher(dispatcherInfo DispatcherInfo) { id := dispatcherInfo.GetID() stat, ok := c.dispatchers.Load(id) if !ok { + stat, ok = c.tableTriggerDispatchers.Load(id) + if !ok { + return + } c.tableTriggerDispatchers.Delete(id) - return } + stat.(*dispatcherStat).changefeedStat.removeDispatcher() + // FIXME: this is a workaround to remove the changefeed status when all dispatchers are removed. + // But some remove dispatcher events missing during the changefeed pausing process, the changefeed status will not be removed expectedly. + // So, we need to find a permanent solution to fix this problem. + if stat.(*dispatcherStat).changefeedStat.dispatcherCount.Load() == 0 { + log.Info("All dispatchers for the changefeed are removed, remove the changefeed status", + zap.Stringer("changefeedID", dispatcherInfo.GetChangefeedID()), + ) + c.changefeedMap.Delete(dispatcherInfo.GetChangefeedID()) + } + stat.(*dispatcherStat).isRemoved.Store(true) c.eventStore.UnregisterDispatcher(id) c.schemaStore.UnregisterTable(dispatcherInfo.GetTableSpan().TableID) c.dispatchers.Delete(id) + log.Info("remove dispatcher", zap.Uint64("clusterID", c.tidbClusterID), zap.Stringer("changefeedID", dispatcherInfo.GetChangefeedID()), diff --git a/pkg/metrics/ds.go b/pkg/metrics/ds.go index 4f4638742..1abca0e73 100644 --- a/pkg/metrics/ds.go +++ b/pkg/metrics/ds.go @@ -21,7 +21,7 @@ var ( Namespace: "ticdc", Subsystem: "dynamic_stream", Name: "memory_usage", - }, []string{"component", "type"}) + }, []string{"component", "type", "area"}) DynamicStreamEventChanSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index 96588d86a..689be0d41 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -114,8 +114,6 @@ type MysqlConfig struct { BatchDMLEnable bool MultiStmtEnable bool CachePrepStmts bool - // DryRun is used to enable dry-run mode. In dry-run mode, the writer will not write data to the downstream. - DryRun bool // sync point SyncPointRetention time.Duration @@ -125,6 +123,13 @@ type MysqlConfig struct { MaxAllowedPacket int64 HasVectorType bool // HasVectorType is true if the column is vector type + + // DryRun is used to enable dry-run mode. In dry-run mode, the writer will not write data to the downstream. + DryRun bool + // DryRunDelay is the delay time for dry-run mode, it is used to simulate the delay time of real write. + DryRunDelay time.Duration + // DryRunBlockInterval is the interval time for blocking in dry-run mode. + DryRunBlockInterval time.Duration } // NewConfig returns the default mysql backend config. diff --git a/pkg/sink/mysql/helper.go b/pkg/sink/mysql/helper.go index 754b48695..3f78ef099 100644 --- a/pkg/sink/mysql/helper.go +++ b/pkg/sink/mysql/helper.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "strconv" + "time" "github.com/coreos/go-semver/semver" dmysql "github.com/go-sql-driver/mysql" @@ -101,12 +102,6 @@ func GenBasicDSN(cfg *MysqlConfig) (*dmysql.Config, error) { port = "4000" } - dryRun := cfg.sinkURI.Query().Get("dry-run") - if dryRun == "true" { - log.Info("dry-run mode is enabled, will not write data to downstream") - cfg.DryRun = true - } - // This will handle the IPv6 address format. var dsn *dmysql.Config var err error @@ -127,9 +122,45 @@ func GenBasicDSN(cfg *MysqlConfig) (*dmysql.Config, error) { dsn.Params["readTimeout"] = cfg.ReadTimeout dsn.Params["writeTimeout"] = cfg.WriteTimeout dsn.Params["timeout"] = cfg.DialTimeout + + setDryRunConfig(cfg) + return dsn, nil } +func setDryRunConfig(cfg *MysqlConfig) { + dryRun := cfg.sinkURI.Query().Get("dry-run") + if dryRun == "true" { + log.Info("dry-run mode is enabled, will not write data to downstream") + cfg.DryRun = true + } + if !cfg.DryRun { + return + } + + dryRunDelay := cfg.sinkURI.Query().Get("dry-run-delay") + if dryRunDelay != "" { + dryRunDelayInt, err := strconv.Atoi(dryRunDelay) + if err != nil { + log.Error("invalid dry-run-delay", zap.Error(err)) + return + } + log.Info("set dry-run-delay", zap.Duration("duration", time.Duration(dryRunDelayInt)*time.Millisecond)) + cfg.DryRunDelay = time.Duration(dryRunDelayInt) * time.Millisecond + } + + dryRunBlockInterval := cfg.sinkURI.Query().Get("dry-run-block-interval") + if dryRunBlockInterval != "" { + dryRunBlockIntervalInt, err := strconv.Atoi(dryRunBlockInterval) + if err != nil { + log.Error("invalid dry-run-block-interval", zap.Error(err)) + return + } + log.Info("set dry-run-block-interval", zap.Duration("duration", time.Duration(dryRunBlockIntervalInt)*time.Second)) + cfg.DryRunBlockInterval = time.Duration(dryRunBlockIntervalInt) * time.Second + } +} + // GetTestDB checks and adjusts the password of the given DSN, // it will return a DB instance opened with the adjusted password. func GetTestDB(dbConfig *dmysql.Config) (*sql.DB, error) { diff --git a/pkg/sink/mysql/mysql_writer.go b/pkg/sink/mysql/mysql_writer.go index f752bb29f..96d2eb3e8 100644 --- a/pkg/sink/mysql/mysql_writer.go +++ b/pkg/sink/mysql/mysql_writer.go @@ -21,10 +21,12 @@ import ( lru "github.com/hashicorp/golang-lru" "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/sink/util" + "go.uber.org/zap" ) const ( @@ -61,6 +63,9 @@ type MysqlWriter struct { statistics *metrics.Statistics needFormat bool + + // for dry-run mode + blockerTicker *time.Ticker } func NewMysqlWriter( @@ -71,7 +76,7 @@ func NewMysqlWriter( statistics *metrics.Statistics, needFormatVectorType bool, ) *MysqlWriter { - return &MysqlWriter{ + res := &MysqlWriter{ ctx: ctx, db: db, cfg: cfg, @@ -86,6 +91,12 @@ func NewMysqlWriter( statistics: statistics, needFormat: needFormatVectorType, } + + if cfg.DryRun && cfg.DryRunBlockInterval > 0 { + res.blockerTicker = time.NewTicker(cfg.DryRunBlockInterval) + } + + return res } func (w *MysqlWriter) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) { @@ -93,6 +104,13 @@ func (w *MysqlWriter) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStor } func (w *MysqlWriter) FlushDDLEvent(event *commonEvent.DDLEvent) error { + if w.cfg.DryRun { + for _, callback := range event.PostTxnFlushed { + callback() + } + return nil + } + if w.cfg.IsTiDB { // first we check whether there is some async ddl executed now. w.waitAsyncDDLDone(event) @@ -137,6 +155,13 @@ func (w *MysqlWriter) FlushDDLEvent(event *commonEvent.DDLEvent) error { } func (w *MysqlWriter) FlushSyncPointEvent(event *commonEvent.SyncPointEvent) error { + if w.cfg.DryRun { + for _, callback := range event.PostTxnFlushed { + callback() + } + return nil + } + if !w.syncPointTableInit { // create sync point table if not exist err := w.createSyncTable() @@ -190,13 +215,13 @@ func (w *MysqlWriter) Flush(events []*commonEvent.DMLEvent) error { return errors.Trace(err) } } else { + w.tryDryRunBlock() if err = w.statistics.RecordBatchExecution(func() (int, int64, error) { return dmls.rowCount, dmls.approximateSize, nil }); err != nil { return errors.Trace(err) } } - for _, event := range events { for _, callback := range event.PostTxnFlushed { callback() @@ -205,6 +230,19 @@ func (w *MysqlWriter) Flush(events []*commonEvent.DMLEvent) error { return nil } +func (w *MysqlWriter) tryDryRunBlock() { + time.Sleep(w.cfg.DryRunDelay) + if w.blockerTicker != nil { + select { + case <-w.blockerTicker.C: + log.Info("dry-run mode, blocker ticker triggered, block for a while", + zap.Duration("duration", w.cfg.DryRunBlockInterval)) + time.Sleep(w.cfg.DryRunBlockInterval) + default: + } + } +} + func (w *MysqlWriter) Close() { if w.stmtCache != nil { w.stmtCache.Purge() diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index 887a51357..e1d3f4f43 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -117,6 +117,7 @@ func needTimeoutCheck(ddlType timodel.ActionType) bool { func (w *MysqlWriter) execDDL(event *commonEvent.DDLEvent) error { if w.cfg.DryRun { log.Info("Dry run DDL", zap.String("sql", event.GetDDLQuery())) + time.Sleep(w.cfg.DryRunDelay) return nil } diff --git a/tests/integration_tests/ds_memory_control/conf/diff_config.toml b/tests/integration_tests/ds_memory_control/conf/diff_config.toml new file mode 100644 index 000000000..2d31480fb --- /dev/null +++ b/tests/integration_tests/ds_memory_control/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/sink_retry/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["sink_retry.usertable"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/ds_memory_control/conf/workload b/tests/integration_tests/ds_memory_control/conf/workload new file mode 100644 index 000000000..8ac845f3f --- /dev/null +++ b/tests/integration_tests/ds_memory_control/conf/workload @@ -0,0 +1,14 @@ +threadcount=2 +recordcount=100000 +operationcount=1000 +maxexecutiontime=120 +workload=core + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform diff --git a/tests/integration_tests/ds_memory_control/conf/workload-2 b/tests/integration_tests/ds_memory_control/conf/workload-2 new file mode 100644 index 000000000..e39b08aa2 --- /dev/null +++ b/tests/integration_tests/ds_memory_control/conf/workload-2 @@ -0,0 +1,13 @@ +threadcount=2 +recordcount=100000 +operationcount=0 +maxexecutiontime=120 + +readallfields=true + +readproportion=0.2 +updateproportion=0.8 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform \ No newline at end of file diff --git a/tests/integration_tests/ds_memory_control/run.sh b/tests/integration_tests/ds_memory_control/run.sh new file mode 100755 index 000000000..8c5873f0f --- /dev/null +++ b/tests/integration_tests/ds_memory_control/run.sh @@ -0,0 +1,60 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +CDC_COUNT=3 +DB_COUNT=4 + +function run() { + # Validate sink type is mysql since this test is mysql specific + if [ "$SINK_TYPE" != "mysql" ]; then + echo "skip sink_hang test for $SINK_TYPE" + return 0 + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_sql "CREATE DATABASE ds_memory_control;" + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=ds_memory_control + export GO_FAILPOINTS='github.com/pingcap/ticdc/utils/dynstream/PausePath=10%return(true)' + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + SINK_URI="mysql://normal:123456@127.0.0.1:3306" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + + run_sql "CREATE TABLE ds_memory_control.finish_mark_1 (a int primary key);" + sleep 30 + check_table_exists "ds_memory_control.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180 + + export GO_FAILPOINTS='' + cleanup_process $CDC_BINARY + export GO_FAILPOINTS='github.com/pingcap/ticdc/utils/dynstream/PauseArea=10%return(true)' + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + go-ycsb run mysql -P $CUR/conf/workload-2 -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=ds_memory_control + + run_sql "CREATE TABLE ds_memory_control.finish_mark_2 (a int primary key);" + sleep 30 + check_table_exists "ds_memory_control.finish_mark_2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 180 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/utils/dynstream/interfaces.go b/utils/dynstream/interfaces.go index c2b6f7488..3540bce5d 100644 --- a/utils/dynstream/interfaces.go +++ b/utils/dynstream/interfaces.go @@ -179,7 +179,7 @@ type DynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] inter // This method can be called at any time. But to avoid the memory leak, setting on a area without existing paths is a no-op. SetAreaSettings(area A, settings AreaSettings) - GetMetrics() Metrics + GetMetrics() Metrics[A] } // PathHasher is used to select target stream for the path. @@ -315,14 +315,11 @@ func NewParallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T return newParallelDynamicStream(hasher, handler, opt) } -type Metrics struct { +type Metrics[A Area] struct { EventChanSize int PendingQueueLen int AddPath int RemovePath int - MemoryControl struct { - UsedMemory int64 - MaxMemory int64 - } + MemoryControl MemoryMetric[A] } diff --git a/utils/dynstream/memory_control.go b/utils/dynstream/memory_control.go index 59e7152ca..3abaa2429 100644 --- a/utils/dynstream/memory_control.go +++ b/utils/dynstream/memory_control.go @@ -18,6 +18,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "go.uber.org/zap" ) @@ -82,7 +83,7 @@ func (as *areaMemStat[A, P, T, D, H]) appendEvent( // Add the event to the pending queue. path.pendingQueue.PushBack(event) // Update the pending size. - path.pendingSize.Add(uint32(event.eventSize)) + path.updatePendingSize(int64(event.eventSize)) as.totalPendingSize.Add(int64(event.eventSize)) return true } @@ -91,13 +92,21 @@ func (as *areaMemStat[A, P, T, D, H]) appendEvent( // It needs to be called after a event is appended. // Note: Our gaol is to fast pause, and lazy resume. func (as *areaMemStat[A, P, T, D, H]) updatePathPauseState(path *pathInfo[A, P, T, D, H]) { - pause, resume := as.shouldPausePath(path) + pause, resume, memoryUsageRatio := as.shouldPausePath(path) sendFeedback := func(pause bool) { - if !(time.Since(path.lastSendFeedbackTime.Load().(time.Time)) >= as.settings.Load().FeedbackInterval) { + now := time.Now() + lastTime := path.lastSendFeedbackTime.Load().(time.Time) + + // Fast pause, lazy resume. + if !pause && time.Since(lastTime) < as.settings.Load().FeedbackInterval { return } + if !path.lastSendFeedbackTime.CompareAndSwap(lastTime, now) { + return // Another goroutine already updated the time + } + feedbackType := PausePath if !pause { feedbackType = ResumePath @@ -109,10 +118,18 @@ func (as *areaMemStat[A, P, T, D, H]) updatePathPauseState(path *pathInfo[A, P, Dest: path.dest, FeedbackType: feedbackType, } - path.lastSendFeedbackTime.Store(time.Now()) path.paused.Store(pause) + + log.Info("send path feedback", zap.Any("area", as.area), + zap.Any("path", path.path), zap.Stringer("feedbackType", feedbackType), + zap.Float64("memoryUsageRatio", memoryUsageRatio)) } + failpoint.Inject("PausePath", func() { + log.Warn("inject PausePath") + sendFeedback(true) + }) + switch { case pause: sendFeedback(true) @@ -122,28 +139,49 @@ func (as *areaMemStat[A, P, T, D, H]) updatePathPauseState(path *pathInfo[A, P, } func (as *areaMemStat[A, P, T, D, H]) updateAreaPauseState(path *pathInfo[A, P, T, D, H]) { - pause, resume := as.shouldPauseArea() + pause, resume, memoryUsageRatio := as.shouldPauseArea() sendFeedback := func(pause bool) { + now := time.Now() + lastTime := as.lastSendFeedbackTime.Load().(time.Time) + + // Fast pause, lazy resume. + if !pause && time.Since(lastTime) < as.settings.Load().FeedbackInterval { + return + } + + if !as.lastSendFeedbackTime.CompareAndSwap(lastTime, now) { + return // Another goroutine already updated the time + } + feedbackType := PauseArea if !pause { feedbackType = ResumeArea } - if !(time.Since(as.lastSendFeedbackTime.Load().(time.Time)) >= as.settings.Load().FeedbackInterval) { - return - } as.feedbackChan <- Feedback[A, P, D]{ Area: as.area, Path: path.path, Dest: path.dest, FeedbackType: feedbackType, } - - as.lastSendFeedbackTime.Store(time.Now()) as.paused.Store(pause) + + log.Info("send area feedback", + zap.Any("area", as.area), + zap.Stringer("feedbackType", feedbackType), + zap.Float64("memoryUsageRatio", memoryUsageRatio), + zap.Time("lastTime", lastTime), + zap.Time("now", now), + zap.Duration("sinceLastTime", time.Since(lastTime)), + ) } + failpoint.Inject("PauseArea", func() { + log.Warn("inject PauseArea") + sendFeedback(true) + }) + switch { case pause: sendFeedback(true) @@ -154,43 +192,40 @@ func (as *areaMemStat[A, P, T, D, H]) updateAreaPauseState(path *pathInfo[A, P, // shouldPausePath determines if a path should be paused based on memory usage. // If the memory usage is greater than the 20% of max pending size, the path should be paused. -func (as *areaMemStat[A, P, T, D, H]) shouldPausePath(path *pathInfo[A, P, T, D, H]) (pause bool, resume bool) { - memoryUsageRatio := float64(path.pendingSize.Load()) / float64(as.settings.Load().MaxPendingSize) +func (as *areaMemStat[A, P, T, D, H]) shouldPausePath(path *pathInfo[A, P, T, D, H]) (pause bool, resume bool, memoryUsageRatio float64) { + memoryUsageRatio = float64(path.pendingSize.Load()) / float64(as.settings.Load().MaxPendingSize) switch { case path.paused.Load(): // If the path is paused, we only need to resume it when the memory usage is less than 10%. if memoryUsageRatio < 0.1 { - log.Info("resume path", zap.Any("area", as.area), zap.Any("path", path.path), zap.Float64("memoryUsageRatio", memoryUsageRatio)) resume = true } default: - // If the path is not paused, we need to pause it when the memory usage is greater than 20% of max pending size. + // If the path is not paused, we need to pause it when the memory usage is greater than 30% of max pending size. if memoryUsageRatio >= 0.2 { - log.Info("pause path", zap.Any("area", as.area), zap.Any("path", path.path), zap.Float64("memoryUsageRatio", memoryUsageRatio)) pause = true } } - return + return pause, resume, memoryUsageRatio } // shouldPauseArea determines if the area should be paused based on memory usage. // If the memory usage is greater than the 80% of max pending size, the area should be paused. -func (as *areaMemStat[A, P, T, D, H]) shouldPauseArea() (pause bool, resume bool) { - memoryUsageRatio := float64(as.totalPendingSize.Load()) / float64(as.settings.Load().MaxPendingSize) +func (as *areaMemStat[A, P, T, D, H]) shouldPauseArea() (pause bool, resume bool, memoryUsageRatio float64) { + memoryUsageRatio = float64(as.totalPendingSize.Load()) / float64(as.settings.Load().MaxPendingSize) + switch { case as.paused.Load(): // If the area is already paused, we need to resume it when the memory usage is less than 50%. if memoryUsageRatio < 0.5 { resume = true - log.Info("resume area", zap.Any("area", as.area), zap.Float64("memoryUsageRatio", memoryUsageRatio)) } default: // If the area is not paused, we need to pause it when the memory usage is greater than 80% of max pending size. if memoryUsageRatio >= 0.8 { pause = true - log.Info("pause area", zap.Any("area", as.area), zap.Float64("memoryUsageRatio", memoryUsageRatio)) } } @@ -263,14 +298,43 @@ func (m *memControl[A, P, T, D, H]) removePathFromArea(path *pathInfo[A, P, T, D } // FIXME/TODO: We use global metric here, which is not good for multiple streams. -func (m *memControl[A, P, T, D, H]) getMetrics() (usedMemory int64, maxMemory int64) { +func (m *memControl[A, P, T, D, H]) getMetrics() MemoryMetric[A] { m.mutex.Lock() defer m.mutex.Unlock() - usedMemory = int64(0) - maxMemory = int64(0) + metrics := MemoryMetric[A]{} for _, area := range m.areaStatMap { - usedMemory += area.totalPendingSize.Load() - maxMemory += int64(area.settings.Load().MaxPendingSize) + areaMetric := AreaMemoryMetric[A]{ + area: area.area, + usedMemory: area.totalPendingSize.Load(), + maxMemory: int64(area.settings.Load().MaxPendingSize), + } + metrics.AreaMemoryMetrics = append(metrics.AreaMemoryMetrics, areaMetric) } - return usedMemory, maxMemory + return metrics +} + +type MemoryMetric[A Area] struct { + AreaMemoryMetrics []AreaMemoryMetric[A] +} + +type AreaMemoryMetric[A Area] struct { + area A + usedMemory int64 + maxMemory int64 +} + +func (a *AreaMemoryMetric[A]) MemoryUsageRatio() float64 { + return float64(a.usedMemory) / float64(a.maxMemory) +} + +func (a *AreaMemoryMetric[A]) MemoryUsage() int64 { + return a.usedMemory +} + +func (a *AreaMemoryMetric[A]) MaxMemory() int64 { + return a.maxMemory +} + +func (a *AreaMemoryMetric[A]) Area() A { + return a.area } diff --git a/utils/dynstream/memory_control_test.go b/utils/dynstream/memory_control_test.go index 59341cb89..f83fe67c9 100644 --- a/utils/dynstream/memory_control_test.go +++ b/utils/dynstream/memory_control_test.go @@ -178,36 +178,36 @@ func TestShouldPausePath(t *testing.T) { areaMemStat := newAreaMemStat(path.area, mc, settings, nil) path.areaMemStat = areaMemStat - path.pendingSize.Store(uint32(10)) - pause, resume := path.areaMemStat.shouldPausePath(path) + path.pendingSize.Store(int64(10)) + pause, resume, _ := path.areaMemStat.shouldPausePath(path) require.False(t, pause) require.False(t, resume) - path.pendingSize.Store(uint32(15)) - pause, resume = path.areaMemStat.shouldPausePath(path) + path.pendingSize.Store(int64(15)) + pause, resume, _ = path.areaMemStat.shouldPausePath(path) require.False(t, pause) require.False(t, resume) - path.pendingSize.Store(uint32(20)) - pause, resume = path.areaMemStat.shouldPausePath(path) + path.pendingSize.Store(int64(20)) + pause, resume, _ = path.areaMemStat.shouldPausePath(path) require.True(t, pause) path.paused.Store(true) require.False(t, resume) - path.pendingSize.Store(uint32(15)) - pause, resume = path.areaMemStat.shouldPausePath(path) + path.pendingSize.Store(int64(15)) + pause, resume, _ = path.areaMemStat.shouldPausePath(path) require.False(t, pause) require.False(t, resume) - path.pendingSize.Store(uint32(9)) - pause, resume = path.areaMemStat.shouldPausePath(path) + path.pendingSize.Store(int64(9)) + pause, resume, _ = path.areaMemStat.shouldPausePath(path) require.False(t, pause) require.True(t, resume) path.paused.Store(false) - path.pendingSize.Store(uint32(15)) - pause, resume = path.areaMemStat.shouldPausePath(path) + path.pendingSize.Store(int64(15)) + pause, resume, _ = path.areaMemStat.shouldPausePath(path) require.False(t, pause) require.False(t, resume) } @@ -221,34 +221,34 @@ func TestShouldPauseArea(t *testing.T) { areaMemStat := newAreaMemStat(path.area, mc, settings, nil) areaMemStat.totalPendingSize.Store(int64(10)) - pause, resume := areaMemStat.shouldPauseArea() + pause, resume, _ := areaMemStat.shouldPauseArea() require.False(t, pause) require.False(t, resume) areaMemStat.totalPendingSize.Store(int64(60)) - pause, resume = areaMemStat.shouldPauseArea() + pause, resume, _ = areaMemStat.shouldPauseArea() require.False(t, pause) require.False(t, resume) areaMemStat.totalPendingSize.Store(int64(80)) - pause, resume = areaMemStat.shouldPauseArea() + pause, resume, _ = areaMemStat.shouldPauseArea() require.True(t, pause) areaMemStat.paused.Store(true) require.False(t, resume) areaMemStat.totalPendingSize.Store(int64(60)) - pause, resume = areaMemStat.shouldPauseArea() + pause, resume, _ = areaMemStat.shouldPauseArea() require.False(t, pause) require.False(t, resume) areaMemStat.totalPendingSize.Store(int64(49)) - pause, resume = areaMemStat.shouldPauseArea() + pause, resume, _ = areaMemStat.shouldPauseArea() require.False(t, pause) require.True(t, resume) areaMemStat.paused.Store(false) areaMemStat.totalPendingSize.Store(int64(60)) - pause, resume = areaMemStat.shouldPauseArea() + pause, resume, _ = areaMemStat.shouldPauseArea() require.False(t, pause) require.False(t, resume) } @@ -285,22 +285,22 @@ func TestSetAreaSettings(t *testing.T) { func TestGetMetrics(t *testing.T) { mc, path := setupTestComponents() - usedMemory, maxMemory := mc.getMetrics() - require.Equal(t, int64(0), usedMemory) - require.Equal(t, int64(0), maxMemory) + metrics := mc.getMetrics() + require.Equal(t, 0, len(metrics.AreaMemoryMetrics)) mc.addPathToArea(path, AreaSettings{ MaxPendingSize: 100, FeedbackInterval: time.Second, }, nil) - usedMemory, maxMemory = mc.getMetrics() - require.Equal(t, int64(0), usedMemory) - require.Equal(t, int64(100), maxMemory) + metrics = mc.getMetrics() + require.Equal(t, 1, len(metrics.AreaMemoryMetrics)) + require.Equal(t, int64(0), metrics.AreaMemoryMetrics[0].usedMemory) + require.Equal(t, int64(100), metrics.AreaMemoryMetrics[0].maxMemory) path.areaMemStat.totalPendingSize.Store(100) - usedMemory, maxMemory = mc.getMetrics() - require.Equal(t, int64(100), usedMemory) - require.Equal(t, int64(100), maxMemory) + metrics = mc.getMetrics() + require.Equal(t, int64(100), metrics.AreaMemoryMetrics[0].usedMemory) + require.Equal(t, int64(100), metrics.AreaMemoryMetrics[0].maxMemory) } func TestUpdateAreaPauseState(t *testing.T) { @@ -362,18 +362,18 @@ func TestUpdatePathPauseState(t *testing.T) { mc.addPathToArea(path, settings, feedbackChan) areaMemStat := path.areaMemStat - path.pendingSize.Store(uint32(10)) + path.pendingSize.Store(int64(10)) areaMemStat.updatePathPauseState(path) require.False(t, path.paused.Load()) - path.pendingSize.Store(uint32(60)) + path.pendingSize.Store(int64(60)) areaMemStat.updatePathPauseState(path) require.True(t, path.paused.Load()) fb := <-feedbackChan require.Equal(t, PausePath, fb.FeedbackType) require.Equal(t, path.area, fb.Area) - path.pendingSize.Store(uint32(9)) + path.pendingSize.Store(int64(9)) areaMemStat.updatePathPauseState(path) require.True(t, path.paused.Load()) diff --git a/utils/dynstream/parallel_dynamic_stream.go b/utils/dynstream/parallel_dynamic_stream.go index cdeaa3f45..6f9dfef39 100644 --- a/utils/dynstream/parallel_dynamic_stream.go +++ b/utils/dynstream/parallel_dynamic_stream.go @@ -175,8 +175,8 @@ func (s *parallelDynamicStream[A, P, T, D, H]) SetAreaSettings(area A, settings } } -func (s *parallelDynamicStream[A, P, T, D, H]) GetMetrics() Metrics { - metrics := Metrics{} +func (s *parallelDynamicStream[A, P, T, D, H]) GetMetrics() Metrics[A] { + metrics := Metrics[A]{} for _, ds := range s.streams { size := ds.getPendingSize() metrics.PendingQueueLen += size @@ -185,9 +185,7 @@ func (s *parallelDynamicStream[A, P, T, D, H]) GetMetrics() Metrics { metrics.RemovePath = int(s._statRemovePathCount.Load()) if s.memControl != nil { - usedMemory, maxMemory := s.memControl.getMetrics() - metrics.MemoryControl.UsedMemory = usedMemory - metrics.MemoryControl.MaxMemory = maxMemory + metrics.MemoryControl = s.memControl.getMetrics() } return metrics diff --git a/utils/dynstream/stream.go b/utils/dynstream/stream.go index ae0fd7ea7..55347bacc 100644 --- a/utils/dynstream/stream.go +++ b/utils/dynstream/stream.go @@ -288,8 +288,8 @@ type pathInfo[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] struct { // Fields used by the memory control. areaMemStat *areaMemStat[A, P, T, D, H] - pendingSize atomic.Uint32 // The total size(bytes) of pending events in the pendingQueue of the path. - paused atomic.Bool // The path is paused to send events. + pendingSize atomic.Int64 // The total size(bytes) of pending events in the pendingQueue of the path. + paused atomic.Bool // The path is paused to send events. lastSendFeedbackTime atomic.Value } @@ -318,7 +318,7 @@ func (pi *pathInfo[A, P, T, D, H]) appendEvent(event eventWrap[A, P, T, D, H], h if event.eventType.Property != PeriodicSignal { pi.pendingQueue.PushBack(event) - pi.pendingSize.Add(uint32(event.eventSize)) + pi.updatePendingSize(int64(event.eventSize)) return true } @@ -339,7 +339,7 @@ func (pi *pathInfo[A, P, T, D, H]) popEvent() (eventWrap[A, P, T, D, H], bool) { if !ok { return eventWrap[A, P, T, D, H]{}, false } - pi.pendingSize.Add(uint32(-e.eventSize)) + pi.updatePendingSize(int64(-e.eventSize)) if pi.areaMemStat != nil { pi.areaMemStat.decPendingSize(pi, int64(e.eventSize)) @@ -347,6 +347,25 @@ func (pi *pathInfo[A, P, T, D, H]) popEvent() (eventWrap[A, P, T, D, H], bool) { return e, true } +func (pi *pathInfo[A, P, T, D, H]) updatePendingSize(delta int64) { + oldSize := pi.pendingSize.Load() + // Check for integer overflow/underflow + newSize := oldSize + delta + if delta > 0 && newSize < oldSize { + log.Error("Integer overflow detected in updatePendingSize", + zap.Int64("oldSize", oldSize), + zap.Int64("delta", delta)) + return + } + if delta < 0 && newSize > oldSize { + log.Error("Integer underflow detected in updatePendingSize", + zap.Int64("oldSize", oldSize), + zap.Int64("delta", delta)) + return + } + pi.pendingSize.Store(newSize) +} + // eventWrap contains the event and the path info. // It can be a event or a wake signal. type eventWrap[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] struct { diff --git a/utils/dynstream/stream_test.go b/utils/dynstream/stream_test.go index 8ed4817dd..c7d888228 100644 --- a/utils/dynstream/stream_test.go +++ b/utils/dynstream/stream_test.go @@ -198,7 +198,7 @@ func TestPathInfo(t *testing.T) { pi := newPathInfo[int, string, *mockEvent, any, *mockHandler](1, "test/path", nil) require.Equal(t, 1, pi.area) require.Equal(t, "test/path", pi.path) - require.Equal(t, uint32(0), pi.pendingSize.Load()) + require.Equal(t, int64(0), pi.pendingSize.Load()) require.Equal(t, false, pi.paused.Load()) require.Equal(t, time.Unix(0, 0), pi.lastSendFeedbackTime.Load()) }