Skip to content

Commit

Permalink
Add the following metrics to vttablet
Browse files Browse the repository at this point in the history
* IndexBytes
* IndexCardinality
* TableClusteredIndexSize
* TableRows

Signed-off-by: Rafer Hazen <rafer@ralua.com>
  • Loading branch information
rafer committed Jan 18, 2025
1 parent eaaa206 commit 64b606d
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 5 deletions.
24 changes: 24 additions & 0 deletions go/vt/vttablet/endtoend/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,30 @@ func TestEngineReload(t *testing.T) {
})
}

// TestUpdateTableIndexMetrics verifies that the table- and index-level gauges
// are present in the tablet's debug vars for both a plain table (vitess_a) and
// a partitioned table (vitess_part). The test is skipped when the connected
// MySQL flavor reports no InnoDB table sizes query.
func TestUpdateTableIndexMetrics(t *testing.T) {
	conn, err := mysql.Connect(context.Background(), &connParams)
	require.NoError(t, err)
	defer conn.Close()

	if conn.BaseShowInnodbTableSizes() == "" {
		t.Skip("additional table/index metrics not updated in this version of MySQL")
	}

	debugVars := framework.DebugVars()

	// Checked in order; require aborts the test on the first missing key.
	expectedKeys := []string{
		"TableRows/vitess_a",
		"TableRows/vitess_part",
		"TableClusteredIndexSize/vitess_a",
		"TableClusteredIndexSize/vitess_part",
		"IndexCardinality/vitess_a.PRIMARY",
		"IndexCardinality/vitess_part.PRIMARY",
		"IndexBytes/vitess_a.PRIMARY",
		"IndexBytes/vitess_part.PRIMARY",
	}
	for _, key := range expectedKeys {
		require.NotNil(t, framework.FetchVal(debugVars, key))
	}
}

// TestTuple tests that bind variables having tuple values work with vttablet.
func TestTuple(t *testing.T) {
client := framework.NewClient()
Expand Down
174 changes: 169 additions & 5 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ import (
)

// Upper bounds used to size the row limits passed to schema and statistics
// queries.
const (
	// maxTableCount bounds the number of tables expected in a single schema.
	maxTableCount = 10000
	// maxPartitionsPerTable bounds the number of partitions a single table may
	// have (presumably mirrors MySQL's partition limit — confirm).
	maxPartitionsPerTable = 8192
	// maxIndexesPerTable bounds the number of indexes a single table may have
	// (presumably mirrors InnoDB's secondary index limit — confirm).
	maxIndexesPerTable = 64
)

// notifier is a callback that receives the full table map keyed by name,
// the lists of created, altered and dropped tables, and a flag telling
// whether UDFs changed.
type notifier func(full map[string]*Table, created, altered, dropped []*Table, udfsChanged bool)

Expand Down Expand Up @@ -95,10 +97,16 @@ type Engine struct {
// dbCreationFailed is for preventing log spam.
dbCreationFailed bool

tableFileSizeGauge *stats.GaugesWithSingleLabel
tableAllocatedSizeGauge *stats.GaugesWithSingleLabel
innoDbReadRowsCounter *stats.Counter
SchemaReloadTimings *servenv.TimingsWrapper
tableFileSizeGauge *stats.GaugesWithSingleLabel
tableAllocatedSizeGauge *stats.GaugesWithSingleLabel
tableRowsGauge *stats.GaugesWithSingleLabel
tableClusteredIndexSizeGauge *stats.GaugesWithSingleLabel

indexCardinalityGauge *stats.GaugesWithMultiLabels
indexBytesGauge *stats.GaugesWithMultiLabels

innoDbReadRowsCounter *stats.Counter
SchemaReloadTimings *servenv.TimingsWrapper
}

// NewEngine creates a new Engine.
Expand All @@ -119,6 +127,10 @@ func NewEngine(env tabletenv.Env) *Engine {
_ = env.Exporter().NewGaugeDurationFunc("SchemaReloadTime", "vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.", se.ticks.Interval)
se.tableFileSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableFileSize", "tracks table file size", "Table")
se.tableAllocatedSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableAllocatedSize", "tracks table allocated size", "Table")
se.tableRowsGauge = env.Exporter().NewGaugesWithSingleLabel("TableRows", "the estimated number of rows in the table", "Table")
se.tableClusteredIndexSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableClusteredIndexSize", "the byte size of the clustered index (i.e. row data)", "Table")
se.indexCardinalityGauge = env.Exporter().NewGaugesWithMultiLabels("IndexCardinality", "estimated number of unique values in the index", []string{"Table", "Index"})
se.indexBytesGauge = env.Exporter().NewGaugesWithMultiLabels("IndexBytes", "byte size of the the index", []string{"Table", "Index"})
se.innoDbReadRowsCounter = env.Exporter().NewCounter("InnodbRowsRead", "number of rows read by mysql")
se.SchemaReloadTimings = env.Exporter().NewTimings("SchemaReload", "time taken to reload the schema", "type")
se.reloadTimeout = env.Config().SchemaChangeReloadTimeout
Expand Down Expand Up @@ -432,7 +444,7 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error {
// We therefore don't want to query for table sizes in getTableData()
includeStats = false

innodbResults, err := conn.Conn.Exec(ctx, innodbTableSizesQuery, maxTableCount, false)
innodbResults, err := conn.Conn.Exec(ctx, innodbTableSizesQuery, maxTableCount*maxPartitionsPerTable, false)
if err != nil {
return vterrors.Wrapf(err, "in Engine.reload(), reading innodb tables")
}
Expand Down Expand Up @@ -466,8 +478,12 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error {
}
}
}
if err := se.updateTableIndexMetrics(ctx, conn.Conn); err != nil {
log.Errorf("Updating index/table statistics failed, error: %v", err)
}
// See testing in TestEngineReload
}

}
tableData, err := getTableData(ctx, conn.Conn, includeStats)
if err != nil {
Expand Down Expand Up @@ -689,6 +705,154 @@ func (se *Engine) updateInnoDBRowsRead(ctx context.Context, conn *connpool.Conn)
return nil
}

// updateTableIndexMetrics refreshes the per-table gauges (row count and
// clustered index size) and the per-index gauges (byte size and cardinality)
// from MySQL's statistics tables.
//
// Statistics for partitioned tables are reported by MySQL once per partition
// under names of the form "TABLE#p#PARTITION"; those rows are folded into a
// single entry for the base table. All four queries are executed before any
// gauge is touched, so a query failure returns the error and leaves the
// previously published values in place.
func (se *Engine) updateTableIndexMetrics(ctx context.Context, conn *connpool.Conn) error {
	// Load all partitions so that we can extract the base table name from tables given as "TABLE#p#PARTITION"
	partitionsQuery := `
		select table_name, partition_name
		from information_schema.partitions
		where table_schema = database() and partition_name is not null
	`
	partitionsResults, err := conn.Exec(ctx, partitionsQuery, maxPartitionsPerTable*maxTableCount, false)
	if err != nil {
		return err
	}
	// Maps the physical "TABLE#p#PARTITION" name to its base table name.
	baseTables := make(map[string]string, len(partitionsResults.Rows))
	for _, row := range partitionsResults.Rows {
		tableName := row[0].ToString()
		baseTables[tableName+"#p#"+row[1].ToString()] = tableName
	}

	// resolveTable returns the base table for a partition's physical name and
	// returns any other name (including unknown partition names) unchanged.
	resolveTable := func(name string) string {
		if !strings.Contains(name, "#p#") {
			return name
		}
		if base, ok := baseTables[name]; ok {
			return base
		}
		return name
	}

	// Load table row counts and clustered index sizes. Results contain one row for every partition
	type tableStats struct {
		rows     int64
		rowBytes int64
	}
	tables := make(map[string]tableStats)
	tableStatsQuery := `
		select table_name, n_rows, clustered_index_size * @@innodb_page_size
		from mysql.innodb_table_stats where database_name = database()
	`
	tableStatsResults, err := conn.Exec(ctx, tableStatsQuery, maxTableCount*maxPartitionsPerTable, false)
	if err != nil {
		return err
	}
	for _, row := range tableStatsResults.Rows {
		// Conversion errors are deliberately ignored throughout this function:
		// metrics are best-effort, so a value that cannot be converted
		// contributes whatever ToInt64 returns alongside the error
		// (presumably 0) instead of aborting the whole refresh.
		rowCount, _ := row[1].ToInt64()
		rowBytes, _ := row[2].ToInt64()

		tableName := resolveTable(row[0].ToString())
		stats := tables[tableName] // zero value on first sight of the table
		stats.rows += rowCount
		stats.rowBytes += rowBytes
		tables[tableName] = stats
	}

	type indexStats struct {
		bytes       int64
		cardinality int64
	}
	// Keyed by {table name, index name}.
	indexes := make(map[[2]string]indexStats)

	// Load the byte sizes of all indexes. Results contain one row for every index/partition combination.
	// NOTE(review): the row limit below does not include maxPartitionsPerTable
	// even though results are per partition — confirm this cap is intended.
	bytesQuery := `
		select table_name, index_name, stat_value * @@innodb_page_size
		from mysql.innodb_index_stats
		where database_name = database() and stat_name = 'size';
	`
	bytesResults, err := conn.Exec(ctx, bytesQuery, maxTableCount*maxIndexesPerTable, false)
	if err != nil {
		return err
	}
	for _, row := range bytesResults.Rows {
		indexBytes, _ := row[2].ToInt64() // best-effort, see above

		key := [2]string{resolveTable(row[0].ToString()), row[1].ToString()}
		stats := indexes[key]
		stats.bytes += indexBytes
		indexes[key] = stats
	}

	// Load index cardinalities. Results contain one row for every index (pre-aggregated across partitions).
	// NOTE(review): the row limit below is sized by maxPartitionsPerTable
	// although results are per index — confirm this cap is intended.
	cardinalityQuery := `
		select table_name, index_name, max(cardinality)
		from information_schema.statistics s
		where table_schema = database()
		group by s.table_name, s.index_name
	`
	cardinalityResults, err := conn.Exec(ctx, cardinalityQuery, maxTableCount*maxPartitionsPerTable, false)
	if err != nil {
		return err
	}
	for _, row := range cardinalityResults.Rows {
		cardinality, _ := row[2].ToInt64() // best-effort, see above

		// Table names from this query are used as returned; no partition
		// resolution is applied.
		key := [2]string{row[0].ToString(), row[1].ToString()}
		stats := indexes[key]
		stats.cardinality = cardinality
		indexes[key] = stats
	}

	// Publish. Gauges are reset first so that entries for tables and indexes
	// that no longer exist are not carried over.
	se.indexBytesGauge.ResetAll()
	se.indexCardinalityGauge.ResetAll()
	for key, stats := range indexes {
		labels := []string{key[0], key[1]}
		se.indexBytesGauge.Set(labels, stats.bytes)
		se.indexCardinalityGauge.Set(labels, stats.cardinality)
	}

	se.tableRowsGauge.ResetAll()
	se.tableClusteredIndexSizeGauge.ResetAll()
	for tableName, stats := range tables {
		se.tableRowsGauge.Set(tableName, stats.rows)
		se.tableClusteredIndexSizeGauge.Set(tableName, stats.rowBytes)
	}

	return nil
}

func (se *Engine) mysqlTime(ctx context.Context, conn *connpool.Conn) (int64, error) {
// Keep `SELECT UNIX_TIMESTAMP` in uppercase because binlog server queries are case sensitive and expect it to be so.
tm, err := conn.Exec(ctx, "SELECT UNIX_TIMESTAMP()", 1, false)
Expand Down

0 comments on commit 64b606d

Please sign in to comment.