app: wire duty gater (#2542)
Wire duty gater. Improve it to also check duty types.

category: feature
ticket: #2506
corverroos authored Aug 14, 2023
1 parent de1b9aa commit 38fb52c
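
For orientation before the diff: the duty gater is a predicate (core.DutyGaterFunc) that wire handlers call on every incoming duty. Below is a minimal, self-contained Go sketch of the rule this commit wires in: reject duties with an invalid type, and reject duties more than allowedFutureEpochs beyond the current epoch. The duty and dutyType definitions and the final comparison are simplified stand-ins inferred from core/gater.go and its test further down, not the real core package.

	package main

	import (
		"fmt"
		"time"
	)

	// Simplified stand-ins for core.Duty and its type enum (illustrative only;
	// the real definitions live in the core package).
	type dutyType int

	func (t dutyType) valid() bool { return t > 0 && t < 10 } // placeholder bounds

	type duty struct {
		Slot int64
		Type dutyType
	}

	// newGater mirrors the logic of core.NewDutyGater: reject invalid duty types
	// and duties more than allowedFutureEpochs ahead of the current epoch.
	// The final comparison is inferred from the gater test below.
	func newGater(genesis time.Time, slotDuration time.Duration, slotsPerEpoch, allowedFutureEpochs uint64, now func() time.Time) func(duty) bool {
		return func(d duty) bool {
			if !d.Type.valid() {
				return false
			}

			currentSlot := now().Sub(genesis) / slotDuration
			currentEpoch := uint64(currentSlot) / slotsPerEpoch
			dutyEpoch := uint64(d.Slot) / slotsPerEpoch

			return dutyEpoch <= currentEpoch+allowedFutureEpochs
		}
	}

	func main() {
		gater := newGater(time.Now(), 12*time.Second, 32, 2, time.Now)

		fmt.Println(gater(duty{Slot: 0, Type: 1}))   // true: current epoch, valid type
		fmt.Println(gater(duty{Slot: 100, Type: 1})) // false: epoch 3 is beyond epoch 0+2
		fmt.Println(gater(duty{Slot: 0, Type: -1}))  // false: invalid duty type
	}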
Showing 9 changed files with 78 additions and 23 deletions.
13 changes: 9 additions & 4 deletions app/app.go
@@ -421,6 +421,11 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
 		return err
 	})
 
+	gaterFunc, err := core.NewDutyGater(ctx, eth2Cl)
+	if err != nil {
+		return err
+	}
+
 	fetch, err := fetcher.New(eth2Cl, feeRecipientFunc)
 	if err != nil {
 		return err
@@ -449,7 +454,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
 			return err
 		}
 
-		parSigEx = parsigex.NewParSigEx(tcpNode, sender.SendAsync, nodeIdx.PeerIdx, peerIDs, verifyFunc)
+		parSigEx = parsigex.NewParSigEx(tcpNode, sender.SendAsync, nodeIdx.PeerIdx, peerIDs, verifyFunc, gaterFunc)
 	}
 
 	sigAgg, err := sigagg.New(int(cluster.Threshold), sigagg.NewVerifier(eth2Cl))
@@ -467,7 +472,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
 	retryer := retry.New[core.Duty](deadlineFunc)
 
 	cons, startCons, err := newConsensus(conf, cluster, tcpNode, p2pKey, sender,
-		nodeIdx, deadlinerFunc("consensus"), qbftSniffer)
+		nodeIdx, deadlinerFunc("consensus"), gaterFunc, qbftSniffer)
 	if err != nil {
 		return err
 	}
@@ -838,7 +843,7 @@ func newETH2Client(ctx context.Context, conf Config, life *lifecycle.Manager,
 
 // newConsensus returns a new consensus component and its start lifecycle hook.
 func newConsensus(conf Config, cluster *manifestpb.Cluster, tcpNode host.Host, p2pKey *k1.PrivateKey,
-	sender *p2p.Sender, nodeIdx cluster.NodeIdx, deadliner core.Deadliner,
+	sender *p2p.Sender, nodeIdx cluster.NodeIdx, deadliner core.Deadliner, gaterFunc core.DutyGaterFunc,
 	qbftSniffer func(*pbv1.SniffedConsensusInstance),
 ) (core.Consensus, lifecycle.IHookFunc, error) {
 	peers, err := manifest.ClusterPeers(cluster)
@@ -851,7 +856,7 @@ func newConsensus(conf Config, cluster *manifestpb.Cluster, tcpNode host.Host, p
 	}
 
 	if featureset.Enabled(featureset.QBFTConsensus) {
-		comp, err := consensus.New(tcpNode, sender, peers, p2pKey, deadliner, qbftSniffer)
+		comp, err := consensus.New(tcpNode, sender, peers, p2pKey, deadliner, gaterFunc, qbftSniffer)
 		if err != nil {
 			return nil, nil, err
 		}

8 changes: 7 additions & 1 deletion core/consensus/component.go
@@ -193,7 +193,7 @@ func (io instanceIO) MaybeStart() bool {
 
 // New returns a new consensus QBFT component.
 func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey,
-	deadliner core.Deadliner, snifferFunc func(*pbv1.SniffedConsensusInstance),
+	deadliner core.Deadliner, gaterFunc core.DutyGaterFunc, snifferFunc func(*pbv1.SniffedConsensusInstance),
 ) (*Component, error) {
 	// Extract peer pubkeys.
 	keys := make(map[int64]*k1.PublicKey)
@@ -218,6 +218,7 @@ func New(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.Pri
 		pubkeys:     keys,
 		deadliner:   deadliner,
 		snifferFunc: snifferFunc,
+		gaterFunc:   gaterFunc,
 		dropFilter:  log.Filter(),
 		timerFunc:   getTimerFunc(),
 	}
@@ -238,6 +239,7 @@ type Component struct {
 	subs        []subscriber
 	deadliner   core.Deadliner
 	snifferFunc func(*pbv1.SniffedConsensusInstance)
+	gaterFunc   core.DutyGaterFunc
 	dropFilter  z.Field // Filter buffer overflow errors (possible DDoS)
 	timerFunc   timerFunc
 
@@ -493,6 +495,10 @@ func (c *Component) handle(ctx context.Context, _ peer.ID, req proto.Message) (p
 	duty := core.DutyFromProto(pbMsg.Msg.Duty)
 	ctx = log.WithCtx(ctx, z.Any("duty", duty))
 
+	if !c.gaterFunc(duty) {
+		return nil, false, errors.New("invalid duty", z.Any("duty", duty))
+	}
+
 	for _, justification := range pbMsg.Justification {
 		if err := verifyMsg(justification, c.pubkeys); err != nil {
 			return nil, false, errors.Wrap(err, "invalid justification")

9 changes: 8 additions & 1 deletion core/consensus/component_internal_test.go
@@ -372,6 +372,7 @@ func TestComponent_handle(t *testing.T) {
 			var tc Component
 			tc.deadliner = testDeadliner{}
 			tc.mutable.instances = make(map[core.Duty]instanceIO)
+			tc.gaterFunc = func(core.Duty) bool { return true }
 
 			msg := &pbv1.ConsensusMsg{
 				Msg: randomMsg(t),
@@ -448,7 +449,11 @@ func TestComponentHandle(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			_, _, err := new(Component).handle(ctx, "", tt.msg)
+			c := &Component{
+				gaterFunc: func(core.Duty) bool { return true },
+			}
+
+			_, _, err := c.handle(ctx, "", tt.msg)
 			require.ErrorContains(t, err, tt.errorMsg)
 		})
 	}
@@ -464,6 +469,7 @@ func TestInstanceIO_MaybeStart(t *testing.T) {
 	t.Run("MaybeStart after handle", func(t *testing.T) {
 		var c Component
 		c.deadliner = testDeadliner{}
+		c.gaterFunc = func(core.Duty) bool { return true }
 		c.mutable.instances = make(map[core.Duty]instanceIO)
 
 		// Generate a p2p private key.
@@ -492,6 +498,7 @@ func TestInstanceIO_MaybeStart(t *testing.T) {
 
 		var c Component
 		c.deadliner = testDeadliner{}
+		c.gaterFunc = func(core.Duty) bool { return true }
 		c.mutable.instances = make(map[core.Duty]instanceIO)
 		c.timerFunc = getTimerFunc()
 

4 changes: 3 additions & 1 deletion core/consensus/component_test.go
@@ -114,7 +114,9 @@ func testComponent(t *testing.T, threshold, nodes int) {
 			sniffed <- len(msgs.Msgs)
 		}
 
-		c, err := consensus.New(hosts[i], new(p2p.Sender), peers, p2pkeys[i], testDeadliner{}, sniffer)
+		gaterFunc := func(core.Duty) bool { return true }
+
+		c, err := consensus.New(hosts[i], new(p2p.Sender), peers, p2pkeys[i], testDeadliner{}, gaterFunc, sniffer)
 		require.NoError(t, err)
 		c.Subscribe(func(_ context.Context, _ core.Duty, set core.UnsignedDataSet) error {
 			results <- set

9 changes: 7 additions & 2 deletions core/gater.go
@@ -13,8 +13,9 @@ import (
 const defaultAllowedFutureEpochs = 2
 
 // DutyGaterFunc is a function that returns true if the duty is allowed to be processed.
-// It checks whether or not duties received from peers over the wire are too far in the future.
-// It doesn't check whether or not the duty is in the past, that is done by Deadliner.
+// It checks whether duties received from peers over the wire are too far in the future
+// or whether the type is invalid. It doesn't check whether the duty
+// is in the past, that is done by Deadliner.
 type DutyGaterFunc func(Duty) bool
 
 // WithDutyGaterForT returns a function that sets the nowFunc and allowedFutureEpochs in
@@ -57,6 +58,10 @@ func NewDutyGater(ctx context.Context, eth2Cl eth2wrap.Client, opts ...func(*dut
 	}
 
 	return func(duty Duty) bool {
+		if !duty.Type.Valid() {
+			return false
+		}
+
 		currentSlot := o.nowFunc().Sub(genesisTime) / slotDuration
 		currentEpoch := uint64(currentSlot) / slotsPerEpoch
 

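The slot cutoffs asserted in core/gater_test.go (next file) follow directly from this arithmetic. A tiny worked example, assuming the truncated test setup uses 2 slots per epoch with the default allowedFutureEpochs of 2 (both inferred, since the construction is elided above):

	package main

	import "fmt"

	func main() {
		// Assumed from the test below: 2 slots per epoch, 2 allowed future epochs.
		const slotsPerEpoch, allowedFutureEpochs, currentEpoch = uint64(2), uint64(2), uint64(0)

		maxEpoch := currentEpoch + allowedFutureEpochs // epoch 2 (N+2)
		maxSlot := (maxEpoch+1)*slotsPerEpoch - 1      // slot 5

		fmt.Println(maxSlot) // slots 0-5 pass the gater, slot 6 and later fail
	}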
26 changes: 17 additions & 9 deletions core/gater_test.go
@@ -32,16 +32,24 @@ func TestDutyGater(t *testing.T) {
 	))
 	require.NoError(t, err)
 
+	typ := core.DutyAttester
+
 	// Allow slots 0-5.
-	require.True(t, gater(core.Duty{Slot: 0})) // Current epoch
-	require.True(t, gater(core.Duty{Slot: 1}))
-	require.True(t, gater(core.Duty{Slot: 2})) // N+1 epoch
-	require.True(t, gater(core.Duty{Slot: 3}))
-	require.True(t, gater(core.Duty{Slot: 4})) // N+2 epoch
-	require.True(t, gater(core.Duty{Slot: 5}))
+	require.True(t, gater(core.Duty{Slot: 0, Type: typ})) // Current epoch
+	require.True(t, gater(core.Duty{Slot: 1, Type: typ}))
+	require.True(t, gater(core.Duty{Slot: 2, Type: typ})) // N+1 epoch
+	require.True(t, gater(core.Duty{Slot: 3, Type: typ}))
+	require.True(t, gater(core.Duty{Slot: 4, Type: typ})) // N+2 epoch
+	require.True(t, gater(core.Duty{Slot: 5, Type: typ}))
 
 	// Disallow slots 6 and after.
-	require.False(t, gater(core.Duty{Slot: 6})) // N+3 epoch
-	require.False(t, gater(core.Duty{Slot: 7}))
-	require.False(t, gater(core.Duty{Slot: 1000}))
+	require.False(t, gater(core.Duty{Slot: 6, Type: typ})) // N+3 epoch
+	require.False(t, gater(core.Duty{Slot: 7, Type: typ}))
+	require.False(t, gater(core.Duty{Slot: 1000, Type: typ}))
+
+	// Disallow invalid type
+	require.False(t, gater(core.Duty{Slot: 0, Type: -1}))
+	require.False(t, gater(core.Duty{Slot: 1, Type: 0}))
+	require.False(t, gater(core.Duty{Slot: 2, Type: 100}))
+	require.False(t, gater(core.Duty{Slot: 3, Type: 1000}))
 }

11 changes: 10 additions & 1 deletion core/parsigex/parsigex.go
@@ -27,13 +27,17 @@ func Protocols() []protocol.ID {
 	return []protocol.ID{protocolID2}
 }
 
-func NewParSigEx(tcpNode host.Host, sendFunc p2p.SendFunc, peerIdx int, peers []peer.ID, verifyFunc func(context.Context, core.Duty, core.PubKey, core.ParSignedData) error) *ParSigEx {
+func NewParSigEx(tcpNode host.Host, sendFunc p2p.SendFunc, peerIdx int, peers []peer.ID,
+	verifyFunc func(context.Context, core.Duty, core.PubKey, core.ParSignedData) error,
+	gaterFunc core.DutyGaterFunc,
+) *ParSigEx {
 	parSigEx := &ParSigEx{
 		tcpNode:    tcpNode,
 		sendFunc:   sendFunc,
 		peerIdx:    peerIdx,
 		peers:      peers,
 		verifyFunc: verifyFunc,
+		gaterFunc:  gaterFunc,
 	}
 
 	newReq := func() proto.Message { return new(pbv1.ParSigExMsg) }
@@ -50,6 +54,7 @@ type ParSigEx struct {
 	peerIdx    int
 	peers      []peer.ID
 	verifyFunc func(context.Context, core.Duty, core.PubKey, core.ParSignedData) error
+	gaterFunc  core.DutyGaterFunc
 	subs       []func(context.Context, core.Duty, core.ParSignedDataSet) error
 }
 
@@ -66,6 +71,10 @@ func (m *ParSigEx) handle(ctx context.Context, _ peer.ID, req proto.Message) (pr
 	duty := core.DutyFromProto(pb.Duty)
 	ctx = log.WithCtx(ctx, z.Any("duty", duty))
 
+	if !m.gaterFunc(duty) {
+		return nil, false, errors.New("invalid duty")
+	}
+
 	set, err := core.ParSignedDataSetFromProto(duty.Type, pb.DataSet)
 	if err != nil {
 		return nil, false, errors.Wrap(err, "convert parsigex proto")

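Note that both wire handlers touched by this commit gate the decoded duty before doing any further work, so bad messages are dropped cheaply. A compressed sketch of that shared ordering, as a fragment assuming charon's core and errors packages (not the real handlers, which also verify justifications, convert data sets and notify subscribers):

	// Sketch of the gate-first shape shared by Component.handle and
	// ParSigEx.handle above (error string mirrors the diff; the rest is elided).
	func handle(duty core.Duty, gaterFunc core.DutyGaterFunc) error {
		if !gaterFunc(duty) {
			return errors.New("invalid duty") // dropped before verification or conversion
		}

		// ... verify signatures / convert the payload / notify subscribers ...
		return nil
	}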
11 changes: 8 additions & 3 deletions core/parsigex/parsigex_test.go
@@ -68,15 +68,20 @@ func TestParSigEx(t *testing.T) {
 			hosts[i].Peerstore().AddAddrs(hostsInfo[k].ID, hostsInfo[k].Addrs, peerstore.PermanentAddrTTL)
 		}
 	}
+	verifyFunc := func(context.Context, core.Duty, core.PubKey, core.ParSignedData) error {
+		return nil
+	}
+
+	gaterFunc := func(core.Duty) bool {
+		return true
+	}
 
 	var wg sync.WaitGroup
 
 	// create ParSigEx components for each host
 	for i := 0; i < n; i++ {
 		wg.Add(n - 1)
-		sigex := parsigex.NewParSigEx(hosts[i], p2p.Send, i, peers, func(context.Context, core.Duty, core.PubKey, core.ParSignedData) error {
-			return nil
-		})
+		sigex := parsigex.NewParSigEx(hosts[i], p2p.Send, i, peers, verifyFunc, gaterFunc)
 		sigex.Subscribe(func(_ context.Context, d core.Duty, set core.ParSignedDataSet) error {
 			defer wg.Done()
 

10 changes: 9 additions & 1 deletion dkg/exchanger.go
@@ -99,10 +99,18 @@ func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int, sig
 		st[sigType] = true
 	}
 
+	dutyGaterFunc := func(duty core.Duty) bool {
+		if duty.Type != core.DutySignature {
+			return false
+		}
+
+		return st[sigType(duty.Slot)]
+	}
+
 	ex := &exchanger{
 		// threshold is len(peers) to wait until we get all the partial sigs from all the peers per DV
 		sigdb:    parsigdb.NewMemDB(len(peers), noopDeadliner{}),
-		sigex:    parsigex.NewParSigEx(tcpNode, p2p.Send, peerIdx, peers, noopVerifier),
+		sigex:    parsigex.NewParSigEx(tcpNode, p2p.Send, peerIdx, peers, noopVerifier, dutyGaterFunc),
 		sigTypes: st,
 		sigData: dataByPubkey{
 			store: sigTypeStore{},

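In the DKG exchanger, duties don't map to beacon-chain slots, so the generic epoch gate doesn't apply; instead Duty.Slot carries the sigType being exchanged and the gate reduces to a membership check against the expected types. A self-contained sketch of that behavior (the sigType values and stand-in types here are illustrative, not the real dkg or core definitions):

	package main

	import "fmt"

	// Stand-ins for dkg's unexported sigType and core.DutyType (illustrative).
	type sigType int64

	type dutyType int

	const dutySignature dutyType = 1 // stand-in for core.DutySignature

	type duty struct {
		Slot int64
		Type dutyType
	}

	func main() {
		// Mirrors the st map built in newExchanger: the sigTypes this DKG expects.
		st := map[sigType]bool{1: true, 2: true}

		gate := func(d duty) bool {
			if d.Type != dutySignature {
				return false
			}

			return st[sigType(d.Slot)]
		}

		fmt.Println(gate(duty{Type: dutySignature, Slot: 1})) // true: expected sigType
		fmt.Println(gate(duty{Type: dutySignature, Slot: 9})) // false: unknown sigType
		fmt.Println(gate(duty{Type: 0, Slot: 1}))             // false: not a signature duty
	}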
