Skip to content

Commit

Permalink
feat: add basic broadcast validation to the block publishing (#5762)
Browse files Browse the repository at this point in the history
* feat: add broadcast validation to the block publishing

* add basic broadcast validation

* cleanup

* fix tracking

* cleanup

* fix e2e errors

* make broadcast validation optional

* logging improvs and fix test

* resolve error

* undo validator changes as per feedback
  • Loading branch information
g11tech authored Aug 2, 2023
1 parent 549aa03 commit f9821df
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 72 deletions.
66 changes: 66 additions & 0 deletions packages/api/src/beacon/routes/beacon/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ export type BlockHeaderResponse = {
header: phase0.SignedBeaconBlockHeader;
};

export enum BroadcastValidation {
none = "none",
gossip = "gossip",
consensus = "consensus",
consensusAndEquivocation = "consensus_and_equivocation",
}

export type Api = {
/**
* Get block
Expand Down Expand Up @@ -167,6 +174,20 @@ export type Api = {
HttpStatusCode.BAD_REQUEST | HttpStatusCode.SERVICE_UNAVAILABLE
>
>;

publishBlockV2(
blockOrContents: allForks.SignedBeaconBlock | SignedBlockContents,
opts: {broadcastValidation?: BroadcastValidation}
): Promise<
ApiClientResponse<
{
[HttpStatusCode.OK]: void;
[HttpStatusCode.ACCEPTED]: void;
},
HttpStatusCode.BAD_REQUEST | HttpStatusCode.SERVICE_UNAVAILABLE
>
>;

/**
* Publish a signed blinded block by submitting it to the mev relay and patching in the block
* transactions beacon node gets in response.
Expand All @@ -180,6 +201,19 @@ export type Api = {
HttpStatusCode.BAD_REQUEST | HttpStatusCode.SERVICE_UNAVAILABLE
>
>;

publishBlindedBlockV2(
blindedBlockOrContents: allForks.SignedBlindedBeaconBlock | SignedBlindedBlockContents,
opts: {broadcastValidation?: BroadcastValidation}
): Promise<
ApiClientResponse<
{
[HttpStatusCode.OK]: void;
[HttpStatusCode.ACCEPTED]: void;
},
HttpStatusCode.BAD_REQUEST | HttpStatusCode.SERVICE_UNAVAILABLE
>
>;
/**
* Get block BlobSidecar
* Retrieves BlobSidecar included in requested block.
Expand All @@ -204,7 +238,9 @@ export const routesData: RoutesData<Api> = {
getBlockHeaders: {url: "/eth/v1/beacon/headers", method: "GET"},
getBlockRoot: {url: "/eth/v1/beacon/blocks/{block_id}/root", method: "GET"},
publishBlock: {url: "/eth/v1/beacon/blocks", method: "POST"},
publishBlockV2: {url: "/eth/v2/beacon/blocks", method: "POST"},
publishBlindedBlock: {url: "/eth/v1/beacon/blinded_blocks", method: "POST"},
publishBlindedBlockV2: {url: "/eth/v2/beacon/blinded_blocks", method: "POST"},
getBlobSidecars: {url: "/eth/v1/beacon/blob_sidecars/{block_id}", method: "GET"},
};

Expand All @@ -220,7 +256,9 @@ export type ReqTypes = {
getBlockHeaders: {query: {slot?: number; parent_root?: string}};
getBlockRoot: BlockIdOnlyReq;
publishBlock: {body: unknown};
publishBlockV2: {body: unknown; query: {broadcast_validation?: string}};
publishBlindedBlock: {body: unknown};
publishBlindedBlockV2: {body: unknown; query: {broadcast_validation?: string}};
getBlobSidecars: BlockIdOnlyReq;
};

Expand Down Expand Up @@ -277,7 +315,35 @@ export function getReqSerializers(config: ChainForkConfig): ReqSerializers<Api,
},
getBlockRoot: blockIdOnlyReq,
publishBlock: reqOnlyBody(AllForksSignedBlockOrContents, Schema.Object),
publishBlockV2: {
writeReq: (item, {broadcastValidation}) => ({
body: AllForksSignedBlockOrContents.toJson(item),
query: {broadcast_validation: broadcastValidation},
}),
parseReq: ({body, query}) => [
AllForksSignedBlockOrContents.fromJson(body),
{broadcastValidation: query.broadcast_validation as BroadcastValidation},
],
schema: {
body: Schema.Object,
query: {broadcast_validation: Schema.String},
},
},
publishBlindedBlock: reqOnlyBody(AllForksSignedBlindedBlockOrContents, Schema.Object),
publishBlindedBlockV2: {
writeReq: (item, {broadcastValidation}) => ({
body: AllForksSignedBlindedBlockOrContents.toJson(item),
query: {broadcast_validation: broadcastValidation},
}),
parseReq: ({body, query}) => [
AllForksSignedBlindedBlockOrContents.fromJson(body),
{broadcastValidation: query.broadcast_validation as BroadcastValidation},
],
schema: {
body: Schema.Object,
query: {broadcast_validation: Schema.String},
},
},
getBlobSidecars: blockIdOnlyReq,
};
}
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/beacon/routes/beacon/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import * as state from "./state.js";
export * as block from "./block.js";
export * as pool from "./pool.js";
export * as state from "./state.js";
export {BlockId, BlockHeaderResponse} from "./block.js";
export {BlockId, BlockHeaderResponse, BroadcastValidation} from "./block.js";
export {AttestationFilters} from "./pool.js";
// TODO: Review if re-exporting all these types is necessary
export {
Expand Down
15 changes: 14 additions & 1 deletion packages/api/test/unit/beacon/testData/beacon.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import {toHexString} from "@chainsafe/ssz";
import {ForkName} from "@lodestar/params";
import {ssz, Slot, allForks} from "@lodestar/types";
import {Api, BlockHeaderResponse, ValidatorResponse} from "../../../../src/beacon/routes/beacon/index.js";
import {
Api,
BlockHeaderResponse,
BroadcastValidation,
ValidatorResponse,
} from "../../../../src/beacon/routes/beacon/index.js";
import {GenericServerTestCases} from "../../../utils/genericServerTest.js";

const root = Buffer.alloc(32, 1);
Expand Down Expand Up @@ -52,10 +57,18 @@ export const testData: GenericServerTestCases<Api> = {
args: [ssz.phase0.SignedBeaconBlock.defaultValue()],
res: undefined,
},
publishBlockV2: {
args: [ssz.phase0.SignedBeaconBlock.defaultValue(), {broadcastValidation: BroadcastValidation.none}],
res: undefined,
},
publishBlindedBlock: {
args: [getDefaultBlindedBlock(64)],
res: undefined,
},
publishBlindedBlockV2: {
args: [getDefaultBlindedBlock(64), {broadcastValidation: BroadcastValidation.none}],
res: undefined,
},
getBlobSidecars: {
args: ["head"],
res: {executionOptimistic: true, data: ssz.deneb.BlobSidecars.defaultValue()},
Expand Down
198 changes: 131 additions & 67 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {fromHexString, toHexString} from "@chainsafe/ssz";
import {routes, ServerApi, isSignedBlockContents, isSignedBlindedBlockContents} from "@lodestar/api";
import {computeTimeAtSlot} from "@lodestar/state-transition";
import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
import {sleep} from "@lodestar/utils";
import {sleep, toHex} from "@lodestar/utils";
import {allForks, deneb} from "@lodestar/types";
import {
BlockSource,
Expand All @@ -19,6 +19,8 @@ import {NetworkEvent} from "../../../../network/index.js";
import {ApiModules} from "../../types.js";
import {resolveBlockId, toBeaconHeaderResponse} from "./utils.js";

type PublishBlockOpts = ImportBlockOpts & {broadcastValidation?: routes.beacon.BroadcastValidation};

/**
* Validator clock may be advanced from beacon's clock. If the validator requests a resource in a
* future slot, wait some time instead of rejecting the request because it's in the future
Expand All @@ -37,6 +39,127 @@ export function getBeaconBlockApi({
network,
db,
}: Pick<ApiModules, "chain" | "config" | "metrics" | "network" | "db">): ServerApi<routes.beacon.block.Api> {
const publishBlock: ServerApi<routes.beacon.block.Api>["publishBlock"] = async (
signedBlockOrContents,
opts: PublishBlockOpts = {}
) => {
const seenTimestampSec = Date.now() / 1000;
let blockForImport: BlockInput, signedBlock: allForks.SignedBeaconBlock, signedBlobs: deneb.SignedBlobSidecars;

if (isSignedBlockContents(signedBlockOrContents)) {
({signedBlock, signedBlobSidecars: signedBlobs} = signedBlockOrContents);
blockForImport = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.api,
// The blobsSidecar will be replaced in the followup PRs with just blobs
blobSidecarsToBlobsSidecar(
config,
signedBlock,
signedBlobs.map((sblob) => sblob.message)
),
null
);
} else {
signedBlock = signedBlockOrContents;
signedBlobs = [];
// TODO: Once API supports submitting data as SSZ, replace null with blockBytes
blockForImport = getBlockInput.preDeneb(config, signedBlock, BlockSource.api, null);
}

// check what validations have been requested before broadcasting and publishing the block
// TODO: add validation time to metrics
const broadcastValidation = opts.broadcastValidation ?? routes.beacon.BroadcastValidation.none;
// if block is locally produced, full or blinded, it already is 'consensus' validated as it went through
// state transition to produce the stateRoot
const slot = signedBlock.message.slot;
const blockRoot = toHex(chain.config.getForkTypes(slot).BeaconBlock.hashTreeRoot(signedBlock.message));
const blockLocallyProduced =
chain.producedBlockRoot.has(blockRoot) || chain.producedBlindedBlockRoot.has(blockRoot);
const valLogMeta = {broadcastValidation, blockRoot, blockLocallyProduced, slot};

switch (broadcastValidation) {
case routes.beacon.BroadcastValidation.none: {
if (blockLocallyProduced) {
chain.logger.debug("No broadcast validation requested for the block", valLogMeta);
} else {
chain.logger.warn("No broadcast validation requested for the block", valLogMeta);
}
break;
}
case routes.beacon.BroadcastValidation.consensus: {
// check if this beacon node produced the block else run validations
if (!blockLocallyProduced) {
// error or log warning that we support consensus val on blocks produced via this beacon node
const message = "Consensus validation not implemented yet for block not produced by this beacon node";
if (chain.opts.broadcastValidationStrictness === "error") {
throw Error(message);
} else {
chain.logger.warn(message, valLogMeta);
}
}
break;
}

default: {
// error or log warning we do not support this validation
const message = `Broadcast validation of ${broadcastValidation} type not implemented yet`;
if (chain.opts.broadcastValidationStrictness === "error") {
throw Error(message);
} else {
chain.logger.warn(message);
}
}
}

// Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the
// REST request promise without any extra infrastructure.
const msToBlockSlot =
computeTimeAtSlot(config, blockForImport.block.message.slot, chain.genesisTime) * 1000 - Date.now();
if (msToBlockSlot <= MAX_API_CLOCK_DISPARITY_MS && msToBlockSlot > 0) {
// If block is a bit early, hold it in a promise. Equivalent to a pending queue.
await sleep(msToBlockSlot);
}

// TODO: Validate block
metrics?.registerBeaconBlock(OpSource.api, seenTimestampSec, blockForImport.block.message);
const publishPromises = [
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
() =>
// there is no rush to persist block since we published it to gossip anyway
chain.processBlock(blockForImport, {...opts, eagerPersistBlock: false}).catch((e) => {
if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) {
network.events.emit(NetworkEvent.unknownBlockParent, {
blockInput: blockForImport,
peer: IDENTITY_PEER_ID,
});
}
throw e;
}),
...signedBlobs.map((signedBlob) => () => network.publishBlobSidecar(signedBlob)),
];
await promiseAllMaybeAsync(publishPromises);
};

const publishBlindedBlock: ServerApi<routes.beacon.block.Api>["publishBlindedBlock"] = async (
signedBlindedBlockOrContents,
opts: PublishBlockOpts = {}
) => {
const executionBuilder = chain.executionBuilder;
if (!executionBuilder) throw Error("exeutionBuilder required to publish SignedBlindedBeaconBlock");
// Mechanism for blobs & blocks on builder is not yet finalized
if (isSignedBlindedBlockContents(signedBlindedBlockOrContents)) {
throw Error("exeutionBuilder not yet implemented for deneb+ forks");
} else {
const signedBlockOrContents = await executionBuilder.submitBlindedBlock(signedBlindedBlockOrContents);
// the full block is published by relay and it's possible that the block is already known to us by gossip
// see https://github.com/ChainSafe/lodestar/issues/5404
return publishBlock(signedBlockOrContents, {...opts, ignoreIfKnown: true});
}
};

return {
async getBlockHeaders(filters) {
// TODO - SLOW CODE: This code seems like it could be improved
Expand Down Expand Up @@ -189,74 +312,15 @@ export function getBeaconBlockApi({
};
},

async publishBlindedBlock(signedBlindedBlockOrContents) {
const executionBuilder = chain.executionBuilder;
if (!executionBuilder) throw Error("exeutionBuilder required to publish SignedBlindedBeaconBlock");
// Mechanism for blobs & blocks on builder is not yet finalized
if (isSignedBlindedBlockContents(signedBlindedBlockOrContents)) {
throw Error("exeutionBuilder not yet implemented for deneb+ forks");
} else {
const signedBlockOrContents = await executionBuilder.submitBlindedBlock(signedBlindedBlockOrContents);
// the full block is published by relay and it's possible that the block is already known to us by gossip
// see https://github.com/ChainSafe/lodestar/issues/5404
return this.publishBlock(signedBlockOrContents, {ignoreIfKnown: true});
}
},

async publishBlock(signedBlockOrContents, opts: ImportBlockOpts = {}) {
const seenTimestampSec = Date.now() / 1000;
let blockForImport: BlockInput, signedBlock: allForks.SignedBeaconBlock, signedBlobs: deneb.SignedBlobSidecars;
publishBlock,
publishBlindedBlock,

if (isSignedBlockContents(signedBlockOrContents)) {
({signedBlock, signedBlobSidecars: signedBlobs} = signedBlockOrContents);
blockForImport = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.api,
// The blobsSidecar will be replaced in the followup PRs with just blobs
blobSidecarsToBlobsSidecar(
config,
signedBlock,
signedBlobs.map((sblob) => sblob.message)
),
null
);
} else {
signedBlock = signedBlockOrContents;
signedBlobs = [];
// TODO: Once API supports submitting data as SSZ, replace null with blockBytes
blockForImport = getBlockInput.preDeneb(config, signedBlock, BlockSource.api, null);
}

// Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the
// REST request promise without any extra infrastructure.
const msToBlockSlot =
computeTimeAtSlot(config, blockForImport.block.message.slot, chain.genesisTime) * 1000 - Date.now();
if (msToBlockSlot <= MAX_API_CLOCK_DISPARITY_MS && msToBlockSlot > 0) {
// If block is a bit early, hold it in a promise. Equivalent to a pending queue.
await sleep(msToBlockSlot);
}
async publishBlindedBlockV2(signedBlindedBlockOrContents, opts) {
await publishBlindedBlock(signedBlindedBlockOrContents, opts);
},

// TODO: Validate block
metrics?.registerBeaconBlock(OpSource.api, seenTimestampSec, blockForImport.block.message);
const publishPromises = [
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
() =>
// there is no rush to persist block since we published it to gossip anyway
chain.processBlock(blockForImport, {...opts, eagerPersistBlock: false}).catch((e) => {
if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) {
network.events.emit(NetworkEvent.unknownBlockParent, {
blockInput: blockForImport,
peer: IDENTITY_PEER_ID,
});
}
throw e;
}),
...signedBlobs.map((signedBlob) => () => network.publishBlobSidecar(signedBlob)),
];
await promiseAllMaybeAsync(publishPromises);
async publishBlockV2(signedBlockOrContents, opts) {
await publishBlock(signedBlockOrContents, opts);
},

async getBlobSidecars(blockId) {
Expand Down
Loading

0 comments on commit f9821df

Please sign in to comment.