Skip to content

Commit

Permalink
Merge branch 'master' into attestation-subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
zilm13 authored Dec 3, 2024
2 parents dee1586 + 0e86c08 commit 4a94329
Show file tree
Hide file tree
Showing 93 changed files with 1,292 additions and 778 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

### Additions and Improvements
- improve block publishing performance, especially relevant with locally produced blocks
- delay blobs publishing until the block is published to at least 1 peer, especially relevant with locally produced blocks with low upload bandwidth connections. Can be disabled via `--Xp2p-gossip-blobs-after-block-enabled=false`

### Bug Fixes
- Added a startup script for unix systems to ensure that when jemalloc is installed the script sets the LD_PRELOAD environment variable to the use the jemalloc library
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ public void setup(final SpecContext specContext) {
blockGossipChannel,
blockBlobSidecarsTrackersPool,
blobSidecarGossipChannel,
performanceTracker,
dutyMetrics,
P2PConfig.DEFAULT_GOSSIP_BLOBS_AFTER_BLOCK_ENABLED));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,12 @@ public SafeFuture<Optional<BlockContainerAndMetaData>> createUnsignedBlockIntern
requestedBuilderBoostFactor,
blockSlotState,
blockProductionPerformance))
.thenPeek(
maybeBlock ->
maybeBlock.ifPresent(
block ->
performanceTracker.saveProducedBlock(
block.blockContainer().getBlock().getSlotAndBlockRoot())))
.alwaysRun(blockProductionPerformance::complete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntSet;
Expand All @@ -42,7 +43,6 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.logging.StatusLogger;
import tech.pegasys.teku.infrastructure.metrics.SettableGauge;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
Expand All @@ -51,6 +51,7 @@
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.statetransition.attestation.utils.AttestationBitsAggregator;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
import tech.pegasys.teku.validator.api.ValidatorPerformanceTrackingMode;
import tech.pegasys.teku.validator.coordinator.ActiveValidatorTracker;
Expand Down Expand Up @@ -169,6 +170,9 @@ private SafeFuture<?> reportBlockPerformance(final UInt64 currentEpoch) {
validatorPerformanceMetrics.updateBlockPerformanceMetrics(blockPerformance);
}
}
})
.alwaysRun(
() -> {
producedBlocksByEpoch.headMap(blockProductionEpoch, true).clear();
blockProductionAttemptsByEpoch.headMap(blockProductionEpoch, true).clear();
});
Expand Down Expand Up @@ -208,8 +212,8 @@ private SafeFuture<?> reportAttestationPerformance(final UInt64 currentEpoch) {
validatorPerformanceMetrics.updateAttestationPerformanceMetrics(
attestationPerformance);
}
producedAttestationsByEpoch.headMap(analyzedEpoch, true).clear();
});
})
.alwaysRun(() -> producedAttestationsByEpoch.headMap(analyzedEpoch, true).clear());
}

private SafeFuture<BlockPerformance> getBlockPerformanceForEpoch(final UInt64 currentEpoch) {
Expand Down Expand Up @@ -300,16 +304,23 @@ private AttestationPerformance calculateAttestationPerformance(

// Pre-process attestations included on chain to group them by
// data hash to inclusion slot to aggregation bitlist
final Map<Bytes32, NavigableMap<UInt64, SszBitlist>> slotAndBitlistsByAttestationDataHash =
new HashMap<>();
final Map<Bytes32, NavigableMap<UInt64, AttestationBitsAggregator>>
slotAndBitlistsByAttestationDataHash = new HashMap<>();
for (Map.Entry<UInt64, List<Attestation>> entry : attestationsIncludedOnChain.entrySet()) {
final Optional<Int2IntMap> committeesSize =
Optional.of(spec.getBeaconCommitteesSize(state, entry.getKey()));
for (Attestation attestation : entry.getValue()) {
final Bytes32 attestationDataHash = attestation.getData().hashTreeRoot();
final NavigableMap<UInt64, SszBitlist> slotToBitlists =
final NavigableMap<UInt64, AttestationBitsAggregator> slotToBitlists =
slotAndBitlistsByAttestationDataHash.computeIfAbsent(
attestationDataHash, __ -> new TreeMap<>());
slotToBitlists.merge(
entry.getKey(), attestation.getAggregationBits(), SszBitlist::nullableOr);
entry.getKey(),
AttestationBitsAggregator.of(attestation, committeesSize),
(firstBitsAggregator, secondBitsAggregator) -> {
firstBitsAggregator.or(secondBitsAggregator);
return firstBitsAggregator;
});
}
}

Expand All @@ -319,10 +330,10 @@ private AttestationPerformance calculateAttestationPerformance(
if (!slotAndBitlistsByAttestationDataHash.containsKey(sentAttestationDataHash)) {
continue;
}
final NavigableMap<UInt64, SszBitlist> slotAndBitlists =
final NavigableMap<UInt64, AttestationBitsAggregator> slotAndBitlists =
slotAndBitlistsByAttestationDataHash.get(sentAttestationDataHash);
for (UInt64 slot : slotAndBitlists.keySet()) {
if (slotAndBitlists.get(slot).isSuperSetOf(sentAttestation.getAggregationBits())) {
if (slotAndBitlists.get(slot).isSuperSetOf(sentAttestation)) {
inclusionDistances.add(slot.minus(sentAttestationSlot).intValue());
break;
}
Expand Down Expand Up @@ -420,11 +431,11 @@ public void saveProducedAttestation(final Attestation attestation) {
}

@Override
public void saveProducedBlock(final SignedBeaconBlock block) {
final UInt64 epoch = spec.computeEpochAtSlot(block.getSlot());
public void saveProducedBlock(final SlotAndBlockRoot slotAndBlockRoot) {
final UInt64 epoch = spec.computeEpochAtSlot(slotAndBlockRoot.getSlot());
final Set<SlotAndBlockRoot> blocksInEpoch =
producedBlocksByEpoch.computeIfAbsent(epoch, __ -> concurrentSet());
blocksInEpoch.add(block.getSlotAndBlockRoot());
blocksInEpoch.add(slotAndBlockRoot);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import it.unimi.dsi.fastutil.ints.IntSet;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage;

Expand All @@ -28,7 +28,7 @@ public void start(final UInt64 nodeStartSlot) {}
public void saveProducedAttestation(final Attestation attestation) {}

@Override
public void saveProducedBlock(final SignedBeaconBlock block) {}
public void saveProducedBlock(final SlotAndBlockRoot slotAndBlockRoot) {}

@Override
public void reportBlockProductionAttempt(final UInt64 epoch) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import it.unimi.dsi.fastutil.ints.IntSet;
import tech.pegasys.teku.ethereum.events.SlotEventsChannel;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage;

Expand All @@ -26,7 +26,7 @@ public interface PerformanceTracker extends SlotEventsChannel {

void saveProducedAttestation(Attestation attestation);

void saveProducedBlock(SignedBeaconBlock block);
void saveProducedBlock(SlotAndBlockRoot slotAndBlockRoot);

void reportBlockProductionAttempt(UInt64 epoch);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import tech.pegasys.teku.validator.api.SendSignedBlockResult;
import tech.pegasys.teku.validator.coordinator.BlockFactory;
import tech.pegasys.teku.validator.coordinator.DutyMetrics;
import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker;

public abstract class AbstractBlockPublisher implements BlockPublisher {
private static final Logger LOG = LogManager.getLogger();
Expand All @@ -48,22 +47,19 @@ public abstract class AbstractBlockPublisher implements BlockPublisher {
protected final BlockFactory blockFactory;
protected final BlockImportChannel blockImportChannel;
protected final BlockGossipChannel blockGossipChannel;
protected final PerformanceTracker performanceTracker;
protected final DutyMetrics dutyMetrics;

public AbstractBlockPublisher(
final AsyncRunner asyncRunner,
final BlockFactory blockFactory,
final BlockGossipChannel blockGossipChannel,
final BlockImportChannel blockImportChannel,
final PerformanceTracker performanceTracker,
final DutyMetrics dutyMetrics,
final boolean gossipBlobsAfterBlock) {
this.asyncRunner = asyncRunner;
this.blockFactory = blockFactory;
this.blockImportChannel = blockImportChannel;
this.blockGossipChannel = blockGossipChannel;
this.performanceTracker = performanceTracker;
this.dutyMetrics = dutyMetrics;
this.gossipBlobsAfterBlock = gossipBlobsAfterBlock;
}
Expand All @@ -75,7 +71,6 @@ public SafeFuture<SendSignedBlockResult> sendSignedBlock(
final BlockPublishingPerformance blockPublishingPerformance) {
return blockFactory
.unblindSignedBlockIfBlinded(blockContainer.getSignedBlock(), blockPublishingPerformance)
.thenPeek(performanceTracker::saveProducedBlock)
.thenCompose(
// creating blob sidecars after unblinding the block to ensure in the blinded flow we
// will have the cached builder payload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.validator.coordinator.BlockFactory;
import tech.pegasys.teku.validator.coordinator.DutyMetrics;
import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker;

public class BlockPublisherDeneb extends BlockPublisherPhase0 {

Expand All @@ -38,15 +37,13 @@ public BlockPublisherDeneb(
final BlockGossipChannel blockGossipChannel,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final BlobSidecarGossipChannel blobSidecarGossipChannel,
final PerformanceTracker performanceTracker,
final DutyMetrics dutyMetrics,
final boolean gossipBlobsAfterBlock) {
super(
asyncRunner,
blockFactory,
blockGossipChannel,
blockImportChannel,
performanceTracker,
dutyMetrics,
gossipBlobsAfterBlock);
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import tech.pegasys.teku.statetransition.block.BlockImportChannel.BlockImportAndBroadcastValidationResults;
import tech.pegasys.teku.validator.coordinator.BlockFactory;
import tech.pegasys.teku.validator.coordinator.DutyMetrics;
import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker;

public class BlockPublisherPhase0 extends AbstractBlockPublisher {

Expand All @@ -34,15 +33,13 @@ public BlockPublisherPhase0(
final BlockFactory blockFactory,
final BlockGossipChannel blockGossipChannel,
final BlockImportChannel blockImportChannel,
final PerformanceTracker performanceTracker,
final DutyMetrics dutyMetrics,
final boolean gossipBlobsAfterBlock) {
super(
asyncRunner,
blockFactory,
blockGossipChannel,
blockImportChannel,
performanceTracker,
dutyMetrics,
gossipBlobsAfterBlock);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import tech.pegasys.teku.validator.api.SendSignedBlockResult;
import tech.pegasys.teku.validator.coordinator.BlockFactory;
import tech.pegasys.teku.validator.coordinator.DutyMetrics;
import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker;

public class MilestoneBasedBlockPublisher implements BlockPublisher {

Expand All @@ -47,7 +46,6 @@ public MilestoneBasedBlockPublisher(
final BlockGossipChannel blockGossipChannel,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final BlobSidecarGossipChannel blobSidecarGossipChannel,
final PerformanceTracker performanceTracker,
final DutyMetrics dutyMetrics,
final boolean gossipBlobsAfterBlock) {
this.spec = spec;
Expand All @@ -57,7 +55,6 @@ public MilestoneBasedBlockPublisher(
blockFactory,
blockGossipChannel,
blockImportChannel,
performanceTracker,
dutyMetrics,
gossipBlobsAfterBlock);

Expand All @@ -72,7 +69,6 @@ public MilestoneBasedBlockPublisher(
blockGossipChannel,
blockBlobSidecarsTrackersPool,
blobSidecarGossipChannel,
performanceTracker,
dutyMetrics,
gossipBlobsAfterBlock));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,11 @@ public void createUnsignedBlock_shouldCreateBlock() {
Optional.empty(),
Optional.of(ONE),
BlockProductionPerformance.NOOP);

verify(performanceTracker).reportBlockProductionAttempt(spec.computeEpochAtSlot(newSlot));
verify(performanceTracker)
.saveProducedBlock(
blockContainerAndMetaData.blockContainer().getBlock().getSlotAndBlockRoot());
}

@Test
Expand Down
Loading

0 comments on commit 4a94329

Please sign in to comment.