From 48ba53a4bbcd3c10f6cdc84581b2e4e90e20a436 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 3 Jun 2024 17:34:56 +0200 Subject: [PATCH 1/3] `vtgate`: support filtering tablets by tablet-tags (#15911) Signed-off-by: Tim Vaillancourt Co-authored-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/flags/endtoend/vtcombo.txt | 1 + go/flags/endtoend/vtgate.txt | 1 + go/vt/discovery/healthcheck.go | 16 ++++++--- go/vt/discovery/topology_watcher.go | 42 ++++++++++++++++++++++++ go/vt/discovery/topology_watcher_test.go | 31 +++++++++++++++-- 5 files changed, 85 insertions(+), 6 deletions(-) diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 93db5d6e8f5..ec6ae2f20d6 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -337,6 +337,7 @@ Flags: --structured-logging enable structured logging --table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class --table_gc_lifecycle string States for a DROP TABLE garbage collection cycle. Default is 'hold,purge,evac,drop', use any subset ('drop' implicitly always included) (default "hold,purge,evac,drop") + --tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch. --tablet_dir string The directory within the vtdataroot to store vttablet/mysql files. Defaults to being generated by the tablet uid. --tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch. --tablet_health_keep_alive duration close streaming tablet health connection if there are no requests for this long (default 5m0s) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index f2519ddaedb..7f3dbca694c 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -194,6 +194,7 @@ Flags: --stream_buffer_size int the number of bytes sent from vtgate for each stream call. It's recommended to keep this value in sync with vttablet's query-server-config-stream-buffer-size. (default 32768) --structured-logging enable structured logging --table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class + --tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch. --tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch. --tablet_grpc_ca string the server ca to use to validate servers when connecting --tablet_grpc_cert string the cert to use to connect diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index f37c9ad1d8b..95e08290d53 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -48,6 +48,7 @@ import ( "github.com/spf13/pflag" "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" @@ -82,6 +83,9 @@ var ( // tabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets. tabletFilters []string + // tabletFilterTags are the tablet tag filters (as key:value pairs) to apply to the full set of tablets. + tabletFilterTags flagutil.StringMapValue + // refreshInterval is the interval at which healthcheck refreshes its list of tablets from topo. refreshInterval = 1 * time.Minute @@ -164,6 +168,7 @@ func init() { func registerDiscoveryFlags(fs *pflag.FlagSet) { fs.StringSliceVar(&tabletFilters, "tablet_filters", []string{}, "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.") + fs.Var(&tabletFilterTags, "tablet-filter-tags", "Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.") fs.Var((*topoproto.TabletTypeListFlag)(&AllowedTabletTypes), "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to. Should be provided as a comma-separated set of tablet types.") fs.StringSliceVar(&KeyspacesToWatch, "keyspaces_to_watch", []string{}, "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema.") } @@ -337,13 +342,13 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur loadTabletsTrigger: make(chan struct{}), } var topoWatchers []*TopologyWatcher - var filter TabletFilter cells := strings.Split(cellsToWatch, ",") if cellsToWatch == "" { cells = append(cells, localCell) } for _, c := range cells { + var filters TabletFilters log.Infof("Setting up healthcheck for cell: %v", c) if c == "" { continue @@ -357,11 +362,14 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur if err != nil { log.Exitf("Cannot parse tablet_filters parameter: %v", err) } - filter = fbs + filters = append(filters, fbs) } else if len(KeyspacesToWatch) > 0 { - filter = NewFilterByKeyspace(KeyspacesToWatch) + filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch)) + } + if len(tabletFilterTags) > 0 { + filters = append(filters, NewFilterByTabletTags(tabletFilterTags)) } - topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 0b69ecb6a63..64346d524ad 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -274,6 +274,19 @@ type TabletFilter interface { IsIncluded(tablet *topodata.Tablet) bool } +// TabletFilters contains filters for tablets. +type TabletFilters []TabletFilter + +// IsIncluded returns true if a tablet passes all filters. +func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool { + for _, filter := range tf { + if !filter.IsIncluded(tablet) { + return false + } + } + return true +} + // FilterByShard is a filter that filters tablets by // keyspace/shard. type FilterByShard struct { @@ -375,3 +388,32 @@ func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool { _, exist := fbk.keyspaces[tablet.Keyspace] return exist } + +// FilterByTabletTags is a filter that filters tablets by tablet tag key/values. +type FilterByTabletTags struct { + tags map[string]string +} + +// NewFilterByTabletTags creates a new FilterByTabletTags. All tablets that match +// all tablet tags will be forwarded to the TopologyWatcher's consumer. +func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags { + return &FilterByTabletTags{ + tags: tabletTags, + } +} + +// IsIncluded returns true if the tablet's tags match what we expect. +func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool { + if fbtg.tags == nil { + return true + } + if tablet.Tags == nil { + return false + } + for key, val := range fbtg.tags { + if tabletVal, found := tablet.Tags[key]; !found || tabletVal != val { + return false + } + } + return true +} diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 1dfdf946e5a..9de996564a0 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -393,7 +393,7 @@ func TestFilterByKeyspace(t *testing.T) { ctx := utils.LeakCheckContext(t) hc := NewFakeHealthCheck(nil) - f := NewFilterByKeyspace(testKeyspacesToWatch) + f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)} ts := memorytopo.NewServer(ctx, testCell) defer ts.Close() tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) @@ -476,7 +476,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { defer fhc.Close() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - f := NewFilterByKeyspace(testKeyspacesToWatch) + f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)} tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) counts = checkOpCounts(t, counts, map[string]int64{}) @@ -578,6 +578,33 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { tw.Stop() } +func TestNewFilterByTabletTags(t *testing.T) { + // no required tags == true + filter := NewFilterByTabletTags(nil) + assert.True(t, filter.IsIncluded(&topodatapb.Tablet{})) + + tags := map[string]string{ + "instance_type": "i3.xlarge", + "some_key": "some_value", + } + filter = NewFilterByTabletTags(tags) + + assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ + Tags: nil, + })) + assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ + Tags: map[string]string{}, + })) + assert.False(t, filter.IsIncluded(&topodatapb.Tablet{ + Tags: map[string]string{ + "instance_type": "i3.xlarge", + }, + })) + assert.True(t, filter.IsIncluded(&topodatapb.Tablet{ + Tags: tags, + })) +} + func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) { ctx := utils.LeakCheckContext(t) From 19844b201d2fa45f3df60164b5b34eeba5100e86 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Sun, 2 Jun 2024 11:33:28 +0200 Subject: [PATCH 2/3] Add support for sampling rate in `streamlog` (#15919) Signed-off-by: Tim Vaillancourt --- go/flags/endtoend/vtcombo.txt | 1 + go/flags/endtoend/vtgate.txt | 1 + go/flags/endtoend/vttablet.txt | 1 + go/streamlog/streamlog.go | 21 ++++++ go/streamlog/streamlog_test.go | 131 +++++++++++++++++++++++++++++++++ 5 files changed, 155 insertions(+) diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index ec6ae2f20d6..9e968a1936b 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -271,6 +271,7 @@ Flags: --querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization --querylog-format string format for query logs ("text" or "json") (default "text") --querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged. + --querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries) --queryserver-config-acl-exempt-acl string an acl that exempt from table acl checking (this acl is free to access any vitess tables). --queryserver-config-annotate-queries prefix queries to MySQL backend with comment indicating vtgate principal (user) and target tablet type --queryserver-config-enable-table-acl-dry-run If this flag is enabled, tabletserver will emit monitoring metrics and let the request pass regardless of table acl check results diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 7f3dbca694c..78c427766a1 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -172,6 +172,7 @@ Flags: --querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization --querylog-format string format for query logs ("text" or "json") (default "text") --querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged. + --querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries) --redact-debug-ui-queries redact full queries and bind variables from debug UI --remote_operation_timeout duration time to wait for a remote operation (default 15s) --retry-count int retry count (default 2) diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 08d4266aeab..85f8c3d741c 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -262,6 +262,7 @@ Flags: --querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization --querylog-format string format for query logs ("text" or "json") (default "text") --querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged. + --querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries) --queryserver-config-acl-exempt-acl string an acl that exempt from table acl checking (this acl is free to access any vitess tables). --queryserver-config-annotate-queries prefix queries to MySQL backend with comment indicating vtgate principal (user) and target tablet type --queryserver-config-enable-table-acl-dry-run If this flag is enabled, tabletserver will emit monitoring metrics and let the request pass regardless of table acl check results diff --git a/go/streamlog/streamlog.go b/go/streamlog/streamlog.go index 6d9f81f98d9..40d33b840b4 100644 --- a/go/streamlog/streamlog.go +++ b/go/streamlog/streamlog.go @@ -20,6 +20,7 @@ package streamlog import ( "fmt" "io" + rand "math/rand/v2" "net/http" "net/url" "os" @@ -51,6 +52,7 @@ var ( queryLogFilterTag string queryLogRowThreshold uint64 queryLogFormat = "text" + queryLogSampleRate float64 ) func GetRedactDebugUIQueries() bool { @@ -69,6 +71,10 @@ func SetQueryLogRowThreshold(newQueryLogRowThreshold uint64) { queryLogRowThreshold = newQueryLogRowThreshold } +func SetQueryLogSampleRate(sampleRate float64) { + queryLogSampleRate = sampleRate +} + func GetQueryLogFormat() string { return queryLogFormat } @@ -96,6 +102,8 @@ func registerStreamLogFlags(fs *pflag.FlagSet) { // QueryLogRowThreshold only log queries returning or affecting this many rows fs.Uint64Var(&queryLogRowThreshold, "querylog-row-threshold", queryLogRowThreshold, "Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.") + // QueryLogSampleRate causes a sample of queries to be logged + fs.Float64Var(&queryLogSampleRate, "querylog-sample-rate", queryLogSampleRate, "Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)") } const ( @@ -249,9 +257,22 @@ func GetFormatter[T any](logger *StreamLogger[T]) LogFormatter { } } +// shouldSampleQuery returns true if a query should be sampled based on queryLogSampleRate +func shouldSampleQuery() bool { + if queryLogSampleRate <= 0 { + return false + } else if queryLogSampleRate >= 1 { + return true + } + return rand.Float64() <= queryLogSampleRate +} + // ShouldEmitLog returns whether the log with the given SQL query // should be emitted or filtered func ShouldEmitLog(sql string, rowsAffected, rowsReturned uint64) bool { + if shouldSampleQuery() { + return true + } if queryLogRowThreshold > max(rowsAffected, rowsReturned) && queryLogFilterTag == "" { return false } diff --git a/go/streamlog/streamlog_test.go b/go/streamlog/streamlog_test.go index 17d3c4148e2..402b7ed816b 100644 --- a/go/streamlog/streamlog_test.go +++ b/go/streamlog/streamlog_test.go @@ -29,6 +29,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/servenv" ) @@ -260,3 +263,131 @@ func TestFile(t *testing.T) { t.Errorf("streamlog file: want %q got %q", want, got) } } + +func TestShouldSampleQuery(t *testing.T) { + queryLogSampleRate = -1 + assert.False(t, shouldSampleQuery()) + + queryLogSampleRate = 0 + assert.False(t, shouldSampleQuery()) + + // for test coverage, can't test a random result + queryLogSampleRate = 0.5 + shouldSampleQuery() + + queryLogSampleRate = 1.0 + assert.True(t, shouldSampleQuery()) + + queryLogSampleRate = 100.0 + assert.True(t, shouldSampleQuery()) +} + +func TestShouldEmitLog(t *testing.T) { + origQueryLogFilterTag := queryLogFilterTag + origQueryLogRowThreshold := queryLogRowThreshold + origQueryLogSampleRate := queryLogSampleRate + defer func() { + SetQueryLogFilterTag(origQueryLogFilterTag) + SetQueryLogRowThreshold(origQueryLogRowThreshold) + SetQueryLogSampleRate(origQueryLogSampleRate) + }() + + tests := []struct { + sql string + qLogFilterTag string + qLogRowThreshold uint64 + qLogSampleRate float64 + rowsAffected uint64 + rowsReturned uint64 + ok bool + }{ + { + sql: "queryLogThreshold smaller than affected and returned", + qLogFilterTag: "", + qLogRowThreshold: 2, + qLogSampleRate: 0.0, + rowsAffected: 7, + rowsReturned: 7, + ok: true, + }, + { + sql: "queryLogThreshold greater than affected and returned", + qLogFilterTag: "", + qLogRowThreshold: 27, + qLogSampleRate: 0.0, + rowsAffected: 7, + rowsReturned: 17, + ok: false, + }, + { + sql: "this doesn't contains queryFilterTag: TAG", + qLogFilterTag: "special tag", + qLogRowThreshold: 10, + qLogSampleRate: 0.0, + rowsAffected: 7, + rowsReturned: 17, + ok: false, + }, + { + sql: "this contains queryFilterTag: TAG", + qLogFilterTag: "TAG", + qLogRowThreshold: 0, + qLogSampleRate: 0.0, + rowsAffected: 7, + rowsReturned: 17, + ok: true, + }, + { + sql: "this contains querySampleRate: 1.0", + qLogFilterTag: "", + qLogRowThreshold: 0, + qLogSampleRate: 1.0, + rowsAffected: 7, + rowsReturned: 17, + ok: true, + }, + { + sql: "this contains querySampleRate: 1.0 without expected queryFilterTag", + qLogFilterTag: "TAG", + qLogRowThreshold: 0, + qLogSampleRate: 1.0, + rowsAffected: 7, + rowsReturned: 17, + ok: true, + }, + } + + for _, tt := range tests { + t.Run(tt.sql, func(t *testing.T) { + SetQueryLogFilterTag(tt.qLogFilterTag) + SetQueryLogRowThreshold(tt.qLogRowThreshold) + SetQueryLogSampleRate(tt.qLogSampleRate) + + require.Equal(t, tt.ok, ShouldEmitLog(tt.sql, tt.rowsAffected, tt.rowsReturned)) + }) + } +} + +func BenchmarkShouldEmitLog(b *testing.B) { + b.Run("default", func(b *testing.B) { + SetQueryLogSampleRate(0.0) + for i := 0; i < b.N; i++ { + ShouldEmitLog("select * from test where user='someone'", 0, 123) + } + }) + b.Run("filter_tag", func(b *testing.B) { + SetQueryLogSampleRate(0.0) + SetQueryLogFilterTag("LOG_QUERY") + defer SetQueryLogFilterTag("") + for i := 0; i < b.N; i++ { + ShouldEmitLog("select /* LOG_QUERY=1 */ * from test where user='someone'", 0, 123) + } + }) + b.Run("50%_sample_rate", func(b *testing.B) { + SetQueryLogSampleRate(0.5) + defer SetQueryLogSampleRate(0.0) + for i := 0; i < b.N; i++ { + ShouldEmitLog("select * from test where user='someone'", 0, 123) + } + }) +} From e7560e09c67444f4773ce9264128508d42059685 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 4 Jun 2024 06:43:26 +0200 Subject: [PATCH 3/3] Add sql text counts stats to `vtcombo`,`vtgate`+`vttablet` (#15897) Signed-off-by: Tim Vaillancourt Signed-off-by: Harshit Gangal Co-authored-by: Harshit Gangal Co-authored-by: Deepthi Sigireddi --- go/vt/vtgate/plugin_mysql_server_test.go | 4 +- go/vt/vtgate/vtgate.go | 30 ++++++++----- go/vt/vttablet/tabletserver/query_engine.go | 16 ++++--- .../tabletserver/query_engine_test.go | 45 ++++++++++++++----- go/vt/vttablet/tabletserver/query_executor.go | 4 +- 5 files changed, 67 insertions(+), 32 deletions(-) diff --git a/go/vt/vtgate/plugin_mysql_server_test.go b/go/vt/vtgate/plugin_mysql_server_test.go index 5da79b9fe17..ceb4cea0d42 100644 --- a/go/vt/vtgate/plugin_mysql_server_test.go +++ b/go/vt/vtgate/plugin_mysql_server_test.go @@ -349,7 +349,7 @@ func TestKillMethods(t *testing.T) { func TestGracefulShutdown(t *testing.T) { executor, _, _, _, _ := createExecutorEnv(t) - vh := newVtgateHandler(&VTGate{executor: executor, timings: timings, rowsReturned: rowsReturned, rowsAffected: rowsAffected}) + vh := newVtgateHandler(&VTGate{executor: executor, timings: timings, rowsReturned: rowsReturned, rowsAffected: rowsAffected, queryTextCharsProcessed: queryTextCharsProcessed}) th := &testHandler{} listener, err := mysql.NewListener("tcp", "127.0.0.1:", mysql.NewAuthServerNone(), th, 0, 0, false, false, 0, 0) require.NoError(t, err) @@ -379,7 +379,7 @@ func TestGracefulShutdown(t *testing.T) { func TestGracefulShutdownWithTransaction(t *testing.T) { executor, _, _, _, _ := createExecutorEnv(t) - vh := newVtgateHandler(&VTGate{executor: executor, timings: timings, rowsReturned: rowsReturned, rowsAffected: rowsAffected}) + vh := newVtgateHandler(&VTGate{executor: executor, timings: timings, rowsReturned: rowsReturned, rowsAffected: rowsAffected, queryTextCharsProcessed: queryTextCharsProcessed}) th := &testHandler{} listener, err := mysql.NewListener("tcp", "127.0.0.1:", mysql.NewAuthServerNone(), th, 0, 0, false, false, 0, 0) require.NoError(t, err) diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index c8c0df217af..fa895f10913 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -208,6 +208,11 @@ var ( "VtgateApiRowsAffected", "Rows affected by a write (DML) operation through the VTgate API", []string{"Operation", "Keyspace", "DbType"}) + + queryTextCharsProcessed = stats.NewCountersWithMultiLabels( + "VtgateQueryTextCharactersProcessed", + "Query text characters processed through the VTGate API", + []string{"Operation", "Keyspace", "DbType"}) ) // VTGate is the rpc interface to vtgate. Only one instance @@ -224,9 +229,10 @@ type VTGate struct { // stats objects. // TODO(sougou): This needs to be cleaned up. There // are global vars that depend on this member var. - timings *stats.MultiTimings - rowsReturned *stats.CountersWithMultiLabels - rowsAffected *stats.CountersWithMultiLabels + timings *stats.MultiTimings + rowsReturned *stats.CountersWithMultiLabels + rowsAffected *stats.CountersWithMultiLabels + queryTextCharsProcessed *stats.CountersWithMultiLabels // the throttled loggers for all errors, one per API entry logExecute *logutil.ThrottledLogger @@ -458,6 +464,7 @@ func (vtg *VTGate) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn if err == nil { vtg.rowsReturned.Add(statsKey, int64(len(qr.Rows))) vtg.rowsAffected.Add(statsKey, int64(qr.RowsAffected)) + vtg.queryTextCharsProcessed.Add(statsKey, int64(len(sql))) return session, qr, nil } @@ -668,14 +675,15 @@ func (vtg *VTGate) HandlePanic(err *error) { func newVTGate(executor *Executor, resolver *Resolver, vsm *vstreamManager, tc *TxConn, gw *TabletGateway) *VTGate { return &VTGate{ - executor: executor, - resolver: resolver, - vsm: vsm, - txConn: tc, - gw: gw, - timings: timings, - rowsReturned: rowsReturned, - rowsAffected: rowsAffected, + executor: executor, + resolver: resolver, + vsm: vsm, + txConn: tc, + gw: gw, + timings: timings, + rowsReturned: rowsReturned, + rowsAffected: rowsAffected, + queryTextCharsProcessed: queryTextCharsProcessed, logExecute: logutil.NewThrottledLogger("Execute", 5*time.Second), logPrepare: logutil.NewThrottledLogger("Prepare", 5*time.Second), diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index b87c5cf97ba..c1303e68f9e 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -187,7 +187,7 @@ type QueryEngine struct { // stats // Note: queryErrorCountsWithCode is similar to queryErrorCounts except it contains error code as an additional dimension - queryCounts, queryCountsWithTabletType, queryTimes, queryErrorCounts, queryErrorCountsWithCode, queryRowsAffected, queryRowsReturned *stats.CountersWithMultiLabels + queryCounts, queryCountsWithTabletType, queryTimes, queryErrorCounts, queryErrorCountsWithCode, queryRowsAffected, queryRowsReturned, queryTextCharsProcessed *stats.CountersWithMultiLabels // stats flags enablePerWorkloadTableMetrics bool @@ -291,6 +291,7 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine { qe.queryTimes = env.Exporter().NewCountersWithMultiLabels("QueryTimesNs", "query times in ns", labels) qe.queryRowsAffected = env.Exporter().NewCountersWithMultiLabels("QueryRowsAffected", "query rows affected", labels) qe.queryRowsReturned = env.Exporter().NewCountersWithMultiLabels("QueryRowsReturned", "query rows returned", labels) + qe.queryTextCharsProcessed = env.Exporter().NewCountersWithMultiLabels("QueryTextCharactersProcessed", "query text characters processed", labels) qe.queryErrorCounts = env.Exporter().NewCountersWithMultiLabels("QueryErrorCounts", "query error counts", labels) qe.queryErrorCountsWithCode = env.Exporter().NewCountersWithMultiLabels("QueryErrorCountsWithCode", "query error counts with error code", []string{"Table", "Plan", "Code"}) @@ -552,9 +553,9 @@ func (qe *QueryEngine) QueryPlanCacheLen() (count int) { } // AddStats adds the given stats for the planName.tableName -func (qe *QueryEngine) AddStats(planType planbuilder.PlanType, tableName, workload string, tabletType topodata.TabletType, queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64, errorCode string) { +func (qe *QueryEngine) AddStats(plan *TabletPlan, tableName, workload string, tabletType topodata.TabletType, queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64, errorCode string) { // table names can contain "." characters, replace them! - keys := []string{tableName, planType.String()} + keys := []string{tableName, plan.PlanID.String()} // Only use the workload as a label if that's enabled in the configuration. if qe.enablePerWorkloadTableMetrics { keys = append(keys, workload) @@ -562,20 +563,23 @@ func (qe *QueryEngine) AddStats(planType planbuilder.PlanType, tableName, worklo qe.queryCounts.Add(keys, queryCount) qe.queryTimes.Add(keys, int64(duration)) qe.queryErrorCounts.Add(keys, errorCount) + if plan.FullQuery != nil { + qe.queryTextCharsProcessed.Add(keys, int64(len(plan.FullQuery.Query))) + } - qe.queryCountsWithTabletType.Add([]string{tableName, planType.String(), tabletType.String()}, queryCount) + qe.queryCountsWithTabletType.Add([]string{tableName, plan.PlanID.String(), tabletType.String()}, queryCount) // queryErrorCountsWithCode is similar to queryErrorCounts except we have an additional dimension // of error code. if errorCount > 0 { - errorKeys := []string{tableName, planType.String(), errorCode} + errorKeys := []string{tableName, plan.PlanID.String(), errorCode} qe.queryErrorCountsWithCode.Add(errorKeys, errorCount) } // For certain plan types like select, we only want to add their metrics to rows returned // But there are special cases like `SELECT ... INTO OUTFILE ''` which return positive rows affected // So we check if it is positive and add that too. - switch planType { + switch plan.PlanID { case planbuilder.PlanSelect, planbuilder.PlanSelectStream, planbuilder.PlanSelectImpossible, planbuilder.PlanShow, planbuilder.PlanOtherRead: qe.queryRowsReturned.Add(keys, rowsReturned) if rowsAffected > 0 { diff --git a/go/vt/vttablet/tabletserver/query_engine_test.go b/go/vt/vttablet/tabletserver/query_engine_test.go index 8dbe18ef13c..44b58e3a336 100644 --- a/go/vt/vttablet/tabletserver/query_engine_test.go +++ b/go/vt/vttablet/tabletserver/query_engine_test.go @@ -619,9 +619,21 @@ func TestPlanCachePollution(t *testing.T) { } func TestAddQueryStats(t *testing.T) { + fakeSelectPlan := &TabletPlan{ + Plan: &planbuilder.Plan{ + PlanID: planbuilder.PlanSelect, + FullQuery: &sqlparser.ParsedQuery{Query: `select * from something where something=123`}, // 43 length + }, + } + fakeInsertPlan := &TabletPlan{ + Plan: &planbuilder.Plan{ + PlanID: planbuilder.PlanInsert, + FullQuery: &sqlparser.ParsedQuery{Query: `insert into something (id, msg) values(123, 'hello world!')`}, // 59 length + }, + } testcases := []struct { name string - planType planbuilder.PlanType + plan *TabletPlan tableName string tabletType topodata.TabletType queryCount int64 @@ -638,12 +650,13 @@ func TestAddQueryStats(t *testing.T) { expectedQueryTimes string expectedQueryRowsAffected string expectedQueryRowsReturned string + expectedQueryTextCharsProcessed string expectedQueryErrorCounts string expectedQueryErrorCountsWithCode string }{ { name: "select query", - planType: planbuilder.PlanSelect, + plan: fakeSelectPlan, tableName: "A", tabletType: topodata.TabletType_PRIMARY, queryCount: 1, @@ -658,12 +671,13 @@ func TestAddQueryStats(t *testing.T) { expectedQueryTimes: `{"A.Select": 10}`, expectedQueryRowsAffected: `{}`, expectedQueryRowsReturned: `{"A.Select": 15}`, + expectedQueryTextCharsProcessed: `{"A.Select": 43}`, expectedQueryErrorCounts: `{"A.Select": 0}`, expectedQueryErrorCountsWithCode: `{}`, expectedQueryCountsWithTableType: `{"A.Select.PRIMARY": 1}`, }, { name: "select query against a replica", - planType: planbuilder.PlanSelect, + plan: fakeSelectPlan, tableName: "A", tabletType: topodata.TabletType_REPLICA, queryCount: 1, @@ -678,12 +692,13 @@ func TestAddQueryStats(t *testing.T) { expectedQueryTimes: `{"A.Select": 10}`, expectedQueryRowsAffected: `{}`, expectedQueryRowsReturned: `{"A.Select": 15}`, + expectedQueryTextCharsProcessed: `{"A.Select": 43}`, expectedQueryErrorCounts: `{"A.Select": 0}`, expectedQueryErrorCountsWithCode: `{}`, expectedQueryCountsWithTableType: `{"A.Select.REPLICA": 1}`, }, { name: "select into query", - planType: planbuilder.PlanSelect, + plan: fakeSelectPlan, tableName: "A", tabletType: topodata.TabletType_PRIMARY, queryCount: 1, @@ -698,12 +713,13 @@ func TestAddQueryStats(t *testing.T) { expectedQueryTimes: `{"A.Select": 10}`, expectedQueryRowsAffected: `{"A.Select": 15}`, expectedQueryRowsReturned: `{"A.Select": 0}`, + expectedQueryTextCharsProcessed: `{"A.Select": 43}`, expectedQueryErrorCounts: `{"A.Select": 0}`, expectedQueryErrorCountsWithCode: `{}`, expectedQueryCountsWithTableType: `{"A.Select.PRIMARY": 1}`, }, { name: "error", - planType: planbuilder.PlanSelect, + plan: fakeSelectPlan, tableName: "A", tabletType: topodata.TabletType_PRIMARY, queryCount: 1, @@ -718,12 +734,13 @@ func TestAddQueryStats(t *testing.T) { expectedQueryTimes: `{"A.Select": 10}`, expectedQueryRowsAffected: `{}`, expectedQueryRowsReturned: `{"A.Select": 0}`, + expectedQueryTextCharsProcessed: `{"A.Select": 43}`, expectedQueryErrorCounts: `{"A.Select": 1}`, expectedQueryErrorCountsWithCode: `{"A.Select.RESOURCE_EXHAUSTED": 1}`, expectedQueryCountsWithTableType: `{"A.Select.PRIMARY": 1}`, }, { name: "insert query", - planType: planbuilder.PlanInsert, + plan: fakeInsertPlan, tableName: "A", tabletType: topodata.TabletType_PRIMARY, queryCount: 1, @@ -738,12 +755,13 @@ func TestAddQueryStats(t *testing.T) { expectedQueryTimes: `{"A.Insert": 10}`, expectedQueryRowsAffected: `{"A.Insert": 15}`, expectedQueryRowsReturned: `{}`, + expectedQueryTextCharsProcessed: `{"A.Insert": 59}`, expectedQueryErrorCounts: `{"A.Insert": 0}`, expectedQueryErrorCountsWithCode: `{}`, expectedQueryCountsWithTableType: `{"A.Insert.PRIMARY": 1}`, }, { name: "select query with per workload metrics", - planType: planbuilder.PlanSelect, + plan: fakeSelectPlan, tableName: "A", tabletType: topodata.TabletType_PRIMARY, queryCount: 1, @@ -758,12 +776,13 @@ func TestAddQueryStats(t *testing.T) { expectedQueryTimes: `{"A.Select.some-workload": 10}`, expectedQueryRowsAffected: `{}`, expectedQueryRowsReturned: `{"A.Select.some-workload": 15}`, + expectedQueryTextCharsProcessed: `{"A.Select.some-workload": 43}`, expectedQueryErrorCounts: `{"A.Select.some-workload": 0}`, expectedQueryErrorCountsWithCode: `{}`, expectedQueryCountsWithTableType: `{"A.Select.PRIMARY": 1}`, }, { name: "select into query with per workload metrics", - planType: planbuilder.PlanSelect, + plan: fakeSelectPlan, tableName: "A", tabletType: topodata.TabletType_PRIMARY, queryCount: 1, @@ -778,12 +797,13 @@ func TestAddQueryStats(t *testing.T) { expectedQueryTimes: `{"A.Select.some-workload": 10}`, expectedQueryRowsAffected: `{"A.Select.some-workload": 15}`, expectedQueryRowsReturned: `{"A.Select.some-workload": 0}`, + expectedQueryTextCharsProcessed: `{"A.Select.some-workload": 43}`, expectedQueryErrorCounts: `{"A.Select.some-workload": 0}`, expectedQueryErrorCountsWithCode: `{}`, expectedQueryCountsWithTableType: `{"A.Select.PRIMARY": 1}`, }, { name: "error with per workload metrics", - planType: planbuilder.PlanSelect, + plan: fakeSelectPlan, tableName: "A", tabletType: topodata.TabletType_PRIMARY, queryCount: 1, @@ -798,12 +818,13 @@ func TestAddQueryStats(t *testing.T) { expectedQueryTimes: `{"A.Select.some-workload": 10}`, expectedQueryRowsAffected: `{}`, expectedQueryRowsReturned: `{"A.Select.some-workload": 0}`, + expectedQueryTextCharsProcessed: `{"A.Select.some-workload": 43}`, expectedQueryErrorCounts: `{"A.Select.some-workload": 1}`, expectedQueryErrorCountsWithCode: `{"A.Select.RESOURCE_EXHAUSTED": 1}`, expectedQueryCountsWithTableType: `{"A.Select.PRIMARY": 1}`, }, { name: "insert query with per workload metrics", - planType: planbuilder.PlanInsert, + plan: fakeInsertPlan, tableName: "A", tabletType: topodata.TabletType_PRIMARY, queryCount: 1, @@ -818,6 +839,7 @@ func TestAddQueryStats(t *testing.T) { expectedQueryTimes: `{"A.Insert.some-workload": 10}`, expectedQueryRowsAffected: `{"A.Insert.some-workload": 15}`, expectedQueryRowsReturned: `{}`, + expectedQueryTextCharsProcessed: `{"A.Insert.some-workload": 59}`, expectedQueryErrorCounts: `{"A.Insert.some-workload": 0}`, expectedQueryErrorCountsWithCode: `{}`, expectedQueryCountsWithTableType: `{"A.Insert.PRIMARY": 1}`, @@ -833,12 +855,13 @@ func TestAddQueryStats(t *testing.T) { env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TestAddQueryStats_"+testcase.name) se := schema.NewEngine(env) qe := NewQueryEngine(env, se) - qe.AddStats(testcase.planType, testcase.tableName, testcase.workload, testcase.tabletType, testcase.queryCount, testcase.duration, testcase.mysqlTime, testcase.rowsAffected, testcase.rowsReturned, testcase.errorCount, testcase.errorCode) + qe.AddStats(testcase.plan, testcase.tableName, testcase.workload, testcase.tabletType, testcase.queryCount, testcase.duration, testcase.mysqlTime, testcase.rowsAffected, testcase.rowsReturned, testcase.errorCount, testcase.errorCode) assert.Equal(t, testcase.expectedQueryCounts, qe.queryCounts.String()) assert.Equal(t, testcase.expectedQueryCountsWithTableType, qe.queryCountsWithTabletType.String()) assert.Equal(t, testcase.expectedQueryTimes, qe.queryTimes.String()) assert.Equal(t, testcase.expectedQueryRowsAffected, qe.queryRowsAffected.String()) assert.Equal(t, testcase.expectedQueryRowsReturned, qe.queryRowsReturned.String()) + assert.Equal(t, testcase.expectedQueryTextCharsProcessed, qe.queryTextCharsProcessed.String()) assert.Equal(t, testcase.expectedQueryErrorCounts, qe.queryErrorCounts.String()) assert.Equal(t, testcase.expectedQueryErrorCountsWithCode, qe.queryErrorCountsWithCode.String()) }) diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 844ce753152..15a9cb0e25d 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -137,12 +137,12 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { errCode = vtErrorCode.String() if reply == nil { - qre.tsv.qe.AddStats(qre.plan.PlanID, tableName, qre.options.GetWorkloadName(), qre.targetTabletType, 1, duration, mysqlTime, 0, 0, 1, errCode) + qre.tsv.qe.AddStats(qre.plan, tableName, qre.options.GetWorkloadName(), qre.targetTabletType, 1, duration, mysqlTime, 0, 0, 1, errCode) qre.plan.AddStats(1, duration, mysqlTime, 0, 0, 1) return } - qre.tsv.qe.AddStats(qre.plan.PlanID, tableName, qre.options.GetWorkloadName(), qre.targetTabletType, 1, duration, mysqlTime, int64(reply.RowsAffected), int64(len(reply.Rows)), 0, errCode) + qre.tsv.qe.AddStats(qre.plan, tableName, qre.options.GetWorkloadName(), qre.targetTabletType, 1, duration, mysqlTime, int64(reply.RowsAffected), int64(len(reply.Rows)), 0, errCode) qre.plan.AddStats(1, duration, mysqlTime, reply.RowsAffected, uint64(len(reply.Rows)), 0) qre.logStats.RowsAffected = int(reply.RowsAffected) qre.logStats.Rows = reply.Rows