Skip to content

Commit

Permalink
dkg: add customizable timeout
Browse files Browse the repository at this point in the history
Users might run DKG for large clusters containing a high number of validators (e.g. 2.5K).

With a large number of validators, the DKG process needs either more time or more resources (CPU, network bandwidth) to complete.

Allow users to specify a custom timeout for the entire process, enabling large DKGs on machines with ordinary resources.

Tested 2.5K-validator DKGs locally via 0.relay.obol.tech with a 10-minute timeout: completed with no issues.
  • Loading branch information
gsora committed Jun 11, 2024
1 parent f286523 commit c58c4bb
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 6 deletions.
2 changes: 2 additions & 0 deletions cmd/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ this command at the same time.`,
bindPublishFlags(cmd.Flags(), &config)
bindShutdownDelayFlag(cmd.Flags(), &config.ShutdownDelay)

cmd.Flags().DurationVar(&config.Timeout, "timeout", 1*time.Minute, "Timeout for the DKG process, should be increased if DKG times out.")

Check warning on line 50 in cmd/dkg.go

View check run for this annotation

Codecov / codecov/patch

cmd/dkg.go#L49-L50

Added lines #L49 - L50 were not covered by tests
return cmd
}

Expand Down
11 changes: 9 additions & 2 deletions core/parsigex/parsigex.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func Protocols() []protocol.ID {

func NewParSigEx(tcpNode host.Host, sendFunc p2p.SendFunc, peerIdx int, peers []peer.ID,
verifyFunc func(context.Context, core.Duty, core.PubKey, core.ParSignedData) error,
gaterFunc core.DutyGaterFunc,
gaterFunc core.DutyGaterFunc, p2pOpts ...p2p.SendRecvOption,
) *ParSigEx {
parSigEx := &ParSigEx{
tcpNode: tcpNode,
Expand All @@ -41,7 +41,14 @@ func NewParSigEx(tcpNode host.Host, sendFunc p2p.SendFunc, peerIdx int, peers []
}

newReq := func() proto.Message { return new(pbv1.ParSigExMsg) }
p2p.RegisterHandler("parsigex", tcpNode, protocolID2, newReq, parSigEx.handle)
p2p.RegisterHandler(
"parsigex",
tcpNode,
protocolID2,
newReq,
parSigEx.handle,
p2pOpts...,
)

return parSigEx
}
Expand Down
3 changes: 2 additions & 1 deletion dkg/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Config struct {
P2P p2p.Config
Log log.Config
ShutdownDelay time.Duration
Timeout time.Duration

KeymanagerAddr string
KeymanagerAuthToken string
Expand Down Expand Up @@ -190,7 +191,7 @@ func Run(ctx context.Context, conf Config) (err error) {
sigLock,
sigDepositData,
sigValidatorRegistration,
})
}, conf.Timeout)

// Register Frost libp2p handlers
peerMap := make(map[peer.ID]cluster.NodeIdx)
Expand Down
2 changes: 2 additions & 0 deletions dkg/dkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri
},
ShutdownDelay: 1 * time.Second,
PublishTimeout: 30 * time.Second,
Timeout: 8 * time.Second,
}

allReceivedKeystores := make(chan struct{}) // Receives struct{} for each `numNodes` keystore intercepted by the keymanager server
Expand Down Expand Up @@ -633,6 +634,7 @@ func getConfigs(t *testing.T, def cluster.Definition, keys []*k1.PrivateKey, dir
},
TCPNodeCallback: tcpNodeCallback,
},
Timeout: 8 * time.Second,
}
require.NoError(t, os.MkdirAll(conf.DataDir, 0o755))

Expand Down
5 changes: 3 additions & 2 deletions dkg/exchanger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"slices"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -55,7 +56,7 @@ type exchanger struct {
sigDatasChan chan map[core.PubKey][]core.ParSignedData
}

func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int, sigTypes []sigType) *exchanger {
func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int, sigTypes []sigType, timeout time.Duration) *exchanger {
// Partial signature roots not known yet, so skip verification in parsigex, rather verify before we aggregate.
noopVerifier := func(ctx context.Context, duty core.Duty, key core.PubKey, data core.ParSignedData) error {
return nil
Expand All @@ -82,7 +83,7 @@ func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int, sig
ex := &exchanger{
// threshold is len(peers) to wait until we get all the partial sigs from all the peers per DV
sigdb: parsigdb.NewMemDB(len(peers), noopDeadliner{}),
sigex: parsigex.NewParSigEx(tcpNode, p2p.Send, peerIdx, peers, noopVerifier, dutyGaterFunc),
sigex: parsigex.NewParSigEx(tcpNode, p2p.Send, peerIdx, peers, noopVerifier, dutyGaterFunc, p2p.WithSendTimeout(timeout), p2p.WithReceiveTimeout(timeout)),
sigTypes: st,
sigData: dataByPubkey{
store: sigTypeStore{},
Expand Down
3 changes: 2 additions & 1 deletion dkg/exchanger_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"
"sync"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -89,7 +90,7 @@ func TestExchanger(t *testing.T) {
}

for i := 0; i < nodes; i++ {
ex := newExchanger(hosts[i], i, peers, dvs, expectedSigTypes)
ex := newExchanger(hosts[i], i, peers, dvs, expectedSigTypes, 8*time.Second)
exchangers = append(exchangers, ex)
}

Expand Down
2 changes: 2 additions & 0 deletions testutil/integration/nightly_dkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func TestLongWaitDKG(t *testing.T) {
TestConfig: dkg.TestConfig{
Def: &def,
},
Timeout: 10 * time.Minute,
}

windowTicker := time.NewTicker(window)
Expand Down Expand Up @@ -291,6 +292,7 @@ func TestDKGWithHighValidatorsAmt(t *testing.T) {
TestConfig: dkg.TestConfig{
Def: &def,
},
Timeout: 10 * time.Minute,
}

dir := t.TempDir()
Expand Down

0 comments on commit c58c4bb

Please sign in to comment.