Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slack-19.0: v20 backports, pt. 2 #459

Merged
merged 4 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ Flags:
--querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization
--querylog-format string format for query logs ("text" or "json") (default "text")
--querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.
--querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)
--queryserver-config-acl-exempt-acl string an acl that exempt from table acl checking (this acl is free to access any vitess tables).
--queryserver-config-annotate-queries prefix queries to MySQL backend with comment indicating vtgate principal (user) and target tablet type
--queryserver-config-enable-table-acl-dry-run If this flag is enabled, tabletserver will emit monitoring metrics and let the request pass regardless of table acl check results
Expand Down Expand Up @@ -337,6 +338,7 @@ Flags:
--structured-logging enable structured logging
--table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class
--table_gc_lifecycle string States for a DROP TABLE garbage collection cycle. Default is 'hold,purge,evac,drop', use any subset ('drop' implicitly always included) (default "hold,purge,evac,drop")
--tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.
--tablet_dir string The directory within the vtdataroot to store vttablet/mysql files. Defaults to being generated by the tablet uid.
--tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.
--tablet_health_keep_alive duration close streaming tablet health connection if there are no requests for this long (default 5m0s)
Expand Down
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ Flags:
--querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization
--querylog-format string format for query logs ("text" or "json") (default "text")
--querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.
--querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)
--redact-debug-ui-queries redact full queries and bind variables from debug UI
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--retry-count int retry count (default 2)
Expand All @@ -194,6 +195,7 @@ Flags:
--stream_buffer_size int the number of bytes sent from vtgate for each stream call. It's recommended to keep this value in sync with vttablet's query-server-config-stream-buffer-size. (default 32768)
--structured-logging enable structured logging
--table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class
--tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.
--tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.
--tablet_grpc_ca string the server ca to use to validate servers when connecting
--tablet_grpc_cert string the cert to use to connect
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ Flags:
--querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization
--querylog-format string format for query logs ("text" or "json") (default "text")
--querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.
--querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)
--queryserver-config-acl-exempt-acl string an acl that exempt from table acl checking (this acl is free to access any vitess tables).
--queryserver-config-annotate-queries prefix queries to MySQL backend with comment indicating vtgate principal (user) and target tablet type
--queryserver-config-enable-table-acl-dry-run If this flag is enabled, tabletserver will emit monitoring metrics and let the request pass regardless of table acl check results
Expand Down
21 changes: 21 additions & 0 deletions go/streamlog/streamlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package streamlog
import (
"fmt"
"io"
rand "math/rand/v2"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -51,6 +52,7 @@ var (
queryLogFilterTag string
queryLogRowThreshold uint64
queryLogFormat = "text"
queryLogSampleRate float64
)

func GetRedactDebugUIQueries() bool {
Expand All @@ -69,6 +71,10 @@ func SetQueryLogRowThreshold(newQueryLogRowThreshold uint64) {
queryLogRowThreshold = newQueryLogRowThreshold
}

func SetQueryLogSampleRate(sampleRate float64) {
queryLogSampleRate = sampleRate
}

func GetQueryLogFormat() string {
return queryLogFormat
}
Expand Down Expand Up @@ -96,6 +102,8 @@ func registerStreamLogFlags(fs *pflag.FlagSet) {
// QueryLogRowThreshold only log queries returning or affecting this many rows
fs.Uint64Var(&queryLogRowThreshold, "querylog-row-threshold", queryLogRowThreshold, "Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.")

// QueryLogSampleRate causes a sample of queries to be logged
fs.Float64Var(&queryLogSampleRate, "querylog-sample-rate", queryLogSampleRate, "Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)")
}

const (
Expand Down Expand Up @@ -249,9 +257,22 @@ func GetFormatter[T any](logger *StreamLogger[T]) LogFormatter {
}
}

// shouldSampleQuery returns true if a query should be sampled based on queryLogSampleRate
func shouldSampleQuery() bool {
if queryLogSampleRate <= 0 {
return false
} else if queryLogSampleRate >= 1 {
return true
}
return rand.Float64() <= queryLogSampleRate
}

// ShouldEmitLog returns whether the log with the given SQL query
// should be emitted or filtered
func ShouldEmitLog(sql string, rowsAffected, rowsReturned uint64) bool {
if shouldSampleQuery() {
return true
}
if queryLogRowThreshold > max(rowsAffected, rowsReturned) && queryLogFilterTag == "" {
return false
}
Expand Down
131 changes: 131 additions & 0 deletions go/streamlog/streamlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/servenv"
)

Expand Down Expand Up @@ -260,3 +263,131 @@ func TestFile(t *testing.T) {
t.Errorf("streamlog file: want %q got %q", want, got)
}
}

func TestShouldSampleQuery(t *testing.T) {
queryLogSampleRate = -1
assert.False(t, shouldSampleQuery())

queryLogSampleRate = 0
assert.False(t, shouldSampleQuery())

// for test coverage, can't test a random result
queryLogSampleRate = 0.5
shouldSampleQuery()

queryLogSampleRate = 1.0
assert.True(t, shouldSampleQuery())

queryLogSampleRate = 100.0
assert.True(t, shouldSampleQuery())
}

func TestShouldEmitLog(t *testing.T) {
origQueryLogFilterTag := queryLogFilterTag
origQueryLogRowThreshold := queryLogRowThreshold
origQueryLogSampleRate := queryLogSampleRate
defer func() {
SetQueryLogFilterTag(origQueryLogFilterTag)
SetQueryLogRowThreshold(origQueryLogRowThreshold)
SetQueryLogSampleRate(origQueryLogSampleRate)
}()

tests := []struct {
sql string
qLogFilterTag string
qLogRowThreshold uint64
qLogSampleRate float64
rowsAffected uint64
rowsReturned uint64
ok bool
}{
{
sql: "queryLogThreshold smaller than affected and returned",
qLogFilterTag: "",
qLogRowThreshold: 2,
qLogSampleRate: 0.0,
rowsAffected: 7,
rowsReturned: 7,
ok: true,
},
{
sql: "queryLogThreshold greater than affected and returned",
qLogFilterTag: "",
qLogRowThreshold: 27,
qLogSampleRate: 0.0,
rowsAffected: 7,
rowsReturned: 17,
ok: false,
},
{
sql: "this doesn't contains queryFilterTag: TAG",
qLogFilterTag: "special tag",
qLogRowThreshold: 10,
qLogSampleRate: 0.0,
rowsAffected: 7,
rowsReturned: 17,
ok: false,
},
{
sql: "this contains queryFilterTag: TAG",
qLogFilterTag: "TAG",
qLogRowThreshold: 0,
qLogSampleRate: 0.0,
rowsAffected: 7,
rowsReturned: 17,
ok: true,
},
{
sql: "this contains querySampleRate: 1.0",
qLogFilterTag: "",
qLogRowThreshold: 0,
qLogSampleRate: 1.0,
rowsAffected: 7,
rowsReturned: 17,
ok: true,
},
{
sql: "this contains querySampleRate: 1.0 without expected queryFilterTag",
qLogFilterTag: "TAG",
qLogRowThreshold: 0,
qLogSampleRate: 1.0,
rowsAffected: 7,
rowsReturned: 17,
ok: true,
},
}

for _, tt := range tests {
t.Run(tt.sql, func(t *testing.T) {
SetQueryLogFilterTag(tt.qLogFilterTag)
SetQueryLogRowThreshold(tt.qLogRowThreshold)
SetQueryLogSampleRate(tt.qLogSampleRate)

require.Equal(t, tt.ok, ShouldEmitLog(tt.sql, tt.rowsAffected, tt.rowsReturned))
})
}
}

func BenchmarkShouldEmitLog(b *testing.B) {
b.Run("default", func(b *testing.B) {
SetQueryLogSampleRate(0.0)
for i := 0; i < b.N; i++ {
ShouldEmitLog("select * from test where user='someone'", 0, 123)
}
})
b.Run("filter_tag", func(b *testing.B) {
SetQueryLogSampleRate(0.0)
SetQueryLogFilterTag("LOG_QUERY")
defer SetQueryLogFilterTag("")
for i := 0; i < b.N; i++ {
ShouldEmitLog("select /* LOG_QUERY=1 */ * from test where user='someone'", 0, 123)
}
})
b.Run("50%_sample_rate", func(b *testing.B) {
SetQueryLogSampleRate(0.5)
defer SetQueryLogSampleRate(0.0)
for i := 0; i < b.N; i++ {
ShouldEmitLog("select * from test where user='someone'", 0, 123)
}
})
}
16 changes: 12 additions & 4 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -82,6 +83,9 @@ var (
// tabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets.
tabletFilters []string

// tabletFilterTags are the tablet tag filters (as key:value pairs) to apply to the full set of tablets.
tabletFilterTags flagutil.StringMapValue

// refreshInterval is the interval at which healthcheck refreshes its list of tablets from topo.
refreshInterval = 1 * time.Minute

Expand Down Expand Up @@ -164,6 +168,7 @@ func init() {

func registerDiscoveryFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&tabletFilters, "tablet_filters", []string{}, "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.")
fs.Var(&tabletFilterTags, "tablet-filter-tags", "Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.")
fs.Var((*topoproto.TabletTypeListFlag)(&AllowedTabletTypes), "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to. Should be provided as a comma-separated set of tablet types.")
fs.StringSliceVar(&KeyspacesToWatch, "keyspaces_to_watch", []string{}, "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema.")
}
Expand Down Expand Up @@ -337,13 +342,13 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
loadTabletsTrigger: make(chan struct{}),
}
var topoWatchers []*TopologyWatcher
var filter TabletFilter
cells := strings.Split(cellsToWatch, ",")
if cellsToWatch == "" {
cells = append(cells, localCell)
}

for _, c := range cells {
var filters TabletFilters
log.Infof("Setting up healthcheck for cell: %v", c)
if c == "" {
continue
Expand All @@ -357,11 +362,14 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filter = fbs
filters = append(filters, fbs)
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
}
if len(tabletFilterTags) > 0 {
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
42 changes: 42 additions & 0 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,19 @@ type TabletFilter interface {
IsIncluded(tablet *topodata.Tablet) bool
}

// TabletFilters contains filters for tablets.
type TabletFilters []TabletFilter

// IsIncluded returns true if a tablet passes all filters.
func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool {
for _, filter := range tf {
if !filter.IsIncluded(tablet) {
return false
}
}
return true
}

// FilterByShard is a filter that filters tablets by
// keyspace/shard.
type FilterByShard struct {
Expand Down Expand Up @@ -375,3 +388,32 @@ func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
}

// FilterByTabletTags is a filter that filters tablets by tablet tag key/values.
type FilterByTabletTags struct {
tags map[string]string
}

// NewFilterByTabletTags creates a new FilterByTabletTags. All tablets that match
// all tablet tags will be forwarded to the TopologyWatcher's consumer.
func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags {
return &FilterByTabletTags{
tags: tabletTags,
}
}

// IsIncluded returns true if the tablet's tags match what we expect.
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool {
if fbtg.tags == nil {
return true
}
if tablet.Tags == nil {
return false
}
for key, val := range fbtg.tags {
if tabletVal, found := tablet.Tags[key]; !found || tabletVal != val {
return false
}
}
return true
}
Loading
Loading