Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
mirroring parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Sep 5, 2023
1 parent 2713f51 commit 93135a7
Showing 1 changed file with 32 additions and 20 deletions.
52 changes: 32 additions & 20 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"github.com/ipld/go-car"
)

const blockPathPattern = "/ipfs/%s?format=car&dag-scope=block"
const (
blockPathPattern = "/ipfs/%s?format=car&dag-scope=block"
defaultMirroredConcurrency = 5
)

// loadPool refreshes the set of endpoints in the pool by fetching an updated list of nodes from the
// Orchestrator.
Expand Down Expand Up @@ -137,29 +140,38 @@ func (p *pool) refreshPool() {
}

func (p *pool) checkPool() {
sem := make(chan struct{}, defaultMirroredConcurrency)

for {
select {
case msg := <-p.mirrorSamples:
// see if it is to a main-tier node - if so find appropriate test node to test against.
if !p.ActiveNodes.Contains(msg.node) {
continue
}
testNode := p.AllNodes.PeekRandom()
if testNode == nil {
continue
}
if p.ActiveNodes.Contains(testNode) {
continue
}
sem <- struct{}{}

go func(msg mirroredPoolRequest) {
defer func() { <-sem }()

// see if it is to a main-tier node - if so find appropriate test node to test against.
if !p.ActiveNodes.Contains(msg.node) {
return
}
testNode := p.AllNodes.PeekRandom()
if testNode == nil {
return
}
if p.ActiveNodes.Contains(testNode) {
return
}

trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
err := p.fetchResourceAndUpdate(trialTimeout, testNode, msg.path, 0, p.mirrorValidator)
cancel()
if err != nil {
mirroredTrafficTotalMetric.WithLabelValues("error").Inc()
} else {
mirroredTrafficTotalMetric.WithLabelValues("no-error").Inc()
}
}(msg)

trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
err := p.fetchResourceAndUpdate(trialTimeout, testNode, msg.path, 0, p.mirrorValidator)
cancel()
if err != nil {
mirroredTrafficTotalMetric.WithLabelValues("error").Inc()
} else {
mirroredTrafficTotalMetric.WithLabelValues("no-error").Inc()
}
case <-p.done:
return
}
Expand Down

0 comments on commit 93135a7

Please sign in to comment.