Skip to content

Commit

Permalink
Having announce.Receiver is optional for Subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Jul 25, 2023
1 parent 2f35326 commit bc1be6e
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 117 deletions.
2 changes: 1 addition & 1 deletion announce/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func getOpts(opts []Option) (config, error) {
}

// WithAllowPeer sets the function that determines whether to allow or reject
// messages from a peer.
// messages from a peer. When not set or nil, allows messages from all peers.
func WithAllowPeer(allowPeer AllowPeerFunc) Option {
return func(c *config) error {
c.allowPeer = allowPeer
Expand Down
34 changes: 14 additions & 20 deletions announce/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Receiver struct {
hostID peer.ID

announceCache *stringLRU
// announceMutex protects announceCache, and allowPeer, topicSub
// announceMutex protects announceCache and topicSub.
announceMutex sync.Mutex

closed bool
Expand Down Expand Up @@ -204,16 +204,6 @@ func (r *Receiver) Close() error {
return err
}

// SetAllowPeer configures Subscriber with a function to evaluate whether to
// allow or reject messages from a peer. Setting nil removes any filtering and
// allows messages from all peers. Calling SetAllowPeer replaces any previously
// configured AllowPeerFunc.
func (r *Receiver) SetAllowPeer(allowPeer AllowPeerFunc) {
r.announceMutex.Lock()
r.allowPeer = allowPeer
r.announceMutex.Unlock()
}

// UncacheCid removes a CID from the announce cache.
func (r *Receiver) UncacheCid(adCid cid.Cid) {
r.announceMutex.Lock()
Expand All @@ -234,7 +224,6 @@ func (r *Receiver) watch(ctx context.Context) {
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, pubsub.ErrSubscriptionCancelled) {
// This is a normal result of shutting down the Subscriber.
log.Debug("Canceled watching pubsub subscription")
break
}
log.Errorw("Error reading from pubsub", "err", err)
Expand Down Expand Up @@ -299,6 +288,9 @@ func (r *Receiver) watch(ctx context.Context) {
}
err = r.handleAnnounce(ctx, amsg, false)
if err != nil {
if errors.Is(err, ErrClosed) || errors.Is(err, context.Canceled) {
break
}
log.Errorw("Cannot process message", "err", err)
continue
}
Expand All @@ -319,10 +311,10 @@ func (r *Receiver) Direct(ctx context.Context, nextCid cid.Cid, peerID peer.ID,
PeerID: peerID,
Addrs: addrs,
}
return r.handleAnnounce(ctx, amsg, true)
return r.handleAnnounce(ctx, amsg, r.resend)
}

func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, direct bool) error {
func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, resend bool) error {
err := r.announceCheck(amsg)
if err != nil {
if err == ErrClosed {
Expand All @@ -339,7 +331,7 @@ func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, direct boo
// address in their peer store.
}

if direct && r.resend {
if resend {
err = r.republish(ctx, amsg)
if err != nil {
log.Errorw("Cannot republish announce message", "err", err)
Expand All @@ -350,6 +342,8 @@ func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, direct boo

select {
case r.outChan <- amsg:
case <-r.done:
return ErrClosed
case <-ctx.Done():
return ctx.Err()
}
Expand All @@ -358,18 +352,18 @@ func (r *Receiver) handleAnnounce(ctx context.Context, amsg Announce, direct boo
}

func (r *Receiver) announceCheck(amsg Announce) error {
// Check callback to see if peer ID allowed.
if r.allowPeer != nil && !r.allowPeer(amsg.PeerID) {
return errSourceNotAllowed
}

r.announceMutex.Lock()
defer r.announceMutex.Unlock()

if r.closed {
return ErrClosed
}

// Check callback to see if peer ID allowed.
if r.allowPeer != nil && !r.allowPeer(amsg.PeerID) {
return errSourceNotAllowed
}

// Check if a previous announce for this CID was already seen.
if r.announceCache.update(amsg.Cid.String()) {
return errAlreadySeenCid
Expand Down
11 changes: 7 additions & 4 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipni/go-libipni/announce"
"github.com/ipni/go-libipni/dagsync/dtsync"
"github.com/ipni/go-libipni/dagsync/httpsync"
"github.com/ipni/go-libipni/dagsync/test"
Expand Down Expand Up @@ -40,7 +41,7 @@ func TestAnnounceReplace(t *testing.T) {
require.NoError(t, err)
defer pub.Close()

sub, err := NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil)
sub, err := NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, RecvAnnounce())
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -163,7 +164,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) {
defer pubh.Close()
subds := dssync.MutexWrap(datastore.NewMapDatastore())
subls := test.MkLinkSystem(subds)
sub, err := NewSubscriber(subh, subds, subls, testTopic, nil)
sub, err := NewSubscriber(subh, subds, subls, testTopic, RecvAnnounce())
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -217,11 +218,13 @@ func TestAnnounceRepublish(t *testing.T) {

topics := test.WaitForMeshWithMessage(t, testTopic, dstHost, dstHost2)

sub2, err := NewSubscriber(dstHost2, dstStore2, dstLnkS2, testTopic, nil, Topic(topics[1]))
sub2, err := NewSubscriber(dstHost2, dstStore2, dstLnkS2, testTopic,
RecvAnnounce(announce.WithTopic(topics[1])))
require.NoError(t, err)
defer sub2.Close()

sub1, err := NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, Topic(topics[0]), ResendAnnounce(true))
sub1, err := NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
RecvAnnounce(announce.WithTopic(topics[0]), announce.WithResend(true)))
require.NoError(t, err)
defer sub1.Close()

Expand Down
2 changes: 1 addition & 1 deletion dagsync/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func setupPublisherSubscriber(t *testing.T, subscriberOptions []dagsync.Option)
dstLinkSys := test.MkLinkSystem(dstStore)
dstHost := test.MkTestHost()

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLinkSys, testTopic, nil, subscriberOptions...)
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLinkSys, testTopic, subscriberOptions...)
require.NoError(t, err)
t.Cleanup(func() {
sub.Close()
Expand Down
36 changes: 21 additions & 15 deletions dagsync/legs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, host.Host, dagsync.Publisher, *dagsync.Subscriber, announce.Sender) {
func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer func(peer.ID) bool) (host.Host, host.Host, dagsync.Publisher, *dagsync.Subscriber, announce.Sender) {
srcHost := test.MkTestHost()
dstHost := test.MkTestHost()

Expand All @@ -54,7 +54,8 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host,
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, dagsync.Topic(topics[1]))
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer)))
require.NoError(t, err)

err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID()))
Expand All @@ -67,20 +68,24 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host,

func TestAllowPeerReject(t *testing.T) {
t.Parallel()

// Set function to reject anything except dstHost, which is not the one
// generating the update.
var destID peer.ID
allow := func(peerID peer.ID) bool {
return peerID == destID
}

// Init dagsync publisher and subscriber
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore)
srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore, allow)
defer srcHost.Close()
defer dstHost.Close()
defer pub.Close()
defer sub.Close()

// Set function to reject anything except dstHost, which is not the one
// generating the update.
sub.SetAllowPeer(func(peerID peer.ID) bool {
return peerID == dstHost.ID()
})
destID = dstHost.ID()

watcher, cncl := sub.OnSyncFinished()
defer cncl()
Expand All @@ -101,20 +106,21 @@ func TestAllowPeerReject(t *testing.T) {

func TestAllowPeerAllows(t *testing.T) {
t.Parallel()

// Set function to allow any peer.
allow := func(_ peer.ID) bool {
return true
}

// Init dagsync publisher and subscriber
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore)
srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore, allow)
defer srcHost.Close()
defer dstHost.Close()
defer pub.Close()
defer sub.Close()

// Set function to allow any peer.
sub.SetAllowPeer(func(_ peer.ID) bool {
return true
})

watcher, cncl := sub.OnSyncFinished()
defer cncl()

Expand Down Expand Up @@ -167,7 +173,7 @@ func TestPublisherRejectsPeer(t *testing.T) {
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil, dagsync.Topic(topics[1]))
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(announce.WithTopic(topics[1])))
require.NoError(t, err)
defer sub.Close()

Expand Down
44 changes: 18 additions & 26 deletions dagsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
dt "github.com/filecoin-project/go-data-transfer/v2"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipni/go-libipni/announce"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -33,9 +34,7 @@ type LastKnownSyncFunc func(peer.ID) (cid.Cid, bool)

// config contains all options for configuring Subscriber.
type config struct {
addrTTL time.Duration
allowPeer announce.AllowPeerFunc
filterIPs bool
addrTTL time.Duration

topic *pubsub.Topic

Expand All @@ -45,12 +44,14 @@ type config struct {
blockHook BlockHookFunc
httpClient *http.Client

dss ipld.Node
syncRecLimit selector.RecursionLimit

idleHandlerTTL time.Duration
lastKnownSync LastKnownSyncFunc

resendAnnounce bool
hasRcvr bool
rcvrOpts []announce.Option

segDepthLimit int64

Expand Down Expand Up @@ -79,15 +80,6 @@ func getOpts(opts []Option) (config, error) {
return cfg, nil
}

// AllowPeer sets the function that determines whether to allow or reject
// messages from a peer.
func AllowPeer(allowPeer announce.AllowPeerFunc) Option {
return func(c *config) error {
c.allowPeer = allowPeer
return nil
}
}

// AddrTTL sets the peerstore address time-to-live for addresses discovered
// from pubsub messages.
func AddrTTL(addrTTL time.Duration) Option {
Expand All @@ -105,6 +97,15 @@ func Topic(topic *pubsub.Topic) Option {
}
}

// DefaultSelectorSeq sets the default selector sequence passed to
// ExploreRecursiveWithStopNode.
func DefaultSelectorSeq(dss ipld.Node) Option {
return func(c *config) error {
c.dss = dss
return nil
}
}

// DtManager provides an existing datatransfer manager.
func DtManager(dtManager dt.Manager, gs graphsync.GraphExchange) Option {
return func(c *config) error {
Expand Down Expand Up @@ -133,15 +134,6 @@ func BlockHook(blockHook BlockHookFunc) Option {
}
}

// FilterIPs removes any private, loopback, or unspecified IP multiaddrs from
// addresses supplied in announce messages.
func FilterIPs(enable bool) Option {
return func(c *config) error {
c.filterIPs = enable
return nil
}
}

// IdleHandlerTTL configures the time after which idle handlers are removed.
func IdleHandlerTTL(ttl time.Duration) Option {
return func(c *config) error {
Expand Down Expand Up @@ -171,11 +163,11 @@ func SyncRecursionLimit(limit selector.RecursionLimit) Option {
}
}

// ResendAnnounce determines whether to resend the direct announce mesages
// (those that are not received via pubsub) over pubsub.
func ResendAnnounce(enable bool) Option {
// RecvAnnounce enables an announcement message receiver.
func RecvAnnounce(opts ...announce.Option) Option {
return func(c *config) error {
c.resendAnnounce = enable
c.hasRcvr = true
c.rcvrOpts = opts
return nil
}
}
Expand Down
Loading

0 comments on commit bc1be6e

Please sign in to comment.