Skip to content

Commit

Permalink
make sticky_random actually work
Browse files Browse the repository at this point in the history
  • Loading branch information
demmer committed Jan 17, 2025
1 parent 1b4ddb4 commit a17c5ce
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 12 deletions.
5 changes: 3 additions & 2 deletions go/vt/vtgateproxy/firstready_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ import (
)

// newBuilder creates a new first_ready balancer builder.
func newBuilder() balancer.Builder {
func newFirstReadyBuilder() balancer.Builder {
return base.NewBalancerBuilder("first_ready", &frPickerBuilder{currentConns: map[string]balancer.SubConn{}}, base.Config{HealthCheck: true})
}

func init() {
balancer.Register(newBuilder())
log.V(1).Infof("registering first_ready balancer")
balancer.Register(newFirstReadyBuilder())
}

// frPickerBuilder implements both the Builder and the Picker interfaces.
Expand Down
12 changes: 11 additions & 1 deletion go/vt/vtgateproxy/mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,13 @@ func (ph *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql
}
}()

ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID))

if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, query, make(map[string]*querypb.BindVariable), callback)
return mysql.NewSQLErrorFromError(err)
}

ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID))
result, err := ph.proxy.Execute(ctx, session, query, make(map[string]*querypb.BindVariable))

if err := mysql.NewSQLErrorFromError(err); err != nil {
Expand Down Expand Up @@ -261,6 +262,8 @@ func (ph *proxyHandler) ComPrepare(c *mysql.Conn, query string, bindVars map[str
}
}(session)

ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID))

_, fld, err := ph.proxy.Prepare(ctx, session, query, bindVars)
err = mysql.NewSQLErrorFromError(err)
if err != nil {
Expand Down Expand Up @@ -306,6 +309,8 @@ func (ph *proxyHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData
}
}()

ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID))

if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, prepare.PrepareStmt, prepare.BindVars, callback)
return mysql.NewSQLErrorFromError(err)
Expand Down Expand Up @@ -346,6 +351,8 @@ func (ph *proxyHandler) getSession(ctx context.Context, c *mysql.Conn) (*vtgatec
options.ClientFoundRows = true
}

ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID))

var err error
session, err = ph.proxy.NewSession(ctx, options, c.Attributes)
if err != nil {
Expand All @@ -370,6 +377,9 @@ func (ph *proxyHandler) closeSession(ctx context.Context, c *mysql.Conn) {
if session.SessionPb().InTransaction {
defer atomic.AddInt32(&busyConnections, -1)
}

ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID))

err := ph.proxy.CloseSession(ctx, session)
if err != nil {
log.Errorf("Error happened in transaction rollback: %v", err)
Expand Down
101 changes: 101 additions & 0 deletions go/vt/vtgateproxy/sim/vtgateproxysim.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package main

import (
"flag"
"fmt"
"math/rand"
"sort"
"time"

"github.com/guptarohit/asciigraph"
)

var (
numClients = flag.Int("c", 9761, "Number of clients")
numVtgates = flag.Int("v", 1068, "Number of vtgates")
numConnections = flag.Int("n", 4, "number of connections per client host")
numZones = flag.Int("z", 4, "number of zones")
)

func main() {
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

flag.Parse()

fmt.Printf("Simulating %d clients => %d vtgates with %d zones %d conns per client\n\n",
*numClients, *numVtgates, *numZones, *numConnections)

var clients []string
for i := 0; i < *numClients; i++ {
clients = append(clients, fmt.Sprintf("client-%03d", i))
}

var vtgates []string
for i := 0; i < *numVtgates; i++ {
vtgates = append(vtgates, fmt.Sprintf("vtgate-%03d", i))
}

// for now just consider 1/N of the s "local"
localClients := clients[:*numClients / *numZones]
localVtgates := vtgates[:*numVtgates / *numZones]

conns := map[string][]string{}

// Simulate "discovery"
for _, client := range localClients {
var clientConns []string

for i := 0; i < *numConnections; i++ {
vtgate := localVtgates[rnd.Intn(len(localVtgates))]
clientConns = append(clientConns, vtgate)
}

conns[client] = clientConns
}

counts := map[string]int{}
for _, conns := range conns {
for _, vtgate := range conns {
counts[vtgate]++
}
}

histogram := map[int]int{}
max := 0
min := -1
for _, count := range counts {
histogram[count]++
if count > max {
max = count
}
if min == -1 || count < min {
min = count
}
}

fmt.Printf("Conns per vtgate\n%v\n\n", counts)
fmt.Printf("Histogram of conn counts\n%v\n\n", histogram)

plot := []float64{}
for i := 0; i < len(localVtgates); i++ {
plot = append(plot, float64(counts[localVtgates[i]]))
}
sort.Float64s(plot)
graph := asciigraph.Plot(plot)
fmt.Println("Number of conns per vtgate host")
fmt.Println(graph)
fmt.Println("")
fmt.Println("")

fmt.Printf("Conn count per vtgate distribution [%d - %d] (%d clients => %d vtgates with %d zones %d conns\n\n",
min, max, *numClients, *numVtgates, *numZones, *numConnections)
plot = []float64{}
for i := min; i < max; i++ {
plot = append(plot, float64(histogram[i]))
}
graph = asciigraph.Plot(plot)
fmt.Println(graph)

fmt.Printf("\nConn stats: min %d max %d spread %d spread/min %f spread/avg %f\n",
min, max, max-min, float64(max-min)/float64(min), float64(max-min)/float64((max+min)/2))
}
22 changes: 13 additions & 9 deletions go/vt/vtgateproxy/sticky_random_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,27 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
"vitess.io/vitess/go/vt/log"
)

type ConnIdKey string

const CONN_ID_KEY = ConnIdKey("ConnId")

const Name = "sticky_random"

var logger = grpclog.Component("sticky_random")

// newBuilder creates a new roundrobin balancer builder.
func newStickyRandomBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &stickyPickerBuilder{}, base.Config{HealthCheck: true})
return base.NewBalancerBuilder("sticky_random", &stickyPickerBuilder{}, base.Config{HealthCheck: true})
}

func init() {
log.V(1).Infof("registering sticky_random balancer")
balancer.Register(newStickyRandomBuilder())
}

type stickyPickerBuilder struct{}

func (*stickyPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
logger.Infof("stickyRandomPicker: Build called with info: %v", info)
// log.V(100).Infof("stickyRandomPicker: Build called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
Expand All @@ -72,11 +69,18 @@ func (p *stickyPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error)

subConnsLen := len(p.subConns)

connId := info.Ctx.Value(CONN_ID_KEY).(int)
if connId == 0 {
var connId int
connIdVal := info.Ctx.Value(CONN_ID_KEY)
if connIdVal != nil {
connId = connIdVal.(int)
log.V(100).Infof("stickyRandomPicker: using connId %d", connId)
} else {
log.V(100).Infof("stickyRandomPicker: nonexistent connId -- using random")
connId = rand.IntN(subConnsLen) // shouldn't happen
}

// XXX/demmer might want to hash the connId rather than just mod
sc := p.subConns[connId%subConnsLen]

return balancer.PickResult{SubConn: sc}, nil
}
1 change: 1 addition & 0 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func Init() {
case "round_robin":
case "first_ready":
case "pick_first":
case "sticky_random":
break
default:
log.Fatalf("invalid balancer type %s", *balancerType)
Expand Down

0 comments on commit a17c5ce

Please sign in to comment.