From 90a073feb778f7c6ea198258ec57d3a66588280d Mon Sep 17 00:00:00 2001 From: tersec Date: Thu, 11 Jan 2024 23:10:04 +0000 Subject: [PATCH] implement blob_sidecar Beacon API streaming --- beacon_chain/beacon_node.nim | 1 + .../consensus_object_pools/blob_quarantine.nim | 13 ++++++++++++- .../gossip_processing/gossip_validation.nim | 5 ++++- beacon_chain/nimbus_beacon_node.nim | 17 +++++++++++++++-- beacon_chain/rpc/rest_event_api.nim | 4 ++++ beacon_chain/spec/datatypes/deneb.nim | 10 ++++++++++ .../spec/eth2_apis/eth2_rest_serialization.nim | 6 ++++++ beacon_chain/spec/eth2_apis/rest_types.nim | 5 +++-- ncli/validator_db_aggregator.nim | 4 +++- 9 files changed, 58 insertions(+), 7 deletions(-) diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index d5a57ed904..6bece50647 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -47,6 +47,7 @@ type blsToExecQueue*: AsyncEventQueue[SignedBLSToExecutionChange] propSlashQueue*: AsyncEventQueue[ProposerSlashing] attSlashQueue*: AsyncEventQueue[AttesterSlashing] + blobSidecarQueue*: AsyncEventQueue[BlobSidecarInfoObject] finalQueue*: AsyncEventQueue[FinalizationInfoObject] reorgQueue*: AsyncEventQueue[ReorgInfoObject] contribQueue*: AsyncEventQueue[SignedContributionAndProof] diff --git a/beacon_chain/consensus_object_pools/blob_quarantine.nim b/beacon_chain/consensus_object_pools/blob_quarantine.nim index 1ba9686fdf..1178f5eddd 100644 --- a/beacon_chain/consensus_object_pools/blob_quarantine.nim +++ b/beacon_chain/consensus_object_pools/blob_quarantine.nim @@ -7,7 +7,10 @@ {.push raises: [].} -import ../spec/helpers +import + std/tables, + ../spec/helpers + from std/sequtils import mapIt from std/strutils import join @@ -18,10 +21,14 @@ type BlobQuarantine* = object blobs*: OrderedTable[(Eth2Digest, BlobIndex, KzgCommitment), ref BlobSidecar] + onBlobSidecarCallback*: OnBlobSidecarCallback + BlobFetchRecord* = object block_root*: Eth2Digest indices*: seq[BlobIndex] + OnBlobSidecarCallback = proc(data: BlobSidecar) {.gcsafe, raises: [].} + func shortLog*(x: seq[BlobIndex]): string = "<" & x.mapIt($it).join(", ") & ">" @@ -86,3 +93,7 @@ func blobFetchRecord*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock) (blck.root, idx, blck.message.body.blob_kzg_commitments[i])): indices.add(idx) BlobFetchRecord(block_root: blck.root, indices: indices) + +func init*( + T: type BlobQuarantine, onBlobSidecarCallback: OnBlobSidecarCallback): T = + T(onBlobSidecarCallback: onBlobSidecarCallback) diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim index 213ae2b2d8..9faad0fc59 100644 --- a/beacon_chain/gossip_processing/gossip_validation.nim +++ b/beacon_chain/gossip_processing/gossip_validation.nim @@ -440,8 +440,11 @@ proc validateBlobSidecar*( if not ok: return dag.checkedReject("BlobSidecar: blob invalid") - ok() + # Send notification about new blob sidecar via callback + if not(isNil(blobQuarantine.onBlobSidecarCallback)): + blobQuarantine.onBlobSidecarCallback(blob_sidecar) + ok() # https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/phase0/p2p-interface.md#beacon_block # https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/bellatrix/p2p-interface.md#beacon_block diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 8be8cb1171..2c3d22058f 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -8,7 +8,7 @@ {.push raises: [].} import - std/[os, random, sequtils, terminal, times], + std/[os, random, terminal, times], chronos, chronicles, metrics, metrics/chronos_httpserver, stew/[byteutils, io2], @@ -273,6 +273,8 @@ proc checkWeakSubjectivityCheckpoint( headStateSlot = getStateField(dag.headState, slot) quit 1 +from ./spec/state_transition_block import kzg_commitment_to_versioned_hash + proc initFullNode( node: BeaconNode, rng: ref HmacDrbgContext, @@ -293,6 +295,14 @@ proc initFullNode( node.eventBus.propSlashQueue.emit(data) proc onAttesterSlashingAdded(data: AttesterSlashing) = node.eventBus.attSlashQueue.emit(data) + proc onBlobSidecarAdded(data: BlobSidecar) = + node.eventBus.blobSidecarQueue.emit( + BlobSidecarInfoObject( + block_root: hash_tree_root(data.signed_block_header.message), + index: data.index, + slot: data.signed_block_header.message.slot, + kzg_commitment: data.kzg_commitment, + versioned_hash: data.kzg_commitment.kzg_commitment_to_versioned_hash)) proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) = let optimistic = if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH: @@ -373,7 +383,7 @@ proc initFullNode( validatorChangePool = newClone(ValidatorChangePool.init( dag, attestationPool, onVoluntaryExitAdded, onBLSToExecutionChangeAdded, onProposerSlashingAdded, onAttesterSlashingAdded)) - blobQuarantine = newClone(BlobQuarantine()) + blobQuarantine = newClone(BlobQuarantine.init(onBlobSidecarAdded)) consensusManager = ConsensusManager.new( dag, attestationPool, quarantine, node.elManager, ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets), @@ -553,6 +563,7 @@ proc init*(T: type BeaconNode, blsToExecQueue: newAsyncEventQueue[SignedBLSToExecutionChange](), propSlashQueue: newAsyncEventQueue[ProposerSlashing](), attSlashQueue: newAsyncEventQueue[AttesterSlashing](), + blobSidecarQueue: newAsyncEventQueue[BlobSidecarInfoObject](), finalQueue: newAsyncEventQueue[FinalizationInfoObject](), reorgQueue: newAsyncEventQueue[ReorgInfoObject](), contribQueue: newAsyncEventQueue[SignedContributionAndProof](), @@ -869,6 +880,8 @@ func verifyFinalization(node: BeaconNode, slot: Slot) = # finalization occurs every slot, to 4 slots vs scheduledSlot. doAssert finalizedEpoch + 4 >= epoch +from std/sequtils import toSeq + func subnetLog(v: BitArray): string = $toSeq(v.oneIndices()) diff --git a/beacon_chain/rpc/rest_event_api.nim b/beacon_chain/rpc/rest_event_api.nim index 6d83d1a297..5f38d4ff11 100644 --- a/beacon_chain/rpc/rest_event_api.nim +++ b/beacon_chain/rpc/rest_event_api.nim @@ -145,6 +145,10 @@ proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) = let handler = response.eventHandler(node.eventBus.attSlashQueue, "attester_slashing") res.add(handler) + if EventTopic.BlobSidecar in eventTopics: + let handler = response.eventHandler(node.eventBus.blobSidecarQueue, + "blob_sidecar") + res.add(handler) if EventTopic.FinalizedCheckpoint in eventTopics: let handler = response.eventHandler(node.eventBus.finalQueue, "finalized_checkpoint") diff --git a/beacon_chain/spec/datatypes/deneb.nim b/beacon_chain/spec/datatypes/deneb.nim index d5f20dcff9..3a95a0eb17 100644 --- a/beacon_chain/spec/datatypes/deneb.nim +++ b/beacon_chain/spec/datatypes/deneb.nim @@ -60,6 +60,16 @@ type kzg_commitment_inclusion_proof*: array[KZG_COMMITMENT_INCLUSION_PROOF_DEPTH, Eth2Digest] + # https://github.com/ethereum/beacon-APIs/blob/4882aa0803b622b75bab286b285599d70b7a2429/apis/eventstream/index.yaml#L138-L142 + # Spec object, not only internal, because it gets serialized out for the + # event stream Beacon API + BlobSidecarInfoObject* = object + block_root*: Eth2Digest + index*: BlobIndex + slot*: Slot + kzg_commitment*: KzgCommitment + versioned_hash*: VersionedHash + # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blobidentifier BlobIdentifier* = object block_root*: Eth2Digest diff --git a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim index 27a20001ac..7c7e0b3477 100644 --- a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim +++ b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim @@ -51,6 +51,7 @@ RestJson.useDefaultSerializationFor( BLSToExecutionChange, BeaconBlockHeader, BlobSidecar, + BlobSidecarInfoObject, BlobsBundle, Checkpoint, ContributionAndProof, @@ -299,6 +300,7 @@ const type EncodeTypes* = AttesterSlashing | + BlobSidecarInfoObject | DeleteKeystoresBody | EmptyBody | ImportDistributedKeystoresBody | @@ -4145,6 +4147,8 @@ proc decodeString*(t: typedesc[EventTopic], ok(EventTopic.ProposerSlashing) of "attester_slashing": ok(EventTopic.AttesterSlashing) + of "blob_sidecar": + ok(EventTopic.BlobSidecar) of "finalized_checkpoint": ok(EventTopic.FinalizedCheckpoint) of "chain_reorg": @@ -4174,6 +4178,8 @@ proc encodeString*(value: set[EventTopic]): Result[string, cstring] = res.add("proposer_slashing,") if EventTopic.AttesterSlashing in value: res.add("attester_slashing,") + if EventTopic.BlobSidecar in value: + res.add("blob_sidecar,") if EventTopic.FinalizedCheckpoint in value: res.add("finalized_checkpoint,") if EventTopic.ChainReorg in value: diff --git a/beacon_chain/spec/eth2_apis/rest_types.nim b/beacon_chain/spec/eth2_apis/rest_types.nim index e73a7bca6e..627586eecc 100644 --- a/beacon_chain/spec/eth2_apis/rest_types.nim +++ b/beacon_chain/spec/eth2_apis/rest_types.nim @@ -55,8 +55,9 @@ type # https://github.com/ethereum/beacon-APIs/blob/v2.4.2/apis/eventstream/index.yaml EventTopic* {.pure.} = enum Head, Block, Attestation, VoluntaryExit, BLSToExecutionChange, - ProposerSlashing, AttesterSlashing, FinalizedCheckpoint, ChainReorg, - ContributionAndProof, LightClientFinalityUpdate, LightClientOptimisticUpdate + ProposerSlashing, AttesterSlashing, BlobSidecar, FinalizedCheckpoint, + ChainReorg, ContributionAndProof, LightClientFinalityUpdate, + LightClientOptimisticUpdate EventTopics* = set[EventTopic] diff --git a/ncli/validator_db_aggregator.nim b/ncli/validator_db_aggregator.nim index 64a36f5040..9714c5116b 100644 --- a/ncli/validator_db_aggregator.nim +++ b/ncli/validator_db_aggregator.nim @@ -6,7 +6,7 @@ # at your option. This file may not be copied, modified, or distributed except according to those terms. import - std/[parsecsv, streams], + std/parsecsv, stew/[io2, byteutils], chronicles, confutils, snappy, ../beacon_chain/spec/datatypes/base, ./ncli_common @@ -200,6 +200,8 @@ proc advanceEpochs*(aggregator: var ValidatorDbAggregator, epoch: Epoch, aggregator.epochsAggregated = 0 when isMainModule: + import std/streams + when defined(posix): import system/ansi_c