From f9821df51caad56fb38ed66e774ac0f9bf4e6c29 Mon Sep 17 00:00:00 2001 From: g11tech Date: Wed, 2 Aug 2023 19:28:51 +0530 Subject: [PATCH] feat: add basic broadcast validation to the block publishing (#5762) * feat: add broadcast validation to the block publishing * add basic broadcast validation * cleanup * fix tracking * cleanup * fix e2e errors * make broadcast validation optional * logging improvs and fix test * resolve error * undo validator changes as per feedback --- .../api/src/beacon/routes/beacon/block.ts | 66 ++++++ .../api/src/beacon/routes/beacon/index.ts | 2 +- .../api/test/unit/beacon/testData/beacon.ts | 15 +- .../src/api/impl/beacon/blocks/index.ts | 198 ++++++++++++------ packages/beacon-node/src/chain/chain.ts | 18 +- packages/beacon-node/src/chain/interface.ts | 2 + packages/beacon-node/src/chain/options.ts | 4 + .../src/options/beaconNodeOptions/chain.ts | 11 + 8 files changed, 244 insertions(+), 72 deletions(-) diff --git a/packages/api/src/beacon/routes/beacon/block.ts b/packages/api/src/beacon/routes/beacon/block.ts index 20203fa9fcb4..a36f4505dc5f 100644 --- a/packages/api/src/beacon/routes/beacon/block.ts +++ b/packages/api/src/beacon/routes/beacon/block.ts @@ -44,6 +44,13 @@ export type BlockHeaderResponse = { header: phase0.SignedBeaconBlockHeader; }; +export enum BroadcastValidation { + none = "none", + gossip = "gossip", + consensus = "consensus", + consensusAndEquivocation = "consensus_and_equivocation", +} + export type Api = { /** * Get block @@ -167,6 +174,20 @@ export type Api = { HttpStatusCode.BAD_REQUEST | HttpStatusCode.SERVICE_UNAVAILABLE > >; + + publishBlockV2( + blockOrContents: allForks.SignedBeaconBlock | SignedBlockContents, + opts: {broadcastValidation?: BroadcastValidation} + ): Promise< + ApiClientResponse< + { + [HttpStatusCode.OK]: void; + [HttpStatusCode.ACCEPTED]: void; + }, + HttpStatusCode.BAD_REQUEST | HttpStatusCode.SERVICE_UNAVAILABLE + > + >; + /** * Publish a signed blinded block by submitting it to the mev relay and patching in the block * transactions beacon node gets in response. @@ -180,6 +201,19 @@ export type Api = { HttpStatusCode.BAD_REQUEST | HttpStatusCode.SERVICE_UNAVAILABLE > >; + + publishBlindedBlockV2( + blindedBlockOrContents: allForks.SignedBlindedBeaconBlock | SignedBlindedBlockContents, + opts: {broadcastValidation?: BroadcastValidation} + ): Promise< + ApiClientResponse< + { + [HttpStatusCode.OK]: void; + [HttpStatusCode.ACCEPTED]: void; + }, + HttpStatusCode.BAD_REQUEST | HttpStatusCode.SERVICE_UNAVAILABLE + > + >; /** * Get block BlobSidecar * Retrieves BlobSidecar included in requested block. @@ -204,7 +238,9 @@ export const routesData: RoutesData = { getBlockHeaders: {url: "/eth/v1/beacon/headers", method: "GET"}, getBlockRoot: {url: "/eth/v1/beacon/blocks/{block_id}/root", method: "GET"}, publishBlock: {url: "/eth/v1/beacon/blocks", method: "POST"}, + publishBlockV2: {url: "/eth/v2/beacon/blocks", method: "POST"}, publishBlindedBlock: {url: "/eth/v1/beacon/blinded_blocks", method: "POST"}, + publishBlindedBlockV2: {url: "/eth/v2/beacon/blinded_blocks", method: "POST"}, getBlobSidecars: {url: "/eth/v1/beacon/blob_sidecars/{block_id}", method: "GET"}, }; @@ -220,7 +256,9 @@ export type ReqTypes = { getBlockHeaders: {query: {slot?: number; parent_root?: string}}; getBlockRoot: BlockIdOnlyReq; publishBlock: {body: unknown}; + publishBlockV2: {body: unknown; query: {broadcast_validation?: string}}; publishBlindedBlock: {body: unknown}; + publishBlindedBlockV2: {body: unknown; query: {broadcast_validation?: string}}; getBlobSidecars: BlockIdOnlyReq; }; @@ -277,7 +315,35 @@ export function getReqSerializers(config: ChainForkConfig): ReqSerializers ({ + body: AllForksSignedBlockOrContents.toJson(item), + query: {broadcast_validation: broadcastValidation}, + }), + parseReq: ({body, query}) => [ + AllForksSignedBlockOrContents.fromJson(body), + {broadcastValidation: query.broadcast_validation as BroadcastValidation}, + ], + schema: { + body: Schema.Object, + query: {broadcast_validation: Schema.String}, + }, + }, publishBlindedBlock: reqOnlyBody(AllForksSignedBlindedBlockOrContents, Schema.Object), + publishBlindedBlockV2: { + writeReq: (item, {broadcastValidation}) => ({ + body: AllForksSignedBlindedBlockOrContents.toJson(item), + query: {broadcast_validation: broadcastValidation}, + }), + parseReq: ({body, query}) => [ + AllForksSignedBlindedBlockOrContents.fromJson(body), + {broadcastValidation: query.broadcast_validation as BroadcastValidation}, + ], + schema: { + body: Schema.Object, + query: {broadcast_validation: Schema.String}, + }, + }, getBlobSidecars: blockIdOnlyReq, }; } diff --git a/packages/api/src/beacon/routes/beacon/index.ts b/packages/api/src/beacon/routes/beacon/index.ts index 3146aa694d3e..d8879ca2d695 100644 --- a/packages/api/src/beacon/routes/beacon/index.ts +++ b/packages/api/src/beacon/routes/beacon/index.ts @@ -15,7 +15,7 @@ import * as state from "./state.js"; export * as block from "./block.js"; export * as pool from "./pool.js"; export * as state from "./state.js"; -export {BlockId, BlockHeaderResponse} from "./block.js"; +export {BlockId, BlockHeaderResponse, BroadcastValidation} from "./block.js"; export {AttestationFilters} from "./pool.js"; // TODO: Review if re-exporting all these types is necessary export { diff --git a/packages/api/test/unit/beacon/testData/beacon.ts b/packages/api/test/unit/beacon/testData/beacon.ts index 2cca22564431..85068fb83e48 100644 --- a/packages/api/test/unit/beacon/testData/beacon.ts +++ b/packages/api/test/unit/beacon/testData/beacon.ts @@ -1,7 +1,12 @@ import {toHexString} from "@chainsafe/ssz"; import {ForkName} from "@lodestar/params"; import {ssz, Slot, allForks} from "@lodestar/types"; -import {Api, BlockHeaderResponse, ValidatorResponse} from "../../../../src/beacon/routes/beacon/index.js"; +import { + Api, + BlockHeaderResponse, + BroadcastValidation, + ValidatorResponse, +} from "../../../../src/beacon/routes/beacon/index.js"; import {GenericServerTestCases} from "../../../utils/genericServerTest.js"; const root = Buffer.alloc(32, 1); @@ -52,10 +57,18 @@ export const testData: GenericServerTestCases = { args: [ssz.phase0.SignedBeaconBlock.defaultValue()], res: undefined, }, + publishBlockV2: { + args: [ssz.phase0.SignedBeaconBlock.defaultValue(), {broadcastValidation: BroadcastValidation.none}], + res: undefined, + }, publishBlindedBlock: { args: [getDefaultBlindedBlock(64)], res: undefined, }, + publishBlindedBlockV2: { + args: [getDefaultBlindedBlock(64), {broadcastValidation: BroadcastValidation.none}], + res: undefined, + }, getBlobSidecars: { args: ["head"], res: {executionOptimistic: true, data: ssz.deneb.BlobSidecars.defaultValue()}, diff --git a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts index 663e316f413c..70659497aed2 100644 --- a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts @@ -2,7 +2,7 @@ import {fromHexString, toHexString} from "@chainsafe/ssz"; import {routes, ServerApi, isSignedBlockContents, isSignedBlindedBlockContents} from "@lodestar/api"; import {computeTimeAtSlot} from "@lodestar/state-transition"; import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params"; -import {sleep} from "@lodestar/utils"; +import {sleep, toHex} from "@lodestar/utils"; import {allForks, deneb} from "@lodestar/types"; import { BlockSource, @@ -19,6 +19,8 @@ import {NetworkEvent} from "../../../../network/index.js"; import {ApiModules} from "../../types.js"; import {resolveBlockId, toBeaconHeaderResponse} from "./utils.js"; +type PublishBlockOpts = ImportBlockOpts & {broadcastValidation?: routes.beacon.BroadcastValidation}; + /** * Validator clock may be advanced from beacon's clock. If the validator requests a resource in a * future slot, wait some time instead of rejecting the request because it's in the future @@ -37,6 +39,127 @@ export function getBeaconBlockApi({ network, db, }: Pick): ServerApi { + const publishBlock: ServerApi["publishBlock"] = async ( + signedBlockOrContents, + opts: PublishBlockOpts = {} + ) => { + const seenTimestampSec = Date.now() / 1000; + let blockForImport: BlockInput, signedBlock: allForks.SignedBeaconBlock, signedBlobs: deneb.SignedBlobSidecars; + + if (isSignedBlockContents(signedBlockOrContents)) { + ({signedBlock, signedBlobSidecars: signedBlobs} = signedBlockOrContents); + blockForImport = getBlockInput.postDeneb( + config, + signedBlock, + BlockSource.api, + // The blobsSidecar will be replaced in the followup PRs with just blobs + blobSidecarsToBlobsSidecar( + config, + signedBlock, + signedBlobs.map((sblob) => sblob.message) + ), + null + ); + } else { + signedBlock = signedBlockOrContents; + signedBlobs = []; + // TODO: Once API supports submitting data as SSZ, replace null with blockBytes + blockForImport = getBlockInput.preDeneb(config, signedBlock, BlockSource.api, null); + } + + // check what validations have been requested before broadcasting and publishing the block + // TODO: add validation time to metrics + const broadcastValidation = opts.broadcastValidation ?? routes.beacon.BroadcastValidation.none; + // if block is locally produced, full or blinded, it already is 'consensus' validated as it went through + // state transition to produce the stateRoot + const slot = signedBlock.message.slot; + const blockRoot = toHex(chain.config.getForkTypes(slot).BeaconBlock.hashTreeRoot(signedBlock.message)); + const blockLocallyProduced = + chain.producedBlockRoot.has(blockRoot) || chain.producedBlindedBlockRoot.has(blockRoot); + const valLogMeta = {broadcastValidation, blockRoot, blockLocallyProduced, slot}; + + switch (broadcastValidation) { + case routes.beacon.BroadcastValidation.none: { + if (blockLocallyProduced) { + chain.logger.debug("No broadcast validation requested for the block", valLogMeta); + } else { + chain.logger.warn("No broadcast validation requested for the block", valLogMeta); + } + break; + } + case routes.beacon.BroadcastValidation.consensus: { + // check if this beacon node produced the block else run validations + if (!blockLocallyProduced) { + // error or log warning that we support consensus val on blocks produced via this beacon node + const message = "Consensus validation not implemented yet for block not produced by this beacon node"; + if (chain.opts.broadcastValidationStrictness === "error") { + throw Error(message); + } else { + chain.logger.warn(message, valLogMeta); + } + } + break; + } + + default: { + // error or log warning we do not support this validation + const message = `Broadcast validation of ${broadcastValidation} type not implemented yet`; + if (chain.opts.broadcastValidationStrictness === "error") { + throw Error(message); + } else { + chain.logger.warn(message); + } + } + } + + // Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the + // REST request promise without any extra infrastructure. + const msToBlockSlot = + computeTimeAtSlot(config, blockForImport.block.message.slot, chain.genesisTime) * 1000 - Date.now(); + if (msToBlockSlot <= MAX_API_CLOCK_DISPARITY_MS && msToBlockSlot > 0) { + // If block is a bit early, hold it in a promise. Equivalent to a pending queue. + await sleep(msToBlockSlot); + } + + // TODO: Validate block + metrics?.registerBeaconBlock(OpSource.api, seenTimestampSec, blockForImport.block.message); + const publishPromises = [ + // Send the block, regardless of whether or not it is valid. The API + // specification is very clear that this is the desired behaviour. + () => network.publishBeaconBlock(signedBlock) as Promise, + () => + // there is no rush to persist block since we published it to gossip anyway + chain.processBlock(blockForImport, {...opts, eagerPersistBlock: false}).catch((e) => { + if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) { + network.events.emit(NetworkEvent.unknownBlockParent, { + blockInput: blockForImport, + peer: IDENTITY_PEER_ID, + }); + } + throw e; + }), + ...signedBlobs.map((signedBlob) => () => network.publishBlobSidecar(signedBlob)), + ]; + await promiseAllMaybeAsync(publishPromises); + }; + + const publishBlindedBlock: ServerApi["publishBlindedBlock"] = async ( + signedBlindedBlockOrContents, + opts: PublishBlockOpts = {} + ) => { + const executionBuilder = chain.executionBuilder; + if (!executionBuilder) throw Error("exeutionBuilder required to publish SignedBlindedBeaconBlock"); + // Mechanism for blobs & blocks on builder is not yet finalized + if (isSignedBlindedBlockContents(signedBlindedBlockOrContents)) { + throw Error("exeutionBuilder not yet implemented for deneb+ forks"); + } else { + const signedBlockOrContents = await executionBuilder.submitBlindedBlock(signedBlindedBlockOrContents); + // the full block is published by relay and it's possible that the block is already known to us by gossip + // see https://github.com/ChainSafe/lodestar/issues/5404 + return publishBlock(signedBlockOrContents, {...opts, ignoreIfKnown: true}); + } + }; + return { async getBlockHeaders(filters) { // TODO - SLOW CODE: This code seems like it could be improved @@ -189,74 +312,15 @@ export function getBeaconBlockApi({ }; }, - async publishBlindedBlock(signedBlindedBlockOrContents) { - const executionBuilder = chain.executionBuilder; - if (!executionBuilder) throw Error("exeutionBuilder required to publish SignedBlindedBeaconBlock"); - // Mechanism for blobs & blocks on builder is not yet finalized - if (isSignedBlindedBlockContents(signedBlindedBlockOrContents)) { - throw Error("exeutionBuilder not yet implemented for deneb+ forks"); - } else { - const signedBlockOrContents = await executionBuilder.submitBlindedBlock(signedBlindedBlockOrContents); - // the full block is published by relay and it's possible that the block is already known to us by gossip - // see https://github.com/ChainSafe/lodestar/issues/5404 - return this.publishBlock(signedBlockOrContents, {ignoreIfKnown: true}); - } - }, - - async publishBlock(signedBlockOrContents, opts: ImportBlockOpts = {}) { - const seenTimestampSec = Date.now() / 1000; - let blockForImport: BlockInput, signedBlock: allForks.SignedBeaconBlock, signedBlobs: deneb.SignedBlobSidecars; + publishBlock, + publishBlindedBlock, - if (isSignedBlockContents(signedBlockOrContents)) { - ({signedBlock, signedBlobSidecars: signedBlobs} = signedBlockOrContents); - blockForImport = getBlockInput.postDeneb( - config, - signedBlock, - BlockSource.api, - // The blobsSidecar will be replaced in the followup PRs with just blobs - blobSidecarsToBlobsSidecar( - config, - signedBlock, - signedBlobs.map((sblob) => sblob.message) - ), - null - ); - } else { - signedBlock = signedBlockOrContents; - signedBlobs = []; - // TODO: Once API supports submitting data as SSZ, replace null with blockBytes - blockForImport = getBlockInput.preDeneb(config, signedBlock, BlockSource.api, null); - } - - // Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the - // REST request promise without any extra infrastructure. - const msToBlockSlot = - computeTimeAtSlot(config, blockForImport.block.message.slot, chain.genesisTime) * 1000 - Date.now(); - if (msToBlockSlot <= MAX_API_CLOCK_DISPARITY_MS && msToBlockSlot > 0) { - // If block is a bit early, hold it in a promise. Equivalent to a pending queue. - await sleep(msToBlockSlot); - } + async publishBlindedBlockV2(signedBlindedBlockOrContents, opts) { + await publishBlindedBlock(signedBlindedBlockOrContents, opts); + }, - // TODO: Validate block - metrics?.registerBeaconBlock(OpSource.api, seenTimestampSec, blockForImport.block.message); - const publishPromises = [ - // Send the block, regardless of whether or not it is valid. The API - // specification is very clear that this is the desired behaviour. - () => network.publishBeaconBlock(signedBlock) as Promise, - () => - // there is no rush to persist block since we published it to gossip anyway - chain.processBlock(blockForImport, {...opts, eagerPersistBlock: false}).catch((e) => { - if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) { - network.events.emit(NetworkEvent.unknownBlockParent, { - blockInput: blockForImport, - peer: IDENTITY_PEER_ID, - }); - } - throw e; - }), - ...signedBlobs.map((signedBlob) => () => network.publishBlobSidecar(signedBlob)), - ]; - await promiseAllMaybeAsync(publishPromises); + async publishBlockV2(signedBlockOrContents, opts) { + await publishBlock(signedBlockOrContents, opts); }, async getBlobSidecars(blockId) { diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index 9e91b329b0d4..8dacf79c642b 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -29,7 +29,7 @@ import { import {CheckpointWithHex, ExecutionStatus, IForkChoice, ProtoBlock} from "@lodestar/fork-choice"; import {ProcessShutdownCallback} from "@lodestar/validator"; import {Logger, isErrorAborted, pruneSetToMax, sleep, toHex} from "@lodestar/utils"; -import {ForkSeq, SLOTS_PER_EPOCH} from "@lodestar/params"; +import {ForkSeq, SLOTS_PER_EPOCH, MAX_BLOBS_PER_BLOCK} from "@lodestar/params"; import {GENESIS_EPOCH, ZERO_HASH} from "../constants/index.js"; import {IBeaconDb} from "../db/index.js"; @@ -80,8 +80,10 @@ import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js"; * Arbitrary constants, blobs should be consumed immediately in the same slot they are produced. * A value of 1 would probably be sufficient. However it's sensible to allow some margin if the node overloads. */ -const DEFAULT_MAX_CACHED_BLOB_SIDECARS = 8; +const DEFAULT_MAX_CACHED_BLOB_SIDECARS = MAX_BLOBS_PER_BLOCK * 2; const MAX_RETAINED_SLOTS_CACHED_BLOBS_SIDECAR = 8; +// we have seen two attempts in a single slot so we factor for four +const DEFAULT_MAX_CACHED_PRODUCED_ROOTS = 4; export class BeaconChain implements IBeaconChain { readonly genesisTime: UintNum64; @@ -135,6 +137,10 @@ export class BeaconChain implements IBeaconChain { BlockHash, {blobSidecars: deneb.BlindedBlobSidecars; slot: Slot} >(); + + readonly producedBlockRoot = new Set(); + readonly producedBlindedBlockRoot = new Set(); + readonly opts: IChainOptions; protected readonly blockProcessor: BlockProcessor; @@ -501,12 +507,18 @@ export class BeaconChain implements IBeaconChain { block.stateRoot = computeNewStateRoot(this.metrics, state, block); + // track the produced block for consensus broadcast validations + const blockRoot = this.config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block); + const blockRootHex = toHex(blockRoot); + const producedRootTracker = blockType === BlockType.Full ? this.producedBlockRoot : this.producedBlindedBlockRoot; + producedRootTracker.add(blockRootHex); + pruneSetToMax(producedRootTracker, this.opts.maxCachedProducedRoots ?? DEFAULT_MAX_CACHED_PRODUCED_ROOTS); + // Cache for latter broadcasting // // blinded blobs will be fetched and added to this cache later before finally // publishing the blinded block's full version if (blobs.type === BlobsResultType.produced) { - const blockRoot = this.config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block); // body is of full type here const blockHash = toHex((block as bellatrix.BeaconBlock).body.executionPayload.blockHash); const blobSidecars = blobs.blobSidecars.map((blobSidecar) => ({ diff --git a/packages/beacon-node/src/chain/interface.ts b/packages/beacon-node/src/chain/interface.ts index 1edd646af83d..78fbf2c5a3fe 100644 --- a/packages/beacon-node/src/chain/interface.ts +++ b/packages/beacon-node/src/chain/interface.ts @@ -95,6 +95,8 @@ export interface IBeaconChain { readonly checkpointBalancesCache: CheckpointBalancesCache; readonly producedBlobSidecarsCache: Map; readonly producedBlindedBlobSidecarsCache: Map; + readonly producedBlockRoot: Set; + readonly producedBlindedBlockRoot: Set; readonly opts: IChainOptions; /** Stop beacon chain processing */ diff --git a/packages/beacon-node/src/chain/options.ts b/packages/beacon-node/src/chain/options.ts index 199732a9a940..93947f105c94 100644 --- a/packages/beacon-node/src/chain/options.ts +++ b/packages/beacon-node/src/chain/options.ts @@ -21,8 +21,11 @@ export type IChainOptions = BlockProcessOpts & sanityCheckExecutionEngineBlobs?: boolean; /** Max number of produced blobs by local validators to cache */ maxCachedBlobSidecars?: number; + /** Max number of produced block roots (blinded or full) cached for broadcast validations */ + maxCachedProducedRoots?: number; /** Option to load a custom kzg trusted setup in txt format */ trustedSetup?: string; + broadcastValidationStrictness?: string; }; export type BlockProcessOpts = { @@ -79,4 +82,5 @@ export const defaultChainOptions: IChainOptions = { // for gossip block validation, it's unlikely we see a reorg with 32 slots // for attestation validation, having this value ensures we don't have to regen states most of the time maxSkipSlots: 32, + broadcastValidationStrictness: "warn", }; diff --git a/packages/cli/src/options/beaconNodeOptions/chain.ts b/packages/cli/src/options/beaconNodeOptions/chain.ts index dd2049ea00a7..9c7e1edc49c6 100644 --- a/packages/cli/src/options/beaconNodeOptions/chain.ts +++ b/packages/cli/src/options/beaconNodeOptions/chain.ts @@ -22,6 +22,7 @@ export type ChainArgs = { "safe-slots-to-import-optimistically": number; "chain.archiveStateEpochFrequency": number; emitPayloadAttributes?: boolean; + broadcastValidationStrictness?: string; }; export function parseArgs(args: ChainArgs): IBeaconNodeOptions["chain"] { @@ -44,6 +45,7 @@ export function parseArgs(args: ChainArgs): IBeaconNodeOptions["chain"] { safeSlotsToImportOptimistically: args["safe-slots-to-import-optimistically"], archiveStateEpochFrequency: args["chain.archiveStateEpochFrequency"], emitPayloadAttributes: args["emitPayloadAttributes"], + broadcastValidationStrictness: args["broadcastValidationStrictness"], }; } @@ -171,4 +173,13 @@ Will double processing times. Use only for debugging purposes.", type: "number", group: "chain", }, + + broadcastValidationStrictness: { + // TODO: hide the option till validations fully implemented + hidden: true, + description: + "'warn' or 'error' - options to either throw error or to log warning when broadcast validation can't be performed", + type: "string", + default: "warn", + }, };