Skip to content

Commit

Permalink
fix: measure "connect time" per-peer for peer scoring
Browse files Browse the repository at this point in the history
prior to this, the "start" is the same for all peers, regardless of
whether they come in from the indexer at different times; this makes it
strictly local to the peer so we can score it accordingly
  • Loading branch information
rvagg committed Sep 21, 2023
1 parent a7c7002 commit 4caba91
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 7 deletions.
3 changes: 2 additions & 1 deletion pkg/retriever/graphsyncretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func graphsyncMetadataCompare(a, b *metadata.GraphsyncFilecoinV1, defaultValue b
return defaultValue
}

func (pg *ProtocolGraphsync) Connect(ctx context.Context, retrieval *retrieval, startTime time.Time, candidate types.RetrievalCandidate) (time.Duration, error) {
func (pg *ProtocolGraphsync) Connect(ctx context.Context, retrieval *retrieval, candidate types.RetrievalCandidate) (time.Duration, error) {
startTime := pg.Clock.Now()
if err := pg.Client.Connect(ctx, candidate.MinerPeer); err != nil {
return 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/retriever/httpretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (ph ProtocolHttp) GetMergedMetadata(cid cid.Cid, currentMetadata, newMetada
return &metadata.IpfsGatewayHttp{}
}

func (ph *ProtocolHttp) Connect(ctx context.Context, retrieval *retrieval, startTime time.Time, candidate types.RetrievalCandidate) (time.Duration, error) {
func (ph *ProtocolHttp) Connect(ctx context.Context, retrieval *retrieval, candidate types.RetrievalCandidate) (time.Duration, error) {
// We could begin the request here by moving ph.beginRequest() to this function.
// That would result in parallel connections to candidates as they are received,
// then serial reading of bodies.
Expand Down
8 changes: 3 additions & 5 deletions pkg/retriever/parallelpeerretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type GetStorageProviderTimeout func(peer peer.ID) time.Duration
type TransportProtocol interface {
Code() multicodec.Code
GetMergedMetadata(cid cid.Cid, currentMetadata, newMetadata metadata.Protocol) metadata.Protocol
Connect(ctx context.Context, retrieval *retrieval, startTime time.Time, candidate types.RetrievalCandidate) (time.Duration, error)
Connect(ctx context.Context, retrieval *retrieval, candidate types.RetrievalCandidate) (time.Duration, error)
Retrieve(
ctx context.Context,
retrieval *retrieval,
Expand Down Expand Up @@ -158,7 +158,6 @@ func (retrieval *retrieval) RetrieveFromAsyncCandidates(asyncCandidates types.In
}

// start retrievals
startTime := retrieval.Clock.Now()
var waitGroup sync.WaitGroup
waitGroup.Add(1)
go func() {
Expand All @@ -174,7 +173,7 @@ func (retrieval *retrieval) RetrieveFromAsyncCandidates(asyncCandidates types.In
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
retrieval.runRetrievalCandidate(ctx, shared, startTime, candidate)
retrieval.runRetrievalCandidate(ctx, shared, candidate)
}()
}
}
Expand Down Expand Up @@ -306,7 +305,6 @@ func collectResults(ctx context.Context, shared *retrievalShared, eventsCallback
func (retrieval *retrieval) runRetrievalCandidate(
ctx context.Context,
shared *retrievalShared,
startTime time.Time,
candidate types.RetrievalCandidate,
) {
timeout := retrieval.Session.GetStorageProviderTimeout(candidate.MinerPeer.ID)
Expand All @@ -324,7 +322,7 @@ func (retrieval *retrieval) runRetrievalCandidate(
}

// Setup in parallel
connectTime, err := retrieval.Protocol.Connect(connectCtx, retrieval, startTime, candidate)
connectTime, err := retrieval.Protocol.Connect(connectCtx, retrieval, candidate)
if err != nil {
// Exclude the case where the context was cancelled by the parent, which likely means that
// another protocol has succeeded.
Expand Down

0 comments on commit 4caba91

Please sign in to comment.