Skip to content

Commit

Permalink
Decouple block and blobs publishing\import (#8728)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr authored Nov 4, 2024
1 parent 8bcbfc9 commit 4266035
Show file tree
Hide file tree
Showing 27 changed files with 721 additions and 504 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,58 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture;
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.safeJoin;
import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ONE;
import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.NOT_REQUIRED;

import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import tech.pegasys.teku.api.ChainDataProvider;
import tech.pegasys.teku.api.NetworkDataProvider;
import tech.pegasys.teku.api.NodeDataProvider;
import tech.pegasys.teku.beacon.sync.events.SyncState;
import tech.pegasys.teku.beacon.sync.events.SyncStateProvider;
import tech.pegasys.teku.beacon.sync.events.SyncStateTracker;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionAndPublishingPerformanceFactory;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.metrics.Validator.ValidatorDutyMetricUtils;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.time.SystemTimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.P2PConfig;
import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber;
import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.SpecVersion;
import tech.pegasys.teku.spec.TestSpecContext;
import tech.pegasys.teku.spec.TestSpecInvocationContextProvider.SpecContext;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.Blob;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
import tech.pegasys.teku.spec.datastructures.type.SszKZGProof;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.spec.logic.versions.deneb.helpers.MiscHelpersDeneb;
import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool;
import tech.pegasys.teku.statetransition.attestation.AttestationManager;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.statetransition.block.BlockImportChannel.BlockImportAndBroadcastValidationResults;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
Expand All @@ -58,16 +76,19 @@
import tech.pegasys.teku.storage.server.StateStorageMode;
import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder;
import tech.pegasys.teku.storage.storageSystem.StorageSystem;
import tech.pegasys.teku.validator.api.SendSignedBlockResult;
import tech.pegasys.teku.validator.coordinator.performance.DefaultPerformanceTracker;
import tech.pegasys.teku.validator.coordinator.publisher.MilestoneBasedBlockPublisher;

@TestSpecContext(milestone = {SpecMilestone.PHASE0, SpecMilestone.DENEB})
public class ValidatorApiHandlerIntegrationTest {
private final AsyncRunner asyncRunner = DelayedExecutorAsyncRunner.create();

// Use full storage system
private final StorageSystem storageSystem =
InMemoryStorageSystemBuilder.buildDefault(StateStorageMode.ARCHIVE);
private final CombinedChainDataClient combinedChainDataClient =
storageSystem.combinedChainDataClient();
private final Spec spec = TestSpecFactory.createMinimalPhase0();

// Other dependencies are mocked, but these can be updated as needed
private final SyncStateProvider syncStateProvider = mock(SyncStateTracker.class);
Expand Down Expand Up @@ -100,45 +121,93 @@ public class ValidatorApiHandlerIntegrationTest {
mock(SyncCommitteeSubscriptionManager.class);

private final DutyMetrics dutyMetrics = mock(DutyMetrics.class);
private final ValidatorApiHandler handler =
new ValidatorApiHandler(
chainDataProvider,
nodeDataProvider,
networkDataProvider,
combinedChainDataClient,
syncStateProvider,
blockFactory,
blockImportChannel,
blockGossipChannel,
blockBlobSidecarsTrackersPool,
blobSidecarGossipChannel,
attestationPool,
attestationManager,
attestationTopicSubscriber,
activeValidatorTracker,
dutyMetrics,
performanceTracker,
spec,
forkChoiceTrigger,
proposersDataManager,
syncCommitteeMessagePool,
syncCommitteeContributionPool,
syncCommitteeSubscriptionManager,
new BlockProductionAndPublishingPerformanceFactory(
new SystemTimeProvider(), __ -> UInt64.ZERO, true, 0, 0, 0, 0));

private ValidatorApiHandler handler;

@BeforeEach
public void setup() {
public void setup(final SpecContext specContext) {
when(syncStateProvider.getCurrentSyncState()).thenReturn(SyncState.IN_SYNC);
when(forkChoiceTrigger.prepareForAttestationProduction(any())).thenReturn(SafeFuture.COMPLETE);
when(dutyMetrics.getValidatorDutyMetric())
.thenReturn(ValidatorDutyMetricUtils.createValidatorDutyMetric(new StubMetricsSystem()));

when(blockGossipChannel.publishBlock(any())).thenReturn(SafeFuture.COMPLETE);
when(blobSidecarGossipChannel.publishBlobSidecar(any())).thenReturn(SafeFuture.COMPLETE);
when(blobSidecarGossipChannel.publishBlobSidecars(any())).thenReturn(SafeFuture.COMPLETE);

doAnswer(invocation -> SafeFuture.completedFuture(invocation.getArgument(0)))
.when(blockFactory)
.unblindSignedBlockIfBlinded(any(), any());

// BlobSidecar builder
doAnswer(
invocation -> {
final SignedBlockContainer blockContainer = invocation.getArgument(0);
final SpecVersion asspecVersion =
specContext.getSpec().forMilestone(SpecMilestone.DENEB);
if (asspecVersion == null) {
return List.of();
}
final MiscHelpersDeneb miscHelpersDeneb =
MiscHelpersDeneb.required(asspecVersion.miscHelpers());
if (blockContainer.getBlobs().isEmpty()) {
return List.of();
}
final SszList<Blob> blobs = blockContainer.getBlobs().orElseThrow();
final SszList<SszKZGProof> proofs = blockContainer.getKzgProofs().orElseThrow();
return IntStream.range(0, blobs.size())
.mapToObj(
index ->
miscHelpersDeneb.constructBlobSidecar(
blockContainer.getSignedBlock(),
UInt64.valueOf(index),
blobs.get(index),
proofs.get(index)))
.toList();
})
.when(blockFactory)
.createBlobSidecars(any());

handler =
new ValidatorApiHandler(
chainDataProvider,
nodeDataProvider,
networkDataProvider,
combinedChainDataClient,
syncStateProvider,
blockFactory,
attestationPool,
attestationManager,
attestationTopicSubscriber,
activeValidatorTracker,
dutyMetrics,
performanceTracker,
specContext.getSpec(),
forkChoiceTrigger,
proposersDataManager,
syncCommitteeMessagePool,
syncCommitteeContributionPool,
syncCommitteeSubscriptionManager,
new BlockProductionAndPublishingPerformanceFactory(
new SystemTimeProvider(), __ -> UInt64.ZERO, true, 0, 0, 0, 0),
new MilestoneBasedBlockPublisher(
asyncRunner,
specContext.getSpec(),
blockFactory,
blockImportChannel,
blockGossipChannel,
blockBlobSidecarsTrackersPool,
blobSidecarGossipChannel,
performanceTracker,
dutyMetrics,
P2PConfig.DEFAULT_GOSSIP_BLOBS_AFTER_BLOCK_ENABLED));
}

@Test
public void createAttestationData_withRecentBlockAvailable() {
@TestTemplate
public void createAttestationData_withRecentBlockAvailable(final SpecContext specContext) {
specContext.assumeIsNotOneOf(SpecMilestone.DENEB);
final UInt64 targetEpoch = UInt64.valueOf(3);
final UInt64 targetEpochStartSlot = spec.computeStartSlotAtEpoch(targetEpoch);
final UInt64 targetEpochStartSlot = specContext.getSpec().computeStartSlotAtEpoch(targetEpoch);
final UInt64 targetSlot = targetEpochStartSlot.plus(2);

final SignedBlockAndState genesis = chainUpdater.initializeGenesis();
Expand Down Expand Up @@ -167,12 +236,14 @@ public void createAttestationData_withRecentBlockAvailable() {
assertThat(attestation.getTarget()).isEqualTo(expectedTarget);
}

@Test
public void createUnsignedAttestation_withLatestBlockFromAnOldEpoch() {
@TestTemplate
public void createUnsignedAttestation_withLatestBlockFromAnOldEpoch(
final SpecContext specContext) {
specContext.assumeIsNotOneOf(SpecMilestone.DENEB);
final UInt64 latestEpoch = UInt64.valueOf(2);
final UInt64 latestSlot = spec.computeStartSlotAtEpoch(latestEpoch).plus(ONE);
final UInt64 latestSlot = specContext.getSpec().computeStartSlotAtEpoch(latestEpoch).plus(ONE);
final UInt64 targetEpoch = UInt64.valueOf(latestEpoch.longValue() + 3);
final UInt64 targetEpochStartSlot = spec.computeStartSlotAtEpoch(targetEpoch);
final UInt64 targetEpochStartSlot = specContext.getSpec().computeStartSlotAtEpoch(targetEpoch);
final UInt64 targetSlot = targetEpochStartSlot.plus(2);

final SignedBlockAndState genesis = chainUpdater.initializeGenesis();
Expand All @@ -196,4 +267,27 @@ public void createUnsignedAttestation_withLatestBlockFromAnOldEpoch() {
assertThat(attestation.getSource()).isEqualTo(genesisCheckpoint);
assertThat(attestation.getTarget()).isEqualTo(expectedTarget);
}

@TestTemplate
void sendSignedBlock_shouldImportAndPublishBlock(final SpecContext specContext) {
final SignedBeaconBlock block = specContext.getDataStructureUtil().randomSignedBeaconBlock(5);

when(blockImportChannel.importBlock(block, NOT_REQUIRED))
.thenReturn(prepareBlockImportResult(BlockImportResult.successful(block)));
final SafeFuture<SendSignedBlockResult> result = handler.sendSignedBlock(block, NOT_REQUIRED);
assertThat(result).isCompletedWithValue(SendSignedBlockResult.success(block.getRoot()));

if (specContext.getSpecMilestone() == SpecMilestone.DENEB) {
verify(blobSidecarGossipChannel).publishBlobSidecars(any());
}
verify(blockGossipChannel).publishBlock(block);
verify(blockImportChannel).importBlock(block, NOT_REQUIRED);
}

private SafeFuture<BlockImportAndBroadcastValidationResults> prepareBlockImportResult(
final BlockImportResult blockImportResult) {
return SafeFuture.completedFuture(
new BlockImportAndBroadcastValidationResults(
SafeFuture.completedFuture(blockImportResult)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,5 @@ SafeFuture<BlockContainerAndMetaData> createUnsignedBlock(
SafeFuture<SignedBeaconBlock> unblindSignedBlockIfBlinded(
SignedBeaconBlock maybeBlindedBlock, BlockPublishingPerformance blockPublishingPerformance);

List<BlobSidecar> createBlobSidecars(
SignedBlockContainer blockContainer, BlockPublishingPerformance blockPublishingPerformance);
List<BlobSidecar> createBlobSidecars(SignedBlockContainer blockContainer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformance;
import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
Expand Down Expand Up @@ -69,12 +68,8 @@ public SafeFuture<BlockContainerAndMetaData> createUnsignedBlock(
}

@Override
public List<BlobSidecar> createBlobSidecars(
final SignedBlockContainer blockContainer,
final BlockPublishingPerformance blockPublishingPerformance) {
return operationSelector
.createBlobSidecarsSelector(blockPublishingPerformance)
.apply(blockContainer);
public List<BlobSidecar> createBlobSidecars(final SignedBlockContainer blockContainer) {
return operationSelector.createBlobSidecarsSelector().apply(blockContainer);
}

private BlockContents createBlockContents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ public SafeFuture<SignedBeaconBlock> unblindSignedBlockIfBlinded(
}

@Override
public List<BlobSidecar> createBlobSidecars(
final SignedBlockContainer blockContainer,
final BlockPublishingPerformance blockPublishingPerformance) {
public List<BlobSidecar> createBlobSidecars(final SignedBlockContainer blockContainer) {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,7 @@ public Function<BeaconBlock, SafeFuture<BlobsBundle>> createBlobsBundleSelector(
};
}

public Function<SignedBlockContainer, List<BlobSidecar>> createBlobSidecarsSelector(
final BlockPublishingPerformance blockPublishingPerformance) {
public Function<SignedBlockContainer, List<BlobSidecar>> createBlobSidecarsSelector() {
return blockContainer -> {
final UInt64 slot = blockContainer.getSlot();
final SignedBeaconBlock block = blockContainer.getSignedBlock();
Expand Down Expand Up @@ -505,17 +504,12 @@ public Function<SignedBlockContainer, List<BlobSidecar>> createBlobSidecarsSelec
final MiscHelpersDeneb miscHelpersDeneb =
MiscHelpersDeneb.required(spec.atSlot(slot).miscHelpers());

final List<BlobSidecar> blobSidecars =
IntStream.range(0, blobs.size())
.mapToObj(
index ->
miscHelpersDeneb.constructBlobSidecar(
block, UInt64.valueOf(index), blobs.get(index), proofs.get(index)))
.toList();

blockPublishingPerformance.blobSidecarsPrepared();

return blobSidecars;
return IntStream.range(0, blobs.size())
.mapToObj(
index ->
miscHelpersDeneb.constructBlobSidecar(
block, UInt64.valueOf(index), blobs.get(index), proofs.get(index)))
.toList();
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,9 @@ public SafeFuture<SignedBeaconBlock> unblindSignedBlockIfBlinded(
}

@Override
public List<BlobSidecar> createBlobSidecars(
final SignedBlockContainer blockContainer,
final BlockPublishingPerformance blockPublishingPerformance) {
public List<BlobSidecar> createBlobSidecars(final SignedBlockContainer blockContainer) {
final SpecMilestone milestone = getMilestone(blockContainer.getSlot());
return registeredFactories
.get(milestone)
.createBlobSidecars(blockContainer, blockPublishingPerformance);
return registeredFactories.get(milestone).createBlobSidecars(blockContainer);
}

private SpecMilestone getMilestone(final UInt64 slot) {
Expand Down
Loading

0 comments on commit 4266035

Please sign in to comment.