diff --git a/src/Nethermind/Nethermind.Core/PubSub/CompositePublisher.cs b/src/Nethermind/Nethermind.Core/PubSub/CompositePublisher.cs index 31261d6a34d..cfb3541fa4b 100644 --- a/src/Nethermind/Nethermind.Core/PubSub/CompositePublisher.cs +++ b/src/Nethermind/Nethermind.Core/PubSub/CompositePublisher.cs @@ -16,6 +16,7 @@ public CompositePublisher(params IPublisher[] publishers) public async Task PublishAsync(T data) where T : class { + // TODO: .Net 9 stackalloc Task[] tasks = new Task[_publishers.Length]; for (int i = 0; i < _publishers.Length; i++) { diff --git a/src/Nethermind/Nethermind.Network/CompositeNodeSource.cs b/src/Nethermind/Nethermind.Network/CompositeNodeSource.cs index a56329f4731..1402ebcf725 100644 --- a/src/Nethermind/Nethermind.Network/CompositeNodeSource.cs +++ b/src/Nethermind/Nethermind.Network/CompositeNodeSource.cs @@ -20,7 +20,8 @@ public async IAsyncEnumerable DiscoverNodes([EnumeratorCancellation] Cance { Channel ch = Channel.CreateBounded(1); - Task[] feedTasks = _nodeSources.Select(async (innerSource) => + // TODO: .Net 9 stackalloc + Task[] feedTasks = _nodeSources.Select(async innerSource => { await foreach (Node node in innerSource.DiscoverNodes(cancellationToken)) { diff --git a/src/Nethermind/Nethermind.Synchronization.Test/SyncServerTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/SyncServerTests.cs index e3540377184..95cde2d1a4e 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/SyncServerTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/SyncServerTests.cs @@ -702,8 +702,11 @@ public void GetNodeData_returns_cached_trie_nodes() Hash256 nodeKey = TestItem.KeccakA; TrieNode node = new(NodeType.Leaf, nodeKey, TestItem.KeccakB.Bytes); IScopedTrieStore scopedTrieStore = trieStore.GetTrieStore(null); - scopedTrieStore.CommitNode(1, new NodeCommitInfo(node, TreePath.Empty)); - scopedTrieStore.FinishBlockCommit(TrieType.State, 1, node); + using (ICommitter committer = scopedTrieStore.BeginCommit(TrieType.State, 1, node)) + { + TreePath path = TreePath.Empty; + committer.CommitNode(ref path, new NodeCommitInfo(node)); + } stateDb.KeyExists(nodeKey).Should().BeFalse(); ctx.SyncServer.GetNodeData(new[] { nodeKey }, CancellationToken.None, NodeDataType.All).Should().BeEquivalentTo(new[] { TestItem.KeccakB.BytesToArray() }); diff --git a/src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs b/src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs index 3a1677afd69..7d78cc8443f 100644 --- a/src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs +++ b/src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs @@ -70,7 +70,11 @@ public void Memory_with_one_node_is_288() using TrieStore fullTrieStore = CreateTrieStore(pruningStrategy: new TestPruningStrategy(true)); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.CommitNode(1234, new NodeCommitInfo(trieNode, TreePath.Empty)); + TreePath emptyPath = TreePath.Empty; + using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, 1234, null)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode)); + } fullTrieStore.MemoryUsedByDirtyCache.Should().Be( trieNode.GetMemorySize(false) + ExpectedPerNodeKeyMemorySize); } @@ -85,10 +89,16 @@ public void Pruning_off_cache_should_not_change_commit_node() using TrieStore fullTrieStore = CreateTrieStore(); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.CommitNode(1234, new NodeCommitInfo(trieNode, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 1234, trieNode); - trieStore.CommitNode(124, new NodeCommitInfo(trieNode2, TreePath.Empty)); - trieStore.CommitNode(11234, new NodeCommitInfo(trieNode3, TreePath.Empty)); + TreePath emptyPath = TreePath.Empty; + using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, 1234, trieNode)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode)); + } + using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, 1235, trieNode)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode2)); + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode3)); + } fullTrieStore.MemoryUsedByDirtyCache.Should().Be(0); } @@ -101,8 +111,12 @@ public void When_commit_forward_write_flag_if_available() using TrieStore fullTrieStore = CreateTrieStore(kvStore: testMemDb); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.CommitNode(1234, new NodeCommitInfo(trieNode, TreePath.Empty), WriteFlags.LowPriority); - trieStore.FinishBlockCommit(TrieType.State, 1234, trieNode, WriteFlags.LowPriority); + + TreePath emptyPath = TreePath.Empty; + using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, 1234, trieNode, WriteFlags.LowPriority)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode)); + } if (_scheme == INodeStorage.KeyScheme.HalfPath) { @@ -123,13 +137,13 @@ public void Should_always_announce_block_number_when_pruning_disabled_and_persis using TrieStore fullTrieStore = CreateTrieStore(persistenceStrategy: Archive.Instance); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); fullTrieStore.ReorgBoundaryReached += (_, e) => reorgBoundaryCount += e.BlockNumber; - trieStore.FinishBlockCommit(TrieType.State, 1, trieNode); + trieStore.BeginCommit(TrieType.State, 1, trieNode).Dispose(); reorgBoundaryCount.Should().Be(0); - trieStore.FinishBlockCommit(TrieType.State, 2, trieNode); + trieStore.BeginCommit(TrieType.State, 2, trieNode).Dispose(); reorgBoundaryCount.Should().Be(1); - trieStore.FinishBlockCommit(TrieType.State, 3, trieNode); + trieStore.BeginCommit(TrieType.State, 3, trieNode).Dispose(); reorgBoundaryCount.Should().Be(3); - trieStore.FinishBlockCommit(TrieType.State, 4, trieNode); + trieStore.BeginCommit(TrieType.State, 4, trieNode).Dispose(); reorgBoundaryCount.Should().Be(6); } @@ -142,10 +156,10 @@ public void Should_always_announce_zero_when_not_persisting() using TrieStore fullTrieStore = CreateTrieStore(); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); fullTrieStore.ReorgBoundaryReached += (_, e) => reorgBoundaryCount += e.BlockNumber; - trieStore.FinishBlockCommit(TrieType.State, 1, trieNode); - trieStore.FinishBlockCommit(TrieType.State, 2, trieNode); - trieStore.FinishBlockCommit(TrieType.State, 3, trieNode); - trieStore.FinishBlockCommit(TrieType.State, 4, trieNode); + trieStore.BeginCommit(TrieType.State, 1, trieNode).Dispose(); + trieStore.BeginCommit(TrieType.State, 2, trieNode).Dispose(); + trieStore.BeginCommit(TrieType.State, 3, trieNode).Dispose(); + trieStore.BeginCommit(TrieType.State, 4, trieNode).Dispose(); reorgBoundaryCount.Should().Be(0L); } @@ -189,8 +203,12 @@ public void Memory_with_two_nodes_is_correct() using TrieStore fullTrieStore = CreateTrieStore(pruningStrategy: new TestPruningStrategy(true)); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.CommitNode(1234, new NodeCommitInfo(trieNode1, TreePath.Empty)); - trieStore.CommitNode(1234, new NodeCommitInfo(trieNode2, TreePath.Empty)); + TreePath emptyPath = TreePath.Empty; + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 1234, null)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode1)); + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode2)); + } fullTrieStore.MemoryUsedByDirtyCache.Should().Be( trieNode1.GetMemorySize(false) + ExpectedPerNodeKeyMemorySize + trieNode2.GetMemorySize(false) + ExpectedPerNodeKeyMemorySize); @@ -206,11 +224,18 @@ public void Memory_with_two_times_two_nodes_is_correct() using TrieStore fullTrieStore = CreateTrieStore(pruningStrategy: new TestPruningStrategy(true)); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.CommitNode(1234, new NodeCommitInfo(trieNode1, TreePath.Empty)); - trieStore.CommitNode(1234, new NodeCommitInfo(trieNode2, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 1234, trieNode2); - trieStore.CommitNode(1235, new NodeCommitInfo(trieNode3, TreePath.Empty)); - trieStore.CommitNode(1235, new NodeCommitInfo(trieNode4, TreePath.Empty)); + TreePath emptyPath = TreePath.Empty; + using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, 1234, trieNode2)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode1)); + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode2)); + } + + using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, 1235, trieNode2)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode3)); + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode4)); + } // depending on whether the node gets resolved it gives different values here in debugging and run // needs some attention @@ -236,13 +261,21 @@ public void Dispatcher_will_try_to_clear_memory() using TrieStore fullTrieStore = CreateTrieStore(pruningStrategy: new MemoryLimit(640)); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.CommitNode(1234, new NodeCommitInfo(trieNode1, TreePath.Empty)); - trieStore.CommitNode(1234, new NodeCommitInfo(trieNode2, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 1234, trieNode2); - trieStore.CommitNode(1235, new NodeCommitInfo(trieNode3, TreePath.Empty)); - trieStore.CommitNode(1235, new NodeCommitInfo(trieNode4, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 1235, trieNode2); - trieStore.FinishBlockCommit(TrieType.State, 1236, trieNode2); + + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 1234, trieNode2)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode1)); + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode2)); + } + + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 1235, trieNode2)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode3)); + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode4)); + } + + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 1236, trieNode2)) { } + fullTrieStore.MemoryUsedByDirtyCache.Should().Be( trieNode1.GetMemorySize(false) + ExpectedPerNodeKeyMemorySize + trieNode2.GetMemorySize(false) + ExpectedPerNodeKeyMemorySize + @@ -267,11 +300,19 @@ public void Dispatcher_will_try_to_clear_memory_the_soonest_possible() using TrieStore fullTrieStore = CreateTrieStore(pruningStrategy: new MemoryLimit(512)); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.CommitNode(1234, new NodeCommitInfo(trieNode1, TreePath.Empty)); - trieStore.CommitNode(1234, new NodeCommitInfo(trieNode2, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 1234, trieNode2); - trieStore.CommitNode(1235, new NodeCommitInfo(trieNode3, TreePath.Empty)); - trieStore.CommitNode(1235, new NodeCommitInfo(trieNode4, TreePath.Empty)); + + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 1234, trieNode2)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode1)); + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode2)); + } + + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 1235, trieNode2)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode3)); + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode4)); + } + fullTrieStore.MemoryUsedByDirtyCache.Should().Be( trieNode1.GetMemorySize(false) + ExpectedPerNodeKeyMemorySize + trieNode2.GetMemorySize(false) + ExpectedPerNodeKeyMemorySize + @@ -287,16 +328,17 @@ public void Dispatcher_will_always_try_to_clear_memory() TreePath emptyPath = TreePath.Empty; for (int i = 0; i < 1024; i++) { - for (int j = 0; j < 1 + i % 3; j++) - { - TrieNode trieNode = new(NodeType.Leaf, new byte[0]); // 192B - trieNode.ResolveKey(NullTrieNodeResolver.Instance, ref emptyPath, true); - trieStore.CommitNode(i, new NodeCommitInfo(trieNode, TreePath.Empty)); - } - TrieNode fakeRoot = new(NodeType.Leaf, new byte[0]); // 192B fakeRoot.ResolveKey(NullTrieNodeResolver.Instance, ref emptyPath, true); - trieStore.FinishBlockCommit(TrieType.State, i, fakeRoot); + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, i, fakeRoot)) + { + for (int j = 0; j < 1 + i % 3; j++) + { + TrieNode trieNode = new(NodeType.Leaf, new byte[0]); // 192B + trieNode.ResolveKey(NullTrieNodeResolver.Instance, ref emptyPath, true); + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode)); + } + } } fullTrieStore.MemoryUsedByDirtyCache.Should().BeLessThan(512 * 2); @@ -318,12 +360,14 @@ public void Dispatcher_will_save_to_db_everything_from_snapshot_blocks() persistenceStrategy: new ConstantInterval(4)); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.CommitNode(0, new NodeCommitInfo(a, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 0, a); - trieStore.FinishBlockCommit(TrieType.State, 1, a); - trieStore.FinishBlockCommit(TrieType.State, 2, a); - trieStore.FinishBlockCommit(TrieType.State, 3, a); - trieStore.FinishBlockCommit(TrieType.State, 4, a); + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 0, a)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(a)); + } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 1, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 2, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 3, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 4, a)) { } storage.Get(null, TreePath.Empty, a.Keccak).Should().NotBeNull(); fullTrieStore.IsNodeCached(null, TreePath.Empty, a.Keccak).Should().BeTrue(); @@ -342,11 +386,13 @@ public void Stays_in_memory_until_persisted() using TrieStore fullTrieStore = CreateTrieStore(pruningStrategy: new MemoryLimit(16.MB())); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.CommitNode(0, new NodeCommitInfo(a, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 0, a); - trieStore.FinishBlockCommit(TrieType.State, 1, a); - trieStore.FinishBlockCommit(TrieType.State, 2, a); - trieStore.FinishBlockCommit(TrieType.State, 3, a); + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 0, a)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(a)); + } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 1, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 2, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 3, a)) { } // <- do not persist in this test storage.Get(null, TreePath.Empty, a.Keccak).Should().BeNull(); @@ -381,16 +427,18 @@ public void Will_get_persisted_on_snapshot_if_referenced() ); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.FinishBlockCommit(TrieType.State, 0, null); - trieStore.CommitNode(1, new NodeCommitInfo(a, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 1, a); - trieStore.FinishBlockCommit(TrieType.State, 2, a); - trieStore.FinishBlockCommit(TrieType.State, 3, a); - trieStore.FinishBlockCommit(TrieType.State, 4, a); - trieStore.FinishBlockCommit(TrieType.State, 5, a); - trieStore.FinishBlockCommit(TrieType.State, 6, a); - trieStore.FinishBlockCommit(TrieType.State, 7, a); - trieStore.FinishBlockCommit(TrieType.State, 8, a); + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 0, null)) { } + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 1, a)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(a)); + } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 2, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 3, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 4, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 5, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 6, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 7, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 8, a)) { } storage.Get(null, TreePath.Empty, a.Keccak).Should().NotBeNull(); fullTrieStore.IsNodeCached(null, TreePath.Empty, a.Keccak).Should().BeTrue(); @@ -416,17 +464,21 @@ public void Will_not_get_dropped_on_snapshot_if_unreferenced_in_later_blocks() IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.FinishBlockCommit(TrieType.State, 0, null); - trieStore.CommitNode(1, new NodeCommitInfo(a, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 1, a); - trieStore.FinishBlockCommit(TrieType.State, 2, a); - trieStore.FinishBlockCommit(TrieType.State, 3, a); - trieStore.FinishBlockCommit(TrieType.State, 4, a); - trieStore.FinishBlockCommit(TrieType.State, 5, a); - trieStore.FinishBlockCommit(TrieType.State, 6, a); - trieStore.CommitNode(7, new NodeCommitInfo(b, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 7, b); - trieStore.FinishBlockCommit(TrieType.State, 8, b); + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 0, null)) { } + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 1, a)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(a)); + } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 2, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 3, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 4, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 5, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 6, a)) { } + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 7, a)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(b)); + } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 8, a)) { } nodeStorage.Get(null, TreePath.Empty, a.Keccak).Should().NotBeNull(); fullTrieStore.IsNodeCached(null, TreePath.Empty, a.Keccak).Should().BeTrue(); @@ -451,17 +503,21 @@ public void Will_get_dropped_on_snapshot_if_it_was_a_transient_node() IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.FinishBlockCommit(TrieType.State, 0, null); - trieStore.CommitNode(1, new NodeCommitInfo(a, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 1, a); - trieStore.FinishBlockCommit(TrieType.State, 2, a); - trieStore.CommitNode(3, new NodeCommitInfo(b, TreePath.Empty)); // <- new root - trieStore.FinishBlockCommit(TrieType.State, 3, b); - trieStore.FinishBlockCommit(TrieType.State, 4, b); // should be 'a' to test properly - trieStore.FinishBlockCommit(TrieType.State, 5, b); // should be 'a' to test properly - trieStore.FinishBlockCommit(TrieType.State, 6, b); // should be 'a' to test properly - trieStore.FinishBlockCommit(TrieType.State, 7, b); // should be 'a' to test properly - trieStore.FinishBlockCommit(TrieType.State, 8, b); // should be 'a' to test properly + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 0, null)) { } + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 1, a)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(a)); + } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 2, a)) { } + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 3, a)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(b)); // <- new root + } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 4, b)) { } // should be 'a' to test properly + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 5, b)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 6, b)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 7, b)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 8, b)) { } memDb[a.Keccak!.Bytes].Should().BeNull(); fullTrieStore.IsNodeCached(null, TreePath.Empty, a.Keccak).Should().BeTrue(); @@ -544,21 +600,24 @@ public void Will_store_storage_on_snapshot() persistenceStrategy: new ConstantInterval(4)); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.FinishBlockCommit(TrieType.State, 0, null); - trieStore.CommitNode(1, new NodeCommitInfo(a, TreePath.Empty)); - fullTrieStore.GetTrieStore(TestItem.KeccakA) - .CommitNode(1, new NodeCommitInfo(storage1, TreePath.Empty)); - fullTrieStore.GetTrieStore(TestItem.KeccakA) - .FinishBlockCommit(TrieType.Storage, 1, storage1); - trieStore.FinishBlockCommit(TrieType.Storage, 1, storage1); - trieStore.FinishBlockCommit(TrieType.State, 1, a); - trieStore.FinishBlockCommit(TrieType.State, 2, a); - trieStore.FinishBlockCommit(TrieType.State, 3, a); - trieStore.FinishBlockCommit(TrieType.State, 4, a); - trieStore.FinishBlockCommit(TrieType.State, 5, a); - trieStore.FinishBlockCommit(TrieType.State, 6, a); - trieStore.FinishBlockCommit(TrieType.State, 7, a); - trieStore.FinishBlockCommit(TrieType.State, 8, a); + + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 0, null)) { } + using (ICommitter committer = fullTrieStore.GetTrieStore(TestItem.KeccakA).BeginCommit(TrieType.Storage, 1, storage1)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(storage1)); + } + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 1, a)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(a)); + } + + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 2, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 3, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 4, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 5, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 6, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 7, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 8, a)) { } asStorage.Get(null, TreePath.Empty, a.Keccak).Should().NotBeNull(); asStorage.Get(TestItem.KeccakA, TreePath.Empty, storage1.Keccak).Should().NotBeNull(); @@ -591,19 +650,27 @@ public void Will_drop_transient_storage() IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.FinishBlockCommit(TrieType.State, 0, null); - trieStore.CommitNode(1, new NodeCommitInfo(a, TreePath.Empty)); - trieStore.CommitNode(1, new NodeCommitInfo(storage1, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.Storage, 1, storage1); - trieStore.FinishBlockCommit(TrieType.State, 1, a); - trieStore.FinishBlockCommit(TrieType.State, 2, a); - trieStore.CommitNode(3, new NodeCommitInfo(b, TreePath.Empty)); // <- new root - trieStore.FinishBlockCommit(TrieType.State, 3, b); - trieStore.FinishBlockCommit(TrieType.State, 4, b); // should be 'a' to test properly - trieStore.FinishBlockCommit(TrieType.State, 5, b); // should be 'a' to test properly - trieStore.FinishBlockCommit(TrieType.State, 6, b); // should be 'a' to test properly - trieStore.FinishBlockCommit(TrieType.State, 7, b); // should be 'a' to test properly - trieStore.FinishBlockCommit(TrieType.State, 8, b); // should be 'a' to test properly + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 0, null)) { } + + using (ICommitter committer = trieStore.BeginCommit(TrieType.Storage, 1, storage1)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(a)); + committer.CommitNode(ref emptyPath, new NodeCommitInfo(storage1)); + } + + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 1, a)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 2, a)) { } + + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 2, b)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(b)); // <- new root + } + + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 4, b)) { } // Should be 'a' to test properly + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 5, b)) { } // Should be 'a' to test properly + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 6, b)) { } // Should be 'a' to test properly + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 7, b)) { } // Should be 'a' to test properly + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 8, b)) { } // Should be 'a' to test properly memDb[a.Keccak!.Bytes].Should().BeNull(); memDb[storage1.Keccak!.Bytes].Should().BeNull(); @@ -653,27 +720,32 @@ public void Will_combine_same_storage() IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.FinishBlockCommit(TrieType.State, 0, null); + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 0, null)) { } + + using (ICommitter committer = fullTrieStore.GetTrieStore(new Hash256(Nibbles.ToBytes(storage1Nib))).BeginCommit(TrieType.Storage, 1, storage1)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(storage1)); + } - IScopedTrieStore storageTrieStore = fullTrieStore.GetTrieStore(new Hash256(Nibbles.ToBytes(storage1Nib))); - storageTrieStore.CommitNode(1, new NodeCommitInfo(storage1, TreePath.Empty)); - storageTrieStore.FinishBlockCommit(TrieType.Storage, 1, storage1); + using (ICommitter committer = fullTrieStore.GetTrieStore(new Hash256(Nibbles.ToBytes(storage2Nib))).BeginCommit(TrieType.Storage, 1, storage2)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(storage2)); + } - storageTrieStore = fullTrieStore.GetTrieStore(new Hash256(Nibbles.ToBytes(storage2Nib))); - storageTrieStore.CommitNode(1, new NodeCommitInfo(storage2, TreePath.Empty)); - storageTrieStore.FinishBlockCommit(TrieType.Storage, 1, storage2); + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, 1, branch)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(a)); + committer.CommitNode(ref emptyPath, new NodeCommitInfo(b)); + committer.CommitNode(ref emptyPath, new NodeCommitInfo(branch)); + } - trieStore.CommitNode(1, new NodeCommitInfo(a, TreePath.Empty)); - trieStore.CommitNode(1, new NodeCommitInfo(b, TreePath.Empty)); - trieStore.CommitNode(1, new NodeCommitInfo(branch, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 1, branch); - trieStore.FinishBlockCommit(TrieType.State, 2, branch); - trieStore.FinishBlockCommit(TrieType.State, 3, branch); - trieStore.FinishBlockCommit(TrieType.State, 4, branch); - trieStore.FinishBlockCommit(TrieType.State, 5, branch); - trieStore.FinishBlockCommit(TrieType.State, 6, branch); - trieStore.FinishBlockCommit(TrieType.State, 7, branch); - trieStore.FinishBlockCommit(TrieType.State, 8, branch); + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 2, branch)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 3, branch)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 4, branch)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 5, branch)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 6, branch)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 7, branch)) { } + using (ICommitter _ = trieStore.BeginCommit(TrieType.State, 8, branch)) { } storage.Get(null, TreePath.FromNibble(new byte[] { 0 }), a.Keccak).Should().NotBeNull(); storage.Get(new Hash256(Nibbles.ToBytes(storage1Nib)), TreePath.Empty, storage1.Keccak).Should().NotBeNull(); @@ -702,7 +774,10 @@ public async Task Read_only_trie_store_is_allowing_many_thread_to_work_with_the_ IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); TreePath emptyPath = TreePath.Empty; trieNode.ResolveKey(trieStore, ref emptyPath, false); - trieStore.CommitNode(1, new NodeCommitInfo(trieNode, TreePath.Empty)); + using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, 0, trieNode)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(trieNode)); + } if (beThreadSafe) { @@ -755,8 +830,12 @@ public void ReadOnly_store_returns_copies(bool pruning) using TrieStore fullTrieStore = CreateTrieStore(pruningStrategy: new TestPruningStrategy(pruning)); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); - trieStore.CommitNode(0, new NodeCommitInfo(node, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 0, node); + + using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, 0, node)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(node)); + } + var originalNode = trieStore.FindCachedOrUnknown(TreePath.Empty, node.Keccak); IReadOnlyTrieStore readOnlyTrieStore = fullTrieStore.AsReadOnly(); @@ -807,12 +886,15 @@ public async Task Will_RemovePastKeys_OnSnapshot() persistenceStrategy: No.Persistence); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); + TreePath emptyPath = TreePath.Empty; for (int i = 0; i < 64; i++) { TrieNode node = new(NodeType.Leaf, TestItem.Keccaks[i], new byte[2]); - trieStore.CommitNode(i, new NodeCommitInfo(node, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, i, node); + using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, i, node)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(node)); + } // Pruning is done in background await Task.Delay(TimeSpan.FromMilliseconds(10)); @@ -842,12 +924,15 @@ public async Task Will_Trigger_ReorgBoundaryEvent_On_Prune() fullTrieStore.ReorgBoundaryReached += (sender, reached) => reorgBoundary = reached.BlockNumber; IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); + TreePath emptyPath = TreePath.Empty; for (int i = 0; i < 64; i++) { TrieNode node = new(NodeType.Leaf, TestItem.Keccaks[i], new byte[2]); - trieStore.CommitNode(i, new NodeCommitInfo(node, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, i, node); + using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, i, node)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(node)); + } if (i > 4) { @@ -875,12 +960,15 @@ public async Task Will_Not_RemovePastKeys_OnSnapshot_DuringFullPruning() persistenceStrategy: isPruningPersistenceStrategy); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); + TreePath emptyPath = TreePath.Empty; for (int i = 0; i < 64; i++) { TrieNode node = new(NodeType.Leaf, TestItem.Keccaks[i], new byte[2]); - trieStore.CommitNode(i, new NodeCommitInfo(node, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, i, node); + using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, i, node)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(node)); + } // Pruning is done in background await Task.Delay(TimeSpan.FromMilliseconds(10)); @@ -902,13 +990,15 @@ public async Task Will_NotRemove_ReCommittedNode() persistenceStrategy: No.Persistence); IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null); + TreePath emptyPath = TreePath.Empty; for (int i = 0; i < 64; i++) { TrieNode node = new(NodeType.Leaf, TestItem.Keccaks[i % 4], new byte[2]); - trieStore.CommitNode(i, new NodeCommitInfo(node, TreePath.Empty)); - node = trieStore.FindCachedOrUnknown(TreePath.Empty, node.Keccak); - trieStore.FinishBlockCommit(TrieType.State, i, node); + using (ICommitter committer = trieStore.BeginCommit(TrieType.State, i, node)) + { + committer.CommitNode(ref emptyPath, new NodeCommitInfo(node)); + } // Pruning is done in background await Task.Delay(TimeSpan.FromMilliseconds(10)); diff --git a/src/Nethermind/Nethermind.Trie.Test/TrieNodeTests.cs b/src/Nethermind/Nethermind.Trie.Test/TrieNodeTests.cs index 6ee564db88a..001be11d7d7 100644 --- a/src/Nethermind/Nethermind.Trie.Test/TrieNodeTests.cs +++ b/src/Nethermind/Nethermind.Trie.Test/TrieNodeTests.cs @@ -933,15 +933,20 @@ public void Rlp_is_cloned_when_cloning() TreePath emptyPath = TreePath.Empty; leaf1.ResolveKey(trieStore, ref emptyPath, false); leaf1.Seal(); - trieStore.CommitNode(0, new NodeCommitInfo(leaf1, TreePath.Empty)); TrieNode leaf2 = new(NodeType.Leaf); leaf2.Key = Bytes.FromHexString("abd"); leaf2.Value = new byte[222]; leaf2.ResolveKey(trieStore, ref emptyPath, false); leaf2.Seal(); - trieStore.CommitNode(0, new NodeCommitInfo(leaf2, TreePath.Empty)); - trieStore.FinishBlockCommit(TrieType.State, 0, leaf2); + + TreePath path = TreePath.Empty; + + using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, 0, leaf2)) + { + committer.CommitNode(ref path, new NodeCommitInfo(leaf1)); + committer.CommitNode(ref path, new NodeCommitInfo(leaf2)); + } TrieNode trieNode = new(NodeType.Branch); trieNode.SetChild(1, leaf1); diff --git a/src/Nethermind/Nethermind.Trie/BatchedTrieVisitor.cs b/src/Nethermind/Nethermind.Trie/BatchedTrieVisitor.cs index ef0706a09cf..99bff648354 100644 --- a/src/Nethermind/Nethermind.Trie/BatchedTrieVisitor.cs +++ b/src/Nethermind/Nethermind.Trie/BatchedTrieVisitor.cs @@ -142,11 +142,12 @@ public void Start( try { + // TODO: .Net 9 stackalloc Task[]? tasks = Enumerable.Range(0, trieVisitContext.MaxDegreeOfParallelism) - .Select((_) => Task.Run(BatchedThread)) + .Select(_ => Task.Run(BatchedThread)) .ToArray(); - Task.WhenAll(tasks).Wait(); + Task.WaitAll(tasks); } catch (Exception) { diff --git a/src/Nethermind/Nethermind.Trie/CachedTrieStore.cs b/src/Nethermind/Nethermind.Trie/CachedTrieStore.cs index d9a1769c5aa..5b23ba6ad92 100644 --- a/src/Nethermind/Nethermind.Trie/CachedTrieStore.cs +++ b/src/Nethermind/Nethermind.Trie/CachedTrieStore.cs @@ -20,46 +20,25 @@ public class CachedTrieStore(IScopedTrieStore @base) : IScopedTrieStore { private readonly NonBlocking.ConcurrentDictionary<(TreePath path, Hash256 hash), TrieNode> _cachedNode = new(); - public TrieNode FindCachedOrUnknown(in TreePath path, Hash256 hash) - { - return _cachedNode.GetOrAdd((path, hash), (key) => @base.FindCachedOrUnknown(key.path, key.hash)); - } + public TrieNode FindCachedOrUnknown(in TreePath path, Hash256 hash) => + _cachedNode.GetOrAdd((path, hash), (key) => @base.FindCachedOrUnknown(key.path, key.hash)); - public byte[]? LoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) - { - return @base.LoadRlp(in path, hash, flags); - } + public byte[]? LoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) => + @base.LoadRlp(in path, hash, flags); - public byte[]? TryLoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) - { - return @base.TryLoadRlp(in path, hash, flags); - } + public byte[]? TryLoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) => + @base.TryLoadRlp(in path, hash, flags); - public ITrieNodeResolver GetStorageTrieNodeResolver(Hash256? address) - { + public ITrieNodeResolver GetStorageTrieNodeResolver(Hash256? address) => throw new InvalidOperationException("unsupported"); - } public INodeStorage.KeyScheme Scheme => @base.Scheme; - public void CommitNode(long blockNumber, NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None) - { - @base.CommitNode(blockNumber, nodeCommitInfo, writeFlags); - } + public ICommitter BeginCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) => + @base.BeginCommit(trieType, blockNumber, root, writeFlags); - public void FinishBlockCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) - { - @base.FinishBlockCommit(trieType, blockNumber, root, writeFlags); - } + public bool IsPersisted(in TreePath path, in ValueHash256 keccak) => @base.IsPersisted(in path, in keccak); - public bool IsPersisted(in TreePath path, in ValueHash256 keccak) - { - return @base.IsPersisted(in path, in keccak); - } - - public void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp) - { - @base.Set(in path, in keccak, rlp); - } + public void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp) => @base.Set(in path, in keccak, rlp); } diff --git a/src/Nethermind/Nethermind.Trie/NodeCommitInfo.cs b/src/Nethermind/Nethermind.Trie/NodeCommitInfo.cs index bf85db2709d..265d81631fb 100644 --- a/src/Nethermind/Nethermind.Trie/NodeCommitInfo.cs +++ b/src/Nethermind/Nethermind.Trie/NodeCommitInfo.cs @@ -1,37 +1,35 @@ +using System.Diagnostics.CodeAnalysis; + namespace Nethermind.Trie { public readonly struct NodeCommitInfo { public NodeCommitInfo( - TrieNode node, - in TreePath path + TrieNode node ) { ChildPositionAtParent = 0; Node = node; - Path = path; NodeParent = null; } public NodeCommitInfo( TrieNode node, TrieNode nodeParent, - in TreePath path, int childPositionAtParent) { ChildPositionAtParent = childPositionAtParent; Node = node; - Path = path; NodeParent = nodeParent; } public TrieNode? Node { get; } - public readonly TreePath Path; public TrieNode? NodeParent { get; } public int ChildPositionAtParent { get; } + [MemberNotNullWhen(false, nameof(Node))] public bool IsEmptyBlockMarker => Node is null; public bool IsRoot => !IsEmptyBlockMarker && NodeParent is null; diff --git a/src/Nethermind/Nethermind.Trie/PatriciaTree.cs b/src/Nethermind/Nethermind.Trie/PatriciaTree.cs index 3ab8c9d1127..88b443a3983 100644 --- a/src/Nethermind/Nethermind.Trie/PatriciaTree.cs +++ b/src/Nethermind/Nethermind.Trie/PatriciaTree.cs @@ -8,13 +8,14 @@ using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO; +using System.Linq; using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; using Nethermind.Core; using Nethermind.Core.Buffers; -using Nethermind.Core.Cpu; +using Nethermind.Core.Collections; using Nethermind.Core.Crypto; using Nethermind.Core.Extensions; using Nethermind.Logging; @@ -39,10 +40,6 @@ public class PatriciaTree public TrieType TrieType { get; init; } private Stack? _nodeStack; - - private ConcurrentQueue? _commitExceptions; - private ConcurrentQueue? _currentCommit; - public IScopedTrieStore TrieStore { get; } public ICappedArrayPool? _bufferPool; @@ -56,6 +53,9 @@ public class PatriciaTree public TrieNode? RootRef { get; set; } + // Used to estimate if parallelization is needed during commit + private long _writeBeforeCommit = 0; + /// /// Only used in EthereumTests /// @@ -138,37 +138,30 @@ public void Commit(long blockNumber, bool skipRoot = false, WriteFlags writeFlag ThrowReadOnlyTrieException(); } - if (RootRef is not null && RootRef.IsDirty) + int maxLevelForConcurrentCommit = _writeBeforeCommit switch { - Commit(new NodeCommitInfo(RootRef, TreePath.Empty), skipSelf: skipRoot); - while (TryDequeueCommit(out NodeCommitInfo node)) - { - if (_logger.IsTrace) Trace(blockNumber, node); - TrieStore.CommitNode(blockNumber, node, writeFlags: writeFlags); - } - - // reset objects - TreePath path = TreePath.Empty; - RootRef!.ResolveKey(TrieStore, ref path, true, bufferPool: _bufferPool); - SetRootHash(RootRef.Keccak!, true); - } - - TrieStore.FinishBlockCommit(TrieType, blockNumber, RootRef, writeFlags); + > 64 * 16 => 1, // we separate at two top levels + > 64 => 0, // we separate at top level + _ => -1 + }; - if (_logger.IsDebug) Debug(blockNumber); + _writeBeforeCommit = 0; - bool TryDequeueCommit(out NodeCommitInfo value) + using (ICommitter committer = TrieStore.BeginCommit(TrieType, blockNumber, RootRef, writeFlags)) { - Unsafe.SkipInit(out value); - return _currentCommit?.TryDequeue(out value) ?? false; - } + if (RootRef is not null && RootRef.IsDirty) + { + TreePath path = TreePath.Empty; + Commit(committer, ref path, new NodeCommitInfo(RootRef), skipSelf: skipRoot, maxLevelForConcurrentCommit: maxLevelForConcurrentCommit); - [MethodImpl(MethodImplOptions.NoInlining)] - void Trace(long blockNumber, in NodeCommitInfo node) - { - _logger.Trace($"Committing {node} in {blockNumber}"); + // reset objects + RootRef!.ResolveKey(TrieStore, ref path, true, bufferPool: _bufferPool); + SetRootHash(RootRef.Keccak!, true); + } } + if (_logger.IsDebug) Debug(blockNumber); + [MethodImpl(MethodImplOptions.NoInlining)] void Debug(long blockNumber) { @@ -176,7 +169,7 @@ void Debug(long blockNumber) } } - private void Commit(NodeCommitInfo nodeCommitInfo, bool skipSelf = false) + private void Commit(ICommitter committer, ref TreePath path, NodeCommitInfo nodeCommitInfo, int maxLevelForConcurrentCommit, bool skipSelf = false) { if (!_allowCommits) { @@ -184,18 +177,18 @@ private void Commit(NodeCommitInfo nodeCommitInfo, bool skipSelf = false) } TrieNode node = nodeCommitInfo.Node; - TreePath path = nodeCommitInfo.Path; if (node!.IsBranch) { - // idea from EthereumJ - testing parallel branches - if (!_parallelBranches || !nodeCommitInfo.IsRoot) + if (path.Length > maxLevelForConcurrentCommit) { for (int i = 0; i < 16; i++) { if (node.IsChildDirty(i)) { - TreePath childPath = node.GetChildPath(nodeCommitInfo.Path, i); - Commit(new NodeCommitInfo(node.GetChildWithChildPath(TrieStore, ref childPath, i)!, node, childPath, i)); + path.AppendMut(i); + TrieNode childNode = node.GetChildWithChildPath(TrieStore, ref path, i); + Commit(committer, ref path, new NodeCommitInfo(childNode!, node, i), maxLevelForConcurrentCommit); + path.TruncateOne(); } else { @@ -208,13 +201,33 @@ private void Commit(NodeCommitInfo nodeCommitInfo, bool skipSelf = false) } else { - List nodesToCommit = new(16); + Task CreateTaskForPath(TreePath childPath, TrieNode childNode, int idx) => Task.Run(() => + { + Commit(committer, ref childPath, new NodeCommitInfo(childNode!, node, idx), maxLevelForConcurrentCommit); + committer.ReturnConcurrencyQuota(); + }); + + // TODO: .Net 9 stackalloc + ArrayPoolList? childTasks = null; + for (int i = 0; i < 16; i++) { if (node.IsChildDirty(i)) { - TreePath childPath = node.GetChildPath(nodeCommitInfo.Path, i); - nodesToCommit.Add(new NodeCommitInfo(node.GetChildWithChildPath(TrieStore, ref childPath, i)!, node, childPath, i)); + if (i < 15 && committer.CanSpawnTask()) + { + childTasks ??= new ArrayPoolList(15); + TreePath childPath = path.Append(i); + TrieNode childNode = node.GetChildWithChildPath(TrieStore, ref childPath, i); + childTasks.Add(CreateTaskForPath(childPath, childNode, i)); + } + else + { + path.AppendMut(i); + TrieNode childNode = node.GetChildWithChildPath(TrieStore, ref path, i); + Commit(committer, ref path, new NodeCommitInfo(childNode!, node, i), maxLevelForConcurrentCommit); + path.TruncateOne(); + } } else { @@ -225,39 +238,17 @@ private void Commit(NodeCommitInfo nodeCommitInfo, bool skipSelf = false) } } - if (nodesToCommit.Count >= 4) - { - ClearExceptions(); - Parallel.For(0, nodesToCommit.Count, RuntimeInformation.ParallelOptionsLogicalCores, i => - { - try - { - Commit(nodesToCommit[i]); - } - catch (Exception e) - { - AddException(e); - } - }); - - if (WereExceptions()) - { - ThrowAggregateExceptions(); - } - } - else + if (childTasks is not null) { - for (int i = 0; i < nodesToCommit.Count; i++) - { - Commit(nodesToCommit[i]); - } + Task.WaitAll(childTasks.ToArray()); + childTasks.Dispose(); } } } else if (node.NodeType == NodeType.Extension) { - TreePath childPath = node.GetChildPath(nodeCommitInfo.Path, 0); - TrieNode extensionChild = node.GetChildWithChildPath(TrieStore, ref childPath, 0); + int previousPathLength = node.AppendChildPath(ref path, 0); + TrieNode extensionChild = node.GetChildWithChildPath(TrieStore, ref path, 0); if (extensionChild is null) { ThrowInvalidExtension(); @@ -265,12 +256,13 @@ private void Commit(NodeCommitInfo nodeCommitInfo, bool skipSelf = false) if (extensionChild.IsDirty) { - Commit(new NodeCommitInfo(extensionChild, node, childPath, 0)); + Commit(committer, ref path, new NodeCommitInfo(extensionChild, node, 0), maxLevelForConcurrentCommit); } else { if (_logger.IsTrace) TraceExtensionSkip(extensionChild); } + path.TruncateMut(previousPathLength); } node.ResolveKey(TrieStore, ref path, nodeCommitInfo.IsRoot, bufferPool: _bufferPool); @@ -280,7 +272,7 @@ private void Commit(NodeCommitInfo nodeCommitInfo, bool skipSelf = false) { if (!skipSelf) { - EnqueueCommit(nodeCommitInfo); + committer.CommitNode(ref path, nodeCommitInfo); } } else @@ -288,37 +280,6 @@ private void Commit(NodeCommitInfo nodeCommitInfo, bool skipSelf = false) if (_logger.IsTrace) TraceSkipInlineNode(node); } - void EnqueueCommit(in NodeCommitInfo value) - { - ConcurrentQueue queue = Volatile.Read(ref _currentCommit); - // Allocate queue if first commit made - queue ??= CreateQueue(ref _currentCommit); - queue.Enqueue(value); - } - - void ClearExceptions() => _commitExceptions?.Clear(); - bool WereExceptions() => _commitExceptions?.IsEmpty == false; - - void AddException(Exception value) - { - ConcurrentQueue queue = Volatile.Read(ref _commitExceptions); - // Allocate queue if first exception thrown - queue ??= CreateQueue(ref _commitExceptions); - queue.Enqueue(value); - } - - [MethodImpl(MethodImplOptions.NoInlining)] - ConcurrentQueue CreateQueue(ref ConcurrentQueue queueRef) - { - ConcurrentQueue queue = new(); - ConcurrentQueue current = Interlocked.CompareExchange(ref queueRef, queue, null); - return (current is null) ? queue : current; - } - - [DoesNotReturn] - [StackTraceHidden] - void ThrowAggregateExceptions() => throw new AggregateException(_commitExceptions); - [DoesNotReturn] [StackTraceHidden] static void ThrowInvalidExtension() => throw new InvalidOperationException("An attempt to store an extension without a child."); @@ -493,6 +454,8 @@ public virtual void Set(ReadOnlySpan rawKey, in CappedArray value) ThrowNonConcurrentWrites(); } + _writeBeforeCommit++; + try { int nibblesCount = 2 * rawKey.Length; diff --git a/src/Nethermind/Nethermind.Trie/PreCachedTrieStore.cs b/src/Nethermind/Nethermind.Trie/PreCachedTrieStore.cs index efb9d242748..82d83a3e8f3 100644 --- a/src/Nethermind/Nethermind.Trie/PreCachedTrieStore.cs +++ b/src/Nethermind/Nethermind.Trie/PreCachedTrieStore.cs @@ -34,14 +34,9 @@ public void Dispose() _inner.Dispose(); } - public void CommitNode(long blockNumber, Hash256? address, in NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None) + public ICommitter BeginCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags writeFlags) { - _inner.CommitNode(blockNumber, address, in nodeCommitInfo, writeFlags); - } - - public void FinishBlockCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) - { - _inner.FinishBlockCommit(trieType, blockNumber, address, root, writeFlags); + return _inner.BeginCommit(trieType, blockNumber, address, root, writeFlags); } public bool IsPersisted(Hash256? address, in TreePath path, in ValueHash256 keccak) diff --git a/src/Nethermind/Nethermind.Trie/Pruning/IScopedTrieStore.cs b/src/Nethermind/Nethermind.Trie/Pruning/IScopedTrieStore.cs index f8d2384fd0e..3dec5d49e25 100644 --- a/src/Nethermind/Nethermind.Trie/Pruning/IScopedTrieStore.cs +++ b/src/Nethermind/Nethermind.Trie/Pruning/IScopedTrieStore.cs @@ -13,11 +13,7 @@ namespace Nethermind.Trie.Pruning; /// public interface IScopedTrieStore : ITrieNodeResolver { - // TODO: Commit and FinishBlockCommit is unnecessary. Geth just compile the changes and return it in a batch, - // which get committed in a single call. - void CommitNode(long blockNumber, NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None); - - void FinishBlockCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None); + ICommitter BeginCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None); // Only used by snap provider, so ValueHash instead of Hash bool IsPersisted(in TreePath path, in ValueHash256 keccak); @@ -25,3 +21,11 @@ public interface IScopedTrieStore : ITrieNodeResolver // Used for trie node recovery void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp); } + +public interface ICommitter : IDisposable +{ + void CommitNode(ref TreePath path, NodeCommitInfo nodeCommitInfo); + + bool CanSpawnTask() => false; + void ReturnConcurrencyQuota() { } +} diff --git a/src/Nethermind/Nethermind.Trie/Pruning/ITrieStore.cs b/src/Nethermind/Nethermind.Trie/Pruning/ITrieStore.cs index a7ec0699ec1..f9f2f59d55e 100644 --- a/src/Nethermind/Nethermind.Trie/Pruning/ITrieStore.cs +++ b/src/Nethermind/Nethermind.Trie/Pruning/ITrieStore.cs @@ -13,10 +13,6 @@ namespace Nethermind.Trie.Pruning /// public interface ITrieStore : IDisposable { - void CommitNode(long blockNumber, Hash256? address, in NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None); - - void FinishBlockCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags writeFlags = WriteFlags.None); - bool IsPersisted(Hash256? address, in TreePath path, in ValueHash256 keccak); IReadOnlyTrieStore AsReadOnly(INodeStorage? keyValueStore = null); @@ -37,6 +33,7 @@ public interface ITrieStore : IDisposable byte[]? LoadRlp(Hash256? address, in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None); byte[]? TryLoadRlp(Hash256? address, in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None); INodeStorage.KeyScheme Scheme { get; } + ICommitter BeginCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags writeFlags); } public interface IPruningTrieStore diff --git a/src/Nethermind/Nethermind.Trie/Pruning/NullTrieStore.cs b/src/Nethermind/Nethermind.Trie/Pruning/NullTrieStore.cs index 4982d7166b8..585050f7150 100644 --- a/src/Nethermind/Nethermind.Trie/Pruning/NullTrieStore.cs +++ b/src/Nethermind/Nethermind.Trie/Pruning/NullTrieStore.cs @@ -15,27 +15,27 @@ private NullTrieStore() { } public static NullTrieStore Instance { get; } = new(); - public void CommitNode(long blockNumber, NodeCommitInfo nodeCommitInfo, WriteFlags flags = WriteFlags.None) { } - - public void FinishBlockCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags flags = WriteFlags.None) { } - public TrieNode FindCachedOrUnknown(in TreePath treePath, Hash256 hash) => new(NodeType.Unknown, hash); - public byte[] LoadRlp(in TreePath treePath, Hash256 hash, ReadFlags flags = ReadFlags.None) => Array.Empty(); + public byte[] LoadRlp(in TreePath treePath, Hash256 hash, ReadFlags flags = ReadFlags.None) => []; + + public byte[]? TryLoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) => []; - public byte[]? TryLoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) => Array.Empty(); + public ICommitter BeginCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) => new NullCommitter(); public bool IsPersisted(in TreePath path, in ValueHash256 keccak) => true; - public void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp) - { - } + public void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp) { } - public ITrieNodeResolver GetStorageTrieNodeResolver(Hash256 storageRoot) - { - return this; - } + public ITrieNodeResolver GetStorageTrieNodeResolver(Hash256 storageRoot) => this; public INodeStorage.KeyScheme Scheme => INodeStorage.KeyScheme.HalfPath; + + internal class NullCommitter : ICommitter + { + public void Dispose() { } + + public void CommitNode(ref TreePath path, NodeCommitInfo nodeCommitInfo) { } + } } } diff --git a/src/Nethermind/Nethermind.Trie/Pruning/ReadOnlyTrieStore.cs b/src/Nethermind/Nethermind.Trie/Pruning/ReadOnlyTrieStore.cs index 3152aad59cd..4bb2c8d537f 100644 --- a/src/Nethermind/Nethermind.Trie/Pruning/ReadOnlyTrieStore.cs +++ b/src/Nethermind/Nethermind.Trie/Pruning/ReadOnlyTrieStore.cs @@ -12,36 +12,25 @@ namespace Nethermind.Trie.Pruning /// /// Safe to be reused for the same wrapped store. /// - public class ReadOnlyTrieStore : IReadOnlyTrieStore + public class ReadOnlyTrieStore(TrieStore trieStore, INodeStorage? readOnlyStore) : IReadOnlyTrieStore { - private readonly TrieStore _trieStore; - private readonly INodeStorage? _readOnlyStore; + private readonly TrieStore _trieStore = trieStore ?? throw new ArgumentNullException(nameof(trieStore)); public INodeStorage.KeyScheme Scheme => _trieStore.Scheme; - public ReadOnlyTrieStore(TrieStore trieStore, INodeStorage? readOnlyStore) - { - _trieStore = trieStore ?? throw new ArgumentNullException(nameof(trieStore)); - _readOnlyStore = readOnlyStore; - } - public TrieNode FindCachedOrUnknown(Hash256? address, in TreePath treePath, Hash256 hash) => _trieStore.FindCachedOrUnknown(address, treePath, hash, true); public byte[] LoadRlp(Hash256? address, in TreePath treePath, Hash256 hash, ReadFlags flags) => - _trieStore.LoadRlp(address, treePath, hash, _readOnlyStore, flags); + _trieStore.LoadRlp(address, treePath, hash, readOnlyStore, flags); public byte[]? TryLoadRlp(Hash256? address, in TreePath treePath, Hash256 hash, ReadFlags flags) => - _trieStore.TryLoadRlp(address, treePath, hash, _readOnlyStore, flags); + _trieStore.TryLoadRlp(address, treePath, hash, readOnlyStore, flags); public bool IsPersisted(Hash256? address, in TreePath path, in ValueHash256 keccak) => _trieStore.IsPersisted(address, path, keccak); - public IReadOnlyTrieStore AsReadOnly(INodeStorage nodeStore) - { - return new ReadOnlyTrieStore(_trieStore, nodeStore); - } - - public void CommitNode(long blockNumber, Hash256? address, in NodeCommitInfo nodeCommitInfo, WriteFlags flags = WriteFlags.None) { } + public IReadOnlyTrieStore AsReadOnly(INodeStorage nodeStore) => new ReadOnlyTrieStore(_trieStore, nodeStore); - public void FinishBlockCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags flags = WriteFlags.None) { } + public ICommitter BeginCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags writeFlags) => + new NullTrieStore.NullCommitter(); public event EventHandler ReorgBoundaryReached { @@ -51,72 +40,36 @@ public event EventHandler ReorgBoundaryReached public IReadOnlyKeyValueStore TrieNodeRlpStore => _trieStore.TrieNodeRlpStore; - public void Set(Hash256? address, in TreePath path, in ValueHash256 keccak, byte[] rlp) - { - } + public void Set(Hash256? address, in TreePath path, in ValueHash256 keccak, byte[] rlp) { } - public IScopedTrieStore GetTrieStore(Hash256? address) - { - return new ScopedReadOnlyTrieStore(this, address); - } + public IScopedTrieStore GetTrieStore(Hash256? address) => new ScopedReadOnlyTrieStore(this, address); - public bool HasRoot(Hash256 stateRoot) - { - return _trieStore.HasRoot(stateRoot); - } + public bool HasRoot(Hash256 stateRoot) => _trieStore.HasRoot(stateRoot); public void Dispose() { } - private class ScopedReadOnlyTrieStore : IScopedTrieStore + private class ScopedReadOnlyTrieStore(ReadOnlyTrieStore fullTrieStore, Hash256? address) : IScopedTrieStore { - private readonly ReadOnlyTrieStore _trieStoreImplementation; - private readonly Hash256? _address; - - public ScopedReadOnlyTrieStore(ReadOnlyTrieStore fullTrieStore, Hash256? address) - { - _trieStoreImplementation = fullTrieStore; - _address = address; - } - - public TrieNode FindCachedOrUnknown(in TreePath path, Hash256 hash) - { - return _trieStoreImplementation.FindCachedOrUnknown(_address, path, hash); - } - - public byte[]? LoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) - { - return _trieStoreImplementation.LoadRlp(_address, path, hash, flags); - } - - public byte[]? TryLoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) - { - return _trieStoreImplementation.TryLoadRlp(_address, path, hash, flags); - } - - public ITrieNodeResolver GetStorageTrieNodeResolver(Hash256? address) - { - if (address == _address) return this; - return new ScopedReadOnlyTrieStore(_trieStoreImplementation, address); - } - - public INodeStorage.KeyScheme Scheme => _trieStoreImplementation.Scheme; - - public void CommitNode(long blockNumber, NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None) - { - } - - public void FinishBlockCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) - { - } - - public bool IsPersisted(in TreePath path, in ValueHash256 keccak) - { - return _trieStoreImplementation.IsPersisted(_address, path, in keccak); - } - - public void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp) - { - } + public TrieNode FindCachedOrUnknown(in TreePath path, Hash256 hash) => + fullTrieStore.FindCachedOrUnknown(address, path, hash); + + public byte[]? LoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) => + fullTrieStore.LoadRlp(address, path, hash, flags); + + public byte[]? TryLoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) => fullTrieStore.TryLoadRlp(address, path, hash, flags); + + public ITrieNodeResolver GetStorageTrieNodeResolver(Hash256? address1) => + address1 == address ? this : new ScopedReadOnlyTrieStore(fullTrieStore, address1); + + public INodeStorage.KeyScheme Scheme => fullTrieStore.Scheme; + + public ICommitter BeginCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) => + new NullTrieStore.NullCommitter(); + + public bool IsPersisted(in TreePath path, in ValueHash256 keccak) => + fullTrieStore.IsPersisted(address, path, in keccak); + + public void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp) { } } } } diff --git a/src/Nethermind/Nethermind.Trie/Pruning/ScopedTrieStore.cs b/src/Nethermind/Nethermind.Trie/Pruning/ScopedTrieStore.cs index 02dc3e4d85e..650ee399086 100644 --- a/src/Nethermind/Nethermind.Trie/Pruning/ScopedTrieStore.cs +++ b/src/Nethermind/Nethermind.Trie/Pruning/ScopedTrieStore.cs @@ -6,57 +6,28 @@ namespace Nethermind.Trie.Pruning; -public sealed class ScopedTrieStore : IScopedTrieStore +public sealed class ScopedTrieStore(ITrieStore fullTrieStore, Hash256? address) : IScopedTrieStore { - private readonly ITrieStore _trieStoreImplementation; - private readonly Hash256? _address; - - public ScopedTrieStore(ITrieStore fullTrieStore, Hash256? address) - { - _trieStoreImplementation = fullTrieStore; - _address = address; - } - - public TrieNode FindCachedOrUnknown(in TreePath path, Hash256 hash) - { - return _trieStoreImplementation.FindCachedOrUnknown(_address, path, hash); - } - - public byte[]? LoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) - { - return _trieStoreImplementation.LoadRlp(_address, path, hash, flags); - } - - public byte[]? TryLoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) - { - return _trieStoreImplementation.TryLoadRlp(_address, path, hash, flags); - } - - public ITrieNodeResolver GetStorageTrieNodeResolver(Hash256? address) - { - if (address == _address) return this; - return new ScopedTrieStore(_trieStoreImplementation, address); - } - - public INodeStorage.KeyScheme Scheme => _trieStoreImplementation.Scheme; - - public void CommitNode(long blockNumber, NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None) - { - _trieStoreImplementation.CommitNode(blockNumber, _address, nodeCommitInfo, writeFlags); - } - - public void FinishBlockCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) - { - _trieStoreImplementation.FinishBlockCommit(trieType, blockNumber, _address, root, writeFlags); - } - - public bool IsPersisted(in TreePath path, in ValueHash256 keccak) - { - return _trieStoreImplementation.IsPersisted(_address, path, in keccak); - } - - public void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp) - { - _trieStoreImplementation.Set(_address, path, keccak, rlp); - } + public TrieNode FindCachedOrUnknown(in TreePath path, Hash256 hash) => + fullTrieStore.FindCachedOrUnknown(address, path, hash); + + public byte[]? LoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) => + fullTrieStore.LoadRlp(address, path, hash, flags); + + public byte[]? TryLoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) => + fullTrieStore.TryLoadRlp(address, path, hash, flags); + + public ITrieNodeResolver GetStorageTrieNodeResolver(Hash256? address1) => + address1 == address ? this : new ScopedTrieStore(fullTrieStore, address1); + + public INodeStorage.KeyScheme Scheme => fullTrieStore.Scheme; + + public ICommitter BeginCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) => + fullTrieStore.BeginCommit(trieType, blockNumber, address, root, writeFlags); + + public bool IsPersisted(in TreePath path, in ValueHash256 keccak) => + fullTrieStore.IsPersisted(address, path, in keccak); + + public void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp) => + fullTrieStore.Set(address, path, keccak, rlp); } diff --git a/src/Nethermind/Nethermind.Trie/Pruning/TreePath.cs b/src/Nethermind/Nethermind.Trie/Pruning/TreePath.cs index ba768f5844a..36306f16241 100644 --- a/src/Nethermind/Nethermind.Trie/Pruning/TreePath.cs +++ b/src/Nethermind/Nethermind.Trie/Pruning/TreePath.cs @@ -27,7 +27,7 @@ public struct TreePath : IEquatable public const int MemorySize = 36; public ValueHash256 Path; - public static TreePath Empty => new TreePath(); + public static TreePath Empty => new(); public readonly Span Span => Path.BytesAsSpan; @@ -38,7 +38,7 @@ public TreePath(in ValueHash256 path, int length) Length = length; } - public int Length { get; internal set; } + public int Length { get; private set; } public static TreePath FromPath(ReadOnlySpan pathHash) { @@ -46,7 +46,7 @@ public static TreePath FromPath(ReadOnlySpan pathHash) if (pathHash.Length == 32) return new TreePath(new ValueHash256(pathHash), 64); // Some of the test passes path directly to PatriciaTrie, but its not 32 byte. - TreePath newTreePath = new TreePath(); + TreePath newTreePath = new(); pathHash.CopyTo(newTreePath.Span); newTreePath.Length = pathHash.Length * 2; return newTreePath; diff --git a/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs b/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs index 50d71a17d5f..6ae6ca164fc 100644 --- a/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs +++ b/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs @@ -149,37 +149,33 @@ public int CachedNodesCount } } - public void CommitNode(long blockNumber, Hash256? address, in NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None) + private void CommitNode(long blockNumber, Hash256? address, ref TreePath path, in NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None) { - ArgumentOutOfRangeException.ThrowIfNegative(blockNumber); - EnsureCommitSetExistsForBlock(blockNumber); - if (_logger.IsTrace) Trace(blockNumber, in nodeCommitInfo); if (!nodeCommitInfo.IsEmptyBlockMarker && !nodeCommitInfo.Node.IsBoundaryProofNode) { - TrieNode node = nodeCommitInfo.Node!; + TrieNode node = nodeCommitInfo.Node; - if (node!.Keccak is null) + if (node.Keccak is null) { ThrowUnknownHash(node); } - if (CurrentPackage is null) + if (node.LastSeen >= 0) { - ThrowUnknownPackage(blockNumber, node); + ThrowNodeHasBeenSeen(blockNumber, node); } - if (node!.LastSeen >= 0) + if (_pruningStrategy.PruningEnabled) { - ThrowNodeHasBeenSeen(blockNumber, node); + node = SaveOrReplaceInDirtyNodesCache(address, ref path, nodeCommitInfo, node); } - node = SaveOrReplaceInDirtyNodesCache(address, nodeCommitInfo, node); node.LastSeen = Math.Max(blockNumber, node.LastSeen); if (!_pruningStrategy.PruningEnabled) { - PersistNode(address, nodeCommitInfo.Path, node, blockNumber, writeFlags); + PersistNode(address, path, node, blockNumber, writeFlags); } CommittedNodesCount++; @@ -195,10 +191,6 @@ void Trace(long blockNumber, in NodeCommitInfo nodeCommitInfo) [StackTraceHidden] static void ThrowUnknownHash(TrieNode node) => throw new TrieStoreException($"The hash of {node} should be known at the time of committing."); - [DoesNotReturn] - [StackTraceHidden] - static void ThrowUnknownPackage(long blockNumber, TrieNode node) => throw new TrieStoreException($"{nameof(CurrentPackage)} is NULL when committing {node} at {blockNumber}."); - [DoesNotReturn] [StackTraceHidden] static void ThrowNodeHasBeenSeen(long blockNumber, TrieNode node) => throw new TrieStoreException($"{nameof(TrieNode.LastSeen)} set on {node} committed at {blockNumber}."); @@ -239,37 +231,33 @@ private TrieNode DirtyNodesFromCachedRlpOrUnknown(TrieStoreDirtyNodesCache.Key k private TrieNode DirtyNodesFindCachedOrUnknown(TrieStoreDirtyNodesCache.Key key) => GetDirtyNodeShard(key).FindCachedOrUnknown(key); - private TrieNode SaveOrReplaceInDirtyNodesCache(Hash256? address, NodeCommitInfo nodeCommitInfo, TrieNode node) + private TrieNode SaveOrReplaceInDirtyNodesCache(Hash256? address, ref TreePath path, NodeCommitInfo nodeCommitInfo, TrieNode node) { - if (_pruningStrategy.PruningEnabled) + TrieStoreDirtyNodesCache.Key key = new(address, path, node.Keccak); + if (DirtyNodesTryGetValue(in key, out TrieNode cachedNodeCopy)) { - TrieStoreDirtyNodesCache.Key key = new TrieStoreDirtyNodesCache.Key(address, nodeCommitInfo.Path, node.Keccak); - if (DirtyNodesTryGetValue(in key, out TrieNode cachedNodeCopy)) + Metrics.LoadedFromCacheNodesCount++; + if (!ReferenceEquals(cachedNodeCopy, node)) { - Metrics.LoadedFromCacheNodesCount++; - if (!ReferenceEquals(cachedNodeCopy, node)) + if (_logger.IsTrace) Trace(node, cachedNodeCopy); + cachedNodeCopy.ResolveKey(GetTrieStore(address), ref path, nodeCommitInfo.IsRoot); + if (node.Keccak != cachedNodeCopy.Keccak) { - if (_logger.IsTrace) Trace(node, cachedNodeCopy); - TreePath path = nodeCommitInfo.Path; - cachedNodeCopy.ResolveKey(GetTrieStore(address), ref path, nodeCommitInfo.IsRoot); - if (node.Keccak != cachedNodeCopy.Keccak) - { - ThrowNodeIsNotSame(node, cachedNodeCopy); - } - - if (!nodeCommitInfo.IsRoot) - { - nodeCommitInfo.NodeParent!.ReplaceChildRef(nodeCommitInfo.ChildPositionAtParent, cachedNodeCopy); - } + ThrowNodeIsNotSame(node, cachedNodeCopy); + } - node = cachedNodeCopy; - Metrics.ReplacedNodesCount++; + if (!nodeCommitInfo.IsRoot) + { + nodeCommitInfo.NodeParent!.ReplaceChildRef(nodeCommitInfo.ChildPositionAtParent, cachedNodeCopy); } + + node = cachedNodeCopy; + Metrics.ReplacedNodesCount++; } - else - { - DirtyNodesSaveInCache(key, node); - } + } + else + { + DirtyNodesSaveInCache(key, node); } return node; @@ -286,11 +274,20 @@ static void ThrowNodeIsNotSame(TrieNode node, TrieNode cachedNodeCopy) => throw new InvalidOperationException($"The hash of replacement node {cachedNodeCopy} is not the same as the original {node}."); } - public void FinishBlockCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) + public ICommitter BeginCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags writeFlags) { ArgumentOutOfRangeException.ThrowIfNegative(blockNumber); EnsureCommitSetExistsForBlock(blockNumber); + int concurrency = _pruningStrategy.PruningEnabled + ? Environment.ProcessorCount + : 0; // The write batch when pruning is not enabled is not concurrent safe + + return new TrieStoreCommitter(this, trieType, blockNumber, address, root, writeFlags, concurrency); + } + + private void FinishBlockCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) + { try { if (trieType == TrieType.State) // storage tries happen before state commits @@ -1118,6 +1115,50 @@ public bool HasRoot(Hash256 stateRoot) return true; } + private class TrieStoreCommitter( + TrieStore trieStore, + TrieType trieType, + long blockNumber, + Hash256? address, + TrieNode? root, + WriteFlags writeFlags, + int concurrency + ) : ICommitter + { + private readonly bool _needToResetRoot = root is not null && root.IsDirty; + private int _concurrency = concurrency; + private TrieNode? _root = root; + + public void Dispose() + { + if (_needToResetRoot) + { + // During commit it PatriciaTrie, the root may get resolved to an existing node (same keccak). + // This ensure that the root that we use here is the same. + _root = trieStore.FindCachedOrUnknown(address, TreePath.Empty, _root?.Keccak); + } + + trieStore.FinishBlockCommit(trieType, blockNumber, address, _root, writeFlags); + } + + public void CommitNode(ref TreePath path, NodeCommitInfo nodeCommitInfo) => + trieStore.CommitNode(blockNumber, address, ref path, nodeCommitInfo, writeFlags: writeFlags); + + public bool CanSpawnTask() + { + if (Interlocked.Decrement(ref _concurrency) >= 0) + { + return true; + } + + ReturnConcurrencyQuota(); + return false; + } + + public void ReturnConcurrencyQuota() => Interlocked.Increment(ref _concurrency); + } + + internal static class HashHelpers { private const int HashPrime = 101; diff --git a/src/Nethermind/Nethermind.Trie/TrieStoreWithReadFlags.cs b/src/Nethermind/Nethermind.Trie/TrieStoreWithReadFlags.cs index 557e519a36e..e93f1096d06 100644 --- a/src/Nethermind/Nethermind.Trie/TrieStoreWithReadFlags.cs +++ b/src/Nethermind/Nethermind.Trie/TrieStoreWithReadFlags.cs @@ -7,32 +7,15 @@ namespace Nethermind.Trie; -public class TrieStoreWithReadFlags : TrieNodeResolverWithReadFlags, IScopedTrieStore +public class TrieStoreWithReadFlags(IScopedTrieStore implementation, ReadFlags flags) + : TrieNodeResolverWithReadFlags(implementation, flags), IScopedTrieStore { - private readonly IScopedTrieStore _baseImplementation; + public ICommitter BeginCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) => + implementation.BeginCommit(trieType, blockNumber, root, writeFlags); - public TrieStoreWithReadFlags(IScopedTrieStore implementation, ReadFlags flags) : base(implementation, flags) - { - _baseImplementation = implementation; - } + public bool IsPersisted(in TreePath path, in ValueHash256 keccak) => + implementation.IsPersisted(in path, in keccak); - public void CommitNode(long blockNumber, NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None) - { - _baseImplementation.CommitNode(blockNumber, nodeCommitInfo, writeFlags); - } - - public void FinishBlockCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) - { - _baseImplementation.FinishBlockCommit(trieType, blockNumber, root, writeFlags); - } - - public bool IsPersisted(in TreePath path, in ValueHash256 keccak) - { - return _baseImplementation.IsPersisted(in path, in keccak); - } - - public void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp) - { - _baseImplementation.Set(in path, in keccak, rlp); - } + public void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp) => + implementation.Set(in path, in keccak, rlp); }