From 93135a781c9c8e83c7cfc4a7e7df2ca50c2f48a1 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 5 Sep 2023 17:31:36 +0400 Subject: [PATCH] mirroring parallel --- pool.go | 52 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/pool.go b/pool.go index 03163e2..f1830f4 100644 --- a/pool.go +++ b/pool.go @@ -19,7 +19,10 @@ import ( "github.com/ipld/go-car" ) -const blockPathPattern = "/ipfs/%s?format=car&dag-scope=block" +const ( + blockPathPattern = "/ipfs/%s?format=car&dag-scope=block" + defaultMirroredConcurrency = 5 +) // loadPool refreshes the set of endpoints in the pool by fetching an updated list of nodes from the // Orchestrator. @@ -137,29 +140,38 @@ func (p *pool) refreshPool() { } func (p *pool) checkPool() { + sem := make(chan struct{}, defaultMirroredConcurrency) + for { select { case msg := <-p.mirrorSamples: - // see if it is to a main-tier node - if so find appropriate test node to test against. - if !p.ActiveNodes.Contains(msg.node) { - continue - } - testNode := p.AllNodes.PeekRandom() - if testNode == nil { - continue - } - if p.ActiveNodes.Contains(testNode) { - continue - } + sem <- struct{}{} + + go func(msg mirroredPoolRequest) { + defer func() { <-sem }() + + // see if it is to a main-tier node - if so find appropriate test node to test against. + if !p.ActiveNodes.Contains(msg.node) { + return + } + testNode := p.AllNodes.PeekRandom() + if testNode == nil { + return + } + if p.ActiveNodes.Contains(testNode) { + return + } + + trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) + err := p.fetchResourceAndUpdate(trialTimeout, testNode, msg.path, 0, p.mirrorValidator) + cancel() + if err != nil { + mirroredTrafficTotalMetric.WithLabelValues("error").Inc() + } else { + mirroredTrafficTotalMetric.WithLabelValues("no-error").Inc() + } + }(msg) - trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) - err := p.fetchResourceAndUpdate(trialTimeout, testNode, msg.path, 0, p.mirrorValidator) - cancel() - if err != nil { - mirroredTrafficTotalMetric.WithLabelValues("error").Inc() - } else { - mirroredTrafficTotalMetric.WithLabelValues("no-error").Inc() - } case <-p.done: return }