Skip to content

Commit

Permalink
Add block_gossip SSE event
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Feb 1, 2024
1 parent ecefc10 commit c0739c6
Show file tree
Hide file tree
Showing 14 changed files with 138 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import tech.pegasys.teku.networking.p2p.peer.Peer;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.blocks.ReceivedBlockListener;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
Expand Down Expand Up @@ -158,9 +159,7 @@ public static SyncingNodeManager create(
blockValidator,
timeProvider,
EVENT_LOG,
Optional.empty(),
true,
false);
Optional.empty());

eventChannels
.subscribe(SlotEventsChannel.class, blockManager)
Expand Down Expand Up @@ -207,8 +206,18 @@ public static SyncingNodeManager create(
recentBlocksFetcher.subscribeBlockFetched(
block -> blockManager.importBlock(block, BroadcastValidationLevel.NOT_REQUIRED));
blockManager.subscribeToReceivedBlocks(
(block, executionOptimistic) ->
recentBlocksFetcher.cancelRecentBlockRequest(block.getRoot()));
new ReceivedBlockListener() {
@Override
public void onBlockValidated(final SignedBeaconBlock block) {
// NOOP
}

@Override
public void onBlockImported(
final SignedBeaconBlock block, final boolean executionOptimistic) {
recentBlocksFetcher.cancelRecentBlockRequest(block.getRoot());
}
});

recentBlocksFetcher.start().join();
blockManager.start().join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import tech.pegasys.teku.infrastructure.io.PortAvailability;

public class BeaconRestApiConfig {
public static final Boolean DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_VALIDATED_ENABLED = false;
public static final Boolean DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_IMPORTED_ENABLED = true;
private static final Logger LOG = LogManager.getLogger();

public static final int DEFAULT_REST_API_PORT = 5051;
Expand All @@ -52,8 +50,6 @@ public class BeaconRestApiConfig {
private final int maxUrlLength;
private final int maxPendingEvents;
private final Optional<Integer> validatorThreads;
private final boolean beaconEventsBlockNotifyWhenValidatedEnabled;
private final boolean beaconEventsBlockNotifyWhenImportedEnabled;

private BeaconRestApiConfig(
final int restApiPort,
Expand All @@ -67,9 +63,7 @@ private BeaconRestApiConfig(
final int maxUrlLength,
final int maxPendingEvents,
final Optional<Integer> validatorThreads,
final boolean beaconLivenessTrackingEnabled,
final boolean beaconEventsBlockNotifyWhenValidatedEnabled,
final boolean beaconEventsBlockNotifyWhenImportedEnabled) {
final boolean beaconLivenessTrackingEnabled) {
this.restApiPort = restApiPort;
this.restApiDocsEnabled = restApiDocsEnabled;
this.restApiEnabled = restApiEnabled;
Expand All @@ -82,8 +76,6 @@ private BeaconRestApiConfig(
this.maxPendingEvents = maxPendingEvents;
this.validatorThreads = validatorThreads;
this.beaconLivenessTrackingEnabled = beaconLivenessTrackingEnabled;
this.beaconEventsBlockNotifyWhenValidatedEnabled = beaconEventsBlockNotifyWhenValidatedEnabled;
this.beaconEventsBlockNotifyWhenImportedEnabled = beaconEventsBlockNotifyWhenImportedEnabled;
}

public int getRestApiPort() {
Expand All @@ -106,14 +98,6 @@ public boolean isBeaconLivenessTrackingEnabled() {
return beaconLivenessTrackingEnabled;
}

public boolean isBeaconEventsBlockNotifyWhenValidatedEnabled() {
return beaconEventsBlockNotifyWhenValidatedEnabled;
}

public boolean isBeaconEventsBlockNotifyWhenImportedEnabled() {
return beaconEventsBlockNotifyWhenImportedEnabled;
}

public String getRestApiInterface() {
return restApiInterface;
}
Expand Down Expand Up @@ -171,10 +155,6 @@ public static final class BeaconRestApiConfigBuilder {
private int maxUrlLength = DEFAULT_MAX_URL_LENGTH;
private Optional<Integer> validatorThreads = Optional.empty();
private Eth1Address eth1DepositContractAddress;
private boolean beaconEventsBlockNotifyWhenValidatedEnabled =
DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_VALIDATED_ENABLED;
private boolean defaultBeaconEventsBlockNotifyWhenImportedEnabled =
DEFAULT_BEACON_EVENTS_BLOCK_NOTIFY_WHEN_IMPORTED_ENABLED;

private BeaconRestApiConfigBuilder() {}

Expand Down Expand Up @@ -281,28 +261,12 @@ public BeaconRestApiConfig build() {
maxUrlLength,
maxPendingEvents,
validatorThreads,
beaconLivenessTrackingEnabled,
beaconEventsBlockNotifyWhenValidatedEnabled,
defaultBeaconEventsBlockNotifyWhenImportedEnabled);
beaconLivenessTrackingEnabled);
}

public BeaconRestApiConfigBuilder maxUrlLength(final int maxUrlLength) {
this.maxUrlLength = maxUrlLength;
return this;
}

public BeaconRestApiConfigBuilder beaconEventsBlockNotifyWhenValidatedEnabled(
final boolean beaconEventsBlockNotifyWhenValidatedEnabled) {
this.beaconEventsBlockNotifyWhenValidatedEnabled =
beaconEventsBlockNotifyWhenValidatedEnabled;
return this;
}

public BeaconRestApiConfigBuilder beaconEventsBlockNotifyWhenImportedEnabled(
final boolean defaultBeaconEventsBlockNotifyWhenImportedEnabled) {
this.defaultBeaconEventsBlockNotifyWhenImportedEnabled =
defaultBeaconEventsBlockNotifyWhenImportedEnabled;
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beaconrestapi.handlers.v1.events;

import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.BYTES32_TYPE;
import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.UINT64_TYPE;

import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.json.types.SerializableTypeDefinition;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

public class BlockGossipEvent extends Event<BlockGossipEvent.BlockData> {

private static final SerializableTypeDefinition<BlockGossipEvent.BlockData>
BLOCK_GOSSIP_EVENT_TYPE =
SerializableTypeDefinition.object(BlockGossipEvent.BlockData.class)
.name("BlockGossipEvent")
.withField("slot", UINT64_TYPE, BlockGossipEvent.BlockData::slot)
.withField("block", BYTES32_TYPE, BlockGossipEvent.BlockData::block)
.build();

BlockGossipEvent(final SignedBeaconBlock block) {
super(BLOCK_GOSSIP_EVENT_TYPE, new BlockData(block.getSlot(), block.getRoot()));
}

public record BlockData(UInt64 slot, Bytes32 block) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.ReceivedBlockListener;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing;
import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing;
Expand Down Expand Up @@ -87,7 +88,19 @@ public EventSubscriptionManager(
eventChannels.subscribe(ChainHeadChannel.class, this);
eventChannels.subscribe(FinalizedCheckpointChannel.class, this);
syncDataProvider.subscribeToSyncStateChanges(this::onSyncStateChange);
nodeDataProvider.subscribeToReceivedBlocks(this::onNewBlock);
nodeDataProvider.subscribeToReceivedBlocks(
new ReceivedBlockListener() {
@Override
public void onBlockValidated(final SignedBeaconBlock block) {
onNewBlockGossip(block);
}

@Override
public void onBlockImported(
final SignedBeaconBlock block, final boolean executionOptimistic) {
onNewBlock(block, executionOptimistic);
}
});
nodeDataProvider.subscribeToReceivedBlobSidecar(this::onNewBlobSidecar);
nodeDataProvider.subscribeToAttesterSlashing(this::onNewAttesterSlashing);
nodeDataProvider.subscribeToProposerSlashing(this::onNewProposerSlashing);
Expand Down Expand Up @@ -195,6 +208,11 @@ protected void onNewBlock(final SignedBeaconBlock block, final boolean execution
notifySubscribersOfEvent(EventType.block, blockEvent);
}

protected void onNewBlockGossip(final SignedBeaconBlock block) {
final BlockGossipEvent blockGossipEvent = new BlockGossipEvent(block);
notifySubscribersOfEvent(EventType.block_gossip, blockGossipEvent);
}

protected void onNewBlobSidecar(final BlobSidecar blobSidecar) {
final BlobSidecarEvent blobSidecarEvent = BlobSidecarEvent.create(spec, blobSidecar);
notifySubscribersOfEvent(EventType.blob_sidecar, blobSidecarEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ void shouldPropagateBlock() throws IOException {
checkEvent("block", new BlockEvent(sampleBlock.asInternalSignedBeaconBlock(spec), false));
}

@Test
void shouldPropagateBlockGossip() throws IOException {
when(req.getQueryString()).thenReturn("&topics=block_gossip");
manager.registerClient(client1);

triggerBlockGossipEvent();
checkEvent("block_gossip", new BlockGossipEvent(sampleBlock.asInternalSignedBeaconBlock(spec)));
}

@Test
void shouldPropagateBlobSidecar() throws IOException {
when(req.getQueryString()).thenReturn("&topics=blob_sidecar");
Expand Down Expand Up @@ -444,6 +453,11 @@ private void triggerBlockEvent() {
asyncRunner.executeQueuedActions();
}

private void triggerBlockGossipEvent() {
manager.onNewBlockGossip(sampleBlock.asInternalSignedBeaconBlock(spec));
asyncRunner.executeQueuedActions();
}

private void triggerBlobSidecarEvent() {
manager.onNewBlobSidecar(sampleBlobSidecar);
asyncRunner.executeQueuedActions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.attestation.ProcessedAttestationListener;
import tech.pegasys.teku.spec.datastructures.blocks.ImportedBlockListener;
import tech.pegasys.teku.spec.datastructures.blocks.ReceivedBlockListener;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing;
import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing;
Expand Down Expand Up @@ -168,7 +168,7 @@ private SafeFuture<Optional<SubmitDataError>> addBlsToExecutionChange(
});
}

public void subscribeToReceivedBlocks(ImportedBlockListener listener) {
public void subscribeToReceivedBlocks(ReceivedBlockListener listener) {
blockManager.subscribeToReceivedBlocks(listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public enum EventType {
blob_sidecar,
attester_slashing,
proposer_slashing,
payload_attributes;
payload_attributes,
block_gossip;

public static List<EventType> getTopics(List<String> topics) {
return topics.stream().map(EventType::valueOf).toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

package tech.pegasys.teku.spec.datastructures.blocks;

@FunctionalInterface
public interface ImportedBlockListener {
public interface ReceivedBlockListener {
void onBlockValidated(SignedBeaconBlock block);

void onBlockImported(SignedBeaconBlock block, boolean executionOptimistic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,6 @@ public void onBlockImported(final SignedBeaconBlock block) {
});
}

@Override
public void onBlockValidated(SignedBeaconBlock block) {}

public SafeFuture<AttestationProcessingResult> onAttestation(
final ValidatableAttestation attestation) {
if (pendingAttestations.contains(attestation)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

@FunctionalInterface
public interface BlockImportNotifications extends VoidReturningChannelInterface {
void onBlockImported(SignedBeaconBlock block);

void onBlockValidated(SignedBeaconBlock block);
}
Loading

0 comments on commit c0739c6

Please sign in to comment.