From 703f4d397e9bac581ae59b84526a8859d8b6a824 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Wed, 6 Nov 2024 14:53:35 +0100 Subject: [PATCH] run `Forkchoice::onBlock` async in block importer (#8820) --- CHANGELOG.md | 1 + .../teku/beacon/sync/SyncingNodeManager.java | 1 + .../benchmarks/EpochTransitionBenchmark.java | 5 ++++ .../pegasys/teku/benchmarks/ProfilingRun.java | 5 ++++ .../teku/benchmarks/TransitionBenchmark.java | 4 +++ .../statetransition/block/BlockImporter.java | 16 ++++++++-- .../block/BlockImporterTest.java | 22 ++++++++++++++ .../block/BlockManagerTest.java | 29 +++++++++++++++---- .../beaconchain/BeaconChainController.java | 1 + 9 files changed, 76 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 49d7a57742a..1e0c6f219ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ ### Breaking Changes ### Additions and Improvements +- improve block publishing performance, especially relevant with locally produced blocks ### Bug Fixes - Added a startup script for unix systems to ensure that when jemalloc is installed the script sets the LD_PRELOAD environment variable to the use the jemalloc library diff --git a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java index b66f60cd0ff..b94030b4214 100644 --- a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java +++ b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java @@ -146,6 +146,7 @@ public static SyncingNodeManager create( final BlockImporter blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, recentChainData, diff --git a/eth-benchmark-tests/src/jmh/java/tech/pegasys/teku/benchmarks/EpochTransitionBenchmark.java b/eth-benchmark-tests/src/jmh/java/tech/pegasys/teku/benchmarks/EpochTransitionBenchmark.java index c98f81fd97d..6caeac08de4 100644 --- a/eth-benchmark-tests/src/jmh/java/tech/pegasys/teku/benchmarks/EpochTransitionBenchmark.java +++ b/eth-benchmark-tests/src/jmh/java/tech/pegasys/teku/benchmarks/EpochTransitionBenchmark.java @@ -36,6 +36,8 @@ import tech.pegasys.teku.benchmarks.util.CustomRunner; import tech.pegasys.teku.bls.BLSKeyPair; import tech.pegasys.teku.bls.BLSSignatureVerifier; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner; import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.ssz.collections.SszMutableUInt64List; @@ -72,6 +74,7 @@ @Threads(1) @Fork(1) public class EpochTransitionBenchmark { + AsyncRunner asyncRunner; Spec spec; WeakSubjectivityValidator wsValidator; RecentChainData recentChainData; @@ -100,6 +103,7 @@ public void init() throws Exception { AbstractBlockProcessor.depositSignatureVerifier = BLSSignatureVerifier.NO_OP; spec = TestSpecFactory.createMainnetAltair(); + asyncRunner = DelayedExecutorAsyncRunner.create(); String blocksFile = "/blocks/blocks_epoch_" + spec.getSlotsPerEpoch(UInt64.ZERO) @@ -131,6 +135,7 @@ public void init() throws Exception { blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, recentChainData, diff --git a/eth-benchmark-tests/src/jmh/java/tech/pegasys/teku/benchmarks/ProfilingRun.java b/eth-benchmark-tests/src/jmh/java/tech/pegasys/teku/benchmarks/ProfilingRun.java index 735a6fe25c1..4e2d92dde26 100644 --- a/eth-benchmark-tests/src/jmh/java/tech/pegasys/teku/benchmarks/ProfilingRun.java +++ b/eth-benchmark-tests/src/jmh/java/tech/pegasys/teku/benchmarks/ProfilingRun.java @@ -31,6 +31,8 @@ import tech.pegasys.teku.bls.BLSPublicKey; import tech.pegasys.teku.bls.BLSSignatureVerifier; import tech.pegasys.teku.bls.BLSTestUtil; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner; import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -63,6 +65,7 @@ public class ProfilingRun { private Spec spec = TestSpecFactory.createMainnetPhase0(); private final MetricsSystem metricsSystem = new StubMetricsSystem(); + private final AsyncRunner asyncRunner = DelayedExecutorAsyncRunner.create(); @Disabled @Test @@ -111,6 +114,7 @@ public void importBlocks() throws Exception { BeaconChainUtil.create(spec, recentChainData, validatorKeys, false); BlockImporter blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, recentChainData, @@ -203,6 +207,7 @@ public void importBlocksMemProfiling() throws Exception { metricsSystem); BlockImporter blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, recentChainData, diff --git a/eth-benchmark-tests/src/jmh/java/tech/pegasys/teku/benchmarks/TransitionBenchmark.java b/eth-benchmark-tests/src/jmh/java/tech/pegasys/teku/benchmarks/TransitionBenchmark.java index bdddc99d8f7..295b2062c8b 100644 --- a/eth-benchmark-tests/src/jmh/java/tech/pegasys/teku/benchmarks/TransitionBenchmark.java +++ b/eth-benchmark-tests/src/jmh/java/tech/pegasys/teku/benchmarks/TransitionBenchmark.java @@ -33,6 +33,8 @@ import tech.pegasys.teku.benchmarks.gen.KeyFileGenerator; import tech.pegasys.teku.bls.BLSKeyPair; import tech.pegasys.teku.bls.BLSSignatureVerifier; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner; import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -79,6 +81,7 @@ public abstract class TransitionBenchmark { public void init() throws Exception { spec = TestSpecFactory.createMainnetAltair(); AbstractBlockProcessor.depositSignatureVerifier = BLSSignatureVerifier.NO_OP; + AsyncRunner asyncRunner = DelayedExecutorAsyncRunner.create(); String blocksFile = "/blocks/blocks_epoch_" @@ -109,6 +112,7 @@ public void init() throws Exception { blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, recentChainData, diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImporter.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImporter.java index d9e8e812088..0bae14716a2 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImporter.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImporter.java @@ -20,6 +20,7 @@ import javax.annotation.CheckReturnValue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.logging.EventLogger; import tech.pegasys.teku.infrastructure.ssz.SszList; @@ -65,13 +66,17 @@ public class BlockImporter { private final AtomicReference latestFinalizedCheckpointState = new AtomicReference<>(null); + private final AsyncRunner asyncRunner; + public BlockImporter( + final AsyncRunner asyncRunner, final Spec spec, final ReceivedBlockEventsChannel receivedBlockEventsChannelPublisher, final RecentChainData recentChainData, final ForkChoice forkChoice, final WeakSubjectivityValidator weakSubjectivityValidator, final ExecutionLayerChannel executionLayer) { + this.asyncRunner = asyncRunner; this.spec = spec; this.receivedBlockEventsChannelPublisher = receivedBlockEventsChannelPublisher; this.recentChainData = recentChainData; @@ -106,8 +111,13 @@ public SafeFuture importBlock( return validateWeakSubjectivityPeriod() .thenCompose( __ -> - forkChoice.onBlock( - block, blockImportPerformance, blockBroadcastValidator, executionLayer)) + asyncRunner.runAsync( + () -> + forkChoice.onBlock( + block, + blockImportPerformance, + blockBroadcastValidator, + executionLayer))) .thenApply( result -> { if (!result.isSuccessful()) { @@ -141,7 +151,7 @@ public SafeFuture importBlock( }); } - private SafeFuture validateWeakSubjectivityPeriod() { + private SafeFuture validateWeakSubjectivityPeriod() { return getLatestCheckpointState() .thenCombine( SafeFuture.of(() -> recentChainData.getCurrentSlot().orElseThrow()), diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockImporterTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockImporterTest.java index f02908bb7ed..cc6e36ca465 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockImporterTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockImporterTest.java @@ -16,6 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -38,6 +39,8 @@ import tech.pegasys.teku.bls.BLSSignatureVerifier; import tech.pegasys.teku.bls.BLSTestUtil; import tech.pegasys.teku.ethereum.execution.types.Eth1Address; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.ExceptionThrowingFutureSupplier; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; @@ -79,6 +82,7 @@ import tech.pegasys.teku.weaksubjectivity.config.WeakSubjectivityConfig; public class BlockImporterTest { + private final AsyncRunner asyncRunner = mock(AsyncRunner.class); private final Spec spec = TestSpecFactory.createMinimalPhase0(); private final SpecConfig genesisConfig = spec.getGenesisSpecConfig(); private final AttestationSchema attestationSchema = @@ -113,6 +117,7 @@ public class BlockImporterTest { private final BlockImporter blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, recentChainData, @@ -133,6 +138,15 @@ public static void dispose() { @BeforeEach public void setup() { + // prepare a synchronous async runner + doAnswer( + invocation -> { + final ExceptionThrowingFutureSupplier task = invocation.getArgument(0); + return SafeFuture.completedFuture(SafeFuture.of(task.get()).join()); + }) + .when(asyncRunner) + .runAsync((ExceptionThrowingFutureSupplier) any()); + otherChain.initializeStorage(); localChain.initializeStorage(); when(weakSubjectivityValidator.isBlockValid(any(), any())).thenReturn(true); @@ -403,6 +417,7 @@ public void importBlock_weakSubjectivityFailure_wrongAncestor() throws Exception WeakSubjectivityValidator.lenient(wsConfig); final BlockImporter blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, recentChainData, @@ -433,6 +448,7 @@ public void importBlock_weakSubjectivityChecksPass() throws Exception { WeakSubjectivityValidator.lenient(wsConfig); final BlockImporter blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, recentChainData, @@ -463,6 +479,7 @@ public void importBlock_runWSPChecks() throws Exception { storageSystem.getMetricsSystem()); final BlockImporter blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, storageSystem.recentChainData(), @@ -508,6 +525,7 @@ public void importBlock_nonFinalizingChain_runWSPChecks() throws Exception { storageSystem.getMetricsSystem()); final BlockImporter blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, storageSystem.recentChainData(), @@ -561,6 +579,7 @@ public void importBlock_nonFinalizingChain_skipWSPChecks() throws Exception { storageSystem.getMetricsSystem()); final BlockImporter blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, storageSystem.recentChainData(), @@ -606,6 +625,7 @@ public void getLatestCheckpointState_initialCall() { storageSystem.getMetricsSystem()); final BlockImporter blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, storageSystem.recentChainData(), @@ -639,6 +659,7 @@ public void getLatestCheckpointState_shouldPullUpdatedFinalizedCheckpoint() { storageSystem.getMetricsSystem()); final BlockImporter blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, storageSystem.recentChainData(), @@ -694,6 +715,7 @@ public void importBlock_validBlsToExecutionChanges() throws Exception { final BlockImporter blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, storageSystem.recentChainData(), diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java index 0753bb160d8..1095051d219 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java @@ -30,7 +30,6 @@ import static org.mockito.Mockito.when; import static tech.pegasys.teku.infrastructure.async.FutureUtil.ignoreFuture; import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture; -import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.safeJoin; import static tech.pegasys.teku.spec.config.SpecConfig.GENESIS_SLOT; import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.GOSSIP; import static tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason.FAILED_DATA_AVAILABILITY_CHECK_INVALID; @@ -53,6 +52,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.tuweni.bytes.Bytes32; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -60,6 +62,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import tech.pegasys.teku.bls.BLSSignatureVerifier; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.ExceptionThrowingFutureSupplier; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.SafeFutureAssert; import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; @@ -109,6 +113,7 @@ @SuppressWarnings("FutureReturnValueIgnored") public class BlockManagerTest { + private final AsyncRunner asyncRunner = mock(AsyncRunner.class); private final StubTimeProvider timeProvider = StubTimeProvider.withTimeInSeconds(0); private final EventLogger eventLogger = mock(EventLogger.class); private Spec spec; @@ -157,6 +162,15 @@ public static void resetSession() { @BeforeEach public void setup() { + // prepare an async runner + doAnswer( + invocation -> { + final ExceptionThrowingFutureSupplier task = invocation.getArgument(0); + return SafeFuture.of(task.get()); + }) + .when(asyncRunner) + .runAsync((ExceptionThrowingFutureSupplier) any()); + setupWithSpec(TestSpecFactory.createMinimalDeneb()); } @@ -184,6 +198,7 @@ private void setupWithSpec(final Spec spec) { this.executionLayer = spy(new ExecutionLayerChannelStub(spec, false, Optional.empty())); this.blockImporter = new BlockImporter( + asyncRunner, spec, receivedBlockEventsChannelPublisher, localRecentChainData, @@ -1196,9 +1211,13 @@ private SafeFutureAssert assertThatBlockImport(final SignedBe } private void safeJoinBlockImport(final SignedBeaconBlock block) { - safeJoin( - blockManager - .importBlock(block) - .thenCompose(BlockImportAndBroadcastValidationResults::blockImportResult)); + try { + blockManager + .importBlock(block) + .thenCompose(BlockImportAndBroadcastValidationResults::blockImportResult) + .get(5, TimeUnit.SECONDS); + } catch (final InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } } } diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 091e0348554..d57ef3bb489 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -1235,6 +1235,7 @@ public void initBlockImporter() { LOG.debug("BeaconChainController.initBlockImporter()"); blockImporter = new BlockImporter( + beaconAsyncRunner, spec, receivedBlockEventsChannelPublisher, recentChainData,