Skip to content

Commit

Permalink
revert qmsg
Browse files Browse the repository at this point in the history
  • Loading branch information
anatolie-ssv committed Nov 14, 2024
1 parent 6465c53 commit 31ed56d
Show file tree
Hide file tree
Showing 20 changed files with 118 additions and 156 deletions.
4 changes: 3 additions & 1 deletion network/network.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package network

import (
"context"
"io"

"github.com/libp2p/go-libp2p/core/peer"

"go.uber.org/zap"

protocolp2p "github.com/ssvlabs/ssv/protocol/v2/p2p"
Expand All @@ -18,7 +20,7 @@ type DecodedSSVMessage interface {
// MessageRouter is accepting network messages and route them to the corresponding (internal) components
type MessageRouter interface {
// Route routes the given message, this function MUST NOT block
Route(message DecodedSSVMessage)
Route(ctx context.Context, message DecodedSSVMessage)
}

// MessageRouting allows to register a MessageRouter
Expand Down
9 changes: 5 additions & 4 deletions network/p2p/p2p_pubsub.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package p2pv1

import (
"context"
"encoding/hex"
"fmt"
"math/rand"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/network"
"github.com/ssvlabs/ssv/network/commons"
Expand Down Expand Up @@ -216,8 +217,8 @@ func (n *p2pNetwork) Unsubscribe(logger *zap.Logger, pk spectypes.ValidatorPK) e
}

// handlePubsubMessages reads messages from the given channel and calls the router, note that this function blocks.
func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(topic string, msg *pubsub.Message) error {
return func(topic string, msg *pubsub.Message) error {
func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(ctx context.Context, topic string, msg *pubsub.Message) error {
return func(ctx context.Context, topic string, msg *pubsub.Message) error {
if n.msgRouter == nil {
logger.Debug("msg router is not configured")
return nil
Expand All @@ -240,7 +241,7 @@ func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(topic string,
return fmt.Errorf("unknown decoded message type: %T", m)
}

n.msgRouter.Route(decodedMsg)
n.msgRouter.Route(ctx, decodedMsg)

return nil
}
Expand Down
10 changes: 5 additions & 5 deletions network/p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (

eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/ssvlabs/ssv-spec-pre-cc/types"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"
spectestingutils "github.com/ssvlabs/ssv-spec/types/testingutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/network"
"github.com/ssvlabs/ssv/networkconfig"
beaconprotocol "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"
Expand Down Expand Up @@ -262,7 +262,7 @@ type dummyRouter struct {
i int
}

func (r *dummyRouter) Route(network.DecodedSSVMessage) {
func (r *dummyRouter) Route(_ context.Context, _ network.DecodedSSVMessage) {
atomic.AddUint64(&r.count, 1)
}

Expand Down
4 changes: 2 additions & 2 deletions network/topics/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Controller interface {
}

// PubsubMessageHandler handles incoming messages
type PubsubMessageHandler func(string, *pubsub.Message) error
type PubsubMessageHandler func(context.Context, string, *pubsub.Message) error

type messageValidator interface {
ValidatorForTopic(topic string) func(ctx context.Context, p peer.ID, pmsg *pubsub.Message) pubsub.ValidationResult
Expand Down Expand Up @@ -288,7 +288,7 @@ func (ctrl *topicsCtrl) listen(logger *zap.Logger, sub *pubsub.Subscription) err
logger.Warn("unknown message type", zap.Any("message", m))
}

if err := ctrl.msgHandler(topicName, msg); err != nil {
if err := ctrl.msgHandler(ctx, topicName, msg); err != nil {
logger.Debug("could not handle msg", zap.Error(err))
}
}
Expand Down
6 changes: 3 additions & 3 deletions network/topics/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (
"github.com/libp2p/go-libp2p/core/host"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
genesisspecqbft "github.com/ssvlabs/ssv-spec-pre-cc/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

genesisspecqbft "github.com/ssvlabs/ssv-spec-pre-cc/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/message/validation"
genesisvalidation "github.com/ssvlabs/ssv/message/validation/genesis"
Expand Down Expand Up @@ -374,7 +374,7 @@ func newPeer(ctx context.Context, logger *zap.Logger, t *testing.T, msgValidator
Host: h,
TraceLog: false,
MsgIDHandler: midHandler,
MsgHandler: func(topic string, msg *pubsub.Message) error {
MsgHandler: func(_ context.Context, topic string, msg *pubsub.Message) error {
p.saveMsg(topic, msg)
return nil
},
Expand Down
12 changes: 6 additions & 6 deletions operator/duties/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ func setupSchedulerAndMocks(t *testing.T, handlers []dutyHandler, currentSlot *S
func setExecuteDutyFunc(s *Scheduler, executeDutiesCall chan []*spectypes.ValidatorDuty, executeDutiesCallSize int) {
executeDutiesBuffer := make(chan *spectypes.ValidatorDuty, executeDutiesCallSize)

s.dutyExecutor.(*MockDutyExecutor).EXPECT().ExecuteDuty(context.TODO(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(logger *zap.Logger, duty *spectypes.ValidatorDuty) error {
s.dutyExecutor.(*MockDutyExecutor).EXPECT().ExecuteDuty(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(_ context.Context, logger *zap.Logger, duty *spectypes.ValidatorDuty) error {
logger.Debug("🏃 Executing duty", zap.Any("duty", duty))
executeDutiesBuffer <- duty

Expand Down Expand Up @@ -218,15 +218,15 @@ func setExecuteGenesisDutyFunc(s *Scheduler, executeDutiesCall chan []*genesissp
func setExecuteDutyFuncs(s *Scheduler, executeDutiesCall chan committeeDutiesMap, executeDutiesCallSize int) {
executeDutiesBuffer := make(chan *committeeDuty, executeDutiesCallSize)

s.dutyExecutor.(*MockDutyExecutor).EXPECT().ExecuteDuty(context.TODO(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(logger *zap.Logger, duty *spectypes.ValidatorDuty) error {
s.dutyExecutor.(*MockDutyExecutor).EXPECT().ExecuteDuty(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(ctx context.Context, logger *zap.Logger, duty *spectypes.ValidatorDuty) error {
logger.Debug("🏃 Executing duty", zap.Any("duty", duty))
return nil
},
).AnyTimes()

s.dutyExecutor.(*MockDutyExecutor).EXPECT().ExecuteCommitteeDuty(context.TODO(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(logger *zap.Logger, committeeID spectypes.CommitteeID, duty *spectypes.CommitteeDuty) {
s.dutyExecutor.(*MockDutyExecutor).EXPECT().ExecuteCommitteeDuty(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(ctx context.Context, logger *zap.Logger, committeeID spectypes.CommitteeID, duty *spectypes.CommitteeDuty) {
logger.Debug("🏃 Executing committee duty", zap.Any("duty", duty))
executeDutiesBuffer <- &committeeDuty{id: committeeID, duty: duty}

Expand Down
16 changes: 5 additions & 11 deletions operator/validator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func NewController(logger *zap.Logger, options ControllerOptions) Controller {

operatorsIDs: operatorsIDs,

messageRouter: newMessageRouter(options.Context, logger),
messageRouter: newMessageRouter(logger),
messageWorker: worker.NewWorker(logger, workerCfg),
historySyncBatchSize: options.HistorySyncBatchSize,

Expand Down Expand Up @@ -396,7 +396,7 @@ func (c *controller) handleRouterMessages() {
return

case msg := <-ch:
switch m := msg.DecodedSSVMessage.(type) {
switch m := msg.(type) {
case *genesisqueue.GenesisSSVMessage:
if m.MsgType == genesismessage.SSVEventMsgType {
continue
Expand Down Expand Up @@ -428,9 +428,9 @@ func (c *controller) handleRouterMessages() {
copy(cid[:], dutyExecutorID[16:])

if v, ok := c.validatorsMap.GetValidator(spectypes.ValidatorPK(dutyExecutorID)); ok {
v.Validator().HandleMessage(msg.ctx, c.logger, m)
v.Validator().HandleMessage(ctx, c.logger, m)
} else if vc, ok := c.validatorsMap.GetCommittee(cid); ok {
vc.HandleMessage(msg.ctx, c.logger, m)
vc.HandleMessage(ctx, c.logger, m)
} else if c.validatorOptions.Exporter {
if m.MsgType != spectypes.SSVConsensusMsgType && m.MsgType != spectypes.SSVPartialSignatureMsgType {
continue
Expand Down Expand Up @@ -764,17 +764,11 @@ func (c *controller) ExecuteDuty(ctx context.Context, logger *zap.Logger, duty *
return
}
dec, err := queue.DecodeSSVMessage(ssvMsg)

qMsg := queue.QMsg{
SSVMessage: *dec,
Ctx: ctx,
}

if err != nil {
logger.Error("could not decode duty execute msg", zap.Error(err))
return
}
if pushed := v.Validator().Queues[duty.RunnerRole()].Q.TryPush(&qMsg); !pushed {
if pushed := v.Validator().Queues[duty.RunnerRole()].Q.TryPush(dec); !pushed {
logger.Warn("dropping ExecuteDuty message because the queue is full")
}
// logger.Debug("📬 queue: pushed message", fields.MessageID(dec.MsgID), fields.MessageType(dec.MsgType))
Expand Down
12 changes: 6 additions & 6 deletions operator/validator/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,39 +284,39 @@ func TestHandleNonCommitteeMessages(t *testing.T) {

identifier := spectypes.NewMsgID(networkconfig.TestNetwork.DomainType(), []byte("pk"), spectypes.RoleCommittee)

ctr.messageRouter.Route(&queue.SSVMessage{
ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{
SSVMessage: &spectypes.SSVMessage{
MsgType: spectypes.SSVConsensusMsgType,
MsgID: identifier,
Data: generateDecidedMessage(t, identifier),
},
})

ctr.messageRouter.Route(&queue.SSVMessage{
ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{
SSVMessage: &spectypes.SSVMessage{
MsgType: spectypes.SSVConsensusMsgType,
MsgID: identifier,
Data: generateChangeRoundMsg(t, identifier),
},
})

ctr.messageRouter.Route(&queue.SSVMessage{
ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{
SSVMessage: &spectypes.SSVMessage{ // checks that not process unnecessary message
MsgType: message.SSVSyncMsgType,
MsgID: identifier,
Data: []byte("data"),
},
})

ctr.messageRouter.Route(&queue.SSVMessage{
ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{
SSVMessage: &spectypes.SSVMessage{ // checks that not process unnecessary message
MsgType: 123,
MsgID: identifier,
Data: []byte("data"),
},
})

ctr.messageRouter.Route(&queue.SSVMessage{
ctr.messageRouter.Route(context.TODO(), &queue.SSVMessage{
SSVMessage: &spectypes.SSVMessage{
MsgType: spectypes.SSVPartialSignatureMsgType,
MsgID: identifier,
Expand Down Expand Up @@ -1029,7 +1029,7 @@ func setupController(logger *zap.Logger, opts MockControllerOptions) controller
validatorOptions: opts.validatorOptions,
recipientsStorage: opts.recipientsStorage,
networkConfig: opts.networkConfig,
messageRouter: newMessageRouter(context.TODO(), logger),
messageRouter: newMessageRouter(logger),
committeeValidatorSetup: make(chan struct{}),
indicesChange: make(chan struct{}, 32),
messageWorker: worker.NewWorker(logger, &worker.Config{
Expand Down
27 changes: 7 additions & 20 deletions operator/validator/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,28 @@ import (

const bufSize = 65536

func newMessageRouter(ctx context.Context, logger *zap.Logger) *messageRouter {
func newMessageRouter(logger *zap.Logger) *messageRouter {
return &messageRouter{
ctx: ctx,
logger: logger,
ch: make(chan VMSG, bufSize),
ch: make(chan network.DecodedSSVMessage, bufSize),
}
}

type messageRouter struct {
ctx context.Context
logger *zap.Logger
ch chan VMSG
ch chan network.DecodedSSVMessage
}

type VMSG struct {
network.DecodedSSVMessage
ctx context.Context
}

func (r *messageRouter) Route(message network.DecodedSSVMessage) {
ctx := r.ctx

// TODO create new (tracing) context from request data:
// ctx := otel.GetTextMapPropagator().Extract(ctx, message.TraceData)
// pass the context down via r.ch

func (r *messageRouter) Route(ctx context.Context, message network.DecodedSSVMessage) {
select {
case <-r.ctx.Done():
case <-ctx.Done():
r.logger.Warn("context canceled, dropping message")
case r.ch <- VMSG{ctx: ctx, DecodedSSVMessage: message}:
case r.ch <- message:
default:
r.logger.Warn("message router buffer is full, dropping message")
}
}

func (r *messageRouter) GetMessageChan() <-chan VMSG {
func (r *messageRouter) GetMessageChan() <-chan network.DecodedSSVMessage {
return r.ch
}
6 changes: 3 additions & 3 deletions operator/validator/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestRouter(t *testing.T) {

logger := logging.TestLogger(t)

router := newMessageRouter(ctx, logger)
router := newMessageRouter(logger)

expectedCount := 1000
count := 0
Expand Down Expand Up @@ -49,9 +49,9 @@ func TestRouter(t *testing.T) {
},
}

router.Route(msg)
router.Route(context.TODO(), msg)
if i%2 == 0 {
go router.Route(msg)
go router.Route(context.TODO(), msg)
}
}

Expand Down
7 changes: 3 additions & 4 deletions protocol/v2/ssv/queue/message_prioritizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package queue

import (
"github.com/attestantio/go-eth2-client/spec/phase0"

"github.com/ssvlabs/ssv-spec/qbft"
)

Expand All @@ -19,7 +18,7 @@ type State struct {
// MessagePrioritizer is an interface for prioritizing messages.
type MessagePrioritizer interface {
// Prior returns true if message A should be prioritized over B.
Prior(a, b *QMsg) bool
Prior(a, b *SSVMessage) bool
}

type standardPrioritizer struct {
Expand All @@ -32,7 +31,7 @@ func NewMessagePrioritizer(state *State) MessagePrioritizer {
return &standardPrioritizer{state: state}
}

func (p *standardPrioritizer) Prior(a, b *QMsg) bool {
func (p *standardPrioritizer) Prior(a, b *SSVMessage) bool {
msgScoreA, msgScoreB := scoreMessageType(a), scoreMessageType(b)
if msgScoreA != msgScoreB {
return msgScoreA > msgScoreB
Expand Down Expand Up @@ -81,7 +80,7 @@ type committeePrioritizer struct {
state *State
}

func (p *committeePrioritizer) Prior(a, b *QMsg) bool {
func (p *committeePrioritizer) Prior(a, b *SSVMessage) bool {
msgScoreA, msgScoreB := scoreMessageType(a), scoreMessageType(b)
if msgScoreA != msgScoreB {
return msgScoreA > msgScoreB
Expand Down
7 changes: 3 additions & 4 deletions protocol/v2/ssv/queue/message_prioritizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,8 @@ func TestMessagePrioritizer(t *testing.T) {
messages := make(messageSlice, len(test.messages))
for i, m := range test.messages {
var err error
dec, err := DecodeSignedSSVMessage(m.ssvMessage(test.state))
messages[i], err = DecodeSignedSSVMessage(m.ssvMessage(test.state))
require.NoError(t, err)
messages[i] = &QMsg{SSVMessage: *dec}
}

var shuffles []messageSlice
Expand Down Expand Up @@ -314,10 +313,10 @@ func (m mockTimeoutMessage) ssvMessage(state *State) *spectypes.SignedSSVMessage
}
}

type messageSlice []*QMsg
type messageSlice []*SSVMessage

func (m messageSlice) shuffle() messageSlice {
shuffled := make([]*QMsg, len(m))
shuffled := make([]*SSVMessage, len(m))
for i, j := range rand.Perm(len(m)) {
shuffled[i] = m[j]
}
Expand Down
Loading

0 comments on commit 31ed56d

Please sign in to comment.