Skip to content

Commit

Permalink
update blob sidecars gossip topics
Browse files Browse the repository at this point in the history
  • Loading branch information
mehdi-aouadi committed Nov 6, 2024
1 parent 1f39904 commit c9dc8a4
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import tech.pegasys.teku.networking.p2p.gossip.GossipNetwork;
import tech.pegasys.teku.networking.p2p.gossip.TopicChannel;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.SpecVersion;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarSchema;
import tech.pegasys.teku.spec.datastructures.state.ForkInfo;
Expand Down Expand Up @@ -68,8 +68,11 @@ public static BlobSidecarGossipManager create(
.getBlobSidecarSchema();
final Int2ObjectMap<Eth2TopicHandler<BlobSidecar>> subnetIdToTopicHandler =
new Int2ObjectOpenHashMap<>();
final SpecConfigDeneb specConfigDeneb = SpecConfigDeneb.required(forkSpecVersion.getConfig());
IntStream.range(0, specConfigDeneb.getBlobSidecarSubnetCount())
final SpecMilestone specMilestone = spec.atEpoch(forkInfo.getFork().getEpoch()).getMilestone();
final int blobSidecarSubnetCount =
spec.forMilestone(specMilestone).miscHelpers().getBlobSidecarSubnetCount();

IntStream.range(0, blobSidecarSubnetCount)
.forEach(
subnetId -> {
final Eth2TopicHandler<BlobSidecar> topicHandler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,18 @@ public static Set<String> getAllTopics(
for (int i = 0; i < NetworkConstants.SYNC_COMMITTEE_SUBNET_COUNT; i++) {
topics.add(getSyncCommitteeSubnetTopic(forkDigest, i, gossipEncoding));
}
if (spec.getNetworkingConfigDeneb().isPresent()) {
for (int i = 0; i < spec.getNetworkingConfigDeneb().get().getBlobSidecarSubnetCount(); i++) {
topics.add(getBlobSidecarSubnetTopic(forkDigest, i, gossipEncoding));
}
if (spec.getNetworkingConfigElectra().isPresent()) {
addBlobSidecarSubnetTopics(
spec.getNetworkingConfigElectra().get().getBlobSidecarSubnetCountElectra(),
topics,
forkDigest,
gossipEncoding);
} else if (spec.getNetworkingConfigDeneb().isPresent()) {
addBlobSidecarSubnetTopics(
spec.getNetworkingConfigDeneb().get().getBlobSidecarSubnetCount(),
topics,
forkDigest,
gossipEncoding);
}
for (GossipTopicName topicName : GossipTopicName.values()) {
topics.add(GossipTopics.getTopic(forkDigest, topicName, gossipEncoding));
Expand All @@ -98,4 +106,14 @@ public static Bytes4 extractForkDigest(final String topic) throws IllegalArgumen

return Bytes4.fromHexString(forkDigest);
}

private static void addBlobSidecarSubnetTopics(
final int blobSidecarSubnetCount,
final Set<String> topics,
final Bytes4 forkDigest,
final GossipEncoding gossipEncoding) {
for (int i = 0; i < blobSidecarSubnetCount; i++) {
topics.add(getBlobSidecarSubnetTopic(forkDigest, i, gossipEncoding));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.tuweni.bytes.Bytes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunner;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand All @@ -41,9 +41,10 @@
import tech.pegasys.teku.networking.p2p.gossip.TopicChannel;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.TestSpecContext;
import tech.pegasys.teku.spec.TestSpecInvocationContextProvider;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.config.SpecConfigElectra;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.state.ForkInfo;
import tech.pegasys.teku.spec.util.DataStructureUtil;
Expand All @@ -52,31 +53,30 @@
import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder;
import tech.pegasys.teku.storage.storageSystem.StorageSystem;

@TestSpecContext(milestone = {SpecMilestone.DENEB, SpecMilestone.ELECTRA})
public class BlobSidecarGossipManagerTest {

private static final Pattern BLOB_SIDECAR_TOPIC_PATTERN = Pattern.compile("blob_sidecar_(\\d+)");

private final Spec spec = TestSpecFactory.createMainnetDeneb();
private final StorageSystem storageSystem = InMemoryStorageSystemBuilder.buildDefault(spec);
private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec);

@SuppressWarnings("unchecked")
private final OperationProcessor<BlobSidecar> processor = mock(OperationProcessor.class);

private final GossipNetwork gossipNetwork = mock(GossipNetwork.class);
private final GossipEncoding gossipEncoding = GossipEncoding.SSZ_SNAPPY;

private final Map<Integer, TopicChannel> topicChannels = new HashMap<>();

private final StubAsyncRunner asyncRunner = new StubAsyncRunner();

private final ForkInfo forkInfo =
new ForkInfo(spec.fork(UInt64.ZERO), dataStructureUtil.randomBytes32());

private Spec spec;
private DataStructureUtil dataStructureUtil;
private BlobSidecarGossipManager blobSidecarGossipManager;
private SpecMilestone specMilestone;

@BeforeEach
public void setup() {
public void setup(final TestSpecInvocationContextProvider.SpecContext specContext) {
spec = specContext.getSpec();
dataStructureUtil = specContext.getDataStructureUtil();
specMilestone = specContext.getSpecMilestone();
final StorageSystem storageSystem = InMemoryStorageSystemBuilder.buildDefault(spec);
storageSystem.chainUpdater().initializeGenesis();
// return TopicChannel mock for each blob_sidecar_<subnet_id> topic
doAnswer(
Expand All @@ -96,6 +96,8 @@ public void setup() {
.subscribe(any(), any());
when(processor.process(any(), any()))
.thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT));
final ForkInfo forkInfo =
new ForkInfo(spec.fork(UInt64.ZERO), dataStructureUtil.randomBytes32());
blobSidecarGossipManager =
BlobSidecarGossipManager.create(
storageSystem.recentChainData(),
Expand All @@ -109,7 +111,7 @@ public void setup() {
blobSidecarGossipManager.subscribe();
}

@Test
@TestTemplate
public void testGossipingBlobSidecarPublishesToCorrectSubnet() {
final BlobSidecar blobSidecar =
dataStructureUtil.createRandomBlobSidecarBuilder().index(UInt64.ONE).build();
Expand All @@ -127,27 +129,26 @@ public void testGossipingBlobSidecarPublishesToCorrectSubnet() {
});
}

@Test
@TestTemplate
public void testGossipingBlobSidecarWithLargeIndexGossipToCorrectSubnet() {
final BlobSidecar blobSidecar =
dataStructureUtil.createRandomBlobSidecarBuilder().index(UInt64.valueOf(10)).build();
final Bytes serialized = gossipEncoding.encode(blobSidecar);

safeJoin(blobSidecarGossipManager.publishBlobSidecar(blobSidecar));
final SpecConfig config = spec.forMilestone(SpecMilestone.DENEB).getConfig();
final SpecConfigDeneb specConfigDeneb = SpecConfigDeneb.required(config);
final int blobSidecarSubnetCount = getBlobSidecarSubnetCount();

topicChannels.forEach(
(subnetId, channel) -> {
if (subnetId == 10 % specConfigDeneb.getBlobSidecarSubnetCount()) {
if (subnetId == 10 % blobSidecarSubnetCount) {
verify(channel).gossip(serialized);
} else {
verifyNoInteractions(channel);
}
});
}

@Test
@TestTemplate
public void testUnsubscribingClosesAllChannels() {
blobSidecarGossipManager.unsubscribe();

Expand All @@ -159,7 +160,7 @@ public void testUnsubscribingClosesAllChannels() {
});
}

@Test
@TestTemplate
public void testAcceptingSidecarGossipIfOnTheCorrectTopic() {
// topic handler for blob sidecars with subnet_id 1
final Eth2TopicHandler<BlobSidecar> topicHandler = blobSidecarGossipManager.getTopicHandler(1);
Expand All @@ -175,7 +176,7 @@ public void testAcceptingSidecarGossipIfOnTheCorrectTopic() {
assertThat(validationResult).isEqualTo(InternalValidationResult.ACCEPT);
}

@Test
@TestTemplate
public void testRejectingSidecarGossipIfNotOnTheCorrectTopic() {
// topic handler for blob sidecars with subnet_id 1
final Eth2TopicHandler<BlobSidecar> topicHandler = blobSidecarGossipManager.getTopicHandler(1);
Expand All @@ -190,4 +191,12 @@ public void testRejectingSidecarGossipIfNotOnTheCorrectTopic() {
assertThat(validationResult.getDescription())
.hasValue("blob sidecar with subnet_id 2 does not match the topic subnet_id 1");
}

private int getBlobSidecarSubnetCount() {
return specMilestone.isGreaterThanOrEqualTo(SpecMilestone.ELECTRA)
? SpecConfigElectra.required(spec.forMilestone(SpecMilestone.ELECTRA).getConfig())
.getBlobSidecarSubnetCountElectra()
: SpecConfigDeneb.required(spec.forMilestone(SpecMilestone.DENEB).getConfig())
.getBlobSidecarSubnetCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,68 +27,89 @@
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import tech.pegasys.teku.infrastructure.bytes.Bytes4;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.TestSpecContext;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.TestSpecInvocationContextProvider;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.config.SpecConfigElectra;
import tech.pegasys.teku.spec.datastructures.state.Fork;
import tech.pegasys.teku.spec.datastructures.state.ForkInfo;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.storage.client.RecentChainData;

@TestSpecContext(milestone = {SpecMilestone.DENEB, SpecMilestone.ELECTRA})
class Eth2GossipTopicFilterTest {

private final UInt64 denebForkEpoch = UInt64.valueOf(10);
private final Spec spec = TestSpecFactory.createMinimalWithDenebForkEpoch(denebForkEpoch);
private final List<Fork> forks = spec.getForkSchedule().getForks();
private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec);
private final Bytes32 genesisValidatorsRoot = dataStructureUtil.randomBytes32();
private final ForkInfo forkInfo = new ForkInfo(forks.get(0), genesisValidatorsRoot);
private final Fork nextFork = forks.get(1);
private final RecentChainData recentChainData = mock(RecentChainData.class);
private final Bytes4 nextForkDigest =
spec.atEpoch(nextFork.getEpoch())
.miscHelpers()
.computeForkDigest(nextFork.getCurrentVersion(), genesisValidatorsRoot);

private final Eth2GossipTopicFilter filter =
new Eth2GossipTopicFilter(recentChainData, SSZ_SNAPPY, spec);
private final UInt64 currentForkEpoch = UInt64.valueOf(10);
private Spec spec;
private SpecMilestone specMilestone;
private DataStructureUtil dataStructureUtil;
private ForkInfo forkInfo;
private Eth2GossipTopicFilter filter;
private Bytes4 nextForkDigest;

@BeforeEach
void setUp() {
void setUp(final TestSpecInvocationContextProvider.SpecContext specContext) {
specMilestone = specContext.getSpecMilestone();
spec =
switch (specContext.getSpecMilestone()) {
case PHASE0 -> throw new IllegalArgumentException("Phase0 is an unsupported milestone");
case ALTAIR -> throw new IllegalArgumentException("Altair is an unsupported milestone");
case BELLATRIX ->
throw new IllegalArgumentException("Bellatrix is an unsupported milestone");
case CAPELLA -> throw new IllegalArgumentException("Capella is an unsupported milestone");
case DENEB -> TestSpecFactory.createMinimalWithDenebForkEpoch(currentForkEpoch);
case ELECTRA -> TestSpecFactory.createMinimalWithElectraForkEpoch(currentForkEpoch);
};
dataStructureUtil = new DataStructureUtil(spec);
filter = new Eth2GossipTopicFilter(recentChainData, SSZ_SNAPPY, spec);

final Bytes32 genesisValidatorsRoot = dataStructureUtil.randomBytes32();
final List<Fork> forks = spec.getForkSchedule().getForks();
forkInfo = new ForkInfo(forks.get(0), genesisValidatorsRoot);

final Fork nextFork = forks.get(1);
nextForkDigest =
spec.atEpoch(nextFork.getEpoch())
.miscHelpers()
.computeForkDigest(nextFork.getCurrentVersion(), genesisValidatorsRoot);

when(recentChainData.getCurrentForkInfo()).thenReturn(Optional.of(forkInfo));
when(recentChainData.getNextFork(forkInfo.getFork())).thenReturn(Optional.of(nextFork));
}

@Test
@TestTemplate
void shouldNotAllowIrrelevantTopics() {
assertThat(filter.isRelevantTopic("abc")).isFalse();
}

@Test
@TestTemplate
void shouldNotRequireNextForkToBePresent() {
when(recentChainData.getNextFork(any())).thenReturn(Optional.empty());
assertThat(filter.isRelevantTopic(getTopicName(GossipTopicName.BEACON_BLOCK))).isTrue();
}

@Test
@TestTemplate
void shouldConsiderTopicsForNextForkRelevant() {
assertThat(filter.isRelevantTopic(getNextForkTopicName(GossipTopicName.BEACON_BLOCK))).isTrue();
}

@Test
@TestTemplate
void shouldConsiderTopicsForSignedContributionAndProofRelevant() {
assertThat(
filter.isRelevantTopic(
getNextForkTopicName(GossipTopicName.SYNC_COMMITTEE_CONTRIBUTION_AND_PROOF)))
.isTrue();
}

@Test
@TestTemplate
void shouldConsiderAllAttestationSubnetsRelevant() {
for (int i = 0; i < spec.getNetworkingConfig().getAttestationSubnetCount(); i++) {
assertThat(filter.isRelevantTopic(getTopicName(getAttestationSubnetTopicName(i)))).isTrue();
Expand All @@ -97,14 +118,14 @@ void shouldConsiderAllAttestationSubnetsRelevant() {
}
}

@Test
@TestTemplate
void shouldConsiderAllSyncCommitteeSubnetsRelevant() {
for (int i = 0; i < SYNC_COMMITTEE_SUBNET_COUNT; i++) {
assertThat(filter.isRelevantTopic(getTopicName(getSyncCommitteeSubnetTopicName(i)))).isTrue();
}
}

@Test
@TestTemplate
void shouldConsiderAllBlobSidecarSubnetsRelevant() {
final SpecConfig config = spec.forMilestone(SpecMilestone.DENEB).getConfig();
final SpecConfigDeneb specConfigDeneb = SpecConfigDeneb.required(config);
Expand All @@ -113,31 +134,36 @@ void shouldConsiderAllBlobSidecarSubnetsRelevant() {
}
}

@Test
@TestTemplate
void shouldNotConsiderBlobSidecarWithIncorrectSubnetIdRelevant() {
final SpecConfig config = spec.forMilestone(SpecMilestone.DENEB).getConfig();
final SpecConfigDeneb specConfigDeneb = SpecConfigDeneb.required(config);
final int blobSidecarSubnetCount = getBlobSidecarSubnetCount();
assertThat(
filter.isRelevantTopic(
getTopicName(
getBlobSidecarSubnetTopicName(
specConfigDeneb.getBlobSidecarSubnetCount() + 1))))
getTopicName(getBlobSidecarSubnetTopicName(blobSidecarSubnetCount + 1))))
.isFalse();
}

@Test
@TestTemplate
void shouldNotConsiderBlobSidecarWithoutIndexTopicRelevant() {
assertThat(filter.isRelevantTopic(getTopicName("blob_sidecar"))).isFalse();
}

@Test
@TestTemplate
void shouldNotAllowTopicsWithUnknownForkDigest() {
final String irrelevantTopic =
GossipTopics.getTopic(
Bytes4.fromHexString("0x11223344"), GossipTopicName.BEACON_BLOCK, SSZ_SNAPPY);
assertThat(filter.isRelevantTopic(irrelevantTopic)).isFalse();
}

private int getBlobSidecarSubnetCount() {
return specMilestone.isGreaterThanOrEqualTo(SpecMilestone.ELECTRA)
? SpecConfigElectra.required(spec.forMilestone(SpecMilestone.ELECTRA).getConfig())
.getBlobSidecarSubnetCountElectra()
: SpecConfigDeneb.required(spec.forMilestone(SpecMilestone.DENEB).getConfig())
.getBlobSidecarSubnetCount();
}

private String getTopicName(final GossipTopicName name) {
return GossipTopics.getTopic(forkInfo.getForkDigest(spec), name, SSZ_SNAPPY);
}
Expand Down

0 comments on commit c9dc8a4

Please sign in to comment.