Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Tasks collections in .Net 9 #7585

Draft
wants to merge 5 commits into
base: feature/dotnet9-migration
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public async Task StopAsync(bool processRemainingBlocks = false)
_blockQueue.CompleteAdding();
}

await Task.WhenAll((_recoveryTask ?? Task.CompletedTask), (_processorTask ?? Task.CompletedTask));
await Task.WhenAll(_recoveryTask ?? Task.CompletedTask, _processorTask ?? Task.CompletedTask);
if (_logger.IsInfo) _logger.Info("Blockchain Processor shutdown complete.. please wait for all components to close");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Evm.Tracing;
using Nethermind.Logging;

Expand All @@ -32,23 +33,20 @@ protected MultipleBlockProducer(
public async Task<Block?> BuildBlock(BlockHeader? parentHeader, IBlockTracer? blockTracer = null,
PayloadAttributes? payloadAttributes = null, CancellationToken? token = null)
{
Task<Block?>[] produceTasks = new Task<Block?>[_blockProducers.Length];
using ArrayPoolList<Task<Block>> produceTasks = new(_blockProducers.Length);
for (int i = 0; i < _blockProducers.Length; i++)
{
T blockProducerInfo = _blockProducers[i];
if (!blockProducerInfo.Condition.CanProduce(parentHeader))
{
produceTasks[i] = Task.FromResult<Block?>(null);
continue;
}
produceTasks[i] = blockProducerInfo.BlockProducer.BuildBlock(parentHeader, blockProducerInfo.BlockTracer, cancellationToken: token);
produceTasks.Add(!blockProducerInfo.Condition.CanProduce(parentHeader!)
? Task.FromResult<Block?>(null)
: blockProducerInfo.BlockProducer.BuildBlock(parentHeader, blockProducerInfo.BlockTracer, cancellationToken: token));
}

IEnumerable<(Block? Block, T BlockProducer)> blocksWithProducers;

try
{
Block?[] blocks = await Task.WhenAll(produceTasks);
Block?[] blocks = await Task.WhenAll<Block?>(produceTasks.AsSpan());
blocksWithProducers = blocks.Zip(_blockProducers);
}
catch (OperationCanceledException)
Expand Down
13 changes: 6 additions & 7 deletions src/Nethermind/Nethermind.Core.Test/Encoding/TxDecoderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using FluentAssertions;
using FluentAssertions.Numeric;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Eip2930;
using Nethermind.Core.Extensions;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class TxDecoderTests

[TestCaseSource(nameof(TestCaseSource))]
[Repeat(10)] // Might wanna increase this to double check when changing logic as on lower value, it does not reproduce.
public void CanCorrectlyCalculateTxHash_when_called_concurrently((Transaction Tx, string Description) testCase)
public async Task CanCorrectlyCalculateTxHash_when_called_concurrently((Transaction Tx, string Description) testCase)
{
Transaction tx = testCase.Tx;

Expand All @@ -80,14 +81,12 @@ public void CanCorrectlyCalculateTxHash_when_called_concurrently((Transaction Tx

decodedTx.SetPreHash(rlp.Bytes);

IEnumerable<Task<AndConstraint<ComparableTypeAssertions<Hash256>>>> tasks = Enumerable
using ArrayPoolList<Task<AndConstraint<ComparableTypeAssertions<Hash256>>>> tasks = Enumerable
.Range(0, 32)
.Select((_) =>
Task.Factory
.StartNew(() => decodedTx.Hash.Should().Be(expectedHash),
TaskCreationOptions.RunContinuationsAsynchronously));
.Select(_ => Task.Factory.StartNew(() => decodedTx.Hash.Should().Be(expectedHash), TaskCreationOptions.RunContinuationsAsynchronously))
.ToPooledList(32);

Task.WaitAll(tasks.ToArray());
await Task.WhenAll<AndConstraint<ComparableTypeAssertions<Hash256>>>(tasks.AsSpan());
}

[TestCaseSource(nameof(TestCaseSource))]
Expand Down
22 changes: 8 additions & 14 deletions src/Nethermind/Nethermind.Core/PubSub/CompositePublisher.cs
Original file line number Diff line number Diff line change
@@ -1,34 +1,28 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading.Tasks;
using Nethermind.Core.Collections;

namespace Nethermind.Core.PubSub
{
public class CompositePublisher : IPublisher
public class CompositePublisher(params IPublisher[] publishers) : IPublisher
{
private readonly IPublisher[] _publishers;

public CompositePublisher(params IPublisher[] publishers)
{
_publishers = publishers;
}

public async Task PublishAsync<T>(T data) where T : class
{
// TODO: .Net 9 stackalloc
Task[] tasks = new Task[_publishers.Length];
for (int i = 0; i < _publishers.Length; i++)
using ArrayPoolList<Task> tasks = new(publishers.Length);
for (int i = 0; i < publishers.Length; i++)
{
tasks[i] = _publishers[i].PublishAsync(data);
tasks.Add(publishers[i].PublishAsync(data));
}

await Task.WhenAll(tasks);
await Task.WhenAll(tasks.AsSpan());
}

public void Dispose()
{
foreach (IPublisher publisher in _publishers)
foreach (IPublisher publisher in publishers)
{
publisher.Dispose();
}
Expand Down
7 changes: 4 additions & 3 deletions src/Nethermind/Nethermind.Db/RocksDbInitializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Nethermind.Core.Collections;

namespace Nethermind.Db
{
Expand Down Expand Up @@ -68,13 +69,13 @@ protected void InitAll()

protected async Task InitAllAsync()
{
HashSet<Task> allInitializers = new();
foreach (var registration in _registrations)
using ArrayPoolList<Task> allInitializers = new(_registrations.Count);
foreach (Action registration in _registrations)
{
allInitializers.Add(Task.Run(() => registration.Invoke()));
}

await Task.WhenAll(allInitializers);
await Task.WhenAll(allInitializers.AsSpan());
}

protected static string GetTitleDbName(string dbName) => char.ToUpper(dbName[0]) + dbName[1..];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace Nethermind.Evm.Test.Tracing;
[TestFixture]
public class GethLikeCallTracerTests : VirtualMachineTestsBase
{
private static readonly JsonSerializerOptions SerializerOptions = EthereumJsonSerializer.JsonOptionsIndented;
private static readonly JsonSerializerOptions SerializerOptions = new(EthereumJsonSerializer.JsonOptionsIndented) { NewLine = "\n" };
private const string? WithLog = """{"withLog":true}""";
private const string? OnlyTopCall = """{"onlyTopCall":true}""";
private const string? WithLogAndOnlyTopCall = """{"withLog":true,"onlyTopCall":true}""";
Expand All @@ -27,14 +27,8 @@ private string ExecuteCallTrace(byte[] code, string? tracerConfig = null)
(_, Transaction tx) = PrepareTx(MainnetSpecProvider.CancunActivation, 100000, code);
NativeCallTracer tracer = new(tx, GetGethTraceOptions(tracerConfig));

GethLikeTxTrace callTrace = Execute(
tracer,
code,
MainnetSpecProvider.CancunActivation)
.BuildResult();
return JsonSerializer.Serialize(callTrace.CustomTracerResult?.Value, SerializerOptions)
// fix for windows, can be done better in .NET 9: https://github.com/dotnet/runtime/issues/84117
.ReplaceLineEndings("\n");
GethLikeTxTrace callTrace = Execute(tracer, code, MainnetSpecProvider.CancunActivation).BuildResult();
return JsonSerializer.Serialize(callTrace.CustomTracerResult?.Value, SerializerOptions);
}

private static GethTraceOptions GetGethTraceOptions(string? config) => GethTraceOptions.Default with
Expand Down
6 changes: 4 additions & 2 deletions src/Nethermind/Nethermind.Network.Discovery/NodesLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

using System.Text;
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Logging;
using Nethermind.Network.Discovery.Lifecycle;
using Nethermind.Network.Discovery.Messages;
Expand Down Expand Up @@ -128,8 +130,8 @@ public async Task LocateNodesAsync(byte[]? searchedNodeId, CancellationToken can
int count = failRequestCount > 0 ? failRequestCount : _discoveryConfig.Concurrency;
IEnumerable<Node> nodesToSend = tryCandidates.Skip(nodesTriedCount).Take(count);

IEnumerable<Task<Result>> sendFindNodeTasks = SendFindNodes(searchedNodeId, nodesToSend, alreadyTriedNodes);
Result?[] results = await Task.WhenAll(sendFindNodeTasks);
using ArrayPoolList<Task<Result>> sendFindNodeTasks = SendFindNodes(searchedNodeId, nodesToSend, alreadyTriedNodes).ToPooledList(count);
Result[] results = await Task.WhenAll<Result>(sendFindNodeTasks.AsSpan());

if (results.Length == 0)
{
Expand Down
9 changes: 5 additions & 4 deletions src/Nethermind/Nethermind.Network/CompositeNodeSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Nethermind.Core.Collections;
using Nethermind.Core.Extensions;
using Nethermind.Stats.Model;

namespace Nethermind.Network;
Expand All @@ -20,14 +22,13 @@ public async IAsyncEnumerable<Node> DiscoverNodes([EnumeratorCancellation] Cance
{
Channel<Node> ch = Channel.CreateBounded<Node>(1);

// TODO: .Net 9 stackalloc
Task[] feedTasks = _nodeSources.Select(async innerSource =>
using ArrayPoolList<Task> feedTasks = _nodeSources.Select(async innerSource =>
{
await foreach (Node node in innerSource.DiscoverNodes(cancellationToken))
{
await ch.Writer.WriteAsync(node, cancellationToken);
}
}).ToArray();
}).ToPooledList(_nodeSources.Length * 16);

try
{
Expand All @@ -38,7 +39,7 @@ public async IAsyncEnumerable<Node> DiscoverNodes([EnumeratorCancellation] Cance
}
finally
{
await Task.WhenAll(feedTasks);
await Task.WhenAll(feedTasks.AsSpan());
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/Nethermind/Nethermind.Network/PeerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
using FastEnumUtility;
using Nethermind.Core;
using Nethermind.Core.Attributes;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Logging;
using Nethermind.Network.Config;
using Nethermind.Network.P2P;
Expand Down Expand Up @@ -206,7 +208,7 @@ private class CandidateSelection
private async Task RunPeerUpdateLoop()
{
Channel<Peer> taskChannel = Channel.CreateBounded<Peer>(1);
List<Task>? tasks = Enumerable.Range(0, _outgoingConnectParallelism).Select(async (idx) =>
using ArrayPoolList<Task> tasks = Enumerable.Range(0, _outgoingConnectParallelism).Select(async idx =>
{
await foreach (Peer peer in taskChannel.Reader.ReadAllAsync(_cancellationTokenSource.Token))
{
Expand All @@ -226,7 +228,7 @@ private async Task RunPeerUpdateLoop()
}
}
if (_logger.IsDebug) _logger.Debug($"Connect worker {idx} completed");
}).ToList();
}).ToPooledList(_outgoingConnectParallelism);

int loopCount = 0;
long previousActivePeersCount = 0;
Expand Down Expand Up @@ -359,7 +361,7 @@ private async Task RunPeerUpdateLoop()
}

taskChannel.Writer.Complete();
await Task.WhenAll(tasks);
await Task.WhenAll(tasks.AsSpan());
}

private bool EnsureAvailableActivePeerSlot()
Expand Down
3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Network/SessionMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -84,7 +85,7 @@ private async Task SendPingMessagesAsync()

if (_pingTasks.Count > 0)
{
bool[] tasks = await Task.WhenAll(_pingTasks);
bool[] tasks = await Task.WhenAll<bool>(CollectionsMarshal.AsSpan(_pingTasks));
int tasksLength = tasks.Length;
if (tasksLength != 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public void CommitTrees(IBlockCommitter blockCommitter)
}
}

Task.WaitAll(commitTask);
Task.WaitAll(commitTask.AsSpan());

_toUpdateRoots.Clear();
// only needed here as there is no control over cached storage size otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -47,7 +48,8 @@ protected TrieNodeRecovery(ISyncPeerPool syncPeerPool, ILogManager? logManager)
{
while (keyRecoveries.Count > 0)
{
Task<(Recovery, byte[]?)> task = await Task.WhenAny(keyRecoveries.Select(kr => kr.Task!));
using ArrayPoolList<Task<(Recovery, byte[]?)>>? tasks = keyRecoveries.Select(kr => kr.Task!).ToPooledList(keyRecoveries.Count);
Task<(Recovery, byte[]?)> task = await Task.WhenAny<(Recovery, byte[]?)>(tasks.AsSpan());
(Recovery Recovery, byte[]? Data) result = await task;
if (result.Data is null)
{
Expand All @@ -57,7 +59,7 @@ protected TrieNodeRecovery(ISyncPeerPool syncPeerPool, ILogManager? logManager)
else
{
if (_logger.IsWarn) _logger.Warn($"Successfully recovered from peer {result.Recovery.Peer} with {result.Data.Length} bytes!");
cts.Cancel();
await cts.CancelAsync();
return result.Data;
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/Nethermind/Nethermind.Trie/BatchedTrieVisitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,11 @@ public void Start(

try
{
// TODO: .Net 9 stackalloc
Task[]? tasks = Enumerable.Range(0, trieVisitContext.MaxDegreeOfParallelism)
using ArrayPoolList<Task> tasks = Enumerable.Range(0, trieVisitContext.MaxDegreeOfParallelism)
.Select(_ => Task.Run(BatchedThread))
.ToArray();
.ToPooledList(trieVisitContext.MaxDegreeOfParallelism);

Task.WaitAll(tasks);
Task.WaitAll(tasks.AsSpan());
}
catch (Exception)
{
Expand Down
3 changes: 1 addition & 2 deletions src/Nethermind/Nethermind.Trie/PatriciaTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ Task CreateTaskForPath(TreePath childPath, TrieNode childNode, int idx) => Task.
committer.ReturnConcurrencyQuota();
});

// TODO: .Net 9 stackalloc
ArrayPoolList<Task>? childTasks = null;

for (int i = 0; i < 16; i++)
Expand Down Expand Up @@ -233,7 +232,7 @@ Task CreateTaskForPath(TreePath childPath, TrieNode childNode, int idx) => Task.

if (childTasks is not null)
{
Task.WaitAll(childTasks.ToArray());
Task.WaitAll(childTasks.AsSpan());
childTasks.Dispose();
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -773,11 +773,13 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path)
});
}

Task.WaitAll(parallelStartNodes.Select(entry => Task.Run(() =>
using ArrayPoolList<Task> tasks = parallelStartNodes.Select(entry => Task.Run(() =>
{
(TrieNode trieNode, Hash256? address2, TreePath path2) = entry;
PersistNodeStartingFrom(trieNode, address2, path2, persistedNodeRecorder, writeFlags, disposeQueue);
})));
})).ToPooledList(parallelStartNodes.Count);

Task.WaitAll(tasks.AsSpan());

disposeQueue.CompleteAdding();
Task.WaitAll(_disposeTasks);
Expand Down
Loading