Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/aa/test-simulator' into feat/por…
Browse files Browse the repository at this point in the history
…t-Caboose-main
  • Loading branch information
aarshkshah1992 committed Sep 18, 2023
2 parents bda8d0d + 61c82da commit 550cf5b
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 62 deletions.
7 changes: 7 additions & 0 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type Config struct {
// PoolRefresh is the interval at which we refresh the pool of upstreams from the orchestrator.
PoolRefresh time.Duration

// PoolTargetSize is a baseline size for the pool - the pool will accept decrements in performance to maintain at least this size.
PoolTargetSize int

// MirrorFraction is what fraction of requests will be mirrored to another random node in order to track metrics / determine the current best nodes.
MirrorFraction float64

Expand Down Expand Up @@ -94,6 +97,7 @@ const defaultMirrorFraction = 0.01

const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
const DefaultPoolRefreshInterval = 5 * time.Minute
const DefaultPoolTargetSize = 30

// we cool off sending requests for a cid for a certain duration
// if we've seen a certain number of failures for it already in a given duration.
Expand Down Expand Up @@ -135,6 +139,9 @@ func NewCaboose(config *Config) (*Caboose, error) {
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
}
if config.PoolTargetSize == 0 {
config.PoolTargetSize = DefaultPoolTargetSize
}

logger := newLogger(config)
c := Caboose{
Expand Down
4 changes: 4 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,7 @@ func (n *Node) Rate() float64 {
last := n.Samples.Peek()
return float64(len) / float64(time.Since(last.Start))
}

// String returns the node's URL, which lets *Node satisfy fmt.Stringer.
func (n *Node) String() string {
return n.URL
}
5 changes: 3 additions & 2 deletions node_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ func (nh *NodeHeap) PeekRandom() *Node {

func (nh *NodeHeap) TopN(n int) []*Node {
m := make([]*Node, 0, n)
nh.lk.RLock()
defer nh.lk.RUnlock()
nh.lk.Lock()
defer nh.lk.Unlock()
heap.Init(nh)
for i := 0; i < n && i < len(nh.Nodes); i++ {
node := nh.Nodes[i]
m = append(m, node)
Expand Down
90 changes: 74 additions & 16 deletions node_ring.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
package caboose

import (
"fmt"
"strings"
"sync"

"github.com/willscott/hashring"
)

// NodeRing represents a set of nodes organized for stable hashing.
type NodeRing struct {
Nodes map[string]*Node
ring hashring.HashRing
Nodes map[string]*Node
ring hashring.HashRing
targetSize int

lk sync.RWMutex
}

func NewNodeRing() *NodeRing {
func NewNodeRing(targetSize int) *NodeRing {
return &NodeRing{
Nodes: map[string]*Node{},
ring: *hashring.New([]string{}),
Nodes: map[string]*Node{},
ring: *hashring.New([]string{}),
targetSize: targetSize,
}
}

Expand All @@ -32,6 +36,40 @@ func (nr *NodeRing) updateRing() error {
return nil
}

// getScoreForUpdate estimates how the overall experience would change if
// candidate were placed on the ring with the given weight.
//
// It asks the hash ring which existing nodes would be affected
// (ConsiderUpdateWeightedNode) and, for every affected neighbor, accumulates
//
//	(priority - neighbor.Priority()) * neighborRate * -clamp(change, -1, 1)
//
// where neighborRate is the neighbor's Rate() floored at 1.
//
// Reading the result:
//   - 0        ==> overall experience is the same as the current state
//   - positive ==> overall experience is better than the current state
//   - negative ==> overall experience is worse than the current state
//
// The method does not take nr.lk itself; the visible caller
// (MaybeSubstituteOrAdd) already holds it.
func (nr *NodeRing) getScoreForUpdate(candidate string, priority float64, weight int) float64 {
	var score float64

	for name, change := range nr.ring.ConsiderUpdateWeightedNode(candidate, weight) {
		peer := nr.Nodes[name]

		// Floor the traffic volume so that idle neighbors still count.
		volume := peer.Rate()
		if volume < 1 {
			volume = 1
		}

		// For now, keep the reported change within [-1, 1].
		share := change
		switch {
		case share < -1:
			share = -1
		case share > 1:
			share = 1
		}
		// A negative change means the candidate is replacing the neighbor,
		// so flip the sign to turn it into the portion taken over.
		share = -share

		// How much better (or worse) is the candidate than this neighbor?
		gap := priority - peer.Priority()
		score += gap * volume * float64(share)
	}

	return score
}

func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold int64) (bool, error) {
nr.lk.Lock()
defer nr.lk.Unlock()
Expand All @@ -44,24 +82,32 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in
}

// how much space is being claimed?
overlapEstimate := nr.ring.ConsiderUpdateWeightedNode(candidate.URL, 1)
delta := nr.getScoreForUpdate(candidate.URL, candidate.Priority(), 1)

var neighbor *Node
delta := float64(0)

for n, v := range overlapEstimate {
neighbor = nr.Nodes[n]
neighborVolume := neighbor.Rate()
if delta >= float64(activationThreshold) {
nr.Nodes[candidate.URL] = candidate
return true, nr.updateRing()
}

// how much worse is candidate?
diff := candidate.Priority() - neighbor.Priority()
delta += diff * neighborVolume * float64(v)
// not a clear benefit to add, but maybe acceptable for substitution:
worst := candidate.Priority()
worstN := ""
for _, n := range nr.Nodes {
if n.Priority() < worst {
worst = n.Priority()
worstN = n.URL
}
}

if delta > float64(activationThreshold) {
// todo: the '+1' is an arbitrary threshold to prevent thrashing. it should be configurable.
if worstN != "" && candidate.Priority()-worst > float64(activationThreshold)+1 {
nr.Nodes[candidate.URL] = candidate
delete(nr.Nodes, worstN)
return true, nr.updateRing()

}

// fmt.Printf("did not add - delta %f activation %d, node priority %f\n", delta, activationThreshold, candidate.Priority())
return false, nil
}

Expand Down Expand Up @@ -116,3 +162,15 @@ func (nr *NodeRing) Len() int {
defer nr.lk.RUnlock()
return nr.ring.Size()
}

// String renders the ring as "NodeRing[len N]{a,b,...}", where N is the size
// reported by the underlying hash ring and the braces hold the String() form
// of every entry in nr.Nodes. Entries come from a map range, so their order
// is not stable between calls. The read lock is held while formatting.
func (nr *NodeRing) String() string {
	nr.lk.RLock()
	defer nr.lk.RUnlock()

	names := make([]string, len(nr.Nodes))
	idx := 0
	for _, node := range nr.Nodes {
		names[idx] = node.String()
		idx++
	}

	size := nr.ring.Size()
	members := strings.Join(names, ",")
	return fmt.Sprintf("NodeRing[len %d]{%s}", size, members)
}
2 changes: 1 addition & 1 deletion node_ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func TestNodeRing(t *testing.T) {
nr := caboose.NewNodeRing()
nr := caboose.NewNodeRing(30)
nodes := make([]*caboose.Node, 0)
for i := 0; i < 100; i++ {
nodes = append(nodes, &caboose.Node{URL: fmt.Sprintf("node%d", i)})
Expand Down
2 changes: 1 addition & 1 deletion pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func newPool(c *Config, logger *logger) *pool {
fetchKeyCoolDownCache: cache.New(c.FetchKeyCoolDownDuration, 1*time.Minute),
fetchKeyFailureCache: cache.New(c.FetchKeyCoolDownDuration, 1*time.Minute),

ActiveNodes: NewNodeRing(),
ActiveNodes: NewNodeRing(c.PoolTargetSize),
AllNodes: NewNodeHeap(),
}

Expand Down
45 changes: 8 additions & 37 deletions pool_dynamics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package caboose_test

import (
"context"
"fmt"
"math/rand"
"net/url"
"testing"
Expand Down Expand Up @@ -30,7 +29,6 @@ are picked randomly in the beginning of each test. At the end of each test, the
always be converging to the "good" nodes.
*/
func TestPoolDynamics(t *testing.T) {

baseStatSize := 100000
baseStatLatency := 100
poolRefreshNo := 10
Expand All @@ -44,64 +42,32 @@ func TestPoolDynamics(t *testing.T) {
ch.FetchAndAssertSuccess(t, ctx, testCid)

goodNodes := make([]*caboose.Node, 0)
badNodes := make([]*caboose.Node, 0)
for _, n := range ch.CabooseAllNodes.Nodes {
_, ok := controlGroup[n.URL]
if ok {
goodNodes = append(goodNodes, n)
} else {
badNodes = append(badNodes, n)
}
}

for i := 0; i < 1; i++ {
nodes := make([]string, 0)
for _, n := range ch.CabooseAllNodes.Nodes {
nodes = append(nodes, n.URL)
}
fmt.Println("All nodes", nodes)

goodStats := util.NodeStats{
Start: time.Now().Add(-time.Second * 2),
Latency: float64(baseStatLatency) / float64(10),
Size: float64(baseStatSize) * float64(10),
}

bn := make([]string, 0)
gn := make([]string, 0)
for _, n := range goodNodes {
gn = append(gn, n.URL)
}

for _, n := range badNodes {
bn = append(bn, n.URL)
}
fmt.Println("Good Nodes", gn)
fmt.Println("Bad nodes", bn)

ch.RecordSuccesses(t, goodNodes, goodStats, 1000)
ch.CaboosePool.DoRefresh()
}

for n := range controlGroup {
assert.Contains(t, ch.CabooseActiveNodes.Nodes, n)
}

np := make([]string, 0)
for _, n := range ch.CabooseActiveNodes.Nodes {
np = append(np, n.URL)
}

fmt.Println("Final Node Pool", np)

for _, n := range ch.CabooseAllNodes.Nodes {
fmt.Println("Node", n.URL, "Priority", n.Priority(), "Rate", n.Rate(), "samples ", len(n.Samples.PeekAll()))
}

})

t.Run("pool converges to good nodes vs nodes with worse stats", func(t *testing.T) {
ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, nodesSize/2)
ch.FetchAndAssertSuccess(t, ctx, testCid)

goodNodes := make([]*caboose.Node, 0)
badNodes := make([]*caboose.Node, 0)
Expand Down Expand Up @@ -135,13 +101,14 @@ func TestPoolDynamics(t *testing.T) {
for n := range controlGroup {
assert.Contains(t, ch.CabooseActiveNodes.Nodes, n)
}

})

// When new nodes join, if they start consistently performing better than the nodes in the current pool,
// then those nodes should replace the nodes in the current pool.
t.Run("pool converges to new nodes that are better than the current pool", func(t *testing.T) {
ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, nodesSize/2)
ch.FetchAndAssertSuccess(t, ctx, testCid)

goodNodes := make([]*caboose.Node, 0)
badNodes := make([]*caboose.Node, 0)

Expand Down Expand Up @@ -188,6 +155,8 @@ func TestPoolDynamics(t *testing.T) {
// to nodes that are not failing.
t.Run("pool converges to other nodes if the current ones start failing", func(t *testing.T) {
ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, nodesSize/2)
ch.FetchAndAssertSuccess(t, ctx, testCid)

goodNodes := make([]*caboose.Node, 0)
badNodes := make([]*caboose.Node, 0)

Expand Down Expand Up @@ -241,7 +210,9 @@ func TestPoolDynamics(t *testing.T) {
}

func getHarnessAndControlGroup(t *testing.T, nodesSize int, poolSize int) (*util.CabooseHarness, map[string]string) {
ch := util.BuildCabooseHarness(t, nodesSize, 3)
ch := util.BuildCabooseHarness(t, nodesSize, 3, func(config *caboose.Config) {
config.PoolTargetSize = 3
})

ch.StartOrchestrator()

Expand Down
9 changes: 4 additions & 5 deletions pool_tier_promotion.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
package caboose

const (
PoolConsiderationCount = 30
activationThreshold = 0
var (
activationThreshold = 0
)

func updateActiveNodes(active *NodeRing, all *NodeHeap) error {
candidates := all.TopN(PoolConsiderationCount)
candidates := all.TopN(active.targetSize)
added := 0
for _, c := range candidates {
if active.Contains(c) {
continue
}
activeSize := active.Len()
discount := PoolConsiderationCount - activeSize
discount := active.targetSize - activeSize
if discount < 0 {
discount = 0
}
Expand Down

0 comments on commit 550cf5b

Please sign in to comment.