From 9d270795ed86ed7b57275cb8f290d60b65e72ff4 Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Wed, 6 Nov 2024 07:52:28 +0100 Subject: [PATCH] Minimally lock the state during aggregate processing + memory tuning (#12571) - Changed the positions of lock release in aggregate and proof - Removed diffset because it was taking way too much memory and caused way too many allocs - also it is a data structure only used for some debug endpoints so not worth it - Removed compressor for caches (most of those are random 32-bytes hashes anyway) --- .../fork_graph/diff_storage/diff_storage.go | 131 ------------------ .../diff_storage/diff_storage_test.go | 100 ------------- .../forkchoice/fork_graph/fork_graph_disk.go | 92 +++--------- .../fork_graph/fork_graph_disk_fs.go | 20 +-- 4 files changed, 27 insertions(+), 316 deletions(-) delete mode 100644 cl/phase1/forkchoice/fork_graph/diff_storage/diff_storage.go delete mode 100644 cl/phase1/forkchoice/fork_graph/diff_storage/diff_storage_test.go diff --git a/cl/phase1/forkchoice/fork_graph/diff_storage/diff_storage.go b/cl/phase1/forkchoice/fork_graph/diff_storage/diff_storage.go deleted file mode 100644 index 330c758e014..00000000000 --- a/cl/phase1/forkchoice/fork_graph/diff_storage/diff_storage.go +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see <http://www.gnu.org/licenses/>. 
- -package diffstorage - -import ( - "bytes" - "io" - "sync" - - "github.com/alecthomas/atomic" - libcommon "github.com/erigontech/erigon-lib/common" -) - -const maxDumps = 8 // max number of dumps to keep in memory to prevent from memory leak during long non-finality. - -var bufferPool = sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, -} - -type link struct { - from libcommon.Hash - to libcommon.Hash -} - -// Memory storage for binary diffs -type ChainDiffStorage struct { - dumps sync.Map - parent sync.Map // maps child -> parent - links sync.Map // maps root -> []links - diffFn func(w io.Writer, old, new []byte) error - applyFn func(in, out []byte, diff []byte, reverse bool) ([]byte, error) - diffs sync.Map - dumpsCount atomic.Int32 // prevent from memory leak during long non-finality. -} - -func NewChainDiffStorage(diffFn func(w io.Writer, old, new []byte) error, applyFn func(in, out []byte, diff []byte, reverse bool) ([]byte, error)) *ChainDiffStorage { - return &ChainDiffStorage{ - diffFn: diffFn, - applyFn: applyFn, - dumpsCount: atomic.NewInt32(0), - } -} - -func (c *ChainDiffStorage) Insert(root, parent libcommon.Hash, prevDump, dump []byte, isDump bool) error { - c.parent.Store(root, parent) - if isDump { - c.dumpsCount.Add(1) - if c.dumpsCount.Load() > maxDumps { - *c = *NewChainDiffStorage(c.diffFn, c.applyFn) - c.dumpsCount.Store(0) - return nil - } - c.dumps.Store(root, libcommon.Copy(dump)) - return nil - } - - buf := bufferPool.Get().(*bytes.Buffer) - defer bufferPool.Put(buf) - buf.Reset() - - if err := c.diffFn(buf, prevDump, dump); err != nil { - return err - } - c.diffs.Store(link{from: parent, to: root}, libcommon.Copy(buf.Bytes())) - - links, _ := c.links.LoadOrStore(parent, []link{}) - c.links.Store(parent, append(links.([]link), link{from: parent, to: root})) - - return nil -} - -func (c *ChainDiffStorage) Get(root libcommon.Hash) ([]byte, error) { - dump, foundDump := c.dumps.Load(root) - if foundDump { - return 
dump.([]byte), nil - } - currentRoot := root - diffs := [][]byte{} - for !foundDump { - parent, found := c.parent.Load(currentRoot) - if !found { - return nil, nil - } - diff, foundDiff := c.diffs.Load(link{from: parent.(libcommon.Hash), to: currentRoot}) - if !foundDiff { - return nil, nil - } - diffs = append(diffs, diff.([]byte)) - currentRoot = parent.(libcommon.Hash) - dump, foundDump = c.dumps.Load(currentRoot) - } - out := libcommon.Copy(dump.([]byte)) - for i := len(diffs) - 1; i >= 0; i-- { - var err error - out, err = c.applyFn(out, out, diffs[i], false) - if err != nil { - return nil, err - } - } - return out, nil -} - -func (c *ChainDiffStorage) Delete(root libcommon.Hash) { - if _, loaded := c.dumps.LoadAndDelete(root); loaded { - c.dumpsCount.Add(-1) - } - c.parent.Delete(root) - links, ok := c.links.Load(root) - if ok { - for _, link := range links.([]link) { - c.diffs.Delete(link) - } - } - c.links.Delete(root) -} diff --git a/cl/phase1/forkchoice/fork_graph/diff_storage/diff_storage_test.go b/cl/phase1/forkchoice/fork_graph/diff_storage/diff_storage_test.go deleted file mode 100644 index e4a6835dcd0..00000000000 --- a/cl/phase1/forkchoice/fork_graph/diff_storage/diff_storage_test.go +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see <http://www.gnu.org/licenses/>. 
- -package diffstorage - -import ( - "math" - "testing" - - libcommon "github.com/erigontech/erigon-lib/common" - "github.com/erigontech/erigon/cl/cltypes/solid" - "github.com/erigontech/erigon/cl/persistence/base_encoding" - "github.com/stretchr/testify/require" -) - -// 1 -> 2 -> 3 -> 4 -> 5 -// -// | -// --> 6 -func TestDiffStorage(t *testing.T) { - // decleare 5 nodes - node1 := libcommon.Hash{1} - node2 := libcommon.Hash{2} - node3 := libcommon.Hash{3} - node4 := libcommon.Hash{4} - node5 := libcommon.Hash{5} - node6 := libcommon.Hash{6} - - node1Content := []uint64{1, 2, 3, 4, 5} - node2Content := []uint64{1, 2, 3, 4, 5, 6} - node3Content := []uint64{1, 2, 3, 4, 5, 2, 7} - node4Content := []uint64{1, 2, 3, 4, 5, 2, 7, 8} - node5Content := []uint64{1, 6, 8, 4, 5, 2, 7, 8, 9} - node6Content := []uint64{1, 2, 3, 4, 5, 2, 7, 10} - - exp1 := solid.NewUint64ListSSZFromSlice(math.MaxInt, node1Content) - exp2 := solid.NewUint64ListSSZFromSlice(math.MaxInt, node2Content) - exp3 := solid.NewUint64ListSSZFromSlice(math.MaxInt, node3Content) - exp4 := solid.NewUint64ListSSZFromSlice(math.MaxInt, node4Content) - exp5 := solid.NewUint64ListSSZFromSlice(math.MaxInt, node5Content) - exp6 := solid.NewUint64ListSSZFromSlice(math.MaxInt, node6Content) - - enc1, err := exp1.EncodeSSZ(nil) - require.NoError(t, err) - enc2, err := exp2.EncodeSSZ(nil) - require.NoError(t, err) - enc3, err := exp3.EncodeSSZ(nil) - require.NoError(t, err) - enc4, err := exp4.EncodeSSZ(nil) - require.NoError(t, err) - enc5, err := exp5.EncodeSSZ(nil) - require.NoError(t, err) - enc6, err := exp6.EncodeSSZ(nil) - require.NoError(t, err) - - diffStorage := NewChainDiffStorage(base_encoding.ComputeCompressedSerializedUint64ListDiff, base_encoding.ApplyCompressedSerializedUint64ListDiff) - diffStorage.Insert(node1, libcommon.Hash{}, nil, enc1, true) - diffStorage.Insert(node2, node1, enc1, enc2, false) - diffStorage.Insert(node3, node2, enc2, enc3, false) - diffStorage.Insert(node4, node3, enc3, enc4, 
false) - diffStorage.Insert(node5, node4, enc4, enc5, false) - diffStorage.Insert(node6, node2, enc2, enc6, false) - - d1, err := diffStorage.Get(node1) - require.NoError(t, err) - require.Equal(t, enc1, d1) - - d2, err := diffStorage.Get(node2) - require.NoError(t, err) - require.Equal(t, enc2, d2) - - d3, err := diffStorage.Get(node3) - require.NoError(t, err) - require.Equal(t, enc3, d3) - - d4, err := diffStorage.Get(node4) - require.NoError(t, err) - require.Equal(t, enc4, d4) - - d5, err := diffStorage.Get(node5) - require.NoError(t, err) - require.Equal(t, enc5, d5) - - d6, err := diffStorage.Get(node6) - require.NoError(t, err) - require.Equal(t, enc6, d6) -} diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go index bcef69833ed..3068031bebb 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go @@ -23,7 +23,6 @@ import ( "sync" "sync/atomic" - "github.com/klauspost/compress/zstd" "github.com/spf13/afero" libcommon "github.com/erigontech/erigon-lib/common" @@ -34,9 +33,7 @@ import ( "github.com/erigontech/erigon/cl/cltypes" "github.com/erigontech/erigon/cl/cltypes/lightclient_utils" "github.com/erigontech/erigon/cl/cltypes/solid" - "github.com/erigontech/erigon/cl/persistence/base_encoding" "github.com/erigontech/erigon/cl/phase1/core/state" - diffstorage "github.com/erigontech/erigon/cl/phase1/forkchoice/fork_graph/diff_storage" "github.com/erigontech/erigon/cl/transition" "github.com/erigontech/erigon/cl/transition/impl/eth2" ) @@ -48,26 +45,6 @@ type syncCommittees struct { nextSyncCommittee *solid.SyncCommittee } -var compressorPool = sync.Pool{ - New: func() interface{} { - w, err := zstd.NewWriter(nil) - if err != nil { - panic(err) - } - return w - }, -} - -var decompressPool = sync.Pool{ - New: func() interface{} { - r, err := zstd.NewReader(nil) - if err != nil { - panic(err) - } - return r - }, -} - var ErrStateNotFound 
= errors.New("state not found") type ChainSegmentInsertionResult uint @@ -132,12 +109,9 @@ type forkGraphDisk struct { // for each block root we keep track of the sync committees for head retrieval. syncCommittees sync.Map lightclientBootstraps sync.Map - // diffs storage - balancesStorage *diffstorage.ChainDiffStorage - validatorSetStorage *diffstorage.ChainDiffStorage - inactivityScoresStorage *diffstorage.ChainDiffStorage - previousIndicies sync.Map - currentIndicies sync.Map + + previousIndicies sync.Map + currentIndicies sync.Map // configurations beaconCfg *clparams.BeaconChainConfig @@ -172,23 +146,16 @@ func NewForkGraphDisk(anchorState *state.CachingBeaconState, aferoFs afero.Fs, r farthestExtendingPath[anchorRoot] = true - balancesStorage := diffstorage.NewChainDiffStorage(base_encoding.ComputeCompressedSerializedUint64ListDiff, base_encoding.ApplyCompressedSerializedUint64ListDiff) - validatorSetStorage := diffstorage.NewChainDiffStorage(base_encoding.ComputeCompressedSerializedValidatorSetListDiff, base_encoding.ApplyCompressedSerializedValidatorListDiff) - inactivityScoresStorage := diffstorage.NewChainDiffStorage(base_encoding.ComputeCompressedSerializedUint64ListDiff, base_encoding.ApplyCompressedSerializedUint64ListDiff) - f := &forkGraphDisk{ fs: aferoFs, // current state data currentState: anchorState, // configuration - beaconCfg: anchorState.BeaconConfig(), - genesisTime: anchorState.GenesisTime(), - anchorSlot: anchorState.Slot(), - balancesStorage: balancesStorage, - validatorSetStorage: validatorSetStorage, - inactivityScoresStorage: inactivityScoresStorage, - rcfg: rcfg, - emitter: emitter, + beaconCfg: anchorState.BeaconConfig(), + genesisTime: anchorState.GenesisTime(), + anchorSlot: anchorState.Slot(), + rcfg: rcfg, + emitter: emitter, } f.lowestAvailableBlock.Store(anchorState.Slot()) f.headers.Store(libcommon.Hash(anchorRoot), &anchorHeader) @@ -280,13 +247,7 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock 
*cltypes.SignedBeaconBlock, } blockRewardsCollector := &eth2.BlockRewardsCollector{} - var prevDumpBalances, prevValidatorSetDump, prevInactivityScores []byte - epochCross := newState.Slot()/f.beaconCfg.SlotsPerEpoch != block.Slot/f.beaconCfg.SlotsPerEpoch - if (f.rcfg.Beacon || f.rcfg.Validator || f.rcfg.Lighthouse) && !epochCross { - prevDumpBalances = libcommon.Copy(newState.RawBalances()) - prevValidatorSetDump = libcommon.Copy(newState.RawValidatorSet()) - prevInactivityScores = libcommon.Copy(newState.RawInactivityScores()) - } + // Execute the state if invalidBlockErr := transition.TransitionState(newState, signedBlock, blockRewardsCollector, fullValidation); invalidBlockErr != nil { // Add block to list of invalid blocks @@ -302,11 +263,9 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, if block.Version() != clparams.Phase0Version { f.currentIndicies.Store(libcommon.Hash(blockRoot), libcommon.Copy(newState.RawCurrentEpochParticipation())) f.previousIndicies.Store(libcommon.Hash(blockRoot), libcommon.Copy(newState.RawPreviousEpochParticipation())) - f.inactivityScoresStorage.Insert(libcommon.Hash(blockRoot), block.ParentRoot, prevInactivityScores, newState.RawInactivityScores(), epochCross) } f.blockRewards.Store(libcommon.Hash(blockRoot), blockRewardsCollector) - f.balancesStorage.Insert(libcommon.Hash(blockRoot), block.ParentRoot, prevDumpBalances, newState.RawBalances(), epochCross) - f.validatorSetStorage.Insert(libcommon.Hash(blockRoot), block.ParentRoot, prevValidatorSetDump, newState.RawValidatorSet(), epochCross) + period := f.beaconCfg.SyncCommitteePeriod(newState.Slot()) f.syncCommittees.Store(period, syncCommittees{ currentSyncCommittee: newState.CurrentSyncCommittee().Copy(), @@ -474,9 +433,7 @@ func (f *forkGraphDisk) Prune(pruneSlot uint64) (err error) { f.blockRewards.Delete(root) f.fs.Remove(getBeaconStateFilename(root)) f.fs.Remove(getBeaconStateCacheFilename(root)) - f.balancesStorage.Delete(root) - 
f.validatorSetStorage.Delete(root) - f.inactivityScoresStorage.Delete(root) + f.previousIndicies.Delete(root) f.currentIndicies.Delete(root) } @@ -529,27 +486,25 @@ func (f *forkGraphDisk) GetLightClientUpdate(period uint64) (*cltypes.LightClien } func (f *forkGraphDisk) GetBalances(blockRoot libcommon.Hash) (solid.Uint64ListSSZ, error) { - b, err := f.balancesStorage.Get(blockRoot) + st, err := f.GetState(blockRoot, true) if err != nil { return nil, err } - if len(b) == 0 { - return nil, nil + if st == nil { + return nil, ErrStateNotFound } - out := solid.NewUint64ListSSZ(int(f.beaconCfg.ValidatorRegistryLimit)) - return out, out.DecodeSSZ(b, 0) + return st.Balances(), nil } func (f *forkGraphDisk) GetInactivitiesScores(blockRoot libcommon.Hash) (solid.Uint64ListSSZ, error) { - b, err := f.inactivityScoresStorage.Get(blockRoot) + st, err := f.GetState(blockRoot, true) if err != nil { return nil, err } - if len(b) == 0 { - return nil, nil + if st == nil { + return nil, ErrStateNotFound } - out := solid.NewUint64ListSSZ(int(f.beaconCfg.ValidatorRegistryLimit)) - return out, out.DecodeSSZ(b, 0) + return st.InactivityScores(), nil } func (f *forkGraphDisk) GetPreviousParticipationIndicies(blockRoot libcommon.Hash) (*solid.ParticipationBitList, error) { @@ -577,13 +532,12 @@ func (f *forkGraphDisk) GetCurrentParticipationIndicies(blockRoot libcommon.Hash } func (f *forkGraphDisk) GetValidatorSet(blockRoot libcommon.Hash) (*solid.ValidatorSet, error) { - b, err := f.validatorSetStorage.Get(blockRoot) + st, err := f.GetState(blockRoot, true) if err != nil { return nil, err } - if len(b) == 0 { - return nil, nil + if st == nil { + return nil, ErrStateNotFound } - out := solid.NewValidatorSet(int(f.beaconCfg.ValidatorRegistryLimit)) - return out, out.DecodeSSZ(b, 0) + return st.ValidatorSet(), nil } diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go index 902426d7801..11a8bc001d1 100644 --- 
a/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go @@ -24,7 +24,6 @@ import ( "os" "github.com/golang/snappy" - "github.com/klauspost/compress/zstd" "github.com/spf13/afero" libcommon "github.com/erigontech/erigon-lib/common" @@ -94,12 +93,7 @@ func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *s } defer cacheFile.Close() - reader := decompressPool.Get().(*zstd.Decoder) - defer decompressPool.Put(reader) - - reader.Reset(cacheFile) - - if err := bs.DecodeCaches(reader); err != nil { + if err := bs.DecodeCaches(cacheFile); err != nil { return nil, err } @@ -162,19 +156,13 @@ func (f *forkGraphDisk) DumpBeaconStateOnDisk(blockRoot libcommon.Hash, bs *stat } defer cacheFile.Close() - writer := compressorPool.Get().(*zstd.Encoder) - defer compressorPool.Put(writer) - - writer.Reset(cacheFile) - defer writer.Close() - - if err := bs.EncodeCaches(writer); err != nil { + if err := bs.EncodeCaches(cacheFile); err != nil { return err } - if err = writer.Close(); err != nil { + + if err = cacheFile.Sync(); err != nil { return } - err = cacheFile.Sync() return }