From d555fc450dfa8fddddf5203bb29d5b7acf4811ed Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Wed, 27 Sep 2023 19:09:36 +0200 Subject: [PATCH] Fixed ungraceful shutdown (#8307) --- cl/pool/operation_pool.go | 32 +++++++++++++ cl/pool/operations_pool.go | 43 +++++++++++++++++ cl/pool/operations_pool_test.go | 71 +++++++++++++++++++++++++++++ erigon-lib/downloader/downloader.go | 8 +++- 4 files changed, 153 insertions(+), 1 deletion(-) create mode 100644 cl/pool/operation_pool.go create mode 100644 cl/pool/operations_pool.go create mode 100644 cl/pool/operations_pool_test.go diff --git a/cl/pool/operation_pool.go b/cl/pool/operation_pool.go new file mode 100644 index 00000000000..0aa8fc3ed61 --- /dev/null +++ b/cl/pool/operation_pool.go @@ -0,0 +1,32 @@ +package pool + +import ( + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/cl/phase1/core/state/lru" +) + +var operationsMultiplier = 10 // Cap the amount of cached element to max_operations_per_block * operations_multiplier + +type OperationPool[T any] struct { + pool *lru.Cache[libcommon.Bytes96, T] // Map the Signature to the underlying object +} + +func NewOperationPool[T any](maxOperationsPerBlock int, matricName string) *OperationPool[T] { + pool, err := lru.New[libcommon.Bytes96, T](matricName, maxOperationsPerBlock*operationsMultiplier) + if err != nil { + panic(err) + } + return &OperationPool[T]{pool: pool} +} + +func (o *OperationPool[T]) Insert(signature libcommon.Bytes96, operation T) { + o.pool.Add(signature, operation) +} + +func (o *OperationPool[T]) DeleteIfExist(signature libcommon.Bytes96) (removed bool) { + return o.pool.Remove(signature) +} + +func (o *OperationPool[T]) Raw() []T { + return o.pool.Values() +} diff --git a/cl/pool/operations_pool.go b/cl/pool/operations_pool.go new file mode 100644 index 00000000000..8b6ab64db11 --- /dev/null +++ b/cl/pool/operations_pool.go @@ -0,0 +1,43 @@ +package pool + +import ( + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/cltypes/solid" + "github.com/ledgerwatch/erigon/cl/utils" +) + +// DoubleSignatureKey uses blake2b algorithm to merge two signatures together. blake2 is faster than sha3. +func doubleSignatureKey(one, two libcommon.Bytes96) (out libcommon.Bytes96) { + res := utils.Keccak256(one[:], two[:]) + copy(out[:], res[:]) + return +} + +func ComputeKeyForProposerSlashing(slashing *cltypes.ProposerSlashing) libcommon.Bytes96 { + return doubleSignatureKey(slashing.Header1.Signature, slashing.Header2.Signature) +} + +func ComputeKeyForAttesterSlashing(slashing *cltypes.AttesterSlashing) libcommon.Bytes96 { + return doubleSignatureKey(slashing.Attestation_1.Signature, slashing.Attestation_2.Signature) +} + +// OperationsPool is the collection of all gossip-collectable operations. +type OperationsPool struct { + AttestationsPool *OperationPool[*solid.Attestation] + AttesterSlashingsPool *OperationPool[*cltypes.AttesterSlashing] + ProposerSlashingsPool *OperationPool[*cltypes.ProposerSlashing] + BLSToExecutionChangesPool *OperationPool[*cltypes.SignedBLSToExecutionChange] + VoluntaryExistsPool *OperationPool[*cltypes.SignedVoluntaryExit] +} + +func NewOperationsPool(beaconCfg *clparams.BeaconChainConfig) OperationsPool { + return OperationsPool{ + AttestationsPool: NewOperationPool[*solid.Attestation](int(beaconCfg.MaxAttestations), "attestationsPool"), + AttesterSlashingsPool: NewOperationPool[*cltypes.AttesterSlashing](int(beaconCfg.MaxAttestations), "attesterSlashingsPool"), + ProposerSlashingsPool: NewOperationPool[*cltypes.ProposerSlashing](int(beaconCfg.MaxAttestations), "proposerSlashingsPool"), + BLSToExecutionChangesPool: NewOperationPool[*cltypes.SignedBLSToExecutionChange](int(beaconCfg.MaxBlsToExecutionChanges), "blsExecutionChangesPool"), + VoluntaryExistsPool: NewOperationPool[*cltypes.SignedVoluntaryExit](int(beaconCfg.MaxBlsToExecutionChanges), "voluntaryExitsPool"), + } +} diff --git a/cl/pool/operations_pool_test.go b/cl/pool/operations_pool_test.go new file mode 100644 index 00000000000..865df13df65 --- /dev/null +++ b/cl/pool/operations_pool_test.go @@ -0,0 +1,71 @@ +package pool + +import ( + "testing" + + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/cltypes/solid" + "github.com/stretchr/testify/require" +) + +func TestOperationsPool(t *testing.T) { + pools := NewOperationsPool(&clparams.MainnetBeaconConfig) + + // AttestationsPool + pools.AttestationsPool.Insert([96]byte{}, &solid.Attestation{}) + pools.AttestationsPool.Insert([96]byte{1}, &solid.Attestation{}) + require.Equal(t, 2, len(pools.AttestationsPool.Raw())) + require.True(t, pools.AttestationsPool.DeleteIfExist([96]byte{})) + require.Equal(t, 1, len(pools.AttestationsPool.Raw())) + // ProposerSlashingsPool + slashing1 := &cltypes.ProposerSlashing{ + Header1: &cltypes.SignedBeaconBlockHeader{ + Signature: [96]byte{1}, + }, + Header2: &cltypes.SignedBeaconBlockHeader{ + Signature: [96]byte{2}, + }, + } + slashing2 := &cltypes.ProposerSlashing{ + Header1: &cltypes.SignedBeaconBlockHeader{ + Signature: [96]byte{3}, + }, + Header2: &cltypes.SignedBeaconBlockHeader{ + Signature: [96]byte{4}, + }, + } + pools.ProposerSlashingsPool.Insert(ComputeKeyForProposerSlashing(slashing1), slashing1) + pools.ProposerSlashingsPool.Insert(ComputeKeyForProposerSlashing(slashing2), slashing2) + require.True(t, pools.ProposerSlashingsPool.DeleteIfExist(ComputeKeyForProposerSlashing(slashing2))) + // AttesterSlashingsPool + attesterSlashing1 := &cltypes.AttesterSlashing{ + Attestation_1: &cltypes.IndexedAttestation{ + Signature: [96]byte{1}, + }, + Attestation_2: &cltypes.IndexedAttestation{ + Signature: [96]byte{2}, + }, + } + attesterSlashing2 := &cltypes.AttesterSlashing{ + Attestation_1: &cltypes.IndexedAttestation{ + Signature: [96]byte{3}, + }, + Attestation_2: &cltypes.IndexedAttestation{ + Signature: [96]byte{4}, + }, + } + pools.AttesterSlashingsPool.Insert(ComputeKeyForAttesterSlashing(attesterSlashing1), attesterSlashing1) + pools.AttesterSlashingsPool.Insert(ComputeKeyForAttesterSlashing(attesterSlashing2), attesterSlashing2) + require.True(t, pools.AttesterSlashingsPool.DeleteIfExist(ComputeKeyForAttesterSlashing(attesterSlashing2))) + require.Equal(t, 1, len(pools.AttesterSlashingsPool.Raw())) + + // BLSToExecutionChangesPool + pools.BLSToExecutionChangesPool.Insert([96]byte{}, &cltypes.SignedBLSToExecutionChange{}) + pools.BLSToExecutionChangesPool.Insert([96]byte{1}, &cltypes.SignedBLSToExecutionChange{}) + require.Equal(t, 2, len(pools.BLSToExecutionChangesPool.Raw())) + require.True(t, pools.BLSToExecutionChangesPool.DeleteIfExist([96]byte{})) + require.Equal(t, 1, len(pools.BLSToExecutionChangesPool.Raw())) + + require.Equal(t, 1, len(pools.ProposerSlashingsPool.Raw())) +} diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index d9cf831d7aa..44d59731570 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -235,7 +235,13 @@ func (d *Downloader) mainLoop(silent bool) error { } }(t) } - time.Sleep(10 * time.Second) + timer := time.NewTimer(10 * time.Second) + defer timer.Stop() + select { + case <-d.ctx.Done(): + return + case <-timer.C: + } } }()