From 041ff5e0ee9645bf9f6d4c781ae70008e1b47377 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gianguido=20Sor=C3=A0?= Date: Fri, 4 Aug 2023 12:43:33 +0200 Subject: [PATCH] dkg: wait for all validator key sigs (#2510) Wait until all peer's validator signatures over exchanger-handled data. This should fix nightly test. category: bug ticket: none --- dkg/exchanger.go | 20 +++++++------ dkg/exchanger_internal_test.go | 52 +++++++++++++++++++++++++--------- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/dkg/exchanger.go b/dkg/exchanger.go index 9802e01d4..0943c2387 100644 --- a/dkg/exchanger.go +++ b/dkg/exchanger.go @@ -25,9 +25,9 @@ import ( type sigType int const ( - // dutyLock is responsible for lock hash signed partial signatures exchange and aggregation. + // sigLock is responsible for lock hash signed partial signatures exchange and aggregation. sigLock sigType = 101 - // dutyDepositData is responsible for deposit data signed partial signatures exchange and aggregation. + // sigDepositData is responsible for deposit data signed partial signatures exchange and aggregation. sigDepositData sigType = 102 // sigValidatorRegistration is responsible for the pre-generated validator registration exchange and aggregation. sigValidatorRegistration sigType = 103 @@ -38,8 +38,9 @@ type sigTypeStore map[sigType]map[core.PubKey][]core.ParSignedData // dataByPubkey maps a sigType to its map of public key to slice of core.ParSignedData.. type dataByPubkey struct { - store sigTypeStore - lock sync.Mutex + numVals int + store sigTypeStore + lock sync.Mutex } // set sets data for the given sigType and core.PubKey. @@ -65,6 +66,10 @@ func (stb *dataByPubkey) get(sigType sigType) (map[core.PubKey][]core.ParSignedD return nil, ok } + if len(data) != stb.numVals { + return nil, false + } + ret := make(map[core.PubKey][]core.ParSignedData) for k, v := range data { @@ -78,7 +83,6 @@ func (stb *dataByPubkey) get(sigType sigType) (map[core.PubKey][]core.ParSignedD type exchanger struct { sigex *parsigex.ParSigEx sigdb *parsigdb.MemDB - numVals int sigTypes map[sigType]bool sigData dataByPubkey } @@ -99,11 +103,11 @@ func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int, sig // threshold is len(peers) to wait until we get all the partial sigs from all the peers per DV sigdb: parsigdb.NewMemDB(len(peers), noopDeadliner{}), sigex: parsigex.NewParSigEx(tcpNode, p2p.Send, peerIdx, peers, noopVerifier), - numVals: vals, sigTypes: st, sigData: dataByPubkey{ - store: sigTypeStore{}, - lock: sync.Mutex{}, + store: sigTypeStore{}, + numVals: vals, + lock: sync.Mutex{}, }, } diff --git a/dkg/exchanger_internal_test.go b/dkg/exchanger_internal_test.go index 9974cdb22..0bff8647f 100644 --- a/dkg/exchanger_internal_test.go +++ b/dkg/exchanger_internal_test.go @@ -58,6 +58,12 @@ func TestExchanger(t *testing.T) { hosts []host.Host hostsInfo []peer.AddrInfo exchangers []*exchanger + + expectedSigTypes = []sigType{ + sigLock, + sigDepositData, + sigValidatorRegistration, + } ) // Create hosts @@ -83,15 +89,16 @@ func TestExchanger(t *testing.T) { } for i := 0; i < nodes; i++ { - ex := newExchanger(hosts[i], i, peers, dvs, []sigType{ - sigLock, - sigDepositData, - sigValidatorRegistration, - }) + ex := newExchanger(hosts[i], i, peers, dvs, expectedSigTypes) exchangers = append(exchangers, ex) } - respChan := make(chan map[core.PubKey][]core.ParSignedData) + type respStruct struct { + data map[core.PubKey][]core.ParSignedData + sigType sigType + } + + respChan := make(chan respStruct) var wg sync.WaitGroup // send multiple (supported) messages at the same time, showing that exchanger can exchange messages of various @@ -104,7 +111,10 @@ func TestExchanger(t *testing.T) { data, err := exchangers[node].exchange(ctx, sigDepositData, dataToBeSent[node]) require.NoError(t, err) - respChan <- data + respChan <- respStruct{ + data: data, + sigType: sigDepositData, + } }(i) go func(node int) { defer wg.Done() @@ -112,7 +122,10 @@ func TestExchanger(t *testing.T) { data, err := exchangers[node].exchange(ctx, sigValidatorRegistration, dataToBeSent[node]) require.NoError(t, err) - respChan <- data + respChan <- respStruct{ + data: data, + sigType: sigValidatorRegistration, + } }(i) } @@ -124,7 +137,10 @@ func TestExchanger(t *testing.T) { data, err := exchangers[node].exchange(ctx, sigLock, dataToBeSent[node]) require.NoError(t, err) - respChan <- data + respChan <- respStruct{ + data: data, + sigType: sigLock, + } }(i) } @@ -133,12 +149,22 @@ func TestExchanger(t *testing.T) { close(respChan) // Closes response channel once all the goroutines are done with writing. }() - var actual []map[core.PubKey][]core.ParSignedData + actual := make(sigTypeStore) for res := range respChan { - actual = append(actual, res) + actual[res.sigType] = res.data } - for i := 0; i < nodes; i++ { - reflect.DeepEqual(actual[i], expectedData) + // test that data we expected arrived, for each sigType + for _, data := range actual { + reflect.DeepEqual(data, expectedData) + } + + // test that all sigTypes expected to arrive actually arrived + for _, expectedSigType := range expectedSigTypes { + _, ok := actual[expectedSigType] + require.True(t, ok, "missing sigType %d from received data", expectedSigType) } + + // require that we encountered all the sigTypes expected + require.Len(t, actual, len(expectedSigTypes)) }