Skip to content

Commit

Permalink
refactor: bg router integration
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Jun 19, 2023
1 parent 577ac16 commit 744cb2b
Show file tree
Hide file tree
Showing 9 changed files with 444 additions and 103 deletions.
296 changes: 242 additions & 54 deletions p2p/background/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ package background
import (
"context"
"fmt"

dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
"github.com/pokt-network/pocket/p2p/unicast"
"google.golang.org/protobuf/proto"

"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/pokt-network/pocket/p2p/providers"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/p2p/utils"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
Expand All @@ -22,18 +25,27 @@ import (
var (
_ typesP2P.Router = &backgroundRouter{}
_ modules.IntegratableModule = &backgroundRouter{}
_ backgroundRouterFactory = &backgroundRouter{}
)

type backgroundRouterFactory = modules.FactoryWithConfig[typesP2P.Router, *config.BackgroundConfig]

// backgroundRouter implements `typesP2P.Router` for use with all P2P participants.
type backgroundRouter struct {
base_modules.IntegratableModule
unicast.UnicastRouter

logger *modules.Logger
// handler is the function to call when a message is received.
handler typesP2P.MessageHandler
// host represents a libp2p network node, it encapsulates a libp2p peerstore
// & connection manager. `libp2p.New` configures and starts listening
// according to options.
// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme)
host libp2pHost.Host

// Fields below are assigned during creation via `#setupDependencies()`.

// gossipSub is used for broadcast communication
// (i.e. multiple, unidentified receivers)
// TECHDEBT: investigate diff between randomSub and gossipSub
Expand All @@ -47,96 +59,87 @@ type backgroundRouter struct {
kadDHT *dht.IpfsDHT
// TECHDEBT: `pstore` will likely be removed in future refactoring / simplification
// of the `Router` interface.
// pstore is the background router's peerstore.
// pstore is the background router's peerstore. Assigned in `backgroundRouter#setupPeerstore()`.
pstore typesP2P.Peerstore
}

// NewBackgroundRouter returns a `backgroundRouter` as a `typesP2P.Router`
// Create returns a `backgroundRouter` as a `typesP2P.Router`
// interface using the given configuration.
func NewBackgroundRouter(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2P.Router, error) {
func Create(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2P.Router, error) {
return new(backgroundRouter).Create(bus, cfg)
}

func (*backgroundRouter) Create(bus modules.Bus, cfg *config.BackgroundConfig) (typesP2P.Router, error) {
// TECHDEBT(#595): add ctx to interface methods and propagate down.
ctx := context.TODO()

networkLogger := logger.Global.CreateLoggerForModule("backgroundRouter")
networkLogger.Info().Msg("Initializing background router")

// seed initial peerstore with current on-chain peer info (i.e. staked actors)
pstore, err := cfg.PeerstoreProvider.GetStakedPeerstoreAtHeight(
cfg.CurrentHeightProvider.CurrentHeight(),
)
if err != nil {
if err := cfg.IsValid(); err != nil {
return nil, err
}

// CONSIDERATION: If switching to `NewRandomSub`, there will be a max size
gossipSub, err := pubsub.NewGossipSub(ctx, cfg.Host)
if err != nil {
return nil, fmt.Errorf("creating gossip pubsub: %w", err)
}

dhtMode := dht.ModeAutoServer
// NB: don't act as a bootstrap node in peer discovery in client debug mode
if isClientDebugMode(bus) {
dhtMode = dht.ModeClient
}

kadDHT, err := dht.New(ctx, cfg.Host, dht.Mode(dhtMode))
if err != nil {
return nil, fmt.Errorf("creating DHT: %w", err)
}

topic, err := gossipSub.Join(protocol.BackgroundTopicStr)
if err != nil {
return nil, fmt.Errorf("joining background topic: %w", err)
rtr := &backgroundRouter{
logger: networkLogger,
handler: cfg.Handler,
host: cfg.Host,
}
rtr.SetBus(bus)

// INVESTIGATE: `WithBufferSize` `SubOpt`:
// > WithBufferSize is a Subscribe option to customize the size of the subscribe
// > output buffer. The default length is 32 but it can be configured to avoid
// > dropping messages if the consumer is not reading fast enough.
// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#WithBufferSize)
subscription, err := topic.Subscribe()
if err != nil {
return nil, fmt.Errorf("subscribing to background topic: %w", err)
if err := rtr.setupDependencies(ctx, cfg); err != nil {
return nil, err
}

rtr := &backgroundRouter{
host: cfg.Host,
gossipSub: gossipSub,
kadDHT: kadDHT,
topic: topic,
subscription: subscription,
logger: networkLogger,
pstore: pstore,
}
go rtr.readSubscription(ctx)

return rtr, nil
}

// Broadcast implements the respective `typesP2P.Router` interface method.
func (rtr *backgroundRouter) Broadcast(data []byte) error {
// CONSIDERATION: validate as PocketEnvelopeBz here (?)
// TODO_THIS_COMMIT: wrap in BackgroundMessage
backgroundMsg := &typesP2P.BackgroundMessage{
Data: data,
}
backgroundMsgBz, err := proto.Marshal(backgroundMsg)
if err != nil {
return err
}

// TECHDEBT(#595): add ctx to interface methods and propagate down.
return rtr.topic.Publish(context.TODO(), data)
return rtr.topic.Publish(context.TODO(), backgroundMsgBz)
}

// Send implements the respective `typesP2P.Router` interface method.
func (rtr *backgroundRouter) Send(data []byte, address cryptoPocket.Address) error {
func (rtr *backgroundRouter) Send(pocketEnvelopeBz []byte, address cryptoPocket.Address) error {
rtr.logger.Warn().Str("address", address.String()).Msg("sending background message to peer")

backgroundMessage := &typesP2P.BackgroundMessage{
Data: pocketEnvelopeBz,
}
backgroundMessageBz, err := proto.Marshal(backgroundMessage)
if err != nil {
return fmt.Errorf("marshalling background message: %w", err)
}

peer := rtr.pstore.GetPeer(address)
if peer == nil {
return fmt.Errorf("peer with address %s not in peerstore", address)
}

if err := utils.Libp2pSendToPeer(rtr.host, data, peer); err != nil {
if err := utils.Libp2pSendToPeer(
rtr.host,
protocol.BackgroundProtocolID,
backgroundMessageBz,
peer,
); err != nil {
return err
}
return nil
}

// HandleNetworkData implements the respective `typesP2P.Router` interface method.
func (rtr *backgroundRouter) HandleNetworkData(data []byte) ([]byte, error) {
return data, nil // intentional passthrough
}

// GetPeerstore implements the respective `typesP2P.Router` interface method.
func (rtr *backgroundRouter) GetPeerstore() typesP2P.Peerstore {
return rtr.pstore
Expand Down Expand Up @@ -166,6 +169,191 @@ func (rtr *backgroundRouter) RemovePeer(peer typesP2P.Peer) error {
return rtr.pstore.RemovePeer(peer.GetAddress())
}

func (rtr *backgroundRouter) Close() error {
// TODO_THIS_COMMIT: why is this causing problems?
//rtr.subscription.Cancel()

//return multierror.Append(
// rtr.topic.Close(),
// rtr.kadDHT.Close(),
//)
return nil
}

func (rtr *backgroundRouter) setupUnicastRouter() error {
unicastRouterCfg := config.UnicastRouterConfig{
Logger: rtr.logger,
Host: rtr.host,
ProtocolID: protocol.BackgroundProtocolID,
MessageHandler: rtr.handleBackgroundMsg,
PeerHandler: rtr.AddPeer,
}

unicastRouter, err := unicast.Create(rtr.GetBus(), &unicastRouterCfg)
if err != nil {
return fmt.Errorf("setting up unicast router: %w", err)
}

rtr.UnicastRouter = *unicastRouter
return nil
}

func (rtr *backgroundRouter) setupDependencies(ctx context.Context, cfg *config.BackgroundConfig) error {
if err := rtr.setupUnicastRouter(); err != nil {
return err
}

if err := rtr.setupPeerDiscovery(ctx); err != nil {
return fmt.Errorf("setting up peer discovery: %w", err)
}

if err := rtr.setupPubsub(ctx); err != nil {
return fmt.Errorf("setting up pubsub: %w", err)
}

if err := rtr.setupTopic(); err != nil {
return fmt.Errorf("setting up topic: %w", err)
}

if err := rtr.setupSubscription(); err != nil {
return fmt.Errorf("setting up subscription: %w", err)
}

if err := rtr.setupPeerstore(
ctx,
cfg.PeerstoreProvider,
cfg.CurrentHeightProvider,
); err != nil {
return fmt.Errorf("setting up peerstore: %w", err)
}
return nil
}

func (rtr *backgroundRouter) setupPeerstore(
ctx context.Context,
pstoreProvider providers.PeerstoreProvider,
currentHeightProvider providers.CurrentHeightProvider,
) (err error) {
// seed initial peerstore with current on-chain peer info (i.e. staked actors)
rtr.pstore, err = pstoreProvider.GetStakedPeerstoreAtHeight(
currentHeightProvider.CurrentHeight(),
)
if err != nil {
return err
}

// CONSIDERATION: add `GetPeers` method to `PeerstoreProvider` interface
// to avoid this loop.
for _, peer := range rtr.pstore.GetPeerList() {
if err := utils.AddPeerToLibp2pHost(rtr.host, peer); err != nil {
return err
}

// TODO: refactor: #bootstrap()
libp2pPeer, err := utils.Libp2pAddrInfoFromPeer(peer)
if err != nil {
return fmt.Errorf(
"converting peer info, pokt address: %s: %w",
peer.GetAddress(),
err,
)
}

// don't attempt to connect to self
if rtr.host.ID() == libp2pPeer.ID {
return nil
}

if err := rtr.host.Connect(ctx, libp2pPeer); err != nil {
return fmt.Errorf("connecting to peer: %w", err)
}
}
return nil
}

func (rtr *backgroundRouter) setupPeerDiscovery(ctx context.Context) (err error) {
dhtMode := dht.ModeAutoServer
// NB: don't act as a bootstrap node in peer discovery in client debug mode
if isClientDebugMode(rtr.GetBus()) {
dhtMode = dht.ModeClient
}

rtr.kadDHT, err = dht.New(ctx, rtr.host, dht.Mode(dhtMode))
return err
}

func (rtr *backgroundRouter) setupPubsub(ctx context.Context) (err error) {
// TODO_THIS_COMMIT: remove or refactor
//
// TECHDEBT: integrate with go-libp2p-pubsub tracing
//truncID := host.ID().String()[:20]
//jsonTracer, err := pubsub.NewJSONTracer(fmt.Sprintf("./pubsub-trace_%s.json", truncID))
//if err != nil {
// return fmt.Errorf("creating json tracer: %w", err)
//}
//
//tracerOpt := pubsub.WithEventTracer(jsonTracer)

// CONSIDERATION: If switching to `NewRandomSub`, there will be a max size
//rtr.gossipSub, err = pubsub.NewGossipSub(ctx, host, tracerOpt)
rtr.gossipSub, err = pubsub.NewGossipSub(ctx, rtr.host)
return err
}

func (rtr *backgroundRouter) setupTopic() (err error) {
rtr.topic, err = rtr.gossipSub.Join(protocol.BackgroundTopicStr)
return err
}

func (rtr *backgroundRouter) setupSubscription() (err error) {
// INVESTIGATE: `WithBufferSize` `SubOpt`:
// > WithBufferSize is a Subscribe option to customize the size of the subscribe
// > output buffer. The default length is 32 but it can be configured to avoid
// > dropping messages if the consumer is not reading fast enough.
// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#WithBufferSize)
rtr.subscription, err = rtr.topic.Subscribe()
return err
}

func (rtr *backgroundRouter) readSubscription(ctx context.Context) {
// TODO_THIS_COMMIT: look into "topic validaton"
// (see: https://github.com/libp2p/specs/tree/master/pubsub#topic-validation)
for {
msg, err := rtr.subscription.Next(ctx)
if ctx.Err() != nil {
fmt.Printf("error: %s\n", ctx.Err())
return
}

if err != nil {
rtr.logger.Error().Err(err).
Msg("error reading from background topic subscription")
continue
}

// TECHDEBT/DISCUSS: telemetry
if err := rtr.handleBackgroundMsg(msg.Data); err != nil {
rtr.logger.Error().Err(err).Msg("error handling background message")
continue
}
}
}

func (rtr *backgroundRouter) handleBackgroundMsg(backgroundMsgBz []byte) error {
var backgroundMsg typesP2P.BackgroundMessage
if err := proto.Unmarshal(backgroundMsgBz, &backgroundMsg); err != nil {
return err
}

// There was no error, but we don't need to forward this to the app-specific bus.
// For example, the message has already been handled by the application.
if backgroundMsg.Data == nil {
return nil
}

return rtr.handler(backgroundMsg.Data)
}

// isClientDebugMode returns the value of `ClientDebugMode` in the base config
func isClientDebugMode(bus modules.Bus) bool {
return bus.GetRuntimeMgr().GetConfig().ClientDebugMode
Expand Down
Loading

0 comments on commit 744cb2b

Please sign in to comment.