Skip to content

Commit

Permalink
Trie log prune async using TrieLogEvent (hyperledger#6394)
Browse files Browse the repository at this point in the history
* Change TrieLogPruner to implement a trieLogObserver
* Remove TrieLogPruner from TrieLogManager
* Remove NoOpTrieLogPruner now it is conditionally added as an observer
* Prune async using EthScheduler.servicesExecutor

---------
Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
Signed-off-by: Simon Dudley <simon.dudley@consensys.net>
Co-authored-by: Simon Dudley <simon.dudley@consensys.net>
  • Loading branch information
gfukushima authored Jan 22, 2024
1 parent 1c1f538 commit 3fc3fb1
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.hyperledger.besu.ethereum.trie.bonsai.BonsaiWorldStateProvider;
import org.hyperledger.besu.ethereum.trie.bonsai.cache.CachedMerkleTrieLoader;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogManager;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.trie.forest.ForestWorldStateArchive;
import org.hyperledger.besu.ethereum.trie.forest.pruner.MarkSweepPruner;
Expand Down Expand Up @@ -781,6 +782,15 @@ public BesuController build() {
final JsonRpcMethods additionalJsonRpcMethodFactory =
createAdditionalJsonRpcMethodFactory(protocolContext);

if (dataStorageConfiguration.getUnstable().getBonsaiTrieLogPruningEnabled()
&& DataStorageFormat.BONSAI.equals(dataStorageConfiguration.getDataStorageFormat())) {
final TrieLogManager trieLogManager =
((BonsaiWorldStateProvider) worldStateArchive).getTrieLogManager();
final TrieLogPruner trieLogPruner =
createTrieLogPruner(worldStateStorage, blockchain, scheduler);
trieLogManager.subscribe(trieLogPruner);
}

final List<Closeable> closeables = new ArrayList<>();
closeables.add(protocolContext.getWorldStateArchive());
closeables.add(storageProvider);
Expand Down Expand Up @@ -809,6 +819,26 @@ public BesuController build() {
dataStorageConfiguration);
}

private TrieLogPruner createTrieLogPruner(
final WorldStateStorage worldStateStorage,
final Blockchain blockchain,
final EthScheduler scheduler) {
final GenesisConfigOptions genesisConfigOptions = configOptionsSupplier.get();
final boolean isProofOfStake = genesisConfigOptions.getTerminalTotalDifficulty().isPresent();

final TrieLogPruner trieLogPruner =
new TrieLogPruner(
(BonsaiWorldStateKeyValueStorage) worldStateStorage,
blockchain,
scheduler::executeServiceTask,
dataStorageConfiguration.getUnstable().getBonsaiTrieLogRetentionThreshold(),
dataStorageConfiguration.getUnstable().getBonsaiTrieLogPruningLimit(),
isProofOfStake);
trieLogPruner.initialize();

return trieLogPruner;
}

/**
* Create synchronizer synchronizer.
*
Expand Down Expand Up @@ -1069,29 +1099,13 @@ WorldStateArchive createWorldStateArchive(
final Blockchain blockchain,
final CachedMerkleTrieLoader cachedMerkleTrieLoader) {
return switch (dataStorageConfiguration.getDataStorageFormat()) {
case BONSAI -> {
final GenesisConfigOptions genesisConfigOptions = configOptionsSupplier.get();
final boolean isProofOfStake =
genesisConfigOptions.getTerminalTotalDifficulty().isPresent();
final TrieLogPruner trieLogPruner =
dataStorageConfiguration.getUnstable().getBonsaiTrieLogPruningEnabled()
? new TrieLogPruner(
(BonsaiWorldStateKeyValueStorage) worldStateStorage,
blockchain,
dataStorageConfiguration.getUnstable().getBonsaiTrieLogRetentionThreshold(),
dataStorageConfiguration.getUnstable().getBonsaiTrieLogPruningLimit(),
isProofOfStake)
: TrieLogPruner.noOpTrieLogPruner();
trieLogPruner.initialize();
yield new BonsaiWorldStateProvider(
(BonsaiWorldStateKeyValueStorage) worldStateStorage,
blockchain,
Optional.of(dataStorageConfiguration.getBonsaiMaxLayersToLoad()),
cachedMerkleTrieLoader,
besuComponent.map(BesuComponent::getBesuPluginContext).orElse(null),
evmConfiguration,
trieLogPruner);
}
case BONSAI -> new BonsaiWorldStateProvider(
(BonsaiWorldStateKeyValueStorage) worldStateStorage,
blockchain,
Optional.of(dataStorageConfiguration.getBonsaiMaxLayersToLoad()),
cachedMerkleTrieLoader,
besuComponent.map(BesuComponent::getBesuPluginContext).orElse(null),
evmConfiguration);
case FOREST -> {
final WorldStatePreimageStorage preimageStorage =
storageProvider.createWorldStatePreimageStorage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.hyperledger.besu.ethereum.trie.bonsai.cache.CachedWorldStorageManager;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogManager;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.trie.bonsai.worldview.BonsaiWorldState;
import org.hyperledger.besu.ethereum.trie.bonsai.worldview.BonsaiWorldStateUpdateAccumulator;
import org.hyperledger.besu.ethereum.trie.patricia.StoredMerklePatriciaTrie;
Expand Down Expand Up @@ -73,18 +72,13 @@ public BonsaiWorldStateProvider(
final Optional<Long> maxLayersToLoad,
final CachedMerkleTrieLoader cachedMerkleTrieLoader,
final BesuContext pluginContext,
final EvmConfiguration evmConfiguration,
final TrieLogPruner trieLogPruner) {
final EvmConfiguration evmConfiguration) {

this.cachedWorldStorageManager = new CachedWorldStorageManager(this, worldStateStorage);
// TODO: de-dup constructors
this.trieLogManager =
new TrieLogManager(
blockchain,
worldStateStorage,
maxLayersToLoad.orElse(RETAINED_LAYERS),
pluginContext,
trieLogPruner);
blockchain, worldStateStorage, maxLayersToLoad.orElse(RETAINED_LAYERS), pluginContext);
this.blockchain = blockchain;
this.worldStateStorage = worldStateStorage;
this.cachedMerkleTrieLoader = cachedMerkleTrieLoader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class NoOpTrieLogManager extends TrieLogManager {

public NoOpTrieLogManager() {
super(null, null, 0, null, TrieLogPruner.noOpTrieLogPruner());
super(null, null, 0, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,16 @@ public class TrieLogManager {
protected final Subscribers<TrieLogEvent.TrieLogObserver> trieLogObservers = Subscribers.create();

protected final TrieLogFactory trieLogFactory;
private final TrieLogPruner trieLogPruner;

public TrieLogManager(
final Blockchain blockchain,
final BonsaiWorldStateKeyValueStorage worldStateStorage,
final long maxLayersToLoad,
final BesuContext pluginContext,
final TrieLogPruner trieLogPruner) {
final BesuContext pluginContext) {
this.blockchain = blockchain;
this.rootWorldStateStorage = worldStateStorage;
this.maxLayersToLoad = maxLayersToLoad;
this.trieLogFactory = setupTrieLogFactory(pluginContext);
this.trieLogPruner = trieLogPruner;
}

public synchronized void saveTrieLog(
Expand All @@ -85,8 +82,6 @@ public synchronized void saveTrieLog(
} finally {
if (success) {
stateUpdater.commit();
trieLogPruner.addToPruneQueue(forBlockHeader.getNumber(), forBlockHeader.getBlockHash());
trieLogPruner.pruneFromQueue();
} else {
stateUpdater.rollback();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.plugin.services.trielogs.TrieLogEvent;

import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;

import com.google.common.collect.ArrayListMultimap;
Expand All @@ -33,14 +35,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TrieLogPruner {
public class TrieLogPruner implements TrieLogEvent.TrieLogObserver {

private static final Logger LOG = LoggerFactory.getLogger(TrieLogPruner.class);

private final int pruningLimit;
private final int loadingLimit;
private final BonsaiWorldStateKeyValueStorage rootWorldStateStorage;
private final Blockchain blockchain;
private final Consumer<Runnable> executeAsync;
private final long numBlocksToRetain;
private final boolean requireFinalizedBlock;

Expand All @@ -50,11 +53,13 @@ public class TrieLogPruner {
public TrieLogPruner(
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final Blockchain blockchain,
final Consumer<Runnable> executeAsync,
final long numBlocksToRetain,
final int pruningLimit,
final boolean requireFinalizedBlock) {
this.rootWorldStateStorage = rootWorldStateStorage;
this.blockchain = blockchain;
this.executeAsync = executeAsync;
this.numBlocksToRetain = numBlocksToRetain;
this.pruningLimit = pruningLimit;
this.loadingLimit = pruningLimit; // same as pruningLimit for now
Expand Down Expand Up @@ -166,34 +171,18 @@ int pruneFromQueue() {
return wasPruned.size();
}

public static TrieLogPruner noOpTrieLogPruner() {
return new NoOpTrieLogPruner(null, null, 0, 0);
}

public static class NoOpTrieLogPruner extends TrieLogPruner {
private NoOpTrieLogPruner(
final BonsaiWorldStateKeyValueStorage rootWorldStateStorage,
final Blockchain blockchain,
final long numBlocksToRetain,
final int pruningLimit) {
super(rootWorldStateStorage, blockchain, numBlocksToRetain, pruningLimit, true);
}

@Override
public int initialize() {
// no-op
return -1;
}

@Override
void addToPruneQueue(final long blockNumber, final Hash blockHash) {
// no-op
}

@Override
int pruneFromQueue() {
// no-op
return -1;
@Override
public void onTrieLogAdded(final TrieLogEvent event) {
if (TrieLogEvent.Type.ADDED.equals(event.getType())) {
final Hash blockHash = event.layer().getBlockHash();
final Optional<Long> blockNumber = event.layer().getBlockNumber();
blockNumber.ifPresent(
blockNum ->
executeAsync.accept(
() -> {
addToPruneQueue(blockNum, blockHash);
pruneFromQueue();
}));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.hyperledger.besu.ethereum.trie.bonsai.BonsaiWorldStateProvider;
import org.hyperledger.besu.ethereum.trie.bonsai.cache.CachedMerkleTrieLoader;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.trie.forest.ForestWorldStateArchive;
import org.hyperledger.besu.ethereum.trie.forest.storage.ForestWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.forest.worldview.ForestMutableWorldState;
Expand Down Expand Up @@ -113,8 +112,7 @@ public static BonsaiWorldStateProvider createBonsaiInMemoryWorldStateArchive(
Optional.empty(),
cachedMerkleTrieLoader,
null,
evmConfiguration,
TrieLogPruner.noOpTrieLogPruner());
evmConfiguration);
}

public static MutableWorldState createInMemoryWorldState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueStorageProviderBuilder;
import org.hyperledger.besu.ethereum.trie.bonsai.cache.CachedMerkleTrieLoader;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.ethereum.worldstate.ImmutableDataStorageConfiguration;
Expand Down Expand Up @@ -163,8 +162,7 @@ public void createStorage() {
Optional.of(16L),
new CachedMerkleTrieLoader(new NoOpMetricsSystem()),
null,
EvmConfiguration.DEFAULT,
TrieLogPruner.noOpTrieLogPruner());
EvmConfiguration.DEFAULT);
var ws = archive.getMutable();
genesisState.writeStateTo(ws);
protocolContext = new ProtocolContext(blockchain, archive, null, Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogFactoryImpl;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogLayer;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogManager;
import org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner;
import org.hyperledger.besu.ethereum.trie.bonsai.worldview.BonsaiWorldState;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import org.hyperledger.besu.evm.internal.EvmConfiguration;
Expand Down Expand Up @@ -127,8 +126,7 @@ storageProvider, new NoOpMetricsSystem(), DataStorageConfiguration.DEFAULT_CONFI
Optional.of(512L),
new CachedMerkleTrieLoader(new NoOpMetricsSystem()),
null,
EvmConfiguration.DEFAULT,
TrieLogPruner.noOpTrieLogPruner());
EvmConfiguration.DEFAULT);
final BlockHeader blockHeader = blockBuilder.number(0).buildHeader();
final BlockHeader chainHead = blockBuilder.number(512).buildHeader();
when(blockchain.getChainHeadHeader()).thenReturn(chainHead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.hyperledger.besu.ethereum.trie.bonsai.trielog;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.trie.bonsai.trielog.TrieLogPruner.noOpTrieLogPruner;
import static org.mockito.Mockito.spy;

import org.hyperledger.besu.datatypes.Hash;
Expand Down Expand Up @@ -57,9 +56,7 @@ class TrieLogManagerTests {

@BeforeEach
public void setup() {
trieLogManager =
new TrieLogManager(
blockchain, bonsaiWorldStateKeyValueStorage, 512, null, noOpTrieLogPruner());
trieLogManager = new TrieLogManager(blockchain, bonsaiWorldStateKeyValueStorage, 512, null);
}

@Test
Expand Down
Loading

0 comments on commit 3fc3fb1

Please sign in to comment.