Skip to content

Commit 1f94f93

Browse files
authored
Merge branch 'slack-vitess-r14.0.5' into backport-13856
2 parents cd5adbb + bbfad9e commit 1f94f93

File tree

5 files changed

+803
-1
lines changed

5 files changed

+803
-1
lines changed

go/flags/endtoend/vtgate.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
Usage of vtgate:
22
--allowed_tablet_types TabletTypeList Specifies the tablet types this vtgate is allowed to route queries to
33
--alsologtostderr log to standard error as well as files
4+
--balancer_enabled Whether to enable the tablet balancer to evenly spread query load
5+
--balancer_keyspaces strings When in balanced mode, a comma-separated list of keyspaces for which to use the balancer (optional)
6+
--balancer_vtgate_cells strings When in balanced mode, a comma-separated list of cells that contain vtgates (required)
47
--buffer_drain_concurrency int Maximum number of requests retried simultaneously. More concurrency will increase the load on the PRIMARY vttablet when draining the buffer. (default 1)
58
--buffer_implementation string Allowed values: healthcheck (legacy implementation), keyspace_events (default) (default "keyspace_events")
69
--buffer_keyspace_shards string If not empty, limit buffering to these entries (comma separated). Entry format: keyspace or keyspace/shard. Requires --enable_buffer=true.

go/vt/vtgate/balancer/balancer.go

Lines changed: 369 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,369 @@
1+
/*
2+
Copyright 2023 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package balancer
18+
19+
import (
20+
"encoding/json"
21+
"fmt"
22+
"math/rand"
23+
"net/http"
24+
"sync"
25+
26+
"vitess.io/vitess/go/vt/discovery"
27+
querypb "vitess.io/vitess/go/vt/proto/query"
28+
)
29+
30+
/*
31+
32+
The tabletBalancer probabilistically orders the list of available tablets into
33+
a ranked order of preference in order to satisfy two high-level goals:
34+
35+
1. Balance the load across the available replicas
36+
2. Prefer a replica in the same cell as the vtgate if possible
37+
38+
In some topologies this is trivial to accomplish by simply preferring tablets in the
39+
local cell, assuming there are a proportional number of local tablets in each cell to
40+
satisfy the inbound traffic to the vtgates in that cell.
41+
42+
However, for topologies with a relatively small number of tablets in each cell, a simple
43+
affinity algorithm does not effectively balance the load.
44+
45+
As a simple example:
46+
47+
Given three cells with vtgates, four replicas spread into those cells, where each vtgate
48+
receives an equal query share. If each routes only to its local cell, the tablets will be
49+
unbalanced since two of them receive 1/3 of the queries, but the two replicas in the same
50+
cell will only receive 1/6 of the queries.
51+
52+
Cell A: 1/3 --> vtgate --> 1/3 => vttablet
53+
54+
Cell B: 1/3 --> vtgate --> 1/3 => vttablet
55+
56+
Cell C: 1/3 --> vtgate --> 1/6 => vttablet
57+
\-> 1/6 => vttablet
58+
59+
Other topologies that can cause similar pathologies include cases where there may be cells
60+
containing replicas but no local vtgates, and/or cells that have only vtgates but no replicas.
61+
62+
For these topologies, the tabletBalancer proportionally assigns the output flow to each tablet,
63+
preferring the local cell where possible, but only as long as the global query balance is
64+
maintained.
65+
66+
To accomplish this goal, the balancer is given:
67+
68+
* The list of cells that receive inbound traffic to vtgates
69+
* The local cell where the vtgate exists
70+
* The set of tablets and their cells (learned from discovery)
71+
72+
The model assumes there is an equal probability of a query coming from each vtgate cell, i.e.
73+
traffic is effectively load balanced between the cells with vtgates.
74+
75+
Given that information, the balancer builds a simple model to determine how much query load
76+
would go to each tablet if vtgate only routed to its local cell. Then if any tablets are
77+
unbalanced, it shifts the desired allocation away from the local cell preference in order to
78+
even out the query load.
79+
80+
Based on this global model, the vtgate then probabilistically picks a destination for each
81+
query to be sent and uses these weights to order the available tablets accordingly.
82+
83+
Assuming each vtgate is configured with and discovers the same information about the topology,
84+
and the input flow is balanced across the vtgate cells (as mentioned above), then each vtgate
85+
should come to the same conclusion about the global flows, and cooperatively should
86+
converge on the desired balanced query load.
87+
88+
*/
89+
90+
type TabletBalancer interface {
91+
// Randomly shuffle the tablets into an order for routing queries
92+
ShuffleTablets(target *querypb.Target, tablets []*discovery.TabletHealth)
93+
94+
// Balancer debug page request
95+
DebugHandler(w http.ResponseWriter, r *http.Request)
96+
}
97+
98+
func NewTabletBalancer(localCell string, vtGateCells []string) TabletBalancer {
99+
return &tabletBalancer{
100+
localCell: localCell,
101+
vtGateCells: vtGateCells,
102+
allocations: map[discovery.KeyspaceShardTabletType]*targetAllocation{},
103+
}
104+
}
105+
106+
type tabletBalancer struct {
107+
//
108+
// Configuration
109+
//
110+
111+
// The local cell for the vtgate
112+
localCell string
113+
114+
// The set of cells that have vtgates
115+
vtGateCells []string
116+
117+
// mu protects the allocation map
118+
mu sync.Mutex
119+
120+
//
121+
// Allocations for balanced mode, calculated once per target and invalidated
122+
// whenever the topology changes.
123+
//
124+
allocations map[discovery.KeyspaceShardTabletType]*targetAllocation
125+
}
126+
127+
// targetAllocation holds the flow model computed by allocateFlows for a single
// target. No struct tags are declared, so encoding/json emits each field under
// its Go field name.
type targetAllocation struct {
	// Target is the desired flow per cell, proportional to the number of
	// tablets discovered in that cell.
	Target map[string]int

	// Inflows is the flow currently assigned to each cell.
	Inflows map[string]int

	// Outflows maps each vtgate cell to the flow it sends to each target cell.
	Outflows map[string]map[string]int

	// Allocation is the flow routed from the local cell to each tablet, keyed
	// by tablet UID. It is the weight used when ranking tablets.
	Allocation map[uint32]int

	// Unallocated is the set of tablet UIDs the local cell does not route to.
	Unallocated map[uint32]struct{}

	// TotalAllocation is the sum of the values in Allocation, which is
	// basically 1,000,000 / len(vtgatecells).
	TotalAllocation int
}
146+
147+
func (b *tabletBalancer) print() string {
148+
allocations, _ := json.Marshal(&b.allocations)
149+
return fmt.Sprintf("LocalCell: %s, VtGateCells: %s, allocations: %s",
150+
b.localCell, b.vtGateCells, string(allocations))
151+
}
152+
153+
func (b *tabletBalancer) DebugHandler(w http.ResponseWriter, _ *http.Request) {
154+
w.Header().Set("Content-Type", "text/plain")
155+
fmt.Fprintf(w, "Local Cell: %v\r\n", b.localCell)
156+
fmt.Fprintf(w, "Vtgate Cells: %v\r\n", b.vtGateCells)
157+
158+
b.mu.Lock()
159+
defer b.mu.Unlock()
160+
allocations, _ := json.MarshalIndent(b.allocations, "", " ")
161+
fmt.Fprintf(w, "Allocations: %v\r\n", string(allocations))
162+
}
163+
164+
// ShuffleTablets is the main entry point to the balancer.
165+
//
166+
// It shuffles the tablets into a preference list for routing a given query.
167+
// However, since all tablets should be healthy, the query will almost always go
168+
// to the first tablet in the list, so the balancer ranking algoritm randomly
169+
// shuffles the list to break ties, then chooses a weighted random selection
170+
// based on the balance alorithm to promote to the first in the set.
171+
func (b *tabletBalancer) ShuffleTablets(target *querypb.Target, tablets []*discovery.TabletHealth) {
172+
173+
numTablets := len(tablets)
174+
175+
allocationMap, totalAllocation := b.getAllocation(target, tablets)
176+
177+
rand.Shuffle(numTablets, func(i, j int) { tablets[i], tablets[j] = tablets[j], tablets[i] })
178+
179+
// Do another O(n) seek through the list to effect the weighted sample by picking
180+
// a random point in the allocation space and seeking forward in the list of (randomized)
181+
// tablets to that point, promoting the tablet to the head of the list.
182+
r := rand.Intn(totalAllocation)
183+
for i := 0; i < numTablets; i++ {
184+
flow := allocationMap[tablets[i].Tablet.Alias.Uid]
185+
if r < flow {
186+
tablets[0], tablets[i] = tablets[i], tablets[0]
187+
break
188+
}
189+
r -= flow
190+
}
191+
}
192+
193+
// To stick with integer arithmetic, use 1,000,000 as the full load
194+
// ALLOCATION is the full query load expressed in integer units; allocateFlows
// divides it among the vtgate cells and among the tablets.
const ALLOCATION = 1000000
195+
196+
func (b *tabletBalancer) allocateFlows(allTablets []*discovery.TabletHealth) *targetAllocation {
197+
// Initialization: Set up some data structures and derived values
198+
a := targetAllocation{
199+
Target: map[string]int{},
200+
Inflows: map[string]int{},
201+
Outflows: map[string]map[string]int{},
202+
Allocation: map[uint32]int{},
203+
Unallocated: map[uint32]struct{}{},
204+
}
205+
flowPerVtgateCell := ALLOCATION / len(b.vtGateCells)
206+
flowPerTablet := ALLOCATION / len(allTablets)
207+
cellExistsWithNoTablets := false
208+
209+
for _, th := range allTablets {
210+
a.Target[th.Tablet.Alias.Cell] += flowPerTablet
211+
}
212+
213+
//
214+
// First pass: Allocate vtgate flow to the local cell where the vtgate exists
215+
// and along the way figure out if there are any vtgates with no local tablets.
216+
//
217+
for _, cell := range b.vtGateCells {
218+
outflow := map[string]int{}
219+
target := a.Target[cell]
220+
221+
if target > 0 {
222+
a.Inflows[cell] += flowPerVtgateCell
223+
outflow[cell] = flowPerVtgateCell
224+
} else {
225+
cellExistsWithNoTablets = true
226+
}
227+
228+
a.Outflows[cell] = outflow
229+
}
230+
231+
//
232+
// Figure out if there is a shortfall
233+
//
234+
underAllocated := make(map[string]int)
235+
unbalancedFlow := 0
236+
for cell, allocation := range a.Target {
237+
if a.Inflows[cell] < allocation {
238+
underAllocated[cell] = allocation - a.Inflows[cell]
239+
unbalancedFlow += underAllocated[cell]
240+
}
241+
}
242+
243+
//
244+
// Second pass: if there are any vtgates with no local tablets, allocate the underallocated amount
245+
// proportionally to all cells that may need it
246+
//
247+
if cellExistsWithNoTablets {
248+
for _, vtgateCell := range b.vtGateCells {
249+
target := a.Target[vtgateCell]
250+
if target != 0 {
251+
continue
252+
}
253+
254+
for underAllocatedCell, underAllocatedFlow := range underAllocated {
255+
allocation := flowPerVtgateCell * underAllocatedFlow / unbalancedFlow
256+
a.Inflows[underAllocatedCell] += allocation
257+
a.Outflows[vtgateCell][underAllocatedCell] += allocation
258+
}
259+
}
260+
261+
// Recompute underallocated after these flows were assigned
262+
unbalancedFlow = 0
263+
underAllocated = make(map[string]int)
264+
for cell, allocation := range a.Target {
265+
if a.Inflows[cell] < allocation {
266+
underAllocated[cell] = allocation - a.Inflows[cell]
267+
unbalancedFlow += underAllocated[cell]
268+
}
269+
}
270+
}
271+
272+
//
273+
// Third pass: Shift remaining imbalance if any cell is over/under allocated after
274+
// assigning local cell traffic and distributing load from cells without tablets.
275+
//
276+
if /* fudge for integer arithmetic */ unbalancedFlow > 10 {
277+
278+
// cells which are overallocated
279+
overAllocated := make(map[string]int)
280+
for cell, allocation := range a.Target {
281+
if a.Inflows[cell] > allocation {
282+
overAllocated[cell] = a.Inflows[cell] - allocation
283+
}
284+
}
285+
286+
// fmt.Printf("outflows %v over %v under %v\n", a.Outflows, overAllocated, underAllocated)
287+
288+
//
289+
// For each overallocated cell, proportionally shift flow from targets that are overallocated
290+
// to targets that are underallocated.
291+
//
292+
// Note this is an O(N^3) loop, but only over the cells which need adjustment.
293+
//
294+
for _, vtgateCell := range b.vtGateCells {
295+
for underAllocatedCell, underAllocatedFlow := range underAllocated {
296+
for overAllocatedCell, overAllocatedFlow := range overAllocated {
297+
298+
currentFlow := a.Outflows[vtgateCell][overAllocatedCell]
299+
if currentFlow == 0 {
300+
continue
301+
}
302+
303+
// Shift a proportional fraction of the amount that the cell is currently allocated weighted
304+
// by the fraction that this vtgate cell is already sending to the overallocated cell, and the
305+
// fraction that the new target is underallocated
306+
//
307+
// Note that the operator order matters -- multiplications need to occur before divisions
308+
// to avoid truncating the integer values.
309+
shiftFlow := overAllocatedFlow * currentFlow * underAllocatedFlow / a.Inflows[overAllocatedCell] / unbalancedFlow
310+
311+
//fmt.Printf("shift %d %s %s -> %s (over %d current %d in %d under %d unbalanced %d) \n", shiftFlow, vtgateCell, overAllocatedCell, underAllocatedCell,
312+
// overAllocatedFlow, currentFlow, a.Inflows[overAllocatedCell], underAllocatedFlow, unbalancedFlow)
313+
314+
a.Outflows[vtgateCell][overAllocatedCell] -= shiftFlow
315+
a.Inflows[overAllocatedCell] -= shiftFlow
316+
317+
a.Inflows[underAllocatedCell] += shiftFlow
318+
a.Outflows[vtgateCell][underAllocatedCell] += shiftFlow
319+
}
320+
}
321+
}
322+
}
323+
324+
//
325+
// Finally, once the cell flows are all adjusted, figure out the local allocation to each
326+
// tablet in the target cells
327+
//
328+
outflow := a.Outflows[b.localCell]
329+
for _, tablet := range allTablets {
330+
cell := tablet.Tablet.Alias.Cell
331+
flow := outflow[cell]
332+
if flow > 0 {
333+
a.Allocation[tablet.Tablet.Alias.Uid] = flow * flowPerTablet / a.Target[cell]
334+
a.TotalAllocation += flow * flowPerTablet / a.Target[cell]
335+
} else {
336+
a.Unallocated[tablet.Tablet.Alias.Uid] = struct{}{}
337+
}
338+
}
339+
340+
return &a
341+
}
342+
343+
// getAllocation builds the allocation map if needed and returns a copy of the map
344+
func (b *tabletBalancer) getAllocation(target *querypb.Target, tablets []*discovery.TabletHealth) (map[uint32]int, int) {
345+
b.mu.Lock()
346+
defer b.mu.Unlock()
347+
348+
allocation, exists := b.allocations[discovery.KeyFromTarget(target)]
349+
if exists && (len(allocation.Allocation)+len(allocation.Unallocated)) == len(tablets) {
350+
mismatch := false
351+
for _, tablet := range tablets {
352+
if _, ok := allocation.Allocation[tablet.Tablet.Alias.Uid]; !ok {
353+
if _, ok := allocation.Unallocated[tablet.Tablet.Alias.Uid]; !ok {
354+
mismatch = true
355+
break
356+
}
357+
}
358+
}
359+
if !mismatch {
360+
// No change in tablets for this target. Return computed allocation
361+
return allocation.Allocation, allocation.TotalAllocation
362+
}
363+
}
364+
365+
allocation = b.allocateFlows(tablets)
366+
b.allocations[discovery.KeyFromTarget(target)] = allocation
367+
368+
return allocation.Allocation, allocation.TotalAllocation
369+
}

0 commit comments

Comments
 (0)