Skip to content

Commit

Permalink
dkg: exchange sigTypes concurrently (#2496)
Browse files Browse the repository at this point in the history
Before this commit, calling `exchange()` would listen for new incoming messages and discard everything that didn't carry the expected `sigType`.

In a DKG, one node might fall behind the others (for example, while DKG'ing 3000 validator keys) and send messages for `sigTypes` for which the other peers have already reached the threshold.

This PR makes sure a given instance of `exchanger` will hold onto all messages whose `sigTypes` are specified during its initialization phase.

Calling `exchange()` will return as soon as there are enough messages for the specified `sigType`, but messages carrying other `sigType`s will still be stored and will be returned when requested by another `exchange()` call with a different `sigType` argument.

category: bug
ticket: #2491

Closes #2491.
  • Loading branch information
gsora authored Aug 2, 2023
1 parent 7d3fb56 commit 7371573
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 34 deletions.
6 changes: 5 additions & 1 deletion dkg/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ func Run(ctx context.Context, conf Config) (err error) {
return errors.Wrap(err, "get peer IDs")
}

ex := newExchanger(tcpNode, nodeIdx.PeerIdx, peerIds, def.NumValidators)
ex := newExchanger(tcpNode, nodeIdx.PeerIdx, peerIds, def.NumValidators, []sigType{
sigLock,
sigDepositData,
sigValidatorRegistration,
})

// Register Frost libp2p handlers
peerMap := make(map[peer.ID]cluster.NodeIdx)
Expand Down
109 changes: 77 additions & 32 deletions dkg/exchanger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ package dkg

import (
"context"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/core/parsigdb"
"github.com/obolnetwork/charon/core/parsigex"
Expand All @@ -29,33 +33,78 @@ const (
sigValidatorRegistration sigType = 103
)

// sigData includes the fields obtained from sigdb when threshold is reached.
type sigData struct {
sigType sigType
pubkey core.PubKey
psigs []core.ParSignedData
// sigTypeStore is a shorthand for a map of sigType to map of core.PubKey to slice of core.ParSignedData.
type sigTypeStore map[sigType]map[core.PubKey][]core.ParSignedData

// dataByPubkey is a mutex-guarded sigTypeStore: it maps each sigType to its map of core.PubKey to slice of core.ParSignedData.
type dataByPubkey struct {
store sigTypeStore
lock sync.Mutex
}

// set records data under the given sigType and pubKey, replacing whatever was
// previously stored for that pair. The per-sigType map is created on first use.
// The store lock is held for the whole call.
func (stb *dataByPubkey) set(pubKey core.PubKey, sigType sigType, data []core.ParSignedData) {
	stb.lock.Lock()
	defer stb.lock.Unlock()

	byKey, found := stb.store[sigType]
	if !found {
		// First entry for this sigType: allocate its inner map and register it.
		byKey = map[core.PubKey][]core.ParSignedData{}
		stb.store[sigType] = byKey
	}

	byKey[pubKey] = data
}

// get returns the partial signed data stored for the given sigType, keyed by core.PubKey.
//
// The boolean result is false (and the map nil) when nothing has been stored
// for sigType yet. Otherwise the returned map is a fresh copy, so callers may
// add or remove keys without affecting the store; the copy is shallow, so the
// []core.ParSignedData slices are still shared with the store.
// The store lock is held for the whole call.
func (stb *dataByPubkey) get(sigType sigType) (map[core.PubKey][]core.ParSignedData, bool) {
	stb.lock.Lock()
	defer stb.lock.Unlock()

	data, ok := stb.store[sigType]
	if !ok {
		return nil, false
	}

	// The final size is known up front, so pre-size the copy to avoid map growth.
	ret := make(map[core.PubKey][]core.ParSignedData, len(data))
	for k, v := range data {
		ret[k] = v
	}

	return ret, true
}

// exchanger is responsible for exchanging partial signatures between peers on libp2p.
type exchanger struct {
sigChan chan sigData
sigex *parsigex.ParSigEx
sigdb *parsigdb.MemDB
numVals int
sigex *parsigex.ParSigEx
sigdb *parsigdb.MemDB
numVals int
sigTypes map[sigType]bool
sigData dataByPubkey
}

func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int) *exchanger {
func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int, sigTypes []sigType) *exchanger {
// Partial signature roots not known yet, so skip verification in parsigex, rather verify before we aggregate.
noopVerifier := func(ctx context.Context, duty core.Duty, key core.PubKey, data core.ParSignedData) error {
return nil
}

st := make(map[sigType]bool)

for _, sigType := range sigTypes {
st[sigType] = true
}

ex := &exchanger{
// threshold is len(peers) to wait until we get all the partial sigs from all the peers per DV
sigdb: parsigdb.NewMemDB(len(peers), noopDeadliner{}),
sigex: parsigex.NewParSigEx(tcpNode, p2p.Send, peerIdx, peers, noopVerifier),
sigChan: make(chan sigData, vals), // Allow buffering all signature sets
numVals: vals,
sigdb: parsigdb.NewMemDB(len(peers), noopDeadliner{}),
sigex: parsigex.NewParSigEx(tcpNode, p2p.Send, peerIdx, peers, noopVerifier),
numVals: vals,
sigTypes: st,
sigData: dataByPubkey{
store: sigTypeStore{},
lock: sync.Mutex{},
},
}

// Wiring core workflow components
Expand All @@ -76,36 +125,32 @@ func (e *exchanger) exchange(ctx context.Context, sigType sigType, set core.ParS
return nil, err
}

sets := make(map[core.PubKey][]core.ParSignedData)
tick := time.NewTicker(50 * time.Millisecond)
defer tick.Stop()

for {
select {
case <-tick.C:
// We are done when we have ParSignedData of all the DVs from each peer
if data, ok := e.sigData.get(sigType); ok {
return data, nil
}
case <-ctx.Done():
return nil, ctx.Err()
case peerSet := <-e.sigChan:
if sigType != peerSet.sigType {
// Do nothing if duty doesn't match
continue
}
sets[peerSet.pubkey] = peerSet.psigs
}

// We are done when we have ParSignedData of all the DVs from each peer
if len(sets) == e.numVals {
break
}
}

return sets, nil
}

// pushPsigs is responsible for recording partial signature data obtained from other peers, keyed by sigType and public key.
func (e *exchanger) pushPsigs(_ context.Context, duty core.Duty, pk core.PubKey, psigs []core.ParSignedData) error {
e.sigChan <- sigData{
sigType: sigType(duty.Slot),
pubkey: pk,
psigs: psigs,
sigType := sigType(duty.Slot)

if !e.sigTypes[sigType] {
return errors.New("unrecognized sigType", z.Int("sigType", int(sigType)))
}

e.sigData.set(pk, sigType, psigs)

return nil
}

Expand Down
29 changes: 28 additions & 1 deletion dkg/exchanger_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,39 @@ func TestExchanger(t *testing.T) {
}

for i := 0; i < nodes; i++ {
ex := newExchanger(hosts[i], i, peers, dvs)
ex := newExchanger(hosts[i], i, peers, dvs, []sigType{
sigLock,
sigDepositData,
sigValidatorRegistration,
})
exchangers = append(exchangers, ex)
}

respChan := make(chan map[core.PubKey][]core.ParSignedData)
var wg sync.WaitGroup

// send multiple (supported) messages at the same time, showing that exchanger can exchange messages of various
// sigTypes concurrently
for i := 0; i < nodes; i++ {
wg.Add(2)
go func(node int) {
defer wg.Done()

data, err := exchangers[node].exchange(ctx, sigDepositData, dataToBeSent[node])
require.NoError(t, err)

respChan <- data
}(i)
go func(node int) {
defer wg.Done()

data, err := exchangers[node].exchange(ctx, sigValidatorRegistration, dataToBeSent[node])
require.NoError(t, err)

respChan <- data
}(i)
}

for i := 0; i < nodes; i++ {
wg.Add(1)
go func(node int) {
Expand Down

0 comments on commit 7371573

Please sign in to comment.