diff --git a/go/vt/vtgateproxy/firstready_balancer.go b/go/vt/vtgateproxy/firstready_balancer.go index d9dd380861b..b8d07e370b7 100644 --- a/go/vt/vtgateproxy/firstready_balancer.go +++ b/go/vt/vtgateproxy/firstready_balancer.go @@ -41,12 +41,13 @@ import ( ) // newBuilder creates a new first_ready balancer builder. -func newBuilder() balancer.Builder { +func newFirstReadyBuilder() balancer.Builder { return base.NewBalancerBuilder("first_ready", &frPickerBuilder{currentConns: map[string]balancer.SubConn{}}, base.Config{HealthCheck: true}) } func init() { - balancer.Register(newBuilder()) + log.V(1).Infof("registering first_ready balancer") + balancer.Register(newFirstReadyBuilder()) } // frPickerBuilder implements both the Builder and the Picker interfaces. diff --git a/go/vt/vtgateproxy/mysql_server.go b/go/vt/vtgateproxy/mysql_server.go index f1cdf6e84fc..818aefe061d 100644 --- a/go/vt/vtgateproxy/mysql_server.go +++ b/go/vt/vtgateproxy/mysql_server.go @@ -195,12 +195,13 @@ func (ph *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql } }() + ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID)) + if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP { err := ph.proxy.StreamExecute(ctx, session, query, make(map[string]*querypb.BindVariable), callback) return mysql.NewSQLErrorFromError(err) } - ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID)) result, err := ph.proxy.Execute(ctx, session, query, make(map[string]*querypb.BindVariable)) if err := mysql.NewSQLErrorFromError(err); err != nil { @@ -261,6 +262,8 @@ func (ph *proxyHandler) ComPrepare(c *mysql.Conn, query string, bindVars map[str } }(session) + ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID)) + _, fld, err := ph.proxy.Prepare(ctx, session, query, bindVars) err = mysql.NewSQLErrorFromError(err) if err != nil { @@ -306,6 +309,8 @@ func (ph *proxyHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData } }() + ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID)) + if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP { err := ph.proxy.StreamExecute(ctx, session, prepare.PrepareStmt, prepare.BindVars, callback) return mysql.NewSQLErrorFromError(err) @@ -346,6 +351,8 @@ func (ph *proxyHandler) getSession(ctx context.Context, c *mysql.Conn) (*vtgatec options.ClientFoundRows = true } + ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID)) + var err error session, err = ph.proxy.NewSession(ctx, options, c.Attributes) if err != nil { @@ -370,6 +377,9 @@ func (ph *proxyHandler) closeSession(ctx context.Context, c *mysql.Conn) { if session.SessionPb().InTransaction { defer atomic.AddInt32(&busyConnections, -1) } + + ctx = context.WithValue(ctx, CONN_ID_KEY, int(c.ConnectionID)) + err := ph.proxy.CloseSession(ctx, session) if err != nil { log.Errorf("Error happened in transaction rollback: %v", err) diff --git a/go/vt/vtgateproxy/sim/vtgateproxysim.go b/go/vt/vtgateproxy/sim/vtgateproxysim.go new file mode 100644 index 00000000000..e8c6faa934d --- /dev/null +++ b/go/vt/vtgateproxy/sim/vtgateproxysim.go @@ -0,0 +1,101 @@ +package main + +import ( + "flag" + "fmt" + "math/rand" + "sort" + "time" + + "github.com/guptarohit/asciigraph" +) + +var ( + numClients = flag.Int("c", 9761, "Number of clients") + numVtgates = flag.Int("v", 1068, "Number of vtgates") + numConnections = flag.Int("n", 4, "number of connections per client host") + numZones = flag.Int("z", 4, "number of zones") +) + +func main() { + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + flag.Parse() + + fmt.Printf("Simulating %d clients => %d vtgates with %d zones %d conns per client\n\n", + *numClients, *numVtgates, *numZones, *numConnections) + + var clients []string + for i := 0; i < *numClients; i++ { + clients = append(clients, fmt.Sprintf("client-%03d", i)) + } + + var vtgates []string + for i := 0; i < *numVtgates; i++ { + vtgates = append(vtgates, fmt.Sprintf("vtgate-%03d", i)) + } + + // for now just consider 1/N of the s "local" + localClients := clients[:*numClients / *numZones] + localVtgates := vtgates[:*numVtgates / *numZones] + + conns := map[string][]string{} + + // Simulate "discovery" + for _, client := range localClients { + var clientConns []string + + for i := 0; i < *numConnections; i++ { + vtgate := localVtgates[rnd.Intn(len(localVtgates))] + clientConns = append(clientConns, vtgate) + } + + conns[client] = clientConns + } + + counts := map[string]int{} + for _, conns := range conns { + for _, vtgate := range conns { + counts[vtgate]++ + } + } + + histogram := map[int]int{} + max := 0 + min := -1 + for _, count := range counts { + histogram[count]++ + if count > max { + max = count + } + if min == -1 || count < min { + min = count + } + } + + fmt.Printf("Conns per vtgate\n%v\n\n", counts) + fmt.Printf("Histogram of conn counts\n%v\n\n", histogram) + + plot := []float64{} + for i := 0; i < len(localVtgates); i++ { + plot = append(plot, float64(counts[localVtgates[i]])) + } + sort.Float64s(plot) + graph := asciigraph.Plot(plot) + fmt.Println("Number of conns per vtgate host") + fmt.Println(graph) + fmt.Println("") + fmt.Println("") + + fmt.Printf("Conn count per vtgate distribution [%d - %d] (%d clients => %d vtgates with %d zones %d conns\n\n", + min, max, *numClients, *numVtgates, *numZones, *numConnections) + plot = []float64{} + for i := min; i < max; i++ { + plot = append(plot, float64(histogram[i])) + } + graph = asciigraph.Plot(plot) + fmt.Println(graph) + + fmt.Printf("\nConn stats: min %d max %d spread %d spread/min %f spread/avg %f\n", + min, max, max-min, float64(max-min)/float64(min), float64(max-min)/float64((max+min)/2)) +} diff --git a/go/vt/vtgateproxy/sticky_random_balancer.go b/go/vt/vtgateproxy/sticky_random_balancer.go index 4301e3889f6..b70860f803b 100644 --- a/go/vt/vtgateproxy/sticky_random_balancer.go +++ b/go/vt/vtgateproxy/sticky_random_balancer.go @@ -26,30 +26,27 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" - "google.golang.org/grpc/grpclog" + "vitess.io/vitess/go/vt/log" ) type ConnIdKey string const CONN_ID_KEY = ConnIdKey("ConnId") -const Name = "sticky_random" - -var logger = grpclog.Component("sticky_random") - // newBuilder creates a new roundrobin balancer builder. func newStickyRandomBuilder() balancer.Builder { - return base.NewBalancerBuilder(Name, &stickyPickerBuilder{}, base.Config{HealthCheck: true}) + return base.NewBalancerBuilder("sticky_random", &stickyPickerBuilder{}, base.Config{HealthCheck: true}) } func init() { + log.V(1).Infof("registering sticky_random balancer") balancer.Register(newStickyRandomBuilder()) } type stickyPickerBuilder struct{} func (*stickyPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { - logger.Infof("stickyRandomPicker: Build called with info: %v", info) + // log.V(100).Infof("stickyRandomPicker: Build called with info: %v", info) if len(info.ReadySCs) == 0 { return base.NewErrPicker(balancer.ErrNoSubConnAvailable) } @@ -72,11 +69,18 @@ func (p *stickyPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) subConnsLen := len(p.subConns) - connId := info.Ctx.Value(CONN_ID_KEY).(int) - if connId == 0 { + var connId int + connIdVal := info.Ctx.Value(CONN_ID_KEY) + if connIdVal != nil { + connId = connIdVal.(int) + log.V(100).Infof("stickyRandomPicker: using connId %d", connId) + } else { + log.V(100).Infof("stickyRandomPicker: nonexistent connId -- using random") connId = rand.IntN(subConnsLen) // shouldn't happen } + // XXX/demmer might want to hash the connId rather than just mod sc := p.subConns[connId%subConnsLen] + return balancer.PickResult{SubConn: sc}, nil } diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index a9bbbfe9343..45605ae5c93 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -216,6 +216,7 @@ func Init() { case "round_robin": case "first_ready": case "pick_first": + case "sticky_random": break default: log.Fatalf("invalid balancer type %s", *balancerType)