From 73b16baf15d91f308501e948d7a2fa7d650fbd7e Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Mon, 8 Jul 2024 14:40:29 -0700 Subject: [PATCH] implement TabletBalancer Signed-off-by: Michael Demmer Co-authored-by: Venkatraju V --- go/vt/vtgate/balancer/balancer.go | 367 +++++++++++++++++++++++ go/vt/vtgate/balancer/balancer_test.go | 387 +++++++++++++++++++++++++ 2 files changed, 754 insertions(+) create mode 100644 go/vt/vtgate/balancer/balancer.go create mode 100644 go/vt/vtgate/balancer/balancer_test.go diff --git a/go/vt/vtgate/balancer/balancer.go b/go/vt/vtgate/balancer/balancer.go new file mode 100644 index 00000000000..30cf862f890 --- /dev/null +++ b/go/vt/vtgate/balancer/balancer.go @@ -0,0 +1,367 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package balancer + +import ( + "encoding/json" + "fmt" + "math/rand" + "net/http" + "sync" + + "vitess.io/vitess/go/vt/discovery" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +/* + +The tabletBalancer probabalistically orders the list of available tablets into +a ranked order of preference in order to satisfy two high-level goals: + +1. Balance the load across the available replicas +2. Prefer a replica in the same cell as the vtgate if possible + +In some topologies this is trivial to accomplish by simply preferring tablets in the +local cell, assuming there are a proportional number of local tablets in each cell to +satisfy the inbound traffic to the vtgates in that cell. + +However, for topologies with a relatively small number of tablets in each cell, a simple +affinity algorithm does not effectively balance the load. + +As a simple example: + + Given three cells with vtgates, four replicas spread into those cells, where each vtgate + receives an equal query share. If each routes only to its local cell, the tablets will be + unbalanced since two of them receive 1/3 of the queries, but the two replicas in the same + cell will only receive 1/6 of the queries. + + Cell A: 1/3 --> vtgate --> 1/3 => vttablet + + Cell B: 1/3 --> vtgate --> 1/3 => vttablet + + Cell C: 1/3 --> vtgate --> 1/6 => vttablet + \-> 1/6 => vttablet + +Other topologies that can cause similar pathologies include cases where there may be cells +containing replicas but no local vtgates, and/or cells that have only vtgates but no replicas. + +For these topologies, the tabletBalancer proportionally assigns the output flow to each tablet, +preferring the local cell where possible, but only as long as the global query balance is +maintained. + +To accomplish this goal, the balancer is given: + +* The list of cells that receive inbound traffic to vtgates +* The local cell where the vtgate exists +* The set of tablets and their cells (learned from discovery) + +The model assumes there is an equal probablility of a query coming from each vtgate cell, i.e. +traffic is effectively load balanced between the cells with vtgates. + +Given that information, the balancer builds a simple model to determine how much query load +would go to each tablet if vtgate only routed to its local cell. Then if any tablets are +unbalanced, it shifts the desired allocation away from the local cell preference in order to +even out the query load. + +Based on this global model, the vtgate then probabalistically picks a destination for each +query to be sent and uses these weights to order the available tablets accordingly. + +Assuming each vtgate is configured with and discovers the same information about the topology, +and the input flow is balanced across the vtgate cells (as mentioned above), then each vtgate +should come the the same conclusion about the global flows, and cooperatively should +converge on the desired balanced query load. + +*/ + +type TabletBalancer interface { + // Pick is the main entry point to the balancer. Returns the best tablet out of the list + // for a given query to maintain the desired balanced allocation over multiple executions. + Pick(target *querypb.Target, tablets []*discovery.TabletHealth) *discovery.TabletHealth + + // Balancer debug page request + DebugHandler(w http.ResponseWriter, r *http.Request) +} + +func NewTabletBalancer(localCell string, vtGateCells []string) TabletBalancer { + return &tabletBalancer{ + localCell: localCell, + vtGateCells: vtGateCells, + allocations: map[discovery.KeyspaceShardTabletType]*targetAllocation{}, + } +} + +type tabletBalancer struct { + // + // Configuration + // + + // The local cell for the vtgate + localCell string + + // The set of cells that have vtgates + vtGateCells []string + + // mu protects the allocation map + mu sync.Mutex + + // + // Allocations for balanced mode, calculated once per target and invalidated + // whenever the topology changes. + // + allocations map[discovery.KeyspaceShardTabletType]*targetAllocation +} + +type targetAllocation struct { + // Target flow per cell based on the number of tablets discovered in the cell + Target map[string]int // json:target + + // Input flows allocated for each cell + Inflows map[string]int + + // Output flows from each vtgate cell to each target cell + Outflows map[string]map[string]int + + // Allocation routed to each tablet from the local cell used for ranking + Allocation map[uint32]int + + // Tablets that local cell does not route to + Unallocated map[uint32]struct{} + + // Total allocation which is basically 1,000,000 / len(vtgatecells) + TotalAllocation int +} + +func (b *tabletBalancer) print() string { + allocations, _ := json.Marshal(&b.allocations) + return fmt.Sprintf("LocalCell: %s, VtGateCells: %s, allocations: %s", + b.localCell, b.vtGateCells, string(allocations)) +} + +func (b *tabletBalancer) DebugHandler(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/plain") + fmt.Fprintf(w, "Local Cell: %v\r\n", b.localCell) + fmt.Fprintf(w, "Vtgate Cells: %v\r\n", b.vtGateCells) + + b.mu.Lock() + defer b.mu.Unlock() + allocations, _ := json.MarshalIndent(b.allocations, "", " ") + fmt.Fprintf(w, "Allocations: %v\r\n", string(allocations)) +} + +// Pick is the main entry point to the balancer. +// +// Given the total allocation for the set of tablets, choose the best target +// by a weighted random sample so that over time the system will achieve the +// desired balanced allocation. +func (b *tabletBalancer) Pick(target *querypb.Target, tablets []*discovery.TabletHealth) *discovery.TabletHealth { + + numTablets := len(tablets) + if numTablets == 0 { + return nil + } + + allocationMap, totalAllocation := b.getAllocation(target, tablets) + + r := rand.Intn(totalAllocation) + for i := 0; i < numTablets; i++ { + flow := allocationMap[tablets[i].Tablet.Alias.Uid] + if r < flow { + return tablets[i] + } + r -= flow + } + + return tablets[0] +} + +// To stick with integer arithmetic, use 1,000,000 as the full load +const ALLOCATION = 1000000 + +func (b *tabletBalancer) allocateFlows(allTablets []*discovery.TabletHealth) *targetAllocation { + // Initialization: Set up some data structures and derived values + a := targetAllocation{ + Target: map[string]int{}, + Inflows: map[string]int{}, + Outflows: map[string]map[string]int{}, + Allocation: map[uint32]int{}, + Unallocated: map[uint32]struct{}{}, + } + flowPerVtgateCell := ALLOCATION / len(b.vtGateCells) + flowPerTablet := ALLOCATION / len(allTablets) + cellExistsWithNoTablets := false + + for _, th := range allTablets { + a.Target[th.Tablet.Alias.Cell] += flowPerTablet + } + + // + // First pass: Allocate vtgate flow to the local cell where the vtgate exists + // and along the way figure out if there are any vtgates with no local tablets. + // + for _, cell := range b.vtGateCells { + outflow := map[string]int{} + target := a.Target[cell] + + if target > 0 { + a.Inflows[cell] += flowPerVtgateCell + outflow[cell] = flowPerVtgateCell + } else { + cellExistsWithNoTablets = true + } + + a.Outflows[cell] = outflow + } + + // + // Figure out if there is a shortfall + // + underAllocated := make(map[string]int) + unbalancedFlow := 0 + for cell, allocation := range a.Target { + if a.Inflows[cell] < allocation { + underAllocated[cell] = allocation - a.Inflows[cell] + unbalancedFlow += underAllocated[cell] + } + } + + // + // Second pass: if there are any vtgates with no local tablets, allocate the underallocated amount + // proportionally to all cells that may need it + // + if cellExistsWithNoTablets { + for _, vtgateCell := range b.vtGateCells { + target := a.Target[vtgateCell] + if target != 0 { + continue + } + + for underAllocatedCell, underAllocatedFlow := range underAllocated { + allocation := flowPerVtgateCell * underAllocatedFlow / unbalancedFlow + a.Inflows[underAllocatedCell] += allocation + a.Outflows[vtgateCell][underAllocatedCell] += allocation + } + } + + // Recompute underallocated after these flows were assigned + unbalancedFlow = 0 + underAllocated = make(map[string]int) + for cell, allocation := range a.Target { + if a.Inflows[cell] < allocation { + underAllocated[cell] = allocation - a.Inflows[cell] + unbalancedFlow += underAllocated[cell] + } + } + } + + // + // Third pass: Shift remaining imbalance if any cell is over/under allocated after + // assigning local cell traffic and distributing load from cells without tablets. + // + if /* fudge for integer arithmetic */ unbalancedFlow > 10 { + + // cells which are overallocated + overAllocated := make(map[string]int) + for cell, allocation := range a.Target { + if a.Inflows[cell] > allocation { + overAllocated[cell] = a.Inflows[cell] - allocation + } + } + + // fmt.Printf("outflows %v over %v under %v\n", a.Outflows, overAllocated, underAllocated) + + // + // For each overallocated cell, proportionally shift flow from targets that are overallocated + // to targets that are underallocated. + // + // Note this is an O(N^3) loop, but only over the cells which need adjustment. + // + for _, vtgateCell := range b.vtGateCells { + for underAllocatedCell, underAllocatedFlow := range underAllocated { + for overAllocatedCell, overAllocatedFlow := range overAllocated { + + currentFlow := a.Outflows[vtgateCell][overAllocatedCell] + if currentFlow == 0 { + continue + } + + // Shift a proportional fraction of the amount that the cell is currently allocated weighted + // by the fraction that this vtgate cell is already sending to the overallocated cell, and the + // fraction that the new target is underallocated + // + // Note that the operator order matters -- multiplications need to occur before divisions + // to avoid truncating the integer values. + shiftFlow := overAllocatedFlow * currentFlow * underAllocatedFlow / a.Inflows[overAllocatedCell] / unbalancedFlow + + //fmt.Printf("shift %d %s %s -> %s (over %d current %d in %d under %d unbalanced %d) \n", shiftFlow, vtgateCell, overAllocatedCell, underAllocatedCell, + // overAllocatedFlow, currentFlow, a.Inflows[overAllocatedCell], underAllocatedFlow, unbalancedFlow) + + a.Outflows[vtgateCell][overAllocatedCell] -= shiftFlow + a.Inflows[overAllocatedCell] -= shiftFlow + + a.Inflows[underAllocatedCell] += shiftFlow + a.Outflows[vtgateCell][underAllocatedCell] += shiftFlow + } + } + } + } + + // + // Finally, once the cell flows are all adjusted, figure out the local allocation to each + // tablet in the target cells + // + outflow := a.Outflows[b.localCell] + for _, tablet := range allTablets { + cell := tablet.Tablet.Alias.Cell + flow := outflow[cell] + if flow > 0 { + a.Allocation[tablet.Tablet.Alias.Uid] = flow * flowPerTablet / a.Target[cell] + a.TotalAllocation += flow * flowPerTablet / a.Target[cell] + } else { + a.Unallocated[tablet.Tablet.Alias.Uid] = struct{}{} + } + } + + return &a +} + +// getAllocation builds the allocation map if needed and returns a copy of the map +func (b *tabletBalancer) getAllocation(target *querypb.Target, tablets []*discovery.TabletHealth) (map[uint32]int, int) { + b.mu.Lock() + defer b.mu.Unlock() + + allocation, exists := b.allocations[discovery.KeyFromTarget(target)] + if exists && (len(allocation.Allocation)+len(allocation.Unallocated)) == len(tablets) { + mismatch := false + for _, tablet := range tablets { + if _, ok := allocation.Allocation[tablet.Tablet.Alias.Uid]; !ok { + if _, ok := allocation.Unallocated[tablet.Tablet.Alias.Uid]; !ok { + mismatch = true + break + } + } + } + if !mismatch { + // No change in tablets for this target. Return computed allocation + return allocation.Allocation, allocation.TotalAllocation + } + } + + allocation = b.allocateFlows(tablets) + b.allocations[discovery.KeyFromTarget(target)] = allocation + + return allocation.Allocation, allocation.TotalAllocation +} diff --git a/go/vt/vtgate/balancer/balancer_test.go b/go/vt/vtgate/balancer/balancer_test.go new file mode 100644 index 00000000000..0318fda1391 --- /dev/null +++ b/go/vt/vtgate/balancer/balancer_test.go @@ -0,0 +1,387 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package balancer + +import ( + "strconv" + "testing" + + "vitess.io/vitess/go/vt/discovery" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" +) + +var nextTestTabletUID int + +func createTestTablet(cell string) *discovery.TabletHealth { + nextTestTabletUID++ + tablet := topo.NewTablet(uint32(nextTestTabletUID), cell, strconv.Itoa(nextTestTabletUID)) + tablet.PortMap["vt"] = 1 + tablet.PortMap["grpc"] = 2 + tablet.Keyspace = "k" + tablet.Shard = "s" + + return &discovery.TabletHealth{ + Tablet: tablet, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Serving: false, + Stats: nil, + PrimaryTermStartTime: 0, + } +} + +// allow 2% fuzz +const FUZZ = 2 + +func fuzzyEquals(a, b int) bool { + diff := a - b + if diff < 0 { + diff = -diff + } + return diff < a*FUZZ/100 +} + +func TestAllocateFlows(t *testing.T) { + cases := []struct { + test string + tablets []*discovery.TabletHealth + vtgateCells []string + }{ + { + "balanced one tablet per cell", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + []string{"a", "b", "c", "d"}, + }, + { + "balanced multiple tablets per cell", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + []string{"a", "b", "c", "d"}, + }, + { + "vtgate in cell with no tablets", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + []string{"a", "b", "c", "d", "e"}, + }, + { + "vtgates in multiple cells with no tablets", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + []string{"a", "b", "c", "d", "e", "f", "g"}, + }, + { + "imbalanced multiple tablets in one cell", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + }, + []string{"a", "b", "c"}, + }, + { + "imbalanced multiple tablets in multiple cells", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + createTestTablet("d"), + createTestTablet("d"), + createTestTablet("d"), + }, + []string{"a", "b", "c", "d"}, + }, + { + "heavy imbalance", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("c"), + }, + []string{"a", "b", "c", "d"}, + }, + } + + target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA} + + for _, c := range cases { + t.Logf("\n\nTest Case: %s\n\n", c.test) + + tablets := c.tablets + vtGateCells := c.vtgateCells + + tabletsByCell := make(map[string][]*discovery.TabletHealth) + for _, tablet := range tablets { + cell := tablet.Tablet.Alias.Cell + tabletsByCell[cell] = append(tabletsByCell[cell], tablet) + } + + allocationPerTablet := make(map[uint32]int) + expectedPerTablet := ALLOCATION / len(tablets) + + expectedPerCell := make(map[string]int) + for cell := range tabletsByCell { + expectedPerCell[cell] = ALLOCATION / len(tablets) * len(tabletsByCell[cell]) + } + + // Run the balancer over each vtgate cell + for _, localCell := range vtGateCells { + b := NewTabletBalancer(localCell, vtGateCells).(*tabletBalancer) + a := b.allocateFlows(tablets) + b.allocations[discovery.KeyFromTarget(target)] = a + + t.Logf("Target Flows %v, Balancer: %s XXX %d %v \n", expectedPerCell, b.print(), len(b.allocations), b.allocations) + + // Accumulate all the output per tablet cell + outflowPerCell := make(map[string]int) + for _, outflow := range a.Outflows { + for tabletCell, flow := range outflow { + if flow < 0 { + t.Errorf("balancer %v negative outflow", b.print()) + } + outflowPerCell[tabletCell] += flow + } + } + + // Check in / out flow to each tablet cell + for cell := range tabletsByCell { + expectedForCell := expectedPerCell[cell] + + if !fuzzyEquals(a.Inflows[cell], expectedForCell) || !fuzzyEquals(outflowPerCell[cell], expectedForCell) { + t.Errorf("Balancer {%s} ExpectedPerCell {%v} did not allocate correct flow to cell %s: expected %d, inflow %d outflow %d", + b.print(), expectedPerCell, cell, expectedForCell, a.Inflows[cell], outflowPerCell[cell]) + } + } + + // Accumulate the allocations for all runs to compare what the system does as a whole + // when routing from all vtgate cells + for uid, flow := range a.Allocation { + allocationPerTablet[uid] += flow + } + } + + // Check that the allocations all add up + for _, tablet := range tablets { + uid := tablet.Tablet.Alias.Uid + + allocation := allocationPerTablet[uid] + if !fuzzyEquals(allocation, expectedPerTablet) { + t.Errorf("did not allocate full allocation to tablet %d: expected %d got %d", + uid, expectedPerTablet, allocation) + } + } + } +} + +func TestBalancedPick(t *testing.T) { + cases := []struct { + test string + tablets []*discovery.TabletHealth + vtgateCells []string + }{ + { + "simple balanced", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + + []string{"a", "b", "c", "d"}, + }, + { + "simple unbalanced", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + + []string{"a", "b", "c", "d"}, + }, + { + "mixed unbalanced", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("c"), + }, + + []string{"a", "b", "c", "d"}, + }, + { + "one target same cell", + []*discovery.TabletHealth{ + createTestTablet("a"), + }, + + []string{"a"}, + }, + { + "one target other cell", + []*discovery.TabletHealth{ + createTestTablet("a"), + }, + + []string{"b", "c", "d"}, + }, + } + + target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA} + for _, c := range cases { + t.Logf("\n\nTest Case: %s\n\n", c.test) + + tablets := c.tablets + vtGateCells := c.vtgateCells + + // test unbalanced distribution + + routed := make(map[uint32]int) + + expectedPerCell := make(map[string]int) + for _, tablet := range tablets { + cell := tablet.Tablet.Alias.Cell + expectedPerCell[cell] += ALLOCATION / len(tablets) + } + + // Run the algorithm a bunch of times to get a random enough sample + N := 1000000 + for _, localCell := range vtGateCells { + b := NewTabletBalancer(localCell, vtGateCells).(*tabletBalancer) + + for i := 0; i < N/len(vtGateCells); i++ { + th := b.Pick(target, tablets) + if i == 0 { + t.Logf("Target Flows %v, Balancer: %s\n", expectedPerCell, b.print()) + t.Logf(b.print()) + } + + routed[th.Tablet.Alias.Uid]++ + } + } + + expected := N / len(tablets) + delta := make(map[uint32]int) + for _, tablet := range tablets { + got := routed[tablet.Tablet.Alias.Uid] + delta[tablet.Tablet.Alias.Uid] = got - expected + if !fuzzyEquals(got, expected) { + t.Errorf("routing to tablet %d got %d expected %d", tablet.Tablet.Alias.Uid, got, expected) + } + } + t.Logf("Expected %d per tablet, Routed %v, Delta %v, Max delta %d", N/len(tablets), routed, delta, expected*FUZZ/100) + } +} + +func TestTopologyChanged(t *testing.T) { + allTablets := []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("b"), + } + target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA} + + b := NewTabletBalancer("b", []string{"a", "b"}).(*tabletBalancer) + + N := 1 + + // initially create a slice of tablets with just the two in cell a + tablets := allTablets + tablets = tablets[0:2] + + for i := 0; i < N; i++ { + th := b.Pick(target, tablets) + allocation, totalAllocation := b.getAllocation(target, tablets) + + if totalAllocation != ALLOCATION/2 { + t.Errorf("totalAllocation mismatch %s", b.print()) + } + + if allocation[th.Tablet.Alias.Uid] != ALLOCATION/4 { + t.Errorf("allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell) + } + + if th.Tablet.Alias.Cell != "a" { + t.Errorf("shuffle promoted wrong tablet from cell %s", tablets[0].Tablet.Alias.Cell) + } + } + + // Run again with the full topology. Now traffic should go to cell b + for i := 0; i < N; i++ { + th := b.Pick(target, allTablets) + + allocation, totalAllocation := b.getAllocation(target, allTablets) + + if totalAllocation != ALLOCATION/2 { + t.Errorf("totalAllocation mismatch %s", b.print()) + } + + if allocation[th.Tablet.Alias.Uid] != ALLOCATION/4 { + t.Errorf("allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell) + } + + if th.Tablet.Alias.Cell != "b" { + t.Errorf("shuffle promoted wrong tablet from cell %s", allTablets[0].Tablet.Alias.Cell) + } + } +}