Skip to content

Commit

Permalink
dkg: wait for all validator key sigs (#2510)
Browse files Browse the repository at this point in the history
Wait until all peer's validator signatures over exchanger-handled data.

This should fix nightly test.

category: bug
ticket: none
  • Loading branch information
gsora authored Aug 4, 2023
1 parent 2b768e3 commit 041ff5e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 21 deletions.
20 changes: 12 additions & 8 deletions dkg/exchanger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
type sigType int

const (
// dutyLock is responsible for lock hash signed partial signatures exchange and aggregation.
// sigLock is responsible for lock hash signed partial signatures exchange and aggregation.
sigLock sigType = 101
// dutyDepositData is responsible for deposit data signed partial signatures exchange and aggregation.
// sigDepositData is responsible for deposit data signed partial signatures exchange and aggregation.
sigDepositData sigType = 102
// sigValidatorRegistration is responsible for the pre-generated validator registration exchange and aggregation.
sigValidatorRegistration sigType = 103
Expand All @@ -38,8 +38,9 @@ type sigTypeStore map[sigType]map[core.PubKey][]core.ParSignedData

// dataByPubkey maps a sigType to its map of public key to slice of core.ParSignedData..
type dataByPubkey struct {
store sigTypeStore
lock sync.Mutex
numVals int
store sigTypeStore
lock sync.Mutex
}

// set sets data for the given sigType and core.PubKey.
Expand All @@ -65,6 +66,10 @@ func (stb *dataByPubkey) get(sigType sigType) (map[core.PubKey][]core.ParSignedD
return nil, ok
}

if len(data) != stb.numVals {
return nil, false
}

ret := make(map[core.PubKey][]core.ParSignedData)

for k, v := range data {
Expand All @@ -78,7 +83,6 @@ func (stb *dataByPubkey) get(sigType sigType) (map[core.PubKey][]core.ParSignedD
type exchanger struct {
sigex *parsigex.ParSigEx
sigdb *parsigdb.MemDB
numVals int
sigTypes map[sigType]bool
sigData dataByPubkey
}
Expand All @@ -99,11 +103,11 @@ func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int, sig
// threshold is len(peers) to wait until we get all the partial sigs from all the peers per DV
sigdb: parsigdb.NewMemDB(len(peers), noopDeadliner{}),
sigex: parsigex.NewParSigEx(tcpNode, p2p.Send, peerIdx, peers, noopVerifier),
numVals: vals,
sigTypes: st,
sigData: dataByPubkey{
store: sigTypeStore{},
lock: sync.Mutex{},
store: sigTypeStore{},
numVals: vals,
lock: sync.Mutex{},
},
}

Expand Down
52 changes: 39 additions & 13 deletions dkg/exchanger_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ func TestExchanger(t *testing.T) {
hosts []host.Host
hostsInfo []peer.AddrInfo
exchangers []*exchanger

expectedSigTypes = []sigType{
sigLock,
sigDepositData,
sigValidatorRegistration,
}
)

// Create hosts
Expand All @@ -83,15 +89,16 @@ func TestExchanger(t *testing.T) {
}

for i := 0; i < nodes; i++ {
ex := newExchanger(hosts[i], i, peers, dvs, []sigType{
sigLock,
sigDepositData,
sigValidatorRegistration,
})
ex := newExchanger(hosts[i], i, peers, dvs, expectedSigTypes)
exchangers = append(exchangers, ex)
}

respChan := make(chan map[core.PubKey][]core.ParSignedData)
type respStruct struct {
data map[core.PubKey][]core.ParSignedData
sigType sigType
}

respChan := make(chan respStruct)
var wg sync.WaitGroup

// send multiple (supported) messages at the same time, showing that exchanger can exchange messages of various
Expand All @@ -104,15 +111,21 @@ func TestExchanger(t *testing.T) {
data, err := exchangers[node].exchange(ctx, sigDepositData, dataToBeSent[node])
require.NoError(t, err)

respChan <- data
respChan <- respStruct{
data: data,
sigType: sigDepositData,
}
}(i)
go func(node int) {
defer wg.Done()

data, err := exchangers[node].exchange(ctx, sigValidatorRegistration, dataToBeSent[node])
require.NoError(t, err)

respChan <- data
respChan <- respStruct{
data: data,
sigType: sigValidatorRegistration,
}
}(i)
}

Expand All @@ -124,7 +137,10 @@ func TestExchanger(t *testing.T) {
data, err := exchangers[node].exchange(ctx, sigLock, dataToBeSent[node])
require.NoError(t, err)

respChan <- data
respChan <- respStruct{
data: data,
sigType: sigLock,
}
}(i)
}

Expand All @@ -133,12 +149,22 @@ func TestExchanger(t *testing.T) {
close(respChan) // Closes response channel once all the goroutines are done with writing.
}()

var actual []map[core.PubKey][]core.ParSignedData
actual := make(sigTypeStore)
for res := range respChan {
actual = append(actual, res)
actual[res.sigType] = res.data
}

for i := 0; i < nodes; i++ {
reflect.DeepEqual(actual[i], expectedData)
// test that data we expected arrived, for each sigType
for _, data := range actual {
reflect.DeepEqual(data, expectedData)
}

// test that all sigTypes expected to arrive actually arrived
for _, expectedSigType := range expectedSigTypes {
_, ok := actual[expectedSigType]
require.True(t, ok, "missing sigType %d from received data", expectedSigType)
}

// require that we encountered all the sigTypes expected
require.Len(t, actual, len(expectedSigTypes))
}

0 comments on commit 041ff5e

Please sign in to comment.