Skip to content

Commit

Permalink
New release version, and add comments (#43)
Browse files Browse the repository at this point in the history
* New release version
  • Loading branch information
gammazero authored May 24, 2023
1 parent 1bd7b3f commit dfad9b3
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 1 deletion.
2 changes: 2 additions & 0 deletions announce/httpsender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

const DefaultAnnouncePath = "/announce"

// Sender sends announce messages over HTTP.
type Sender struct {
announceURLs []string
client *http.Client
Expand Down Expand Up @@ -71,6 +72,7 @@ func New(announceURLs []*url.URL, peerID peer.ID, options ...Option) (*Sender, e
}, nil
}

// Close closes idle HTTP connections.
func (s *Sender) Close() error {
s.client.CloseIdleConnections()
return nil
Expand Down
1 change: 1 addition & 0 deletions announce/p2psender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

const shutdownTime = 5 * time.Second

// Sender sends announce messages over pubsub.
type Sender struct {
cancelPubSub context.CancelFunc
topic *pubsub.Topic
Expand Down
1 change: 1 addition & 0 deletions announce/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/ipni/go-libipni/announce/message"
)

// Sender is the interface for announce sender implementations.
type Sender interface {
// Close closes the Sender.
Close() error
Expand Down
16 changes: 16 additions & 0 deletions dagsync/dtsync/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/multiformats/go-multiaddr"
)

// Publisher is a data-transfer publisher that announces the head of an
// advertisement chain to a set of configured senders.
type Publisher struct {
closeOnce sync.Once
dtManager dt.Manager
Expand Down Expand Up @@ -90,22 +92,30 @@ func NewPublisherFromExisting(dtManager dt.Manager, host host.Host, topicName st
}, nil
}

// Addrs returns the multiaddrs of the publisher's host.
func (p *Publisher) Addrs() []multiaddr.Multiaddr {
return p.host.Addrs()
}

// ID returns the peer ID of the publisher's host.
func (p *Publisher) ID() peer.ID {
return p.host.ID()
}

// Protocol returns the multihash protocol ID of the transport used by the
// publisher.
func (p *Publisher) Protocol() int {
return multiaddr.P_P2P
}

// AnnounceHead announces the current head of the advertisement chain to the
// configured senders.
func (p *Publisher) AnnounceHead(ctx context.Context) error {
return p.announce(ctx, p.headPublisher.Root(), p.Addrs())
}

// AnnounceHeadWithAddrs announces the current head of the advertisement chain
// to the configured senders with the given addresses.
func (p *Publisher) AnnounceHeadWithAddrs(ctx context.Context, addrs []multiaddr.Multiaddr) error {
return p.announce(ctx, p.headPublisher.Root(), addrs)
}
Expand All @@ -131,6 +141,7 @@ func (p *Publisher) announce(ctx context.Context, c cid.Cid, addrs []multiaddr.M
return errs
}

// SetRoot sets the root CID of the advertisement chain.
func (p *Publisher) SetRoot(ctx context.Context, c cid.Cid) error {
if c == cid.Undef {
return errors.New("cannot update to an undefined cid")
Expand All @@ -139,10 +150,14 @@ func (p *Publisher) SetRoot(ctx context.Context, c cid.Cid) error {
return p.headPublisher.UpdateRoot(ctx, c)
}

// UpdateRoot updates the root CID of the advertisement chain and announces it
// to the configured senders.
func (p *Publisher) UpdateRoot(ctx context.Context, c cid.Cid) error {
return p.UpdateRootWithAddrs(ctx, c, p.Addrs())
}

// UpdateRootWithAddrs updates the root CID of the advertisement chain and
// announces it to the configured senders with the given addresses.
func (p *Publisher) UpdateRootWithAddrs(ctx context.Context, c cid.Cid, addrs []multiaddr.Multiaddr) error {
err := p.SetRoot(ctx, c)
if err != nil {
Expand All @@ -151,6 +166,7 @@ func (p *Publisher) UpdateRootWithAddrs(ctx context.Context, c cid.Cid, addrs []
return p.announce(ctx, c, addrs)
}

// Close closes the publisher and all of its senders.
func (p *Publisher) Close() error {
var errs error
p.closeOnce.Do(func() {
Expand Down
16 changes: 16 additions & 0 deletions dagsync/httpsync/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
manet "github.com/multiformats/go-multiaddr/net"
)

// Publisher is an HTTP publisher that announces the head of an advertisement
// chain to a set of configured senders.
type Publisher struct {
addr multiaddr.Multiaddr
closer io.Closer
Expand Down Expand Up @@ -143,21 +145,28 @@ func (p *Publisher) Addrs() []multiaddr.Multiaddr {
return []multiaddr.Multiaddr{p.addr}
}

// ID returns the p2p peer ID of the Publisher.
func (p *Publisher) ID() peer.ID {
return p.peerID
}

// Protocol returns the multihash protocol ID of the transport used by the
// publisher.
func (p *Publisher) Protocol() int {
return multiaddr.P_HTTP
}

// AnnounceHead announces the head of the advertisement chain to the configured
// senders.
func (p *Publisher) AnnounceHead(ctx context.Context) error {
p.rl.Lock()
c := p.root
p.rl.Unlock()
return p.announce(ctx, c, p.Addrs())
}

// AnnounceHeadWithAddrs announces the head of the advertisement chain to the
// configured senders, with the provided addresses.
func (p *Publisher) AnnounceHeadWithAddrs(ctx context.Context, addrs []multiaddr.Multiaddr) error {
p.rl.Lock()
c := p.root
Expand Down Expand Up @@ -187,17 +196,22 @@ func (p *Publisher) announce(ctx context.Context, c cid.Cid, addrs []multiaddr.M
return errs
}

// SetRoot sets the head of the advertisement chain.
func (p *Publisher) SetRoot(_ context.Context, c cid.Cid) error {
p.rl.Lock()
defer p.rl.Unlock()
p.root = c
return nil
}

// UpdateRoot updates the head of the advertisement chain and announces it to
// the configured senders.
func (p *Publisher) UpdateRoot(ctx context.Context, c cid.Cid) error {
return p.UpdateRootWithAddrs(ctx, c, p.Addrs())
}

// UpdateRootWithAddrs updates the head of the advertisement chain and announces
// it to the configured senders, with the provided addresses.
func (p *Publisher) UpdateRootWithAddrs(ctx context.Context, c cid.Cid, addrs []multiaddr.Multiaddr) error {
err := p.SetRoot(ctx, c)
if err != nil {
Expand All @@ -206,6 +220,7 @@ func (p *Publisher) UpdateRootWithAddrs(ctx context.Context, c cid.Cid, addrs []
return p.announce(ctx, c, addrs)
}

// Close closes the Publisher and all of its senders.
func (p *Publisher) Close() error {
var errs error
err := p.closer.Close()
Expand All @@ -220,6 +235,7 @@ func (p *Publisher) Close() error {
return errs
}

// ServeHTTP implements the http.Handler interface.
func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var ask string
if p.handlerPath != "" {
Expand Down
4 changes: 4 additions & 0 deletions dagsync/httpsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Sync struct {
lsys ipld.LinkSystem
}

// NewSync creates a new Sync.
func NewSync(lsys ipld.LinkSystem, client *http.Client, blockHook func(peer.ID, cid.Cid)) *Sync {
if client == nil {
client = &http.Client{
Expand Down Expand Up @@ -77,6 +78,7 @@ func (s *Sync) Close() {

var errHeadFromUnexpectedPeer = errors.New("found head signed from an unexpected peer")

// Syncer provides sync functionality for a single sync with a peer.
type Syncer struct {
peerID peer.ID
rateLimiter *rate.Limiter
Expand All @@ -85,6 +87,7 @@ type Syncer struct {
sync *Sync
}

// GetHead fetches the head of the peer's advertisement chain.
func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) {
var head cid.Cid
var pubKey ic.PubKey
Expand All @@ -110,6 +113,7 @@ func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) {
return head, nil
}

// Sync syncs the peer's advertisement chain or entries chain.
func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error {
xsel, err := selector.CompileSelector(sel)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions dagsync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,10 +778,13 @@ type (
}
)

// SetNextSyncCid sets the CID that will be synced in the next segmented sync.
func (ss *segmentedSync) SetNextSyncCid(c cid.Cid) {
ss.nextSyncCid = &c
}

// FailSync fails the sync and returns the given error when the current segment
// sync finishes.
func (ss *segmentedSync) FailSync(err error) {
ss.err = err
}
Expand Down
2 changes: 1 addition & 1 deletion version.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "v0.1.0"
"version": "v0.1.1"
}

0 comments on commit dfad9b3

Please sign in to comment.