From 6b25965ce9b6873fb79181b19a5bb0206b1a6691 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Taylor?= Date: Mon, 15 Apr 2024 14:20:02 +0200 Subject: [PATCH] Add schema tracking support for UDFs (#15705) Signed-off-by: Manan Gupta Signed-off-by: Andres Taylor Signed-off-by: Harshit Gangal Co-authored-by: Manan Gupta Co-authored-by: Harshit Gangal --- changelog/20.0/20.0.0/summary.md | 10 + go/flags/endtoend/vtcombo.txt | 1 + go/flags/endtoend/vtgate.txt | 1 + .../endtoend/vreplication/sidecardb_test.go | 6 +- go/vt/proto/query/query.pb.go | 306 +++++++++--------- go/vt/proto/query/query_vtproto.pb.go | 34 ++ go/vt/sidecardb/schema/schemaengine/udfs.sql | 23 ++ go/vt/vtgate/schema/tracker.go | 72 ++++- go/vt/vtgate/schema/tracker_test.go | 93 ++++-- go/vt/vtgate/schema/update_controller.go | 4 +- go/vt/vtgate/vcursor_impl.go | 2 +- go/vt/vtgate/vindexes/vschema.go | 2 + go/vt/vtgate/vschema_manager.go | 99 +++--- go/vt/vtgate/vschema_manager_test.go | 4 +- go/vt/vtgate/vtgate.go | 11 +- go/vt/vttablet/endtoend/framework/server.go | 1 + go/vt/vttablet/endtoend/main_test.go | 12 +- go/vt/vttablet/endtoend/udf.so | Bin 0 -> 33608 bytes go/vt/vttablet/endtoend/udfs_test.go | 169 ++++++++++ .../vttablet/tabletserver/health_streamer.go | 11 +- .../tabletserver/health_streamer_test.go | 4 +- .../vttablet/tabletserver/messager/engine.go | 2 +- .../tabletserver/messager/engine_test.go | 12 +- go/vt/vttablet/tabletserver/query_engine.go | 2 +- go/vt/vttablet/tabletserver/query_executor.go | 24 ++ go/vt/vttablet/tabletserver/schema/db.go | 100 +++++- go/vt/vttablet/tabletserver/schema/db_test.go | 2 +- go/vt/vttablet/tabletserver/schema/engine.go | 17 +- .../tabletserver/schema/engine_test.go | 9 +- go/vt/vttablet/tabletserver/tabletserver.go | 2 +- proto/query.proto | 4 + web/vtadmin/src/proto/vtadmin.d.ts | 9 +- web/vtadmin/src/proto/vtadmin.js | 30 ++ 33 files changed, 813 insertions(+), 265 deletions(-) create mode 100644 go/vt/sidecardb/schema/schemaengine/udfs.sql create mode 100755 go/vt/vttablet/endtoend/udf.so create mode 100644 go/vt/vttablet/endtoend/udfs_test.go diff --git a/changelog/20.0/20.0.0/summary.md b/changelog/20.0/20.0.0/summary.md index 2223cbe63ca..64588cd8be4 100644 --- a/changelog/20.0/20.0.0/summary.md +++ b/changelog/20.0/20.0.0/summary.md @@ -21,6 +21,7 @@ - [Update with Multi Target Support](#update-multi-target) - [Delete with Subquery Support](#delete-subquery) - [Delete with Multi Target Support](#delete-multi-target) + - [User Defined Functions Support](#udf-support) - **[Flag changes](#flag-changes)** - [`pprof-http` default change](#pprof-http-default) - [New `healthcheck-dial-concurrency` flag](#healthcheck-dial-concurrency-flag) @@ -183,6 +184,15 @@ Example: `delete t1, t3 from t1 join t2 on t1.id = t2.id join t3 on t1.col = t3. More details about how it works is available in [MySQL Docs](https://dev.mysql.com/doc/refman/8.0/en/delete.html) +#### User Defined Functions Support + +VTGate can track any user defined functions for better planning. +User Defined Functions (UDFs) should be directly loaded in the underlying MySQL. + +It should be enabled in VTGate with the `--enable-udfs` flag. + +More details about how to load UDFs is available in [MySQL Docs](https://dev.mysql.com/doc/extending-mysql/8.0/en/adding-loadable-function.html) + ### Flag Changes #### `pprof-http` Default Change diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 783b0d898d3..8871cf05f43 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -115,6 +115,7 @@ Flags: --enable-partial-keyspace-migration (Experimental) Follow shard routing rules: enable only while migrating a keyspace shard by shard. See documentation on Partial MoveTables for more. (default false) --enable-per-workload-table-metrics If true, query counts and query error metrics include a label that identifies the workload --enable-tx-throttler Synonym to -enable_tx_throttler + --enable-udfs Enable UDFs support in vtgate. --enable-views Enable views support in vtgate. --enable_buffer Enable buffering (stalling) of primary traffic during failovers. --enable_buffer_dry_run Detect and log failover events, but do not actually buffer requests. diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index c9431fb43db..98a7d709246 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -54,6 +54,7 @@ Flags: --discovery_low_replication_lag duration Threshold below which replication lag is considered low enough to be healthy. (default 30s) --emit_stats If set, emit stats to push-based monitoring and stats backends --enable-partial-keyspace-migration (Experimental) Follow shard routing rules: enable only while migrating a keyspace shard by shard. See documentation on Partial MoveTables for more. (default false) + --enable-udfs Enable UDFs support in vtgate. --enable-views Enable views support in vtgate. --enable_buffer Enable buffering (stalling) of primary traffic during failovers. --enable_buffer_dry_run Detect and log failover events, but do not actually buffer requests. diff --git a/go/test/endtoend/vreplication/sidecardb_test.go b/go/test/endtoend/vreplication/sidecardb_test.go index d2a4ec6df07..391f7d60246 100644 --- a/go/test/endtoend/vreplication/sidecardb_test.go +++ b/go/test/endtoend/vreplication/sidecardb_test.go @@ -37,9 +37,9 @@ var numSidecarDBTables int var ddls1, ddls2 []string func init() { - sidecarDBTables = []string{"copy_state", "dt_participant", "dt_state", "heartbeat", "post_copy_action", "redo_state", - "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version", "tables", - "vdiff", "vdiff_log", "vdiff_table", "views", "vreplication", "vreplication_log"} + sidecarDBTables = []string{"copy_state", "dt_participant", "dt_state", "heartbeat", "post_copy_action", + "redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version", + "tables", "udfs", "vdiff", "vdiff_log", "vdiff_table", "views", "vreplication", "vreplication_log"} numSidecarDBTables = len(sidecarDBTables) ddls1 = []string{ "drop table _vt.vreplication_log", diff --git a/go/vt/proto/query/query.pb.go b/go/vt/proto/query/query.pb.go index 098ccad1032..4875e2c56f3 100644 --- a/go/vt/proto/query/query.pb.go +++ b/go/vt/proto/query/query.pb.go @@ -479,9 +479,10 @@ func (TransactionState) EnumDescriptor() ([]byte, []int) { type SchemaTableType int32 const ( - SchemaTableType_VIEWS SchemaTableType = 0 - SchemaTableType_TABLES SchemaTableType = 1 - SchemaTableType_ALL SchemaTableType = 2 + SchemaTableType_VIEWS SchemaTableType = 0 + SchemaTableType_TABLES SchemaTableType = 1 + SchemaTableType_ALL SchemaTableType = 2 + SchemaTableType_UDF_AGGREGATE SchemaTableType = 3 ) // Enum value maps for SchemaTableType. @@ -490,11 +491,13 @@ var ( 0: "VIEWS", 1: "TABLES", 2: "ALL", + 3: "UDF_AGGREGATE", } SchemaTableType_value = map[string]int32{ - "VIEWS": 0, - "TABLES": 1, - "ALL": 2, + "VIEWS": 0, + "TABLES": 1, + "ALL": 2, + "UDF_AGGREGATE": 3, } ) @@ -5066,6 +5069,8 @@ type RealtimeStats struct { TableSchemaChanged []string `protobuf:"bytes,7,rep,name=table_schema_changed,json=tableSchemaChanged,proto3" json:"table_schema_changed,omitempty"` // view_schema_changed is to provide list of views that have schema changes detected by the tablet. ViewSchemaChanged []string `protobuf:"bytes,8,rep,name=view_schema_changed,json=viewSchemaChanged,proto3" json:"view_schema_changed,omitempty"` + // udfs_changed is used to signal that the UDFs have changed on the tablet. + UdfsChanged bool `protobuf:"varint,9,opt,name=udfs_changed,json=udfsChanged,proto3" json:"udfs_changed,omitempty"` } func (x *RealtimeStats) Reset() { @@ -5156,6 +5161,13 @@ func (x *RealtimeStats) GetViewSchemaChanged() []string { return nil } +func (x *RealtimeStats) GetUdfsChanged() bool { + if x != nil { + return x.UdfsChanged + } + return false +} + // AggregateStats contains information about the health of a group of // tablets for a Target. It is used to propagate stats from a vtgate // to another, or from the Gateway layer of a vtgate to the routing @@ -6389,8 +6401,8 @@ var file_query_proto_rawDesc = []byte{ 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, 0x65, 0x73, 0x65, 0x72, 0x76, 0x65, 0x64, 0x49, 0x64, 0x22, 0x11, 0x0a, 0x0f, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x15, 0x0a, 0x13, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xf6, - 0x02, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x6c, 0x74, 0x69, 0x6d, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, + 0x6d, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x99, + 0x03, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x6c, 0x74, 0x69, 0x6d, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x36, 0x0a, 0x17, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, @@ -6413,143 +6425,147 @@ var file_query_proto_rawDesc = []byte{ 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x64, 0x12, 0x2e, 0x0a, 0x13, 0x76, 0x69, 0x65, 0x77, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x64, 0x18, 0x08, 0x20, 0x03, 0x28, 0x09, 0x52, 0x11, 0x76, 0x69, 0x65, 0x77, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, - 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x64, 0x22, 0xf6, 0x01, 0x0a, 0x0e, 0x41, 0x67, 0x67, 0x72, - 0x65, 0x67, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x30, 0x0a, 0x14, 0x68, 0x65, - 0x61, 0x6c, 0x74, 0x68, 0x79, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x63, 0x6f, 0x75, - 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x12, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, - 0x79, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x34, 0x0a, 0x16, - 0x75, 0x6e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, - 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x14, 0x75, 0x6e, - 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x43, 0x6f, 0x75, - 0x6e, 0x74, 0x12, 0x3d, 0x0a, 0x1b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x5f, 0x6c, 0x61, 0x67, 0x5f, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x5f, 0x6d, 0x69, - 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x18, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x61, 0x67, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x4d, 0x69, - 0x6e, 0x12, 0x3d, 0x0a, 0x1b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x5f, 0x6c, 0x61, 0x67, 0x5f, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x5f, 0x6d, 0x61, 0x78, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x18, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x4c, 0x61, 0x67, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x4d, 0x61, 0x78, - 0x22, 0x95, 0x02, 0x0a, 0x14, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x48, 0x65, 0x61, 0x6c, 0x74, - 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x06, 0x74, 0x61, 0x72, - 0x67, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x71, 0x75, 0x65, 0x72, - 0x79, 0x2e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, - 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x12, 0x3f, 0x0a, 0x1c, 0x70, 0x72, - 0x69, 0x6d, 0x61, 0x72, 0x79, 0x5f, 0x74, 0x65, 0x72, 0x6d, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, - 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, - 0x52, 0x19, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x54, 0x65, 0x72, 0x6d, 0x53, 0x74, 0x61, - 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x3b, 0x0a, 0x0e, 0x72, - 0x65, 0x61, 0x6c, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x61, 0x6c, - 0x74, 0x69, 0x6d, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x0d, 0x72, 0x65, 0x61, 0x6c, 0x74, - 0x69, 0x6d, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x38, 0x0a, 0x0c, 0x74, 0x61, 0x62, 0x6c, - 0x65, 0x74, 0x5f, 0x61, 0x6c, 0x69, 0x61, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, - 0x2e, 0x74, 0x6f, 0x70, 0x6f, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, - 0x41, 0x6c, 0x69, 0x61, 0x73, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x41, 0x6c, 0x69, - 0x61, 0x73, 0x4a, 0x04, 0x08, 0x06, 0x10, 0x07, 0x22, 0xae, 0x01, 0x0a, 0x13, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, - 0x12, 0x12, 0x0a, 0x04, 0x64, 0x74, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x64, 0x74, 0x69, 0x64, 0x12, 0x2d, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0e, 0x32, 0x17, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x54, 0x72, 0x61, 0x6e, - 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, - 0x61, 0x74, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x63, 0x72, 0x65, 0x61, - 0x74, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x74, 0x69, 0x6d, 0x65, 0x43, - 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x12, 0x31, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, - 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x71, - 0x75, 0x65, 0x72, 0x79, 0x2e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x52, 0x0c, 0x70, 0x61, 0x72, - 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x22, 0x91, 0x01, 0x0a, 0x10, 0x47, 0x65, - 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, - 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, - 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x52, 0x06, 0x74, - 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x35, 0x0a, 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x74, - 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x16, 0x2e, 0x71, 0x75, 0x65, 0x72, - 0x79, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x79, 0x70, - 0x65, 0x52, 0x09, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x0b, - 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, - 0x09, 0x52, 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x22, 0xb1, 0x01, - 0x0a, 0x11, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x58, 0x0a, 0x10, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x64, 0x65, 0x66, - 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2d, 0x2e, - 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x44, 0x65, 0x66, - 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0f, 0x74, 0x61, - 0x62, 0x6c, 0x65, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x42, 0x0a, - 0x14, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, - 0x01, 0x2a, 0x92, 0x03, 0x0a, 0x09, 0x4d, 0x79, 0x53, 0x71, 0x6c, 0x46, 0x6c, 0x61, 0x67, 0x12, - 0x09, 0x0a, 0x05, 0x45, 0x4d, 0x50, 0x54, 0x59, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x4e, 0x4f, - 0x54, 0x5f, 0x4e, 0x55, 0x4c, 0x4c, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x01, 0x12, 0x10, 0x0a, - 0x0c, 0x50, 0x52, 0x49, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x02, 0x12, - 0x13, 0x0a, 0x0f, 0x55, 0x4e, 0x49, 0x51, 0x55, 0x45, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x46, 0x4c, - 0x41, 0x47, 0x10, 0x04, 0x12, 0x15, 0x0a, 0x11, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x50, 0x4c, 0x45, - 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x08, 0x12, 0x0d, 0x0a, 0x09, 0x42, - 0x4c, 0x4f, 0x42, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x10, 0x12, 0x11, 0x0a, 0x0d, 0x55, 0x4e, - 0x53, 0x49, 0x47, 0x4e, 0x45, 0x44, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x20, 0x12, 0x11, 0x0a, - 0x0d, 0x5a, 0x45, 0x52, 0x4f, 0x46, 0x49, 0x4c, 0x4c, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x40, - 0x12, 0x10, 0x0a, 0x0b, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, - 0x80, 0x01, 0x12, 0x0e, 0x0a, 0x09, 0x45, 0x4e, 0x55, 0x4d, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, - 0x80, 0x02, 0x12, 0x18, 0x0a, 0x13, 0x41, 0x55, 0x54, 0x4f, 0x5f, 0x49, 0x4e, 0x43, 0x52, 0x45, - 0x4d, 0x45, 0x4e, 0x54, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, 0x04, 0x12, 0x13, 0x0a, 0x0e, - 0x54, 0x49, 0x4d, 0x45, 0x53, 0x54, 0x41, 0x4d, 0x50, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, - 0x08, 0x12, 0x0d, 0x0a, 0x08, 0x53, 0x45, 0x54, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, 0x10, - 0x12, 0x1a, 0x0a, 0x15, 0x4e, 0x4f, 0x5f, 0x44, 0x45, 0x46, 0x41, 0x55, 0x4c, 0x54, 0x5f, 0x56, - 0x41, 0x4c, 0x55, 0x45, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, 0x20, 0x12, 0x17, 0x0a, 0x12, - 0x4f, 0x4e, 0x5f, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x5f, 0x4e, 0x4f, 0x57, 0x5f, 0x46, 0x4c, - 0x41, 0x47, 0x10, 0x80, 0x40, 0x12, 0x0e, 0x0a, 0x08, 0x4e, 0x55, 0x4d, 0x5f, 0x46, 0x4c, 0x41, - 0x47, 0x10, 0x80, 0x80, 0x02, 0x12, 0x13, 0x0a, 0x0d, 0x50, 0x41, 0x52, 0x54, 0x5f, 0x4b, 0x45, - 0x59, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, 0x80, 0x01, 0x12, 0x10, 0x0a, 0x0a, 0x47, 0x52, - 0x4f, 0x55, 0x50, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, 0x80, 0x02, 0x12, 0x11, 0x0a, 0x0b, - 0x55, 0x4e, 0x49, 0x51, 0x55, 0x45, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, 0x80, 0x04, 0x12, - 0x11, 0x0a, 0x0b, 0x42, 0x49, 0x4e, 0x43, 0x4d, 0x50, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, - 0x80, 0x08, 0x1a, 0x02, 0x10, 0x01, 0x2a, 0x6b, 0x0a, 0x04, 0x46, 0x6c, 0x61, 0x67, 0x12, 0x08, - 0x0a, 0x04, 0x4e, 0x4f, 0x4e, 0x45, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0a, 0x49, 0x53, 0x49, 0x4e, - 0x54, 0x45, 0x47, 0x52, 0x41, 0x4c, 0x10, 0x80, 0x02, 0x12, 0x0f, 0x0a, 0x0a, 0x49, 0x53, 0x55, - 0x4e, 0x53, 0x49, 0x47, 0x4e, 0x45, 0x44, 0x10, 0x80, 0x04, 0x12, 0x0c, 0x0a, 0x07, 0x49, 0x53, - 0x46, 0x4c, 0x4f, 0x41, 0x54, 0x10, 0x80, 0x08, 0x12, 0x0d, 0x0a, 0x08, 0x49, 0x53, 0x51, 0x55, - 0x4f, 0x54, 0x45, 0x44, 0x10, 0x80, 0x10, 0x12, 0x0b, 0x0a, 0x06, 0x49, 0x53, 0x54, 0x45, 0x58, - 0x54, 0x10, 0x80, 0x20, 0x12, 0x0d, 0x0a, 0x08, 0x49, 0x53, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, - 0x10, 0x80, 0x40, 0x2a, 0xc0, 0x03, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0d, 0x0a, 0x09, - 0x4e, 0x55, 0x4c, 0x4c, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x04, 0x49, - 0x4e, 0x54, 0x38, 0x10, 0x81, 0x02, 0x12, 0x0a, 0x0a, 0x05, 0x55, 0x49, 0x4e, 0x54, 0x38, 0x10, - 0x82, 0x06, 0x12, 0x0a, 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x31, 0x36, 0x10, 0x83, 0x02, 0x12, 0x0b, - 0x0a, 0x06, 0x55, 0x49, 0x4e, 0x54, 0x31, 0x36, 0x10, 0x84, 0x06, 0x12, 0x0a, 0x0a, 0x05, 0x49, - 0x4e, 0x54, 0x32, 0x34, 0x10, 0x85, 0x02, 0x12, 0x0b, 0x0a, 0x06, 0x55, 0x49, 0x4e, 0x54, 0x32, - 0x34, 0x10, 0x86, 0x06, 0x12, 0x0a, 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x33, 0x32, 0x10, 0x87, 0x02, - 0x12, 0x0b, 0x0a, 0x06, 0x55, 0x49, 0x4e, 0x54, 0x33, 0x32, 0x10, 0x88, 0x06, 0x12, 0x0a, 0x0a, - 0x05, 0x49, 0x4e, 0x54, 0x36, 0x34, 0x10, 0x89, 0x02, 0x12, 0x0b, 0x0a, 0x06, 0x55, 0x49, 0x4e, - 0x54, 0x36, 0x34, 0x10, 0x8a, 0x06, 0x12, 0x0c, 0x0a, 0x07, 0x46, 0x4c, 0x4f, 0x41, 0x54, 0x33, - 0x32, 0x10, 0x8b, 0x08, 0x12, 0x0c, 0x0a, 0x07, 0x46, 0x4c, 0x4f, 0x41, 0x54, 0x36, 0x34, 0x10, - 0x8c, 0x08, 0x12, 0x0e, 0x0a, 0x09, 0x54, 0x49, 0x4d, 0x45, 0x53, 0x54, 0x41, 0x4d, 0x50, 0x10, - 0x8d, 0x10, 0x12, 0x09, 0x0a, 0x04, 0x44, 0x41, 0x54, 0x45, 0x10, 0x8e, 0x10, 0x12, 0x09, 0x0a, - 0x04, 0x54, 0x49, 0x4d, 0x45, 0x10, 0x8f, 0x10, 0x12, 0x0d, 0x0a, 0x08, 0x44, 0x41, 0x54, 0x45, - 0x54, 0x49, 0x4d, 0x45, 0x10, 0x90, 0x10, 0x12, 0x09, 0x0a, 0x04, 0x59, 0x45, 0x41, 0x52, 0x10, - 0x91, 0x06, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x43, 0x49, 0x4d, 0x41, 0x4c, 0x10, 0x12, 0x12, - 0x09, 0x0a, 0x04, 0x54, 0x45, 0x58, 0x54, 0x10, 0x93, 0x30, 0x12, 0x09, 0x0a, 0x04, 0x42, 0x4c, - 0x4f, 0x42, 0x10, 0x94, 0x50, 0x12, 0x0c, 0x0a, 0x07, 0x56, 0x41, 0x52, 0x43, 0x48, 0x41, 0x52, - 0x10, 0x95, 0x30, 0x12, 0x0e, 0x0a, 0x09, 0x56, 0x41, 0x52, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, - 0x10, 0x96, 0x50, 0x12, 0x09, 0x0a, 0x04, 0x43, 0x48, 0x41, 0x52, 0x10, 0x97, 0x30, 0x12, 0x0b, - 0x0a, 0x06, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x10, 0x98, 0x50, 0x12, 0x08, 0x0a, 0x03, 0x42, - 0x49, 0x54, 0x10, 0x99, 0x10, 0x12, 0x09, 0x0a, 0x04, 0x45, 0x4e, 0x55, 0x4d, 0x10, 0x9a, 0x10, - 0x12, 0x08, 0x0a, 0x03, 0x53, 0x45, 0x54, 0x10, 0x9b, 0x10, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x55, - 0x50, 0x4c, 0x45, 0x10, 0x1c, 0x12, 0x0d, 0x0a, 0x08, 0x47, 0x45, 0x4f, 0x4d, 0x45, 0x54, 0x52, - 0x59, 0x10, 0x9d, 0x10, 0x12, 0x09, 0x0a, 0x04, 0x4a, 0x53, 0x4f, 0x4e, 0x10, 0x9e, 0x10, 0x12, - 0x0e, 0x0a, 0x0a, 0x45, 0x58, 0x50, 0x52, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x1f, 0x12, - 0x0b, 0x0a, 0x06, 0x48, 0x45, 0x58, 0x4e, 0x55, 0x4d, 0x10, 0xa0, 0x20, 0x12, 0x0b, 0x0a, 0x06, - 0x48, 0x45, 0x58, 0x56, 0x41, 0x4c, 0x10, 0xa1, 0x20, 0x12, 0x0b, 0x0a, 0x06, 0x42, 0x49, 0x54, - 0x4e, 0x55, 0x4d, 0x10, 0xa2, 0x20, 0x2a, 0x46, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, - 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x52, 0x45, 0x50, 0x41, - 0x52, 0x45, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x02, - 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x4f, 0x4c, 0x4c, 0x42, 0x41, 0x43, 0x4b, 0x10, 0x03, 0x2a, 0x31, - 0x0a, 0x0f, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x79, 0x70, - 0x65, 0x12, 0x09, 0x0a, 0x05, 0x56, 0x49, 0x45, 0x57, 0x53, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, - 0x54, 0x41, 0x42, 0x4c, 0x45, 0x53, 0x10, 0x01, 0x12, 0x07, 0x0a, 0x03, 0x41, 0x4c, 0x4c, 0x10, - 0x02, 0x42, 0x35, 0x0a, 0x0f, 0x69, 0x6f, 0x2e, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x22, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, - 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x75, 0x64, 0x66, 0x73, 0x5f, + 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x64, 0x18, 0x09, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x75, + 0x64, 0x66, 0x73, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x64, 0x22, 0xf6, 0x01, 0x0a, 0x0e, 0x41, + 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x30, 0x0a, + 0x14, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, + 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x12, 0x68, 0x65, 0x61, + 0x6c, 0x74, 0x68, 0x79, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, + 0x34, 0x0a, 0x16, 0x75, 0x6e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x5f, 0x74, 0x61, 0x62, + 0x6c, 0x65, 0x74, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, + 0x14, 0x75, 0x6e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, + 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x3d, 0x0a, 0x1b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x61, 0x67, 0x5f, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, + 0x5f, 0x6d, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x18, 0x72, 0x65, 0x70, 0x6c, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x61, 0x67, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, + 0x73, 0x4d, 0x69, 0x6e, 0x12, 0x3d, 0x0a, 0x1b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x61, 0x67, 0x5f, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x5f, + 0x6d, 0x61, 0x78, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x18, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x61, 0x67, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, + 0x4d, 0x61, 0x78, 0x22, 0x95, 0x02, 0x0a, 0x14, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x48, 0x65, + 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x06, + 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x71, + 0x75, 0x65, 0x72, 0x79, 0x2e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x52, 0x06, 0x74, 0x61, 0x72, + 0x67, 0x65, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x12, 0x3f, 0x0a, + 0x1c, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x5f, 0x74, 0x65, 0x72, 0x6d, 0x5f, 0x73, 0x74, + 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x19, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x54, 0x65, 0x72, 0x6d, + 0x53, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x3b, + 0x0a, 0x0e, 0x72, 0x65, 0x61, 0x6c, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x73, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x52, + 0x65, 0x61, 0x6c, 0x74, 0x69, 0x6d, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x0d, 0x72, 0x65, + 0x61, 0x6c, 0x74, 0x69, 0x6d, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x38, 0x0a, 0x0c, 0x74, + 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x61, 0x6c, 0x69, 0x61, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x15, 0x2e, 0x74, 0x6f, 0x70, 0x6f, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x54, 0x61, 0x62, + 0x6c, 0x65, 0x74, 0x41, 0x6c, 0x69, 0x61, 0x73, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, + 0x41, 0x6c, 0x69, 0x61, 0x73, 0x4a, 0x04, 0x08, 0x06, 0x10, 0x07, 0x22, 0xae, 0x01, 0x0a, 0x13, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x74, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x64, 0x74, 0x69, 0x64, 0x12, 0x2d, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x17, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x54, + 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, + 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x63, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x74, 0x69, + 0x6d, 0x65, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x12, 0x31, 0x0a, 0x0c, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x0d, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x52, 0x0c, + 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x22, 0x91, 0x01, 0x0a, + 0x10, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x25, 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x0d, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, + 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x35, 0x0a, 0x0a, 0x74, 0x61, 0x62, 0x6c, + 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x16, 0x2e, 0x71, + 0x75, 0x65, 0x72, 0x79, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x61, 0x62, 0x6c, 0x65, + 0x54, 0x79, 0x70, 0x65, 0x52, 0x09, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, + 0x1f, 0x0a, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x03, + 0x20, 0x03, 0x28, 0x09, 0x52, 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x73, + 0x22, 0xb1, 0x01, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x58, 0x0a, 0x10, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, + 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x2d, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x63, 0x68, 0x65, + 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, + 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, + 0x0f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x1a, 0x42, 0x0a, 0x14, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x44, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x3a, 0x02, 0x38, 0x01, 0x2a, 0x92, 0x03, 0x0a, 0x09, 0x4d, 0x79, 0x53, 0x71, 0x6c, 0x46, 0x6c, + 0x61, 0x67, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x4d, 0x50, 0x54, 0x59, 0x10, 0x00, 0x12, 0x11, 0x0a, + 0x0d, 0x4e, 0x4f, 0x54, 0x5f, 0x4e, 0x55, 0x4c, 0x4c, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x01, + 0x12, 0x10, 0x0a, 0x0c, 0x50, 0x52, 0x49, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x46, 0x4c, 0x41, 0x47, + 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x55, 0x4e, 0x49, 0x51, 0x55, 0x45, 0x5f, 0x4b, 0x45, 0x59, + 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x04, 0x12, 0x15, 0x0a, 0x11, 0x4d, 0x55, 0x4c, 0x54, 0x49, + 0x50, 0x4c, 0x45, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x08, 0x12, 0x0d, + 0x0a, 0x09, 0x42, 0x4c, 0x4f, 0x42, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x10, 0x12, 0x11, 0x0a, + 0x0d, 0x55, 0x4e, 0x53, 0x49, 0x47, 0x4e, 0x45, 0x44, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x20, + 0x12, 0x11, 0x0a, 0x0d, 0x5a, 0x45, 0x52, 0x4f, 0x46, 0x49, 0x4c, 0x4c, 0x5f, 0x46, 0x4c, 0x41, + 0x47, 0x10, 0x40, 0x12, 0x10, 0x0a, 0x0b, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x46, 0x4c, + 0x41, 0x47, 0x10, 0x80, 0x01, 0x12, 0x0e, 0x0a, 0x09, 0x45, 0x4e, 0x55, 0x4d, 0x5f, 0x46, 0x4c, + 0x41, 0x47, 0x10, 0x80, 0x02, 0x12, 0x18, 0x0a, 0x13, 0x41, 0x55, 0x54, 0x4f, 0x5f, 0x49, 0x4e, + 0x43, 0x52, 0x45, 0x4d, 0x45, 0x4e, 0x54, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, 0x04, 0x12, + 0x13, 0x0a, 0x0e, 0x54, 0x49, 0x4d, 0x45, 0x53, 0x54, 0x41, 0x4d, 0x50, 0x5f, 0x46, 0x4c, 0x41, + 0x47, 0x10, 0x80, 0x08, 0x12, 0x0d, 0x0a, 0x08, 0x53, 0x45, 0x54, 0x5f, 0x46, 0x4c, 0x41, 0x47, + 0x10, 0x80, 0x10, 0x12, 0x1a, 0x0a, 0x15, 0x4e, 0x4f, 0x5f, 0x44, 0x45, 0x46, 0x41, 0x55, 0x4c, + 0x54, 0x5f, 0x56, 0x41, 0x4c, 0x55, 0x45, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, 0x20, 0x12, + 0x17, 0x0a, 0x12, 0x4f, 0x4e, 0x5f, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x5f, 0x4e, 0x4f, 0x57, + 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, 0x40, 0x12, 0x0e, 0x0a, 0x08, 0x4e, 0x55, 0x4d, 0x5f, + 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, 0x80, 0x02, 0x12, 0x13, 0x0a, 0x0d, 0x50, 0x41, 0x52, 0x54, + 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, 0x80, 0x01, 0x12, 0x10, 0x0a, + 0x0a, 0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, 0x80, 0x02, 0x12, + 0x11, 0x0a, 0x0b, 0x55, 0x4e, 0x49, 0x51, 0x55, 0x45, 0x5f, 0x46, 0x4c, 0x41, 0x47, 0x10, 0x80, + 0x80, 0x04, 0x12, 0x11, 0x0a, 0x0b, 0x42, 0x49, 0x4e, 0x43, 0x4d, 0x50, 0x5f, 0x46, 0x4c, 0x41, + 0x47, 0x10, 0x80, 0x80, 0x08, 0x1a, 0x02, 0x10, 0x01, 0x2a, 0x6b, 0x0a, 0x04, 0x46, 0x6c, 0x61, + 0x67, 0x12, 0x08, 0x0a, 0x04, 0x4e, 0x4f, 0x4e, 0x45, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0a, 0x49, + 0x53, 0x49, 0x4e, 0x54, 0x45, 0x47, 0x52, 0x41, 0x4c, 0x10, 0x80, 0x02, 0x12, 0x0f, 0x0a, 0x0a, + 0x49, 0x53, 0x55, 0x4e, 0x53, 0x49, 0x47, 0x4e, 0x45, 0x44, 0x10, 0x80, 0x04, 0x12, 0x0c, 0x0a, + 0x07, 0x49, 0x53, 0x46, 0x4c, 0x4f, 0x41, 0x54, 0x10, 0x80, 0x08, 0x12, 0x0d, 0x0a, 0x08, 0x49, + 0x53, 0x51, 0x55, 0x4f, 0x54, 0x45, 0x44, 0x10, 0x80, 0x10, 0x12, 0x0b, 0x0a, 0x06, 0x49, 0x53, + 0x54, 0x45, 0x58, 0x54, 0x10, 0x80, 0x20, 0x12, 0x0d, 0x0a, 0x08, 0x49, 0x53, 0x42, 0x49, 0x4e, + 0x41, 0x52, 0x59, 0x10, 0x80, 0x40, 0x2a, 0xc0, 0x03, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, + 0x0d, 0x0a, 0x09, 0x4e, 0x55, 0x4c, 0x4c, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x10, 0x00, 0x12, 0x09, + 0x0a, 0x04, 0x49, 0x4e, 0x54, 0x38, 0x10, 0x81, 0x02, 0x12, 0x0a, 0x0a, 0x05, 0x55, 0x49, 0x4e, + 0x54, 0x38, 0x10, 0x82, 0x06, 0x12, 0x0a, 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x31, 0x36, 0x10, 0x83, + 0x02, 0x12, 0x0b, 0x0a, 0x06, 0x55, 0x49, 0x4e, 0x54, 0x31, 0x36, 0x10, 0x84, 0x06, 0x12, 0x0a, + 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x32, 0x34, 0x10, 0x85, 0x02, 0x12, 0x0b, 0x0a, 0x06, 0x55, 0x49, + 0x4e, 0x54, 0x32, 0x34, 0x10, 0x86, 0x06, 0x12, 0x0a, 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x33, 0x32, + 0x10, 0x87, 0x02, 0x12, 0x0b, 0x0a, 0x06, 0x55, 0x49, 0x4e, 0x54, 0x33, 0x32, 0x10, 0x88, 0x06, + 0x12, 0x0a, 0x0a, 0x05, 0x49, 0x4e, 0x54, 0x36, 0x34, 0x10, 0x89, 0x02, 0x12, 0x0b, 0x0a, 0x06, + 0x55, 0x49, 0x4e, 0x54, 0x36, 0x34, 0x10, 0x8a, 0x06, 0x12, 0x0c, 0x0a, 0x07, 0x46, 0x4c, 0x4f, + 0x41, 0x54, 0x33, 0x32, 0x10, 0x8b, 0x08, 0x12, 0x0c, 0x0a, 0x07, 0x46, 0x4c, 0x4f, 0x41, 0x54, + 0x36, 0x34, 0x10, 0x8c, 0x08, 0x12, 0x0e, 0x0a, 0x09, 0x54, 0x49, 0x4d, 0x45, 0x53, 0x54, 0x41, + 0x4d, 0x50, 0x10, 0x8d, 0x10, 0x12, 0x09, 0x0a, 0x04, 0x44, 0x41, 0x54, 0x45, 0x10, 0x8e, 0x10, + 0x12, 0x09, 0x0a, 0x04, 0x54, 0x49, 0x4d, 0x45, 0x10, 0x8f, 0x10, 0x12, 0x0d, 0x0a, 0x08, 0x44, + 0x41, 0x54, 0x45, 0x54, 0x49, 0x4d, 0x45, 0x10, 0x90, 0x10, 0x12, 0x09, 0x0a, 0x04, 0x59, 0x45, + 0x41, 0x52, 0x10, 0x91, 0x06, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x43, 0x49, 0x4d, 0x41, 0x4c, + 0x10, 0x12, 0x12, 0x09, 0x0a, 0x04, 0x54, 0x45, 0x58, 0x54, 0x10, 0x93, 0x30, 0x12, 0x09, 0x0a, + 0x04, 0x42, 0x4c, 0x4f, 0x42, 0x10, 0x94, 0x50, 0x12, 0x0c, 0x0a, 0x07, 0x56, 0x41, 0x52, 0x43, + 0x48, 0x41, 0x52, 0x10, 0x95, 0x30, 0x12, 0x0e, 0x0a, 0x09, 0x56, 0x41, 0x52, 0x42, 0x49, 0x4e, + 0x41, 0x52, 0x59, 0x10, 0x96, 0x50, 0x12, 0x09, 0x0a, 0x04, 0x43, 0x48, 0x41, 0x52, 0x10, 0x97, + 0x30, 0x12, 0x0b, 0x0a, 0x06, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x10, 0x98, 0x50, 0x12, 0x08, + 0x0a, 0x03, 0x42, 0x49, 0x54, 0x10, 0x99, 0x10, 0x12, 0x09, 0x0a, 0x04, 0x45, 0x4e, 0x55, 0x4d, + 0x10, 0x9a, 0x10, 0x12, 0x08, 0x0a, 0x03, 0x53, 0x45, 0x54, 0x10, 0x9b, 0x10, 0x12, 0x09, 0x0a, + 0x05, 0x54, 0x55, 0x50, 0x4c, 0x45, 0x10, 0x1c, 0x12, 0x0d, 0x0a, 0x08, 0x47, 0x45, 0x4f, 0x4d, + 0x45, 0x54, 0x52, 0x59, 0x10, 0x9d, 0x10, 0x12, 0x09, 0x0a, 0x04, 0x4a, 0x53, 0x4f, 0x4e, 0x10, + 0x9e, 0x10, 0x12, 0x0e, 0x0a, 0x0a, 0x45, 0x58, 0x50, 0x52, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, + 0x10, 0x1f, 0x12, 0x0b, 0x0a, 0x06, 0x48, 0x45, 0x58, 0x4e, 0x55, 0x4d, 0x10, 0xa0, 0x20, 0x12, + 0x0b, 0x0a, 0x06, 0x48, 0x45, 0x58, 0x56, 0x41, 0x4c, 0x10, 0xa1, 0x20, 0x12, 0x0b, 0x0a, 0x06, + 0x42, 0x49, 0x54, 0x4e, 0x55, 0x4d, 0x10, 0xa2, 0x20, 0x2a, 0x46, 0x0a, 0x10, 0x54, 0x72, 0x61, + 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0b, 0x0a, + 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x52, + 0x45, 0x50, 0x41, 0x52, 0x45, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x43, 0x4f, 0x4d, 0x4d, 0x49, + 0x54, 0x10, 0x02, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x4f, 0x4c, 0x4c, 0x42, 0x41, 0x43, 0x4b, 0x10, + 0x03, 0x2a, 0x44, 0x0a, 0x0f, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x54, 0x61, 0x62, 0x6c, 0x65, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x56, 0x49, 0x45, 0x57, 0x53, 0x10, 0x00, 0x12, + 0x0a, 0x0a, 0x06, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x53, 0x10, 0x01, 0x12, 0x07, 0x0a, 0x03, 0x41, + 0x4c, 0x4c, 0x10, 0x02, 0x12, 0x11, 0x0a, 0x0d, 0x55, 0x44, 0x46, 0x5f, 0x41, 0x47, 0x47, 0x52, + 0x45, 0x47, 0x41, 0x54, 0x45, 0x10, 0x03, 0x42, 0x35, 0x0a, 0x0f, 0x69, 0x6f, 0x2e, 0x76, 0x69, + 0x74, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x22, 0x76, 0x69, 0x74, 0x65, + 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, + 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/go/vt/proto/query/query_vtproto.pb.go b/go/vt/proto/query/query_vtproto.pb.go index 248772ba4f4..3132fa124dd 100644 --- a/go/vt/proto/query/query_vtproto.pb.go +++ b/go/vt/proto/query/query_vtproto.pb.go @@ -1359,6 +1359,7 @@ func (m *RealtimeStats) CloneVT() *RealtimeStats { FilteredReplicationLagSeconds: m.FilteredReplicationLagSeconds, CpuUsage: m.CpuUsage, Qps: m.Qps, + UdfsChanged: m.UdfsChanged, } if rhs := m.TableSchemaChanged; rhs != nil { tmpContainer := make([]string, len(rhs)) @@ -5344,6 +5345,16 @@ func (m *RealtimeStats) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.UdfsChanged { + i-- + if m.UdfsChanged { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x48 + } if len(m.ViewSchemaChanged) > 0 { for iNdEx := len(m.ViewSchemaChanged) - 1; iNdEx >= 0; iNdEx-- { i -= len(m.ViewSchemaChanged[iNdEx]) @@ -7230,6 +7241,9 @@ func (m *RealtimeStats) SizeVT() (n int) { n += 1 + l + sov(uint64(l)) } } + if m.UdfsChanged { + n += 2 + } n += len(m.unknownFields) return n } @@ -17524,6 +17538,26 @@ func (m *RealtimeStats) UnmarshalVT(dAtA []byte) error { } m.ViewSchemaChanged = append(m.ViewSchemaChanged, string(dAtA[iNdEx:postIndex])) iNdEx = postIndex + case 9: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field UdfsChanged", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.UdfsChanged = bool(v != 0) default: iNdEx = preIndex skippy, err := skip(dAtA[iNdEx:]) diff --git a/go/vt/sidecardb/schema/schemaengine/udfs.sql b/go/vt/sidecardb/schema/schemaengine/udfs.sql new file mode 100644 index 00000000000..90c6143fbd6 --- /dev/null +++ b/go/vt/sidecardb/schema/schemaengine/udfs.sql @@ -0,0 +1,23 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +CREATE TABLE IF NOT EXISTS udfs +( + FUNCTION_NAME varchar(128) CHARACTER SET `utf8mb3` COLLATE `utf8mb3_bin` NOT NULL, + FUNCTION_RETURN_TYPE varchar(20) CHARACTER SET `utf8mb3` COLLATE `utf8mb3_bin` NOT NULL, + FUNCTION_TYPE varchar(20) CHARACTER SET `utf8mb3` COLLATE `utf8mb3_bin` NOT NULL, + PRIMARY KEY (FUNCTION_NAME) +) engine = InnoDB diff --git a/go/vt/vtgate/schema/tracker.go b/go/vt/vtgate/schema/tracker.go index f7b46521b68..ce1686e9938 100644 --- a/go/vt/vtgate/schema/tracker.go +++ b/go/vt/vtgate/schema/tracker.go @@ -19,6 +19,7 @@ package schema import ( "context" "maps" + "slices" "strings" "sync" "time" @@ -45,6 +46,7 @@ type ( mu sync.Mutex tables *tableMap views *viewMap + udfs map[keyspaceStr][]string ctx context.Context signal func() // a function that we'll call whenever we have new schema data @@ -60,7 +62,7 @@ type ( const defaultConsumeDelay = 1 * time.Second // NewTracker creates the tracker object. -func NewTracker(ch chan *discovery.TabletHealth, enableViews bool, parser *sqlparser.Parser) *Tracker { +func NewTracker(ch chan *discovery.TabletHealth, enableViews, enableUDFs bool, parser *sqlparser.Parser) *Tracker { t := &Tracker{ ctx: context.Background(), ch: ch, @@ -73,6 +75,9 @@ func NewTracker(ch chan *discovery.TabletHealth, enableViews bool, parser *sqlpa if enableViews { t.views = &viewMap{m: map[keyspaceStr]map[viewNameStr]sqlparser.SelectStatement{}, parser: parser} } + if enableUDFs { + t.udfs = map[keyspaceStr][]string{} + } return t } @@ -86,6 +91,10 @@ func (t *Tracker) LoadKeyspace(conn queryservice.QueryService, target *querypb.T if err != nil { return err } + err = t.loadUDFs(conn, target) + if err != nil { + return err + } t.tracked[target.Keyspace].setLoaded(true) return nil @@ -146,6 +155,32 @@ func (t *Tracker) loadViews(conn queryservice.QueryService, target *querypb.Targ return nil } +func (t *Tracker) loadUDFs(conn queryservice.QueryService, target *querypb.Target) error { + if t.udfs == nil { + // This happens only when UDFs are not enabled. + return nil + } + + t.mu.Lock() + defer t.mu.Unlock() + + err := conn.GetSchema(t.ctx, target, querypb.SchemaTableType_UDF_AGGREGATE, nil, func(schemaRes *querypb.GetSchemaResponse) error { + var udfs []string + for name := range schemaRes.TableDefinition { + udfs = append(udfs, name) + } + + t.udfs[target.Keyspace] = udfs + return nil + }) + if err != nil { + log.Errorf("error fetching new UDFs for %v: %w", target.Keyspace, err) + return err + } + log.Infof("finished loading UDFs for keyspace %s", target.Keyspace) + return nil +} + // Start starts the schema tracking. func (t *Tracker) Start() { log.Info("Starting schema tracking") @@ -208,6 +243,9 @@ func (t *Tracker) GetColumns(ks string, tbl string) []vindexes.Column { defer t.mu.Unlock() tblInfo := t.tables.get(ks, tbl) + if tblInfo == nil { + return nil + } return tblInfo.Columns } @@ -244,27 +282,47 @@ func (t *Tracker) Tables(ks string) map[string]*vindexes.TableInfo { // Views returns all known views in the keyspace with their definition. func (t *Tracker) Views(ks string) map[string]sqlparser.SelectStatement { - t.mu.Lock() - defer t.mu.Unlock() - if t.views == nil { return nil } + t.mu.Lock() + defer t.mu.Unlock() + m := t.views.m[ks] return maps.Clone(m) } +func (t *Tracker) UDFs(ks string) []string { + if t.udfs == nil { + return nil + } + + t.mu.Lock() + defer t.mu.Unlock() + + return slices.Clone(t.udfs[ks]) +} + func (t *Tracker) updateSchema(th *discovery.TabletHealth) bool { success := true if th.Stats.TableSchemaChanged != nil { success = t.updatedTableSchema(th) } - if !success || th.Stats.ViewSchemaChanged == nil { - return success + if !success { + return false } + // there is view definition change in the tablet - return t.updatedViewSchema(th) + if th.Stats.ViewSchemaChanged != nil { + success = t.updatedViewSchema(th) + } + + if !success || !th.Stats.UdfsChanged { + return success + } + + return t.loadUDFs(th.Conn, th.Target) == nil } func (t *Tracker) updatedTableSchema(th *discovery.TabletHealth) bool { diff --git a/go/vt/vtgate/schema/tracker_test.go b/go/vt/vtgate/schema/tracker_test.go index 1ee1aee6a0f..3c9b08d71dd 100644 --- a/go/vt/vtgate/schema/tracker_test.go +++ b/go/vt/vtgate/schema/tracker_test.go @@ -81,7 +81,7 @@ func TestTrackingUnHealthyTablet(t *testing.T) { sbc := sandboxconn.NewSandboxConn(tablet) ch := make(chan *discovery.TabletHealth) - tracker := NewTracker(ch, false, sqlparser.NewTestParser()) + tracker := NewTracker(ch, false, false, sqlparser.NewTestParser()) tracker.consumeDelay = 1 * time.Millisecond tracker.Start() defer tracker.Stop() @@ -104,7 +104,7 @@ func TestTrackingUnHealthyTablet(t *testing.T) { serving: true, }, { - name: "initial load", + name: "first update", serving: true, updatedTbls: []string{"a"}, }, @@ -113,24 +113,26 @@ func TestTrackingUnHealthyTablet(t *testing.T) { serving: false, }, { - name: "now serving tablet", + name: "serving tablet", serving: true, }, } for _, tcase := range tcases { - ch <- &discovery.TabletHealth{ - Conn: sbc, - Tablet: tablet, - Target: target, - Serving: tcase.serving, - Stats: &querypb.RealtimeStats{TableSchemaChanged: tcase.updatedTbls}, - } - time.Sleep(5 * time.Millisecond) + t.Run(tcase.name, func(t *testing.T) { + ch <- &discovery.TabletHealth{ + Conn: sbc, + Tablet: tablet, + Target: target, + Serving: tcase.serving, + Stats: &querypb.RealtimeStats{TableSchemaChanged: tcase.updatedTbls}, + } + time.Sleep(5 * time.Millisecond) + }) } require.False(t, waitTimeout(&wg, 5*time.Second), "schema was updated but received no signal") - require.EqualValues(t, 3, sbc.GetSchemaCount.Load()) + assert.EqualValues(t, 3, sbc.GetSchemaCount.Load()) } // TestTrackerGetKeyspaceUpdateController tests table update controller initialization. @@ -214,7 +216,7 @@ func TestTableTracking(t *testing.T) { }, }} - testTracker(t, schemaDefResult, testcases) + testTracker(t, false, schemaDefResult, testcases) } // TestViewsTracking tests that the tracker is able to track views. @@ -261,7 +263,7 @@ func TestViewsTracking(t *testing.T) { "t4": "select 1 from tbl4"}, }} - testTracker(t, schemaDefResult, testcases) + testTracker(t, false, schemaDefResult, testcases) } // TestFKInfoRetrieval tests that the tracker is able to retrieve required foreign key information from ddl statement. @@ -319,7 +321,7 @@ func TestFKInfoRetrieval(t *testing.T) { }, }} - testTracker(t, schemaDefResult, testcases) + testTracker(t, false, schemaDefResult, testcases) } // TestIndexInfoRetrieval tests that the tracker is able to retrieve required index information from ddl statement. @@ -379,7 +381,39 @@ func TestIndexInfoRetrieval(t *testing.T) { }, }} - testTracker(t, schemaDefResult, testcases) + testTracker(t, false, schemaDefResult, testcases) +} + +// TestUDFRetrieval tests that the tracker is able to retrieve required UDF information. +func TestUDFRetrieval(t *testing.T) { + schemaDefResult := []map[string]string{{ + // initial load of table - kept empty + }, { + // initial load of view - kept empty + }, { + "my_udf": "int", + }, { + "my_udf2": "char", + "my_udf3": "int", + }, { + "my_udf2": "char", + "my_udf4": "int", + }} + + testcases := []testCases{{ + testName: "initial load", + expUDFs: []string{"my_udf"}, + }, { + testName: "next load 1", + updUdfs: true, + expUDFs: []string{"my_udf2", "my_udf3"}, + }, { + testName: "next load 2", + updUdfs: true, + expUDFs: []string{"my_udf2", "my_udf4"}, + }} + + testTracker(t, true, schemaDefResult, testcases) } type testCases struct { @@ -392,11 +426,14 @@ type testCases struct { updView []string expView map[string]string + + updUdfs bool + expUDFs []string } -func testTracker(t *testing.T, schemaDefResult []map[string]string, tcases []testCases) { +func testTracker(t *testing.T, enableUDFs bool, schemaDefResult []map[string]string, tcases []testCases) { ch := make(chan *discovery.TabletHealth) - tracker := NewTracker(ch, true, sqlparser.NewTestParser()) + tracker := NewTracker(ch, true, enableUDFs, sqlparser.NewTestParser()) tracker.consumeDelay = 1 * time.Millisecond tracker.Start() defer tracker.Stop() @@ -412,6 +449,10 @@ func testTracker(t *testing.T, schemaDefResult []map[string]string, tcases []tes sbc := sandboxconn.NewSandboxConn(tablet) sbc.SetSchemaResult(schemaDefResult) + initialLoadCount := 2 + if enableUDFs { + initialLoadCount = 3 + } for count, tcase := range tcases { t.Run(tcase.testName, func(t *testing.T) { wg.Add(1) @@ -420,11 +461,11 @@ func testTracker(t *testing.T, schemaDefResult []map[string]string, tcases []tes Tablet: tablet, Target: target, Serving: true, - Stats: &querypb.RealtimeStats{TableSchemaChanged: tcase.updTbl, ViewSchemaChanged: tcase.updView}, + Stats: &querypb.RealtimeStats{TableSchemaChanged: tcase.updTbl, ViewSchemaChanged: tcase.updView, UdfsChanged: tcase.updUdfs}, } require.False(t, waitTimeout(&wg, time.Second), "schema was updated but received no signal") - require.EqualValues(t, count+2, sbc.GetSchemaCount.Load()) + require.EqualValues(t, count+initialLoadCount, sbc.GetSchemaCount.Load()) _, keyspacePresent := tracker.tracked[target.Keyspace] require.Equal(t, true, keyspacePresent) @@ -434,24 +475,24 @@ func testTracker(t *testing.T, schemaDefResult []map[string]string, tcases []tes if len(tcase.expFk[k]) > 0 { fks := tracker.GetForeignKeys(keyspace, k) for _, fk := range fks { - utils.MustMatch(t, tcase.expFk[k], sqlparser.String(fk), "mismatch foreign keys for table: ", k) + assert.Equal(t, tcase.expFk[k], sqlparser.String(fk), "mismatch foreign keys for table: ", k) } } expIndexes := tcase.expIdx[k] if len(expIndexes) > 0 { idxs := tracker.GetIndexes(keyspace, k) - if len(expIndexes) != len(idxs) { - t.Fatalf("mismatch index for table: %s", k) - } + require.Equal(t, len(expIndexes), len(idxs)) for i, idx := range idxs { - utils.MustMatch(t, expIndexes[i], sqlparser.String(idx), "mismatch index for table: ", k) + assert.Equal(t, expIndexes[i], sqlparser.String(idx), "mismatch index for table: ", k) } } } for k, v := range tcase.expView { - utils.MustMatch(t, v, sqlparser.String(tracker.GetViews(keyspace, k)), "mismatch for view: ", k) + assert.Equal(t, v, sqlparser.String(tracker.GetViews(keyspace, k)), "mismatch for view: ", k) } + + assert.Equal(t, tcase.expUDFs, tracker.UDFs(keyspace), "mismatch for udfs") }) } } diff --git a/go/vt/vtgate/schema/update_controller.go b/go/vt/vtgate/schema/update_controller.go index f68a9448d55..f30b2a679e6 100644 --- a/go/vt/vtgate/schema/update_controller.go +++ b/go/vt/vtgate/schema/update_controller.go @@ -148,11 +148,11 @@ func (u *updateController) add(th *discovery.TabletHealth) { } // If the keyspace schema is loaded and there is no schema change detected. Then there is nothing to process. - if len(th.Stats.TableSchemaChanged) == 0 && len(th.Stats.ViewSchemaChanged) == 0 && u.loaded { + if len(th.Stats.TableSchemaChanged) == 0 && len(th.Stats.ViewSchemaChanged) == 0 && !th.Stats.UdfsChanged && u.loaded { return } - if (len(th.Stats.TableSchemaChanged) > 0 || len(th.Stats.ViewSchemaChanged) > 0) && u.ignore { + if (len(th.Stats.TableSchemaChanged) > 0 || len(th.Stats.ViewSchemaChanged) > 0 || th.Stats.UdfsChanged) && u.ignore { // we got an update for this keyspace - we need to stop ignoring it, and reload everything u.ignore = false u.loaded = false diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 18508a2e94c..15c6296f108 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -1324,7 +1324,7 @@ func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso callerId := callerid.EffectiveCallerIDFromContext(ctx) immediateCallerId := callerid.ImmediateCallerIDFromContext(ctx) - timedCtx, _ := context.WithTimeout(context.Background(), warmingReadsQueryTimeout) //nolint + timedCtx, _ := context.WithTimeout(context.Background(), warmingReadsQueryTimeout) // nolint clonedCtx := callerid.NewContext(timedCtx, callerId, immediateCallerId) v := &vcursorImpl{ diff --git a/go/vt/vtgate/vindexes/vschema.go b/go/vt/vtgate/vindexes/vschema.go index 16b528322b6..9e21505690c 100644 --- a/go/vt/vtgate/vindexes/vschema.go +++ b/go/vt/vtgate/vindexes/vschema.go @@ -393,6 +393,8 @@ func replaceUnspecifiedForeignKeyMode(fkMode vschemapb.Keyspace_ForeignKeyMode) return fkMode } +// AddView adds a view to an existing keyspace in the VSchema. +// It's only used from tests. func (vschema *VSchema) AddView(ksname, viewName, query string, parser *sqlparser.Parser) error { ks, ok := vschema.Keyspaces[ksname] if !ok { diff --git a/go/vt/vtgate/vschema_manager.go b/go/vt/vtgate/vschema_manager.go index ad054807045..8b346a0274f 100644 --- a/go/vt/vtgate/vschema_manager.go +++ b/go/vt/vtgate/vschema_manager.go @@ -52,6 +52,7 @@ type VSchemaManager struct { type SchemaInfo interface { Tables(ks string) map[string]*vindexes.TableInfo Views(ks string) map[string]sqlparser.SelectStatement + UDFs(ks string) []string } // GetCurrentSrvVschema returns a copy of the latest SrvVschema from the @@ -201,60 +202,66 @@ func (vm *VSchemaManager) buildAndEnhanceVSchema(v *vschemapb.SrvVSchema) *vinde func (vm *VSchemaManager) updateFromSchema(vschema *vindexes.VSchema) { for ksName, ks := range vschema.Keyspaces { - m := vm.schema.Tables(ksName) - // Before we add the foreign key definitions in the tables, we need to make sure that all the tables - // are created in the Vschema, so that later when we try to find the routed tables, we don't end up - // getting dummy tables. - for tblName, tblInfo := range m { - setColumns(ks, tblName, tblInfo.Columns) + vm.updateTableInfo(vschema, ks, ksName) + vm.updateViewInfo(ks, ksName) + } +} + +func (vm *VSchemaManager) updateViewInfo(ks *vindexes.KeyspaceSchema, ksName string) { + views := vm.schema.Views(ksName) + if views != nil { + ks.Views = make(map[string]sqlparser.SelectStatement, len(views)) + for name, def := range views { + ks.Views[name] = sqlparser.CloneSelectStatement(def) } + } +} +func (vm *VSchemaManager) updateTableInfo(vschema *vindexes.VSchema, ks *vindexes.KeyspaceSchema, ksName string) { + m := vm.schema.Tables(ksName) + // Before we add the foreign key definitions in the tables, we need to make sure that all the tables + // are created in the Vschema, so that later when we try to find the routed tables, we don't end up + // getting dummy tables. + for tblName, tblInfo := range m { + setColumns(ks, tblName, tblInfo.Columns) + } - // Now that we have ensured that all the tables are created, we can start populating the foreign keys - // in the tables. - for tblName, tblInfo := range m { - rTbl, err := vschema.FindRoutedTable(ksName, tblName, topodatapb.TabletType_PRIMARY) + // Now that we have ensured that all the tables are created, we can start populating the foreign keys + // in the tables. + for tblName, tblInfo := range m { + rTbl, err := vschema.FindRoutedTable(ksName, tblName, topodatapb.TabletType_PRIMARY) + if err != nil { + log.Errorf("error finding routed table %s: %v", tblName, err) + continue + } + for _, fkDef := range tblInfo.ForeignKeys { + // Ignore internal tables as part of foreign key references. + if schema.IsInternalOperationTableName(fkDef.ReferenceDefinition.ReferencedTable.Name.String()) { + continue + } + parentTbl, err := vschema.FindRoutedTable(ksName, fkDef.ReferenceDefinition.ReferencedTable.Name.String(), topodatapb.TabletType_PRIMARY) if err != nil { - log.Errorf("error finding routed table %s: %v", tblName, err) + log.Errorf("error finding parent table %s: %v", fkDef.ReferenceDefinition.ReferencedTable.Name.String(), err) continue } - for _, fkDef := range tblInfo.ForeignKeys { - // Ignore internal tables as part of foreign key references. - if schema.IsInternalOperationTableName(fkDef.ReferenceDefinition.ReferencedTable.Name.String()) { - continue - } - parentTbl, err := vschema.FindRoutedTable(ksName, fkDef.ReferenceDefinition.ReferencedTable.Name.String(), topodatapb.TabletType_PRIMARY) - if err != nil { - log.Errorf("error finding parent table %s: %v", fkDef.ReferenceDefinition.ReferencedTable.Name.String(), err) - continue + rTbl.ParentForeignKeys = append(rTbl.ParentForeignKeys, vindexes.NewParentFkInfo(parentTbl, fkDef)) + parentTbl.ChildForeignKeys = append(parentTbl.ChildForeignKeys, vindexes.NewChildFkInfo(rTbl, fkDef)) + } + for _, idxDef := range tblInfo.Indexes { + switch idxDef.Info.Type { + case sqlparser.IndexTypePrimary: + for _, idxCol := range idxDef.Columns { + rTbl.PrimaryKey = append(rTbl.PrimaryKey, idxCol.Column) } - rTbl.ParentForeignKeys = append(rTbl.ParentForeignKeys, vindexes.NewParentFkInfo(parentTbl, fkDef)) - parentTbl.ChildForeignKeys = append(parentTbl.ChildForeignKeys, vindexes.NewChildFkInfo(rTbl, fkDef)) - } - for _, idxDef := range tblInfo.Indexes { - switch idxDef.Info.Type { - case sqlparser.IndexTypePrimary: - for _, idxCol := range idxDef.Columns { - rTbl.PrimaryKey = append(rTbl.PrimaryKey, idxCol.Column) + case sqlparser.IndexTypeUnique: + var uniqueKey sqlparser.Exprs + for _, idxCol := range idxDef.Columns { + if idxCol.Expression == nil { + uniqueKey = append(uniqueKey, sqlparser.NewColName(idxCol.Column.String())) + } else { + uniqueKey = append(uniqueKey, idxCol.Expression) } - case sqlparser.IndexTypeUnique: - var uniqueKey sqlparser.Exprs - for _, idxCol := range idxDef.Columns { - if idxCol.Expression == nil { - uniqueKey = append(uniqueKey, sqlparser.NewColName(idxCol.Column.String())) - } else { - uniqueKey = append(uniqueKey, idxCol.Expression) - } - } - rTbl.UniqueKeys = append(rTbl.UniqueKeys, uniqueKey) } - } - } - - views := vm.schema.Views(ksName) - if views != nil { - ks.Views = make(map[string]sqlparser.SelectStatement, len(views)) - for name, def := range views { - ks.Views[name] = sqlparser.CloneSelectStatement(def) + rTbl.UniqueKeys = append(rTbl.UniqueKeys, uniqueKey) } } } diff --git a/go/vt/vtgate/vschema_manager_test.go b/go/vt/vtgate/vschema_manager_test.go index c7ee5f34e8d..230ea961437 100644 --- a/go/vt/vtgate/vschema_manager_test.go +++ b/go/vt/vtgate/vschema_manager_test.go @@ -813,7 +813,8 @@ func makeTestSrvVSchema(ks string, sharded bool, tbls map[string]*vschemapb.Tabl } type fakeSchema struct { - t map[string]*vindexes.TableInfo + t map[string]*vindexes.TableInfo + udfs []string } func (f *fakeSchema) Tables(string) map[string]*vindexes.TableInfo { @@ -823,5 +824,6 @@ func (f *fakeSchema) Tables(string) map[string]*vindexes.TableInfo { func (f *fakeSchema) Views(string) map[string]sqlparser.SelectStatement { return nil } +func (f *fakeSchema) UDFs(string) []string { return f.udfs } var _ SchemaInfo = (*fakeSchema)(nil) diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 477a1e40cae..e08292087bf 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -96,13 +96,13 @@ var ( enableOnlineDDL = true enableDirectDDL = true - // vtgate schema tracking flags + // schema tracking flags enableSchemaChangeSignal = true - - queryTimeout int + enableViews bool + enableUdfs bool // vtgate views flags - enableViews bool + queryTimeout int // queryLogToFile controls whether query logs are sent to a file queryLogToFile string @@ -149,6 +149,7 @@ func registerFlags(fs *pflag.FlagSet) { fs.IntVar(&queryLogBufferSize, "querylog-buffer-size", queryLogBufferSize, "Maximum number of buffered query logs before throttling log output") fs.DurationVar(&messageStreamGracePeriod, "message_stream_grace_period", messageStreamGracePeriod, "the amount of time to give for a vttablet to resume if it ends a message stream, usually because of a reparent.") fs.BoolVar(&enableViews, "enable-views", enableViews, "Enable views support in vtgate.") + fs.BoolVar(&enableViews, "enable-udfs", enableUdfs, "Enable UDFs support in vtgate.") fs.BoolVar(&allowKillStmt, "allow-kill-statement", allowKillStmt, "Allows the execution of kill statement") fs.IntVar(&warmingReadsPercent, "warming-reads-percent", 0, "Percentage of reads on the primary to forward to replicas. Useful for keeping buffer pools warm") fs.IntVar(&warmingReadsConcurrency, "warming-reads-concurrency", 500, "Number of concurrent warming reads allowed") @@ -302,7 +303,7 @@ func Init( var si SchemaInfo // default nil var st *vtschema.Tracker if enableSchemaChangeSignal { - st = vtschema.NewTracker(gw.hc.Subscribe(), enableViews, env.Parser()) + st = vtschema.NewTracker(gw.hc.Subscribe(), enableViews, enableUdfs, env.Parser()) addKeyspacesToTracker(ctx, srvResolver, st, gw) si = st } diff --git a/go/vt/vttablet/endtoend/framework/server.go b/go/vt/vttablet/endtoend/framework/server.go index 42f2994a22c..95c8114fd9f 100644 --- a/go/vt/vttablet/endtoend/framework/server.go +++ b/go/vt/vttablet/endtoend/framework/server.go @@ -128,6 +128,7 @@ func StartServer(ctx context.Context, connParams, connAppDebugParams mysql.ConnP config.Olap.TxTimeout = 5 * time.Second config.EnableViews = true config.QueryCacheDoorkeeper = false + config.SchemaReloadInterval = 5 * time.Second gotBytes, _ := yaml2.Marshal(config) log.Infof("Config:\n%s", gotBytes) return StartCustomServer(ctx, connParams, connAppDebugParams, dbName, config) diff --git a/go/vt/vttablet/endtoend/main_test.go b/go/vt/vttablet/endtoend/main_test.go index b5256be0994..939147cb139 100644 --- a/go/vt/vttablet/endtoend/main_test.go +++ b/go/vt/vttablet/endtoend/main_test.go @@ -37,6 +37,7 @@ import ( var ( connParams mysql.ConnParams connAppDebugParams mysql.ConnParams + cluster vttest.LocalCluster ) func TestMain(m *testing.M) { @@ -69,7 +70,7 @@ func TestMain(m *testing.M) { return 1 } defer os.RemoveAll(cfg.SchemaDir) - cluster := vttest.LocalCluster{ + cluster = vttest.LocalCluster{ Config: cfg, } if err := cluster.Setup(); err != nil { @@ -307,7 +308,7 @@ var tableACLConfig = `{ }, { "name": "sys_table", - "table_names_or_prefixes": ["tables", "user", "processlist", "mutex_instances", "columns", "a"], + "table_names_or_prefixes": ["tables", "user", "processlist", "mutex_instances", "columns", "a", "func"], "readers": ["dev"], "writers": ["dev"], "admins": ["dev"] @@ -332,6 +333,13 @@ var tableACLConfig = `{ "readers": ["dev"], "writers": ["dev"], "admins": ["dev"] + }, + { + "name": "vitess_internal", + "table_names_or_prefixes": ["udfs"], + "readers": ["dev"], + "writers": ["dev"], + "admins": ["dev"] } ] }` diff --git a/go/vt/vttablet/endtoend/udf.so b/go/vt/vttablet/endtoend/udf.so new file mode 100755 index 0000000000000000000000000000000000000000..b0af697aaf2a4d9e69921dd65dd1719704c7c52a GIT binary patch literal 33608 zcmeI5Yiv|S6vxlqZRu7@i=_zZsrjg#AO1@WR|O5qJ4C`m}ol70q#|*c4b|yWg7V~N2;Xt zY&MHz>29Us!5wj}a5Xw!>LI(Y$I66Abk8k!36ylJO2y)dO|fCq=y)5J+g*2Q57FK6 zd^#Sl-6~2o#^TXXWb?2-a=l3%Z#MI{R&y+GmWKUnPkMR(ZEdZEkkGhB4z!V_&P7_IRyX z--yI>isV(zqSRwGs~=riRpWdkN$p|kTI#!);#XB^5Nt?^v zS{F`IX0k|sH?Q}YoxGS)KmY_l00ck)1V8`;KmY_l00ck)1V8`;KmY_l00ck)1V8`; zKmY_l00ck)1V8`;KmY_l00ck)1V8`;KmY_l00ck)1V8`;KmY_l00ck)1V8`;#vw4| zHTpAc-mc6S)ZXbWzJ8&A(ibOEda%Gu`wa6#Up75GnA0(%#Gu8zjkCzIXS@2QcFfqK z`vxYOY4$lZo0he2Bsx+;$tQdU6^f*;6)!2^@nj^wS;u_?IXp*>nWoEB#@kIZ((dxA z>m|9A_Kg}R^QNw;CG1z2X{+qwoZI`n>fF$C#JL+?nM!JR;`wsT^q`kx81&m9=awQn z?*Xrw=DoB>Bvz)aqKn54#~9!>U*I)g50^kN0=Z! ze3!Xj8G|D;qB(r_LEFbh)rJMuAqEAUK-O}5-dwIkPg(Kh zAPZYxr2A)UJ)rx`9DhDbLCR7V9?n0zRF2Xgv2}e+0+HM$FZaafu(;cP((AM(iRG%CX4t3`946$m`?u4s+AVv50M)kFke5S#p`Yk4_> z@~=;$$LWqG!lkihdUIppJFlKOQ{UeD{+6Znhnr5GKHht5#i}`vZ$0|()k(3BPVIU7 zy}IVFt~B*!eb?W=)w8B9duV6R&S&?X|Kz3W7w&m$y#7q*w`KP~P5A~lO<$hs|D(!(aM!`YX#SN~e$VdeonA4e^rz2G@BO;J`(#ng)Z^3V{+WB_#ZL#u ui~nEy7hWu{|FwVU$Fpa*|6Tt<@z0$(u@g0izMn9!k*whW^$7 literal 0 HcmV?d00001 diff --git a/go/vt/vttablet/endtoend/udfs_test.go b/go/vt/vttablet/endtoend/udfs_test.go new file mode 100644 index 00000000000..aed7c3c5a97 --- /dev/null +++ b/go/vt/vttablet/endtoend/udfs_test.go @@ -0,0 +1,169 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endtoend + +import ( + "context" + "fmt" + "io" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/callerid" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vttablet/endtoend/framework" +) + +const ( + soFileName = "udf.so" + udfRows = "select * from _vt.udfs" +) + +// TestUDFs will validate that UDFs signal is sent through the stream health. +func TestUDFs(t *testing.T) { + client := framework.NewClient() + + client.UpdateContext(callerid.NewContext( + context.Background(), + &vtrpcpb.CallerID{}, + &querypb.VTGateCallerID{Username: "dev"})) + + copySOFile(t, client) + + ch := make(chan any) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + err := client.StreamHealthWithContext(ctx, func(shr *querypb.StreamHealthResponse) error { + if shr.RealtimeStats.UdfsChanged { + ch <- true + } + return nil + }) + require.NoError(t, err) + }() + + // create a user defined function directly on mysql as it is not supported by vitess parser. + err := cluster.Execute([]string{"CREATE AGGREGATE FUNCTION myudf RETURNS REAL SONAME 'udf.so';"}, "vttest") + require.NoError(t, err) + + validateHealthStreamSignal(t, client, ch, + `[[BINARY("myudf") INT8(1) BINARY("udf.so") ENUM("aggregate")]]`, + `[[VARBINARY("myudf") VARBINARY("double") VARBINARY("aggregate")]]`) + + // dropping the user defined function. + err = cluster.Execute([]string{"drop function myudf"}, "vttest") + require.NoError(t, err) + + validateHealthStreamSignal(t, client, ch, + `[]`, + `[]`) +} + +func validateHealthStreamSignal(t *testing.T, client *framework.QueryClient, ch chan any, expected ...string) { + t.Helper() + + // validate the row in mysql.func. + qr, err := client.Execute("select * from mysql.func", nil) + require.NoError(t, err) + require.Equal(t, expected[0], fmt.Sprintf("%v", qr.Rows)) + + // wait for udf update + select { + case <-ch: + case <-time.After(30 * time.Second): + t.Fatal("timed out waiting for udf create signal") + } + + // validate the row in _vt.udfs. + qr, err = client.Execute(udfRows, nil) + require.NoError(t, err) + require.Equal(t, expected[1], fmt.Sprintf("%v", qr.Rows)) +} + +// TestUDFRFC will validate that UDFs are received through the rfc call. +func TestUDFRFC(t *testing.T) { + client := framework.NewClient() + + client.UpdateContext(callerid.NewContext( + context.Background(), + &vtrpcpb.CallerID{}, + &querypb.VTGateCallerID{Username: "dev"})) + + copySOFile(t, client) + + // create a user defined function directly on mysql as it is not supported by vitess parser. + err := cluster.Execute([]string{"CREATE AGGREGATE FUNCTION myudf RETURNS REAL SONAME 'udf.so';"}, "vttest") + require.NoError(t, err) + + validateRPC(t, client, func(udfs map[string]string) bool { + // keep checking till the udf is added. + return len(udfs) == 0 + }) + + // dropping the user defined function. + err = cluster.Execute([]string{"drop function myudf"}, "vttest") + require.NoError(t, err) + + validateRPC(t, client, func(udfs map[string]string) bool { + // keep checking till the udf is removed. + return len(udfs) != 0 + }) +} + +func validateRPC(t *testing.T, client *framework.QueryClient, cond func(udfs map[string]string) bool) (<-chan time.Time, bool) { + timeout := time.After(30 * time.Second) + conditionNotMet := true + for conditionNotMet { + time.Sleep(1 * time.Second) + select { + case <-timeout: + t.Fatal("timed out waiting for updated udf") + default: + udfs, err := client.GetSchema(querypb.SchemaTableType_UDF_AGGREGATE) + require.NoError(t, err) + conditionNotMet = cond(udfs) + } + } + return timeout, conditionNotMet +} + +func copySOFile(t *testing.T, client *framework.QueryClient) { + t.Helper() + qr, err := client.Execute("select @@plugin_dir", nil) + require.NoError(t, err) + pluginDir := qr.Rows[0][0].ToString() + + source, err := os.Open(soFileName) + require.NoError(t, err) + defer source.Close() + + destination, err := os.Create(pluginDir + soFileName) + if err != nil && strings.Contains(err.Error(), "permission denied") { + t.Skip("permission denied to copy so file") + } + require.NoError(t, err) + defer destination.Close() + + _, err = io.Copy(destination, source) + require.NoError(t, err) +} diff --git a/go/vt/vttablet/tabletserver/health_streamer.go b/go/vt/vttablet/tabletserver/health_streamer.go index 8ff6834aeaf..c13d11df69e 100644 --- a/go/vt/vttablet/tabletserver/health_streamer.go +++ b/go/vt/vttablet/tabletserver/health_streamer.go @@ -312,8 +312,8 @@ func (hs *healthStreamer) MakePrimary(serving bool) { // We register for notifications from the schema Engine only when schema tracking is enabled, // and we are going to a serving primary state. if serving && hs.signalWhenSchemaChange { - hs.se.RegisterNotifier("healthStreamer", func(full map[string]*schema.Table, created, altered, dropped []*schema.Table) { - if err := hs.reload(full, created, altered, dropped); err != nil { + hs.se.RegisterNotifier("healthStreamer", func(full map[string]*schema.Table, created, altered, dropped []*schema.Table, udfsChanged bool) { + if err := hs.reload(created, altered, dropped, udfsChanged); err != nil { log.Errorf("periodic schema reload failed in health stream: %v", err) } }, false) @@ -328,7 +328,7 @@ func (hs *healthStreamer) MakeNonPrimary() { } // reload reloads the schema from the underlying mysql for the tables that we get the alert on. -func (hs *healthStreamer) reload(full map[string]*schema.Table, created, altered, dropped []*schema.Table) error { +func (hs *healthStreamer) reload(created, altered, dropped []*schema.Table, udfsChanged bool) error { hs.mu.Lock() defer hs.mu.Unlock() // Schema Reload to happen only on primary when it is serving. @@ -366,16 +366,17 @@ func (hs *healthStreamer) reload(full map[string]*schema.Table, created, altered } // no change detected - if len(tables) == 0 && len(views) == 0 { + if len(tables) == 0 && len(views) == 0 && !udfsChanged { return nil } hs.state.RealtimeStats.TableSchemaChanged = tables hs.state.RealtimeStats.ViewSchemaChanged = views + hs.state.RealtimeStats.UdfsChanged = udfsChanged shr := hs.state.CloneVT() hs.broadCastToClients(shr) hs.state.RealtimeStats.TableSchemaChanged = nil hs.state.RealtimeStats.ViewSchemaChanged = nil - + hs.state.RealtimeStats.UdfsChanged = false return nil } diff --git a/go/vt/vttablet/tabletserver/health_streamer_test.go b/go/vt/vttablet/tabletserver/health_streamer_test.go index 14a1899d07b..ff61787dd1d 100644 --- a/go/vt/vttablet/tabletserver/health_streamer_test.go +++ b/go/vt/vttablet/tabletserver/health_streamer_test.go @@ -91,7 +91,7 @@ func TestNotServingPrimaryNoWrite(t *testing.T) { // A reload now should not write anything to the database. If any write happens it will error out since we have not // added any query to the database to expect. t1 := schema.NewTable("t1", schema.NoType) - err := hs.reload(map[string]*schema.Table{"t1": t1}, []*schema.Table{t1}, nil, nil) + err := hs.reload([]*schema.Table{t1}, nil, nil, false) require.NoError(t, err) require.NoError(t, db.LastError()) } @@ -380,6 +380,8 @@ func TestReloadView(t *testing.T) { )) db.AddQueryPattern(".*SELECT table_name, view_definition.*views.*", &sqltypes.Result{}) db.AddQuery("SELECT TABLE_NAME, CREATE_TIME FROM _vt.`tables`", &sqltypes.Result{}) + // adding query pattern for udfs + db.AddQueryPattern("SELECT name.*", &sqltypes.Result{}) hs.InitDBConfig(target, configs.DbaWithDB()) se.InitDBConfig(configs.DbaWithDB()) diff --git a/go/vt/vttablet/tabletserver/messager/engine.go b/go/vt/vttablet/tabletserver/messager/engine.go index 4204c5c0b7e..612619f7ccc 100644 --- a/go/vt/vttablet/tabletserver/messager/engine.go +++ b/go/vt/vttablet/tabletserver/messager/engine.go @@ -138,7 +138,7 @@ func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltype return mm.Subscribe(ctx, send), nil } -func (me *Engine) schemaChanged(tables map[string]*schema.Table, created, altered, dropped []*schema.Table) { +func (me *Engine) schemaChanged(tables map[string]*schema.Table, created, altered, dropped []*schema.Table, _ bool) { me.mu.Lock() defer me.mu.Unlock() for _, table := range append(dropped, altered...) { diff --git a/go/vt/vttablet/tabletserver/messager/engine_test.go b/go/vt/vttablet/tabletserver/messager/engine_test.go index eda585694f1..30e849ac73b 100644 --- a/go/vt/vttablet/tabletserver/messager/engine_test.go +++ b/go/vt/vttablet/tabletserver/messager/engine_test.go @@ -70,28 +70,28 @@ func TestEngineSchemaChanged(t *testing.T) { engine := newTestEngine() defer engine.Close() - engine.schemaChanged(nil, []*schema.Table{meTableT1, tableT2}, nil, nil) + engine.schemaChanged(nil, []*schema.Table{meTableT1, tableT2}, nil, nil, true) got := extractManagerNames(engine.managers) want := map[string]bool{"t1": true} if !reflect.DeepEqual(got, want) { t.Errorf("got: %+v, want %+v", got, want) } - engine.schemaChanged(nil, []*schema.Table{meTableT3}, nil, nil) + engine.schemaChanged(nil, []*schema.Table{meTableT3}, nil, nil, true) got = extractManagerNames(engine.managers) want = map[string]bool{"t1": true, "t3": true} if !reflect.DeepEqual(got, want) { t.Errorf("got: %+v, want %+v", got, want) } - engine.schemaChanged(nil, []*schema.Table{meTableT4}, nil, []*schema.Table{meTableT3, tableT5}) + engine.schemaChanged(nil, []*schema.Table{meTableT4}, nil, []*schema.Table{meTableT3, tableT5}, true) got = extractManagerNames(engine.managers) want = map[string]bool{"t1": true, "t4": true} if !reflect.DeepEqual(got, want) { t.Errorf("got: %+v, want %+v", got, want) } // Test update - engine.schemaChanged(nil, nil, []*schema.Table{meTableT2, tableT4}, nil) + engine.schemaChanged(nil, nil, []*schema.Table{meTableT2, tableT4}, nil, true) got = extractManagerNames(engine.managers) want = map[string]bool{"t1": true, "t2": true} if !reflect.DeepEqual(got, want) { @@ -109,7 +109,7 @@ func extractManagerNames(in map[string]*messageManager) map[string]bool { func TestSubscribe(t *testing.T) { engine := newTestEngine() - engine.schemaChanged(nil, []*schema.Table{meTableT1, meTableT2}, nil, nil) + engine.schemaChanged(nil, []*schema.Table{meTableT1, meTableT2}, nil, nil, true) f1, ch1 := newEngineReceiver() f2, ch2 := newEngineReceiver() // Each receiver is subscribed to different managers. @@ -140,7 +140,7 @@ func TestSubscribe(t *testing.T) { func TestEngineGenerate(t *testing.T) { engine := newTestEngine() defer engine.Close() - engine.schemaChanged(nil, []*schema.Table{meTableT1}, nil, nil) + engine.schemaChanged(nil, []*schema.Table{meTableT1}, nil, nil, true) if _, err := engine.GetGenerator("t1"); err != nil { t.Error(err) diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index f009a72ceee..dc4128a7c69 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -530,7 +530,7 @@ func (qe *QueryEngine) IsMySQLReachable() error { return nil } -func (qe *QueryEngine) schemaChanged(tables map[string]*schema.Table, created, altered, dropped []*schema.Table) { +func (qe *QueryEngine) schemaChanged(tables map[string]*schema.Table, created, altered, dropped []*schema.Table, _ bool) { qe.schemaMu.Lock() defer qe.schemaMu.Unlock() diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index d1fbc96123f..78351ab23be 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -1161,6 +1161,8 @@ func (qre *QueryExecutor) GetSchemaDefinitions(tableType querypb.SchemaTableType return qre.getTableDefinitions(tableNames, callback) case querypb.SchemaTableType_ALL: return qre.getAllDefinitions(tableNames, callback) + case querypb.SchemaTableType_UDF_AGGREGATE: + return qre.getUDFs(callback) } return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid table type %v", tableType) } @@ -1209,3 +1211,25 @@ func (qre *QueryExecutor) executeGetSchemaQuery(query string, callback func(sche return callback(&querypb.GetSchemaResponse{TableDefinition: schemaDef}) }) } + +func (qre *QueryExecutor) getUDFs(callback func(schemaRes *querypb.GetSchemaResponse) error) error { + query, err := eschema.GetFetchUDFsQuery(qre.tsv.env.Parser()) + if err != nil { + return err + } + + conn, err := qre.getStreamConn() + if err != nil { + return err + } + defer conn.Recycle() + + return qre.execStreamSQL(conn, false /* isTransaction */, query, func(result *sqltypes.Result) error { + schemaDef := make(map[string]string) + for _, row := range result.Rows { + udf := row[0].ToString() + schemaDef[udf] = row[1].ToString() + } + return callback(&querypb.GetSchemaResponse{TableDefinition: schemaDef}) + }) +} diff --git a/go/vt/vttablet/tabletserver/schema/db.go b/go/vt/vttablet/tabletserver/schema/db.go index 4bea80c4010..4fa7062ebca 100644 --- a/go/vt/vttablet/tabletserver/schema/db.go +++ b/go/vt/vttablet/tabletserver/schema/db.go @@ -19,6 +19,8 @@ package schema import ( "context" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" @@ -86,6 +88,32 @@ where table_schema = database() and table_name in ::viewNames` // fetchTablesAndViews queries fetches all information about tables and views fetchTablesAndViews = `select table_name, create_statement from %s.tables where table_schema = database() union select table_name, create_statement from %s.views where table_schema = database()` + + // detectUdfChange query detects if there is any udf change from previous copy. + detectUdfChange = `SELECT name +FROM ( + SELECT name FROM + mysql.func + + UNION ALL + + SELECT function_name + FROM %s.udfs +) _inner +GROUP BY name +HAVING COUNT(*) = 1 +LIMIT 1 +` + + // deleteAllUdfs clears out the udfs table. + deleteAllUdfs = `delete from %s.udfs` + + // copyUdfs copies user defined function to the udfs table. + copyUdfs = `INSERT INTO %s.udfs(FUNCTION_NAME, FUNCTION_RETURN_TYPE, FUNCTION_TYPE) +SELECT f.name, i.UDF_RETURN_TYPE, f.type FROM mysql.func f left join performance_schema.user_defined_functions i on f.name = i.udf_name +` + // fetchAggregateUdfs queries fetches all the aggregate user defined functions. + fetchAggregateUdfs = `select function_name, function_return_type from %s.udfs where function_type = 'aggregate'` ) // reloadTablesDataInDB reloads teh tables information we have stored in our database we use for schema-tracking. @@ -313,6 +341,31 @@ func getChangedViewNames(ctx context.Context, conn *connpool.Conn, isServingPrim return views, nil } +func getChangedUserDefinedFunctions(ctx context.Context, conn *connpool.Conn, isServingPrimary bool) (bool, error) { + if !isServingPrimary { + return false, nil + } + + udfsChanged := false + callback := func(qr *sqltypes.Result) error { + // If we receive any row as output which means udf was modified. + udfsChanged = len(qr.Rows) > 0 + return nil + } + alloc := func() *sqltypes.Result { return &sqltypes.Result{} } + bufferSize := 1000 + + udfChangeQuery := sqlparser.BuildParsedQuery(detectUdfChange, sidecar.GetIdentifier()).Query + err := conn.Stream(ctx, udfChangeQuery, callback, alloc, bufferSize, 0) + if err != nil { + return false, err + } + if udfsChanged { + log.Info("Underlying User Defined Functions have changed") + } + return udfsChanged, nil +} + // getMismatchedTableNames gets the tables that do not align with the tables information we have in the cache. func (se *Engine) getMismatchedTableNames(ctx context.Context, conn *connpool.Conn, isServingPrimary bool) (map[string]any, error) { tablesMismatched := make(map[string]any) @@ -358,7 +411,7 @@ func (se *Engine) getMismatchedTableNames(ctx context.Context, conn *connpool.Co } // reloadDataInDB reloads the schema tracking data in the database -func reloadDataInDB(ctx context.Context, conn *connpool.Conn, altered []*Table, created []*Table, dropped []*Table, parser *sqlparser.Parser) error { +func reloadDataInDB(ctx context.Context, conn *connpool.Conn, altered, created, dropped []*Table, udfsChanged bool, parser *sqlparser.Parser) error { // tablesToReload and viewsToReload stores the tables and views that need reloading and storing in our MySQL database. var tablesToReload, viewsToReload []*Table // droppedTables, droppedViews stores the list of tables and views we need to delete, respectively. @@ -388,6 +441,42 @@ func reloadDataInDB(ctx context.Context, conn *connpool.Conn, altered []*Table, if err := reloadViewsDataInDB(ctx, conn, viewsToReload, droppedViews, parser); err != nil { return err } + if err := reloadUdfsInDB(ctx, conn, udfsChanged, parser); err != nil { + return err + } + return nil +} + +func reloadUdfsInDB(ctx context.Context, conn *connpool.Conn, udfsChanged bool, parser *sqlparser.Parser) error { + if !udfsChanged { + return nil + } + + clearUdfQuery := sqlparser.BuildParsedQuery(deleteAllUdfs, sidecar.GetIdentifier()).Query + copyUdfQuery := sqlparser.BuildParsedQuery(copyUdfs, sidecar.GetIdentifier()).Query + + // Reload the udfs in a transaction. + _, err := conn.Exec(ctx, "begin", 1, false) + if err != nil { + return err + } + defer conn.Exec(ctx, "rollback", 1, false) + + _, err = conn.Exec(ctx, clearUdfQuery, 1, false) + if err != nil { + return err + } + + _, err = conn.Exec(ctx, copyUdfQuery, 1, false) + if err != nil { + return err + } + + _, err = conn.Exec(ctx, "commit", 1, false) + if err != nil { + return err + } + return nil } @@ -459,3 +548,12 @@ func GetFetchTableAndViewsQuery(tableNames []string, parser *sqlparser.Parser) ( } return parsedQuery.GenerateQuery(bv, nil) } + +// GetFetchUDFsQuery gets the fetch query to retrieve all the UDFs. +func GetFetchUDFsQuery(parser *sqlparser.Parser) (string, error) { + parsedQuery, err := generateFullQuery(fetchAggregateUdfs, parser) + if err != nil { + return "", err + } + return parsedQuery.Query, nil +} diff --git a/go/vt/vttablet/tabletserver/schema/db_test.go b/go/vt/vttablet/tabletserver/schema/db_test.go index fec6469d4cf..d0ff91b63d5 100644 --- a/go/vt/vttablet/tabletserver/schema/db_test.go +++ b/go/vt/vttablet/tabletserver/schema/db_test.go @@ -898,7 +898,7 @@ func TestReloadDataInDB(t *testing.T) { db.AddRejectedQuery(query, errorToThrow) } - err = reloadDataInDB(context.Background(), conn, tc.altered, tc.created, tc.dropped, sqlparser.NewTestParser()) + err = reloadDataInDB(context.Background(), conn, tc.altered, tc.created, tc.dropped, false, sqlparser.NewTestParser()) if tc.expectedError != "" { require.ErrorContains(t, err, tc.expectedError) return diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 1995bd5472d..0bf2ebefd54 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -58,7 +58,7 @@ import ( const maxTableCount = 10000 -type notifier func(full map[string]*Table, created, altered, dropped []*Table) +type notifier func(full map[string]*Table, created, altered, dropped []*Table, udfsChanged bool) // Engine stores the schema info and performs operations that // keep itself up-to-date. @@ -447,6 +447,11 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error { return err } + udfsChanged, err := getChangedUserDefinedFunctions(ctx, conn.Conn, shouldUseDatabase) + if err != nil { + return err + } + rec := concurrency.AllErrorRecorder{} // curTables keeps track of tables in the new snapshot so we can detect what was dropped. curTables := map[string]bool{"dual": true} @@ -536,7 +541,7 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error { if shouldUseDatabase { // If reloadDataInDB succeeds, then we don't want to prevent sending the broadcast notification. // So, we do this step in the end when we can receive no more errors that fail the reload operation. - err = reloadDataInDB(ctx, conn.Conn, altered, created, dropped, se.env.Environment().Parser()) + err = reloadDataInDB(ctx, conn.Conn, altered, created, dropped, udfsChanged, se.env.Environment().Parser()) if err != nil { log.Errorf("error in updating schema information in Engine.reload() - %v", err) } @@ -550,7 +555,7 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error { if len(created) > 0 || len(altered) > 0 || len(dropped) > 0 { log.Infof("schema engine created %v, altered %v, dropped %v", extractNamesFromTablesList(created), extractNamesFromTablesList(altered), extractNamesFromTablesList(dropped)) } - se.broadcast(created, altered, dropped) + se.broadcast(created, altered, dropped, udfsChanged) return nil } @@ -709,7 +714,7 @@ func (se *Engine) RegisterNotifier(name string, f notifier, runNotifier bool) { } if runNotifier { s := maps.Clone(se.tables) - f(s, created, nil, nil) + f(s, created, nil, nil, true) } } @@ -730,7 +735,7 @@ func (se *Engine) UnregisterNotifier(name string) { } // broadcast must be called while holding a lock on se.mu. -func (se *Engine) broadcast(created, altered, dropped []*Table) { +func (se *Engine) broadcast(created, altered, dropped []*Table, udfsChanged bool) { if !se.isOpen { return } @@ -739,7 +744,7 @@ func (se *Engine) broadcast(created, altered, dropped []*Table) { defer se.notifierMu.Unlock() s := maps.Clone(se.tables) for _, f := range se.notifiers { - f(s, created, altered, dropped) + f(s, created, altered, dropped, udfsChanged) } } diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index b9492cbd185..b6cdf244297 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -154,7 +154,7 @@ func TestOpenAndReload(t *testing.T) { AddFakeInnoDBReadRowsResult(db, secondReadRowsValue) firstTime := true - notifier := func(full map[string]*Table, created, altered, dropped []*Table) { + notifier := func(full map[string]*Table, created, altered, dropped []*Table, _ bool) { if firstTime { firstTime = false createTables := extractNamesFromTablesList(created) @@ -718,7 +718,7 @@ func TestRegisterNotifier(t *testing.T) { var tablesReceived map[string]*Table // Register a notifier and make it run immediately. - se.RegisterNotifier("TestRegisterNotifier", func(full map[string]*Table, created, altered, dropped []*Table) { + se.RegisterNotifier("TestRegisterNotifier", func(full map[string]*Table, created, altered, dropped []*Table, _ bool) { tablesReceived = full }, true) @@ -1284,8 +1284,11 @@ func TestEngineReload(t *testing.T) { db.AddQuery("insert into _vt.views(TABLE_SCHEMA, TABLE_NAME, CREATE_STATEMENT, VIEW_DEFINITION) values (database(), 'V2', 'create_table_V2', 'select_V2')", &sqltypes.Result{}) } + // adding query pattern for udfs + db.AddQueryPattern("SELECT name.*", &sqltypes.Result{}) + // Verify the list of created, altered and dropped tables seen. - se.RegisterNotifier("test", func(full map[string]*Table, created, altered, dropped []*Table) { + se.RegisterNotifier("test", func(full map[string]*Table, created, altered, dropped []*Table, _ bool) { require.ElementsMatch(t, extractNamesFromTablesList(created), []string{"T2", "V2"}) require.ElementsMatch(t, extractNamesFromTablesList(altered), []string{"t2", "v2"}) require.ElementsMatch(t, extractNamesFromTablesList(dropped), []string{"t4", "v4", "t5", "v5"}) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 8a1a45ca4a2..87a6904d99a 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -431,7 +431,7 @@ func (tsv *TabletServer) ReloadSchema(ctx context.Context) error { // changes to finish being applied. func (tsv *TabletServer) WaitForSchemaReset(timeout time.Duration) { onSchemaChange := make(chan struct{}, 1) - tsv.se.RegisterNotifier("_tsv_wait", func(_ map[string]*schema.Table, _, _, _ []*schema.Table) { + tsv.se.RegisterNotifier("_tsv_wait", func(_ map[string]*schema.Table, _, _, _ []*schema.Table, _ bool) { onSchemaChange <- struct{}{} }, true) defer tsv.se.UnregisterNotifier("_tsv_wait") diff --git a/proto/query.proto b/proto/query.proto index 4d94fcb2c83..696dadd6ffe 100644 --- a/proto/query.proto +++ b/proto/query.proto @@ -883,6 +883,9 @@ message RealtimeStats { // view_schema_changed is to provide list of views that have schema changes detected by the tablet. repeated string view_schema_changed = 8; + + // udfs_changed is used to signal that the UDFs have changed on the tablet. + bool udfs_changed = 9; } // AggregateStats contains information about the health of a group of @@ -989,6 +992,7 @@ enum SchemaTableType { VIEWS = 0; TABLES = 1; ALL = 2; + UDF_AGGREGATE = 3; } // GetSchemaRequest is the payload to GetSchema diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index 1dac3e4cac5..7a97f8ee5b5 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -41299,6 +41299,9 @@ export namespace query { /** RealtimeStats view_schema_changed */ view_schema_changed?: (string[]|null); + + /** RealtimeStats udfs_changed */ + udfs_changed?: (boolean|null); } /** Represents a RealtimeStats. */ @@ -41334,6 +41337,9 @@ export namespace query { /** RealtimeStats view_schema_changed. */ public view_schema_changed: string[]; + /** RealtimeStats udfs_changed. */ + public udfs_changed: boolean; + /** * Creates a new RealtimeStats instance using the specified properties. * @param [properties] Properties to set @@ -41775,7 +41781,8 @@ export namespace query { enum SchemaTableType { VIEWS = 0, TABLES = 1, - ALL = 2 + ALL = 2, + UDF_AGGREGATE = 3 } /** Properties of a GetSchemaRequest. */ diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index 35e7ffc5eae..ebd4706cd9c 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -99735,6 +99735,7 @@ export const query = $root.query = (() => { * @property {number|null} [qps] RealtimeStats qps * @property {Array.|null} [table_schema_changed] RealtimeStats table_schema_changed * @property {Array.|null} [view_schema_changed] RealtimeStats view_schema_changed + * @property {boolean|null} [udfs_changed] RealtimeStats udfs_changed */ /** @@ -99818,6 +99819,14 @@ export const query = $root.query = (() => { */ RealtimeStats.prototype.view_schema_changed = $util.emptyArray; + /** + * RealtimeStats udfs_changed. + * @member {boolean} udfs_changed + * @memberof query.RealtimeStats + * @instance + */ + RealtimeStats.prototype.udfs_changed = false; + /** * Creates a new RealtimeStats instance using the specified properties. * @function create @@ -99860,6 +99869,8 @@ export const query = $root.query = (() => { if (message.view_schema_changed != null && message.view_schema_changed.length) for (let i = 0; i < message.view_schema_changed.length; ++i) writer.uint32(/* id 8, wireType 2 =*/66).string(message.view_schema_changed[i]); + if (message.udfs_changed != null && Object.hasOwnProperty.call(message, "udfs_changed")) + writer.uint32(/* id 9, wireType 0 =*/72).bool(message.udfs_changed); return writer; }; @@ -99930,6 +99941,10 @@ export const query = $root.query = (() => { message.view_schema_changed.push(reader.string()); break; } + case 9: { + message.udfs_changed = reader.bool(); + break; + } default: reader.skipType(tag & 7); break; @@ -99997,6 +100012,9 @@ export const query = $root.query = (() => { if (!$util.isString(message.view_schema_changed[i])) return "view_schema_changed: string[] expected"; } + if (message.udfs_changed != null && message.hasOwnProperty("udfs_changed")) + if (typeof message.udfs_changed !== "boolean") + return "udfs_changed: boolean expected"; return null; }; @@ -100045,6 +100063,8 @@ export const query = $root.query = (() => { for (let i = 0; i < object.view_schema_changed.length; ++i) message.view_schema_changed[i] = String(object.view_schema_changed[i]); } + if (object.udfs_changed != null) + message.udfs_changed = Boolean(object.udfs_changed); return message; }; @@ -100076,6 +100096,7 @@ export const query = $root.query = (() => { object.filtered_replication_lag_seconds = options.longs === String ? "0" : 0; object.cpu_usage = 0; object.qps = 0; + object.udfs_changed = false; } if (message.health_error != null && message.hasOwnProperty("health_error")) object.health_error = message.health_error; @@ -100102,6 +100123,8 @@ export const query = $root.query = (() => { for (let j = 0; j < message.view_schema_changed.length; ++j) object.view_schema_changed[j] = message.view_schema_changed[j]; } + if (message.udfs_changed != null && message.hasOwnProperty("udfs_changed")) + object.udfs_changed = message.udfs_changed; return object; }; @@ -101095,12 +101118,14 @@ export const query = $root.query = (() => { * @property {number} VIEWS=0 VIEWS value * @property {number} TABLES=1 TABLES value * @property {number} ALL=2 ALL value + * @property {number} UDF_AGGREGATE=3 UDF_AGGREGATE value */ query.SchemaTableType = (function() { const valuesById = {}, values = Object.create(valuesById); values[valuesById[0] = "VIEWS"] = 0; values[valuesById[1] = "TABLES"] = 1; values[valuesById[2] = "ALL"] = 2; + values[valuesById[3] = "UDF_AGGREGATE"] = 3; return values; })(); @@ -101281,6 +101306,7 @@ export const query = $root.query = (() => { case 0: case 1: case 2: + case 3: break; } if (message.table_names != null && message.hasOwnProperty("table_names")) { @@ -101329,6 +101355,10 @@ export const query = $root.query = (() => { case 2: message.table_type = 2; break; + case "UDF_AGGREGATE": + case 3: + message.table_type = 3; + break; } if (object.table_names) { if (!Array.isArray(object.table_names))