Skip to content

Commit

Permalink
Add schema tracking support for UDFs (#15705)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
Signed-off-by: Andres Taylor <andres@planetscale.com>
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
Co-authored-by: Manan Gupta <manan@planetscale.com>
Co-authored-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
3 people authored Apr 15, 2024
1 parent 9e40015 commit 6b25965
Show file tree
Hide file tree
Showing 33 changed files with 813 additions and 265 deletions.
10 changes: 10 additions & 0 deletions changelog/20.0/20.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- [Update with Multi Target Support](#update-multi-target)
- [Delete with Subquery Support](#delete-subquery)
- [Delete with Multi Target Support](#delete-multi-target)
- [User Defined Functions Support](#udf-support)
- **[Flag changes](#flag-changes)**
- [`pprof-http` default change](#pprof-http-default)
- [New `healthcheck-dial-concurrency` flag](#healthcheck-dial-concurrency-flag)
Expand Down Expand Up @@ -183,6 +184,15 @@ Example: `delete t1, t3 from t1 join t2 on t1.id = t2.id join t3 on t1.col = t3.

More details about how it works is available in [MySQL Docs](https://dev.mysql.com/doc/refman/8.0/en/delete.html)

#### <a id="udf-support"/> User Defined Functions Support

VTGate can track any user defined functions for better planning.
User Defined Functions (UDFs) should be directly loaded in the underlying MySQL.

It should be enabled in VTGate with the `--enable-udfs` flag.

More details about how to load UDFs is available in [MySQL Docs](https://dev.mysql.com/doc/extending-mysql/8.0/en/adding-loadable-function.html)

### <a id="flag-changes"/>Flag Changes

#### <a id="pprof-http-default"/> `pprof-http` Default Change
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Flags:
--enable-partial-keyspace-migration (Experimental) Follow shard routing rules: enable only while migrating a keyspace shard by shard. See documentation on Partial MoveTables for more. (default false)
--enable-per-workload-table-metrics If true, query counts and query error metrics include a label that identifies the workload
--enable-tx-throttler Synonym to -enable_tx_throttler
--enable-udfs Enable UDFs support in vtgate.
--enable-views Enable views support in vtgate.
--enable_buffer Enable buffering (stalling) of primary traffic during failovers.
--enable_buffer_dry_run Detect and log failover events, but do not actually buffer requests.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Flags:
--discovery_low_replication_lag duration Threshold below which replication lag is considered low enough to be healthy. (default 30s)
--emit_stats If set, emit stats to push-based monitoring and stats backends
--enable-partial-keyspace-migration (Experimental) Follow shard routing rules: enable only while migrating a keyspace shard by shard. See documentation on Partial MoveTables for more. (default false)
--enable-udfs Enable UDFs support in vtgate.
--enable-views Enable views support in vtgate.
--enable_buffer Enable buffering (stalling) of primary traffic during failovers.
--enable_buffer_dry_run Detect and log failover events, but do not actually buffer requests.
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/vreplication/sidecardb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ var numSidecarDBTables int
var ddls1, ddls2 []string

func init() {
sidecarDBTables = []string{"copy_state", "dt_participant", "dt_state", "heartbeat", "post_copy_action", "redo_state",
"redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version", "tables",
"vdiff", "vdiff_log", "vdiff_table", "views", "vreplication", "vreplication_log"}
sidecarDBTables = []string{"copy_state", "dt_participant", "dt_state", "heartbeat", "post_copy_action",
"redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version",
"tables", "udfs", "vdiff", "vdiff_log", "vdiff_table", "views", "vreplication", "vreplication_log"}
numSidecarDBTables = len(sidecarDBTables)
ddls1 = []string{
"drop table _vt.vreplication_log",
Expand Down
306 changes: 161 additions & 145 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions go/vt/sidecardb/schema/schemaengine/udfs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

CREATE TABLE IF NOT EXISTS udfs
(
FUNCTION_NAME varchar(128) CHARACTER SET `utf8mb3` COLLATE `utf8mb3_bin` NOT NULL,
FUNCTION_RETURN_TYPE varchar(20) CHARACTER SET `utf8mb3` COLLATE `utf8mb3_bin` NOT NULL,
FUNCTION_TYPE varchar(20) CHARACTER SET `utf8mb3` COLLATE `utf8mb3_bin` NOT NULL,
PRIMARY KEY (FUNCTION_NAME)
) engine = InnoDB
72 changes: 65 additions & 7 deletions go/vt/vtgate/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package schema
import (
"context"
"maps"
"slices"
"strings"
"sync"
"time"
Expand All @@ -45,6 +46,7 @@ type (
mu sync.Mutex
tables *tableMap
views *viewMap
udfs map[keyspaceStr][]string
ctx context.Context
signal func() // a function that we'll call whenever we have new schema data

Expand All @@ -60,7 +62,7 @@ type (
const defaultConsumeDelay = 1 * time.Second

// NewTracker creates the tracker object.
func NewTracker(ch chan *discovery.TabletHealth, enableViews bool, parser *sqlparser.Parser) *Tracker {
func NewTracker(ch chan *discovery.TabletHealth, enableViews, enableUDFs bool, parser *sqlparser.Parser) *Tracker {
t := &Tracker{
ctx: context.Background(),
ch: ch,
Expand All @@ -73,6 +75,9 @@ func NewTracker(ch chan *discovery.TabletHealth, enableViews bool, parser *sqlpa
if enableViews {
t.views = &viewMap{m: map[keyspaceStr]map[viewNameStr]sqlparser.SelectStatement{}, parser: parser}
}
if enableUDFs {
t.udfs = map[keyspaceStr][]string{}
}
return t
}

Expand All @@ -86,6 +91,10 @@ func (t *Tracker) LoadKeyspace(conn queryservice.QueryService, target *querypb.T
if err != nil {
return err
}
err = t.loadUDFs(conn, target)
if err != nil {
return err
}

t.tracked[target.Keyspace].setLoaded(true)
return nil
Expand Down Expand Up @@ -146,6 +155,32 @@ func (t *Tracker) loadViews(conn queryservice.QueryService, target *querypb.Targ
return nil
}

func (t *Tracker) loadUDFs(conn queryservice.QueryService, target *querypb.Target) error {
if t.udfs == nil {
// This happens only when UDFs are not enabled.
return nil
}

t.mu.Lock()
defer t.mu.Unlock()

err := conn.GetSchema(t.ctx, target, querypb.SchemaTableType_UDF_AGGREGATE, nil, func(schemaRes *querypb.GetSchemaResponse) error {
var udfs []string
for name := range schemaRes.TableDefinition {
udfs = append(udfs, name)
}

t.udfs[target.Keyspace] = udfs
return nil
})
if err != nil {
log.Errorf("error fetching new UDFs for %v: %w", target.Keyspace, err)
return err
}
log.Infof("finished loading UDFs for keyspace %s", target.Keyspace)
return nil
}

// Start starts the schema tracking.
func (t *Tracker) Start() {
log.Info("Starting schema tracking")
Expand Down Expand Up @@ -208,6 +243,9 @@ func (t *Tracker) GetColumns(ks string, tbl string) []vindexes.Column {
defer t.mu.Unlock()

tblInfo := t.tables.get(ks, tbl)
if tblInfo == nil {
return nil
}
return tblInfo.Columns
}

Expand Down Expand Up @@ -244,27 +282,47 @@ func (t *Tracker) Tables(ks string) map[string]*vindexes.TableInfo {

// Views returns all known views in the keyspace with their definition.
func (t *Tracker) Views(ks string) map[string]sqlparser.SelectStatement {
t.mu.Lock()
defer t.mu.Unlock()

if t.views == nil {
return nil
}

t.mu.Lock()
defer t.mu.Unlock()

m := t.views.m[ks]
return maps.Clone(m)
}

func (t *Tracker) UDFs(ks string) []string {
if t.udfs == nil {
return nil
}

t.mu.Lock()
defer t.mu.Unlock()

return slices.Clone(t.udfs[ks])
}

func (t *Tracker) updateSchema(th *discovery.TabletHealth) bool {
success := true
if th.Stats.TableSchemaChanged != nil {
success = t.updatedTableSchema(th)
}
if !success || th.Stats.ViewSchemaChanged == nil {
return success
if !success {
return false
}

// there is view definition change in the tablet
return t.updatedViewSchema(th)
if th.Stats.ViewSchemaChanged != nil {
success = t.updatedViewSchema(th)
}

if !success || !th.Stats.UdfsChanged {
return success
}

return t.loadUDFs(th.Conn, th.Target) == nil
}

func (t *Tracker) updatedTableSchema(th *discovery.TabletHealth) bool {
Expand Down
Loading

0 comments on commit 6b25965

Please sign in to comment.