From b9b79f805d4609344e779f878d8bb68a5e2dc064 Mon Sep 17 00:00:00 2001 From: Dmitrii Shmatko Date: Sat, 5 Aug 2023 13:10:29 +0200 Subject: [PATCH] Avoid exception in stable subnet subscriber when discovery is disabled --- .../gossip/subnets/AllSubnetsSubscriber.java | 4 ---- .../NodeBasedStableSubnetSubscriber.java | 13 ++++--------- .../subnets/StableSubnetSubscriber.java | 5 +++++ .../NodeBasedStableSubnetSubscriberTest.java | 7 ++----- .../subnets/StableSubnetSubscriberTest.java | 5 ++--- .../beaconchain/BeaconChainController.java | 19 ++++++++++++++----- 6 files changed, 27 insertions(+), 26 deletions(-) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AllSubnetsSubscriber.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AllSubnetsSubscriber.java index 83e7b8a2df1..06cb1c9cd12 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AllSubnetsSubscriber.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AllSubnetsSubscriber.java @@ -17,18 +17,14 @@ import java.util.Set; import java.util.stream.IntStream; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.config.NetworkingSpecConfig; import tech.pegasys.teku.spec.datastructures.validator.SubnetSubscription; public class AllSubnetsSubscriber implements StableSubnetSubscriber { - private static final Logger LOG = LogManager.getLogger(); public static StableSubnetSubscriber create( final AttestationTopicSubscriber subscriber, final NetworkingSpecConfig networkingConfig) { - LOG.info("Subscribing to all attestation subnets"); final Set subscriptions = IntStream.range(0, networkingConfig.getAttestationSubnetCount()) .mapToObj(subnetId -> new SubnetSubscription(subnetId, UInt64.MAX_VALUE)) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/NodeBasedStableSubnetSubscriber.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/NodeBasedStableSubnetSubscriber.java index ae7d11fd663..367fca19dfc 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/NodeBasedStableSubnetSubscriber.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/NodeBasedStableSubnetSubscriber.java @@ -19,7 +19,6 @@ import java.util.Iterator; import java.util.List; import java.util.NavigableSet; -import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -37,12 +36,12 @@ public class NodeBasedStableSubnetSubscriber implements StableSubnetSubscriber { private final NavigableSet subnetSubscriptions = new TreeSet<>(); private final Spec spec; private final int subnetsPerNode; - private final Optional discoveryNodeId; + private final UInt256 discoveryNodeId; public NodeBasedStableSubnetSubscriber( final AttestationTopicSubscriber persistentSubnetSubscriber, final Spec spec, - final Optional discoveryNodeId) { + final UInt256 discoveryNodeId) { this.persistentSubnetSubscriber = persistentSubnetSubscriber; this.spec = spec; this.subnetsPerNode = spec.getNetworkingConfig().getSubnetsPerNode(); @@ -81,14 +80,10 @@ private Set adjustNumberOfSubscriptionsToNodeRequirement( return emptySet(); } - final UInt256 nodeId = - discoveryNodeId.orElseThrow( - () -> new IllegalArgumentException("Unable to get discovery node id")); - final List nodeSubscribedSubnets = spec.atSlot(currentSlot) .miscHelpers() - .computeSubscribedSubnets(nodeId, spec.computeEpochAtSlot(currentSlot)); + .computeSubscribedSubnets(discoveryNodeId, spec.computeEpochAtSlot(currentSlot)); LOG.trace( "Computed persistent subnets {}", @@ -104,7 +99,7 @@ private Set adjustNumberOfSubscriptionsToNodeRequirement( nodeSubscribedSubnetsIterator.next().intValue(), spec.atSlot(currentSlot) .miscHelpers() - .calculateNodeSubnetUnsubscriptionSlot(nodeId, currentSlot)); + .calculateNodeSubnetUnsubscriptionSlot(discoveryNodeId, currentSlot)); newSubnetSubscriptions.add(newSubnetSubscription); } return newSubnetSubscriptions; diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/StableSubnetSubscriber.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/StableSubnetSubscriber.java index de5afc06112..ca95c72e7e6 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/StableSubnetSubscriber.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/StableSubnetSubscriber.java @@ -17,6 +17,11 @@ import tech.pegasys.teku.infrastructure.unsigned.UInt64; public interface StableSubnetSubscriber extends SlotEventsChannel { + StableSubnetSubscriber NOOP = + new StableSubnetSubscriber() { + @Override + public void onSlot(UInt64 slot) {} + }; @Override default void onSlot(final UInt64 slot) {} diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/NodeBasedStableSubnetSubscriberTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/NodeBasedStableSubnetSubscriberTest.java index 3479bb1fb6c..267326d9173 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/NodeBasedStableSubnetSubscriberTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/NodeBasedStableSubnetSubscriberTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.when; import java.util.List; -import java.util.Optional; import java.util.Set; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; @@ -123,9 +122,7 @@ public void shouldUpdateUnsubscriptionSlotWhenAlreadySubscribed() { final NodeBasedStableSubnetSubscriber subscriber = new NodeBasedStableSubnetSubscriber( - attestationTopicSubscriberMock, - specMock, - Optional.of(dataStructureUtil.randomUInt256())); + attestationTopicSubscriberMock, specMock, dataStructureUtil.randomUInt256()); final ArgumentCaptor> subscriptionCaptor = ArgumentCaptor.forClass(Set.class); @@ -154,6 +151,6 @@ public void shouldUpdateUnsubscriptionSlotWhenAlreadySubscribed() { @NotNull private NodeBasedStableSubnetSubscriber createSubscriber() { return new NodeBasedStableSubnetSubscriber( - attestationTopicSubscriber, spec, Optional.of(dataStructureUtil.randomUInt256())); + attestationTopicSubscriber, spec, dataStructureUtil.randomUInt256()); } } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/StableSubnetSubscriberTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/StableSubnetSubscriberTest.java index 95bf705fa5e..9fd65f508f2 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/StableSubnetSubscriberTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/StableSubnetSubscriberTest.java @@ -22,7 +22,6 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntSet; -import java.util.Optional; import java.util.Set; import org.apache.tuweni.units.bigints.UInt256; import org.junit.jupiter.api.Test; @@ -42,7 +41,7 @@ public class StableSubnetSubscriberTest { private final DataStructureUtil dataStructureUtil = new DataStructureUtil(TestSpecFactory.createDefault()); - private final Optional nodeId = Optional.of(dataStructureUtil.randomUInt256()); + private final UInt256 nodeId = dataStructureUtil.randomUInt256(); @Test void shouldSubscribeToSubnetsPerNodeAtStart() { @@ -58,7 +57,7 @@ void shouldSubscribeToSubnetsPerNodeAtStart() { final UInt64 unsubscriptionSlot = spec.getGenesisSpec() .miscHelpers() - .calculateNodeSubnetUnsubscriptionSlot(nodeId.get(), currentSlot); + .calculateNodeSubnetUnsubscriptionSlot(nodeId, currentSlot); subnetSubscriptions .getValue() .forEach( diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 5c29b2b3d6d..70aa9e57a03 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -734,11 +734,20 @@ protected void initActiveValidatorTracker() { protected void initSubnetSubscriber() { LOG.debug("BeaconChainController.initSubnetSubscriber"); - this.stableSubnetSubscriber = - beaconConfig.p2pConfig().isSubscribeAllSubnetsEnabled() - ? AllSubnetsSubscriber.create(attestationTopicSubscriber, spec.getNetworkingConfig()) - : new NodeBasedStableSubnetSubscriber( - attestationTopicSubscriber, spec, p2pNetwork.getDiscoveryNodeId()); + if (beaconConfig.p2pConfig().isSubscribeAllSubnetsEnabled()) { + LOG.info("Subscribing to all attestation subnets"); + this.stableSubnetSubscriber = + AllSubnetsSubscriber.create(attestationTopicSubscriber, spec.getNetworkingConfig()); + } else { + if (p2pNetwork.getDiscoveryNodeId().isPresent()) { + this.stableSubnetSubscriber = + new NodeBasedStableSubnetSubscriber( + attestationTopicSubscriber, spec, p2pNetwork.getDiscoveryNodeId().get()); + } else { + LOG.warn("Discovery nodeId is not defined, disabling stable subnet subscriptions"); + this.stableSubnetSubscriber = StableSubnetSubscriber.NOOP; + } + } eventChannels.subscribe(SlotEventsChannel.class, stableSubnetSubscriber); }