Skip to content

Commit 85fce37

Browse files
committed
first commit
0 parents  commit 85fce37

File tree

10 files changed

+614
-0
lines changed

10 files changed

+614
-0
lines changed

.gitignore

Whitespace-only changes.

aperture/aperture.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package aperture
2+
3+
import (
4+
"math"
5+
6+
"github.com/hnlq715/go-loadbalance"
7+
"github.com/hnlq715/go-loadbalance/p2c/leastloaded"
8+
"google.golang.org/grpc/balancer"
9+
)
10+
11+
type Aperture struct {
12+
localID string
13+
localPeers []string
14+
localPeersMap map[string]int
15+
remotePeers []interface{}
16+
logicalWidth int
17+
18+
p2c loadbalance.P2C
19+
}
20+
21+
const (
22+
defaultLogicalWidth int = 1
23+
)
24+
25+
func New() loadbalance.Aperture {
26+
return &Aperture{
27+
logicalWidth: defaultLogicalWidth,
28+
localPeers: make([]string, 0),
29+
localPeersMap: make(map[string]int),
30+
remotePeers: make([]interface{}, 0),
31+
p2c: leastloaded.New(),
32+
}
33+
}
34+
35+
func (a *Aperture) SetLogicalWidth(width int) {
36+
a.logicalWidth = width
37+
a.rebuild()
38+
}
39+
40+
func (a *Aperture) SetLocalPeerID(id string) {
41+
a.localID = id
42+
a.rebuild()
43+
}
44+
45+
func (a *Aperture) SetLocalPeers(localPeers []string) {
46+
a.localPeers = localPeers
47+
for idx, local := range localPeers {
48+
a.localPeersMap[local] = idx
49+
}
50+
51+
a.rebuild()
52+
}
53+
54+
func (a *Aperture) SetRemotePeers(remotePeers []interface{}) {
55+
a.remotePeers = remotePeers
56+
a.rebuild()
57+
}
58+
59+
func (a *Aperture) Next() (interface{}, func(balancer.DoneInfo)) {
60+
return a.p2c.Next()
61+
}
62+
63+
func (a *Aperture) rebuild() {
64+
if len(a.localPeers) == 0 {
65+
return
66+
}
67+
68+
if len(a.remotePeers) == 0 {
69+
return
70+
}
71+
72+
idx, ok := a.localPeersMap[a.localID]
73+
if !ok {
74+
return
75+
}
76+
77+
localWidth := floatOne / float64(len(a.localPeers))
78+
remoteWidth := floatOne / float64(len(a.remotePeers))
79+
80+
width := dApertureWidth(localWidth, remoteWidth, a.logicalWidth)
81+
82+
offset := float64(idx) * width
83+
84+
ring := NewRing(len(a.remotePeers))
85+
idxes := ring.Slice(offset, width)
86+
a.p2c = leastloaded.New()
87+
88+
for _, idx := range idxes {
89+
weight := ring.Weight(idx, offset, width)
90+
a.p2c.Add(a.remotePeers[idx], weight)
91+
}
92+
}
93+
94+
func dApertureWidth(localWidth, remoteWidth float64, logicalAperture int) float64 {
95+
unitWidth := localWidth
96+
unitAperture := float64(logicalAperture) * remoteWidth
97+
n := math.Ceil(unitAperture / unitWidth)
98+
width := n * unitWidth
99+
100+
return math.Min(floatOne, width)
101+
}

aperture/aperture_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package aperture_test
2+
3+
import (
4+
"sync"
5+
"testing"
6+
"time"
7+
8+
"github.com/hnlq715/go-loadbalance/aperture"
9+
"github.com/stretchr/testify/assert"
10+
"google.golang.org/grpc/balancer"
11+
)
12+
13+
func TestAperture(t *testing.T) {
14+
t.Run("0 item", func(t *testing.T) {
15+
ll := aperture.New()
16+
item, done := ll.Next()
17+
done(balancer.DoneInfo{})
18+
assert.Nil(t, item)
19+
})
20+
21+
t.Run("1 item", func(t *testing.T) {
22+
ll := aperture.New()
23+
ll.SetLocalPeers([]string{"1"})
24+
ll.SetRemotePeers([]interface{}{"8"})
25+
ll.SetLocalPeerID("1")
26+
27+
item, done := ll.Next()
28+
done(balancer.DoneInfo{})
29+
assert.Equal(t, "8", item)
30+
})
31+
32+
t.Run("3 items", func(t *testing.T) {
33+
ll := aperture.New()
34+
ll.SetLocalPeers([]string{"1", "2", "3"})
35+
ll.SetRemotePeers([]interface{}{"8", "9", "10"})
36+
ll.SetLocalPeerID("1")
37+
38+
item, done := ll.Next()
39+
done(balancer.DoneInfo{})
40+
assert.Equal(t, "8", item)
41+
42+
ll.SetLocalPeerID("2")
43+
44+
item, done = ll.Next()
45+
done(balancer.DoneInfo{})
46+
assert.Equal(t, "9", item)
47+
48+
ll.SetLocalPeerID("3")
49+
50+
item, done = ll.Next()
51+
done(balancer.DoneInfo{})
52+
assert.Equal(t, "10", item)
53+
})
54+
55+
t.Run("count", func(t *testing.T) {
56+
ll := aperture.New()
57+
ll.SetLocalPeers([]string{"1", "2", "3"})
58+
ll.SetRemotePeers([]interface{}{"8", "9", "10", "11", "12"})
59+
ll.SetLocalPeerID("1")
60+
ll.SetLogicalWidth(2)
61+
62+
countMap := make(map[interface{}]int)
63+
64+
totalCount := 10000
65+
wg := sync.WaitGroup{}
66+
wg.Add(totalCount)
67+
68+
mu := sync.Mutex{}
69+
for i := 0; i < totalCount; i++ {
70+
go func() {
71+
defer wg.Done()
72+
item, done := ll.Next()
73+
time.Sleep(1 * time.Second)
74+
done(balancer.DoneInfo{})
75+
76+
mu.Lock()
77+
countMap[item]++
78+
mu.Unlock()
79+
}()
80+
}
81+
82+
wg.Wait()
83+
84+
total := 0
85+
for _, count := range countMap {
86+
total += count
87+
}
88+
assert.Less(t, 2990, countMap["8"])
89+
assert.Less(t, 2990, countMap["9"])
90+
assert.Less(t, 2990, countMap["10"])
91+
assert.Less(t, 990, countMap["11"])
92+
93+
assert.Equal(t, totalCount, total)
94+
})
95+
}

aperture/ring.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package aperture
2+
3+
import (
4+
"math"
5+
)
6+
7+
type Ring struct {
8+
size int
9+
unitWidth float64
10+
}
11+
12+
const (
13+
floatOne float64 = 1.0
14+
intOne int = 1
15+
)
16+
17+
func NewRing(size int) *Ring {
18+
return &Ring{
19+
size: size,
20+
unitWidth: floatOne / float64(size),
21+
}
22+
}
23+
24+
func (r *Ring) Range(offset, width float64) int {
25+
begin := r.Index(offset)
26+
end := r.Index(math.Mod(offset+width, 1.0))
27+
28+
if begin == end && width > r.unitWidth {
29+
return r.size
30+
} else if begin == end {
31+
return intOne
32+
}
33+
34+
beginWeight := r.Weight(begin, offset, width)
35+
endWeight := r.Weight(end, offset, width)
36+
37+
adjustedBegin := begin
38+
if beginWeight <= 0 {
39+
adjustedBegin++
40+
}
41+
42+
adjustedEnd := end
43+
if endWeight > 0 {
44+
adjustedEnd++
45+
}
46+
47+
diff := adjustedEnd - adjustedBegin
48+
if diff < 0 {
49+
return diff + r.size
50+
}
51+
52+
return diff
53+
}
54+
55+
func (r *Ring) Slice(offset, width float64) []int {
56+
seq := make([]int, 0)
57+
i := r.Index(offset)
58+
rr := r.Range(offset, width)
59+
60+
for rr > 0 {
61+
idx := i % r.size
62+
seq = append(seq, idx)
63+
i++
64+
rr--
65+
}
66+
67+
return seq
68+
}
69+
70+
func (r *Ring) Index(offset float64) int {
71+
return int(math.Floor(offset*float64(r.size))) % r.size
72+
}
73+
74+
func (r *Ring) Weight(index int, offset, width float64) float64 {
75+
ab := float64(index) * r.unitWidth
76+
if ab+1 < offset+width {
77+
ab++
78+
}
79+
80+
ae := ab + r.unitWidth
81+
return intersect(ab, ae, offset, offset+width) / r.unitWidth
82+
}
83+
84+
func intersect(b0, e0, b1, e1 float64) float64 {
85+
len := math.Min(e0, e1) - math.Max(b0, b1)
86+
return math.Max(0, len)
87+
}

aperture/ring_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package aperture
2+
3+
import (
4+
"math"
5+
"testing"
6+
7+
"gotest.tools/assert"
8+
)
9+
10+
func TestRing(t *testing.T) {
11+
r := NewRing(3)
12+
13+
len := float64(3)
14+
width := 1.0 / len
15+
16+
offset := math.Mod(0*width, 1.0)
17+
assert.DeepEqual(t, []int{0}, r.Slice(offset, width))
18+
19+
offset = math.Mod(1*width, 1.0)
20+
assert.DeepEqual(t, []int{1}, r.Slice(offset, width))
21+
22+
offset = math.Mod(2*width, 1.0)
23+
assert.DeepEqual(t, []int{2}, r.Slice(offset, width))
24+
}
25+
26+
func TestRing1(t *testing.T) {
27+
r := NewRing(5)
28+
29+
len := float64(3)
30+
width := 1.0 / len
31+
32+
offset := math.Mod(0*width, 1.0)
33+
assert.DeepEqual(t, []int{0, 1}, r.Slice(offset, width))
34+
35+
offset = math.Mod(1*width, 1.0)
36+
assert.DeepEqual(t, []int{1, 2, 3}, r.Slice(offset, width))
37+
38+
offset = math.Mod(2*width, 1.0)
39+
assert.DeepEqual(t, []int{3, 4}, r.Slice(offset, width))
40+
41+
}
42+
43+
func TestRing2(t *testing.T) {
44+
r := NewRing(5)
45+
46+
len := float64(3)
47+
width := 1.0 / len
48+
49+
offset := float64(0) * width
50+
assert.Equal(t, float64(10), math.Round(r.Weight(0, offset, width)*10))
51+
assert.Equal(t, float64(7), math.Round(r.Weight(1, offset, width)*10))
52+
assert.Equal(t, float64(0), math.Round(r.Weight(2, offset, width)*10))
53+
assert.Equal(t, float64(0), math.Round(r.Weight(3, offset, width)*10))
54+
assert.Equal(t, float64(0), math.Round(r.Weight(4, offset, width)*10))
55+
56+
offset = float64(1) * width
57+
assert.Equal(t, float64(0), math.Round(r.Weight(0, offset, width)*10))
58+
assert.Equal(t, float64(3), math.Round(r.Weight(1, offset, width)*10))
59+
assert.Equal(t, float64(10), math.Round(r.Weight(2, offset, width)*10))
60+
assert.Equal(t, float64(3), math.Round(r.Weight(3, offset, width)*10))
61+
assert.Equal(t, float64(0), math.Round(r.Weight(5, offset, width)*10))
62+
63+
offset = float64(2) * width
64+
assert.Equal(t, float64(0), math.Round(r.Weight(0, offset, width)*10))
65+
assert.Equal(t, float64(0), math.Round(r.Weight(1, offset, width)*10))
66+
assert.Equal(t, float64(0), math.Round(r.Weight(2, offset, width)*10))
67+
assert.Equal(t, float64(7), math.Round(r.Weight(3, offset, width)*10))
68+
assert.Equal(t, float64(10), math.Round(r.Weight(4, offset, width)*10))
69+
}

go.mod

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
module github.com/hnlq715/go-loadbalance
2+
3+
go 1.14
4+
5+
require (
6+
github.com/davecgh/go-spew v1.1.1 // indirect
7+
github.com/golang/protobuf v1.4.2 // indirect
8+
github.com/kr/pretty v0.2.0 // indirect
9+
github.com/pkg/errors v0.9.1 // indirect
10+
github.com/stretchr/testify v1.6.1
11+
google.golang.org/grpc v1.31.0
12+
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
13+
gotest.tools v2.2.0+incompatible
14+
)

0 commit comments

Comments
 (0)