Skip to content

Commit

Permalink
Merge pull request #3183 from keep-network/pre-params-8
Browse files Browse the repository at this point in the history
Pre-param generation: cleanup and wiring this together
  • Loading branch information
lukasz-zimnoch authored Aug 19, 2022
2 parents 6b234e8 + 0a3eb27 commit 554dfe4
Show file tree
Hide file tree
Showing 17 changed files with 793 additions and 459 deletions.
2 changes: 1 addition & 1 deletion cmd/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ var cmdFlagsTests = map[string]struct {
flagName: "--tbtc.preParamsPoolSize",
flagValue: "75",
expectedValueFromFlag: 75,
defaultValue: 50,
defaultValue: 3000,
},
"tbtc.preParamsGenerationTimeout": {
readValueFunc: func(c *config.Config) interface{} { return c.Tbtc.PreParamsGenerationTimeout },
Expand Down
19 changes: 6 additions & 13 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cmd
import (
"context"
"fmt"

"github.com/keep-network/keep-core/pkg/generator"
"github.com/keep-network/keep-core/pkg/tbtc"

"github.com/keep-network/keep-core/config"
Expand Down Expand Up @@ -103,11 +105,14 @@ func start(cmd *cobra.Command) error {
return fmt.Errorf("cannot initialize tbtc persistence: [%w]", err)
}

scheduler := generator.StartScheduler()

err = beacon.Initialize(
ctx,
beaconChain,
netProvider,
beaconPersistence,
scheduler,
)
if err != nil {
return fmt.Errorf("error initializing beacon: [%v]", err)
Expand All @@ -118,6 +123,7 @@ func start(cmd *cobra.Command) error {
tbtcChain,
netProvider,
tbtcPersistence,
scheduler,
clientConfig.Tbtc,
)
if err != nil {
Expand All @@ -141,19 +147,6 @@ func initializePersistence(clientConfig *config.Config, application string) (
persistence.Handle,
error,
) {
err := persistence.EnsureDirectoryExists(
clientConfig.Storage.DataDir,
application,
)
if err != nil {
return nil, fmt.Errorf(
"cannot create storage directory for "+
"application [%v]: [%w]",
application,
err,
)
}

path := fmt.Sprintf("%s/%s", clientConfig.Storage.DataDir, application)

diskHandle, err := persistence.NewDiskHandle(path)
Expand Down
3 changes: 3 additions & 0 deletions pkg/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/keep-network/keep-core/pkg/generator"
"github.com/keep-network/keep-core/pkg/sortition"

"github.com/ipfs/go-log"
Expand All @@ -31,6 +32,7 @@ func Initialize(
beaconChain beaconchain.Interface,
netProvider net.Provider,
persistence persistence.Handle,
scheduler *generator.Scheduler,
) error {
groupRegistry := registry.NewGroupRegistry(logger, beaconChain, persistence)
groupRegistry.LoadExistingGroups()
Expand All @@ -39,6 +41,7 @@ func Initialize(
beaconChain,
netProvider,
groupRegistry,
scheduler,
)

err := sortition.MonitorPool(ctx, logger, beaconChain, sortition.DefaultStatusCheckTick)
Expand Down
13 changes: 13 additions & 0 deletions pkg/beacon/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/keep-network/keep-core/pkg/beacon/dkg"
"github.com/keep-network/keep-core/pkg/beacon/entry"
"github.com/keep-network/keep-core/pkg/beacon/event"
"github.com/keep-network/keep-core/pkg/generator"
"github.com/keep-network/keep-core/pkg/protocol/group"

"github.com/keep-network/keep-core/pkg/beacon/registry"
Expand All @@ -22,6 +23,7 @@ type node struct {
beaconChain beaconchain.Interface
netProvider net.Provider
groupRegistry *registry.Groups
protocolLatch *generator.ProtocolLatch
}

// newNode returns an empty node with no group, zero group count, and a nil last
Expand All @@ -30,11 +32,16 @@ func newNode(
beaconChain beaconchain.Interface,
netProvider net.Provider,
groupRegistry *registry.Groups,
scheduler *generator.Scheduler,
) *node {
latch := generator.NewProtocolLatch()
scheduler.RegisterProtocol(latch)

return &node{
beaconChain: beaconChain,
netProvider: netProvider,
groupRegistry: groupRegistry,
protocolLatch: latch,
}
}

Expand Down Expand Up @@ -136,6 +143,9 @@ func (n *node) JoinDKGIfEligible(
memberIndex := index + 1

go func() {
n.protocolLatch.Lock()
defer n.protocolLatch.Unlock()

signer, err := dkg.ExecuteDKG(
logger,
dkgSeed,
Expand Down Expand Up @@ -359,6 +369,9 @@ func (n *node) GenerateRelayEntry(

for _, member := range memberships {
go func(member *registry.Membership) {
n.protocolLatch.Lock()
defer n.protocolLatch.Unlock()

err = entry.SignAndSubmit(
logger,
blockCounter,
Expand Down
30 changes: 30 additions & 0 deletions pkg/generator/generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package generator

import (
"time"

"github.com/ipfs/go-log"
)

var logger = log.Logger("keep-generator")

const checkTick = 1 * time.Second

// StartScheduler builds and returns a Scheduler that coordinates
// long-running, computationally-heavy background work. A background
// goroutine periodically inspects every registered protocol: while at least
// one protocol is executing, all computations are paused; once no protocol
// is executing anymore, computations resume automatically.
func StartScheduler() *Scheduler {
	scheduler := new(Scheduler)

	// Poll registered protocols once per checkTick for the lifetime of the
	// process; checkProtocols pauses or resumes computations as needed.
	go func() {
		for ; ; time.Sleep(checkTick) {
			scheduler.checkProtocols()
		}
	}()

	return scheduler
}
53 changes: 53 additions & 0 deletions pkg/generator/latch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package generator

import "sync"

// ProtocolLatch tracks how many protocol executions are currently in flight:
// the internal counter goes up by one whenever a protocol execution starts
// (Lock) and back down by one when it completes (Unlock). The latch
// implements the Protocol interface and can be registered in the Scheduler.
//
// The protocol code using the latch must guarantee that:
// 1. `Lock()` is always called before `Unlock()`
// 2. `Unlock()` is eventually called for every `Lock()`.
//
// Note that the Unlock() function may panic if the conditions are not met.
type ProtocolLatch struct {
	counter uint64
	mutex   sync.RWMutex
}

// NewProtocolLatch returns a new instance of the latch with 0 counter value.
func NewProtocolLatch() *ProtocolLatch {
	return new(ProtocolLatch)
}

// Lock increases the counter on the latch by one.
func (pl *ProtocolLatch) Lock() {
	pl.mutex.Lock()
	pl.counter++
	pl.mutex.Unlock()
}

// Unlock decreases the counter on the latch by one. Unlock panics if no Lock
// was called before.
func (pl *ProtocolLatch) Unlock() {
	pl.mutex.Lock()
	defer pl.mutex.Unlock()

	// A zero counter means there is no preceding unmatched Lock; this is a
	// contract violation by the caller.
	if pl.counter == 0 {
		panic("Lock was not called before Unlock")
	}

	pl.counter--
}

// IsExecuting reports whether at least one protocol is currently running,
// i.e. the counter is non-zero because more Lock than Unlock calls have
// happened so far.
func (pl *ProtocolLatch) IsExecuting() bool {
	pl.mutex.RLock()
	executing := pl.counter != 0
	pl.mutex.RUnlock()

	return executing
}
52 changes: 52 additions & 0 deletions pkg/generator/latch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package generator

import (
"testing"
)

// TestIsExecuting walks the latch through a scenario of Lock/Unlock calls
// and verifies IsExecuting reflects whether any unmatched Lock remains.
func TestIsExecuting(t *testing.T) {
	latch := NewProtocolLatch()

	// assertExecuting compares the latch state with the expectation and
	// reports a mismatch.
	assertExecuting := func(want bool) {
		if latch.IsExecuting() != want {
			if want {
				t.Errorf("protocol is executing now")
			} else {
				t.Errorf("protocol is not executing now")
			}
		}
	}

	assertExecuting(false)

	latch.Lock()
	assertExecuting(true)

	latch.Unlock()
	assertExecuting(false)

	latch.Lock()
	latch.Lock()
	latch.Unlock()
	assertExecuting(true)

	latch.Unlock()
	assertExecuting(false)
}

// TestUnlockPanic ensures the Unlock() function panics if Lock() was not
// called before.
func TestUnlockPanic(t *testing.T) {
	defer func() {
		if r := recover(); r == nil {
			t.Errorf("Unlock should panic")
		}
	}()

	latch := NewProtocolLatch()
	latch.Unlock()
}
10 changes: 5 additions & 5 deletions pkg/miner/pool.go → pkg/generator/pool.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package miner
package generator

import (
"context"
Expand All @@ -22,8 +22,8 @@ type Persistence[T any] interface {
// function up to the pool size. Parameters are stored in the cache and
// persisted using the provided persistence layer to survive client restarts.
// When a parameter is pulled from the pool, the pool starts generating a new
// parameter automatically. The pool submits the work to the provided miner
// instance and can be controlled by the miner.
// parameter automatically. The pool submits the work to the provided scheduler
// instance and can be controlled by the scheduler.
type ParameterPool[T any] struct {
persistence Persistence[T]
pool chan *T
Expand All @@ -32,7 +32,7 @@ type ParameterPool[T any] struct {
// NewParameterPool creates a new instance of ParameterPool.
func NewParameterPool[T any](
logger log.StandardLogger,
miner *Miner,
scheduler *Scheduler,
persistence Persistence[T],
targetSize int,
generateFn func(context.Context) *T,
Expand All @@ -48,7 +48,7 @@ func NewParameterPool[T any](
pool <- parameter
}

miner.Mine(func(ctx context.Context) {
scheduler.compute(func(ctx context.Context) {
start := time.Now()

generated := generateFn(ctx)
Expand Down
Loading

0 comments on commit 554dfe4

Please sign in to comment.