diff --git a/go/vt/vttablet/tabletserver/throttle/base/metric_name.go b/go/vt/vttablet/tabletserver/throttle/base/metric_name.go index 5f0cb202413..98e1288fb23 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/metric_name.go +++ b/go/vt/vttablet/tabletserver/throttle/base/metric_name.go @@ -103,7 +103,7 @@ type AggregatedMetricName struct { } var ( - KnownMetricNames = make([]MetricName, 0) + KnownMetricNames = make(MetricNames, 0) // aggregatedMetricNames precomputes the aggregated metric names for all known metric names, // mapped to their breakdowns. e.g. "self/loadavg" -> {SelfScope, LoadAvgMetricName} // This means: diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_custom_query.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_custom_query.go index 3f5e1a29d7f..1221d4052b9 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/self_metric_custom_query.go +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_custom_query.go @@ -26,11 +26,24 @@ import ( var _ SelfMetric = registerSelfMetric(&CustomQuerySelfMetric{}) type CustomQuerySelfMetric struct { - customQuery atomic.Value + customQueryFuncPtr atomic.Pointer[func() string] } -func (m *CustomQuerySelfMetric) SetQuery(query string) { - m.customQuery.Store(query) +func (m *CustomQuerySelfMetric) GetQuery() string { + customQueryFunc := m.customQueryFuncPtr.Load() + if customQueryFunc == nil { + return "" + } + query := (*customQueryFunc)() + return query +} + +func (m *CustomQuerySelfMetric) SetQueryFunc(f func() string) { + if f == nil { + m.customQueryFuncPtr.Store(nil) + return + } + m.customQueryFuncPtr.Store(&f) } func (m *CustomQuerySelfMetric) Name() MetricName { @@ -50,9 +63,5 @@ func (m *CustomQuerySelfMetric) RequiresConn() bool { } func (m *CustomQuerySelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric { - query := "" - if val := m.customQuery.Load(); val != nil { - query = val.(string) - } - return ReadSelfMySQLThrottleMetric(ctx, conn, query) + return ReadSelfMySQLThrottleMetric(ctx, conn, m.GetQuery()) } diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_default.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_default.go index 5d1ac502f95..a0a6776b2d1 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/self_metric_default.go +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_default.go @@ -16,10 +16,16 @@ limitations under the License. package base +import ( + "context" + "fmt" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" +) + var _ SelfMetric = registerSelfMetric(&DefaultSelfMetric{}) type DefaultSelfMetric struct { - LagSelfMetric } func (m *DefaultSelfMetric) Name() MetricName { @@ -29,3 +35,17 @@ func (m *DefaultSelfMetric) Name() MetricName { func (m *DefaultSelfMetric) DefaultScope() Scope { return SelfScope } + +func (m *DefaultSelfMetric) DefaultThreshold() float64 { + return 0 +} + +func (m *DefaultSelfMetric) RequiresConn() bool { + return false +} + +func (m *DefaultSelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric { + return &ThrottleMetric{ + Err: fmt.Errorf("unexpected direct call to DefaultSelfMetric.Read"), + } +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_lag.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_lag.go index a4523a5ad53..496505180ed 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/self_metric_lag.go +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_lag.go @@ -18,18 +18,34 @@ package base import ( "context" + "sync/atomic" "time" + "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" ) var ( - LagSelfMetricQuery = "show global status like 'threads_running'" + lagSelfMetricQueryBase = "select unix_timestamp(now(6))-max(ts/1000000000) as replication_lag from %s.heartbeat" ) var _ SelfMetric = registerSelfMetric(&LagSelfMetric{}) type LagSelfMetric struct { + lagSelfMetricQuery atomic.Value +} + +// SetQuery is only used by unit tests to override the query. +func (m *LagSelfMetric) SetQuery(query string) { + m.lagSelfMetricQuery.Store(query) +} + +func (m *LagSelfMetric) GetQuery() string { + if query := m.lagSelfMetricQuery.Load(); query == nil { + m.lagSelfMetricQuery.Store(sqlparser.BuildParsedQuery(lagSelfMetricQueryBase, sidecar.GetIdentifier()).Query) + } + return m.lagSelfMetricQuery.Load().(string) } func (m *LagSelfMetric) Name() MetricName { @@ -49,5 +65,5 @@ func (m *LagSelfMetric) RequiresConn() bool { } func (m *LagSelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric { - return ReadSelfMySQLThrottleMetric(ctx, conn, LagSelfMetricQuery) + return ReadSelfMySQLThrottleMetric(ctx, conn, m.GetQuery()) } diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_threads_running.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_threads_running.go index 8fa31c55b61..5add1286a95 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/self_metric_threads_running.go +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_threads_running.go @@ -19,14 +19,11 @@ package base import ( "context" - "vitess.io/vitess/go/constants/sidecar" - "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" ) var ( - lagSelfMetricQueryBase = "select unix_timestamp(now(6))-max(ts/1000000000) as replication_lag from %s.heartbeat" - lagSelfMetricQuery = sqlparser.BuildParsedQuery(lagSelfMetricQueryBase, sidecar.GetIdentifier()).Query + threadsRunningMetricQuery = "show global status like 'threads_running'" ) var _ SelfMetric = registerSelfMetric(&ThreadsRunningSelfMetric{}) @@ -51,5 +48,5 @@ func (m *ThreadsRunningSelfMetric) RequiresConn() bool { } func (m *ThreadsRunningSelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric { - return ReadSelfMySQLThrottleMetric(ctx, conn, lagSelfMetricQuery) + return ReadSelfMySQLThrottleMetric(ctx, conn, threadsRunningMetricQuery) } diff --git a/go/vt/vttablet/tabletserver/throttle/config/mysql_config.go b/go/vt/vttablet/tabletserver/throttle/config/mysql_config.go index d4beb40deb4..76d5f2dd298 100644 --- a/go/vt/vttablet/tabletserver/throttle/config/mysql_config.go +++ b/go/vt/vttablet/tabletserver/throttle/config/mysql_config.go @@ -41,32 +41,15 @@ limitations under the License. package config -import ( - "sync/atomic" - - "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" -) - // // MySQL-specific configuration // -type MySQLMetricConfigurationSettings struct { - Name base.MetricName - CustomQuery string - Threshold atomic.Uint64 -} - // MySQLConfigurationSettings has the general configuration for all MySQL clusters type MySQLConfigurationSettings struct { - CacheMillis int // optional, if defined then probe result will be cached, and future probes may use cached value - Port int // Specify if different than 3306; applies to all clusters - IgnoreDialTCPErrors bool // Skip hosts where a metric cannot be retrieved due to TCP dial errors - IgnoreHostsCount int // Number of hosts that can be skipped/ignored even on error or on exceeding thresholds - IgnoreHostsThreshold float64 // Threshold beyond which IgnoreHostsCount applies (default: 0) - HTTPCheckPort int // port for HTTP check. -1 to disable. - HTTPCheckPath string // If non-empty, requires HTTPCheckPort - IgnoreHosts []string // If non empty, substrings to indicate hosts to be ignored/skipped - - Metrics map[base.MetricName]*MySQLMetricConfigurationSettings + CacheMillis int // optional, if defined then probe result will be cached, and future probes may use cached value + Port int // Specify if different than 3306; applies to all clusters + IgnoreDialTCPErrors bool // Skip hosts where a metric cannot be retrieved due to TCP dial errors + IgnoreHostsCount int // Number of hosts that can be skipped/ignored even on error or on exceeding thresholds + IgnoreHosts []string // If non empty, substrings to indicate hosts to be ignored/skipped } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 884013f618a..792f56c2a00 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -42,15 +42,12 @@ limitations under the License. package throttle import ( - "bufio" "context" "errors" "fmt" "math" "math/rand/v2" "net/http" - "os" - "strconv" "strings" "sync" "sync/atomic" @@ -59,7 +56,6 @@ import ( "github.com/patrickmn/go-cache" "github.com/spf13/pflag" - "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/stats" @@ -69,7 +65,6 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -107,14 +102,6 @@ const ( var ( throttleTabletTypes = "replica" - - defaultThresholds = map[base.MetricName]float64{ - base.DefaultMetricName: 5 * time.Second.Seconds(), - base.LagMetricName: 5 * time.Second.Seconds(), - base.ThreadsRunningMetricName: 100, - base.CustomMetricName: 0, - base.LoadAvgMetricName: 1.0, - } ) var ( @@ -186,7 +173,6 @@ type Throttler struct { inventory *base.Inventory - metricsQuery atomic.Value customMetricsQuery atomic.Value MetricsThreshold atomic.Uint64 checkAsCheckSelf atomic.Bool @@ -207,8 +193,6 @@ type Throttler struct { readSelfThrottleMetrics func(context.Context) base.ThrottleMetrics // overwritten by unit test httpClient *http.Client - - hostCpuCoreCount atomic.Int32 } // ThrottlerStatus published some status values from the throttler @@ -280,11 +264,13 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv throttler.recentCheckDiff = 1 } - throttler.StoreMetricsThreshold(defaultThresholds[base.LagMetricName]) + throttler.StoreMetricsThreshold(base.RegisteredSelfMetrics[base.LagMetricName].DefaultThreshold()) throttler.readSelfThrottleMetrics = func(ctx context.Context) base.ThrottleMetrics { return throttler.readSelfThrottleMetricsInternal(ctx) } - + if customQuerySelfMetric, ok := base.RegisteredSelfMetrics[base.CustomMetricName].(*base.CustomQuerySelfMetric); ok { + customQuerySelfMetric.SetQueryFunc(throttler.GetCustomMetricsQuery) + } return throttler } @@ -315,7 +301,14 @@ func (throttler *Throttler) InitDBConfig(keyspace, shard string) { } func (throttler *Throttler) GetMetricsQuery() string { - return throttler.metricsQuery.Load().(string) + if customQuery := throttler.GetCustomMetricsQuery(); customQuery != "" { + return customQuery + } + lagSelfMetric, ok := base.RegisteredSelfMetrics[base.LagMetricName].(*base.LagSelfMetric) + if !ok { + return "" + } + return lagSelfMetric.GetQuery() } func (throttler *Throttler) GetCustomMetricsQuery() string { @@ -339,27 +332,6 @@ func (throttler *Throttler) initConfig() { IgnoreDialTCPErrors: true, }, } - metrics := make(map[base.MetricName]*config.MySQLMetricConfigurationSettings) - for _, metricsName := range base.KnownMetricNames { - metrics[metricsName] = &config.MySQLMetricConfigurationSettings{ - Name: metricsName, - } - } - metrics[base.DefaultMetricName].CustomQuery = "" - metrics[base.DefaultMetricName].Threshold.Store(throttler.MetricsThreshold.Load()) - - metrics[base.LagMetricName].CustomQuery = sqlparser.BuildParsedQuery(defaultReplicationLagQuery, sidecar.GetIdentifier()).Query - metrics[base.LagMetricName].Threshold.Store(throttler.MetricsThreshold.Load()) - - metrics[base.ThreadsRunningMetricName].CustomQuery = threadsRunningQuery - metrics[base.ThreadsRunningMetricName].Threshold.Store(math.Float64bits(defaultThresholds[base.ThreadsRunningMetricName])) - - metrics[base.CustomMetricName].CustomQuery = "" - metrics[base.CustomMetricName].Threshold.Store(math.Float64bits(defaultThresholds[base.CustomMetricName])) - - metrics[base.LoadAvgMetricName].Threshold.Store(math.Float64bits(defaultThresholds[base.LoadAvgMetricName])) - - throttler.configSettings.MySQLStore.Metrics = metrics } // readThrottlerConfig proactively reads the throttler's config from SrvKeyspace in local topo @@ -388,7 +360,7 @@ func (throttler *Throttler) normalizeThrottlerConfig(throttlerConfig *topodatapb if throttlerConfig.CustomQuery == "" { // no custom query; we check replication lag if throttlerConfig.Threshold == 0 { - throttlerConfig.Threshold = defaultThresholds[base.LagMetricName] + throttlerConfig.Threshold = base.RegisteredSelfMetrics[base.LagMetricName].DefaultThreshold() } } return throttlerConfig @@ -442,11 +414,6 @@ func (throttler *Throttler) convergeMetricThresholds() { // Note: you should be holding the initMutex when calling this function. func (throttler *Throttler) applyThrottlerConfig(ctx context.Context, throttlerConfig *topodatapb.ThrottlerConfig) { log.Infof("Throttler: applying topo config: %+v", throttlerConfig) - if throttlerConfig.CustomQuery == "" { - throttler.metricsQuery.Store(sqlparser.BuildParsedQuery(defaultReplicationLagQuery, sidecar.GetIdentifier()).Query) - } else { - throttler.metricsQuery.Store(throttlerConfig.CustomQuery) - } throttler.customMetricsQuery.Store(throttlerConfig.CustomQuery) if throttlerConfig.Threshold > 0 || throttlerConfig.CustomQuery != "" { // We do not allow Threshold=0, unless there is a custom query. @@ -646,10 +613,6 @@ func (throttler *Throttler) Open() error { log.Infof("Throttler: opening") var ctx context.Context ctx, throttler.cancelOpenContext = context.WithCancel(context.Background()) - // The query needs to be dynamically built because the sidecar database name - // is not known when the TabletServer is created, which in turn creates the - // Throttler. - throttler.metricsQuery.Store(sqlparser.BuildParsedQuery(defaultReplicationLagQuery, sidecar.GetIdentifier()).Query) // default throttler.customMetricsQuery.Store("") throttler.initConfig() throttler.pool.Open(throttler.env.Config().DB.AppWithDB(), throttler.env.Config().DB.DbaWithDB(), throttler.env.Config().DB.AppDebugWithDB()) @@ -726,58 +689,6 @@ func (throttler *Throttler) stimulatePrimaryThrottler(ctx context.Context, tmCli return nil } -func (throttler *Throttler) readSelfLoadAvgPerCore(ctx context.Context) *base.ThrottleMetric { - metric := &base.ThrottleMetric{ - Scope: base.SelfScope, - Alias: throttler.tabletAlias, - } - - coreCount := throttler.hostCpuCoreCount.Load() - if coreCount == 0 { - // Count cores. This number is not going to change in the lifetime of this tablet, - // hence it makes sense to read it once then cache it. - - // We choose to read /proc/cpuinfo over executing "nproc" or similar commands. - var coreCount int32 - f, err := os.Open("/proc/cpuinfo") - if err != nil { - return metric.WithError(err) - } - defer f.Close() - - scanner := bufio.NewScanner(f) - for scanner.Scan() { - if strings.HasPrefix(scanner.Text(), "processor") { - coreCount++ - } - } - - if err := scanner.Err(); err != nil { - return metric.WithError(err) - } - throttler.hostCpuCoreCount.Store(coreCount) - } - if coreCount == 0 { - return metric.WithError(fmt.Errorf("could not determine number of cores")) - } - { - content, err := os.ReadFile("/proc/loadavg") - if err != nil { - return metric.WithError(err) - } - fields := strings.Fields(string(content)) - if len(fields) == 0 { - return metric.WithError(fmt.Errorf("unexpected /proc/loadavg content")) - } - loadAvg, err := strconv.ParseFloat(fields[0], 64) - if err != nil { - return metric.WithError(err) - } - metric.Value = loadAvg / float64(throttler.hostCpuCoreCount.Load()) - } - return metric -} - // readSelfMySQLThrottleMetric reads the metric from this very tablet or from its backend mysql. func (throttler *Throttler) readSelfMySQLThrottleMetric(ctx context.Context, query string) *base.ThrottleMetric { conn, err := throttler.pool.Get(ctx, nil) @@ -1045,11 +956,26 @@ func (throttler *Throttler) readSelfThrottleMetricsInternal(ctx context.Context) case throttler.throttleMetricChan <- metric: } } + readMetric := func(selfMetric base.SelfMetric) *base.ThrottleMetric { + if !selfMetric.RequiresConn() { + return selfMetric.Read(ctx, nil) + } + conn, err := throttler.pool.Get(ctx, nil) + if err != nil { + return &base.ThrottleMetric{Err: err} + } + defer conn.Recycle() + return selfMetric.Read(ctx, conn.Conn) + } + for metricsName, selfMetric := range base.RegisteredSelfMetrics { + if metricsName == base.DefaultMetricName { + continue + } + metric := readMetric(selfMetric) + metric.Alias = throttler.tabletAlias - go writeMetric(base.LagMetricName, throttler.readSelfMySQLThrottleMetric(ctx, sqlparser.BuildParsedQuery(defaultReplicationLagQuery, sidecar.GetIdentifier()).Query)) - go writeMetric(base.ThreadsRunningMetricName, throttler.readSelfMySQLThrottleMetric(ctx, threadsRunningQuery)) - go writeMetric(base.CustomMetricName, throttler.readSelfMySQLThrottleMetric(ctx, throttler.GetCustomMetricsQuery())) - go writeMetric(base.LoadAvgMetricName, throttler.readSelfLoadAvgPerCore(ctx)) + go writeMetric(metricsName, metric) + } return nil } @@ -1142,21 +1068,19 @@ func (throttler *Throttler) refreshInventory(ctx context.Context) error { } } - metricsThreshold := throttler.MetricsThreshold.Load() metricNameUsedAsDefault := throttler.metricNameUsedAsDefault() - mysqlSettings := &throttler.configSettings.MySQLStore - mysqlSettings.Metrics[base.DefaultMetricName].Threshold.Store(metricsThreshold) - for metricName, metricConfig := range mysqlSettings.Metrics { - threshold := metricConfig.Threshold.Load() - if metricName == metricNameUsedAsDefault && metricsThreshold != 0 { + metricsThreshold := throttler.GetMetricsThreshold() + for metricName, selfMetric := range base.RegisteredSelfMetrics { + threshold := selfMetric.DefaultThreshold() + if (metricName == metricNameUsedAsDefault || metricName == base.DefaultMetricName) && metricsThreshold != 0 { // backwards compatibility to v20: threshold = metricsThreshold } - - throttler.metricThresholds.Set(inventoryPrefix+metricName.String(), math.Float64frombits(threshold), cache.DefaultExpiration) + throttler.metricThresholds.Set(inventoryPrefix+metricName.String(), threshold, cache.DefaultExpiration) } throttler.convergeMetricThresholds() - clusterSettingsCopy := *mysqlSettings + + var clusterSettingsCopy config.MySQLConfigurationSettings = throttler.configSettings.MySQLStore // config may dynamically change, but internal structure (config.Settings().MySQLStore.Clusters in our case) // is immutable and can only be _replaced_. Hence, it's safe to read in a goroutine: collect := func() error { diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index 6363143fd43..4dfda94591f 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -241,7 +241,8 @@ func newTestThrottler() *Throttler { tabletTypeFunc: func() topodatapb.TabletType { return topodatapb.TabletType_PRIMARY }, overrideTmClient: &fakeTMClient{}, } - throttler.metricsQuery.Store(metricsQuery) + lagSelfMetric := base.RegisteredSelfMetrics[base.LagMetricName].(*base.LagSelfMetric) + lagSelfMetric.SetQuery(metricsQuery) throttler.MetricsThreshold.Store(math.Float64bits(0.75)) throttler.configSettings = config.NewConfigurationSettings() throttler.initConfig() @@ -283,6 +284,9 @@ func newTestThrottler() *Throttler { return selfMetrics } throttler.ThrottleApp(throttlerapp.TestingAlwaysThrottlerName.String(), time.Now().Add(time.Hour*24*365*10), DefaultThrottleRatio, false) + if customQuerySelfMetric, ok := base.RegisteredSelfMetrics[base.CustomMetricName].(*base.CustomQuerySelfMetric); ok { + customQuerySelfMetric.SetQueryFunc(throttler.GetCustomMetricsQuery) + } return throttler } @@ -916,7 +920,8 @@ func TestRefreshInventory(t *testing.T) { ts: &FakeTopoServer{}, inventory: base.NewInventory(), } - throttler.metricsQuery.Store(metricsQuery) + lagSelfMetric := base.RegisteredSelfMetrics[base.LagMetricName].(*base.LagSelfMetric) + lagSelfMetric.SetQuery(metricsQuery) throttler.configSettings = configSettings throttler.initConfig() throttler.initThrottleTabletTypes()