Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restructure discovery to do more up front #315

Merged
merged 6 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 100 additions & 73 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"io"
"math/rand"
"os"
"sort"
"time"

"google.golang.org/grpc/resolver"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
)

// File based discovery for vtgate grpc endpoints
Expand Down Expand Up @@ -60,8 +62,9 @@ type JSONGateResolverBuilder struct {
portField string
poolTypeField string
affinityField string
affinityValue string

targets []targetHost
targets map[string][]targetHost
resolvers []*JSONGateResolver

rand *rand.Rand
Expand All @@ -70,9 +73,9 @@ type JSONGateResolverBuilder struct {
}

type targetHost struct {
addr string
poolType string
affinity string
Addr string
PoolType string
Affinity string
}

// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
Expand All @@ -84,10 +87,8 @@ type JSONGateResolver struct {
}

var (
buildCount = stats.NewCounter("JsonDiscoveryBuild", "JSON host discovery rebuilt the host list")
unchangedCount = stats.NewCounter("JsonDiscoveryUnchanged", "JSON host discovery parsed and determined no change to the file")
affinityCount = stats.NewCountersWithSingleLabel("JsonDiscoveryHostAffinity", "Count of hosts returned from discovery by AZ affinity", "affinity")
poolTypeCount = stats.NewCountersWithSingleLabel("JsonDiscoveryHostPoolType", "Count of hosts returned from discovery by pool type", "type")
parseCount = stats.NewCountersWithSingleLabel("JsonDiscoveryParseCount", "Count of results of JSON host file parsing (changed, unchanged, error)", "result")
targetCount = stats.NewGaugesWithSingleLabel("JsonDiscoveryTargetCount", "Count of hosts returned from discovery by pool type", "pool")
)

func RegisterJSONGateResolver(
Expand All @@ -96,13 +97,16 @@ func RegisterJSONGateResolver(
portField string,
poolTypeField string,
affinityField string,
affinityValue string,
) (*JSONGateResolverBuilder, error) {
jsonDiscovery := &JSONGateResolverBuilder{
targets: map[string][]targetHost{},
jsonPath: jsonPath,
addressField: addressField,
portField: portField,
poolTypeField: poolTypeField,
affinityField: affinityField,
affinityValue: affinityValue,
}

resolver.Register(jsonDiscovery)
Expand All @@ -113,6 +117,8 @@ func RegisterJSONGateResolver(
return nil, err
}

servenv.AddStatusPart("JSON Discovery", targetsTemplate, jsonDiscovery.debugTargets)

return jsonDiscovery, nil
}

Expand All @@ -138,17 +144,19 @@ func (b *JSONGateResolverBuilder) start() error {
poolTypes := map[string]int{}
affinityTypes := map[string]int{}

for _, t := range b.targets {
count := poolTypes[t.poolType]
poolTypes[t.poolType] = count + 1
for _, ts := range b.targets {
for _, t := range ts {
count := poolTypes[t.PoolType]
poolTypes[t.PoolType] = count + 1

count = affinityTypes[t.affinity]
affinityTypes[t.affinity] = count + 1
count = affinityTypes[t.Affinity]
affinityTypes[t.Affinity] = count + 1
}
}

buildCount.Add(1)
parseCount.Add("changed", 1)

log.Infof("loaded %d targets, pool types %v, affinity groups %v", len(b.targets), poolTypes, affinityTypes)
log.Infof("loaded targets, pool types %v, affinity groups %v", poolTypes, affinityTypes)

// Start a config watcher
b.ticker = time.NewTicker(1 * time.Second)
Expand All @@ -158,10 +166,12 @@ func (b *JSONGateResolverBuilder) start() error {
}

go func() {
var parseErr error
for range b.ticker.C {
checkFileStat, err := os.Stat(b.jsonPath)
if err != nil {
log.Errorf("Error stat'ing config %v\n", err)
parseCount.Add("error", 1)
continue
}
isUnchanged := checkFileStat.Size() == fileStat.Size() && checkFileStat.ModTime() == fileStat.ModTime()
Expand All @@ -173,12 +183,20 @@ func (b *JSONGateResolverBuilder) start() error {
fileStat = checkFileStat

contentsChanged, err := b.parse()
if err != nil || !contentsChanged {
unchangedCount.Add(1)
if err != nil {
parseCount.Add("error", 1)
if parseErr == nil || err.Error() != parseErr.Error() {
parseErr = err
log.Error(err)
}
continue
}

buildCount.Add(1)
parseErr = nil
if !contentsChanged {
parseCount.Add("unchanged", 1)
continue
}
parseCount.Add("changed", 1)

// notify all the resolvers that the targets changed
for _, r := range b.resolvers {
Expand Down Expand Up @@ -217,7 +235,7 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
return false, fmt.Errorf("error parsing JSON discovery file %s: %v", b.jsonPath, err)
}

var targets []targetHost
var targets = map[string][]targetHost{}
for _, host := range hosts {
address, hasAddress := host[b.addressField]
port, hasPort := host[b.portField]
Expand Down Expand Up @@ -258,8 +276,22 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
return false, fmt.Errorf("error parsing JSON discovery file %s: port field %s has invalid value %v", b.jsonPath, b.portField, port)
}

targets = append(targets, targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)})
target := targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)}
targets[target.PoolType] = append(targets[target.PoolType], target)
}

for poolType := range targets {
if b.affinityField != "" {
sort.Slice(targets[poolType], func(i, j int) bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to sort here to pull the same-AZ hosts to the front of the list, but I don't think we want to slice out the ones in the other AZ.

If we don't have numConnections hosts in the same AZ, I think we still want the discovery to try to find that many valid targets, even if that means crossing AZs.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're not dropping any targets here, only sorting

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦 Oh duh of course, my bad

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should actually get coffee before reviewing code next time.

return b.affinityValue == targets[poolType][i].Affinity
})
}
if len(targets[poolType]) > *numConnections {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much simpler than my unnecessary min() function.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could use the built in min if we weren't on go 1.18 😢

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know!!

targets[poolType] = targets[poolType][:*numConnections]
}
targetCount.Set(poolType, int64(len(targets[poolType])))
}

b.targets = targets

return true, nil
Expand All @@ -270,17 +302,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {

log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections)

// filter to only targets that match the pool type. if unset, this will just be a copy
// of the full target list.
targets := []targetHost{}
for _, target := range b.targets {
if r.poolType == target.poolType {
targets = append(targets, target)
log.V(1000).Infof("matched target %v with type %s", target, r.poolType)
} else {
log.V(1000).Infof("skipping host %v with type %s", target, r.poolType)
}
}
targets := b.targets[r.poolType]

// Shuffle to ensure every host has a different order to iterate through, putting
// the affinity matching (e.g. same az) hosts at the front and the non-matching ones
Expand All @@ -293,7 +315,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {
for i := 0; i < n-1; i++ {
j := head + b.rand.Intn(tail-head+1)

if r.affinity == "" || r.affinity == targets[j].affinity {
if r.affinity == "" || r.affinity == targets[j].Affinity {
targets[head], targets[j] = targets[j], targets[head]
head++
} else {
Expand All @@ -302,32 +324,12 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {
}
}

// Grab the first N addresses, and voila!
var addrs []resolver.Address
targets = targets[:min(*numConnections, len(targets))]
for _, target := range targets {
addrs = append(addrs, resolver.Address{Addr: target.addr})
addrs = append(addrs, resolver.Address{Addr: target.Addr})
}

// Count some metrics
var unknown, local, remote int64
for _, target := range targets {
if r.affinity == "" {
unknown++
} else if r.affinity == target.affinity {
local++
} else {
remote++
}
}
if unknown != 0 {
affinityCount.Add("unknown", unknown)
}
affinityCount.Add("local", local)
affinityCount.Add("remote", remote)
poolTypeCount.Add(r.poolType, int64(len(targets)))

log.V(100).Infof("updated targets for %s to %v (local %d / remote %d)", r.target.URL.String(), targets, local, remote)
log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets)

r.clientConn.UpdateState(resolver.State{Addresses: addrs})
}
Expand All @@ -346,19 +348,13 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie
}
}

// Affinity on the other hand is just an optimization
affinity := ""
if b.affinityField != "" {
affinity = attrs.Get(b.affinityField)
}

log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity)
log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, b.affinityValue)

r := &JSONGateResolver{
target: target,
clientConn: cc,
poolType: poolType,
affinity: affinity,
affinity: b.affinityValue,
}

b.update(r)
Expand All @@ -367,19 +363,50 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie
return r, nil
}

// debugTargets will return the builder's targets with a sorted slice of
// poolTypes for rendering debug output
func (b *JSONGateResolverBuilder) debugTargets() any {
var pools []string
for pool := range b.targets {
pools = append(pools, pool)
}
sort.Strings(pools)
return struct {
Pools []string
Targets map[string][]targetHost
}{
Pools: pools,
Targets: b.targets,
}
}

func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {}

func (r *JSONGateResolver) Close() {
log.Infof("Closing resolver for target %s", r.target.URL.String())
}

// Utilities
func min(a, b int) int {
if a < b {
return a
}
return b
}

func init() {
}
const (
// targetsTemplate is a HTML template to display the gate resolver's target hosts.
targetsTemplate = `
<style>
table {
border-collapse: collapse;
}
td, th {
border: 1px solid #999;
padding: 0.2rem;
}
</style>
<table>
{{range $i, $p := .Pools}} <tr>
<th colspan="2">{{$p}}</th>
</tr>
{{range index $.Targets $p}} <tr>
<td>{{.Addr}}</td>
<td>{{.Affinity}}</td>
</tr>{{end}}
{{end}}
</table>
`
)
4 changes: 3 additions & 1 deletion go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ var (
vtgateHostsFile = flag.String("vtgate_hosts_file", "", "json file describing the host list to use for vtgate:// resolution")
numConnections = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain")
poolTypeField = flag.String("pool_type_field", "", "Field name used to specify the target vtgate type and filter the hosts")
affinityField = flag.String("affinity_field", "", "Attribute (both mysql protocol connection and JSON file) used to specify the routing affinity , e.g. 'az_id'")
affinityField = flag.String("affinity_field", "", "Attribute (JSON file) used to specify the routing affinity , e.g. 'az_id'")
affinityValue = flag.String("affinity_value", "", "Value to match for routing affinity , e.g. 'use-az1'")
addressField = flag.String("address_field", "address", "field name in the json file containing the address")
portField = flag.String("port_field", "port", "field name in the json file containing the port")

Expand Down Expand Up @@ -194,5 +195,6 @@ func Init() {
*portField,
*poolTypeField,
*affinityField,
*affinityValue,
)
}
Loading