Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add udfs to vschema on update #15771

Merged
merged 1 commit into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions go/vt/vtgate/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,23 +164,22 @@ func (t *Tracker) loadUDFs(conn queryservice.QueryService, target *querypb.Targe
t.mu.Lock()
defer t.mu.Unlock()

var udfs []string
err := conn.GetSchema(t.ctx, target, querypb.SchemaTableType_UDFS, nil, func(schemaRes *querypb.GetSchemaResponse) error {
var udfs []string
for _, udf := range schemaRes.Udfs {
if !udf.Aggregating {
continue
}
udfs = append(udfs, udf.Name)
}

t.udfs[target.Keyspace] = udfs
return nil
})
if err != nil {
log.Errorf("error fetching new UDFs for %v: %w", target.Keyspace, err)
return err
}
log.Infof("finished loading UDFs for keyspace %s", target.Keyspace)
t.udfs[target.Keyspace] = udfs
log.Infof("finished loading %d UDFs for keyspace %s", len(udfs), target.Keyspace)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How often will we be logging this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On VTGate start and whenever there is a change in UDFs

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/vschema_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (vm *VSchemaManager) updateFromSchema(vschema *vindexes.VSchema) {
for ksName, ks := range vschema.Keyspaces {
vm.updateTableInfo(vschema, ks, ksName)
vm.updateViewInfo(ks, ksName)
vm.updateUDFsInfo(ks, ksName)
}
}

Expand Down Expand Up @@ -267,6 +268,11 @@ func (vm *VSchemaManager) updateTableInfo(vschema *vindexes.VSchema, ks *vindexe
}
}

// updateUDFsInfo updates the aggregate UDFs in the Vschema.
func (vm *VSchemaManager) updateUDFsInfo(ks *vindexes.KeyspaceSchema, ksName string) {
ks.AggregateUDFs = vm.schema.UDFs(ksName)
}

func markErrorIfCyclesInFk(vschema *vindexes.VSchema) {
for ksName, ks := range vschema.Keyspaces {
// Only check cyclic foreign keys for keyspaces that have
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vtgate/vschema_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,38 @@ func TestRebuildVSchema(t *testing.T) {
}
}

// TestVSchemaUDFsUpdate tests that the UDFs are updated in the VSchema.
func TestVSchemaUDFsUpdate(t *testing.T) {
ks := &vindexes.Keyspace{Name: "ks", Sharded: true}

vm := &VSchemaManager{}
var vs *vindexes.VSchema
vm.subscriber = func(vschema *vindexes.VSchema, _ *VSchemaStats) {
vs = vschema
vs.ResetCreated()
}
vm.schema = &fakeSchema{udfs: []string{"udf1", "udf2"}}
vm.VSchemaUpdate(&vschemapb.SrvVSchema{
Keyspaces: map[string]*vschemapb.Keyspace{
"ks": {Sharded: true},
},
}, nil)

utils.MustMatchFn(".globalTables", ".uniqueVindexes")(t, &vindexes.VSchema{
RoutingRules: map[string]*vindexes.RoutingRule{},
Keyspaces: map[string]*vindexes.KeyspaceSchema{
"ks": {
Keyspace: ks,
ForeignKeyMode: vschemapb.Keyspace_unmanaged,
Tables: map[string]*vindexes.Table{},
Vindexes: map[string]vindexes.Vindex{},
AggregateUDFs: []string{"udf1", "udf2"},
},
},
}, vs)
utils.MustMatch(t, vs, vm.currentVschema, "currentVschema does not match Vschema")
}

func TestMarkErrorIfCyclesInFk(t *testing.T) {
ksName := "ks"
keyspace := &vindexes.Keyspace{
Expand Down
Loading