Skip to content

Commit

Permalink
ingress
Browse files Browse the repository at this point in the history
  • Loading branch information
anatolie-ssv committed Nov 14, 2024
1 parent fbbee6e commit 6465c53
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 31 deletions.
4 changes: 1 addition & 3 deletions network/network.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package network

import (
"context"
"io"

"github.com/libp2p/go-libp2p/core/peer"

"go.uber.org/zap"

protocolp2p "github.com/ssvlabs/ssv/protocol/v2/p2p"
Expand All @@ -20,7 +18,7 @@ type DecodedSSVMessage interface {
// MessageRouter is accepting network messages and route them to the corresponding (internal) components
type MessageRouter interface {
// Route routes the given message, this function MUST NOT block
Route(ctx context.Context, message DecodedSSVMessage)
Route(message DecodedSSVMessage)
}

// MessageRouting allows to register a MessageRouter
Expand Down
9 changes: 4 additions & 5 deletions network/p2p/p2p_pubsub.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package p2pv1

import (
"context"
"encoding/hex"
"fmt"
"math/rand"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/network"
"github.com/ssvlabs/ssv/network/commons"
Expand Down Expand Up @@ -217,8 +216,8 @@ func (n *p2pNetwork) Unsubscribe(logger *zap.Logger, pk spectypes.ValidatorPK) e
}

// handlePubsubMessages reads messages from the given channel and calls the router, note that this function blocks.
func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(ctx context.Context, topic string, msg *pubsub.Message) error {
return func(ctx context.Context, topic string, msg *pubsub.Message) error {
func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(topic string, msg *pubsub.Message) error {
return func(topic string, msg *pubsub.Message) error {
if n.msgRouter == nil {
logger.Debug("msg router is not configured")
return nil
Expand All @@ -241,7 +240,7 @@ func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(ctx context.C
return fmt.Errorf("unknown decoded message type: %T", m)
}

n.msgRouter.Route(ctx, decodedMsg)
n.msgRouter.Route(decodedMsg)

return nil
}
Expand Down
10 changes: 5 additions & 5 deletions network/p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (

eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/pkg/errors"
"github.com/ssvlabs/ssv-spec-pre-cc/types"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"
spectestingutils "github.com/ssvlabs/ssv-spec/types/testingutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/ssvlabs/ssv-spec-pre-cc/types"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"
spectestingutils "github.com/ssvlabs/ssv-spec/types/testingutils"
"github.com/ssvlabs/ssv/network"
"github.com/ssvlabs/ssv/networkconfig"
beaconprotocol "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"
Expand Down Expand Up @@ -262,7 +262,7 @@ type dummyRouter struct {
i int
}

func (r *dummyRouter) Route(_ context.Context, _ network.DecodedSSVMessage) {
func (r *dummyRouter) Route(network.DecodedSSVMessage) {
atomic.AddUint64(&r.count, 1)
}

Expand Down
4 changes: 2 additions & 2 deletions network/topics/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Controller interface {
}

// PubsubMessageHandler handles incoming messages
type PubsubMessageHandler func(context.Context, string, *pubsub.Message) error
type PubsubMessageHandler func(string, *pubsub.Message) error

type messageValidator interface {
ValidatorForTopic(topic string) func(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult
Expand Down Expand Up @@ -288,7 +288,7 @@ func (ctrl *topicsCtrl) listen(logger *zap.Logger, sub *pubsub.Subscription) err
logger.Warn("unknown message type", zap.Any("message", m))
}

if err := ctrl.msgHandler(ctx, topicName, msg); err != nil {
if err := ctrl.msgHandler(topicName, msg); err != nil {
logger.Debug("could not handle msg", zap.Error(err))
}
}
Expand Down
6 changes: 3 additions & 3 deletions network/topics/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (
"github.com/libp2p/go-libp2p/core/host"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
genesisspecqbft "github.com/ssvlabs/ssv-spec-pre-cc/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

genesisspecqbft "github.com/ssvlabs/ssv-spec-pre-cc/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/message/validation"
genesisvalidation "github.com/ssvlabs/ssv/message/validation/genesis"
Expand Down Expand Up @@ -374,7 +374,7 @@ func newPeer(ctx context.Context, logger *zap.Logger, t *testing.T, msgValidator
Host: h,
TraceLog: false,
MsgIDHandler: midHandler,
MsgHandler: func(_ context.Context, topic string, msg *pubsub.Message) error {
MsgHandler: func(topic string, msg *pubsub.Message) error {
p.saveMsg(topic, msg)
return nil
},
Expand Down
2 changes: 1 addition & 1 deletion operator/validator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func NewController(logger *zap.Logger, options ControllerOptions) Controller {

operatorsIDs: operatorsIDs,

messageRouter: newMessageRouter(logger),
messageRouter: newMessageRouter(options.Context, logger),
messageWorker: worker.NewWorker(logger, workerCfg),
historySyncBatchSize: options.HistorySyncBatchSize,

Expand Down
12 changes: 6 additions & 6 deletions operator/validator/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,39 +284,39 @@ func TestHandleNonCommitteeMessages(t *testing.T) {

identifier := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), []byte("pk"), spectypes.RoleCommittee)

ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{
ctr.messageRouter.Route(&queue.SSVMessage{
SSVMessage: &spectypes.SSVMessage{
MsgType: spectypes.SSVConsensusMsgType,
MsgID: identifier,
Data: generateDecidedMessage(t, identifier),
},
})

ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{
ctr.messageRouter.Route(&queue.SSVMessage{
SSVMessage: &spectypes.SSVMessage{
MsgType: spectypes.SSVConsensusMsgType,
MsgID: identifier,
Data: generateChangeRoundMsg(t, identifier),
},
})

ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{
ctr.messageRouter.Route(&queue.SSVMessage{
SSVMessage: &spectypes.SSVMessage{ // checks that not process unnecessary message
MsgType: message.SSVSyncMsgType,
MsgID: identifier,
Data: []byte("data"),
},
})

ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{
ctr.messageRouter.Route(&queue.SSVMessage{
SSVMessage: &spectypes.SSVMessage{ // checks that not process unnecessary message
MsgType: 123,
MsgID: identifier,
Data: []byte("data"),
},
})

ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{
ctr.messageRouter.Route(&queue.SSVMessage{
SSVMessage: &spectypes.SSVMessage{
MsgType: spectypes.SSVPartialSignatureMsgType,
MsgID: identifier,
Expand Down Expand Up @@ -1029,7 +1029,7 @@ func setupController(logger *zap.Logger, opts MockControllerOptions) controller
validatorOptions: opts.validatorOptions,
recipientsStorage: opts.recipientsStorage,
networkConfig: opts.networkConfig,
messageRouter: newMessageRouter(logger),
messageRouter: newMessageRouter(context.TODO(), logger),
committeeValidatorSetup: make(chan struct{}),
indicesChange: make(chan struct{}, 32),
messageWorker: worker.NewWorker(logger, &worker.Config{
Expand Down
14 changes: 11 additions & 3 deletions operator/validator/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ import (

const bufSize = 65536

func newMessageRouter(logger *zap.Logger) *messageRouter {
func newMessageRouter(ctx context.Context, logger *zap.Logger) *messageRouter {
return &messageRouter{
ctx: ctx,
logger: logger,
ch: make(chan VMSG, bufSize),
}
}

type messageRouter struct {
ctx context.Context
logger *zap.Logger
ch chan VMSG
}
Expand All @@ -27,9 +29,15 @@ type VMSG struct {
ctx context.Context
}

func (r *messageRouter) Route(ctx context.Context, message network.DecodedSSVMessage) {
func (r *messageRouter) Route(message network.DecodedSSVMessage) {
ctx := r.ctx

// TODO create new (tracing) context from request data:
// ctx := otel.GetTextMapPropagator().Extract(ctx, message.TraceData)
// pass the context down via r.ch

select {
case <-ctx.Done():
case <-r.ctx.Done():
r.logger.Warn("context canceled, dropping message")
case r.ch <- VMSG{ctx: ctx, DecodedSSVMessage: message}:
default:
Expand Down
6 changes: 3 additions & 3 deletions operator/validator/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestRouter(t *testing.T) {

logger := logging.TestLogger(t)

router := newMessageRouter(logger)
router := newMessageRouter(ctx, logger)

expectedCount := 1000
count := 0
Expand Down Expand Up @@ -49,9 +49,9 @@ func TestRouter(t *testing.T) {
},
}

router.Route(context.TODO(), msg)
router.Route(msg)
if i%2 == 0 {
go router.Route(context.TODO(), msg)
go router.Route(msg)
}
}

Expand Down

0 comments on commit 6465c53

Please sign in to comment.