From 6465c5311a47b17e34503fe5409ebe295ed1ee9f Mon Sep 17 00:00:00 2001 From: Anatolie Lupacescu Date: Thu, 14 Nov 2024 12:06:22 +0200 Subject: [PATCH] ingress --- network/network.go | 4 +--- network/p2p/p2p_pubsub.go | 9 ++++----- network/p2p/p2p_test.go | 10 +++++----- network/topics/controller.go | 4 ++-- network/topics/controller_test.go | 6 +++--- operator/validator/controller.go | 2 +- operator/validator/controller_test.go | 12 ++++++------ operator/validator/router.go | 14 +++++++++++--- operator/validator/router_test.go | 6 +++--- 9 files changed, 36 insertions(+), 31 deletions(-) diff --git a/network/network.go b/network/network.go index bfdd7ac149..cb1011984b 100644 --- a/network/network.go +++ b/network/network.go @@ -1,11 +1,9 @@ package network import ( - "context" "io" "github.com/libp2p/go-libp2p/core/peer" - "go.uber.org/zap" protocolp2p "github.com/ssvlabs/ssv/protocol/v2/p2p" @@ -20,7 +18,7 @@ type DecodedSSVMessage interface { // MessageRouter is accepting network messages and route them to the corresponding (internal) components type MessageRouter interface { // Route routes the given message, this function MUST NOT block - Route(ctx context.Context, message DecodedSSVMessage) + Route(message DecodedSSVMessage) } // MessageRouting allows to register a MessageRouter diff --git a/network/p2p/p2p_pubsub.go b/network/p2p/p2p_pubsub.go index 68baf5126c..acabe82146 100644 --- a/network/p2p/p2p_pubsub.go +++ b/network/p2p/p2p_pubsub.go @@ -1,7 +1,6 @@ package p2pv1 import ( - "context" "encoding/hex" "fmt" "math/rand" @@ -9,9 +8,9 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/pkg/errors" - spectypes "github.com/ssvlabs/ssv-spec/types" "go.uber.org/zap" + spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/ssvlabs/ssv/logging/fields" "github.com/ssvlabs/ssv/network" "github.com/ssvlabs/ssv/network/commons" @@ -217,8 +216,8 @@ func (n *p2pNetwork) Unsubscribe(logger *zap.Logger, pk spectypes.ValidatorPK) e } // handlePubsubMessages reads messages from the given channel and calls the router, note that this function blocks. -func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(ctx context.Context, topic string, msg *pubsub.Message) error { - return func(ctx context.Context, topic string, msg *pubsub.Message) error { +func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(topic string, msg *pubsub.Message) error { + return func(topic string, msg *pubsub.Message) error { if n.msgRouter == nil { logger.Debug("msg router is not configured") return nil @@ -241,7 +240,7 @@ func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(ctx context.C return fmt.Errorf("unknown decoded message type: %T", m) } - n.msgRouter.Route(ctx, decodedMsg) + n.msgRouter.Route(decodedMsg) return nil } diff --git a/network/p2p/p2p_test.go b/network/p2p/p2p_test.go index 0ab9bc1154..4fbaf53f78 100644 --- a/network/p2p/p2p_test.go +++ b/network/p2p/p2p_test.go @@ -12,14 +12,14 @@ import ( eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/pkg/errors" - "github.com/ssvlabs/ssv-spec-pre-cc/types" - specqbft "github.com/ssvlabs/ssv-spec/qbft" - spectypes "github.com/ssvlabs/ssv-spec/types" - spectestingutils "github.com/ssvlabs/ssv-spec/types/testingutils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/ssvlabs/ssv-spec-pre-cc/types" + specqbft "github.com/ssvlabs/ssv-spec/qbft" + spectypes "github.com/ssvlabs/ssv-spec/types" + spectestingutils "github.com/ssvlabs/ssv-spec/types/testingutils" "github.com/ssvlabs/ssv/network" "github.com/ssvlabs/ssv/networkconfig" beaconprotocol "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon" @@ -262,7 +262,7 @@ type dummyRouter struct { i int } -func (r *dummyRouter) Route(_ context.Context, _ network.DecodedSSVMessage) { +func (r *dummyRouter) Route(network.DecodedSSVMessage) { atomic.AddUint64(&r.count, 1) } diff --git a/network/topics/controller.go b/network/topics/controller.go index 71f37414ab..923f830594 100644 --- a/network/topics/controller.go +++ b/network/topics/controller.go @@ -41,7 +41,7 @@ type Controller interface { } // PubsubMessageHandler handles incoming messages -type PubsubMessageHandler func(context.Context, string, *pubsub.Message) error +type PubsubMessageHandler func(string, *pubsub.Message) error type messageValidator interface { ValidatorForTopic(topic string) func(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult @@ -288,7 +288,7 @@ func (ctrl *topicsCtrl) listen(logger *zap.Logger, sub *pubsub.Subscription) err logger.Warn("unknown message type", zap.Any("message", m)) } - if err := ctrl.msgHandler(ctx, topicName, msg); err != nil { + if err := ctrl.msgHandler(topicName, msg); err != nil { logger.Debug("could not handle msg", zap.Error(err)) } } diff --git a/network/topics/controller_test.go b/network/topics/controller_test.go index 5c4beba80b..3f2f1d0833 100644 --- a/network/topics/controller_test.go +++ b/network/topics/controller_test.go @@ -17,11 +17,11 @@ import ( "github.com/libp2p/go-libp2p/core/host" libp2pnetwork "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - genesisspecqbft "github.com/ssvlabs/ssv-spec-pre-cc/qbft" - spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/stretchr/testify/require" "go.uber.org/zap" + genesisspecqbft "github.com/ssvlabs/ssv-spec-pre-cc/qbft" + spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/message/validation" genesisvalidation "github.com/ssvlabs/ssv/message/validation/genesis" @@ -374,7 +374,7 @@ func newPeer(ctx context.Context, logger *zap.Logger, t *testing.T, msgValidator Host: h, TraceLog: false, MsgIDHandler: midHandler, - MsgHandler: func(_ context.Context, topic string, msg *pubsub.Message) error { + MsgHandler: func(topic string, msg *pubsub.Message) error { p.saveMsg(topic, msg) return nil }, diff --git a/operator/validator/controller.go b/operator/validator/controller.go index ab5cf0b74f..aacdc58114 100644 --- a/operator/validator/controller.go +++ b/operator/validator/controller.go @@ -320,7 +320,7 @@ func NewController(logger *zap.Logger, options ControllerOptions) Controller { operatorsIDs: operatorsIDs, - messageRouter: newMessageRouter(logger), + messageRouter: newMessageRouter(options.Context, logger), messageWorker: worker.NewWorker(logger, workerCfg), historySyncBatchSize: options.HistorySyncBatchSize, diff --git a/operator/validator/controller_test.go b/operator/validator/controller_test.go index 25119bdce4..eab069f15c 100644 --- a/operator/validator/controller_test.go +++ b/operator/validator/controller_test.go @@ -284,7 +284,7 @@ func TestHandleNonCommitteeMessages(t *testing.T) { identifier := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), []byte("pk"), spectypes.RoleCommittee) - ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{ + ctr.messageRouter.Route(&queue.SSVMessage{ SSVMessage: &spectypes.SSVMessage{ MsgType: spectypes.SSVConsensusMsgType, MsgID: identifier, @@ -292,7 +292,7 @@ func TestHandleNonCommitteeMessages(t *testing.T) { }, }) - ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{ + ctr.messageRouter.Route(&queue.SSVMessage{ SSVMessage: &spectypes.SSVMessage{ MsgType: spectypes.SSVConsensusMsgType, MsgID: identifier, @@ -300,7 +300,7 @@ func TestHandleNonCommitteeMessages(t *testing.T) { }, }) - ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{ + ctr.messageRouter.Route(&queue.SSVMessage{ SSVMessage: &spectypes.SSVMessage{ // checks that not process unnecessary message MsgType: message.SSVSyncMsgType, MsgID: identifier, @@ -308,7 +308,7 @@ func TestHandleNonCommitteeMessages(t *testing.T) { }, }) - ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{ + ctr.messageRouter.Route(&queue.SSVMessage{ SSVMessage: &spectypes.SSVMessage{ // checks that not process unnecessary message MsgType: 123, MsgID: identifier, @@ -316,7 +316,7 @@ func TestHandleNonCommitteeMessages(t *testing.T) { }, }) - ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{ + ctr.messageRouter.Route(&queue.SSVMessage{ SSVMessage: &spectypes.SSVMessage{ MsgType: spectypes.SSVPartialSignatureMsgType, MsgID: identifier, @@ -1029,7 +1029,7 @@ func setupController(logger *zap.Logger, opts MockControllerOptions) controller validatorOptions: opts.validatorOptions, recipientsStorage: opts.recipientsStorage, networkConfig: opts.networkConfig, - messageRouter: newMessageRouter(logger), + messageRouter: newMessageRouter(context.TODO(), logger), committeeValidatorSetup: make(chan struct{}), indicesChange: make(chan struct{}, 32), messageWorker: worker.NewWorker(logger, &worker.Config{ diff --git a/operator/validator/router.go b/operator/validator/router.go index 340328028c..730f80039e 100644 --- a/operator/validator/router.go +++ b/operator/validator/router.go @@ -10,14 +10,16 @@ import ( const bufSize = 65536 -func newMessageRouter(logger *zap.Logger) *messageRouter { +func newMessageRouter(ctx context.Context, logger *zap.Logger) *messageRouter { return &messageRouter{ + ctx: ctx, logger: logger, ch: make(chan VMSG, bufSize), } } type messageRouter struct { + ctx context.Context logger *zap.Logger ch chan VMSG } @@ -27,9 +29,15 @@ type VMSG struct { ctx context.Context } -func (r *messageRouter) Route(ctx context.Context, message network.DecodedSSVMessage) { +func (r *messageRouter) Route(message network.DecodedSSVMessage) { + ctx := r.ctx + + // TODO create new (tracing) context from request data: + // ctx := otel.GetTextMapPropagator().Extract(ctx, message.TraceData) + // pass the context down via r.ch + select { - case <-ctx.Done(): + case <-r.ctx.Done(): r.logger.Warn("context canceled, dropping message") case r.ch <- VMSG{ctx: ctx, DecodedSSVMessage: message}: default: diff --git a/operator/validator/router_test.go b/operator/validator/router_test.go index 0b494b6cf1..137e0e46f0 100644 --- a/operator/validator/router_test.go +++ b/operator/validator/router_test.go @@ -20,7 +20,7 @@ func TestRouter(t *testing.T) { logger := logging.TestLogger(t) - router := newMessageRouter(logger) + router := newMessageRouter(ctx, logger) expectedCount := 1000 count := 0 @@ -49,9 +49,9 @@ func TestRouter(t *testing.T) { }, } - router.Route(context.TODO(), msg) + router.Route(msg) if i%2 == 0 { - go router.Route(context.TODO(), msg) + go router.Route(msg) } }