Skip to content

Commit

Permalink
Merge branch 'master' into 8767-update-blobs-gossip
Browse files Browse the repository at this point in the history
  • Loading branch information
mehdi-aouadi authored Nov 6, 2024
2 parents 5396d1a + 703f4d3 commit f52d3bf
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### Breaking Changes

### Additions and Improvements
- improve block publishing performance, especially relevant with locally produced blocks

### Bug Fixes
- Added a startup script for unix systems to ensure that when jemalloc is installed the script sets the LD_PRELOAD environment variable to the use the jemalloc library
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public static SyncingNodeManager create(

final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import tech.pegasys.teku.benchmarks.util.CustomRunner;
import tech.pegasys.teku.bls.BLSKeyPair;
import tech.pegasys.teku.bls.BLSSignatureVerifier;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.ssz.collections.SszMutableUInt64List;
Expand Down Expand Up @@ -72,6 +74,7 @@
@Threads(1)
@Fork(1)
public class EpochTransitionBenchmark {
AsyncRunner asyncRunner;
Spec spec;
WeakSubjectivityValidator wsValidator;
RecentChainData recentChainData;
Expand Down Expand Up @@ -100,6 +103,7 @@ public void init() throws Exception {
AbstractBlockProcessor.depositSignatureVerifier = BLSSignatureVerifier.NO_OP;

spec = TestSpecFactory.createMainnetAltair();
asyncRunner = DelayedExecutorAsyncRunner.create();
String blocksFile =
"/blocks/blocks_epoch_"
+ spec.getSlotsPerEpoch(UInt64.ZERO)
Expand Down Expand Up @@ -131,6 +135,7 @@ public void init() throws Exception {

blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.bls.BLSSignatureVerifier;
import tech.pegasys.teku.bls.BLSTestUtil;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand Down Expand Up @@ -63,6 +65,7 @@ public class ProfilingRun {
private Spec spec = TestSpecFactory.createMainnetPhase0();

private final MetricsSystem metricsSystem = new StubMetricsSystem();
private final AsyncRunner asyncRunner = DelayedExecutorAsyncRunner.create();

@Disabled
@Test
Expand Down Expand Up @@ -111,6 +114,7 @@ public void importBlocks() throws Exception {
BeaconChainUtil.create(spec, recentChainData, validatorKeys, false);
BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Expand Down Expand Up @@ -203,6 +207,7 @@ public void importBlocksMemProfiling() throws Exception {
metricsSystem);
BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import tech.pegasys.teku.benchmarks.gen.KeyFileGenerator;
import tech.pegasys.teku.bls.BLSKeyPair;
import tech.pegasys.teku.bls.BLSSignatureVerifier;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand Down Expand Up @@ -79,6 +81,7 @@ public abstract class TransitionBenchmark {
public void init() throws Exception {
spec = TestSpecFactory.createMainnetAltair();
AbstractBlockProcessor.depositSignatureVerifier = BLSSignatureVerifier.NO_OP;
AsyncRunner asyncRunner = DelayedExecutorAsyncRunner.create();

String blocksFile =
"/blocks/blocks_epoch_"
Expand Down Expand Up @@ -109,6 +112,7 @@ public void init() throws Exception {

blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import javax.annotation.CheckReturnValue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.logging.EventLogger;
import tech.pegasys.teku.infrastructure.ssz.SszList;
Expand Down Expand Up @@ -65,13 +66,17 @@ public class BlockImporter {
private final AtomicReference<CheckpointState> latestFinalizedCheckpointState =
new AtomicReference<>(null);

private final AsyncRunner asyncRunner;

public BlockImporter(
final AsyncRunner asyncRunner,
final Spec spec,
final ReceivedBlockEventsChannel receivedBlockEventsChannelPublisher,
final RecentChainData recentChainData,
final ForkChoice forkChoice,
final WeakSubjectivityValidator weakSubjectivityValidator,
final ExecutionLayerChannel executionLayer) {
this.asyncRunner = asyncRunner;
this.spec = spec;
this.receivedBlockEventsChannelPublisher = receivedBlockEventsChannelPublisher;
this.recentChainData = recentChainData;
Expand Down Expand Up @@ -106,8 +111,13 @@ public SafeFuture<BlockImportResult> importBlock(
return validateWeakSubjectivityPeriod()
.thenCompose(
__ ->
forkChoice.onBlock(
block, blockImportPerformance, blockBroadcastValidator, executionLayer))
asyncRunner.runAsync(
() ->
forkChoice.onBlock(
block,
blockImportPerformance,
blockBroadcastValidator,
executionLayer)))
.thenApply(
result -> {
if (!result.isSuccessful()) {
Expand Down Expand Up @@ -141,7 +151,7 @@ public SafeFuture<BlockImportResult> importBlock(
});
}

private SafeFuture<?> validateWeakSubjectivityPeriod() {
private SafeFuture<Void> validateWeakSubjectivityPeriod() {
return getLatestCheckpointState()
.thenCombine(
SafeFuture.of(() -> recentChainData.getCurrentSlot().orElseThrow()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -38,6 +39,8 @@
import tech.pegasys.teku.bls.BLSSignatureVerifier;
import tech.pegasys.teku.bls.BLSTestUtil;
import tech.pegasys.teku.ethereum.execution.types.Eth1Address;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.ExceptionThrowingFutureSupplier;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
Expand Down Expand Up @@ -79,6 +82,7 @@
import tech.pegasys.teku.weaksubjectivity.config.WeakSubjectivityConfig;

public class BlockImporterTest {
private final AsyncRunner asyncRunner = mock(AsyncRunner.class);
private final Spec spec = TestSpecFactory.createMinimalPhase0();
private final SpecConfig genesisConfig = spec.getGenesisSpecConfig();
private final AttestationSchema<?> attestationSchema =
Expand Down Expand Up @@ -113,6 +117,7 @@ public class BlockImporterTest {

private final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Expand All @@ -133,6 +138,15 @@ public static void dispose() {

@BeforeEach
public void setup() {
// prepare a synchronous async runner
doAnswer(
invocation -> {
final ExceptionThrowingFutureSupplier<?> task = invocation.getArgument(0);
return SafeFuture.completedFuture(SafeFuture.of(task.get()).join());
})
.when(asyncRunner)
.runAsync((ExceptionThrowingFutureSupplier<?>) any());

otherChain.initializeStorage();
localChain.initializeStorage();
when(weakSubjectivityValidator.isBlockValid(any(), any())).thenReturn(true);
Expand Down Expand Up @@ -403,6 +417,7 @@ public void importBlock_weakSubjectivityFailure_wrongAncestor() throws Exception
WeakSubjectivityValidator.lenient(wsConfig);
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Expand Down Expand Up @@ -433,6 +448,7 @@ public void importBlock_weakSubjectivityChecksPass() throws Exception {
WeakSubjectivityValidator.lenient(wsConfig);
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Expand Down Expand Up @@ -463,6 +479,7 @@ public void importBlock_runWSPChecks() throws Exception {
storageSystem.getMetricsSystem());
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
storageSystem.recentChainData(),
Expand Down Expand Up @@ -508,6 +525,7 @@ public void importBlock_nonFinalizingChain_runWSPChecks() throws Exception {
storageSystem.getMetricsSystem());
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
storageSystem.recentChainData(),
Expand Down Expand Up @@ -561,6 +579,7 @@ public void importBlock_nonFinalizingChain_skipWSPChecks() throws Exception {
storageSystem.getMetricsSystem());
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
storageSystem.recentChainData(),
Expand Down Expand Up @@ -606,6 +625,7 @@ public void getLatestCheckpointState_initialCall() {
storageSystem.getMetricsSystem());
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
storageSystem.recentChainData(),
Expand Down Expand Up @@ -639,6 +659,7 @@ public void getLatestCheckpointState_shouldPullUpdatedFinalizedCheckpoint() {
storageSystem.getMetricsSystem());
final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
storageSystem.recentChainData(),
Expand Down Expand Up @@ -694,6 +715,7 @@ public void importBlock_validBlsToExecutionChanges() throws Exception {

final BlockImporter blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
storageSystem.recentChainData(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.infrastructure.async.FutureUtil.ignoreFuture;
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture;
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.safeJoin;
import static tech.pegasys.teku.spec.config.SpecConfig.GENESIS_SLOT;
import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.GOSSIP;
import static tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason.FAILED_DATA_AVAILABILITY_CHECK_INVALID;
Expand All @@ -53,13 +52,18 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.bls.BLSSignatureVerifier;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.ExceptionThrowingFutureSupplier;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.SafeFutureAssert;
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
Expand Down Expand Up @@ -109,6 +113,7 @@

@SuppressWarnings("FutureReturnValueIgnored")
public class BlockManagerTest {
private final AsyncRunner asyncRunner = mock(AsyncRunner.class);
private final StubTimeProvider timeProvider = StubTimeProvider.withTimeInSeconds(0);
private final EventLogger eventLogger = mock(EventLogger.class);
private Spec spec;
Expand Down Expand Up @@ -157,6 +162,15 @@ public static void resetSession() {

@BeforeEach
public void setup() {
// prepare an async runner
doAnswer(
invocation -> {
final ExceptionThrowingFutureSupplier<?> task = invocation.getArgument(0);
return SafeFuture.of(task.get());
})
.when(asyncRunner)
.runAsync((ExceptionThrowingFutureSupplier<?>) any());

setupWithSpec(TestSpecFactory.createMinimalDeneb());
}

Expand Down Expand Up @@ -184,6 +198,7 @@ private void setupWithSpec(final Spec spec) {
this.executionLayer = spy(new ExecutionLayerChannelStub(spec, false, Optional.empty()));
this.blockImporter =
new BlockImporter(
asyncRunner,
spec,
receivedBlockEventsChannelPublisher,
localRecentChainData,
Expand Down Expand Up @@ -1196,9 +1211,13 @@ private SafeFutureAssert<BlockImportResult> assertThatBlockImport(final SignedBe
}

private void safeJoinBlockImport(final SignedBeaconBlock block) {
safeJoin(
blockManager
.importBlock(block)
.thenCompose(BlockImportAndBroadcastValidationResults::blockImportResult));
try {
blockManager
.importBlock(block)
.thenCompose(BlockImportAndBroadcastValidationResults::blockImportResult)
.get(5, TimeUnit.SECONDS);
} catch (final InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,7 @@ public void initBlockImporter() {
LOG.debug("BeaconChainController.initBlockImporter()");
blockImporter =
new BlockImporter(
beaconAsyncRunner,
spec,
receivedBlockEventsChannelPublisher,
recentChainData,
Expand Down

0 comments on commit f52d3bf

Please sign in to comment.