Skip to content

Commit 73b16ba

Browse files
demmervenkatraju
andcommitted
implement TabletBalancer
Signed-off-by: Michael Demmer <mdemmer@slack-corp.com> Co-authored-by: Venkatraju V <venkatraju@slack-corp.com>
1 parent 6d95d7c commit 73b16ba

File tree

2 files changed

+754
-0
lines changed

2 files changed

+754
-0
lines changed

go/vt/vtgate/balancer/balancer.go

Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
/*
2+
Copyright 2023 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package balancer
18+
19+
import (
20+
"encoding/json"
21+
"fmt"
22+
"math/rand"
23+
"net/http"
24+
"sync"
25+
26+
"vitess.io/vitess/go/vt/discovery"
27+
querypb "vitess.io/vitess/go/vt/proto/query"
28+
)
29+
30+
/*
31+
32+
The tabletBalancer probabilistically orders the list of available tablets into
33+
a ranked order of preference in order to satisfy two high-level goals:
34+
35+
1. Balance the load across the available replicas
36+
2. Prefer a replica in the same cell as the vtgate if possible
37+
38+
In some topologies this is trivial to accomplish by simply preferring tablets in the
39+
local cell, assuming there are a proportional number of local tablets in each cell to
40+
satisfy the inbound traffic to the vtgates in that cell.
41+
42+
However, for topologies with a relatively small number of tablets in each cell, a simple
43+
affinity algorithm does not effectively balance the load.
44+
45+
As a simple example:
46+
47+
Given three cells with vtgates, four replicas spread into those cells, where each vtgate
48+
receives an equal query share. If each routes only to its local cell, the tablets will be
49+
unbalanced since two of them receive 1/3 of the queries, but the two replicas in the same
50+
cell will only receive 1/6 of the queries.
51+
52+
Cell A: 1/3 --> vtgate --> 1/3 => vttablet
53+
54+
Cell B: 1/3 --> vtgate --> 1/3 => vttablet
55+
56+
Cell C: 1/3 --> vtgate --> 1/6 => vttablet
57+
\-> 1/6 => vttablet
58+
59+
Other topologies that can cause similar pathologies include cases where there may be cells
60+
containing replicas but no local vtgates, and/or cells that have only vtgates but no replicas.
61+
62+
For these topologies, the tabletBalancer proportionally assigns the output flow to each tablet,
63+
preferring the local cell where possible, but only as long as the global query balance is
64+
maintained.
65+
66+
To accomplish this goal, the balancer is given:
67+
68+
* The list of cells that receive inbound traffic to vtgates
69+
* The local cell where the vtgate exists
70+
* The set of tablets and their cells (learned from discovery)
71+
72+
The model assumes there is an equal probability of a query coming from each vtgate cell, i.e.
73+
traffic is effectively load balanced between the cells with vtgates.
74+
75+
Given that information, the balancer builds a simple model to determine how much query load
76+
would go to each tablet if vtgate only routed to its local cell. Then if any tablets are
77+
unbalanced, it shifts the desired allocation away from the local cell preference in order to
78+
even out the query load.
79+
80+
Based on this global model, the vtgate then probabilistically picks a destination for each
81+
query to be sent and uses these weights to order the available tablets accordingly.
82+
83+
Assuming each vtgate is configured with and discovers the same information about the topology,
84+
and the input flow is balanced across the vtgate cells (as mentioned above), then each vtgate
85+
should come to the same conclusion about the global flows, and cooperatively should
86+
converge on the desired balanced query load.
87+
88+
*/
89+
90+
type TabletBalancer interface {
91+
// Pick is the main entry point to the balancer. Returns the best tablet out of the list
92+
// for a given query to maintain the desired balanced allocation over multiple executions.
93+
Pick(target *querypb.Target, tablets []*discovery.TabletHealth) *discovery.TabletHealth
94+
95+
// Balancer debug page request
96+
DebugHandler(w http.ResponseWriter, r *http.Request)
97+
}
98+
99+
func NewTabletBalancer(localCell string, vtGateCells []string) TabletBalancer {
100+
return &tabletBalancer{
101+
localCell: localCell,
102+
vtGateCells: vtGateCells,
103+
allocations: map[discovery.KeyspaceShardTabletType]*targetAllocation{},
104+
}
105+
}
106+
107+
type tabletBalancer struct {
108+
//
109+
// Configuration
110+
//
111+
112+
// The local cell for the vtgate
113+
localCell string
114+
115+
// The set of cells that have vtgates
116+
vtGateCells []string
117+
118+
// mu protects the allocation map
119+
mu sync.Mutex
120+
121+
//
122+
// Allocations for balanced mode, calculated once per target and invalidated
123+
// whenever the topology changes.
124+
//
125+
allocations map[discovery.KeyspaceShardTabletType]*targetAllocation
126+
}
127+
128+
// targetAllocation is the flow model computed for a single target. All flows
// are integers expressed as parts of ALLOCATION.
//
// The fields are exported and carry no struct tags, so encoding/json emits
// them under their Go names, in declaration order, on the debug page.
type targetAllocation struct {
	// Target is the flow each cell should receive, proportional to the
	// number of tablets discovered in that cell.
	Target map[string]int

	// Inflows is the flow currently assigned to each cell.
	Inflows map[string]int

	// Outflows maps vtgate cell -> destination cell -> flow sent there.
	Outflows map[string]map[string]int

	// Allocation is the flow the local cell routes to each tablet, keyed by
	// tablet uid. These are the weights used when picking a tablet.
	Allocation map[uint32]int

	// Unallocated is the set of tablet uids the local cell sends nothing to.
	Unallocated map[uint32]struct{}

	// TotalAllocation is the sum of the values in Allocation, which is
	// roughly 1,000,000 / len(vtGateCells).
	TotalAllocation int
}
147+
148+
func (b *tabletBalancer) print() string {
149+
allocations, _ := json.Marshal(&b.allocations)
150+
return fmt.Sprintf("LocalCell: %s, VtGateCells: %s, allocations: %s",
151+
b.localCell, b.vtGateCells, string(allocations))
152+
}
153+
154+
func (b *tabletBalancer) DebugHandler(w http.ResponseWriter, _ *http.Request) {
155+
w.Header().Set("Content-Type", "text/plain")
156+
fmt.Fprintf(w, "Local Cell: %v\r\n", b.localCell)
157+
fmt.Fprintf(w, "Vtgate Cells: %v\r\n", b.vtGateCells)
158+
159+
b.mu.Lock()
160+
defer b.mu.Unlock()
161+
allocations, _ := json.MarshalIndent(b.allocations, "", " ")
162+
fmt.Fprintf(w, "Allocations: %v\r\n", string(allocations))
163+
}
164+
165+
// Pick is the main entry point to the balancer.
166+
//
167+
// Given the total allocation for the set of tablets, choose the best target
168+
// by a weighted random sample so that over time the system will achieve the
169+
// desired balanced allocation.
170+
func (b *tabletBalancer) Pick(target *querypb.Target, tablets []*discovery.TabletHealth) *discovery.TabletHealth {
171+
172+
numTablets := len(tablets)
173+
if numTablets == 0 {
174+
return nil
175+
}
176+
177+
allocationMap, totalAllocation := b.getAllocation(target, tablets)
178+
179+
r := rand.Intn(totalAllocation)
180+
for i := 0; i < numTablets; i++ {
181+
flow := allocationMap[tablets[i].Tablet.Alias.Uid]
182+
if r < flow {
183+
return tablets[i]
184+
}
185+
r -= flow
186+
}
187+
188+
return tablets[0]
189+
}
190+
191+
// ALLOCATION is the number of integer units that represents the full query
// load, so that flow fractions can be computed without floating point.
const ALLOCATION = 1_000_000
193+
194+
func (b *tabletBalancer) allocateFlows(allTablets []*discovery.TabletHealth) *targetAllocation {
195+
// Initialization: Set up some data structures and derived values
196+
a := targetAllocation{
197+
Target: map[string]int{},
198+
Inflows: map[string]int{},
199+
Outflows: map[string]map[string]int{},
200+
Allocation: map[uint32]int{},
201+
Unallocated: map[uint32]struct{}{},
202+
}
203+
flowPerVtgateCell := ALLOCATION / len(b.vtGateCells)
204+
flowPerTablet := ALLOCATION / len(allTablets)
205+
cellExistsWithNoTablets := false
206+
207+
for _, th := range allTablets {
208+
a.Target[th.Tablet.Alias.Cell] += flowPerTablet
209+
}
210+
211+
//
212+
// First pass: Allocate vtgate flow to the local cell where the vtgate exists
213+
// and along the way figure out if there are any vtgates with no local tablets.
214+
//
215+
for _, cell := range b.vtGateCells {
216+
outflow := map[string]int{}
217+
target := a.Target[cell]
218+
219+
if target > 0 {
220+
a.Inflows[cell] += flowPerVtgateCell
221+
outflow[cell] = flowPerVtgateCell
222+
} else {
223+
cellExistsWithNoTablets = true
224+
}
225+
226+
a.Outflows[cell] = outflow
227+
}
228+
229+
//
230+
// Figure out if there is a shortfall
231+
//
232+
underAllocated := make(map[string]int)
233+
unbalancedFlow := 0
234+
for cell, allocation := range a.Target {
235+
if a.Inflows[cell] < allocation {
236+
underAllocated[cell] = allocation - a.Inflows[cell]
237+
unbalancedFlow += underAllocated[cell]
238+
}
239+
}
240+
241+
//
242+
// Second pass: if there are any vtgates with no local tablets, allocate the underallocated amount
243+
// proportionally to all cells that may need it
244+
//
245+
if cellExistsWithNoTablets {
246+
for _, vtgateCell := range b.vtGateCells {
247+
target := a.Target[vtgateCell]
248+
if target != 0 {
249+
continue
250+
}
251+
252+
for underAllocatedCell, underAllocatedFlow := range underAllocated {
253+
allocation := flowPerVtgateCell * underAllocatedFlow / unbalancedFlow
254+
a.Inflows[underAllocatedCell] += allocation
255+
a.Outflows[vtgateCell][underAllocatedCell] += allocation
256+
}
257+
}
258+
259+
// Recompute underallocated after these flows were assigned
260+
unbalancedFlow = 0
261+
underAllocated = make(map[string]int)
262+
for cell, allocation := range a.Target {
263+
if a.Inflows[cell] < allocation {
264+
underAllocated[cell] = allocation - a.Inflows[cell]
265+
unbalancedFlow += underAllocated[cell]
266+
}
267+
}
268+
}
269+
270+
//
271+
// Third pass: Shift remaining imbalance if any cell is over/under allocated after
272+
// assigning local cell traffic and distributing load from cells without tablets.
273+
//
274+
if /* fudge for integer arithmetic */ unbalancedFlow > 10 {
275+
276+
// cells which are overallocated
277+
overAllocated := make(map[string]int)
278+
for cell, allocation := range a.Target {
279+
if a.Inflows[cell] > allocation {
280+
overAllocated[cell] = a.Inflows[cell] - allocation
281+
}
282+
}
283+
284+
// fmt.Printf("outflows %v over %v under %v\n", a.Outflows, overAllocated, underAllocated)
285+
286+
//
287+
// For each overallocated cell, proportionally shift flow from targets that are overallocated
288+
// to targets that are underallocated.
289+
//
290+
// Note this is an O(N^3) loop, but only over the cells which need adjustment.
291+
//
292+
for _, vtgateCell := range b.vtGateCells {
293+
for underAllocatedCell, underAllocatedFlow := range underAllocated {
294+
for overAllocatedCell, overAllocatedFlow := range overAllocated {
295+
296+
currentFlow := a.Outflows[vtgateCell][overAllocatedCell]
297+
if currentFlow == 0 {
298+
continue
299+
}
300+
301+
// Shift a proportional fraction of the amount that the cell is currently allocated weighted
302+
// by the fraction that this vtgate cell is already sending to the overallocated cell, and the
303+
// fraction that the new target is underallocated
304+
//
305+
// Note that the operator order matters -- multiplications need to occur before divisions
306+
// to avoid truncating the integer values.
307+
shiftFlow := overAllocatedFlow * currentFlow * underAllocatedFlow / a.Inflows[overAllocatedCell] / unbalancedFlow
308+
309+
//fmt.Printf("shift %d %s %s -> %s (over %d current %d in %d under %d unbalanced %d) \n", shiftFlow, vtgateCell, overAllocatedCell, underAllocatedCell,
310+
// overAllocatedFlow, currentFlow, a.Inflows[overAllocatedCell], underAllocatedFlow, unbalancedFlow)
311+
312+
a.Outflows[vtgateCell][overAllocatedCell] -= shiftFlow
313+
a.Inflows[overAllocatedCell] -= shiftFlow
314+
315+
a.Inflows[underAllocatedCell] += shiftFlow
316+
a.Outflows[vtgateCell][underAllocatedCell] += shiftFlow
317+
}
318+
}
319+
}
320+
}
321+
322+
//
323+
// Finally, once the cell flows are all adjusted, figure out the local allocation to each
324+
// tablet in the target cells
325+
//
326+
outflow := a.Outflows[b.localCell]
327+
for _, tablet := range allTablets {
328+
cell := tablet.Tablet.Alias.Cell
329+
flow := outflow[cell]
330+
if flow > 0 {
331+
a.Allocation[tablet.Tablet.Alias.Uid] = flow * flowPerTablet / a.Target[cell]
332+
a.TotalAllocation += flow * flowPerTablet / a.Target[cell]
333+
} else {
334+
a.Unallocated[tablet.Tablet.Alias.Uid] = struct{}{}
335+
}
336+
}
337+
338+
return &a
339+
}
340+
341+
// getAllocation builds the allocation map if needed and returns a copy of the map
342+
func (b *tabletBalancer) getAllocation(target *querypb.Target, tablets []*discovery.TabletHealth) (map[uint32]int, int) {
343+
b.mu.Lock()
344+
defer b.mu.Unlock()
345+
346+
allocation, exists := b.allocations[discovery.KeyFromTarget(target)]
347+
if exists && (len(allocation.Allocation)+len(allocation.Unallocated)) == len(tablets) {
348+
mismatch := false
349+
for _, tablet := range tablets {
350+
if _, ok := allocation.Allocation[tablet.Tablet.Alias.Uid]; !ok {
351+
if _, ok := allocation.Unallocated[tablet.Tablet.Alias.Uid]; !ok {
352+
mismatch = true
353+
break
354+
}
355+
}
356+
}
357+
if !mismatch {
358+
// No change in tablets for this target. Return computed allocation
359+
return allocation.Allocation, allocation.TotalAllocation
360+
}
361+
}
362+
363+
allocation = b.allocateFlows(tablets)
364+
b.allocations[discovery.KeyFromTarget(target)] = allocation
365+
366+
return allocation.Allocation, allocation.TotalAllocation
367+
}

0 commit comments

Comments
 (0)