Skip to content

Commit

Permalink
feat(p2p): render HasChannel(chID) is a public p2p.Peer method (#… (
Browse files Browse the repository at this point in the history
#128)

* feat(p2p): render `HasChannel(chID)` is a public `p2p.Peer` method (cometbft#3510)

Closes: cometbft#3472

It also prevents reactors from starting routines intended to send
messages to a peer that does not implement/support a given channel.
Because all `Send()` or `TrySend()` calls from this routine will be
useless, always returning `false` and possibly producing some busy-wait
behavior (see cometbft#3414).

The changes are restricted to: mempool and evidence reactor, because
they use a single channel and have a sending routine peer peer, and two
of the consensus routines, for Data and Votes.

Block and State sync reactors have smarter ways to deal with
unresponsive peers, so probably this check is not needed.

---

- [x] Tests written/updated
- [x] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [x] Updated relevant documentation (`docs/` or `spec/`) and code
comments
- [x] Title follows the [Conventional
Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec

* One more test fix

* Add changelog

* Update changelog

---------

Co-authored-by: Daniel <daniel.cason@informal.systems>
  • Loading branch information
ValarDragon and cason authored Jul 24, 2024
1 parent 6917034 commit be6b582
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 39 deletions.
3 changes: 3 additions & 0 deletions .changelog/unreleased/features/3472-p2p-has-channel-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[p2p]` `HasChannel(chID)` method added to the `Peer` interface, used by
reactors to check whether a peer implements/supports a given channel.
([#3472](https://github.com/cometbft/cometbft/issues/3472))
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## Unreleased (I think)

* [#128](https://github.com/osmosis-labs/cometbft/pull/128) feat(p2p): render HasChannel(chID) is a public p2p.Peer method (#3510)
* [#126]() Remove p2p allocations for wrapping outbound packets
* [#125]() Fix marshalling and concurrency overhead within broadcast routines
* perf(p2p): Only update send monitor once per batch packet msg send (#3382)
* [#124]() Secret connection read buffer
* [#123](https://github.com/osmosis-labs/cometbft/pull/123) perf(p2p/conn): Remove unneeded global pool buffers in secret connection #3403
* perf(p2p): Delete expensive debug log already slated for deletion #3412
* perf(p2p): Reduce the p2p metrics overhead. #3411
Expand Down
8 changes: 8 additions & 0 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,10 @@ func (conR *Reactor) getRoundState() *cstypes.RoundState {

func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
if !peer.HasChannel(DataChannel) {
logger.Info("Peer does not implement DataChannel.")
return
}
rng := cmtrand.NewStdlibRand()

OUTER_LOOP:
Expand Down Expand Up @@ -743,6 +747,10 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt

func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
if !peer.HasChannel(VoteChannel) {
logger.Info("Peer does not implement VoteChannel.")
return
}
rng := cmtrand.NewStdlibRand()

// Simple hack to throttle logs upon sleep.
Expand Down
4 changes: 3 additions & 1 deletion evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor {

// AddPeer implements Reactor.
func (evR *Reactor) AddPeer(peer p2p.Peer) {
go evR.broadcastEvidenceRoutine(peer)
if peer.HasChannel(EvidenceChannel) {
go evR.broadcastEvidenceRoutine(peer)
}
}

// Receive implements Reactor.
Expand Down
1 change: 1 addition & 0 deletions evidence/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == evidence.EvidenceChannel
})).Return(false)
p.On("HasChannel", evidence.EvidenceChannel).Maybe().Return(true)
quitChan := make(<-chan struct{})
p.On("Quit").Return(quitChan)
ps := peerState{2}
Expand Down
2 changes: 1 addition & 1 deletion mempool/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor.
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *Reactor) AddPeer(peer p2p.Peer) {
if memR.config.Broadcast {
if memR.config.Broadcast && peer.HasChannel(mempool.MempoolChannel) {
go func() {
// Always forward transactions to unconditional peers.
if !memR.Switch.IsPeerUnconditional(peer.ID()) {
Expand Down
1 change: 1 addition & 0 deletions p2p/mock/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func NewPeer(ip net.IP) *Peer {
}

func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) HasChannel(_ byte) bool { return true }
func (mp *Peer) TrySendEnvelope(e p2p.Envelope) bool { return true }
func (mp *Peer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true }
func (mp *Peer) SendEnvelope(e p2p.Envelope) bool { return true }
Expand Down
18 changes: 18 additions & 0 deletions p2p/mocks/peer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Peer interface {
Status() cmtconn.ConnectionStatus
SocketAddr() *NetAddress // actual address of the socket

HasChannel(chID byte) bool // Does the peer implement this channel?
SendEnvelope(Envelope) bool
TrySendEnvelope(Envelope) bool
TrySendMarshalled(MarshalledEnvelope) bool
Expand Down Expand Up @@ -114,7 +115,7 @@ type peer struct {

// peer's node info and the channel it knows about
// channels = nodeInfo.Channels
// cached to avoid copying nodeInfo in hasChannel
// cached to avoid copying nodeInfo in HasChannel
nodeInfo NodeInfo
channels []byte

Expand Down Expand Up @@ -288,7 +289,7 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo
func (p *peer) sendMarshalled(chID byte, msgType reflect.Type, msgBytes []byte, sendFunc func(byte, []byte) bool) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
} else if !p.HasChannel(chID) {
return false
}
res := sendFunc(chID, msgBytes)
Expand All @@ -308,23 +309,22 @@ func (p *peer) Set(key string, data interface{}) {
p.Data.Set(key, data)
}

// hasChannel returns true if the peer reported
// knowing about the given chID.
func (p *peer) hasChannel(chID byte) bool {
// HasChannel returns whether the peer reported implementing this channel.
func (p *peer) HasChannel(chID byte) bool {
for _, ch := range p.channels {
if ch == chID {
return true
}
}
// NOTE: probably will want to remove this
// but could be helpful while the feature is new
// p.Logger.Debug(
// "Unknown channel for peer",
// "channel",
// chID,
// "channels",
// p.channels,
// )
p.Logger.Debug(
"Unknown channel for peer",
"channel",
chID,
"channels",
p.channels,
)
return false
}

Expand Down
3 changes: 2 additions & 1 deletion p2p/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type mockPeer struct {
id ID
}

func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore erro
func (mp *mockPeer) HasChannel(byte) bool { return true }
func (mp *mockPeer) TrySendMarshalled(e MarshalledEnvelope) bool { return true }

func (mp *mockPeer) TrySendEnvelope(e Envelope) bool { return true }
Expand Down
26 changes: 16 additions & 10 deletions spec/p2p/reactor-api/p2p-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,16 @@ From this point, reactors can use the methods of the new `Peer` instance.
The table below summarizes the interaction of the standard reactors with
connected peers, with the `Peer` methods used by them:

| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX |
|--------------------------------------------|-----------|------------|------------|---------|-----------|-------|
| `ID() ID` | x | x | x | x | x | x |
| `IsRunning() bool` | x | | | x | x | |
| `Quit() <-chan struct{}` | | | | x | x | |
| `Get(string) interface{}` | x | | | x | x | |
| `Set(string, interface{})` | x | | | | | |
| `Send(Envelope) bool` | x | x | x | x | x | x |
| `TrySend(Envelope) bool` | x | x | | | | |
| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX |
|----------------------------|-----------|------------|------------|---------|----------|-----|
| `ID() ID` | x | x | x | x | x | x |
| `IsRunning() bool` | x | | | x | x | |
| `Quit() <-chan struct{}` | | | | x | x | |
| `Get(string) interface{}` | x | | | x | x | |
| `Set(string, interface{})` | x | | | | | |
| `HasChannel(byte) bool` | x | | | x | x | |
| `Send(Envelope) bool` | x | x | x | x | x | x |
| `TrySend(Envelope) bool` | x | x | | | | |

The above list is not exhaustive as it does not include all the `Peer` methods
invoked by the PEX reactor, a special component that should be considered part
Expand Down Expand Up @@ -265,8 +266,10 @@ Finally, a `Peer` instance allows a reactor to send messages to companion
reactors running at that peer.
This is ultimately the goal of the switch when it provides `Peer` instances to
the registered reactors.
There are two methods for sending messages:
There are two methods for sending messages, and one auxiliary method to check
whether the peer supports a given channel:

func (p Peer) HasChannel(chID byte) bool
func (p Peer) Send(e Envelope) bool
func (p Peer) TrySend(e Envelope) bool

Expand All @@ -275,6 +278,9 @@ set as follows:

- `ChannelID`: the channel the message should be sent through, which defines
the reactor that will process the message;
- The auxiliary `HasChannel()` method allows testing whether the remote peer
implements a channel; if it does not, both message-sending methods will
immediately return `false`, as sending always fails.
- `Src`: this field represents the source of an incoming message, which is
irrelevant for outgoing messages;
- `Message`: the actual message's payload, which is marshalled using protocol buffers.
Expand Down
30 changes: 16 additions & 14 deletions test/fuzz/p2p/pex/reactor_receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,19 @@ func (fp *fuzzPeer) RemoteIP() net.IP { return net.IPv4(0, 0, 0, 0) }
func (fp *fuzzPeer) RemoteAddr() net.Addr {
return &net.TCPAddr{IP: fp.RemoteIP(), Port: 98991, Zone: ""}
}
func (fp *fuzzPeer) IsOutbound() bool { return false }
func (fp *fuzzPeer) IsPersistent() bool { return false }
func (fp *fuzzPeer) CloseConn() error { return nil }
func (fp *fuzzPeer) NodeInfo() p2p.NodeInfo { return defaultNodeInfo }
func (fp *fuzzPeer) Status() p2p.ConnectionStatus { var cs p2p.ConnectionStatus; return cs }
func (fp *fuzzPeer) SocketAddr() *p2p.NetAddress { return p2p.NewNetAddress(fp.ID(), fp.RemoteAddr()) }
func (fp *fuzzPeer) SendEnvelope(e p2p.Envelope) bool { return true }
func (fp *fuzzPeer) TrySendEnvelope(e p2p.Envelope) bool { return true }
func (fp *fuzzPeer) Send(_ byte, _ []byte) bool { return true }
func (fp *fuzzPeer) TrySend(_ byte, _ []byte) bool { return true }
func (fp *fuzzPeer) Set(key string, value interface{}) { fp.m[key] = value }
func (fp *fuzzPeer) Get(key string) interface{} { return fp.m[key] }
func (fp *fuzzPeer) GetRemovalFailed() bool { return false }
func (fp *fuzzPeer) SetRemovalFailed() {}
func (fp *fuzzPeer) IsOutbound() bool { return false }
func (fp *fuzzPeer) IsPersistent() bool { return false }
func (fp *fuzzPeer) CloseConn() error { return nil }
func (fp *fuzzPeer) NodeInfo() p2p.NodeInfo { return defaultNodeInfo }
func (fp *fuzzPeer) Status() p2p.ConnectionStatus { var cs p2p.ConnectionStatus; return cs }
func (fp *fuzzPeer) SocketAddr() *p2p.NetAddress { return p2p.NewNetAddress(fp.ID(), fp.RemoteAddr()) }
func (fp *fuzzPeer) HasChannel(byte) bool { return true }
func (fp *fuzzPeer) SendEnvelope(e p2p.Envelope) bool { return true }
func (fp *fuzzPeer) TrySendEnvelope(e p2p.Envelope) bool { return true }
func (fp *fuzzPeer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true }
func (fp *fuzzPeer) Send(_ byte, _ []byte) bool { return true }
func (fp *fuzzPeer) TrySend(_ byte, _ []byte) bool { return true }
func (fp *fuzzPeer) Set(key string, value interface{}) { fp.m[key] = value }
func (fp *fuzzPeer) Get(key string) interface{} { return fp.m[key] }
func (fp *fuzzPeer) GetRemovalFailed() bool { return false }
func (fp *fuzzPeer) SetRemovalFailed() {}

0 comments on commit be6b582

Please sign in to comment.