From 8bd665e04c8f838cbc8239f406862d1f14057051 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Tue, 20 Feb 2024 13:33:58 +0700 Subject: [PATCH 1/2] feat: regen to consume state cache reload api --- .../beacon-node/src/chain/regen/errors.ts | 4 +- packages/beacon-node/src/chain/regen/regen.ts | 88 +++++++++++++------ .../src/metrics/metrics/lodestar.ts | 5 ++ 3 files changed, 71 insertions(+), 26 deletions(-) diff --git a/packages/beacon-node/src/chain/regen/errors.ts b/packages/beacon-node/src/chain/regen/errors.ts index 85d43d1a4fe8..7c1573b415f8 100644 --- a/packages/beacon-node/src/chain/regen/errors.ts +++ b/packages/beacon-node/src/chain/regen/errors.ts @@ -8,6 +8,7 @@ export enum RegenErrorCode { TOO_MANY_BLOCK_PROCESSED = "REGEN_ERROR_TOO_MANY_BLOCK_PROCESSED", BLOCK_NOT_IN_DB = "REGEN_ERROR_BLOCK_NOT_IN_DB", STATE_TRANSITION_ERROR = "REGEN_ERROR_STATE_TRANSITION_ERROR", + INVALID_STATE_ROOT = "REGEN_ERROR_INVALID_STATE_ROOT", } export type RegenErrorType = @@ -17,7 +18,8 @@ export type RegenErrorType = | {code: RegenErrorCode.NO_SEED_STATE} | {code: RegenErrorCode.TOO_MANY_BLOCK_PROCESSED; stateRoot: RootHex | Root} | {code: RegenErrorCode.BLOCK_NOT_IN_DB; blockRoot: RootHex | Root} - | {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error}; + | {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error} + | {code: RegenErrorCode.INVALID_STATE_ROOT; slot: Slot; expected: RootHex; actual: RootHex}; export class RegenError extends Error { type: RegenErrorType; diff --git a/packages/beacon-node/src/chain/regen/regen.ts b/packages/beacon-node/src/chain/regen/regen.ts index 0d6bd89d8ce7..ba893c0f9b5e 100644 --- a/packages/beacon-node/src/chain/regen/regen.ts +++ b/packages/beacon-node/src/chain/regen/regen.ts @@ -10,29 +10,34 @@ import { stateTransition, } from "@lodestar/state-transition"; import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice"; -import {sleep} from "@lodestar/utils"; +import {Logger, sleep} from "@lodestar/utils"; import 
{SLOTS_PER_EPOCH} from "@lodestar/params"; import {ChainForkConfig} from "@lodestar/config"; import {Metrics} from "../../metrics/index.js"; import {IBeaconDb} from "../../db/index.js"; -import {CheckpointStateCache, StateContextCache} from "../stateCache/index.js"; import {getCheckpointFromState} from "../blocks/utils/checkpoint.js"; import {ChainEvent, ChainEventEmitter} from "../emitter.js"; +import {CheckpointStateCache, BlockStateCache} from "../stateCache/types.js"; import {IStateRegeneratorInternal, RegenCaller, StateCloneOpts} from "./interface.js"; import {RegenError, RegenErrorCode} from "./errors.js"; export type RegenModules = { db: IBeaconDb; forkChoice: IForkChoice; - stateCache: StateContextCache; + stateCache: BlockStateCache; checkpointStateCache: CheckpointStateCache; config: ChainForkConfig; emitter: ChainEventEmitter; + logger: Logger; metrics: Metrics | null; }; /** * Regenerates states that have already been processed by the fork choice + * Since Feb 2024, we support reloading checkpoint state from disk via shouldReload flag. Due to its performance impact + * this flag is only set to true in this case: + * - getPreState: this is for block processing, it's important to reload state in unfinality time + * - updateHeadState: rarely happen, but it's important to make sure we always can regen head state */ export class StateRegenerator implements IStateRegeneratorInternal { constructor(private readonly modules: RegenModules) {} @@ -41,6 +46,7 @@ export class StateRegenerator implements IStateRegeneratorInternal { * Get the state to run with `block`. 
May be: * - If parent is in same epoch -> Exact state at `block.parentRoot` * - If parent is in prev epoch -> State after `block.parentRoot` dialed forward through epoch transition + * - reload state if needed in this flow */ async getPreState( block: allForks.BeaconBlock, @@ -57,6 +63,7 @@ export class StateRegenerator implements IStateRegeneratorInternal { const parentEpoch = computeEpochAtSlot(parentBlock.slot); const blockEpoch = computeEpochAtSlot(block.slot); + const shouldReload = true; // This may save us at least one epoch transition. // If the requested state crosses an epoch boundary @@ -64,11 +71,11 @@ export class StateRegenerator implements IStateRegeneratorInternal { // We may have the checkpoint state with parent root inside the checkpoint state cache // through gossip validation. if (parentEpoch < blockEpoch) { - return this.getCheckpointState({root: block.parentRoot, epoch: blockEpoch}, opts, rCaller); + return this.getCheckpointState({root: block.parentRoot, epoch: blockEpoch}, opts, rCaller, shouldReload); } // Otherwise, get the state normally. 
- return this.getState(parentBlock.stateRoot, rCaller); + return this.getState(parentBlock.stateRoot, rCaller, shouldReload); } /** @@ -77,20 +84,23 @@ export class StateRegenerator implements IStateRegeneratorInternal { async getCheckpointState( cp: phase0.Checkpoint, opts: StateCloneOpts, - rCaller: RegenCaller + rCaller: RegenCaller, + shouldReload = false ): Promise { const checkpointStartSlot = computeStartSlotAtEpoch(cp.epoch); - return this.getBlockSlotState(toHexString(cp.root), checkpointStartSlot, opts, rCaller); + return this.getBlockSlotState(toHexString(cp.root), checkpointStartSlot, opts, rCaller, shouldReload); } /** * Get state after block `blockRoot` dialed forward to `slot` + * - shouldReload should be used with care, as it will cause the state to be reloaded from disk */ async getBlockSlotState( blockRoot: RootHex, slot: Slot, opts: StateCloneOpts, - rCaller: RegenCaller + rCaller: RegenCaller, + shouldReload = false ): Promise { const block = this.modules.forkChoice.getBlockHex(blockRoot); if (!block) { @@ -108,26 +118,31 @@ export class StateRegenerator implements IStateRegeneratorInternal { }); } - const latestCheckpointStateCtx = this.modules.checkpointStateCache.getLatest(blockRoot, computeEpochAtSlot(slot)); + const {checkpointStateCache} = this.modules; + const getLatestApi = shouldReload + ? 
checkpointStateCache.getOrReloadLatest.bind(checkpointStateCache) + : checkpointStateCache.getLatest.bind(checkpointStateCache); + const latestCheckpointStateCtx = await getLatestApi(blockRoot, computeEpochAtSlot(slot)); // If a checkpoint state exists with the given checkpoint root, it either is in requested epoch // or needs to have empty slots processed until the requested epoch if (latestCheckpointStateCtx) { - return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, opts); + return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, rCaller, opts); } // Otherwise, use the fork choice to get the stateRoot from block at the checkpoint root // regenerate that state, // then process empty slots until the requested epoch - const blockStateCtx = await this.getState(block.stateRoot, rCaller); - return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, opts); + const blockStateCtx = await this.getState(block.stateRoot, rCaller, shouldReload); + return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, rCaller, opts); } /** * Get state by exact root. If not in cache directly, requires finding the block that references the state from the * forkchoice and replaying blocks to get to it. 
+ * - shouldReload should be used with care, as it will cause the state to be reloaded from disk */ - async getState(stateRoot: RootHex, _rCaller: RegenCaller): Promise { + async getState(stateRoot: RootHex, _rCaller: RegenCaller, shouldReload = false): Promise { // Trivial case, state at stateRoot is already cached const cachedStateCtx = this.modules.stateCache.get(stateRoot); if (cachedStateCtx) { @@ -143,15 +158,17 @@ export class StateRegenerator implements IStateRegeneratorInternal { // gets reversed when replayed const blocksToReplay = [block]; let state: CachedBeaconStateAllForks | null = null; - for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.parentRoot)) { + const {checkpointStateCache} = this.modules; + const getLatestApi = shouldReload + ? checkpointStateCache.getOrReloadLatest.bind(checkpointStateCache) + : checkpointStateCache.getLatest.bind(checkpointStateCache); + // iterateAncestorBlocks only returns ancestor blocks, not the block itself + for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.blockRoot)) { state = this.modules.stateCache.get(b.stateRoot); if (state) { break; } - state = this.modules.checkpointStateCache.getLatest( - b.blockRoot, - computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1) - ); + state = await getLatestApi(b.blockRoot, computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1)); if (state) { break; } @@ -172,6 +189,8 @@ export class StateRegenerator implements IStateRegeneratorInternal { }); } + const replaySlots = blocksToReplay.map((b) => b.slot).join(","); + this.modules.logger.debug("Replaying blocks to get state", {stateRoot, replaySlots}); for (const b of blocksToReplay.reverse()) { const block = await this.modules.db.block.get(fromHexString(b.blockRoot)); if (!block) { @@ -195,11 +214,23 @@ export class StateRegenerator implements IStateRegeneratorInternal { verifyProposer: false, verifySignatures: false, }, - null + this.modules.metrics ); - // TODO: 
Persist states, note that regen could be triggered by old states. - // Should those take a place in the cache? + const stateRoot = toHexString(state.hashTreeRoot()); + if (b.stateRoot !== stateRoot) { + throw new RegenError({ + slot: b.slot, + code: RegenErrorCode.INVALID_STATE_ROOT, + actual: stateRoot, + expected: b.stateRoot, + }); + } + + if (shouldReload) { + // also with shouldReload flag, we "reload" it to the state cache too + this.modules.stateCache.add(state); + } // this avoids keeping our node busy processing blocks await sleep(0); @@ -210,13 +241,14 @@ export class StateRegenerator implements IStateRegeneratorInternal { }); } } + this.modules.logger.debug("Replayed blocks to get state", {stateRoot, replaySlots}); return state; } private findFirstStateBlock(stateRoot: RootHex): ProtoBlock { for (const block of this.modules.forkChoice.forwarditerateAncestorBlocks()) { - if (block !== undefined) { + if (block.stateRoot === stateRoot) { return block; } } @@ -237,9 +269,10 @@ async function processSlotsByCheckpoint( modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter}, preState: CachedBeaconStateAllForks, slot: Slot, + rCaller: RegenCaller, opts: StateCloneOpts ): Promise { - let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, opts); + let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, rCaller, opts); if (postState.slot < slot) { postState = processSlots(postState, slot, opts, modules.metrics); } @@ -257,6 +290,7 @@ async function processSlotsToNearestCheckpoint( modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter}, preState: CachedBeaconStateAllForks, slot: Slot, + rCaller: RegenCaller, opts: StateCloneOpts ): Promise { const preSlot = preState.slot; @@ -272,12 +306,16 @@ async function processSlotsToNearestCheckpoint( ) { // processSlots calls .clone() before mutating postState = 
processSlots(postState, nextEpochSlot, opts, metrics); + modules.metrics?.epochTransitionByCaller.inc({caller: rCaller}); - // Cache state to preserve epoch transition work + // this is usually added when we prepare for next slot or validate gossip block + // then when we process the 1st block of epoch, we don't have to do state transition again + // This adds Previous Root Checkpoint State to the checkpoint state cache + // This may become the "official" checkpoint state if the 1st block of epoch is skipped const checkpointState = postState; const cp = getCheckpointFromState(checkpointState); checkpointStateCache.add(cp, checkpointState); - emitter.emit(ChainEvent.checkpoint, cp, checkpointState); + emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone()); // this avoids keeping our node busy processing blocks await sleep(0); diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index c42dc4a747b3..957a962de103 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -291,6 +291,11 @@ export function createLodestarMetrics( // Beacon state transition metrics + epochTransitionByCaller: register.gauge<{caller: RegenCaller}>({ + name: "lodestar_epoch_transition_by_caller_total", + help: "Total count of epoch transition by caller", + labelNames: ["caller"], + }), epochTransitionTime: register.histogram({ name: "lodestar_stfn_epoch_transition_seconds", help: "Time to process a single epoch transition in seconds", From 4ea5461862ef6e2aeccb112ebabcff216821f95b Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Thu, 29 Feb 2024 15:00:27 +0700 Subject: [PATCH 2/2] chore: address PR comments --- packages/beacon-node/src/chain/regen/regen.ts | 64 ++++++++++--------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/packages/beacon-node/src/chain/regen/regen.ts b/packages/beacon-node/src/chain/regen/regen.ts index 
ba893c0f9b5e..ab0e0b5f2dd7 100644 --- a/packages/beacon-node/src/chain/regen/regen.ts +++ b/packages/beacon-node/src/chain/regen/regen.ts @@ -34,7 +34,7 @@ export type RegenModules = { /** * Regenerates states that have already been processed by the fork choice - * Since Feb 2024, we support reloading checkpoint state from disk via shouldReload flag. Due to its performance impact + * Since Feb 2024, we support reloading checkpoint state from disk via allowDiskReload flag. Due to its performance impact * this flag is only set to true in this case: * - getPreState: this is for block processing, it's important to reload state in unfinality time * - updateHeadState: rarely happen, but it's important to make sure we always can regen head state @@ -51,7 +51,7 @@ export class StateRegenerator implements IStateRegeneratorInternal { async getPreState( block: allForks.BeaconBlock, opts: StateCloneOpts, - rCaller: RegenCaller + regenCaller: RegenCaller ): Promise { const parentBlock = this.modules.forkChoice.getBlock(block.parentRoot); if (!parentBlock) { @@ -63,7 +63,7 @@ export class StateRegenerator implements IStateRegeneratorInternal { const parentEpoch = computeEpochAtSlot(parentBlock.slot); const blockEpoch = computeEpochAtSlot(block.slot); - const shouldReload = true; + const allowDiskReload = true; // This may save us at least one epoch transition. // If the requested state crosses an epoch boundary @@ -71,11 +71,11 @@ export class StateRegenerator implements IStateRegeneratorInternal { // We may have the checkpoint state with parent root inside the checkpoint state cache // through gossip validation. if (parentEpoch < blockEpoch) { - return this.getCheckpointState({root: block.parentRoot, epoch: blockEpoch}, opts, rCaller, shouldReload); + return this.getCheckpointState({root: block.parentRoot, epoch: blockEpoch}, opts, regenCaller, allowDiskReload); } // Otherwise, get the state normally. 
- return this.getState(parentBlock.stateRoot, rCaller, shouldReload); + return this.getState(parentBlock.stateRoot, regenCaller, allowDiskReload); } /** @@ -84,23 +84,23 @@ export class StateRegenerator implements IStateRegeneratorInternal { async getCheckpointState( cp: phase0.Checkpoint, opts: StateCloneOpts, - rCaller: RegenCaller, - shouldReload = false + regenCaller: RegenCaller, + allowDiskReload = false ): Promise { const checkpointStartSlot = computeStartSlotAtEpoch(cp.epoch); - return this.getBlockSlotState(toHexString(cp.root), checkpointStartSlot, opts, rCaller, shouldReload); + return this.getBlockSlotState(toHexString(cp.root), checkpointStartSlot, opts, regenCaller, allowDiskReload); } /** * Get state after block `blockRoot` dialed forward to `slot` - * - shouldReload should be used with care, as it will cause the state to be reloaded from disk + * - allowDiskReload should be used with care, as it will cause the state to be reloaded from disk */ async getBlockSlotState( blockRoot: RootHex, slot: Slot, opts: StateCloneOpts, - rCaller: RegenCaller, - shouldReload = false + regenCaller: RegenCaller, + allowDiskReload = false ): Promise { const block = this.modules.forkChoice.getBlockHex(blockRoot); if (!block) { @@ -119,30 +119,34 @@ export class StateRegenerator implements IStateRegeneratorInternal { } const {checkpointStateCache} = this.modules; - const getLatestApi = shouldReload - ? checkpointStateCache.getOrReloadLatest.bind(checkpointStateCache) - : checkpointStateCache.getLatest.bind(checkpointStateCache); - const latestCheckpointStateCtx = await getLatestApi(blockRoot, computeEpochAtSlot(slot)); + const epoch = computeEpochAtSlot(slot); + const latestCheckpointStateCtx = allowDiskReload + ? 
await checkpointStateCache.getOrReloadLatest(blockRoot, epoch) + : checkpointStateCache.getLatest(blockRoot, epoch); // If a checkpoint state exists with the given checkpoint root, it either is in requested epoch // or needs to have empty slots processed until the requested epoch if (latestCheckpointStateCtx) { - return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, rCaller, opts); + return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, regenCaller, opts); } // Otherwise, use the fork choice to get the stateRoot from block at the checkpoint root // regenerate that state, // then process empty slots until the requested epoch - const blockStateCtx = await this.getState(block.stateRoot, rCaller, shouldReload); - return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, rCaller, opts); + const blockStateCtx = await this.getState(block.stateRoot, regenCaller, allowDiskReload); + return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, regenCaller, opts); } /** * Get state by exact root. If not in cache directly, requires finding the block that references the state from the * forkchoice and replaying blocks to get to it. - * - shouldReload should be used with care, as it will cause the state to be reloaded from disk + * - allowDiskReload should be used with care, as it will cause the state to be reloaded from disk */ - async getState(stateRoot: RootHex, _rCaller: RegenCaller, shouldReload = false): Promise { + async getState( + stateRoot: RootHex, + _rCaller: RegenCaller, + allowDiskReload = false + ): Promise { // Trivial case, state at stateRoot is already cached const cachedStateCtx = this.modules.stateCache.get(stateRoot); if (cachedStateCtx) { @@ -159,16 +163,16 @@ export class StateRegenerator implements IStateRegeneratorInternal { const blocksToReplay = [block]; let state: CachedBeaconStateAllForks | null = null; const {checkpointStateCache} = this.modules; - const getLatestApi = shouldReload - ? 
checkpointStateCache.getOrReloadLatest.bind(checkpointStateCache) - : checkpointStateCache.getLatest.bind(checkpointStateCache); // iterateAncestorBlocks only returns ancestor blocks, not the block itself for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.blockRoot)) { state = this.modules.stateCache.get(b.stateRoot); if (state) { break; } - state = await getLatestApi(b.blockRoot, computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1)); + const epoch = computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1); + state = allowDiskReload + ? await checkpointStateCache.getOrReloadLatest(b.blockRoot, epoch) + : checkpointStateCache.getLatest(b.blockRoot, epoch); if (state) { break; } @@ -227,8 +231,8 @@ export class StateRegenerator implements IStateRegeneratorInternal { }); } - if (shouldReload) { - // also with shouldReload flag, we "reload" it to the state cache too + if (allowDiskReload) { + // also with allowDiskReload flag, we "reload" it to the state cache too this.modules.stateCache.add(state); } @@ -269,10 +273,10 @@ async function processSlotsByCheckpoint( modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter}, preState: CachedBeaconStateAllForks, slot: Slot, - rCaller: RegenCaller, + regenCaller: RegenCaller, opts: StateCloneOpts ): Promise { - let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, rCaller, opts); + let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, regenCaller, opts); if (postState.slot < slot) { postState = processSlots(postState, slot, opts, modules.metrics); } @@ -290,7 +294,7 @@ async function processSlotsToNearestCheckpoint( modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter}, preState: CachedBeaconStateAllForks, slot: Slot, - rCaller: RegenCaller, + regenCaller: RegenCaller, opts: StateCloneOpts ): Promise { const preSlot = 
preState.slot; @@ -306,7 +310,7 @@ async function processSlotsToNearestCheckpoint( ) { // processSlots calls .clone() before mutating postState = processSlots(postState, nextEpochSlot, opts, metrics); - modules.metrics?.epochTransitionByCaller.inc({caller: rCaller}); + modules.metrics?.epochTransitionByCaller.inc({caller: regenCaller}); // this is usually added when we prepare for next slot or validate gossip block // then when we process the 1st block of epoch, we don't have to do state transition again