Skip to content

Commit 79d7b1a

Browse files
committed
Add picker interface
1 parent 509fde8 commit 79d7b1a

File tree

7 files changed

+80
-30
lines changed

7 files changed

+80
-30
lines changed

aperture/aperture.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/hnlq715/go-loadbalance"
77
"github.com/hnlq715/go-loadbalance/p2c"
8+
"github.com/hnlq715/go-loadbalance/roundrobin"
89
"google.golang.org/grpc/balancer"
910
)
1011

@@ -18,7 +19,7 @@ type Aperture struct {
1819
remotePeers []interface{}
1920
logicalAperture int
2021

21-
p2c loadbalance.P2C
22+
picker loadbalance.Picker
2223
apertureIdxes []int
2324
}
2425

@@ -35,7 +36,7 @@ func NewLeastLoadedApeture() loadbalance.Aperture {
3536
localPeers: make([]string, 0),
3637
localPeersMap: make(map[string]int),
3738
remotePeers: make([]interface{}, 0),
38-
p2c: p2c.NewLeastLoaded(),
39+
picker: p2c.NewLeastLoaded(),
3940
}
4041
}
4142

@@ -46,7 +47,18 @@ func NewPeakEwmaAperture() loadbalance.Aperture {
4647
localPeers: make([]string, 0),
4748
localPeersMap: make(map[string]int),
4849
remotePeers: make([]interface{}, 0),
49-
p2c: p2c.NewPeakEwma(),
50+
picker: p2c.NewPeakEwma(),
51+
}
52+
}
53+
54+
// NewSmoothRoundrobin returns an Apeture interface with smooth roundrobin
55+
func NewSmoothRoundrobin() loadbalance.Aperture {
56+
return &Aperture{
57+
logicalAperture: defaultLogicalAperture,
58+
localPeers: make([]string, 0),
59+
localPeersMap: make(map[string]int),
60+
remotePeers: make([]interface{}, 0),
61+
picker: roundrobin.NewSmoothRoundrobin(),
5062
}
5163
}
5264

@@ -82,7 +94,7 @@ func (a *Aperture) SetRemotePeers(remotePeers []interface{}) {
8294

8395
// Next returns the next selected item
8496
func (a *Aperture) Next() (interface{}, func(balancer.DoneInfo)) {
85-
return a.p2c.Next()
97+
return a.picker.Next()
8698
}
8799

88100
// List returns the remote peers for the local peer id
@@ -119,10 +131,10 @@ func (a *Aperture) rebuild() {
119131
ring := NewRing(len(a.remotePeers))
120132
a.apertureIdxes = ring.Slice(offset, apertureWidth)
121133

122-
a.p2c = p2c.NewLeastLoaded()
134+
a.picker.Reset()
123135
for _, apertureIdx := range a.apertureIdxes {
124136
weight := ring.Weight(apertureIdx, offset, apertureWidth)
125-
a.p2c.Add(a.remotePeers[apertureIdx], weight)
137+
a.picker.Add(a.remotePeers[apertureIdx], weight)
126138
}
127139
}
128140

aperture/aperture_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,18 @@ func TestAperture(t *testing.T) {
1818
assert.Nil(t, item)
1919
})
2020

21+
t.Run("1 client 1 server", func(t *testing.T) {
22+
ll := aperture.NewSmoothRoundrobin()
23+
ll.SetLocalPeers(nil)
24+
ll.SetLocalPeers([]string{"1"})
25+
ll.SetRemotePeers([]interface{}{"8"})
26+
ll.SetLocalPeerID("1")
27+
28+
item, done := ll.Next()
29+
done(balancer.DoneInfo{})
30+
assert.Equal(t, "8", item)
31+
})
32+
2133
t.Run("1 client 1 server", func(t *testing.T) {
2234
ll := aperture.NewLeastLoadedApeture()
2335
ll.SetLocalPeers(nil)

loadbalance.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,14 @@ type Aperture interface {
2020
SetRemotePeers([]interface{})
2121
}
2222

23-
// P2C support p2c algorithm for load balance,
23+
// Picker supports multiple algorithms for load balance,
2424
// uses the ideas behind the "power of 2 choices"
2525
// to select two nodes from the underlying vector.
26-
type P2C interface {
26+
type Picker interface {
2727
// Next returns next selected item.
2828
Next() (interface{}, func(balancer.DoneInfo))
2929
// Add a weighted item.
3030
Add(interface{}, float64)
31-
}
32-
33-
// RoundRobin support roundrobin algorithm for load balance
34-
type RoundRobin interface {
35-
// Next returns next selected item.
36-
Next() interface{}
37-
// Add a weighted item.
38-
Add(interface{}, int64)
31+
// Reset this picker
32+
Reset()
3933
}

p2c/least_loaded.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type leastLoaded struct {
2222
rand *rand.Rand
2323
}
2424

25-
func NewLeastLoaded() loadbalance.P2C {
25+
func NewLeastLoaded() loadbalance.Picker {
2626
return &leastLoaded{
2727
items: make([]*leastLoadedNode, 0),
2828
rand: rand.New(rand.NewSource(time.Now().Unix())),
@@ -33,6 +33,10 @@ func (p *leastLoaded) Add(item interface{}, weight float64) {
3333
p.items = append(p.items, &leastLoadedNode{item: item, weight: weight})
3434
}
3535

36+
func (p *leastLoaded) Reset() {
37+
p.items = p.items[:0]
38+
}
39+
3640
func (p *leastLoaded) Next() (interface{}, func(balancer.DoneInfo)) {
3741
var sc, backsc *leastLoadedNode
3842

p2c/pewma.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ type pewma struct {
6464
rand *rand.Rand
6565
}
6666

67-
func NewPeakEwma() loadbalance.P2C {
67+
func NewPeakEwma() loadbalance.Picker {
6868
return &pewma{
6969
items: make([]*peakEwmaNode, 0),
7070
rand: rand.New(rand.NewSource(time.Now().Unix())),
@@ -75,6 +75,13 @@ func (p *pewma) Add(item interface{}, weight float64) {
7575
p.items = append(p.items, &peakEwmaNode{item: item, latency: newPEWMA(), weight: weight})
7676
}
7777

78+
func (p *pewma) Reset() {
79+
*p = pewma{
80+
items: make([]*peakEwmaNode, 0),
81+
rand: rand.New(rand.NewSource(time.Now().Unix())),
82+
}
83+
}
84+
7885
func (p *pewma) Next() (interface{}, func(balancer.DoneInfo)) {
7986
var sc, backsc *peakEwmaNode
8087
begin := time.Now().UnixNano()

roundrobin/roundrobin.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package roundrobin
22

3-
import "github.com/hnlq715/go-loadbalance"
3+
import (
4+
"math"
5+
6+
"github.com/hnlq715/go-loadbalance"
7+
"google.golang.org/grpc/balancer"
8+
)
49

510
// smoothRoundrobinNode is a wrapped weighted item.
611
type smoothRoundrobinNode struct {
@@ -26,28 +31,34 @@ type smoothRoundrobin struct {
2631
// among peers.
2732
// In case of { 5, 1, 1 } weights this gives the following sequence of
2833
// current_weight's: (a, a, b, a, c, a, a)
29-
func NewSmoothRoundrobin() loadbalance.RoundRobin {
34+
func NewSmoothRoundrobin() loadbalance.Picker {
3035
return &smoothRoundrobin{}
3136
}
3237

3338
// Add a weighted server.
34-
func (w *smoothRoundrobin) Add(item interface{}, weight int64) {
35-
weighted := &smoothRoundrobinNode{Item: item, Weight: weight, EffectiveWeight: weight}
39+
func (w *smoothRoundrobin) Add(item interface{}, weight float64) {
40+
wt := int64(math.Floor(weight))
41+
weighted := &smoothRoundrobinNode{Item: item, Weight: wt, EffectiveWeight: wt}
3642
w.items = append(w.items, weighted)
3743
w.n++
3844
}
3945

46+
func (w *smoothRoundrobin) Reset() {
47+
w.items = w.items[:0]
48+
w.n = 0
49+
}
50+
4051
// Next returns next selected server.
41-
func (w *smoothRoundrobin) Next() interface{} {
52+
func (w *smoothRoundrobin) Next() (interface{}, func(balancer.DoneInfo)) {
4253
if w.n == 0 {
43-
return nil
54+
return nil, func(balancer.DoneInfo) {}
4455
}
4556

4657
if w.n == 1 {
47-
return w.items[0].Item
58+
return w.items[0].Item, func(balancer.DoneInfo) {}
4859
}
4960

50-
return nextSmoothWeighted(w.items).Item
61+
return nextSmoothWeighted(w.items).Item, func(balancer.DoneInfo) {}
5162
}
5263

5364
// nextSmoothWeighted selects the best node through the smooth weighted roundrobin .

roundrobin/roundrobin_test.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,29 @@ import (
99
func TestSW_Next(t *testing.T) {
1010
w := NewSmoothRoundrobin()
1111

12-
assert.Nil(t, w.Next())
12+
s, _ := w.Next()
13+
assert.Nil(t, s)
1314

1415
w.Add("server1", 5)
15-
assert.Equal(t, "server1", w.Next().(string))
16+
s, _ = w.Next()
17+
assert.Equal(t, "server1", s.(string))
18+
19+
w.Reset()
20+
s, _ = w.Next()
21+
assert.Nil(t, s)
22+
23+
w.Add("server1", 5)
24+
s, _ = w.Next()
25+
assert.Equal(t, "server1", s.(string))
1626

1727
w.Add("server2", 2)
1828
w.Add("server3", 3)
1929

2030
results := make(map[string]int)
2131

2232
for i := 0; i < 1000; i++ {
23-
s := w.Next().(string)
24-
results[s]++
33+
s, _ := w.Next()
34+
results[s.(string)]++
2535
}
2636

2737
if results["server1"] != 500 || results["server2"] != 200 || results["server3"] != 300 {

0 commit comments

Comments
 (0)