Skip to content

Commit

Permalink
Avoid exception in stable subnet subscriber when discovery is disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
zilm13 committed Aug 5, 2023
1 parent 2deff5f commit b9b79f8
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,14 @@

import java.util.Set;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.config.NetworkingSpecConfig;
import tech.pegasys.teku.spec.datastructures.validator.SubnetSubscription;

public class AllSubnetsSubscriber implements StableSubnetSubscriber {
private static final Logger LOG = LogManager.getLogger();

public static StableSubnetSubscriber create(
final AttestationTopicSubscriber subscriber, final NetworkingSpecConfig networkingConfig) {
LOG.info("Subscribing to all attestation subnets");
final Set<SubnetSubscription> subscriptions =
IntStream.range(0, networkingConfig.getAttestationSubnetCount())
.mapToObj(subnetId -> new SubnetSubscription(subnetId, UInt64.MAX_VALUE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
Expand All @@ -37,12 +36,12 @@ public class NodeBasedStableSubnetSubscriber implements StableSubnetSubscriber {
private final NavigableSet<SubnetSubscription> subnetSubscriptions = new TreeSet<>();
private final Spec spec;
private final int subnetsPerNode;
private final Optional<UInt256> discoveryNodeId;
private final UInt256 discoveryNodeId;

public NodeBasedStableSubnetSubscriber(
final AttestationTopicSubscriber persistentSubnetSubscriber,
final Spec spec,
final Optional<UInt256> discoveryNodeId) {
final UInt256 discoveryNodeId) {
this.persistentSubnetSubscriber = persistentSubnetSubscriber;
this.spec = spec;
this.subnetsPerNode = spec.getNetworkingConfig().getSubnetsPerNode();
Expand Down Expand Up @@ -81,14 +80,10 @@ private Set<SubnetSubscription> adjustNumberOfSubscriptionsToNodeRequirement(
return emptySet();
}

final UInt256 nodeId =
discoveryNodeId.orElseThrow(
() -> new IllegalArgumentException("Unable to get discovery node id"));

final List<UInt64> nodeSubscribedSubnets =
spec.atSlot(currentSlot)
.miscHelpers()
.computeSubscribedSubnets(nodeId, spec.computeEpochAtSlot(currentSlot));
.computeSubscribedSubnets(discoveryNodeId, spec.computeEpochAtSlot(currentSlot));

LOG.trace(
"Computed persistent subnets {}",
Expand All @@ -104,7 +99,7 @@ private Set<SubnetSubscription> adjustNumberOfSubscriptionsToNodeRequirement(
nodeSubscribedSubnetsIterator.next().intValue(),
spec.atSlot(currentSlot)
.miscHelpers()
.calculateNodeSubnetUnsubscriptionSlot(nodeId, currentSlot));
.calculateNodeSubnetUnsubscriptionSlot(discoveryNodeId, currentSlot));
newSubnetSubscriptions.add(newSubnetSubscription);
}
return newSubnetSubscriptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public interface StableSubnetSubscriber extends SlotEventsChannel {
StableSubnetSubscriber NOOP =
new StableSubnetSubscriber() {
@Override
public void onSlot(UInt64 slot) {}
};

@Override
default void onSlot(final UInt64 slot) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -123,9 +122,7 @@ public void shouldUpdateUnsubscriptionSlotWhenAlreadySubscribed() {

final NodeBasedStableSubnetSubscriber subscriber =
new NodeBasedStableSubnetSubscriber(
attestationTopicSubscriberMock,
specMock,
Optional.of(dataStructureUtil.randomUInt256()));
attestationTopicSubscriberMock, specMock, dataStructureUtil.randomUInt256());

final ArgumentCaptor<Set<SubnetSubscription>> subscriptionCaptor =
ArgumentCaptor.forClass(Set.class);
Expand Down Expand Up @@ -154,6 +151,6 @@ public void shouldUpdateUnsubscriptionSlotWhenAlreadySubscribed() {
@NotNull
private NodeBasedStableSubnetSubscriber createSubscriber() {
return new NodeBasedStableSubnetSubscriber(
attestationTopicSubscriber, spec, Optional.of(dataStructureUtil.randomUInt256()));
attestationTopicSubscriber, spec, dataStructureUtil.randomUInt256());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.Optional;
import java.util.Set;
import org.apache.tuweni.units.bigints.UInt256;
import org.junit.jupiter.api.Test;
Expand All @@ -42,7 +41,7 @@ public class StableSubnetSubscriberTest {
private final DataStructureUtil dataStructureUtil =
new DataStructureUtil(TestSpecFactory.createDefault());

private final Optional<UInt256> nodeId = Optional.of(dataStructureUtil.randomUInt256());
private final UInt256 nodeId = dataStructureUtil.randomUInt256();

@Test
void shouldSubscribeToSubnetsPerNodeAtStart() {
Expand All @@ -58,7 +57,7 @@ void shouldSubscribeToSubnetsPerNodeAtStart() {
final UInt64 unsubscriptionSlot =
spec.getGenesisSpec()
.miscHelpers()
.calculateNodeSubnetUnsubscriptionSlot(nodeId.get(), currentSlot);
.calculateNodeSubnetUnsubscriptionSlot(nodeId, currentSlot);
subnetSubscriptions
.getValue()
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,11 +734,20 @@ protected void initActiveValidatorTracker() {

protected void initSubnetSubscriber() {
LOG.debug("BeaconChainController.initSubnetSubscriber");
this.stableSubnetSubscriber =
beaconConfig.p2pConfig().isSubscribeAllSubnetsEnabled()
? AllSubnetsSubscriber.create(attestationTopicSubscriber, spec.getNetworkingConfig())
: new NodeBasedStableSubnetSubscriber(
attestationTopicSubscriber, spec, p2pNetwork.getDiscoveryNodeId());
if (beaconConfig.p2pConfig().isSubscribeAllSubnetsEnabled()) {
LOG.info("Subscribing to all attestation subnets");
this.stableSubnetSubscriber =
AllSubnetsSubscriber.create(attestationTopicSubscriber, spec.getNetworkingConfig());
} else {
if (p2pNetwork.getDiscoveryNodeId().isPresent()) {
this.stableSubnetSubscriber =
new NodeBasedStableSubnetSubscriber(
attestationTopicSubscriber, spec, p2pNetwork.getDiscoveryNodeId().get());
} else {
LOG.warn("Discovery nodeId is not defined, disabling stable subnet subscriptions");
this.stableSubnetSubscriber = StableSubnetSubscriber.NOOP;
}
}
eventChannels.subscribe(SlotEventsChannel.class, stableSubnetSubscriber);
}

Expand Down

0 comments on commit b9b79f8

Please sign in to comment.