Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement blob_sidecar event #7561

Merged
merged 5 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool;
import tech.pegasys.teku.statetransition.attestation.AttestationManager;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.block.BlockManager;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator;
Expand Down Expand Up @@ -225,6 +226,7 @@ private void setupAndStartRestAPI(BeaconRestApiConfig config) {
.syncService(syncService)
.validatorApiChannel(validatorApiChannel)
.blockManager(blockManager)
.blobSidecarPool(BlobSidecarPool.NOOP)
.attestationManager(attestationManager)
.activeValidatorChannel(activeValidatorChannel)
.attestationPool(attestationPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"in" : "query",
"schema" : {
"type" : "string",
"description" : "Event types to subscribe to. Available values include: [`head`, `finalized_checkpoint`, `chain_reorg`, `block`, `attestation`, `voluntary_exit`, `contribution_and_proof`]\n\n",
"description" : "Event types to subscribe to. Available values include: [`head`, `finalized_checkpoint`, `chain_reorg`, `block`, `attestation`, `voluntary_exit`, `contribution_and_proof`, `blob_sidecar`]\n\n",
"example" : "head"
}
} ],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public class BeaconRestApiTypes {
CoreTypes.string(
"Event types to subscribe to."
+ " Available values include: [`head`, `finalized_checkpoint`, `chain_reorg`, `block`, "
+ "`attestation`, `voluntary_exit`, `contribution_and_proof`]\n\n",
+ "`attestation`, `voluntary_exit`, `contribution_and_proof`, `blob_sidecar`]\n\n",
"head"));

public static final SerializableTypeDefinition<Bytes32> ROOT_TYPE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ private static RestApi create(
// Event Handler
.endpoint(
new GetEvents(
spec,
dataProvider,
eventChannels,
asyncRunner,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Consensys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beaconrestapi.handlers.v1.events;

import static tech.pegasys.teku.ethereum.json.types.EthereumTypes.KZG_COMMITMENT_TYPE;
import static tech.pegasys.teku.ethereum.json.types.EthereumTypes.VERSIONED_HASH_TYPE;
import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.BYTES32_TYPE;
import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.UINT64_TYPE;

import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.beaconrestapi.handlers.v1.events.BlobSidecarEvent.BlobSidecarData;
import tech.pegasys.teku.infrastructure.json.types.SerializableTypeDefinition;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.kzg.KZGCommitment;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.logic.versions.deneb.types.VersionedHash;

public class BlobSidecarEvent extends Event<BlobSidecarData> {

public static final SerializableTypeDefinition<BlobSidecarData> BLOB_SIDECAR_EVENT_TYPE =
SerializableTypeDefinition.object(BlobSidecarData.class)
.name("BlobSidecarEvent")
.withField("block_root", BYTES32_TYPE, BlobSidecarData::getBlockRoot)
.withField("index", UINT64_TYPE, BlobSidecarData::getIndex)
.withField("slot", UINT64_TYPE, BlobSidecarData::getSlot)
.withField("kzg_commitment", KZG_COMMITMENT_TYPE, BlobSidecarData::getKzgCommitment)
.withField("versioned_hash", VERSIONED_HASH_TYPE, BlobSidecarData::getVersionedHash)
.build();

private BlobSidecarEvent(
final Bytes32 blockRoot,
final UInt64 index,
final UInt64 slot,
final KZGCommitment kzgCommitment,
final VersionedHash versionedHash) {
super(
BLOB_SIDECAR_EVENT_TYPE,
new BlobSidecarData(blockRoot, index, slot, kzgCommitment, versionedHash));
}

public static BlobSidecarEvent create(final Spec spec, final BlobSidecar blobSidecar) {
return new BlobSidecarEvent(
blobSidecar.getBlockRoot(),
blobSidecar.getIndex(),
blobSidecar.getSlot(),
blobSidecar.getKZGCommitment(),
spec.atSlot(blobSidecar.getSlot())
.miscHelpers()
.kzgCommitmentToVersionedHash(blobSidecar.getKZGCommitment()));
}

public static class BlobSidecarData {
private final Bytes32 blockRoot;
private final UInt64 index;
private final UInt64 slot;
private final KZGCommitment kzgCommitment;
private final VersionedHash versionedHash;

BlobSidecarData(
final Bytes32 blockRoot,
final UInt64 index,
final UInt64 slot,
final KZGCommitment kzgCommitment,
final VersionedHash versionedHash) {
this.blockRoot = blockRoot;
this.index = index;
this.slot = slot;
this.kzgCommitment = kzgCommitment;
this.versionedHash = versionedHash;
}

public Bytes32 getBlockRoot() {
return blockRoot;
}

public UInt64 getIndex() {
return index;
}

public UInt64 getSlot() {
return slot;
}

public KZGCommitment getKzgCommitment() {
return kzgCommitment;
}

public VersionedHash getVersionedHash() {
return versionedHash;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import tech.pegasys.teku.infrastructure.restapi.endpoints.ListQueryParameterUtils;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange;
import tech.pegasys.teku.spec.datastructures.operations.SignedVoluntaryExit;
Expand All @@ -52,6 +54,7 @@
public class EventSubscriptionManager implements ChainHeadChannel, FinalizedCheckpointChannel {
private static final Logger LOG = LogManager.getLogger();

private final Spec spec;
private final ConfigProvider configProvider;
private final ChainDataProvider provider;
private final AsyncRunner asyncRunner;
Expand All @@ -61,6 +64,7 @@ public class EventSubscriptionManager implements ChainHeadChannel, FinalizedChec
private final Collection<EventSubscriber> eventSubscribers;

public EventSubscriptionManager(
final Spec spec,
final NodeDataProvider nodeDataProvider,
final ChainDataProvider chainDataProvider,
final SyncDataProvider syncDataProvider,
Expand All @@ -69,6 +73,7 @@ public EventSubscriptionManager(
final EventChannels eventChannels,
final TimeProvider timeProvider,
final int maxPendingEvents) {
this.spec = spec;
this.provider = chainDataProvider;
this.asyncRunner = asyncRunner;
this.timeProvider = timeProvider;
Expand All @@ -79,6 +84,7 @@ public EventSubscriptionManager(
eventChannels.subscribe(FinalizedCheckpointChannel.class, this);
syncDataProvider.subscribeToSyncStateChanges(this::onSyncStateChange);
nodeDataProvider.subscribeToReceivedBlocks(this::onNewBlock);
nodeDataProvider.subscribeToReceivedBlobSidecar(this::onNewBlobSidecar);
nodeDataProvider.subscribeToValidAttestations(this::onNewAttestation);
nodeDataProvider.subscribeToNewVoluntaryExits(this::onNewVoluntaryExit);
nodeDataProvider.subscribeToSyncCommitteeContributions(this::onSyncCommitteeContribution);
Expand Down Expand Up @@ -182,6 +188,11 @@ protected void onNewBlock(final SignedBeaconBlock block, final boolean execution
notifySubscribersOfEvent(EventType.block, blockEvent);
}

protected void onNewBlobSidecar(final BlobSidecar blobSidecar) {
final BlobSidecarEvent blobSidecarEvent = BlobSidecarEvent.create(spec, blobSidecar);
notifySubscribersOfEvent(EventType.blob_sidecar, blobSidecarEvent);
}

@Override
public void onNewFinalizedCheckpoint(
final Checkpoint checkpoint, final boolean fromOptimisticBlock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@
import tech.pegasys.teku.infrastructure.restapi.endpoints.RestApiRequest;
import tech.pegasys.teku.infrastructure.restapi.openapi.response.EventStreamResponseContentTypeDefinition;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.spec.Spec;

public class GetEvents extends RestApiEndpoint {
public static final String ROUTE = "/eth/v1/events";
private final EventSubscriptionManager eventSubscriptionManager;

public GetEvents(
final Spec spec,
final DataProvider dataProvider,
final EventChannels eventChannels,
final AsyncRunner asyncRunner,
final TimeProvider timeProvider,
final int maxPendingEvents) {
this(
spec,
dataProvider.getNodeDataProvider(),
dataProvider.getChainDataProvider(),
dataProvider.getSyncDataProvider(),
Expand All @@ -54,6 +57,7 @@ public GetEvents(
}

GetEvents(
final Spec spec,
final NodeDataProvider nodeDataProvider,
final ChainDataProvider chainDataProvider,
final SyncDataProvider syncDataProvider,
Expand All @@ -78,6 +82,7 @@ public GetEvents(
.build());
eventSubscriptionManager =
new EventSubscriptionManager(
spec,
nodeDataProvider,
chainDataProvider,
syncDataProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange;
import tech.pegasys.teku.spec.datastructures.operations.SignedVoluntaryExit;
Expand All @@ -54,7 +55,7 @@
import tech.pegasys.teku.storage.api.ReorgContext;

public class EventSubscriptionManagerTest {
private final Spec spec = TestSpecFactory.createMinimalCapella();
private final Spec spec = TestSpecFactory.createMinimalDeneb();
private final SpecConfig specConfig = spec.getGenesisSpecConfig();
private final DataStructureUtil data = new DataStructureUtil(spec);
protected final NodeDataProvider nodeDataProvider = mock(NodeDataProvider.class);
Expand Down Expand Up @@ -94,6 +95,7 @@ public class EventSubscriptionManagerTest {
private final SyncState sampleSyncState = SyncState.IN_SYNC;
private final SignedBeaconBlock sampleBlock =
SignedBeaconBlock.create(data.randomSignedBeaconBlock(0));
private final BlobSidecar sampleBlobSidecar = data.randomBlobSidecar();
private final Attestation sampleAttestation = data.randomAttestation(0);
private final SignedVoluntaryExit sampleVoluntaryExit = data.randomSignedVoluntaryExit();
private final SignedBlsToExecutionChange sampleBlsToExecutionChange =
Expand All @@ -116,6 +118,7 @@ void setup() throws IOException {
when(res.getOutputStream()).thenReturn(outputStream);
manager =
new EventSubscriptionManager(
spec,
nodeDataProvider,
chainDataProvider,
syncDataProvider,
Expand Down Expand Up @@ -205,6 +208,15 @@ void shouldPropagateBlock() throws IOException {
checkEvent("block", new BlockEvent(sampleBlock.asInternalSignedBeaconBlock(spec), false));
}

@Test
void shouldPropagateBlobSidecar() throws IOException {
when(req.getQueryString()).thenReturn("&topics=blob_sidecar");
manager.registerClient(client1);

triggerBlobSidecarEvent();
checkEvent("blob_sidecar", BlobSidecarEvent.create(spec, sampleBlobSidecar));
}

@Test
void shouldPropagateAttestation() throws IOException {
when(req.getQueryString()).thenReturn("&topics=attestation");
Expand Down Expand Up @@ -258,6 +270,15 @@ void shouldNotGetBlockIfNotSubscribed() {
assertThat(outputStream.countEvents()).isEqualTo(0);
}

@Test
void shouldNotGetBlobSidecarIfNotSubscribed() {
when(req.getQueryString()).thenReturn("&topics=head");
manager.registerClient(client1);

triggerBlobSidecarEvent();
assertThat(outputStream.countEvents()).isEqualTo(0);
}

@Test
void shouldNotGetAttestationIfNotSubscribed() {
when(req.getQueryString()).thenReturn("&topics=head");
Expand Down Expand Up @@ -315,6 +336,11 @@ private void triggerBlockEvent() {
asyncRunner.executeQueuedActions();
}

private void triggerBlobSidecarEvent() {
manager.onNewBlobSidecar(sampleBlobSidecar);
asyncRunner.executeQueuedActions();
}

private void triggerSyncStateEvent() {
manager.onSyncStateChange(sampleSyncState);
asyncRunner.executeQueuedActions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import tech.pegasys.teku.statetransition.OperationPool;
import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool;
import tech.pegasys.teku.statetransition.attestation.AttestationManager;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.block.BlockManager;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
Expand Down Expand Up @@ -97,6 +98,7 @@ public static class Builder {
private ValidatorApiChannel validatorApiChannel;
private AggregatingAttestationPool attestationPool;
private BlockManager blockManager;
private BlobSidecarPool blobSidecarPool;
private AttestationManager attestationManager;
private ActiveValidatorChannel activeValidatorChannel;
private OperationPool<AttesterSlashing> attesterSlashingPool;
Expand Down Expand Up @@ -145,6 +147,11 @@ public Builder blockManager(final BlockManager blockManager) {
return this;
}

public Builder blobSidecarPool(final BlobSidecarPool blobSidecarPool) {
this.blobSidecarPool = blobSidecarPool;
return this;
}

public Builder attestationManager(final AttestationManager attestationManager) {
this.attestationManager = attestationManager;
return this;
Expand Down Expand Up @@ -216,6 +223,7 @@ public DataProvider build() {
blsToExecutionChangePool,
syncCommitteeContributionPool,
blockManager,
blobSidecarPool,
attestationManager,
isLivenessTrackingEnabled,
activeValidatorChannel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import tech.pegasys.teku.statetransition.OperationPool;
import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool;
import tech.pegasys.teku.statetransition.attestation.AttestationManager;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool.NewBlobSidecarSubscriber;
import tech.pegasys.teku.statetransition.block.BlockManager;
import tech.pegasys.teku.statetransition.forkchoice.PreparedProposerInfo;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
Expand All @@ -56,6 +58,7 @@ public class NodeDataProvider {
private final OperationPool<SignedBlsToExecutionChange> blsToExecutionChangePool;
private final SyncCommitteeContributionPool syncCommitteeContributionPool;
private final BlockManager blockManager;
private final BlobSidecarPool blobSidecarPool;
private final AttestationManager attestationManager;
private final ActiveValidatorChannel activeValidatorChannel;
private final boolean isLivenessTrackingEnabled;
Expand All @@ -70,6 +73,7 @@ public NodeDataProvider(
final OperationPool<SignedBlsToExecutionChange> blsToExecutionChangePool,
final SyncCommitteeContributionPool syncCommitteeContributionPool,
final BlockManager blockManager,
final BlobSidecarPool blobSidecarPool,
final AttestationManager attestationManager,
final boolean isLivenessTrackingEnabled,
final ActiveValidatorChannel activeValidatorChannel,
Expand All @@ -82,6 +86,7 @@ public NodeDataProvider(
this.blsToExecutionChangePool = blsToExecutionChangePool;
this.syncCommitteeContributionPool = syncCommitteeContributionPool;
this.blockManager = blockManager;
this.blobSidecarPool = blobSidecarPool;
this.attestationManager = attestationManager;
this.activeValidatorChannel = activeValidatorChannel;
this.isLivenessTrackingEnabled = isLivenessTrackingEnabled;
Expand Down Expand Up @@ -172,6 +177,10 @@ public void subscribeToReceivedBlocks(ImportedBlockListener listener) {
blockManager.subscribeToReceivedBlocks(listener);
}

public void subscribeToReceivedBlobSidecar(NewBlobSidecarSubscriber listener) {
blobSidecarPool.subscribeNewBlobSidecar(listener);
}

public void subscribeToValidAttestations(ProcessedAttestationListener listener) {
attestationManager.subscribeToAllValidAttestations(listener);
}
Expand Down
Loading
Loading