Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement blob_sidecar Beacon API streaming #5728

Merged
merged 1 commit into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type
blsToExecQueue*: AsyncEventQueue[SignedBLSToExecutionChange]
propSlashQueue*: AsyncEventQueue[ProposerSlashing]
attSlashQueue*: AsyncEventQueue[AttesterSlashing]
blobSidecarQueue*: AsyncEventQueue[BlobSidecarInfoObject]
finalQueue*: AsyncEventQueue[FinalizationInfoObject]
reorgQueue*: AsyncEventQueue[ReorgInfoObject]
contribQueue*: AsyncEventQueue[SignedContributionAndProof]
Expand Down
13 changes: 12 additions & 1 deletion beacon_chain/consensus_object_pools/blob_quarantine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

{.push raises: [].}

import ../spec/helpers
import
std/tables,
../spec/helpers

from std/sequtils import mapIt
from std/strutils import join

Expand All @@ -18,10 +21,14 @@ type
BlobQuarantine* = object
blobs*:
OrderedTable[(Eth2Digest, BlobIndex, KzgCommitment), ref BlobSidecar]
onBlobSidecarCallback*: OnBlobSidecarCallback

BlobFetchRecord* = object
block_root*: Eth2Digest
indices*: seq[BlobIndex]

OnBlobSidecarCallback = proc(data: BlobSidecar) {.gcsafe, raises: [].}

func shortLog*(x: seq[BlobIndex]): string =
"<" & x.mapIt($it).join(", ") & ">"

Expand Down Expand Up @@ -86,3 +93,7 @@ func blobFetchRecord*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock)
(blck.root, idx, blck.message.body.blob_kzg_commitments[i])):
indices.add(idx)
BlobFetchRecord(block_root: blck.root, indices: indices)

func init*(
T: type BlobQuarantine, onBlobSidecarCallback: OnBlobSidecarCallback): T =
T(onBlobSidecarCallback: onBlobSidecarCallback)
5 changes: 4 additions & 1 deletion beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,11 @@ proc validateBlobSidecar*(
if not ok:
return dag.checkedReject("BlobSidecar: blob invalid")

ok()
# Send notification about new blob sidecar via callback
if not(isNil(blobQuarantine.onBlobSidecarCallback)):
blobQuarantine.onBlobSidecarCallback(blob_sidecar)

ok()

# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/phase0/p2p-interface.md#beacon_block
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/bellatrix/p2p-interface.md#beacon_block
Expand Down
17 changes: 15 additions & 2 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{.push raises: [].}

import
std/[os, random, sequtils, terminal, times],
std/[os, random, terminal, times],
chronos, chronicles,
metrics, metrics/chronos_httpserver,
stew/[byteutils, io2],
Expand Down Expand Up @@ -273,6 +273,8 @@ proc checkWeakSubjectivityCheckpoint(
headStateSlot = getStateField(dag.headState, slot)
quit 1

from ./spec/state_transition_block import kzg_commitment_to_versioned_hash

proc initFullNode(
node: BeaconNode,
rng: ref HmacDrbgContext,
Expand All @@ -293,6 +295,14 @@ proc initFullNode(
node.eventBus.propSlashQueue.emit(data)
proc onAttesterSlashingAdded(data: AttesterSlashing) =
node.eventBus.attSlashQueue.emit(data)
proc onBlobSidecarAdded(data: BlobSidecar) =
node.eventBus.blobSidecarQueue.emit(
BlobSidecarInfoObject(
block_root: hash_tree_root(data.signed_block_header.message),
index: data.index,
slot: data.signed_block_header.message.slot,
kzg_commitment: data.kzg_commitment,
versioned_hash: data.kzg_commitment.kzg_commitment_to_versioned_hash))
proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) =
let optimistic =
if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH:
Expand Down Expand Up @@ -373,7 +383,7 @@ proc initFullNode(
validatorChangePool = newClone(ValidatorChangePool.init(
dag, attestationPool, onVoluntaryExitAdded, onBLSToExecutionChangeAdded,
onProposerSlashingAdded, onAttesterSlashingAdded))
blobQuarantine = newClone(BlobQuarantine())
blobQuarantine = newClone(BlobQuarantine.init(onBlobSidecarAdded))
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, node.elManager,
ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets),
Expand Down Expand Up @@ -553,6 +563,7 @@ proc init*(T: type BeaconNode,
blsToExecQueue: newAsyncEventQueue[SignedBLSToExecutionChange](),
propSlashQueue: newAsyncEventQueue[ProposerSlashing](),
attSlashQueue: newAsyncEventQueue[AttesterSlashing](),
blobSidecarQueue: newAsyncEventQueue[BlobSidecarInfoObject](),
finalQueue: newAsyncEventQueue[FinalizationInfoObject](),
reorgQueue: newAsyncEventQueue[ReorgInfoObject](),
contribQueue: newAsyncEventQueue[SignedContributionAndProof](),
Expand Down Expand Up @@ -869,6 +880,8 @@ func verifyFinalization(node: BeaconNode, slot: Slot) =
# finalization occurs every slot, to 4 slots vs scheduledSlot.
doAssert finalizedEpoch + 4 >= epoch

from std/sequtils import toSeq

func subnetLog(v: BitArray): string =
$toSeq(v.oneIndices())

Expand Down
4 changes: 4 additions & 0 deletions beacon_chain/rpc/rest_event_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) =
let handler = response.eventHandler(node.eventBus.attSlashQueue,
"attester_slashing")
res.add(handler)
if EventTopic.BlobSidecar in eventTopics:
let handler = response.eventHandler(node.eventBus.blobSidecarQueue,
"blob_sidecar")
res.add(handler)
if EventTopic.FinalizedCheckpoint in eventTopics:
let handler = response.eventHandler(node.eventBus.finalQueue,
"finalized_checkpoint")
Expand Down
10 changes: 10 additions & 0 deletions beacon_chain/spec/datatypes/deneb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ type
kzg_commitment_inclusion_proof*:
array[KZG_COMMITMENT_INCLUSION_PROOF_DEPTH, Eth2Digest]

# https://github.com/ethereum/beacon-APIs/blob/4882aa0803b622b75bab286b285599d70b7a2429/apis/eventstream/index.yaml#L138-L142
# Spec object, not only internal, because it gets serialized out for the
# event stream Beacon API
BlobSidecarInfoObject* = object
block_root*: Eth2Digest
index*: BlobIndex
slot*: Slot
kzg_commitment*: KzgCommitment
versioned_hash*: VersionedHash

# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blobidentifier
BlobIdentifier* = object
block_root*: Eth2Digest
Expand Down
6 changes: 6 additions & 0 deletions beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ RestJson.useDefaultSerializationFor(
BLSToExecutionChange,
BeaconBlockHeader,
BlobSidecar,
BlobSidecarInfoObject,
BlobsBundle,
Checkpoint,
ContributionAndProof,
Expand Down Expand Up @@ -299,6 +300,7 @@ const
type
EncodeTypes* =
AttesterSlashing |
BlobSidecarInfoObject |
DeleteKeystoresBody |
EmptyBody |
ImportDistributedKeystoresBody |
Expand Down Expand Up @@ -4145,6 +4147,8 @@ proc decodeString*(t: typedesc[EventTopic],
ok(EventTopic.ProposerSlashing)
of "attester_slashing":
ok(EventTopic.AttesterSlashing)
of "blob_sidecar":
ok(EventTopic.BlobSidecar)
of "finalized_checkpoint":
ok(EventTopic.FinalizedCheckpoint)
of "chain_reorg":
Expand Down Expand Up @@ -4174,6 +4178,8 @@ proc encodeString*(value: set[EventTopic]): Result[string, cstring] =
res.add("proposer_slashing,")
if EventTopic.AttesterSlashing in value:
res.add("attester_slashing,")
if EventTopic.BlobSidecar in value:
res.add("blob_sidecar,")
if EventTopic.FinalizedCheckpoint in value:
res.add("finalized_checkpoint,")
if EventTopic.ChainReorg in value:
Expand Down
5 changes: 3 additions & 2 deletions beacon_chain/spec/eth2_apis/rest_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ type
# https://github.com/ethereum/beacon-APIs/blob/v2.4.2/apis/eventstream/index.yaml
EventTopic* {.pure.} = enum
Head, Block, Attestation, VoluntaryExit, BLSToExecutionChange,
ProposerSlashing, AttesterSlashing, FinalizedCheckpoint, ChainReorg,
ContributionAndProof, LightClientFinalityUpdate, LightClientOptimisticUpdate
ProposerSlashing, AttesterSlashing, BlobSidecar, FinalizedCheckpoint,
ChainReorg, ContributionAndProof, LightClientFinalityUpdate,
LightClientOptimisticUpdate

EventTopics* = set[EventTopic]

Expand Down
4 changes: 3 additions & 1 deletion ncli/validator_db_aggregator.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
std/[parsecsv, streams],
std/parsecsv,
stew/[io2, byteutils], chronicles, confutils, snappy,
../beacon_chain/spec/datatypes/base,
./ncli_common
Expand Down Expand Up @@ -200,6 +200,8 @@ proc advanceEpochs*(aggregator: var ValidatorDbAggregator, epoch: Epoch,
aggregator.epochsAggregated = 0

when isMainModule:
import std/streams

when defined(posix):
import system/ansi_c

Expand Down
Loading