Skip to content

Commit

Permalink
implement blob_sidecar Beacon API streaming (#5728)
Browse files Browse the repository at this point in the history
  • Loading branch information
tersec authored Jan 13, 2024
1 parent a45609c commit 69af8f9
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 7 deletions.
1 change: 1 addition & 0 deletions beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type
blsToExecQueue*: AsyncEventQueue[SignedBLSToExecutionChange]
propSlashQueue*: AsyncEventQueue[ProposerSlashing]
attSlashQueue*: AsyncEventQueue[AttesterSlashing]
blobSidecarQueue*: AsyncEventQueue[BlobSidecarInfoObject]
finalQueue*: AsyncEventQueue[FinalizationInfoObject]
reorgQueue*: AsyncEventQueue[ReorgInfoObject]
contribQueue*: AsyncEventQueue[SignedContributionAndProof]
Expand Down
13 changes: 12 additions & 1 deletion beacon_chain/consensus_object_pools/blob_quarantine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

{.push raises: [].}

import ../spec/helpers
import
std/tables,
../spec/helpers

from std/sequtils import mapIt
from std/strutils import join

Expand All @@ -18,10 +21,14 @@ type
BlobQuarantine* = object
blobs*:
OrderedTable[(Eth2Digest, BlobIndex, KzgCommitment), ref BlobSidecar]
onBlobSidecarCallback*: OnBlobSidecarCallback

BlobFetchRecord* = object
block_root*: Eth2Digest
indices*: seq[BlobIndex]

OnBlobSidecarCallback = proc(data: BlobSidecar) {.gcsafe, raises: [].}

func shortLog*(x: seq[BlobIndex]): string =
"<" & x.mapIt($it).join(", ") & ">"

Expand Down Expand Up @@ -86,3 +93,7 @@ func blobFetchRecord*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock)
(blck.root, idx, blck.message.body.blob_kzg_commitments[i])):
indices.add(idx)
BlobFetchRecord(block_root: blck.root, indices: indices)

func init*(
T: type BlobQuarantine, onBlobSidecarCallback: OnBlobSidecarCallback): T =
T(onBlobSidecarCallback: onBlobSidecarCallback)
5 changes: 4 additions & 1 deletion beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,11 @@ proc validateBlobSidecar*(
if not ok:
return dag.checkedReject("BlobSidecar: blob invalid")

ok()
# Send notification about new blob sidecar via callback
if not(isNil(blobQuarantine.onBlobSidecarCallback)):
blobQuarantine.onBlobSidecarCallback(blob_sidecar)

ok()

# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/phase0/p2p-interface.md#beacon_block
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/bellatrix/p2p-interface.md#beacon_block
Expand Down
17 changes: 15 additions & 2 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{.push raises: [].}

import
std/[os, random, sequtils, terminal, times],
std/[os, random, terminal, times],
chronos, chronicles,
metrics, metrics/chronos_httpserver,
stew/[byteutils, io2],
Expand Down Expand Up @@ -273,6 +273,8 @@ proc checkWeakSubjectivityCheckpoint(
headStateSlot = getStateField(dag.headState, slot)
quit 1

from ./spec/state_transition_block import kzg_commitment_to_versioned_hash

proc initFullNode(
node: BeaconNode,
rng: ref HmacDrbgContext,
Expand All @@ -293,6 +295,14 @@ proc initFullNode(
node.eventBus.propSlashQueue.emit(data)
proc onAttesterSlashingAdded(data: AttesterSlashing) =
node.eventBus.attSlashQueue.emit(data)
proc onBlobSidecarAdded(data: BlobSidecar) =
node.eventBus.blobSidecarQueue.emit(
BlobSidecarInfoObject(
block_root: hash_tree_root(data.signed_block_header.message),
index: data.index,
slot: data.signed_block_header.message.slot,
kzg_commitment: data.kzg_commitment,
versioned_hash: data.kzg_commitment.kzg_commitment_to_versioned_hash))
proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) =
let optimistic =
if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH:
Expand Down Expand Up @@ -373,7 +383,7 @@ proc initFullNode(
validatorChangePool = newClone(ValidatorChangePool.init(
dag, attestationPool, onVoluntaryExitAdded, onBLSToExecutionChangeAdded,
onProposerSlashingAdded, onAttesterSlashingAdded))
blobQuarantine = newClone(BlobQuarantine())
blobQuarantine = newClone(BlobQuarantine.init(onBlobSidecarAdded))
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, node.elManager,
ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets),
Expand Down Expand Up @@ -553,6 +563,7 @@ proc init*(T: type BeaconNode,
blsToExecQueue: newAsyncEventQueue[SignedBLSToExecutionChange](),
propSlashQueue: newAsyncEventQueue[ProposerSlashing](),
attSlashQueue: newAsyncEventQueue[AttesterSlashing](),
blobSidecarQueue: newAsyncEventQueue[BlobSidecarInfoObject](),
finalQueue: newAsyncEventQueue[FinalizationInfoObject](),
reorgQueue: newAsyncEventQueue[ReorgInfoObject](),
contribQueue: newAsyncEventQueue[SignedContributionAndProof](),
Expand Down Expand Up @@ -869,6 +880,8 @@ func verifyFinalization(node: BeaconNode, slot: Slot) =
# finalization occurs every slot, to 4 slots vs scheduledSlot.
doAssert finalizedEpoch + 4 >= epoch

from std/sequtils import toSeq

func subnetLog(v: BitArray): string =
$toSeq(v.oneIndices())

Expand Down
4 changes: 4 additions & 0 deletions beacon_chain/rpc/rest_event_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) =
let handler = response.eventHandler(node.eventBus.attSlashQueue,
"attester_slashing")
res.add(handler)
if EventTopic.BlobSidecar in eventTopics:
let handler = response.eventHandler(node.eventBus.blobSidecarQueue,
"blob_sidecar")
res.add(handler)
if EventTopic.FinalizedCheckpoint in eventTopics:
let handler = response.eventHandler(node.eventBus.finalQueue,
"finalized_checkpoint")
Expand Down
10 changes: 10 additions & 0 deletions beacon_chain/spec/datatypes/deneb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ type
kzg_commitment_inclusion_proof*:
array[KZG_COMMITMENT_INCLUSION_PROOF_DEPTH, Eth2Digest]

# https://github.com/ethereum/beacon-APIs/blob/4882aa0803b622b75bab286b285599d70b7a2429/apis/eventstream/index.yaml#L138-L142
# Spec object, not only internal, because it gets serialized out for the
# event stream Beacon API
BlobSidecarInfoObject* = object
block_root*: Eth2Digest
index*: BlobIndex
slot*: Slot
kzg_commitment*: KzgCommitment
versioned_hash*: VersionedHash

# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blobidentifier
BlobIdentifier* = object
block_root*: Eth2Digest
Expand Down
6 changes: 6 additions & 0 deletions beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ RestJson.useDefaultSerializationFor(
BLSToExecutionChange,
BeaconBlockHeader,
BlobSidecar,
BlobSidecarInfoObject,
BlobsBundle,
Checkpoint,
ContributionAndProof,
Expand Down Expand Up @@ -299,6 +300,7 @@ const
type
EncodeTypes* =
AttesterSlashing |
BlobSidecarInfoObject |
DeleteKeystoresBody |
EmptyBody |
ImportDistributedKeystoresBody |
Expand Down Expand Up @@ -4145,6 +4147,8 @@ proc decodeString*(t: typedesc[EventTopic],
ok(EventTopic.ProposerSlashing)
of "attester_slashing":
ok(EventTopic.AttesterSlashing)
of "blob_sidecar":
ok(EventTopic.BlobSidecar)
of "finalized_checkpoint":
ok(EventTopic.FinalizedCheckpoint)
of "chain_reorg":
Expand Down Expand Up @@ -4174,6 +4178,8 @@ proc encodeString*(value: set[EventTopic]): Result[string, cstring] =
res.add("proposer_slashing,")
if EventTopic.AttesterSlashing in value:
res.add("attester_slashing,")
if EventTopic.BlobSidecar in value:
res.add("blob_sidecar,")
if EventTopic.FinalizedCheckpoint in value:
res.add("finalized_checkpoint,")
if EventTopic.ChainReorg in value:
Expand Down
5 changes: 3 additions & 2 deletions beacon_chain/spec/eth2_apis/rest_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ type
# https://github.com/ethereum/beacon-APIs/blob/v2.4.2/apis/eventstream/index.yaml
EventTopic* {.pure.} = enum
Head, Block, Attestation, VoluntaryExit, BLSToExecutionChange,
ProposerSlashing, AttesterSlashing, FinalizedCheckpoint, ChainReorg,
ContributionAndProof, LightClientFinalityUpdate, LightClientOptimisticUpdate
ProposerSlashing, AttesterSlashing, BlobSidecar, FinalizedCheckpoint,
ChainReorg, ContributionAndProof, LightClientFinalityUpdate,
LightClientOptimisticUpdate

EventTopics* = set[EventTopic]

Expand Down
4 changes: 3 additions & 1 deletion ncli/validator_db_aggregator.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
std/[parsecsv, streams],
std/parsecsv,
stew/[io2, byteutils], chronicles, confutils, snappy,
../beacon_chain/spec/datatypes/base,
./ncli_common
Expand Down Expand Up @@ -200,6 +200,8 @@ proc advanceEpochs*(aggregator: var ValidatorDbAggregator, epoch: Epoch,
aggregator.epochsAggregated = 0

when isMainModule:
import std/streams

when defined(posix):
import system/ansi_c

Expand Down

0 comments on commit 69af8f9

Please sign in to comment.