Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Support for data-transfer/graphsync #143

Merged
merged 7 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion announce/p2psender/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// config contains all options for configuring dtsync.publisher.
// config contains all options for configuring ipnisync.publisher.
type config struct {
topic *pubsub.Topic
extraData []byte
Expand Down
69 changes: 44 additions & 25 deletions dagsync/README.md
Original file line number Diff line number Diff line change
@@ -1,38 +1,60 @@
## dagsync

dagsync is an interface for [go-data-transfer](https://github.com/filecoin-project/go-data-transfer),
providing a 1:1 mechanism for maintaining a synchronized [IPLD dag](https://docs.ipld.io/) of data between
a publisher and a subscriber's current state for that publisher.
dagsync is an interface for maintaining a synchronized [IPLD dag](https://docs.ipld.io/) of IPNI advertisements between a publisher and a subscriber's current state for that publisher.

## Usage

Typically an application will be either a provider or a subscriber, but may be both.

### Publisher

Create a dagsync publisher. Update its root to cause it to publish.
Create a dagsync publisher. Update its root to publish a new advertisement. Send announcement messages to inform indexers a new advertisement is available.

```golang
pub, err := NewPublisher(host, dsstore, lsys, "/dagsync/topic")
publisher, err := ipnisync.NewPublisher(linkSys, privKey,
ipnisync.WithHTTPListenAddrs("http://127.0.0.1:0"),
ipnisync.WithStreamHost(publisherStreamHost),
)
if err != nil {
panic(err)
}
...

// Create announcement senders to send advertisement announcements to indexers.
var senders []announce.Sender
httpSender, err := httpsender.New(announceURLs, id)
if err != nil {
panic(err)
}
senders = append(senders, httpSender)
p2pSender, err := p2psender.New(publisherStreamHost, pubTopicName)
if err != nil {
panic(err)
}
senders = append(senders, p2pSender)

// ...

// Publish updated root.
err = publisher.UpdateRoot(ctx, lnk.(cidlink.Link).Cid)
adCid := lnk.(cidlink.Link).Cid
err = publisher.SetRoot(adCid)
if err != nil {
panic(err)
}
// Announce new advertisement.
err := announce.Send(ctx, adCid, adsPublishedHereAddrs, senders...)
if err != nil {
panic(err)
}
```

### Subscriber

The `Subscriber` handles subscribing to a topic, reading messages from the topic and tracking the state of each publisher.
The `Subscriber` reads advertisement chains from index-providers. Its announcement receiver receives libp2p pubsub messages from a topic and direct HTTP announcements. The Subscriber reads advertisements is response to announcement messages.

Create a `Subscriber`:

```golang
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, "/dagsync/topic", nil)
sub, err := dagsync.NewSubscriber(dstHost, dstLinkSys, dagsync.RecvAnnounce(pubTopicName))
if err != nil {
panic(err)
}
Expand All @@ -46,34 +68,31 @@ defer cancelWatcher()
go watch(watcher)

func watch(notifications <-chan dagsync.SyncFinished) {
for {
syncFinished := <-notifications
// newHead is now available in the local dataStore
}
for {
syncFinished := <-notifications
// newHead is now available in the local dataStore
}
}
```

To shutdown a `Subscriber`, call its `Close()` method.

A `Subscriber` can be created with a function that determines if the `Subscriber` accepts or rejects messages from a publisher. Use the `AllowPeer` option to specify the function.
A `Subscriber` can be created with announce receive options that include a function that determines if the `Subscriber` accepts or rejects announcements from a publisher.
```golang
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, "/dagsync/topic", nil, dagsync.AllowPeer(allowPeer))
sub, err := dagsync.NewSubscriber(dstHost, dstLinkSys,
dagsync.RecvAnnounce(pubTopicName, announce.WithALlowPeer(allowPeer)),
)

```

The `Subscriber` keeps track of the latest head for each publisher that it has synced. This avoids exchanging the whole DAG from scratch in every update and instead downloads only the part that has not been synced. This value is not persisted as part of the library. If you want to start a `Subscriber` which has already partially synced with a provider you can use the `SetLatestSync` method:
```golang
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, "/dagsync/topic", nil)
sub, err := dagsync.NewSubscriber(dstHost, dstLinkSys)
if err != nil {
panic(err)
}
// Set up partially synced publishers
if err = sub.SetLatestSync(peerID1, lastSync1) ; err != nil {
panic(err)
}
if err = sub.SetLatestSync(peerID2, lastSync2) ; err != nil {
panic(err)
}
if err = sub.SetLatestSync(peerID3, lastSync3) ; err != nil {
panic(err)
}
sub.SetLatestSync(peerID1, lastSync1)
sub.SetLatestSync(peerID2, lastSync2)
sub.SetLatestSync(peerID3, lastSync3)
```
96 changes: 8 additions & 88 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dagsync_test

import (
"context"
"sync"
"testing"
"time"

Expand All @@ -17,7 +16,6 @@ import (
"github.com/ipni/go-libipni/announce"
"github.com/ipni/go-libipni/announce/p2psender"
"github.com/ipni/go-libipni/dagsync"
"github.com/ipni/go-libipni/dagsync/dtsync"
"github.com/ipni/go-libipni/dagsync/ipnisync"
"github.com/ipni/go-libipni/dagsync/test"
"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -35,7 +33,7 @@ func TestAnnounceReplace(t *testing.T) {
blocksSeenByHook[c] = struct{}{}
}

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(),
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.RecvAnnounce(testTopic),
dagsync.BlockHook(blockHook))
require.NoError(t, err)
defer sub.Close()
Expand Down Expand Up @@ -172,7 +170,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) {
subh := test.MkTestHost(t)
subds := dssync.MutexWrap(datastore.NewMapDatastore())
subls := test.MkLinkSystem(subds)
sub, err := dagsync.NewSubscriber(subh, subds, subls, testTopic, dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false))
sub, err := dagsync.NewSubscriber(subh, subls, dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -226,13 +224,13 @@ func TestAnnounceRepublish(t *testing.T) {

topics := test.WaitForMeshWithMessage(t, testTopic, dstHost, dstHost2)

sub2, err := dagsync.NewSubscriber(dstHost2, dstStore2, dstLnkS2, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1])), dagsync.StrictAdsSelector(false))
sub2, err := dagsync.NewSubscriber(dstHost2, dstLnkS2,
dagsync.RecvAnnounce("", announce.WithTopic(topics[1])), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub2.Close()

sub1, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[0]), announce.WithResend(true)),
sub1, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce("", announce.WithTopic(topics[0]), announce.WithResend(true)),
dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub1.Close()
Expand Down Expand Up @@ -344,84 +342,6 @@ func TestAllowPeerAllows(t *testing.T) {
}
}

func TestPublisherRejectsPeer(t *testing.T) {
t.Parallel()
// Init dagsync publisher and subscriber
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())

srcHost := test.MkTestHost(t)
dstHost := test.MkTestHost(t)

topics := test.WaitForMeshWithMessage(t, testTopic, srcHost, dstHost)

srcLnkS := test.MkLinkSystem(srcStore)

blockID := dstHost.ID()
var blockMutex sync.Mutex

allowPeer := func(peerID peer.ID) bool {
blockMutex.Lock()
defer blockMutex.Unlock()
return peerID != blockID
}

p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0]))
require.NoError(t, err)

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic, dtsync.WithAllowPeer(allowPeer))
require.NoError(t, err)
defer pub.Close()

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(announce.WithTopic(topics[1])))
require.NoError(t, err)
defer sub.Close()

err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID()))
require.NoError(t, err)

require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))

watcher, cncl := sub.OnSyncFinished()
defer cncl()

c := mkLnk(t, srcStore)

// Update root with item
pub.SetRoot(c)
err = announce.Send(context.Background(), c, pub.Addrs(), p2pSender)
require.NoError(t, err)

select {
case <-time.After(updateTimeout):
t.Log("publisher blocked")
case <-watcher:
t.Fatal("sync should not have happened with blocked ID")
}

blockMutex.Lock()
blockID = peer.ID("")
blockMutex.Unlock()

c = mkLnk(t, srcStore)

// Update root with item
pub.SetRoot(c)
err = announce.Send(context.Background(), c, pub.Addrs(), p2pSender)
require.NoError(t, err)

select {
case <-time.After(updateTimeout):
t.Fatal("timed out waiting for SyncFinished")
case <-watcher:
t.Log("synced with allowed ID")
}
}

func mkLnk(t *testing.T, srcStore datastore.Batching) cid.Cid {
// Update root with item
np := basicnode.Prototype__Any{}
Expand Down Expand Up @@ -457,8 +377,8 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer)))
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce(testTopic, announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer)))
require.NoError(t, err)

err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID()))
Expand Down
Loading
Loading