diff --git a/src/Nethermind/Nethermind.Blockchain.Test/BlockTreeSuggestPacerTests.cs b/src/Nethermind/Nethermind.Blockchain.Test/BlockTreeSuggestPacerTests.cs new file mode 100644 index 00000000000..34392783f02 --- /dev/null +++ b/src/Nethermind/Nethermind.Blockchain.Test/BlockTreeSuggestPacerTests.cs @@ -0,0 +1,54 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System.Threading.Tasks; +using FluentAssertions; +using Nethermind.Core; +using Nethermind.Core.Test.Builders; +using NSubstitute; +using NUnit.Framework; + +namespace Nethermind.Blockchain.Test; + +public class BlockTreeSuggestPacerTests +{ + [Test] + public void WillNotBlockIfInBatchLimit() + { + IBlockTree blockTree = Substitute.For(); + blockTree.Head.Returns(Build.A.Block.WithNumber(0).TestObject); + using BlockTreeSuggestPacer pacer = new BlockTreeSuggestPacer(blockTree, 10, 5); + + pacer.WaitForQueue(1, default).IsCompleted.Should().BeTrue(); + } + + [Test] + public void WillBlockIfBatchTooLarge() + { + IBlockTree blockTree = Substitute.For(); + blockTree.Head.Returns(Build.A.Block.WithNumber(0).TestObject); + using BlockTreeSuggestPacer pacer = new BlockTreeSuggestPacer(blockTree, 10, 5); + + pacer.WaitForQueue(11, default).IsCompleted.Should().BeFalse(); + } + + [Test] + public void WillOnlyUnblockOnceHeadReachHighEnough() + { + IBlockTree blockTree = Substitute.For(); + blockTree.Head.Returns(Build.A.Block.WithNumber(0).TestObject); + using BlockTreeSuggestPacer pacer = new BlockTreeSuggestPacer(blockTree, 10, 5); + + Task waitTask = pacer.WaitForQueue(11, default); + waitTask.IsCompleted.Should().BeFalse(); + + blockTree.NewHeadBlock += Raise.EventWith(new BlockEventArgs(Build.A.Block.WithNumber(1).TestObject)); + waitTask.IsCompleted.Should().BeFalse(); + + blockTree.NewHeadBlock += Raise.EventWith(new BlockEventArgs(Build.A.Block.WithNumber(5).TestObject)); + waitTask.IsCompleted.Should().BeFalse(); + + blockTree.NewHeadBlock += Raise.EventWith(new BlockEventArgs(Build.A.Block.WithNumber(6).TestObject)); + waitTask.IsCompleted.Should().BeTrue(); + } +} diff --git a/src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs b/src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs new file mode 100644 index 00000000000..d040c878349 --- /dev/null +++ b/src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs @@ -0,0 +1,63 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Threading; +using System.Threading.Tasks; +using Nethermind.Core; + +namespace Nethermind.Blockchain; + +/// +/// Utility class during bulk loading to prevent processing queue from becoming too large +/// +public class BlockTreeSuggestPacer : IDisposable +{ + private TaskCompletionSource? _dbBatchProcessed; + private long _blockNumberReachedToUnlock = 0; + private readonly long _stopBatchSize; + private readonly long _resumeBatchSize; + private readonly IBlockTree _blockTree; + + public BlockTreeSuggestPacer(IBlockTree blockTree, long stopBatchSize, long resumeBatchSize) + { + blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock; + _blockTree = blockTree; + _stopBatchSize = stopBatchSize; + _resumeBatchSize = resumeBatchSize; + } + + private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e) + { + TaskCompletionSource? completionSource = _dbBatchProcessed; + if (completionSource is null) return; + if (e.Block.Number < _blockNumberReachedToUnlock) return; + + _dbBatchProcessed = null; + completionSource.SetResult(); + } + + public async Task WaitForQueue(long currentBlockNumber, CancellationToken token) + { + long currentHeadNumber = _blockTree.Head?.Number ?? 0; + if (currentBlockNumber - currentHeadNumber > _stopBatchSize && _dbBatchProcessed is null) + { + _blockNumberReachedToUnlock = currentBlockNumber - _stopBatchSize + _resumeBatchSize; + TaskCompletionSource completionSource = new TaskCompletionSource(); + _dbBatchProcessed = completionSource; + } + + if (_dbBatchProcessed is not null) + { + await using (token.Register(() => _dbBatchProcessed.TrySetCanceled())) + { + await _dbBatchProcessed.Task; + } + } + } + + public void Dispose() + { + _blockTree.NewHeadBlock -= BlockTreeOnNewHeadBlock; + } +} diff --git a/src/Nethermind/Nethermind.Blockchain/Visitors/DbBlocksLoader.cs b/src/Nethermind/Nethermind.Blockchain/Visitors/DbBlocksLoader.cs index 2f156c57d13..61c09d0635e 100644 --- a/src/Nethermind/Nethermind.Blockchain/Visitors/DbBlocksLoader.cs +++ b/src/Nethermind/Nethermind.Blockchain/Visitors/DbBlocksLoader.cs @@ -11,7 +11,7 @@ namespace Nethermind.Blockchain.Visitors { - public class DbBlocksLoader : IBlockTreeVisitor + public class DbBlocksLoader : IBlockTreeVisitor, IDisposable { public const int DefaultBatchSize = 4000; @@ -20,8 +20,7 @@ public class DbBlocksLoader : IBlockTreeVisitor private readonly IBlockTree _blockTree; private readonly ILogger _logger; - private TaskCompletionSource _dbBatchProcessed; - private long _currentDbLoadBatchEnd; + private readonly BlockTreeSuggestPacer _blockTreeSuggestPacer; public DbBlocksLoader(IBlockTree blockTree, ILogger logger, @@ -30,6 +29,7 @@ public DbBlocksLoader(IBlockTree blockTree, long maxBlocksToLoad = long.MaxValue) { _blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree)); + _blockTreeSuggestPacer = new BlockTreeSuggestPacer(_blockTree, batchSize, batchSize / 2); _logger = logger; _batchSize = batchSize; @@ -37,27 +37,9 @@ public DbBlocksLoader(IBlockTree blockTree, _blocksToLoad = Math.Min(maxBlocksToLoad, _blockTree.BestKnownNumber - StartLevelInclusive); EndLevelExclusive = StartLevelInclusive + _blocksToLoad + 1; - if (_blocksToLoad != 0) - { - _blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock; - } - LogPlannedOperation(); } - private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e) - { - if (_dbBatchProcessed is not null) - { - if (e.Block.Number == _currentDbLoadBatchEnd) - { - TaskCompletionSource completionSource = _dbBatchProcessed; - _dbBatchProcessed = null; - completionSource.SetResult(null); - } - } - } - public bool PreventsAcceptingNewBlocks => true; public bool CalculateTotalDifficultyIfMissing => true; public long StartLevelInclusive { get; } @@ -99,20 +81,17 @@ Task IBlockTreeVisitor.VisitHeader(BlockHeader header, Cance async Task IBlockTreeVisitor.VisitBlock(Block block, CancellationToken cancellationToken) { // this will hang + Task waitTask = _blockTreeSuggestPacer.WaitForQueue(block.Number, cancellationToken); + long i = block.Number - StartLevelInclusive; - if (i % _batchSize == _batchSize - 1 && i != _blocksToLoad - 1 && _blockTree.Head.Number + _batchSize < block.Number) + if (!waitTask.IsCompleted) { if (_logger.IsInfo) { _logger.Info($"Loaded {i + 1} out of {_blocksToLoad} blocks from DB into processing queue, waiting for processor before loading more."); } - _dbBatchProcessed = new TaskCompletionSource(); - await using (cancellationToken.Register(() => _dbBatchProcessed.SetCanceled())) - { - _currentDbLoadBatchEnd = block.Number - _batchSize; - await _dbBatchProcessed.Task; - } + await waitTask; } return BlockVisitOutcome.Suggest; @@ -134,5 +113,10 @@ private void LogPlannedOperation() if (_logger.IsInfo) _logger.Info($"Found {_blocksToLoad} blocks to load from DB starting from current head block {_blockTree.Head?.ToString(Block.Format.Short)}"); } } + + public void Dispose() + { + _blockTreeSuggestPacer.Dispose(); + } } } diff --git a/src/Nethermind/Nethermind.Blockchain/Visitors/StartupBlockTreeFixer.cs b/src/Nethermind/Nethermind.Blockchain/Visitors/StartupBlockTreeFixer.cs index d63d56c2e34..6534b2ea3a4 100644 --- a/src/Nethermind/Nethermind.Blockchain/Visitors/StartupBlockTreeFixer.cs +++ b/src/Nethermind/Nethermind.Blockchain/Visitors/StartupBlockTreeFixer.cs @@ -15,7 +15,7 @@ namespace Nethermind.Blockchain.Visitors { - public class StartupBlockTreeFixer : IBlockTreeVisitor + public class StartupBlockTreeFixer : IBlockTreeVisitor, IDisposable { public const int DefaultBatchSize = 4000; private readonly IBlockTree _blockTree; @@ -33,11 +33,9 @@ public class StartupBlockTreeFixer : IBlockTreeVisitor private long? _lastProcessedLevel; private long? _processingGapStart; - private TaskCompletionSource _dbBatchProcessed; - private long _currentDbLoadBatchEnd; private bool _firstBlockVisited = true; private bool _suggestBlocks = true; - private readonly long _batchSize; + private readonly BlockTreeSuggestPacer _blockTreeSuggestPacer; public StartupBlockTreeFixer( ISyncConfig syncConfig, @@ -47,36 +45,18 @@ public StartupBlockTreeFixer( long batchSize = DefaultBatchSize) { _blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree)); + _blockTreeSuggestPacer = new BlockTreeSuggestPacer(_blockTree, batchSize, batchSize / 2); _stateReader = stateReader; _logger = logger; - _batchSize = batchSize; long assumedHead = _blockTree.Head?.Number ?? 0; _startNumber = Math.Max(syncConfig.PivotNumberParsed, assumedHead + 1); _blocksToLoad = (assumedHead + 1) >= _startNumber ? (_blockTree.BestKnownNumber - _startNumber + 1) : 0; _currentLevelNumber = _startNumber - 1; // because we always increment on entering - if (_blocksToLoad != 0) - { - _blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock; - } - LogPlannedOperation(); } - private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e) - { - if (_dbBatchProcessed is not null) - { - if (e.Block.Number == _currentDbLoadBatchEnd) - { - TaskCompletionSource completionSource = _dbBatchProcessed; - _dbBatchProcessed = null; - completionSource.SetResult(); - } - } - } - public bool PreventsAcceptingNewBlocks => true; public bool CalculateTotalDifficultyIfMissing => true; public long StartLevelInclusive => _startNumber; @@ -168,9 +148,10 @@ async Task IBlockTreeVisitor.VisitBlock(Block block, Cancella if (!_suggestBlocks) return BlockVisitOutcome.None; + Task waitSuggestQueue = _blockTreeSuggestPacer.WaitForQueue(block.Number, cancellationToken); + long i = block.Number - StartLevelInclusive; - if (i % _batchSize == _batchSize - 1 && i != _blocksToLoad - 1 && - _blockTree.Head.Number + _batchSize < block.Number) + if (!waitSuggestQueue.IsCompleted) { if (_logger.IsInfo) { @@ -178,12 +159,7 @@ async Task IBlockTreeVisitor.VisitBlock(Block block, Cancella $"Loaded {i + 1} out of {_blocksToLoad} blocks from DB into processing queue, waiting for processor before loading more."); } - _dbBatchProcessed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - await using (cancellationToken.Register(() => _dbBatchProcessed.SetCanceled())) - { - _currentDbLoadBatchEnd = block.Number - _batchSize; - await _dbBatchProcessed.Task; - } + await waitSuggestQueue; } return BlockVisitOutcome.Suggest; @@ -266,5 +242,10 @@ private void LogPlannedOperation() $"Found {_blocksToLoad} block tree levels to review for fixes starting from {StartLevelInclusive}"); } } + + public void Dispose() + { + _blockTreeSuggestPacer.Dispose(); + } } } diff --git a/src/Nethermind/Nethermind.Init/Steps/ReviewBlockTree.cs b/src/Nethermind/Nethermind.Init/Steps/ReviewBlockTree.cs index 2a200190434..b75a4fc9cb9 100644 --- a/src/Nethermind/Nethermind.Init/Steps/ReviewBlockTree.cs +++ b/src/Nethermind/Nethermind.Init/Steps/ReviewBlockTree.cs @@ -44,7 +44,7 @@ private async Task RunBlockTreeInitTasks(CancellationToken cancellationToken) if (!syncConfig.FastSync) { - DbBlocksLoader loader = new(_api.BlockTree, _logger); + using DbBlocksLoader loader = new(_api.BlockTree, _logger); await _api.BlockTree.Accept(loader, cancellationToken).ContinueWith(t => { if (t.IsFaulted) @@ -59,7 +59,7 @@ await _api.BlockTree.Accept(loader, cancellationToken).ContinueWith(t => } else { - StartupBlockTreeFixer fixer = new(syncConfig, _api.BlockTree, _api.WorldStateManager!.GlobalStateReader, _logger!); + using StartupBlockTreeFixer fixer = new(syncConfig, _api.BlockTree, _api.WorldStateManager!.GlobalStateReader, _logger!); await _api.BlockTree.Accept(fixer, cancellationToken).ContinueWith(t => { if (t.IsFaulted)