Skip to content

Commit

Permalink
throttler uses new SelfMetric implementations. Discarding MySQLMetric…
Browse files Browse the repository at this point in the history
…ConfigurationSettings. Discarding Throttler.metricsQuery

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach committed Jul 24, 2024
1 parent 6af44a2 commit 292adbb
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 156 deletions.
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/throttle/base/metric_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type AggregatedMetricName struct {
}

var (
KnownMetricNames = make([]MetricName, 0)
KnownMetricNames = make(MetricNames, 0)
// aggregatedMetricNames precomputes the aggregated metric names for all known metric names,
// mapped to their breakdowns. e.g. "self/loadavg" -> {SelfScope, LoadAvgMetricName}
// This means:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,24 @@ import (
var _ SelfMetric = registerSelfMetric(&CustomQuerySelfMetric{})

type CustomQuerySelfMetric struct {
customQuery atomic.Value
customQueryFuncPtr atomic.Pointer[func() string]
}

func (m *CustomQuerySelfMetric) SetQuery(query string) {
m.customQuery.Store(query)
func (m *CustomQuerySelfMetric) GetQuery() string {
customQueryFunc := m.customQueryFuncPtr.Load()
if customQueryFunc == nil {
return ""
}
query := (*customQueryFunc)()
return query
}

func (m *CustomQuerySelfMetric) SetQueryFunc(f func() string) {
if f == nil {
m.customQueryFuncPtr.Store(nil)
return
}
m.customQueryFuncPtr.Store(&f)
}

func (m *CustomQuerySelfMetric) Name() MetricName {
Expand All @@ -50,9 +63,5 @@ func (m *CustomQuerySelfMetric) RequiresConn() bool {
}

func (m *CustomQuerySelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric {
query := ""
if val := m.customQuery.Load(); val != nil {
query = val.(string)
}
return ReadSelfMySQLThrottleMetric(ctx, conn, query)
return ReadSelfMySQLThrottleMetric(ctx, conn, m.GetQuery())
}
22 changes: 21 additions & 1 deletion go/vt/vttablet/tabletserver/throttle/base/self_metric_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ limitations under the License.

package base

import (
"context"
"fmt"

"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
)

var _ SelfMetric = registerSelfMetric(&DefaultSelfMetric{})

type DefaultSelfMetric struct {
LagSelfMetric
}

func (m *DefaultSelfMetric) Name() MetricName {
Expand All @@ -29,3 +35,17 @@ func (m *DefaultSelfMetric) Name() MetricName {
func (m *DefaultSelfMetric) DefaultScope() Scope {
return SelfScope
}

func (m *DefaultSelfMetric) DefaultThreshold() float64 {
return 0
}

func (m *DefaultSelfMetric) RequiresConn() bool {
return false
}

func (m *DefaultSelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric {
return &ThrottleMetric{
Err: fmt.Errorf("unexpected direct call to DefaultSelfMetric.Read"),
}
}
20 changes: 18 additions & 2 deletions go/vt/vttablet/tabletserver/throttle/base/self_metric_lag.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,34 @@ package base

import (
"context"
"sync/atomic"
"time"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
)

var (
LagSelfMetricQuery = "show global status like 'threads_running'"
lagSelfMetricQueryBase = "select unix_timestamp(now(6))-max(ts/1000000000) as replication_lag from %s.heartbeat"
)

var _ SelfMetric = registerSelfMetric(&LagSelfMetric{})

type LagSelfMetric struct {
lagSelfMetricQuery atomic.Value
}

// SetQuery is only used by unit tests to override the query.
func (m *LagSelfMetric) SetQuery(query string) {
m.lagSelfMetricQuery.Store(query)
}

func (m *LagSelfMetric) GetQuery() string {
if query := m.lagSelfMetricQuery.Load(); query == nil {
m.lagSelfMetricQuery.Store(sqlparser.BuildParsedQuery(lagSelfMetricQueryBase, sidecar.GetIdentifier()).Query)
}
return m.lagSelfMetricQuery.Load().(string)
}

func (m *LagSelfMetric) Name() MetricName {
Expand All @@ -49,5 +65,5 @@ func (m *LagSelfMetric) RequiresConn() bool {
}

func (m *LagSelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric {
return ReadSelfMySQLThrottleMetric(ctx, conn, LagSelfMetricQuery)
return ReadSelfMySQLThrottleMetric(ctx, conn, m.GetQuery())
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ package base
import (
"context"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
)

var (
lagSelfMetricQueryBase = "select unix_timestamp(now(6))-max(ts/1000000000) as replication_lag from %s.heartbeat"
lagSelfMetricQuery = sqlparser.BuildParsedQuery(lagSelfMetricQueryBase, sidecar.GetIdentifier()).Query
threadsRunningMetricQuery = "show global status like 'threads_running'"
)

var _ SelfMetric = registerSelfMetric(&ThreadsRunningSelfMetric{})
Expand All @@ -51,5 +48,5 @@ func (m *ThreadsRunningSelfMetric) RequiresConn() bool {
}

func (m *ThreadsRunningSelfMetric) Read(ctx context.Context, conn *connpool.Conn) *ThrottleMetric {
return ReadSelfMySQLThrottleMetric(ctx, conn, lagSelfMetricQuery)
return ReadSelfMySQLThrottleMetric(ctx, conn, threadsRunningMetricQuery)
}
27 changes: 5 additions & 22 deletions go/vt/vttablet/tabletserver/throttle/config/mysql_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,15 @@ limitations under the License.

package config

import (
"sync/atomic"

"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
)

//
// MySQL-specific configuration
//

type MySQLMetricConfigurationSettings struct {
Name base.MetricName
CustomQuery string
Threshold atomic.Uint64
}

// MySQLConfigurationSettings has the general configuration for all MySQL clusters
type MySQLConfigurationSettings struct {
CacheMillis int // optional, if defined then probe result will be cached, and future probes may use cached value
Port int // Specify if different than 3306; applies to all clusters
IgnoreDialTCPErrors bool // Skip hosts where a metric cannot be retrieved due to TCP dial errors
IgnoreHostsCount int // Number of hosts that can be skipped/ignored even on error or on exceeding thresholds
IgnoreHostsThreshold float64 // Threshold beyond which IgnoreHostsCount applies (default: 0)
HTTPCheckPort int // port for HTTP check. -1 to disable.
HTTPCheckPath string // If non-empty, requires HTTPCheckPort
IgnoreHosts []string // If non empty, substrings to indicate hosts to be ignored/skipped

Metrics map[base.MetricName]*MySQLMetricConfigurationSettings
CacheMillis int // optional, if defined then probe result will be cached, and future probes may use cached value
Port int // Specify if different than 3306; applies to all clusters
IgnoreDialTCPErrors bool // Skip hosts where a metric cannot be retrieved due to TCP dial errors
IgnoreHostsCount int // Number of hosts that can be skipped/ignored even on error or on exceeding thresholds
IgnoreHosts []string // If non empty, substrings to indicate hosts to be ignored/skipped
}
Loading

0 comments on commit 292adbb

Please sign in to comment.