Skip to content

Commit

Permalink
Merge pull request #66 from bloXroute-Labs/v2.129.41
Browse files Browse the repository at this point in the history
Publish release v2.129.41
  • Loading branch information
leonbiton1 authored Oct 31, 2024
2 parents 7eed91d + 671b4fb commit c5d6e2d
Show file tree
Hide file tree
Showing 50 changed files with 921 additions and 642 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ issues:
- EXC0014

run:
go: "1.21"
go: "1.23"
allow-parallel-runners: true
exclude-files:
- ".*\\.pb\\.go$" # skip protobuf generated code
Expand Down
5 changes: 2 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ARG GO_VERSION=alpine
ARG BASE=golang:1.21.7-alpine3.19
ARG BASE=golang:1.23.2-alpine3.20

FROM ${BASE} AS builder

Expand Down Expand Up @@ -30,8 +30,7 @@ COPY go.sum .
RUN go mod download
COPY --chown=bloxroute:bloxroute . .

ARG PORTABLE
RUN if [ "$PORTABLE" = "true" ] ; then make gateway-portable ; else make gateway ; fi
RUN make gateway
RUN chown bloxroute:bloxroute ./bin/gateway
RUN chown bloxroute:bloxroute ./bin/bxcli

Expand Down
6 changes: 0 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ gateway: fmt | $(BIN); $(info $(M) building gateway executable) @ ## Build progr
-ldflags '-X $(MODULE)/version.BuildVersion=$(VERSION) -X $(MODULE)/version.BuildDate=$(DATE)' \
-o $(BIN) ./cmd/...

gateway-portable: fmt | $(BIN); $(info $(M) building gateway portable executable) @ ## Build program binary
$Q CGO_CFLAGS="-O -D__BLST_PORTABLE__" $(GO) build \
-tags release \
-ldflags '-X $(MODULE)/version.BuildVersion=$(VERSION) -X $(MODULE)/version.BuildDate=$(DATE)' \
-o $(BIN) ./cmd/...

$(BIN):
@mkdir -p $@
$(BIN)/%: | $(BIN) ; $(info $(M) building $(PACKAGE))
Expand Down
1 change: 0 additions & 1 deletion blockchain/beacon/handle_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func HandleBDNBlocks(ctx context.Context, b blockchain.Bridge, n *Node, beaconAP
log.Tracef("broadcasted block to blockchain: p2p, block_hash: %v", bdnBlock.Hash())
}
}()

}

if broadcastBeaconAPI {
Expand Down
172 changes: 108 additions & 64 deletions blockchain/beacon/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"os"
"strings"
"time"

"github.com/bloXroute-Labs/gateway/v2/blockchain"
"github.com/bloXroute-Labs/gateway/v2/blockchain/network"
log "github.com/bloXroute-Labs/gateway/v2/logger"
bxTypes "github.com/bloXroute-Labs/gateway/v2/types"
"github.com/bloXroute-Labs/gateway/v2/utils"
"github.com/bloXroute-Labs/gateway/v2/utils/syncmap"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p"
mplex "github.com/libp2p/go-libp2p-mplex"
Expand All @@ -26,7 +21,6 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/pkg/errors"
fastssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
Expand All @@ -41,9 +35,14 @@ import (
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"google.golang.org/protobuf/proto"
)

var errPeerUnknown = errors.New("peer is unknown")
"github.com/bloXroute-Labs/gateway/v2/blockchain"
"github.com/bloXroute-Labs/gateway/v2/blockchain/network"
log "github.com/bloXroute-Labs/gateway/v2/logger"
bxTypes "github.com/bloXroute-Labs/gateway/v2/types"
"github.com/bloXroute-Labs/gateway/v2/utils"
"github.com/bloXroute-Labs/gateway/v2/utils/syncmap"
)

const (
forkDigestLength = 4
Expand All @@ -58,14 +57,12 @@ const (
// libp2p settings
const (
// overlay parameters
gossipSubD = 8 // topic stable mesh target count
gossipSubDlo = 6 // topic stable mesh low watermark
gossipSubDhi = 12 // topic stable mesh high watermark
gossipSubD = 8 // topic stable mesh target count
gossipSubDlo = 6 // topic stable mesh low watermark

// gossip parameters
gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses
gossipSubMcacheGossip = 3 // number of windows to gossip about
gossipSubSeenTTL = 550 // number of heartbeat intervals to retain message IDs
gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses
gossipSubMcacheGossip = 3 // number of windows to gossip about

// heartbeat interval
gossipSubHeartbeatInterval = 700 * time.Millisecond // frequency of heartbeat, milliseconds
Expand Down Expand Up @@ -132,6 +129,7 @@ type Node struct {
host host.Host
pubSub *pubsub.PubSub
bridge blockchain.Bridge
staticPeers []libp2pPeer.AddrInfo
peers peers
trustedPeers trustedPeers
enableAnyIncomingConnection bool
Expand Down Expand Up @@ -191,6 +189,16 @@ func newNode(params NodeParams, clock utils.Clock) (*Node, error) {
return nil, fmt.Errorf("could not create chain: %v", err)
}

staticPeers := make([]libp2pPeer.AddrInfo, 0, len(params.EthConfig.StaticPeers.BeaconNodes()))
for _, multiaddr := range params.EthConfig.StaticPeers.BeaconNodes() {
addrInfo, err := libp2pPeer.AddrInfoFromP2pAddr(*multiaddr)
if err != nil {
return nil, fmt.Errorf("could not convert multiaddr %v to addr info: %v", multiaddr, err)
}

staticPeers = append(staticPeers, *addrInfo)
}

ctx, cancel := context.WithCancel(params.ParentContext)

n := &Node{
Expand All @@ -201,6 +209,7 @@ func newNode(params NodeParams, clock utils.Clock) (*Node, error) {
genesisState: genesisState,
chain: chain,
bridge: params.Bridge,
staticPeers: staticPeers,
peers: newPeers(),
trustedPeers: newTrustedPeers(),
enableAnyIncomingConnection: params.TrustedPeersFilePath == "",
Expand All @@ -225,10 +234,6 @@ func newNode(params NodeParams, clock utils.Clock) (*Node, error) {
return nil, err
}

if err := n.addPeers(); err != nil {
return nil, fmt.Errorf("could not add peers %v", err)
}

psOpts := n.pubsubOptions()

// TODO: put it into log file
Expand All @@ -244,18 +249,26 @@ func newNode(params NodeParams, clock utils.Clock) (*Node, error) {

n.host.Network().Notify(&libp2pNetwork.NotifyBundle{
ConnectedF: func(net libp2pNetwork.Network, conn libp2pNetwork.Conn) {
n.log.Tracef("peer %v connected", conn.RemotePeer())

peer := n.peers.get(conn.RemotePeer())

// Skip handshake for incoming connections
if peer == nil {
n.log.Tracef("peer %s connected", conn.RemoteMultiaddr())

peer := n.peers.add(net.Peerstore().PeerInfo(conn.RemotePeer()))
peerEndpoint := utils.MultiaddrToNodeEndoint(conn.RemoteMultiaddr(), n.networkName)

if conn.Stat().Direction == libp2pNetwork.DirInbound {
if peer.connect() {
if err := n.bridge.SendBlockchainConnectionStatus(blockchain.ConnectionStatus{
PeerEndpoint: peerEndpoint,
IsConnected: true,
}); err != nil {
n.log.Errorf("could not send blockchain connection status: %v", err)
}
}
return
}

// should be async
go func() {
if peer.startHandshake() {
if !peer.startHandshake() {
n.log.Tracef("peer %v skipping handshake", peer)
return
}
Expand All @@ -264,18 +277,38 @@ func newNode(params NodeParams, clock utils.Clock) (*Node, error) {
if err := n.handshake(peer, conn); err != nil {
n.log.Infof("handshake with peer %v failed: %v", peer, err)

if err := n.host.Network().ClosePeer(conn.RemotePeer()); err != nil {
n.log.Errorf("could not close peer %v: %v", peer, err)
if n.host.Network().Connectedness(conn.RemotePeer()) == libp2pNetwork.Connected {
if err := n.host.Network().ClosePeer(conn.RemotePeer()); err != nil {
n.log.Errorf("could not close peer %v: %v", peer, err)
}
}

return
}

if peer.connect() {
if err := n.bridge.SendBlockchainConnectionStatus(blockchain.ConnectionStatus{
PeerEndpoint: peerEndpoint,
IsConnected: true,
}); err != nil {
n.log.Errorf("could not send blockchain connection status: %v", err)
}
}

n.log.Tracef("peer %v successed handshake", conn.RemotePeer())
}()
},
DisconnectedF: func(net libp2pNetwork.Network, conn libp2pNetwork.Conn) {
n.log.Tracef("peer %v disconnected", conn.RemotePeer())
n.log.Tracef("peer %s disconnected", conn.RemoteMultiaddr())

if peer := n.peers.get(conn.RemotePeer()); peer != nil && peer.disconnect() {
if err := n.bridge.SendBlockchainConnectionStatus(blockchain.ConnectionStatus{
PeerEndpoint: utils.MultiaddrToNodeEndoint(conn.RemoteMultiaddr(), n.networkName),
IsConnected: false,
}); err != nil {
n.log.Errorf("could not send blockchain connection status: %v", err)
}
}
},
})

Expand Down Expand Up @@ -383,6 +416,7 @@ func (n *Node) Start() error {

go n.ensurePeerConnections()
go n.sendStatusRequests()
go n.bxStatusHandler()

if err := n.scheduleDenebForkUpdate(); err != nil {
return fmt.Errorf("could not schedule deneb fork update: %v", err)
Expand All @@ -405,7 +439,7 @@ func (n *Node) sendStatusRequests() {
return
case <-ticker.Alert():
n.peers.rangeByID(func(peerID libp2pPeer.ID, peer *peer) bool {
// Skip non connected
// skip non-connected
c := n.host.Network().ConnsToPeer(peerID)
if len(c) == 0 {
return true
Expand All @@ -414,7 +448,7 @@ func (n *Node) sendStatusRequests() {
ctx, cancel := context.WithTimeout(n.ctx, beaconParams.BeaconConfig().RespTimeoutDuration())
defer cancel()

stream, err := n.host.NewStream(ctx, libp2pPeer.ID(peerID), protocol.ID(p2p.RPCStatusTopicV1+n.encoding.ProtocolSuffix()))
stream, err := n.host.NewStream(ctx, peerID, protocol.ID(p2p.RPCStatusTopicV1+n.encoding.ProtocolSuffix()))
if err != nil {
n.log.Errorf("could not create stream for status request: %v", err)
return true
Expand Down Expand Up @@ -597,11 +631,8 @@ func (n *Node) blobSubscriber(msg *pubsub.Message) {

endpoint, err := n.loadNodeEndpointFromPeerID(msg.ReceivedFrom)
if err != nil {
if err == errPeerUnknown {
n.log.Debugf("skipping blob, the peer ID %v that broadcasted the blob is not trusted", msg.ReceivedFrom)
} else {
n.log.Errorf("could not load peer endpoint: %v", err)
}
n.log.Errorf("could not load peer endpoint: %v", err)

return
}

Expand Down Expand Up @@ -705,11 +736,8 @@ func (n *Node) subscribe(topic string, handler func(msg *pubsub.Message)) error
func (n *Node) blockSubscriber(msg *pubsub.Message) {
endpoint, err := n.loadNodeEndpointFromPeerID(msg.ReceivedFrom)
if err != nil {
if err == errPeerUnknown {
n.log.Debugf("skipping block, the peer ID %v that broadcasted the block is not trusted", msg.ReceivedFrom)
} else {
n.log.Errorf("could not load peer endpoint: %v", err)
}
n.log.Errorf("could not load peer endpoint: %v", err)

return
}

Expand Down Expand Up @@ -777,6 +805,7 @@ func (n *Node) loadNodeEndpointFromPeerID(peerID libp2pPeer.ID) (*bxTypes.NodeEn
}

multiaddr := utils.MultiaddrToNodeEndoint(conns[0].RemoteMultiaddr(), n.networkName)

return &multiaddr, nil
}

Expand All @@ -792,7 +821,7 @@ func (n *Node) broadcast(topic string, msg proto.Message) error {

castMsg, ok := msg.(fastssz.Marshaler)
if !ok {
return errors.Errorf("message of %T does not support marshaller interface", msg)
return fmt.Errorf("message of %T does not support marshaller interface", msg)
}

buf := new(bytes.Buffer)
Expand All @@ -803,20 +832,6 @@ func (n *Node) broadcast(topic string, msg proto.Message) error {
return pbTopic.topic.Publish(n.ctx, buf.Bytes())
}

func (n *Node) addPeers() error {
for _, multiaddr := range n.config.StaticPeers.BeaconNodes() {
addrInfo, err := libp2pPeer.AddrInfoFromP2pAddr(*multiaddr)
if err != nil {
return fmt.Errorf("could not convert multiaddr %v to addr info: %v", multiaddr, err)
}

// Not using multiple addresses in same multiaddr
n.peers.add(*addrInfo)
}

return nil
}

func (n *Node) ensurePeerConnections() {
ticker := n.clock.Ticker(peerReconnectTimeout)

Expand All @@ -825,29 +840,29 @@ func (n *Node) ensurePeerConnections() {
case <-n.ctx.Done():
return
case <-ticker.Alert():
n.peers.rangeByID(func(peerID libp2pPeer.ID, peer *peer) bool {
c := n.host.Network().ConnsToPeer(peerID)
for _, peer := range n.staticPeers {
c := n.host.Network().ConnsToPeer(peer.ID)
if len(c) > 0 {
return true
continue
}

ctx, cancel := context.WithTimeout(n.ctx, peerConnectionTimeout)
defer cancel()

if err := n.host.Connect(ctx, peer.addrInfo); err != nil {
if err := n.host.Connect(ctx, peer); err != nil {
// Try to reconnect as fast as possible again
// https://github.com/libp2p/go-libp2p/blob/ddfb6f9240679b840d3663021e8b4433f51379a7/examples/relay/main.go#L90
n.host.Network().(*swarm.Swarm).Backoff().Clear(peerID)
n.host.Network().(*swarm.Swarm).Backoff().Clear(peer.ID)

if err := n.host.Network().ClosePeer(peerID); err != nil {
if err := n.host.Network().ClosePeer(peer.ID); err != nil {
n.log.Errorf("could not close peer %s: %v", peer, err)
}

n.log.Warnf("could not connect peer %s: %v", peer, err)
}

return true
})
continue
}
}
}
}
Expand Down Expand Up @@ -933,6 +948,35 @@ func (n *Node) pubsubOptions() []pubsub.Option {
return psOpts
}

func (n *Node) bxStatusHandler() {
statusBridge := n.bridge.SubscribeStatus()

for {
select {
case <-n.ctx.Done():
return
case <-statusBridge.ReceiveBlockchainStatusRequest():
endpoints := make([]*bxTypes.NodeEndpoint, 0)
n.peers.rangeByID(func(peerID libp2pPeer.ID, peer *peer) bool {
c := n.host.Network().ConnsToPeer(peerID)
if len(c) == 0 {
return true
}

endpoint := utils.MultiaddrToNodeEndoint(peer.addrInfo.Load().Addrs[0], n.networkName)
endpoint.ConnectedAt = peer.connectedAt().Format(time.RFC3339)
endpoints = append(endpoints, &endpoint)

return true
})

if err := statusBridge.SendBlockchainStatusResponse(endpoints); err != nil {
n.log.Errorf("could not send blockchain status response: %v", err)
}
}
}
}

// creates a custom gossipsub parameter set.
func pubsubGossipParam() pubsub.GossipSubParams {
gParams := pubsub.DefaultGossipSubParams()
Expand Down
Loading

0 comments on commit c5d6e2d

Please sign in to comment.