Skip to content

Commit

Permalink
add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
sysulq committed Aug 14, 2020
1 parent 1d01c52 commit d35ad59
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 6 deletions.
28 changes: 24 additions & 4 deletions aperture/aperture.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,27 @@ import (
"google.golang.org/grpc/balancer"
)

// Aperture support map local peers to remote peers
// to divide remote peers into subsets
// to reduce the connections and separate services into small sets
type Aperture struct {
localID string
localPeers []string
localPeersMap map[string]int
remotePeers []interface{}
logicalAperture int

p2c loadbalance.P2C
p2c loadbalance.P2C
apertureIdxes []int
}

const (
// defaultLogicalAperture means the max logic aperture size
// to control the stability for aperture load balance algorithm
defaultLogicalAperture int = 12
)

// New returns an Apeture interface
func New() loadbalance.Aperture {
return &Aperture{
logicalAperture: defaultLogicalAperture,
Expand All @@ -32,18 +39,21 @@ func New() loadbalance.Aperture {
}
}

// SetLogicalAperture sets the logical aperture size
func (a *Aperture) SetLogicalAperture(width int) {
if width > 0 {
a.logicalAperture = width
a.rebuild()
}
}

// SetLocalPeerID sets the local peer id
func (a *Aperture) SetLocalPeerID(id string) {
a.localID = id
a.rebuild()
}

// SetLocalPeers sets the local peers
func (a *Aperture) SetLocalPeers(localPeers []string) {
a.localPeers = localPeers
for idx, local := range localPeers {
Expand All @@ -53,15 +63,24 @@ func (a *Aperture) SetLocalPeers(localPeers []string) {
a.rebuild()
}

// SetRemotePeers sets the remote peers
func (a *Aperture) SetRemotePeers(remotePeers []interface{}) {
a.remotePeers = remotePeers
a.rebuild()
}

// Next returns the next selected item
func (a *Aperture) Next() (interface{}, func(balancer.DoneInfo)) {
return a.p2c.Next()
}

// List returns the remote peers for the local peer id
// NOTE: current for test/debug only
func (a *Aperture) List() []int {
return a.apertureIdxes
}

// rebuild just rebuilds the aperture when any arguments changed
func (a *Aperture) rebuild() {
if len(a.localPeers) == 0 {
return
Expand All @@ -87,15 +106,16 @@ func (a *Aperture) rebuild() {
offset := float64(idx) * apertureWidth

ring := NewRing(len(a.remotePeers))
apertureIdxes := ring.Slice(offset, apertureWidth)
a.p2c = leastloaded.New()
a.apertureIdxes = ring.Slice(offset, apertureWidth)

for _, apertureIdx := range apertureIdxes {
a.p2c = leastloaded.New()
for _, apertureIdx := range a.apertureIdxes {
weight := ring.Weight(apertureIdx, offset, apertureWidth)
a.p2c.Add(a.remotePeers[apertureIdx], weight)
}
}

// dApertureWidth calculates the actual aperture size base on logic aperture size
func dApertureWidth(localWidth, remoteWidth float64, logicalAperture int) float64 {
unitWidth := localWidth
unitAperture := float64(logicalAperture) * remoteWidth
Expand Down
35 changes: 33 additions & 2 deletions aperture/aperture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ func TestAperture(t *testing.T) {
assert.Nil(t, item)
})

t.Run("1 item", func(t *testing.T) {
t.Run("1 client 1 server", func(t *testing.T) {
ll := aperture.New()
ll.SetLocalPeers(nil)
ll.SetLocalPeers([]string{"1"})
ll.SetRemotePeers([]interface{}{"8"})
ll.SetLocalPeerID("1")
Expand All @@ -29,7 +30,7 @@ func TestAperture(t *testing.T) {
assert.Equal(t, "8", item)
})

t.Run("3 items", func(t *testing.T) {
t.Run("3 client 3 server", func(t *testing.T) {
ll := aperture.New()
ll.SetLocalPeers([]string{"1", "2", "3"})
ll.SetRemotePeers([]interface{}{"8", "9", "10"})
Expand Down Expand Up @@ -94,3 +95,33 @@ func TestAperture(t *testing.T) {
assert.Equal(t, totalCount, total)
})
}

func TestDynamic(t *testing.T) {
t.Run("1client-3client", func(t *testing.T) {
ll := aperture.New()
ll.SetLocalPeers([]string{"1"})
ll.SetRemotePeers([]interface{}{"8", "9", "10"})
ll.SetLocalPeerID("1")
ll.SetLogicalAperture(2)

assert.Equal(t, []int{0, 1, 2}, ll.(*aperture.Aperture).List())

ll.SetLocalPeers([]string{"1", "2", "3"})
assert.Equal(t, []int{0, 1}, ll.(*aperture.Aperture).List())

})

t.Run("3server-4server", func(t *testing.T) {
ll := aperture.New()
ll.SetLocalPeers([]string{"1", "2", "3"})
ll.SetRemotePeers([]interface{}{"8", "9", "10"})
ll.SetLocalPeerID("1")
ll.SetLogicalAperture(2)

assert.Equal(t, []int{0, 1}, ll.(*aperture.Aperture).List())

ll.SetRemotePeers([]interface{}{"1", "2", "3", "4"})
assert.Equal(t, []int{0, 1, 2}, ll.(*aperture.Aperture).List())

})
}
7 changes: 7 additions & 0 deletions aperture/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"math"
)

// Ring maps the indices [0, `size`) uniformly around a coordinate space [0.0, 1.0).
type Ring struct {
size int
unitWidth float64
Expand All @@ -21,6 +22,7 @@ func NewRing(size int) *Ring {
}
}

// Range returns the total number of indices that [offset, offset + width) intersects with.
func (r *Ring) Range(offset, width float64) int {
begin := r.Index(offset)
end := r.Index(math.Mod(offset+width, 1.0))
Expand Down Expand Up @@ -56,6 +58,7 @@ func (r *Ring) Range(offset, width float64) int {
return r.size
}

// Slice returns the indices where [offset, offset + width) intersects.
func (r *Ring) Slice(offset, width float64) []int {
seq := make([]int, 0)
i := r.Index(offset)
Expand All @@ -71,10 +74,13 @@ func (r *Ring) Slice(offset, width float64) []int {
return seq
}

// Index returns the (zero-based) index between [0, `size`) which the
// position `offset` maps to.
func (r *Ring) Index(offset float64) int {
return int(math.Floor(offset*float64(r.size))) % r.size
}

// Weight returns the ratio of the intersection between `index` and [offset, offset + width).
func (r *Ring) Weight(index int, offset, width float64) float64 {
ab := float64(index) * r.unitWidth
if ab+1 < offset+width {
Expand All @@ -86,6 +92,7 @@ func (r *Ring) Weight(index int, offset, width float64) float64 {
return intersect(ab, ae, offset, offset+width) / r.unitWidth
}

// intersect returns the length of the intersection between the two ranges.
func intersect(b0, e0, b1, e1 float64) float64 {
len := math.Min(e0, e1) - math.Max(b0, b1)
return math.Max(0, len)
Expand Down
6 changes: 6 additions & 0 deletions loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"google.golang.org/grpc/balancer"
)

// Aperture support map local peers to remote peers
// to divide remote peers into subsets
// to separate services into small sets and reduce the total connections
type Aperture interface {
// Next returns next selected item.
Next() (interface{}, func(balancer.DoneInfo))
Expand All @@ -17,6 +20,9 @@ type Aperture interface {
SetRemotePeers([]interface{})
}

// P2C support p2c algorithm for load balance,
// uses the ideas behind the "power of 2 choices"
// to select two nodes from the underlying vector.
type P2C interface {
// Next returns next selected item.
Next() (interface{}, func(balancer.DoneInfo))
Expand Down

0 comments on commit d35ad59

Please sign in to comment.