Skip to content

Commit

Permalink
Cleanup/make sync test slightly faster (#7715)
Browse files Browse the repository at this point in the history
Co-authored-by: Lukasz Rozmej <lukasz.rozmej@gmail.com>
  • Loading branch information
asdacap and LukaszRozmej authored Nov 5, 2024
1 parent 65e9132 commit 6409179
Show file tree
Hide file tree
Showing 19 changed files with 92 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ public interface ISyncConfig : IConfig
[ConfigItem(Description = "_Technical._ MultiSyncModeSelector sync mode timer loop interval. Used for testing.", DefaultValue = "1000", HiddenFromDocs = true)]
int MultiSyncModeSelectorLoopTimerMs { get; set; }

[ConfigItem(Description = "_Technical._ SyncDispatcher delay on empty request. Used for testing.", DefaultValue = "10", HiddenFromDocs = true)]
int SyncDispatcherEmptyRequestDelayMs { get; set; }

[ConfigItem(Description = "_Technical._ SyncDispatcher allocation timeout. Used for testing.", DefaultValue = "1000", HiddenFromDocs = true)]
int SyncDispatcherAllocateTimeoutMs { get; set; }

[ConfigItem(Description = "_Technical._ MultiSyncModeSelector will wait for header to completely sync first.", DefaultValue = "false", HiddenFromDocs = true)]
bool NeedToWaitForHeader { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public string? PivotHash
public int MallocTrimIntervalSec { get; set; } = 300;
public bool? SnapServingEnabled { get; set; } = null;
public int MultiSyncModeSelectorLoopTimerMs { get; set; } = 1000;
public int SyncDispatcherEmptyRequestDelayMs { get; set; } = 10;
public int SyncDispatcherAllocateTimeoutMs { get; set; } = 1000;
public bool NeedToWaitForHeader { get; set; }
public bool VerifyTrieOnStateSyncFinished { get; set; }
public bool TrieHealing { get; set; } = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal class ProcessingStats : IThreadPoolWorkItem
private long _runMicroseconds;
private long _reportMs;
private Block? _lastBlock;
private Hash256 _lastBranchRoot;
private Hash256? _lastBranchRoot;
private long _sloadOpcodeProcessing;
private long _sstoreOpcodeProcessing;
private long _callsProcessing;
Expand Down Expand Up @@ -131,6 +131,9 @@ void IThreadPoolWorkItem.Execute()
beneficiary = lastTx.To;
isMev = true;
}

if (_lastBranchRoot is null || !_stateReader.HasStateForRoot(_lastBranchRoot) || block.StateRoot is null || !_stateReader.HasStateForRoot(block.StateRoot))
return;
UInt256 beforeBalance = _stateReader.GetBalance(_lastBranchRoot, beneficiary);
UInt256 afterBalance = _stateReader.GetBalance(block.StateRoot, beneficiary);
UInt256 rewards = beforeBalance < afterBalance ? afterBalance - beforeBalance : default;
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Runner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ void ConfigureLogger(ParseResult parseResult)

logger = logManager.GetClassLogger();

string logLevel = parseResult.GetValue(BasicOptions.LogLevel) ?? "info";
string? logLevel = parseResult.GetValue(BasicOptions.LogLevel);

// TODO: dynamically switch log levels from CLI
if (logLevel is not null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,12 @@ public IBlockValidator BlockValidator

private SyncDispatcher<BlocksRequest>? _dispatcher;
public SyncDispatcher<BlocksRequest> Dispatcher => _dispatcher ??= new SyncDispatcher<BlocksRequest>(
0,
new SyncConfig()
{
MaxProcessingThreads = 0,
SyncDispatcherEmptyRequestDelayMs = 1,
SyncDispatcherAllocateTimeoutMs = 1
},
Feed!,
BlockDownloader,
PeerPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Threading;
using System.Threading.Tasks;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Logging;
using Nethermind.Synchronization.FastSync;
using Nethermind.Synchronization.ParallelSync;
Expand All @@ -19,7 +20,7 @@ public StateSyncDispatcherTester(
ISyncDownloader<StateSyncBatch> downloader,
ISyncPeerPool syncPeerPool,
IPeerAllocationStrategyFactory<StateSyncBatch> peerAllocationStrategy,
ILogManager logManager) : base(0, syncFeed, downloader, syncPeerPool, peerAllocationStrategy, logManager)
ILogManager logManager) : base(new SyncConfig() { SyncDispatcherEmptyRequestDelayMs = 1, SyncDispatcherAllocateTimeoutMs = 1 }, syncFeed, downloader, syncPeerPool, peerAllocationStrategy, logManager)
{
_downloader = downloader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public async Task HealBigSqueezedRandomTree()
dbContext.LocalStateTree.RootHash = dbContext.RemoteStateTree.RootHash;

SafeContext ctx = PrepareDownloader(dbContext);
await ActivateAndWait(ctx, dbContext, 9, timeout: 10000);
await ActivateAndWait(ctx, dbContext, 9, timeout: 20000);

DetailedProgress data = ctx.TreeFeed.GetDetailedProgress();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace Nethermind.Synchronization.Test.FastSync
{
public class StateSyncFeedTestsBase
{
private const int TimeoutLength = 10000;
private const int TimeoutLength = 20000;

private static IBlockTree? _blockTree;
protected static IBlockTree BlockTree => LazyInitializer.EnsureInitialized(ref _blockTree, () => Build.A.BlockTree().OfChainLength(100).TestObject);
Expand Down Expand Up @@ -119,7 +119,11 @@ protected SafeContext PrepareDownloaderWithPeer(DbContext dbContext, IEnumerable
ctx.Feed.SyncModeSelectorOnChanged(SyncMode.StateNodes | SyncMode.FastBlocks);
ctx.Downloader = new StateSyncDownloader(_logManager);
ctx.StateSyncDispatcher = new SyncDispatcher<StateSyncBatch>(
0,
new SyncConfig()
{
SyncDispatcherEmptyRequestDelayMs = 1,
SyncDispatcherAllocateTimeoutMs = 10 // there is a test for requested nodes which get affected if allocate timeout
},
ctx.Feed!,
ctx.Downloader,
ctx.Pool,
Expand Down
19 changes: 16 additions & 3 deletions src/Nethermind/Nethermind.Synchronization.Test/Mocks/FirstFree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,30 @@ public static FirstFree Instance
{
get
{
if (_instance is null) LazyInitializer.EnsureInitialized(ref _instance, () => new FirstFree());
if (_instance is null) LazyInitializer.EnsureInitialized(ref _instance, () => new FirstFree(false));

return _instance;
}
}

private FirstFree()

private static FirstFree? _replaceableInstance;
public static FirstFree ReplaceableInstance
{
get
{
if (_replaceableInstance is null) LazyInitializer.EnsureInitialized(ref _replaceableInstance, () => new FirstFree(true));

return _replaceableInstance;
}
}

private FirstFree(bool canBeReplaced)
{
CanBeReplaced = canBeReplaced;
}

public bool CanBeReplaced => false;
public bool CanBeReplaced { get; private set; }

public PeerInfo Allocate(PeerInfo? currentPeer, IEnumerable<PeerInfo> peers, INodeStatsManager nodeStatsManager, IBlockTree blockTree)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ public async Task Setup()
ITimerFactory timerFactory = Substitute.For<ITimerFactory>();
NodeStatsManager stats = new(timerFactory, LimboLogs.Instance);
_pool = new SyncPeerPool(_blockTree, stats, new TotalDifficultyBetterPeerStrategy(LimboLogs.Instance), LimboLogs.Instance, 25);
SyncConfig syncConfig = new();
SyncConfig syncConfig = new()
{
MultiSyncModeSelectorLoopTimerMs = 1,
SyncDispatcherEmptyRequestDelayMs = 1,
SyncDispatcherAllocateTimeoutMs = 1
};

NodeStorage nodeStorage = new NodeStorage(_stateDb);
TrieStore trieStore = new(nodeStorage, LimboLogs.Instance);
Expand Down Expand Up @@ -164,7 +169,8 @@ public void Syncs_with_empty_peer()
public void Syncs_when_knows_more_blocks()
{
_blockTree = Build.A.BlockTree(_genesisBlock).OfChainLength(SyncBatchSize.Max * 2).TestObject;
_remoteBlockTree = Build.A.BlockTree(_genesisBlock).OfChainLength(1).TestObject;
_remoteBlockTree = Build.A.BlockTree(_genesisBlock).OfChainLength(2).TestObject;
_remoteBlockTree.Head?.Number.Should().NotBe(0);
ISyncPeer peer = new SyncPeerMock(_remoteBlockTree);

ManualResetEvent resetEvent = new(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,12 @@ public async Task Simple_test_sync()
TestSyncFeed syncFeed = new();
TestDownloader downloader = new TestDownloader();
SyncDispatcher<TestBatch> dispatcher = new(
0,
new SyncConfig()
{
MultiSyncModeSelectorLoopTimerMs = 1,
SyncDispatcherEmptyRequestDelayMs = 1,
SyncDispatcherAllocateTimeoutMs = 1
},
syncFeed,
downloader,
new TestSyncPeerPool(),
Expand All @@ -280,7 +285,11 @@ public async Task Test_release_before_processing_complete(bool isMultiSync, int

TestDownloader downloader = new TestDownloader();
SyncDispatcher<TestBatch> dispatcher = new(
processingThread,
new SyncConfig()
{
MaxProcessingThreads = processingThread,
SyncDispatcherEmptyRequestDelayMs = 1,
},
syncFeed,
downloader,
new TestSyncPeerPool(peerCount),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using Nethermind.Synchronization.Blocks;
using Nethermind.Synchronization.Peers;
using Nethermind.Synchronization.Peers.AllocationStrategies;
using Nethermind.Synchronization.Test.Mocks;
using NSubstitute;
using NUnit.Framework;

Expand Down Expand Up @@ -643,7 +644,8 @@ public async Task Can_remove_during_init()
ctx.Pool.Start();
ctx.Pool.AddPeer(peer);

SyncPeerAllocation allocation = await ctx.Pool.Allocate(new BySpeedStrategy(TransferSpeedType.Headers, true));
SyncPeerAllocation allocation = await ctx.Pool.Allocate(FirstFree.ReplaceableInstance, timeoutMilliseconds: 1000);
Assert.That(allocation.Current, Is.Not.EqualTo(null));
ctx.Pool.RemovePeer(peer);

Assert.That(allocation.Current, Is.EqualTo(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ ISyncConfig GetSyncConfig() =>
_logger = _logManager.GetClassLogger();
ISyncConfig syncConfig = GetSyncConfig();
syncConfig.MultiSyncModeSelectorLoopTimerMs = 1;
syncConfig.SyncDispatcherEmptyRequestDelayMs = 1;
syncConfig.SyncDispatcherAllocateTimeoutMs = 1;

IDbProvider dbProvider = TestMemDbProvider.Init();
IDb stateDb = new MemDb();
Expand Down Expand Up @@ -521,7 +523,7 @@ public SyncingContext PeerCountIs(long i)

public SyncingContext PeerCountEventuallyIs(long i)
{
Assert.That(() => SyncPeerPool.AllPeers.Count(), Is.EqualTo(i).After(5000, 100), "peer count");
Assert.That(() => SyncPeerPool.AllPeers.Count(), Is.EqualTo(i).After(5000, 10), "peer count");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ public interface ISyncModeSelector : IDisposable
event EventHandler<SyncModeChangedEventArgs> Changed;

void Stop();
void Update();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public event EventHandler<SyncModeChangedEventArgs> Changed
}

public void Stop() { }
public void Update() { }

public event EventHandler<SyncModeChangedEventArgs> Changing
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class SyncDispatcher<T>
private ISyncPeerPool SyncPeerPool { get; }

private readonly SemaphoreSlim _concurrentProcessingSemaphore;
private readonly TimeSpan _emptyRequestDelay;
private readonly int _allocateTimeoutMs;

public SyncDispatcher(
ISyncConfig syncConfig,
Expand All @@ -34,25 +36,14 @@ public SyncDispatcher(
ISyncPeerPool? syncPeerPool,
IPeerAllocationStrategyFactory<T>? peerAllocationStrategy,
ILogManager? logManager)
: this(syncConfig.MaxProcessingThreads, syncFeed, downloader, syncPeerPool, peerAllocationStrategy, logManager)
{

}

public SyncDispatcher(
int maxNumberOfProcessingThread,
ISyncFeed<T>? syncFeed,
ISyncDownloader<T>? downloader,
ISyncPeerPool? syncPeerPool,
IPeerAllocationStrategyFactory<T>? peerAllocationStrategy,
ILogManager? logManager)
{
Logger = logManager?.GetClassLogger<SyncDispatcher<T>>() ?? throw new ArgumentNullException(nameof(logManager));
Feed = syncFeed ?? throw new ArgumentNullException(nameof(syncFeed));
Downloader = downloader ?? throw new ArgumentNullException(nameof(downloader));
SyncPeerPool = syncPeerPool ?? throw new ArgumentNullException(nameof(syncPeerPool));
PeerAllocationStrategyFactory = peerAllocationStrategy ?? throw new ArgumentNullException(nameof(peerAllocationStrategy));

int maxNumberOfProcessingThread = syncConfig.MaxProcessingThreads;
if (maxNumberOfProcessingThread == 0)
{
_concurrentProcessingSemaphore = new SemaphoreSlim(Environment.ProcessorCount, Environment.ProcessorCount);
Expand All @@ -62,6 +53,9 @@ public SyncDispatcher(
_concurrentProcessingSemaphore = new SemaphoreSlim(maxNumberOfProcessingThread, maxNumberOfProcessingThread);
}

_emptyRequestDelay = TimeSpan.FromMilliseconds(syncConfig.SyncDispatcherEmptyRequestDelayMs);
_allocateTimeoutMs = syncConfig.SyncDispatcherAllocateTimeoutMs;

syncFeed.StateChanged += SyncFeedOnStateChanged;
}

Expand Down Expand Up @@ -104,7 +98,7 @@ public async Task Start(CancellationToken cancellationToken)
if (Logger.IsTrace) Logger.Trace($"{Feed.GetType().NameWithGenerics()} enqueued a null request.");
}

await Task.Delay(10, cancellationToken);
await Task.Delay(_emptyRequestDelay, cancellationToken);
continue;
}

Expand Down Expand Up @@ -219,7 +213,7 @@ private void Free(SyncPeerAllocation allocation)

protected async Task<SyncPeerAllocation> Allocate(T request)
{
SyncPeerAllocation allocation = await SyncPeerPool.Allocate(PeerAllocationStrategyFactory.Create(request), Feed.Contexts, 1000);
SyncPeerAllocation allocation = await SyncPeerPool.Allocate(PeerAllocationStrategyFactory.Create(request), Feed.Contexts, _allocateTimeoutMs);
Downloader.OnAllocate(allocation);
return allocation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Nethermind.Synchronization.ParallelSync
{
public abstract class SyncFeed<T> : ISyncFeed<T>
{
private readonly TaskCompletionSource _taskCompletionSource = new();
private TaskCompletionSource? _taskCompletionSource = null;
public abstract Task<T> PrepareRequest(CancellationToken token = default);
public abstract SyncResponseHandlingResult HandleResponse(T response, PeerInfo peer = null);
public abstract bool IsMultiFeed { get; }
Expand All @@ -20,6 +20,11 @@ public abstract class SyncFeed<T> : ISyncFeed<T>

private void ChangeState(SyncFeedState newState)
{
if (newState == SyncFeedState.Active)
{
_taskCompletionSource ??= new TaskCompletionSource();
}

if (CurrentState == SyncFeedState.Finished)
{
throw new InvalidOperationException($"{GetType().Name} has already finished and cannot be {newState} again.");
Expand All @@ -30,7 +35,7 @@ private void ChangeState(SyncFeedState newState)

if (newState == SyncFeedState.Finished)
{
_taskCompletionSource.SetResult();
_taskCompletionSource?.SetResult();
}
}

Expand All @@ -41,7 +46,7 @@ public virtual void Finish()
ChangeState(SyncFeedState.Finished);
GC.Collect(2, GCCollectionMode.Aggressive, true, true);
}
public Task FeedTask => _taskCompletionSource.Task;
public Task FeedTask => _taskCompletionSource?.Task ?? Task.CompletedTask;
public abstract void SyncModeSelectorOnChanged(SyncMode current);
public abstract bool IsFinished { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,9 @@ public async Task<SyncPeerAllocation> Allocate(
if (timeoutReached) return SyncPeerAllocation.FailedAllocation;

int waitTime = 10 * tryCount++;
waitTime = Math.Min(waitTime, timeoutMilliseconds);

if (!_signals.SafeWaitHandle.IsClosed)
if (waitTime > 0 && !_signals.SafeWaitHandle.IsClosed)
{
await _signals.WaitOneAsync(waitTime, cts.Token);
if (!_signals.SafeWaitHandle.IsClosed)
Expand Down
3 changes: 3 additions & 0 deletions src/Nethermind/Nethermind.Synchronization/Synchronizer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ public virtual void Start()
WireMultiSyncModeSelector();

SyncModeSelector.Changed += syncReport.SyncModeSelectorOnChanged;

// Make unit test faster.
SyncModeSelector.Update();
}

private void StartFullSyncComponents()
Expand Down

0 comments on commit 6409179

Please sign in to comment.