diff --git a/go/vt/vttablet/tabletserver/throttle/base/throttle_metric_cache.go b/go/vt/vttablet/tabletserver/throttle/base/metric_cache.go similarity index 100% rename from go/vt/vttablet/tabletserver/throttle/base/throttle_metric_cache.go rename to go/vt/vttablet/tabletserver/throttle/base/metric_cache.go diff --git a/go/vt/vttablet/tabletserver/throttle/base/metric_name.go b/go/vt/vttablet/tabletserver/throttle/base/metric_name.go new file mode 100644 index 00000000000..98e1288fb23 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/metric_name.go @@ -0,0 +1,128 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +import ( + "fmt" + "slices" + "strings" +) + +// MetricName is a formalized name for a metric, such as "lag" or "threads_running". A metric name +// may include a scope, such as "self/lag" or "shard/threads_running". It is possible to add a +// scope to a name, or to parse the scope out of a name, and there is also always a default scope +// associated with a metric name. +type MetricName string + +// MetricNames is a formalized list of metric names +type MetricNames []MetricName + +func (names MetricNames) Contains(name MetricName) bool { + return slices.Contains(names, name) +} + +func (names MetricNames) String() string { + s := make([]string, len(names)) + for i, name := range names { + s[i] = name.String() + } + return strings.Join(s, ",") +} + +// Unique returns a subset of unique metric names, in same order as the original names +func (names MetricNames) Unique() MetricNames { + if names == nil { + return nil + } + uniqueMetricNamesMap := map[MetricName]bool{} + uniqueMetricNames := MetricNames{} + for _, metricName := range names { + if _, ok := uniqueMetricNamesMap[metricName]; !ok { + uniqueMetricNames = append(uniqueMetricNames, metricName) + uniqueMetricNamesMap[metricName] = true + } + } + return uniqueMetricNames +} + +const ( + DefaultMetricName MetricName = "default" + LagMetricName MetricName = "lag" + ThreadsRunningMetricName MetricName = "threads_running" + CustomMetricName MetricName = "custom" + LoadAvgMetricName MetricName = "loadavg" +) + +func (metric MetricName) DefaultScope() Scope { + if selfMetric := RegisteredSelfMetrics[metric]; selfMetric != nil { + return selfMetric.DefaultScope() + } + return SelfScope +} + +func (metric MetricName) String() string { + return string(metric) +} + +// AggregatedName returns the string representation of this metric in the given scope, e.g.: +// - "self/loadavg" +// - "shard/lag" +func (metric MetricName) AggregatedName(scope Scope) string { + if metric == DefaultMetricName { + // backwards (v20) compatibility + return scope.String() + } + if scope == UndefinedScope { + scope = metric.DefaultScope() + } + return fmt.Sprintf("%s/%s", scope.String(), metric.String()) +} + +// Disaggregated returns a breakdown of this metric into scope + name. +func (metric MetricName) Disaggregated() (scope Scope, metricName MetricName, err error) { + return DisaggregateMetricName(metric.String()) +} + +type AggregatedMetricName struct { + Scope Scope + Metric MetricName +} + +var ( + KnownMetricNames = make(MetricNames, 0) + // aggregatedMetricNames precomputes the aggregated metric names for all known metric names, + // mapped to their breakdowns. e.g. "self/loadavg" -> {SelfScope, LoadAvgMetricName} + // This means: + // - no textual parsing is needed in the critical path + // - we can easily check if a metric name is valid + aggregatedMetricNames = make(map[string]AggregatedMetricName) +) + +// DisaggregateMetricName splits a metric name into its scope name and metric name +// aggregated metric name could be in the form: +// - loadavg +// - self +// - self/threads_running +// - shard +// - shard/lag +func DisaggregateMetricName(aggregatedMetricName string) (scope Scope, metricName MetricName, err error) { + breakdown, ok := aggregatedMetricNames[aggregatedMetricName] + if !ok { + return UndefinedScope, DefaultMetricName, ErrNoSuchMetric + } + return breakdown.Scope, breakdown.Metric, nil +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/throttle_metric_test.go b/go/vt/vttablet/tabletserver/throttle/base/metric_name_test.go similarity index 93% rename from go/vt/vttablet/tabletserver/throttle/base/throttle_metric_test.go rename to go/vt/vttablet/tabletserver/throttle/base/metric_name_test.go index 8a0f9b85a16..9867ca18db3 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/throttle_metric_test.go +++ b/go/vt/vttablet/tabletserver/throttle/base/metric_name_test.go @@ -230,3 +230,12 @@ func TestContains(t *testing.T) { }) } } + +func TestKnownMetricNames(t *testing.T) { + assert.NotEmpty(t, KnownMetricNames) + assert.Contains(t, KnownMetricNames, LagMetricName) + assert.Contains(t, KnownMetricNames, ThreadsRunningMetricName) + assert.Contains(t, KnownMetricNames, LoadAvgMetricName) + assert.Contains(t, KnownMetricNames, CustomMetricName) + assert.Contains(t, KnownMetricNames, DefaultMetricName) +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/metric_result.go b/go/vt/vttablet/tabletserver/throttle/base/metric_result.go new file mode 100644 index 00000000000..0fa48fe3240 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/metric_result.go @@ -0,0 +1,121 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +import ( + "errors" + "net" +) + +// MetricResult is what we expect our probes to return. This can be a numeric result, or +// a special type of result indicating more meta-information +type MetricResult interface { + Get() (float64, error) +} + +// MetricResultFunc is a function that returns a metric result +type MetricResultFunc func() (metricResult MetricResult, threshold float64) + +type MetricResultMap map[MetricName]MetricResult + +func NewMetricResultMap() MetricResultMap { + result := make(MetricResultMap, len(KnownMetricNames)) + for _, metricName := range KnownMetricNames { + result[metricName] = nil + } + return result +} + +// ErrThresholdExceeded is the common error one may get checking on metric result +var ErrThresholdExceeded = errors.New("threshold exceeded") +var ErrNoResultYet = errors.New("metric not collected yet") + +// ErrNoSuchMetric is for when a user requests a metric by an unknown metric name +var ErrNoSuchMetric = errors.New("no such metric") + +// ErrAppDenied is seen when an app is denied access +var ErrAppDenied = errors.New("app denied") + +// ErrInvalidCheckType is an internal error indicating an unknown check type +var ErrInvalidCheckType = errors.New("unknown throttler check type") + +// IsDialTCPError sees if the given error indicates a TCP issue +func IsDialTCPError(err error) bool { + if err == nil { + return false + } + switch err := err.(type) { + case *net.OpError: + return err.Op == "dial" && err.Net == "tcp" + } + return false +} + +type noHostsMetricResult struct{} + +// Get implements MetricResult +func (metricResult *noHostsMetricResult) Get() (float64, error) { + return 0, nil +} + +// NoHostsMetricResult is a result indicating "no hosts" +var NoHostsMetricResult = &noHostsMetricResult{} + +type noMetricResultYet struct{} + +// Get implements MetricResult +func (metricResult *noMetricResultYet) Get() (float64, error) { + return 0, ErrNoResultYet +} + +// NoMetricResultYet is a result indicating "no data" +var NoMetricResultYet = &noMetricResultYet{} + +type noSuchMetric struct{} + +// Get implements MetricResult +func (metricResult *noSuchMetric) Get() (float64, error) { + return 0, ErrNoSuchMetric +} + +// NoSuchMetric is a metric results for an unknown metric name +var NoSuchMetric = &noSuchMetric{} + +// simpleMetricResult is a result with float value +type simpleMetricResult struct { + Value float64 +} + +// NewSimpleMetricResult creates a simpleMetricResult +func NewSimpleMetricResult(value float64) MetricResult { + return &simpleMetricResult{Value: value} +} + +// Get implements MetricResult +func (metricResult *simpleMetricResult) Get() (float64, error) { + return metricResult.Value, nil +} + +type appDeniedMetric struct{} + +// Get implements MetricResult +func (metricResult *appDeniedMetric) Get() (float64, error) { + return 0, ErrAppDenied +} + +// AppDeniedMetric is a special metric indicating a "denied" situation +var AppDeniedMetric = &appDeniedMetric{} diff --git a/go/vt/vttablet/tabletserver/throttle/base/metric_scope.go b/go/vt/vttablet/tabletserver/throttle/base/metric_scope.go new file mode 100644 index 00000000000..60d116861c3 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/metric_scope.go @@ -0,0 +1,44 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +import ( + "fmt" +) + +// Scope defines the tablet range from which a metric is collected. This can be the local tablet +// ("self") or the entire shard ("shard") +type Scope string + +const ( + UndefinedScope Scope = "" + ShardScope Scope = "shard" + SelfScope Scope = "self" +) + +func (s Scope) String() string { + return string(s) +} + +func ScopeFromString(s string) (Scope, error) { + switch scope := Scope(s); scope { + case UndefinedScope, ShardScope, SelfScope: + return scope, nil + default: + return "", fmt.Errorf("unknown scope: %s", s) + } +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric.go new file mode 100644 index 00000000000..220dfa6bf60 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric.go @@ -0,0 +1,91 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +import ( + "context" + "fmt" + "strconv" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" +) + +type SelfMetric interface { + Name() MetricName + DefaultScope() Scope + DefaultThreshold() float64 + RequiresConn() bool + Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric +} + +var ( + RegisteredSelfMetrics = make(map[MetricName]SelfMetric) +) + +func registerSelfMetric(selfMetric SelfMetric) SelfMetric { + RegisteredSelfMetrics[selfMetric.Name()] = selfMetric + KnownMetricNames = append(KnownMetricNames, selfMetric.Name()) + aggregatedMetricNames[selfMetric.Name().String()] = AggregatedMetricName{ + Scope: selfMetric.DefaultScope(), + Metric: selfMetric.Name(), + } + for _, scope := range []Scope{ShardScope, SelfScope} { + aggregatedName := selfMetric.Name().AggregatedName(scope) + aggregatedMetricNames[aggregatedName] = AggregatedMetricName{ + Scope: scope, + Metric: selfMetric.Name(), + } + } + return selfMetric +} + +// ReadSelfMySQLThrottleMetric reads a metric using a given MySQL connection and a query. +func ReadSelfMySQLThrottleMetric(ctx context.Context, conn *connpool.Conn, query string) *ThrottleMetric { + metric := &ThrottleMetric{ + Scope: SelfScope, + } + if query == "" { + return metric + } + if conn == nil { + return metric.WithError(fmt.Errorf("conn is nil")) + } + + tm, err := conn.Exec(ctx, query, 1, true) + if err != nil { + return metric.WithError(err) + } + if len(tm.Rows) == 0 { + return metric.WithError(fmt.Errorf("no results in ReadSelfMySQLThrottleMetric for query %s", query)) + } + if len(tm.Rows) > 1 { + return metric.WithError(fmt.Errorf("expecting single row in ReadSelfMySQLThrottleMetric for query %s", query)) + } + + metricsQueryType := GetMetricsQueryType(query) + switch metricsQueryType { + case MetricsQueryTypeSelect: + metric.Value, metric.Err = tm.Rows[0][0].ToFloat64() + case MetricsQueryTypeShowGlobal: + // Columns are [Variable_name, Value] + metric.Value, metric.Err = strconv.ParseFloat(tm.Rows[0][1].ToString(), 64) + default: + metric.Err = fmt.Errorf("unsupported metrics query type for query: %s", query) + } + + return metric +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_custom_query.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_custom_query.go new file mode 100644 index 00000000000..585e63ea285 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_custom_query.go @@ -0,0 +1,48 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +import ( + "context" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" +) + +var _ SelfMetric = registerSelfMetric(&CustomQuerySelfMetric{}) + +type CustomQuerySelfMetric struct { +} + +func (m *CustomQuerySelfMetric) Name() MetricName { + return CustomMetricName +} + +func (m *CustomQuerySelfMetric) DefaultScope() Scope { + return SelfScope +} + +func (m *CustomQuerySelfMetric) DefaultThreshold() float64 { + return 0 +} + +func (m *CustomQuerySelfMetric) RequiresConn() bool { + return true +} + +func (m *CustomQuerySelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric { + return ReadSelfMySQLThrottleMetric(ctx, conn, throttler.GetCustomMetricsQuery()) +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_default.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_default.go new file mode 100644 index 00000000000..8bce295da7c --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_default.go @@ -0,0 +1,51 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +import ( + "context" + "fmt" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" +) + +var _ SelfMetric = registerSelfMetric(&DefaultSelfMetric{}) + +type DefaultSelfMetric struct { +} + +func (m *DefaultSelfMetric) Name() MetricName { + return DefaultMetricName +} + +func (m *DefaultSelfMetric) DefaultScope() Scope { + return SelfScope +} + +func (m *DefaultSelfMetric) DefaultThreshold() float64 { + return 0 +} + +func (m *DefaultSelfMetric) RequiresConn() bool { + return false +} + +func (m *DefaultSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric { + return &ThrottleMetric{ + Err: fmt.Errorf("unexpected direct call to DefaultSelfMetric.Read"), + } +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_lag.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_lag.go new file mode 100644 index 00000000000..dc25ee5622a --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_lag.go @@ -0,0 +1,70 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +import ( + "context" + "sync/atomic" + "time" + + "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" +) + +var ( + lagSelfMetricQueryBase = "select unix_timestamp(now(6))-max(ts/1000000000) as replication_lag from %s.heartbeat" + lagSelfDefaultThreshold = 5 * time.Second +) + +var _ SelfMetric = registerSelfMetric(&LagSelfMetric{}) + +type LagSelfMetric struct { + lagSelfMetricQuery atomic.Value +} + +// SetQuery is only used by unit tests to override the query. +func (m *LagSelfMetric) SetQuery(query string) { + m.lagSelfMetricQuery.Store(query) +} + +func (m *LagSelfMetric) GetQuery() string { + if query := m.lagSelfMetricQuery.Load(); query == nil { + m.lagSelfMetricQuery.Store(sqlparser.BuildParsedQuery(lagSelfMetricQueryBase, sidecar.GetIdentifier()).Query) + } + return m.lagSelfMetricQuery.Load().(string) +} + +func (m *LagSelfMetric) Name() MetricName { + return LagMetricName +} + +func (m *LagSelfMetric) DefaultScope() Scope { + return ShardScope +} + +func (m *LagSelfMetric) DefaultThreshold() float64 { + return lagSelfDefaultThreshold.Seconds() +} + +func (m *LagSelfMetric) RequiresConn() bool { + return true +} + +func (m *LagSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric { + return ReadSelfMySQLThrottleMetric(ctx, conn, m.GetQuery()) +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_loadavg.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_loadavg.go new file mode 100644 index 00000000000..40a2878421a --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_loadavg.go @@ -0,0 +1,81 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +import ( + "context" + "fmt" + "os" + "runtime" + "strconv" + "strings" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" +) + +var ( + loadavgOnlyAvailableOnLinuxMetric = &ThrottleMetric{ + Scope: SelfScope, + Err: fmt.Errorf("loadavg metric is only available on Linux"), + } +) + +var _ SelfMetric = registerSelfMetric(&LoadAvgSelfMetric{}) + +type LoadAvgSelfMetric struct { +} + +func (m *LoadAvgSelfMetric) Name() MetricName { + return LoadAvgMetricName +} + +func (m *LoadAvgSelfMetric) DefaultScope() Scope { + return SelfScope +} + +func (m *LoadAvgSelfMetric) DefaultThreshold() float64 { + return 1.0 +} + +func (m *LoadAvgSelfMetric) RequiresConn() bool { + return false +} + +func (m *LoadAvgSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric { + if runtime.GOOS != "linux" { + return loadavgOnlyAvailableOnLinuxMetric + } + metric := &ThrottleMetric{ + Scope: SelfScope, + } + { + content, err := os.ReadFile("/proc/loadavg") + if err != nil { + return metric.WithError(err) + } + fields := strings.Fields(string(content)) + if len(fields) == 0 { + return metric.WithError(fmt.Errorf("unexpected /proc/loadavg content")) + } + loadAvg, err := strconv.ParseFloat(fields[0], 64) + if err != nil { + return metric.WithError(err) + } + metric.Value = loadAvg / float64(runtime.NumCPU()) + } + return metric +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/self_metric_threads_running.go b/go/vt/vttablet/tabletserver/throttle/base/self_metric_threads_running.go new file mode 100644 index 00000000000..08f7d408d1c --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/self_metric_threads_running.go @@ -0,0 +1,52 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +import ( + "context" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" +) + +var ( + threadsRunningMetricQuery = "show global status like 'threads_running'" +) + +var _ SelfMetric = registerSelfMetric(&ThreadsRunningSelfMetric{}) + +type ThreadsRunningSelfMetric struct { +} + +func (m *ThreadsRunningSelfMetric) Name() MetricName { + return ThreadsRunningMetricName +} + +func (m *ThreadsRunningSelfMetric) DefaultScope() Scope { + return SelfScope +} + +func (m *ThreadsRunningSelfMetric) DefaultThreshold() float64 { + return 100 +} + +func (m *ThreadsRunningSelfMetric) RequiresConn() bool { + return true +} + +func (m *ThreadsRunningSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric { + return ReadSelfMySQLThrottleMetric(ctx, conn, threadsRunningMetricQuery) +} diff --git a/go/vt/vttablet/tabletserver/throttle/base/throttle_metric.go b/go/vt/vttablet/tabletserver/throttle/base/throttle_metric.go deleted file mode 100644 index 054687cdd3f..00000000000 --- a/go/vt/vttablet/tabletserver/throttle/base/throttle_metric.go +++ /dev/null @@ -1,260 +0,0 @@ -/* -Copyright 2023 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package base - -import ( - "errors" - "fmt" - "slices" - "strings" -) - -// Scope defines the tablet range from which a metric is collected. This can be the local tablet -// ("self") or the entire shard ("shard") -type Scope string - -const ( - UndefinedScope Scope = "" - ShardScope Scope = "shard" - SelfScope Scope = "self" -) - -func (s Scope) String() string { - return string(s) -} - -func ScopeFromString(s string) (Scope, error) { - switch scope := Scope(s); scope { - case UndefinedScope, ShardScope, SelfScope: - return scope, nil - default: - return "", fmt.Errorf("unknown scope: %s", s) - } -} - -// MetricName is a formalized name for a metric, such as "lag" or "threads_running". A metric name -// may include a scope, such as "self/lag" or "shard/threads_running". It is possible to add a -// scope to a name, or to parse the scope out of a name, and there is also always a default scope -// associated with a metric name. -type MetricName string - -// MetricNames is a formalized list of metric names -type MetricNames []MetricName - -func (names MetricNames) Contains(name MetricName) bool { - return slices.Contains(names, name) -} - -func (names MetricNames) String() string { - s := make([]string, len(names)) - for i, name := range names { - s[i] = name.String() - } - return strings.Join(s, ",") -} - -// Unique returns a subset of unique metric names, in same order as the original names -func (names MetricNames) Unique() MetricNames { - if names == nil { - return nil - } - uniqueMetricNamesMap := map[MetricName]bool{} - uniqueMetricNames := MetricNames{} - for _, metricName := range names { - if _, ok := uniqueMetricNamesMap[metricName]; !ok { - uniqueMetricNames = append(uniqueMetricNames, metricName) - uniqueMetricNamesMap[metricName] = true - } - } - return uniqueMetricNames -} - -const ( - DefaultMetricName MetricName = "default" - LagMetricName MetricName = "lag" - ThreadsRunningMetricName MetricName = "threads_running" - CustomMetricName MetricName = "custom" - LoadAvgMetricName MetricName = "loadavg" -) - -func (metric MetricName) DefaultScope() Scope { - switch metric { - case LagMetricName: - return ShardScope - default: - return SelfScope - } -} - -func (metric MetricName) String() string { - return string(metric) -} - -// AggregatedName returns the string representation of this metric in the given scope, e.g.: -// - "self/loadavg" -// - "shard/lag" -func (metric MetricName) AggregatedName(scope Scope) string { - if metric == DefaultMetricName { - // backwards (v20) compatibility - return scope.String() - } - if scope == UndefinedScope { - scope = metric.DefaultScope() - } - return fmt.Sprintf("%s/%s", scope.String(), metric.String()) -} - -// Disaggregated returns a breakdown of this metric into scope + name. -func (metric MetricName) Disaggregated() (scope Scope, metricName MetricName, err error) { - return DisaggregateMetricName(metric.String()) -} - -var KnownMetricNames = MetricNames{ - DefaultMetricName, - LagMetricName, - ThreadsRunningMetricName, - CustomMetricName, - LoadAvgMetricName, -} - -type AggregatedMetricName struct { - Scope Scope - Metric MetricName -} - -var ( - // aggregatedMetricNames precomputes the aggregated metric names for all known metric names, - // mapped to their breakdowns. e.g. "self/loadavg" -> {SelfScope, LoadAvgMetricName} - // This means: - // - no textual parsing is needed in the critical path - // - we can easily check if a metric name is valid - aggregatedMetricNames map[string]AggregatedMetricName -) - -func init() { - aggregatedMetricNames = make(map[string]AggregatedMetricName, 3*len(KnownMetricNames)) - for _, metricName := range KnownMetricNames { - aggregatedMetricNames[metricName.String()] = AggregatedMetricName{ - Scope: metricName.DefaultScope(), - Metric: metricName, - } - for _, scope := range []Scope{ShardScope, SelfScope} { - aggregatedName := metricName.AggregatedName(scope) - aggregatedMetricNames[aggregatedName] = AggregatedMetricName{ - Scope: scope, - Metric: metricName, - } - } - } -} - -// DisaggregateMetricName splits a metric name into its scope name and metric name -// aggregated metric name could be in the form: -// - loadavg -// - self -// - self/threads_running -// - shard -// - shard/lag -func DisaggregateMetricName(aggregatedMetricName string) (scope Scope, metricName MetricName, err error) { - breakdown, ok := aggregatedMetricNames[aggregatedMetricName] - if !ok { - return UndefinedScope, DefaultMetricName, ErrNoSuchMetric - } - return breakdown.Scope, breakdown.Metric, nil -} - -// MetricResult is what we expect our probes to return. This can be a numeric result, or -// a special type of result indicating more meta-information -type MetricResult interface { - Get() (float64, error) -} - -// MetricResultFunc is a function that returns a metric result -type MetricResultFunc func() (metricResult MetricResult, threshold float64) - -type MetricResultMap map[MetricName]MetricResult - -func NewMetricResultMap() MetricResultMap { - result := make(MetricResultMap, len(KnownMetricNames)) - for _, metricName := range KnownMetricNames { - result[metricName] = nil - } - return result -} - -// ErrThresholdExceeded is the common error one may get checking on metric result -var ErrThresholdExceeded = errors.New("threshold exceeded") -var ErrNoResultYet = errors.New("metric not collected yet") - -// ErrNoSuchMetric is for when a user requests a metric by an unknown metric name -var ErrNoSuchMetric = errors.New("no such metric") - -// ErrInvalidCheckType is an internal error indicating an unknown check type -var ErrInvalidCheckType = errors.New("unknown throttler check type") - -// IsDialTCPError sees if the given error indicates a TCP issue -func IsDialTCPError(e error) bool { - if e == nil { - return false - } - return strings.HasPrefix(e.Error(), "dial tcp") -} - -type noHostsMetricResult struct{} - -// Get implements MetricResult -func (metricResult *noHostsMetricResult) Get() (float64, error) { - return 0, nil -} - -// NoHostsMetricResult is a result indicating "no hosts" -var NoHostsMetricResult = &noHostsMetricResult{} - -type noMetricResultYet struct{} - -// Get implements MetricResult -func (metricResult *noMetricResultYet) Get() (float64, error) { - return 0, ErrNoResultYet -} - -// NoMetricResultYet is a result indicating "no data" -var NoMetricResultYet = &noMetricResultYet{} - -type noSuchMetric struct{} - -// Get implements MetricResult -func (metricResult *noSuchMetric) Get() (float64, error) { - return 0, ErrNoSuchMetric -} - -// NoSuchMetric is a metric results for an unknown metric name -var NoSuchMetric = &noSuchMetric{} - -// simpleMetricResult is a result with float value -type simpleMetricResult struct { - Value float64 -} - -// NewSimpleMetricResult creates a simpleMetricResult -func NewSimpleMetricResult(value float64) MetricResult { - return &simpleMetricResult{Value: value} -} - -// Get implements MetricResult -func (metricResult *simpleMetricResult) Get() (float64, error) { - return metricResult.Value, nil -} diff --git a/go/vt/vttablet/tabletserver/throttle/base/throttle_metric_app.go b/go/vt/vttablet/tabletserver/throttle/base/throttle_metric_app.go deleted file mode 100644 index 482f319365f..00000000000 --- a/go/vt/vttablet/tabletserver/throttle/base/throttle_metric_app.go +++ /dev/null @@ -1,59 +0,0 @@ -/* -Copyright 2023 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// This codebase originates from https://github.com/github/freno, See https://github.com/github/freno/blob/master/LICENSE -/* - MIT License - - Copyright (c) 2017 GitHub - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE. -*/ - -package base - -import ( - "errors" -) - -// ErrAppDenied is seen when an app is denied access -var ErrAppDenied = errors.New("App denied") - -type appDeniedMetric struct{} - -// Get implements MetricResult -func (metricResult *appDeniedMetric) Get() (float64, error) { - return 0, ErrAppDenied -} - -// AppDeniedMetric is a special metric indicating a "denied" situation -var AppDeniedMetric = &appDeniedMetric{} diff --git a/go/vt/vttablet/tabletserver/throttle/base/throttler_metrics_publisher.go b/go/vt/vttablet/tabletserver/throttle/base/throttler_metrics_publisher.go new file mode 100644 index 00000000000..1d2d4d0652c --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/base/throttler_metrics_publisher.go @@ -0,0 +1,23 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package base + +// ThrottlerMetricsPublisher is implemented by throttler.Throttler and is used by SelfMetric +// implementations to query the throttler. +type ThrottlerMetricsPublisher interface { + GetCustomMetricsQuery() string +} diff --git a/go/vt/vttablet/tabletserver/throttle/config/mysql_config.go b/go/vt/vttablet/tabletserver/throttle/config/mysql_config.go index d4beb40deb4..76d5f2dd298 100644 --- a/go/vt/vttablet/tabletserver/throttle/config/mysql_config.go +++ b/go/vt/vttablet/tabletserver/throttle/config/mysql_config.go @@ -41,32 +41,15 @@ limitations under the License. package config -import ( - "sync/atomic" - - "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" -) - // // MySQL-specific configuration // -type MySQLMetricConfigurationSettings struct { - Name base.MetricName - CustomQuery string - Threshold atomic.Uint64 -} - // MySQLConfigurationSettings has the general configuration for all MySQL clusters type MySQLConfigurationSettings struct { - CacheMillis int // optional, if defined then probe result will be cached, and future probes may use cached value - Port int // Specify if different than 3306; applies to all clusters - IgnoreDialTCPErrors bool // Skip hosts where a metric cannot be retrieved due to TCP dial errors - IgnoreHostsCount int // Number of hosts that can be skipped/ignored even on error or on exceeding thresholds - IgnoreHostsThreshold float64 // Threshold beyond which IgnoreHostsCount applies (default: 0) - HTTPCheckPort int // port for HTTP check. -1 to disable. - HTTPCheckPath string // If non-empty, requires HTTPCheckPort - IgnoreHosts []string // If non empty, substrings to indicate hosts to be ignored/skipped - - Metrics map[base.MetricName]*MySQLMetricConfigurationSettings + CacheMillis int // optional, if defined then probe result will be cached, and future probes may use cached value + Port int // Specify if different than 3306; applies to all clusters + IgnoreDialTCPErrors bool // Skip hosts where a metric cannot be retrieved due to TCP dial errors + IgnoreHostsCount int // Number of hosts that can be skipped/ignored even on error or on exceeding thresholds + IgnoreHosts []string // If non empty, substrings to indicate hosts to be ignored/skipped } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 01c4fb3c622..4aa1e248dc1 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -42,15 +42,12 @@ limitations under the License. package throttle import ( - "bufio" "context" "errors" "fmt" "math" "math/rand/v2" "net/http" - "os" - "strconv" "strings" "sync" "sync/atomic" @@ -59,7 +56,6 @@ import ( "github.com/patrickmn/go-cache" "github.com/spf13/pflag" - "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/stats" @@ -69,7 +65,6 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -107,14 +102,6 @@ const ( var ( throttleTabletTypes = "replica" - - defaultThresholds = map[base.MetricName]float64{ - base.DefaultMetricName: 5 * time.Second.Seconds(), - base.LagMetricName: 5 * time.Second.Seconds(), - base.ThreadsRunningMetricName: 100, - base.CustomMetricName: 0, - base.LoadAvgMetricName: 1.0, - } ) var ( @@ -186,7 +173,6 @@ type Throttler struct { inventory *base.Inventory - metricsQuery atomic.Value customMetricsQuery atomic.Value MetricsThreshold atomic.Uint64 checkAsCheckSelf atomic.Bool @@ -205,8 +191,6 @@ type Throttler struct { throttledAppsMutex sync.Mutex readSelfThrottleMetrics func(context.Context) base.ThrottleMetrics // overwritten by unit test - - hostCpuCoreCount atomic.Int32 } // ThrottlerStatus published some status values from the throttler @@ -277,11 +261,10 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv throttler.recentCheckDiff = 1 } - throttler.StoreMetricsThreshold(defaultThresholds[base.LagMetricName]) + throttler.StoreMetricsThreshold(base.RegisteredSelfMetrics[base.LagMetricName].DefaultThreshold()) throttler.readSelfThrottleMetrics = func(ctx context.Context) base.ThrottleMetrics { return throttler.readSelfThrottleMetricsInternal(ctx) } - return throttler } @@ -312,7 +295,14 @@ func (throttler *Throttler) InitDBConfig(keyspace, shard string) { } func (throttler *Throttler) GetMetricsQuery() string { - return throttler.metricsQuery.Load().(string) + if customQuery := throttler.GetCustomMetricsQuery(); customQuery != "" { + return customQuery + } + lagSelfMetric, ok := base.RegisteredSelfMetrics[base.LagMetricName].(*base.LagSelfMetric) + if !ok { + return "" + } + return lagSelfMetric.GetQuery() } func (throttler *Throttler) GetCustomMetricsQuery() string { @@ -336,27 +326,6 @@ func (throttler *Throttler) initConfig() { IgnoreDialTCPErrors: true, }, } - metrics := make(map[base.MetricName]*config.MySQLMetricConfigurationSettings) - for _, metricsName := range base.KnownMetricNames { - metrics[metricsName] = &config.MySQLMetricConfigurationSettings{ - Name: metricsName, - } - } - metrics[base.DefaultMetricName].CustomQuery = "" - metrics[base.DefaultMetricName].Threshold.Store(throttler.MetricsThreshold.Load()) - - metrics[base.LagMetricName].CustomQuery = sqlparser.BuildParsedQuery(defaultReplicationLagQuery, sidecar.GetIdentifier()).Query - metrics[base.LagMetricName].Threshold.Store(throttler.MetricsThreshold.Load()) - - metrics[base.ThreadsRunningMetricName].CustomQuery = threadsRunningQuery - metrics[base.ThreadsRunningMetricName].Threshold.Store(math.Float64bits(defaultThresholds[base.ThreadsRunningMetricName])) - - metrics[base.CustomMetricName].CustomQuery = "" - metrics[base.CustomMetricName].Threshold.Store(math.Float64bits(defaultThresholds[base.CustomMetricName])) - - metrics[base.LoadAvgMetricName].Threshold.Store(math.Float64bits(defaultThresholds[base.LoadAvgMetricName])) - - throttler.configSettings.MySQLStore.Metrics = metrics } // readThrottlerConfig proactively reads the throttler's config from SrvKeyspace in local topo @@ -385,7 +354,7 @@ func (throttler *Throttler) normalizeThrottlerConfig(throttlerConfig *topodatapb if throttlerConfig.CustomQuery == "" { // no custom query; we check replication lag if throttlerConfig.Threshold == 0 { - throttlerConfig.Threshold = defaultThresholds[base.LagMetricName] + throttlerConfig.Threshold = base.RegisteredSelfMetrics[base.LagMetricName].DefaultThreshold() } } return throttlerConfig @@ -439,11 +408,6 @@ func (throttler *Throttler) convergeMetricThresholds() { // Note: you should be holding the initMutex when calling this function. func (throttler *Throttler) applyThrottlerConfig(ctx context.Context, throttlerConfig *topodatapb.ThrottlerConfig) { log.Infof("Throttler: applying topo config: %+v", throttlerConfig) - if throttlerConfig.CustomQuery == "" { - throttler.metricsQuery.Store(sqlparser.BuildParsedQuery(defaultReplicationLagQuery, sidecar.GetIdentifier()).Query) - } else { - throttler.metricsQuery.Store(throttlerConfig.CustomQuery) - } throttler.customMetricsQuery.Store(throttlerConfig.CustomQuery) if throttlerConfig.Threshold > 0 || throttlerConfig.CustomQuery != "" { // We do not allow Threshold=0, unless there is a custom query. @@ -643,10 +607,6 @@ func (throttler *Throttler) Open() error { log.Infof("Throttler: opening") var ctx context.Context ctx, throttler.cancelOpenContext = context.WithCancel(context.Background()) - // The query needs to be dynamically built because the sidecar database name - // is not known when the TabletServer is created, which in turn creates the - // Throttler. - throttler.metricsQuery.Store(sqlparser.BuildParsedQuery(defaultReplicationLagQuery, sidecar.GetIdentifier()).Query) // default throttler.customMetricsQuery.Store("") throttler.initConfig() throttler.pool.Open(throttler.env.Config().DB.AppWithDB(), throttler.env.Config().DB.DbaWithDB(), throttler.env.Config().DB.AppDebugWithDB()) @@ -723,99 +683,6 @@ func (throttler *Throttler) stimulatePrimaryThrottler(ctx context.Context, tmCli return nil } -func (throttler *Throttler) readSelfLoadAvgPerCore(ctx context.Context) *base.ThrottleMetric { - metric := &base.ThrottleMetric{ - Scope: base.SelfScope, - Alias: throttler.tabletAlias, - } - - coreCount := throttler.hostCpuCoreCount.Load() - if coreCount == 0 { - // Count cores. This number is not going to change in the lifetime of this tablet, - // hence it makes sense to read it once then cache it. - - // We choose to read /proc/cpuinfo over executing "nproc" or similar commands. - var coreCount int32 - f, err := os.Open("/proc/cpuinfo") - if err != nil { - return metric.WithError(err) - } - defer f.Close() - - scanner := bufio.NewScanner(f) - for scanner.Scan() { - if strings.HasPrefix(scanner.Text(), "processor") { - coreCount++ - } - } - - if err := scanner.Err(); err != nil { - return metric.WithError(err) - } - throttler.hostCpuCoreCount.Store(coreCount) - } - if coreCount == 0 { - return metric.WithError(fmt.Errorf("could not determine number of cores")) - } - { - content, err := os.ReadFile("/proc/loadavg") - if err != nil { - return metric.WithError(err) - } - fields := strings.Fields(string(content)) - if len(fields) == 0 { - return metric.WithError(fmt.Errorf("unexpected /proc/loadavg content")) - } - loadAvg, err := strconv.ParseFloat(fields[0], 64) - if err != nil { - return metric.WithError(err) - } - metric.Value = loadAvg / float64(throttler.hostCpuCoreCount.Load()) - } - return metric -} - -// readSelfMySQLThrottleMetric reads the metric from this very tablet or from its backend mysql. -func (throttler *Throttler) readSelfMySQLThrottleMetric(ctx context.Context, query string) *base.ThrottleMetric { - metric := &base.ThrottleMetric{ - Scope: base.SelfScope, - Alias: throttler.tabletAlias, - } - if query == "" { - return metric - } - conn, err := throttler.pool.Get(ctx, nil) - if err != nil { - return metric.WithError(err) - } - defer conn.Recycle() - - tm, err := conn.Conn.Exec(ctx, query, 1, true) - if err != nil { - return metric.WithError(err) - } - row := tm.Named().Row() - if row == nil { - return metric.WithError(fmt.Errorf("no results for readSelfThrottleMetric")) - } - - metricsQueryType := base.GetMetricsQueryType(query) - switch metricsQueryType { - case base.MetricsQueryTypeSelect: - // We expect a single row, single column result. - // The "for" iteration below is just a way to get first result without knowing column name - for k := range row { - metric.Value, metric.Err = row.ToFloat64(k) - } - case base.MetricsQueryTypeShowGlobal: - metric.Value, metric.Err = strconv.ParseFloat(row["Value"].ToString(), 64) - default: - metric.Err = fmt.Errorf("Unsupported metrics query type for query: %s", throttler.GetMetricsQuery()) - } - - return metric -} - // throttledAppsSnapshot returns a snapshot (a copy) of current throttled apps func (throttler *Throttler) throttledAppsSnapshot() map[string]cache.Item { return throttler.throttledApps.Items() @@ -1059,22 +926,40 @@ func (throttler *Throttler) generateTabletProbeFunction(scope base.Scope, tmClie } } +// readSelfThrottleMetricsInternal rreads all registsred self metrics on this tablet (or backend MySQL server). +// This is the actual place where metrics are read, to be later aggregated and/or propagated to other tablets. func (throttler *Throttler) readSelfThrottleMetricsInternal(ctx context.Context) base.ThrottleMetrics { - - writeMetric := func(metricName base.MetricName, metric *base.ThrottleMetric) { - metric.Name = metricName + result := make(base.ThrottleMetrics, len(base.RegisteredSelfMetrics)) + writeMetric := func(metric *base.ThrottleMetric) { select { case <-ctx.Done(): return case throttler.throttleMetricChan <- metric: } } + readMetric := func(selfMetric base.SelfMetric) *base.ThrottleMetric { + if !selfMetric.RequiresConn() { + return selfMetric.Read(ctx, throttler, nil) + } + conn, err := throttler.pool.Get(ctx, nil) + if err != nil { + return &base.ThrottleMetric{Err: err} + } + defer conn.Recycle() + return selfMetric.Read(ctx, throttler, conn.Conn) + } + for metricName, selfMetric := range base.RegisteredSelfMetrics { + if metricName == base.DefaultMetricName { + continue + } + metric := readMetric(selfMetric) + metric.Name = metricName + metric.Alias = throttler.tabletAlias - go writeMetric(base.LagMetricName, throttler.readSelfMySQLThrottleMetric(ctx, sqlparser.BuildParsedQuery(defaultReplicationLagQuery, sidecar.GetIdentifier()).Query)) - go writeMetric(base.ThreadsRunningMetricName, throttler.readSelfMySQLThrottleMetric(ctx, threadsRunningQuery)) - go writeMetric(base.CustomMetricName, throttler.readSelfMySQLThrottleMetric(ctx, throttler.GetCustomMetricsQuery())) - go writeMetric(base.LoadAvgMetricName, throttler.readSelfLoadAvgPerCore(ctx)) - return nil + go writeMetric(metric) + result[metricName] = metric + } + return result } func (throttler *Throttler) collectSelfMetrics(ctx context.Context) { @@ -1166,21 +1051,19 @@ func (throttler *Throttler) refreshInventory(ctx context.Context) error { } } - metricsThreshold := throttler.MetricsThreshold.Load() metricNameUsedAsDefault := throttler.metricNameUsedAsDefault() - mysqlSettings := &throttler.configSettings.MySQLStore - mysqlSettings.Metrics[base.DefaultMetricName].Threshold.Store(metricsThreshold) - for metricName, metricConfig := range mysqlSettings.Metrics { - threshold := metricConfig.Threshold.Load() - if metricName == metricNameUsedAsDefault && metricsThreshold != 0 { + metricsThreshold := throttler.GetMetricsThreshold() + for metricName, selfMetric := range base.RegisteredSelfMetrics { + threshold := selfMetric.DefaultThreshold() + if (metricName == metricNameUsedAsDefault || metricName == base.DefaultMetricName) && metricsThreshold != 0 { // backwards compatibility to v20: threshold = metricsThreshold } - - throttler.metricThresholds.Set(inventoryPrefix+metricName.String(), math.Float64frombits(threshold), cache.DefaultExpiration) + throttler.metricThresholds.Set(inventoryPrefix+metricName.String(), threshold, cache.DefaultExpiration) } throttler.convergeMetricThresholds() - clusterSettingsCopy := *mysqlSettings + + var clusterSettingsCopy config.MySQLConfigurationSettings = throttler.configSettings.MySQLStore // config may dynamically change, but internal structure (config.Settings().MySQLStore.Clusters in our case) // is immutable and can only be _replaced_. Hence, it's safe to read in a goroutine: collect := func() error { diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index 508f542478e..54ff04a2ce5 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -241,7 +241,8 @@ func newTestThrottler() *Throttler { tabletTypeFunc: func() topodatapb.TabletType { return topodatapb.TabletType_PRIMARY }, overrideTmClient: &fakeTMClient{}, } - throttler.metricsQuery.Store(metricsQuery) + lagSelfMetric := base.RegisteredSelfMetrics[base.LagMetricName].(*base.LagSelfMetric) + lagSelfMetric.SetQuery(metricsQuery) throttler.MetricsThreshold.Store(math.Float64bits(0.75)) throttler.configSettings = config.NewConfigurationSettings() throttler.initConfig() @@ -1116,7 +1117,8 @@ func TestRefreshInventory(t *testing.T) { ts: &FakeTopoServer{}, inventory: base.NewInventory(), } - throttler.metricsQuery.Store(metricsQuery) + lagSelfMetric := base.RegisteredSelfMetrics[base.LagMetricName].(*base.LagSelfMetric) + lagSelfMetric.SetQuery(metricsQuery) throttler.configSettings = configSettings throttler.initConfig() throttler.initThrottleTabletTypes()