From c58c4bb92942942907765745f9ff13d53883f3a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gianguido=20Sor=C3=A0?= Date: Tue, 11 Jun 2024 09:36:57 +0200 Subject: [PATCH] dkg: add customizable timeout Users might run DKG for large clusters containing a high number of validators (e.g. 2.5K). With a large number of validators, the DKG process needs either more time or more resources (CPU, network bandwidth) to complete. Allow users to specify a custom timeout for the entire process, enabling large DKGs on normally-powered machines. Tested 2.5K-validator DKGs locally with 0.relay.obol.tech and a 10-minute timeout: completed with no issues. --- cmd/dkg.go | 2 ++ core/parsigex/parsigex.go | 11 +++++++++-- dkg/dkg.go | 3 ++- dkg/dkg_test.go | 2 ++ dkg/exchanger.go | 5 +++-- dkg/exchanger_internal_test.go | 3 ++- testutil/integration/nightly_dkg_test.go | 2 ++ 7 files changed, 22 insertions(+), 6 deletions(-) diff --git a/cmd/dkg.go b/cmd/dkg.go index aec0ae49d..062cf0b21 100644 --- a/cmd/dkg.go +++ b/cmd/dkg.go @@ -46,6 +46,8 @@ this command at the same time.`, bindPublishFlags(cmd.Flags(), &config) bindShutdownDelayFlag(cmd.Flags(), &config.ShutdownDelay) + cmd.Flags().DurationVar(&config.Timeout, "timeout", 1*time.Minute, "Timeout for the DKG process, should be increased if DKG times out.") + return cmd } diff --git a/core/parsigex/parsigex.go b/core/parsigex/parsigex.go index a207358e8..4e0a5e5fc 100644 --- a/core/parsigex/parsigex.go +++ b/core/parsigex/parsigex.go @@ -29,7 +29,7 @@ func Protocols() []protocol.ID { func NewParSigEx(tcpNode host.Host, sendFunc p2p.SendFunc, peerIdx int, peers []peer.ID, verifyFunc func(context.Context, core.Duty, core.PubKey, core.ParSignedData) error, - gaterFunc core.DutyGaterFunc, + gaterFunc core.DutyGaterFunc, p2pOpts ...p2p.SendRecvOption, ) *ParSigEx { parSigEx := &ParSigEx{ tcpNode: tcpNode, @@ -41,7 +41,14 @@ func NewParSigEx(tcpNode host.Host, sendFunc p2p.SendFunc, peerIdx int, peers [] } newReq := func() proto.Message { return new(pbv1.ParSigExMsg) } 
- p2p.RegisterHandler("parsigex", tcpNode, protocolID2, newReq, parSigEx.handle) + p2p.RegisterHandler( + "parsigex", + tcpNode, + protocolID2, + newReq, + parSigEx.handle, + p2pOpts..., + ) return parSigEx } diff --git a/dkg/dkg.go b/dkg/dkg.go index eba70dfa7..7a8189c60 100644 --- a/dkg/dkg.go +++ b/dkg/dkg.go @@ -46,6 +46,7 @@ type Config struct { P2P p2p.Config Log log.Config ShutdownDelay time.Duration + Timeout time.Duration KeymanagerAddr string KeymanagerAuthToken string @@ -190,7 +191,7 @@ func Run(ctx context.Context, conf Config) (err error) { sigLock, sigDepositData, sigValidatorRegistration, - }) + }, conf.Timeout) // Register Frost libp2p handlers peerMap := make(map[peer.ID]cluster.NodeIdx) diff --git a/dkg/dkg_test.go b/dkg/dkg_test.go index 013124c68..f175da525 100644 --- a/dkg/dkg_test.go +++ b/dkg/dkg_test.go @@ -148,6 +148,7 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri }, ShutdownDelay: 1 * time.Second, PublishTimeout: 30 * time.Second, + Timeout: 8 * time.Second, } allReceivedKeystores := make(chan struct{}) // Receives struct{} for each `numNodes` keystore intercepted by the keymanager server @@ -633,6 +634,7 @@ func getConfigs(t *testing.T, def cluster.Definition, keys []*k1.PrivateKey, dir }, TCPNodeCallback: tcpNodeCallback, }, + Timeout: 8 * time.Second, } require.NoError(t, os.MkdirAll(conf.DataDir, 0o755)) diff --git a/dkg/exchanger.go b/dkg/exchanger.go index a2e002dd2..378597468 100644 --- a/dkg/exchanger.go +++ b/dkg/exchanger.go @@ -6,6 +6,7 @@ import ( "context" "slices" "sync" + "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -55,7 +56,7 @@ type exchanger struct { sigDatasChan chan map[core.PubKey][]core.ParSignedData } -func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int, sigTypes []sigType) *exchanger { +func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int, sigTypes []sigType, timeout time.Duration) 
*exchanger { // Partial signature roots not known yet, so skip verification in parsigex, rather verify before we aggregate. noopVerifier := func(ctx context.Context, duty core.Duty, key core.PubKey, data core.ParSignedData) error { return nil @@ -82,7 +83,7 @@ func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int, sig ex := &exchanger{ // threshold is len(peers) to wait until we get all the partial sigs from all the peers per DV sigdb: parsigdb.NewMemDB(len(peers), noopDeadliner{}), - sigex: parsigex.NewParSigEx(tcpNode, p2p.Send, peerIdx, peers, noopVerifier, dutyGaterFunc), + sigex: parsigex.NewParSigEx(tcpNode, p2p.Send, peerIdx, peers, noopVerifier, dutyGaterFunc, p2p.WithSendTimeout(timeout), p2p.WithReceiveTimeout(timeout)), sigTypes: st, sigData: dataByPubkey{ store: sigTypeStore{}, diff --git a/dkg/exchanger_internal_test.go b/dkg/exchanger_internal_test.go index d91b06726..aece6c7be 100644 --- a/dkg/exchanger_internal_test.go +++ b/dkg/exchanger_internal_test.go @@ -7,6 +7,7 @@ import ( "reflect" "sync" "testing" + "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -89,7 +90,7 @@ func TestExchanger(t *testing.T) { } for i := 0; i < nodes; i++ { - ex := newExchanger(hosts[i], i, peers, dvs, expectedSigTypes) + ex := newExchanger(hosts[i], i, peers, dvs, expectedSigTypes, 8*time.Second) exchangers = append(exchangers, ex) } diff --git a/testutil/integration/nightly_dkg_test.go b/testutil/integration/nightly_dkg_test.go index ef7b04974..ee8a4cc3d 100644 --- a/testutil/integration/nightly_dkg_test.go +++ b/testutil/integration/nightly_dkg_test.go @@ -92,6 +92,7 @@ func TestLongWaitDKG(t *testing.T) { TestConfig: dkg.TestConfig{ Def: &def, }, + Timeout: 10 * time.Minute, } windowTicker := time.NewTicker(window) @@ -291,6 +292,7 @@ func TestDKGWithHighValidatorsAmt(t *testing.T) { TestConfig: dkg.TestConfig{ Def: &def, }, + Timeout: 10 * time.Minute, } dir := t.TempDir()