Skip to content

Commit

Permalink
fix(retriever): fix race
Browse files Browse the repository at this point in the history
fix race in DirectCandidateSource when not using libp2p
  • Loading branch information
hannahhoward committed Jan 26, 2024
1 parent 9e72582 commit 81cf0ff
Showing 1 changed file with 42 additions and 22 deletions.
64 changes: 42 additions & 22 deletions pkg/retriever/directcandidatesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,33 +81,37 @@ func (d *DirectCandidateSource) FindCandidates(ctx context.Context, c cid.Cid, c
cs := candidateSender{ctx, cancel, c, candidateResults}
var wg sync.WaitGroup
for _, provider := range d.providers {

// if protocols are specified, just use those
if len(provider.Protocols) > 0 {
cb(types.RetrievalCandidate{
MinerPeer: provider.Peer,
RootCid: cs.rootCid,
Metadata: metadata.Default.New(provider.Protocols...),
})
continue
}

// if it's http, it'll be in the multiaddr and we can't probe it
if d.trySendHTTPCandidate(provider, c, cb) {
continue
}

// if we have no libp2p host, just assume all protocols are available
if d.h == nil {
cb(types.RetrievalCandidate{
MinerPeer: provider.Peer,
RootCid: cs.rootCid,
Metadata: metadata.Default.New(metadata.IpfsGatewayHttp{}, metadata.Bitswap{}, &metadata.GraphsyncFilecoinV1{}),
})
continue
}

wg.Add(1)
provider := provider
go func() {
defer wg.Done()

// if protocols are specified, just use those
if len(provider.Protocols) > 0 {
cs.sendCandidate(provider.Peer, provider.Protocols...)
return
}

// if it's http, it'll be in the multiaddr and we can't probe it
if len(provider.Peer.Addrs) == 1 {
for _, proto := range provider.Peer.Addrs[0].Protocols() {
if proto.Name == "http" || proto.Name == "https" {
cs.sendCandidate(provider.Peer, metadata.IpfsGatewayHttp{})
return
}
}
}

// if we have no libp2p host, just assume all protocols are available
if d.h == nil {
cs.sendCandidate(provider.Peer, metadata.IpfsGatewayHttp{}, metadata.Bitswap{}, &metadata.GraphsyncFilecoinV1{})
return
}

// probe it
err := d.h.Connect(ctx, provider.Peer)
// don't add peers that we can't connect to
Expand Down Expand Up @@ -150,6 +154,22 @@ func (d *DirectCandidateSource) FindCandidates(ctx context.Context, c cid.Cid, c
}
}

func (d *DirectCandidateSource) trySendHTTPCandidate(provider types.Provider, rootCid cid.Cid, cb func(types.RetrievalCandidate)) bool {
if len(provider.Peer.Addrs) != 1 {
return false
}
for _, proto := range provider.Peer.Addrs[0].Protocols() {
if proto.Name == "http" || proto.Name == "https" {
cb(types.RetrievalCandidate{
MinerPeer: provider.Peer,
RootCid: rootCid,
Metadata: metadata.Default.New(metadata.IpfsGatewayHttp{}),
})
return true
}
}
return false
}
func (d *DirectCandidateSource) retrievalCandidatesFromProtocolProbing(ctx context.Context, provider peer.AddrInfo, cs candidateSender) {
var protocols []metadata.Protocol
s, err := d.h.NewStream(ctx, provider.ID,
Expand Down

0 comments on commit 81cf0ff

Please sign in to comment.