From 527c0a37e349fd1e778e242e09dfc76f893f5d9f Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Tue, 9 Jan 2024 15:36:24 +0700 Subject: [PATCH 1/6] feat: implement BufferPool for PersistentCPStateCache --- .../src/chain/blocks/importBlock.ts | 42 +-- .../beacon-node/src/chain/regen/interface.ts | 2 +- .../beacon-node/src/chain/regen/queued.ts | 10 +- .../src/chain/stateCache/datastore/db.ts | 4 +- .../src/chain/stateCache/datastore/file.ts | 52 ++++ .../src/chain/stateCache/datastore/types.ts | 4 +- .../stateCache/persistentCheckpointsCache.ts | 280 +++++++++++++----- .../src/metrics/metrics/lodestar.ts | 36 +++ packages/beacon-node/src/util/bufferPool.ts | 62 ++++ packages/beacon-node/src/util/file.ts | 27 ++ .../persistentCheckpointsCache.test.ts | 10 +- .../test/unit/util/bufferPool.test.ts | 25 ++ .../test/utils/chain/stateCache/datastore.ts | 4 +- .../state-transition/src/cache/stateCache.ts | 10 +- .../src/util/loadState/loadState.ts | 15 +- .../test/perf/util/serializeState.test.ts | 120 ++++++++ 16 files changed, 590 insertions(+), 113 deletions(-) create mode 100644 packages/beacon-node/src/chain/stateCache/datastore/file.ts create mode 100644 packages/beacon-node/src/util/bufferPool.ts create mode 100644 packages/beacon-node/test/unit/util/bufferPool.test.ts create mode 100644 packages/state-transition/test/perf/util/serializeState.test.ts diff --git a/packages/beacon-node/src/chain/blocks/importBlock.ts b/packages/beacon-node/src/chain/blocks/importBlock.ts index 12b43359fa4e..89ed52b66750 100644 --- a/packages/beacon-node/src/chain/blocks/importBlock.ts +++ b/packages/beacon-node/src/chain/blocks/importBlock.ts @@ -59,10 +59,11 @@ export async function importBlock( ): Promise { const {blockInput, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock; const {block, source} = blockInput; - const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message); + const {slot: blockSlot} = block.message; + const blockRoot = this.config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message); const blockRootHex = toHexString(blockRoot); const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime()); - const blockEpoch = computeEpochAtSlot(block.message.slot); + const blockEpoch = computeEpochAtSlot(blockSlot); const parentEpoch = computeEpochAtSlot(parentBlockSlot); const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch; const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT; @@ -87,17 +88,16 @@ export async function importBlock( // This adds the state necessary to process the next block // Some block event handlers require state being in state cache so need to do this before emitting EventType.block - this.regen.addPostState(postState); + this.regen.processState(blockRootHex, postState); this.metrics?.importBlock.bySource.inc({source}); - this.logger.verbose("Added block to forkchoice and state cache", {slot: block.message.slot, root: blockRootHex}); + this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex}); // We want to import block asap so call all event handler in the next event loop setTimeout(() => { - const slot = block.message.slot; this.emitter.emit(routes.events.EventType.block, { block: blockRootHex, - slot, + slot: blockSlot, executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary), }); @@ -106,7 +106,7 @@ export async function importBlock( const {index, kzgCommitment} = blobSidecar; this.emitter.emit(routes.events.EventType.blobSidecar, { blockRoot: blockRootHex, - slot, + slot: blockSlot, index, kzgCommitment: toHexString(kzgCommitment), versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)), @@ -171,7 +171,7 @@ export async function importBlock( correctHead, missedSlotVote, blockRootHex, - block.message.slot + blockSlot ); } catch (e) { // a block has a lot of attestations and it may has same error, we don't want to log all of them @@ -185,7 +185,7 @@ export async function importBlock( } } else { // always log other errors - this.logger.warn("Error processing attestation from block", {slot: block.message.slot}, e as Error); + this.logger.warn("Error processing attestation from block", {slot: blockSlot}, e as Error); } } } @@ -193,7 +193,7 @@ export async function importBlock( for (const {error, count} of invalidAttestationErrorsByCode.values()) { this.logger.warn( "Error processing attestations from block", - {slot: block.message.slot, erroredAttestations: count}, + {slot: blockSlot, erroredAttestations: count}, error ); } @@ -214,7 +214,7 @@ export async function importBlock( // all AttesterSlashings are valid before reaching this this.forkChoice.onAttesterSlashing(slashing); } catch (e) { - this.logger.warn("Error processing AttesterSlashing from block", {slot: block.message.slot}, e as Error); + this.logger.warn("Error processing AttesterSlashing from block", {slot: blockSlot}, e as Error); } } } @@ -297,7 +297,7 @@ export async function importBlock( parentBlockSlot ); } catch (e) { - this.logger.verbose("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error); + this.logger.verbose("Error lightClientServer.onImportBlock", {slot: blockSlot}, e as Error); } }, 0); } @@ -351,10 +351,10 @@ export async function importBlock( if (parentEpoch < blockEpoch) { // current epoch and previous epoch are likely cached in previous states this.shufflingCache.processState(postState, postState.epochCtx.nextShuffling.epoch); - this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: block.message.slot}); + this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: blockSlot}); } - if (block.message.slot % SLOTS_PER_EPOCH === 0) { + if (blockSlot % SLOTS_PER_EPOCH === 0) { // Cache state to preserve epoch transition work const checkpointState = postState; const cp = getCheckpointFromState(checkpointState); @@ -397,7 +397,7 @@ export async function importBlock( // Send block events, only for recent enough blocks - if (this.clock.currentSlot - block.message.slot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) { + if (this.clock.currentSlot - blockSlot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) { // NOTE: Skip looping if there are no listeners from the API if (this.emitter.listenerCount(routes.events.EventType.voluntaryExit)) { for (const voluntaryExit of block.message.body.voluntaryExits) { @@ -417,10 +417,10 @@ export async function importBlock( } // Register stat metrics about the block after importing it - this.metrics?.parentBlockDistance.observe(block.message.slot - parentBlockSlot); + this.metrics?.parentBlockDistance.observe(blockSlot - parentBlockSlot); this.metrics?.proposerBalanceDeltaAny.observe(fullyVerifiedBlock.proposerBalanceDelta); this.metrics?.registerImportedBlock(block.message, fullyVerifiedBlock); - if (this.config.getForkSeq(block.message.slot) >= ForkSeq.altair) { + if (this.config.getForkSeq(blockSlot) >= ForkSeq.altair) { this.metrics?.registerSyncAggregateInBlock( blockEpoch, (block as altair.SignedBeaconBlock).message.body.syncAggregate, @@ -433,18 +433,18 @@ export async function importBlock( // Gossip blocks need to be imported as soon as possible, waiting attestations could be processed // in the next event loop. See https://github.com/ChainSafe/lodestar/issues/4789 setTimeout(() => { - this.reprocessController.onBlockImported({slot: block.message.slot, root: blockRootHex}, advancedSlot); + this.reprocessController.onBlockImported({slot: blockSlot, root: blockRootHex}, advancedSlot); }, 0); if (opts.seenTimestampSec !== undefined) { const recvToImportedBlock = Date.now() / 1000 - opts.seenTimestampSec; this.metrics?.gossipBlock.receivedToBlockImport.observe(recvToImportedBlock); - this.logger.verbose("Imported block", {slot: block.message.slot, recvToImportedBlock}); + this.logger.verbose("Imported block", {slot: blockSlot, recvToImportedBlock}); } this.logger.verbose("Block processed", { - slot: block.message.slot, + slot: blockSlot, root: blockRootHex, - delaySec: this.clock.secFromSlot(block.message.slot), + delaySec: this.clock.secFromSlot(blockSlot), }); } diff --git a/packages/beacon-node/src/chain/regen/interface.ts b/packages/beacon-node/src/chain/regen/interface.ts index e7be64d0eecb..be481de9abc8 100644 --- a/packages/beacon-node/src/chain/regen/interface.ts +++ b/packages/beacon-node/src/chain/regen/interface.ts @@ -39,7 +39,7 @@ export interface IStateRegenerator extends IStateRegeneratorInternal { getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null; pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void; pruneOnFinalized(finalizedEpoch: Epoch): void; - addPostState(postState: CachedBeaconStateAllForks): void; + processState(blockRootHex: RootHex, postState: CachedBeaconStateAllForks): void; addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void; updateHeadState(newHeadStateRoot: RootHex, maybeHeadState: CachedBeaconStateAllForks): void; updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null; diff --git a/packages/beacon-node/src/chain/regen/queued.ts b/packages/beacon-node/src/chain/regen/queued.ts index dfda56cc1eea..928c2e399b9a 100644 --- a/packages/beacon-node/src/chain/regen/queued.ts +++ b/packages/beacon-node/src/chain/regen/queued.ts @@ -4,9 +4,10 @@ import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice"; import {CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition"; import {Logger} from "@lodestar/utils"; import {routes} from "@lodestar/api"; -import {CheckpointHex, CheckpointStateCache, StateContextCache, toCheckpointHex} from "../stateCache/index.js"; +import {CheckpointHex, toCheckpointHex} from "../stateCache/index.js"; import {Metrics} from "../../metrics/index.js"; import {JobItemQueue} from "../../util/queue/index.js"; +import {BlockStateCache, CheckpointStateCache} from "../stateCache/types.js"; import {IStateRegenerator, IStateRegeneratorInternal, RegenCaller, RegenFnName, StateCloneOpts} from "./interface.js"; import {StateRegenerator, RegenModules} from "./regen.js"; import {RegenError, RegenErrorCode} from "./errors.js"; @@ -34,7 +35,7 @@ export class QueuedStateRegenerator implements IStateRegenerator { private readonly regen: StateRegenerator; private readonly forkChoice: IForkChoice; - private readonly stateCache: StateContextCache; + private readonly stateCache: BlockStateCache; private readonly checkpointStateCache: CheckpointStateCache; private readonly metrics: Metrics | null; private readonly logger: Logger; @@ -88,8 +89,11 @@ export class QueuedStateRegenerator implements IStateRegenerator { this.stateCache.deleteAllBeforeEpoch(finalizedEpoch); } - addPostState(postState: CachedBeaconStateAllForks): void { + processState(blockRootHex: RootHex, postState: CachedBeaconStateAllForks): void { this.stateCache.add(postState); + this.checkpointStateCache.processState(blockRootHex, postState).catch((e) => { + this.logger.debug("Error processing block state", {blockRootHex, slot: postState.slot}, e); + }); } addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void { diff --git a/packages/beacon-node/src/chain/stateCache/datastore/db.ts b/packages/beacon-node/src/chain/stateCache/datastore/db.ts index fef38a7f8dd2..c6c9a3ee924b 100644 --- a/packages/beacon-node/src/chain/stateCache/datastore/db.ts +++ b/packages/beacon-node/src/chain/stateCache/datastore/db.ts @@ -1,4 +1,3 @@ -import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {phase0, ssz} from "@lodestar/types"; import {IBeaconDb} from "../../../db/interface.js"; import {CPStateDatastore, DatastoreKey} from "./types.js"; @@ -9,9 +8,8 @@ import {CPStateDatastore, DatastoreKey} from "./types.js"; export class DbCPStateDatastore implements CPStateDatastore { constructor(private readonly db: IBeaconDb) {} - async write(cpKey: phase0.Checkpoint, state: CachedBeaconStateAllForks): Promise { + async write(cpKey: phase0.Checkpoint, stateBytes: Uint8Array): Promise { const serializedCheckpoint = checkpointToDatastoreKey(cpKey); - const stateBytes = state.serialize(); await this.db.checkpointState.putBinary(serializedCheckpoint, stateBytes); return serializedCheckpoint; } diff --git a/packages/beacon-node/src/chain/stateCache/datastore/file.ts b/packages/beacon-node/src/chain/stateCache/datastore/file.ts new file mode 100644 index 000000000000..17de7c4b7d73 --- /dev/null +++ b/packages/beacon-node/src/chain/stateCache/datastore/file.ts @@ -0,0 +1,52 @@ +import path from "node:path"; +import {toHexString, fromHexString} from "@chainsafe/ssz"; +import {phase0, ssz} from "@lodestar/types"; +import {ensureDir, readFile, readFileNames, removeFile, writeIfNotExist} from "../../../util/file.js"; +import {CPStateDatastore, DatastoreKey} from "./types.js"; + +const CHECKPOINT_STATES_FOLDER = "checkpoint_states"; +const CHECKPOINT_FILE_NAME_LENGTH = 82; + +/** + * Implementation of CPStatePersistentApis using file system, this is beneficial for debugging. + */ +export class FileCPStateDatastore implements CPStateDatastore { + private readonly folderPath: string; + + constructor(parentDir: string = ".") { + // by default use the beacon folder `/beacon/checkpoint_states` + this.folderPath = path.join(parentDir, CHECKPOINT_STATES_FOLDER); + } + + async init(): Promise { + try { + await ensureDir(this.folderPath); + } catch (_) { + // do nothing + } + } + + async write(cpKey: phase0.Checkpoint, stateBytes: Uint8Array): Promise { + const serializedCheckpoint = ssz.phase0.Checkpoint.serialize(cpKey); + const filePath = path.join(this.folderPath, toHexString(serializedCheckpoint)); + await writeIfNotExist(filePath, stateBytes); + return serializedCheckpoint; + } + + async remove(serializedCheckpoint: DatastoreKey): Promise { + const filePath = path.join(this.folderPath, toHexString(serializedCheckpoint)); + await removeFile(filePath); + } + + async read(serializedCheckpoint: DatastoreKey): Promise { + const filePath = path.join(this.folderPath, toHexString(serializedCheckpoint)); + return readFile(filePath); + } + + async readKeys(): Promise { + const fileNames = await readFileNames(this.folderPath); + return fileNames + .filter((fileName) => fileName.startsWith("0x") && fileName.length === CHECKPOINT_FILE_NAME_LENGTH) + .map((fileName) => fromHexString(fileName)); + } +} diff --git a/packages/beacon-node/src/chain/stateCache/datastore/types.ts b/packages/beacon-node/src/chain/stateCache/datastore/types.ts index 66ea67f93500..0f81e6ae1e75 100644 --- a/packages/beacon-node/src/chain/stateCache/datastore/types.ts +++ b/packages/beacon-node/src/chain/stateCache/datastore/types.ts @@ -1,4 +1,3 @@ -import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {phase0} from "@lodestar/types"; // With db implementation, persistedKey is serialized data of a checkpoint @@ -6,8 +5,9 @@ export type DatastoreKey = Uint8Array; // Make this generic to support testing export interface CPStateDatastore { - write: (cpKey: phase0.Checkpoint, state: CachedBeaconStateAllForks) => Promise; + write: (cpKey: phase0.Checkpoint, stateBytes: Uint8Array) => Promise; remove: (key: DatastoreKey) => Promise; read: (key: DatastoreKey) => Promise; readKeys: () => Promise; + init?: () => Promise; } diff --git a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts index 8ad5c5098118..1a6f2a0484b1 100644 --- a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts +++ b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts @@ -1,15 +1,24 @@ import {fromHexString, toHexString} from "@chainsafe/ssz"; import {phase0, Epoch, RootHex} from "@lodestar/types"; import {CachedBeaconStateAllForks, computeStartSlotAtEpoch, getBlockRootAtSlot} from "@lodestar/state-transition"; -import {Logger, MapDef} from "@lodestar/utils"; +import {Logger, MapDef, sleep} from "@lodestar/utils"; import {routes} from "@lodestar/api"; import {loadCachedBeaconState} from "@lodestar/state-transition"; +import {INTERVALS_PER_SLOT} from "@lodestar/params"; import {Metrics} from "../../metrics/index.js"; import {IClock} from "../../util/clock.js"; import {ShufflingCache} from "../shufflingCache.js"; +import {BufferPool} from "../../util/bufferPool.js"; import {MapTracker} from "./mapMetrics.js"; -import {CheckpointHex, CheckpointStateCache, CacheItemType} from "./types.js"; import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js"; +import {CheckpointHex, CacheItemType, CheckpointStateCache} from "./types.js"; + +export type PersistentCheckpointStateCacheOpts = { + // Keep max n states in memory, persist the rest to disk + maxCPStateEpochsInMemory?: number; + // for test only + processLateBlock?: boolean; +}; type GetHeadStateFn = () => CachedBeaconStateAllForks; @@ -17,14 +26,11 @@ type PersistentCheckpointStateCacheModules = { metrics?: Metrics | null; logger: Logger; clock?: IClock | null; + signal?: AbortSignal; shufflingCache: ShufflingCache; datastore: CPStateDatastore; getHeadState?: GetHeadStateFn; -}; - -type PersistentCheckpointStateCacheOpts = { - // Keep max n states in memory, persist the rest to disk - maxCPStateEpochsInMemory?: number; + bufferPool?: BufferPool; }; /** checkpoint serialized as a string */ @@ -90,15 +96,27 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { private readonly metrics: Metrics["cpStateCache"] | null | undefined; private readonly logger: Logger; private readonly clock: IClock | null | undefined; + private readonly signal: AbortSignal | undefined; private preComputedCheckpoint: string | null = null; private preComputedCheckpointHits: number | null = null; private readonly maxEpochsInMemory: number; + private readonly processLateBlock: boolean; private readonly datastore: CPStateDatastore; private readonly shufflingCache: ShufflingCache; private readonly getHeadState?: GetHeadStateFn; + private readonly bufferPool?: BufferPool; constructor( - {metrics, logger, clock, shufflingCache, datastore, getHeadState}: PersistentCheckpointStateCacheModules, + { + metrics, + logger, + clock, + signal, + shufflingCache, + datastore, + getHeadState, + bufferPool, + }: PersistentCheckpointStateCacheModules, opts: PersistentCheckpointStateCacheOpts ) { this.cache = new MapTracker(metrics?.cpStateCache); @@ -127,20 +145,26 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { } this.logger = logger; this.clock = clock; + this.signal = signal; if (opts.maxCPStateEpochsInMemory !== undefined && opts.maxCPStateEpochsInMemory < 0) { throw new Error("maxEpochsInMemory must be >= 0"); } this.maxEpochsInMemory = opts.maxCPStateEpochsInMemory ?? DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY; + this.processLateBlock = opts.processLateBlock ?? false; // Specify different datastore for testing this.datastore = datastore; this.shufflingCache = shufflingCache; this.getHeadState = getHeadState; + this.bufferPool = bufferPool; } /** * Reload checkpoint state keys from the last run. */ async init(): Promise { + if (this.datastore?.init) { + await this.datastore.init(); + } const persistedKeys = await this.datastore.readKeys(); for (const persistedKey of persistedKeys) { const cp = datastoreKeyToCheckpoint(persistedKey); @@ -176,11 +200,26 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { this.metrics?.stateReloadEpochDiff.observe(Math.abs(seedState.epochCtx.epoch - cp.epoch)); this.logger.debug("Reload: found seed state", {...logMeta, seedSlot: seedState.slot}); + let bufferPoolKey: number | undefined = undefined; try { + // 80% of validators serialization time comes from memory allocation, this is to avoid it + const bytesWithKey = this.serializeStateValidators(seedState); + bufferPoolKey = bytesWithKey.key; const timer = this.metrics?.stateReloadDuration.startTimer(); - const newCachedState = loadCachedBeaconState(seedState, stateBytes, { - shufflingGetter: this.shufflingCache.getSync.bind(this.shufflingCache), - }); + const newCachedState = loadCachedBeaconState( + seedState, + stateBytes, + { + shufflingGetter: (shufflingEpoch, decisionRootHex) => { + const shuffling = this.shufflingCache.getSync(shufflingEpoch, decisionRootHex); + if (shuffling == null) { + this.metrics?.stateReloadShufflingCacheMiss.inc(); + } + return shuffling; + }, + }, + bytesWithKey.data + ); newCachedState.commit(); const stateRoot = toHexString(newCachedState.hashTreeRoot()); timer?.(); @@ -200,6 +239,10 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { } catch (e) { this.logger.debug("Reload: error loading cached state", logMeta, e as Error); return null; + } finally { + if (bufferPoolKey !== undefined) { + this.bufferPool?.free(bufferPoolKey); + } } } @@ -421,7 +464,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { * - 1 then we'll persist {root: b1, epoch n-1} checkpoint state to disk. Note that at epoch n there is both {root: b0, epoch: n} and {root: c0, epoch: n} checkpoint states in memory * - 2 then we'll persist {root: b2, epoch n-2} checkpoint state to disk, there are also 2 checkpoint states in memory at epoch n, same to the above (maxEpochsInMemory=1) * - * As of Nov 2023, it takes 1.3s to 1.5s to persist a state on holesky on fast server. TODO: + * As of Jan 2024, it takes 1.2s to persist a holesky state on fast server. TODO: * - improve state serialization time * - or research how to only store diff against the finalized state */ @@ -433,65 +476,36 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { return 0; } - for (const lowestEpoch of sortedEpochs.slice(0, sortedEpochs.length - this.maxEpochsInMemory)) { - const epochBoundarySlot = computeStartSlotAtEpoch(lowestEpoch); - const epochBoundaryRoot = - epochBoundarySlot === state.slot ? fromHexString(blockRootHex) : getBlockRootAtSlot(state, epochBoundarySlot); - const epochBoundaryHex = toHexString(epochBoundaryRoot); - - // for each epoch, usually there are 2 rootHex respective to the 2 checkpoint states: Previous Root Checkpoint State and Current Root Checkpoint State - for (const rootHex of this.epochIndex.get(lowestEpoch) ?? []) { - const cpKey = toCacheKey({epoch: lowestEpoch, rootHex}); - const cacheItem = this.cache.get(cpKey); + const blockSlot = state.slot; + const twoThirdsSlot = (2 * state.config.SECONDS_PER_SLOT) / INTERVALS_PER_SLOT; + // we always have clock in production, fallback value is only for test + const secFromSlot = this.clock?.secFromSlot(blockSlot) ?? twoThirdsSlot; + const secToTwoThirdsSlot = twoThirdsSlot - secFromSlot; + if (secToTwoThirdsSlot > 0) { + // 2/3 of slot is the most free time of every slot, take that chance to persist checkpoint states + // normally it should only persist checkpoint states at 2/3 of slot 0 of epoch + await sleep(secToTwoThirdsSlot * 1000, this.signal); + } else if (!this.processLateBlock) { + // normally the block persist happens at 2/3 of slot 0 of epoch, if it's already late then just skip to allow other tasks to run + // there are plenty of chances in the same epoch to persist checkpoint states, also if block is late it could be reorged + this.logger.verbose("Skip persist checkpoint states", {blockSlot, root: blockRootHex}); + return 0; + } - if (cacheItem !== undefined && isInMemoryCacheItem(cacheItem)) { - // this is state in memory, we don't care if the checkpoint state is already persisted - let {persistedKey} = cacheItem; - const {state} = cacheItem; - const logMeta = { - stateSlot: state.slot, - rootHex, - epochBoundaryHex, - persistedKey: persistedKey ? toHexString(persistedKey) : "", - }; - - if (rootHex === epochBoundaryHex) { - if (persistedKey) { - // no need to persist - this.logger.verbose("Pruned checkpoint state from memory but no need to persist", logMeta); - } else { - // persist and do not update epochIndex - this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0); - const timer = this.metrics?.statePersistDuration.startTimer(); - const cpPersist = {epoch: lowestEpoch, root: epochBoundaryRoot}; - persistedKey = await this.datastore.write(cpPersist, state); - timer?.(); - persistCount++; - this.logger.verbose("Pruned checkpoint state from memory and persisted to disk", { - ...logMeta, - persistedKey: toHexString(persistedKey), - }); - } - // overwrite cpKey, this means the state is deleted from memory - this.cache.set(cpKey, {type: CacheItemType.persisted, value: persistedKey}); - } else { - if (persistedKey) { - // persisted file will be eventually deleted by the archive task - // this also means the state is deleted from memory - this.cache.set(cpKey, {type: CacheItemType.persisted, value: persistedKey}); - // do not update epochIndex - } else { - // delete the state from memory - this.cache.delete(cpKey); - this.epochIndex.get(lowestEpoch)?.delete(rootHex); - } - this.metrics?.statePruneFromMemoryCount.inc(); - this.logger.verbose("Pruned checkpoint state from memory", logMeta); - } - } - } + const persistEpochs = sortedEpochs.slice(0, sortedEpochs.length - this.maxEpochsInMemory); + for (const lowestEpoch of persistEpochs) { + // usually there is only 0 or 1 epoch to persist in this loop + persistCount += await this.processPastEpoch(blockRootHex, state, lowestEpoch); } + if (persistCount > 0) { + this.logger.verbose("Persisted checkpoint states", { + slot: blockSlot, + root: blockRootHex, + persistCount, + persistEpochs: persistEpochs.length, + }); + } return persistCount; } @@ -575,6 +589,84 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { return Array.from(this.cache.keys()); } + /** + * Prune or persist checkpoint states in an epoch, see the description in `processState()` function + */ + private async processPastEpoch( + blockRootHex: RootHex, + state: CachedBeaconStateAllForks, + epoch: Epoch + ): Promise { + let persistCount = 0; + const epochBoundarySlot = computeStartSlotAtEpoch(epoch); + const epochBoundaryRoot = + epochBoundarySlot === state.slot ? fromHexString(blockRootHex) : getBlockRootAtSlot(state, epochBoundarySlot); + const epochBoundaryHex = toHexString(epochBoundaryRoot); + + // for each epoch, usually there are 2 rootHex respective to the 2 checkpoint states: Previous Root Checkpoint State and Current Root Checkpoint State + for (const rootHex of this.epochIndex.get(epoch) ?? []) { + const cpKey = toCacheKey({epoch: epoch, rootHex}); + const cacheItem = this.cache.get(cpKey); + + if (cacheItem !== undefined && isInMemoryCacheItem(cacheItem)) { + // this is state in memory, we don't care if the checkpoint state is already persisted + let {persistedKey} = cacheItem; + const {state} = cacheItem; + const logMeta = { + stateSlot: state.slot, + rootHex, + epochBoundaryHex, + persistedKey: persistedKey ? toHexString(persistedKey) : "", + }; + + if (rootHex === epochBoundaryHex) { + if (persistedKey) { + // no need to persist + this.logger.verbose("Pruned checkpoint state from memory but no need to persist", logMeta); + } else { + // persist and do not update epochIndex + this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0); + const cpPersist = {epoch: epoch, root: epochBoundaryRoot}; + let bufferPoolKey: number | undefined = undefined; + try { + const timer = this.metrics?.statePersistDuration.startTimer(); + const stateBytesWithKey = this.serializeState(state); + bufferPoolKey = stateBytesWithKey.key; + persistedKey = await this.datastore.write(cpPersist, stateBytesWithKey.data); + timer?.(); + } finally { + if (bufferPoolKey !== undefined) { + this.bufferPool?.free(bufferPoolKey); + } + } + persistCount++; + this.logger.verbose("Pruned checkpoint state from memory and persisted to disk", { + ...logMeta, + persistedKey: toHexString(persistedKey), + }); + } + // overwrite cpKey, this means the state is deleted from memory + this.cache.set(cpKey, {type: CacheItemType.persisted, value: persistedKey}); + } else { + if (persistedKey) { + // persisted file will be eventually deleted by the archive task + // this also means the state is deleted from memory + this.cache.set(cpKey, {type: CacheItemType.persisted, value: persistedKey}); + // do not update epochIndex + } else { + // delete the state from memory + this.cache.delete(cpKey); + this.epochIndex.get(epoch)?.delete(rootHex); + } + this.metrics?.statePruneFromMemoryCount.inc(); + this.logger.verbose("Pruned checkpoint state from memory", logMeta); + } + } + } + + return persistCount; + } + /** * Delete all items of an epoch from disk and memory */ @@ -602,9 +694,59 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { rootHexes: Array.from(rootHexes).join(","), }); } + + /* + * It's not sustainable to allocate ~240MB for each state every epoch, so we use buffer pool to reuse the memory. + * As monitored on holesky as of Jan 2024: + * - This does not increase heap allocation while gc time is the same + * - It helps stabilize persist time and save ~300ms in average (1.5s vs 1.2s) + * - It also helps the state reload to save ~500ms in average (4.3s vs 3.8s) + * - Also `serializeState.test.ts` perf test shows a lot of differences allocating ~240MB once vs per state serialization + */ + private serializeState(state: CachedBeaconStateAllForks): {data: Uint8Array; key?: number} { + const size = state.type.tree_serializedSize(state.node); + if (this.bufferPool) { + const bufferWithKey = this.bufferPool.alloc(size); + if (bufferWithKey) { + const stateBytes = bufferWithKey.buffer; + const dataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength); + state.type.tree_serializeToBytes({uint8Array: stateBytes, dataView}, 0, state.node); + return {data: stateBytes, key: bufferWithKey.key}; + } + } + + this.metrics?.persistedStateAllocCount.inc(); + return {data: state.serialize()}; + } + + /** + * Serialize validators to bytes leveraging the buffer pool to save memory allocation. + * - As monitored on holesky as of Jan 2024, it helps save ~500ms state reload time (4.3s vs 3.8s) + * - Also `serializeState.test.ts` perf test shows a lot of differences allocating validators bytes once vs every time, + * This is 2x - 3x faster than allocating memory every time. + * TODO: consider serializing validators manually like in `serializeState.test.ts` perf test, this could be 3x faster than this + */ + private serializeStateValidators(state: CachedBeaconStateAllForks): {data: Uint8Array; key?: number} { + const validatorsSszTimer = this.metrics?.stateReloadValidatorsSszDuration.startTimer(); + const type = state.type.fields.validators; + const size = type.tree_serializedSize(state.validators.node); + if (this.bufferPool) { + const bufferWithKey = this.bufferPool.alloc(size); + if (bufferWithKey) { + const validatorsBytes = bufferWithKey.buffer; + const dataView = new DataView(validatorsBytes.buffer, validatorsBytes.byteOffset, validatorsBytes.byteLength); + type.tree_serializeToBytes({uint8Array: validatorsBytes, dataView}, 0, state.validators.node); + return {data: validatorsBytes, key: bufferWithKey.key}; + } + } + + this.metrics?.stateReloadValidatorsSszAllocCount.inc(); + validatorsSszTimer?.(); + return {data: state.validators.serialize()}; + } } -function toCheckpointHex(checkpoint: phase0.Checkpoint): CheckpointHex { +export function toCheckpointHex(checkpoint: phase0.Checkpoint): CheckpointHex { return { epoch: checkpoint.epoch, rootHex: toHexString(checkpoint.root), diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index f6b143913346..284f4e75c064 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -1086,6 +1086,25 @@ export function createLodestarMetrics( }), }, + bufferPool: { + length: register.gauge({ + name: "lodestar_buffer_pool_length", + help: "Buffer pool length", + }), + hits: register.counter({ + name: "lodestar_buffer_pool_hits_total", + help: "Total number of buffer pool hits", + }), + misses: register.counter({ + name: "lodestar_buffer_pool_misses_total", + help: "Total number of buffer pool misses", + }), + grows: register.counter({ + name: "lodestar_buffer_pool_grows_total", + help: "Total number of buffer pool length increases", + }), + }, + cpStateCache: { lookups: register.gauge({ name: "lodestar_cp_state_cache_lookups_total", @@ -1136,6 +1155,19 @@ export function createLodestarMetrics( help: "Histogram of time to persist state to db since the clock slot", buckets: [0, 2, 4, 6, 8, 10, 12], }), + stateReloadValidatorsSszDuration: register.histogram({ + name: "lodestar_cp_state_cache_state_reload_validators_ssz_seconds", + help: "Histogram of time to serialize validators", + buckets: [0.1, 0.2, 0.5, 1], + }), + stateReloadValidatorsSszAllocCount: register.counter({ + name: "lodestar_cp_state_cache_state_reload_validators_ssz_alloc_count", + help: "Total number time to allocate memory for validators serialization", + }), + stateReloadShufflingCacheMiss: register.counter({ + name: "lodestar_cp_state_cache_state_reload_shuffling_cache_miss_count", + help: "Total number of shuffling cache misses when loading a state", + }), stateReloadDuration: register.histogram({ name: "lodestar_cp_state_cache_state_reload_seconds", help: "Histogram of time to load state from db", @@ -1160,6 +1192,10 @@ export function createLodestarMetrics( name: "lodestar_cp_state_cache_persisted_state_remove_count", help: "Total number of persisted states removed", }), + persistedStateAllocCount: register.counter({ + name: "lodestar_cp_state_cache_persisted_state_alloc_count", + help: "Total number time to allocate memory for persisted state", + }), }, balancesCache: { diff --git a/packages/beacon-node/src/util/bufferPool.ts b/packages/beacon-node/src/util/bufferPool.ts new file mode 100644 index 000000000000..0ce3a3d2e50e --- /dev/null +++ b/packages/beacon-node/src/util/bufferPool.ts @@ -0,0 +1,62 @@ +import {Metrics} from "../metrics/metrics.js"; + +/** + * If consumer wants more memory than available, we grow the buffer by this ratio. + */ +const GROW_RATIO = 1.1; + +/** + * A simple implementation to manage a single buffer. + * This is initially used for state serialization at every epoch and for state reload. + * We can enhance and use this for other purposes in the future. + */ +export class BufferPool { + private buffer: Uint8Array; + private inUse = false; + private currentKey: number; + private readonly metrics: Metrics["bufferPool"] | null = null; + + constructor(size: number, metrics: Metrics | null = null) { + this.buffer = new Uint8Array(Math.floor(size * GROW_RATIO)); + this.currentKey = 0; + if (metrics) { + this.metrics = metrics.bufferPool; + metrics.bufferPool.length.addCollect(() => { + metrics.bufferPool.length.set(this.buffer.length); + }); + } + } + + get length(): number { + return this.buffer.length; + } + + /** + * Returns a buffer of the given size. + * If the buffer is already in use, return null. + * Grow the buffer if the requested size is larger than the current buffer. + */ + alloc(size: number): {buffer: Uint8Array; key: number} | null { + if (this.inUse) { + this.metrics?.misses.inc(); + return null; + } + this.inUse = true; + this.metrics?.hits.inc(); + this.currentKey += 1; + if (size > this.buffer.length) { + this.metrics?.grows.inc(); + this.buffer = new Uint8Array(Math.floor(size * GROW_RATIO)); + } + return {buffer: this.buffer.subarray(0, size), key: this.currentKey}; + } + + /** + * Marks the buffer as free. + */ + free(key: number): void { + if (key === this.currentKey) { + this.inUse = false; + } + } +} diff --git a/packages/beacon-node/src/util/file.ts b/packages/beacon-node/src/util/file.ts index af78ca8b6126..c194fb07a3b8 100644 --- a/packages/beacon-node/src/util/file.ts +++ b/packages/beacon-node/src/util/file.ts @@ -23,3 +23,30 @@ export async function writeIfNotExist(filepath: string, bytes: Uint8Array): Prom return true; } } + +/** Remove a file if it exists */ +export async function removeFile(path: string): Promise { + try { + await promisify(fs.unlink)(path); + return true; + } catch (_) { + // may not exists + return false; + } +} + +export async function readFile(path: string): Promise { + try { + return await fs.promises.readFile(path); + } catch (_) { + return null; + } +} + +export async function readFileNames(folderPath: string): Promise { + try { + return await fs.promises.readdir(folderPath); + } catch (_) { + return []; + } +} diff --git a/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts b/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts index 83a2dddd65dd..18bdfac89793 100644 --- a/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts +++ b/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts @@ -88,7 +88,7 @@ describe("PersistentCheckpointStateCache", function () { const datastore = getTestDatastore(fileApisBuffer); cache = new PersistentCheckpointStateCache( {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, - {maxCPStateEpochsInMemory: 2} + {maxCPStateEpochsInMemory: 2, processLateBlock: true} ); cache.add(cp0a, states["cp0a"]); cache.add(cp0b, states["cp0b"]); @@ -156,7 +156,7 @@ describe("PersistentCheckpointStateCache", function () { const datastore = getTestDatastore(fileApisBuffer); cache = new PersistentCheckpointStateCache( {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, - {maxCPStateEpochsInMemory: 2} + {maxCPStateEpochsInMemory: 2, processLateBlock: true} ); cache.add(cp0a, states["cp0a"]); cache.add(cp0b, states["cp0b"]); @@ -228,7 +228,7 @@ describe("PersistentCheckpointStateCache", function () { const datastore = getTestDatastore(fileApisBuffer); cache = new PersistentCheckpointStateCache( {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, - {maxCPStateEpochsInMemory: 2} + {maxCPStateEpochsInMemory: 2, processLateBlock: true} ); cache.add(cp0a, states["cp0a"]); cache.add(cp0b, states["cp0b"]); @@ -527,7 +527,7 @@ describe("PersistentCheckpointStateCache", function () { const datastore = getTestDatastore(fileApisBuffer); cache = new PersistentCheckpointStateCache( {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, - {maxCPStateEpochsInMemory: 1} + {maxCPStateEpochsInMemory: 1, processLateBlock: true} ); cache.add(cp0a, states["cp0a"]); cache.add(cp0b, states["cp0b"]); @@ -792,7 +792,7 @@ describe("PersistentCheckpointStateCache", function () { const datastore = getTestDatastore(fileApisBuffer); cache = new PersistentCheckpointStateCache( {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, - {maxCPStateEpochsInMemory: 0} + {maxCPStateEpochsInMemory: 0, processLateBlock: true} ); cache.add(cp0a, states["cp0a"]); cache.add(cp0b, states["cp0b"]); diff --git a/packages/beacon-node/test/unit/util/bufferPool.test.ts b/packages/beacon-node/test/unit/util/bufferPool.test.ts new file mode 100644 index 000000000000..2f8004e8315e --- /dev/null +++ b/packages/beacon-node/test/unit/util/bufferPool.test.ts @@ -0,0 +1,25 @@ +import {describe, it, expect} from "vitest"; +import {BufferPool} from "../../../src/util/bufferPool.js"; + +describe("BufferPool", () => { + const pool = new BufferPool(100); + + it("should increase length", () => { + expect(pool.length).toEqual(110); + const mem = pool.alloc(200); + if (mem === null) { + throw Error("Expected non-null mem"); + } + expect(pool.length).toEqual(220); + pool.free(mem.key); + }); + + it("should not allow alloc if in use", () => { + const mem = pool.alloc(20); + if (mem === null) { + throw Error("Expected non-null mem"); + } + expect(pool.alloc(20)).toEqual(null); + pool.free(mem.key); + }); +}); diff --git a/packages/beacon-node/test/utils/chain/stateCache/datastore.ts b/packages/beacon-node/test/utils/chain/stateCache/datastore.ts index 8a944f4c2d88..73eab697de96 100644 --- a/packages/beacon-node/test/utils/chain/stateCache/datastore.ts +++ b/packages/beacon-node/test/utils/chain/stateCache/datastore.ts @@ -3,11 +3,11 @@ import {CPStateDatastore, checkpointToDatastoreKey} from "../../../../src/chain/ export function getTestDatastore(fileApisBuffer: Map): CPStateDatastore { const datastore: CPStateDatastore = { - write: (cp, state) => { + write: (cp, stateBytes) => { const persistentKey = checkpointToDatastoreKey(cp); const stringKey = toHexString(persistentKey); if (!fileApisBuffer.has(stringKey)) { - fileApisBuffer.set(stringKey, state.serialize()); + fileApisBuffer.set(stringKey, stateBytes); } return Promise.resolve(persistentKey); }, diff --git a/packages/state-transition/src/cache/stateCache.ts b/packages/state-transition/src/cache/stateCache.ts index b01ca0c409b2..8b45152a3646 100644 --- a/packages/state-transition/src/cache/stateCache.ts +++ b/packages/state-transition/src/cache/stateCache.ts @@ -164,9 +164,15 @@ export function createCachedBeaconState( export function loadCachedBeaconState( cachedSeedState: T, stateBytes: Uint8Array, - opts?: EpochCacheOpts + opts?: EpochCacheOpts, + seedValidatorsBytes?: Uint8Array ): T { - const {state: migratedState, modifiedValidators} = loadState(cachedSeedState.config, cachedSeedState, stateBytes); + const {state: migratedState, modifiedValidators} = loadState( + cachedSeedState.config, + cachedSeedState, + stateBytes, + seedValidatorsBytes + ); const {pubkey2index, index2pubkey} = cachedSeedState.epochCtx; // Get the validators sub tree once for all the loop const validators = migratedState.validators; diff --git a/packages/state-transition/src/util/loadState/loadState.ts b/packages/state-transition/src/util/loadState/loadState.ts index 83377101609d..dc9f8fe4fcab 100644 --- a/packages/state-transition/src/util/loadState/loadState.ts +++ b/packages/state-transition/src/util/loadState/loadState.ts @@ -20,7 +20,8 @@ type MigrateStateOutput = {state: BeaconStateAllForks; modifiedValidators: numbe export function loadState( config: ChainForkConfig, seedState: BeaconStateAllForks, - stateBytes: Uint8Array + stateBytes: Uint8Array, + seedValidatorsBytes?: Uint8Array ): MigrateStateOutput { // casting only to make typescript happy const stateType = getStateTypeFromBytes(config, stateBytes) as typeof ssz.capella.BeaconState; @@ -42,7 +43,8 @@ export function loadState( const modifiedValidators = loadValidators( migratedState, seedState, - stateBytes.subarray(validatorsRange.start, validatorsRange.end) + stateBytes.subarray(validatorsRange.start, validatorsRange.end), + seedValidatorsBytes ); // inactivityScores are rarely changed @@ -128,7 +130,7 @@ function loadInactivityScores( } /** - * As of Sep 2021, common validators of 2 mainnet states are rarely changed. However, the benchmark shows that + * As of Sep 2023, common validators of 2 mainnet states are rarely changed. However, the benchmark shows that * 10k modified validators is not an issue. (see packages/state-transition/test/perf/util/loadState/findModifiedValidators.test.ts) * * This method loads validators from bytes given a seed state so that they share the same base tree. This gives some benefits: @@ -159,7 +161,8 @@ function loadInactivityScores( function loadValidators( migratedState: BeaconStateAllForks, seedState: BeaconStateAllForks, - newValidatorsBytes: Uint8Array + newValidatorsBytes: Uint8Array, + seedStateValidatorsBytes?: Uint8Array ): number[] { const seedValidatorCount = seedState.validators.length; const newValidatorCount = Math.floor(newValidatorsBytes.length / VALIDATOR_BYTES_SIZE); @@ -167,7 +170,9 @@ function loadValidators( const minValidatorCount = Math.min(seedValidatorCount, newValidatorCount); // migrated state starts with the same validators to seed state migratedState.validators = seedState.validators.clone(); - const seedValidatorsBytes = seedState.validators.serialize(); + // 80% of validators serialization time comes from memory allocation + // seedStateValidatorsBytes is an optimization at beacon-node side to avoid memory allocation here + const seedValidatorsBytes = seedStateValidatorsBytes ?? seedState.validators.serialize(); const modifiedValidators: number[] = []; findModifiedValidators( isMoreValidator ? seedValidatorsBytes : seedValidatorsBytes.subarray(0, minValidatorCount * VALIDATOR_BYTES_SIZE), diff --git a/packages/state-transition/test/perf/util/serializeState.test.ts b/packages/state-transition/test/perf/util/serializeState.test.ts new file mode 100644 index 000000000000..4a8446156135 --- /dev/null +++ b/packages/state-transition/test/perf/util/serializeState.test.ts @@ -0,0 +1,120 @@ +import {itBench, setBenchOpts} from "@dapplion/benchmark"; +import {ssz} from "@lodestar/types"; +import {generatePerfTestCachedStateAltair} from "../util.js"; + +/** + * This shows different statistics between allocating memory once vs every time. + * Due to gc, the test is not consistent so skipping it for CI. + */ +describe.skip("serialize state and validators", function () { + this.timeout(0); + + setBenchOpts({ + // increasing this may have different statistics due to gc time + minMs: 60_000, + }); + const valicatorCount = 1_500_000; + const seedState = generatePerfTestCachedStateAltair({vc: 1_500_000, goBackOneSlot: false}); + + /** + * Allocate memory every time, on a Mac M1: + * - 700ms to 750ms + * - Used to see 2.8s + * Allocate memory once, may test multiple times but seems consistent: + * - 430ms to 480ms + */ + const stateType = ssz.altair.BeaconState; + const rootNode = seedState.node; + const stateBytes = new Uint8Array(stateType.tree_serializedSize(rootNode)); + const stateDataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength); + itBench({ + id: `serialize state ${valicatorCount} validators, alloc once`, + fn: () => { + stateType.tree_serializeToBytes({uint8Array: stateBytes, dataView: stateDataView}, 0, rootNode); + }, + }); + + itBench({ + id: `serialize altair state ${valicatorCount} validators`, + fn: () => { + seedState.serialize(); + }, + }); + + /** + * Allocate memory once, this takes 450ms - 500ms on a Mac M1. + */ + const validatorsType = seedState.type.fields.validators; + const validatorsSize = validatorsType.tree_serializedSize(seedState.validators.node); + const validatorsBytes = new Uint8Array(validatorsSize); + const validatorsDataView = new DataView( + validatorsBytes.buffer, + validatorsBytes.byteOffset, + validatorsBytes.byteLength + ); + itBench({ + id: `serialize state validators ${valicatorCount} validators, alloc once`, + fn: () => { + validatorsType.tree_serializeToBytes( + {uint8Array: validatorsBytes, dataView: validatorsDataView}, + 0, + seedState.validators.node + ); + }, + }); + + /** + * Allocate memory every time, this takes 640ms to more than 1s on a Mac M1. + */ + itBench({ + id: `serialize state validators ${valicatorCount} validators`, + fn: () => { + seedState.validators.serialize(); + }, + }); + + /** + * Allocating once and populate validators nodes once, this takes 120ms - 150ms on a Mac M1, + * this is 3x faster than the previous approach. + */ + const NUMBER_2_POW_32 = 2 ** 32; + const output = new Uint8Array(121 * 1_500_000); + const dataView = new DataView(output.buffer, output.byteOffset, output.byteLength); + // this caches validators nodes which is what happen after we run a state transition + const validators = seedState.validators.getAllReadonlyValues(); + itBench({ + id: `serialize ${valicatorCount} validators manually`, + fn: () => { + let offset = 0; + for (const validator of validators) { + output.set(validator.pubkey, offset); + offset += 48; + output.set(validator.withdrawalCredentials, offset); + offset += 32; + const {effectiveBalance, activationEligibilityEpoch, activationEpoch, exitEpoch, withdrawableEpoch} = validator; + dataView.setUint32(offset, effectiveBalance & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, (effectiveBalance / NUMBER_2_POW_32) & 0xffffffff, true); + offset += 4; + output[offset] = validator.slashed ? 1 : 0; + offset += 1; + dataView.setUint32(offset, activationEligibilityEpoch & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, (activationEligibilityEpoch / NUMBER_2_POW_32) & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, activationEpoch & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, (activationEpoch / NUMBER_2_POW_32) & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, exitEpoch & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, (exitEpoch / NUMBER_2_POW_32) & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, withdrawableEpoch & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, (withdrawableEpoch / NUMBER_2_POW_32) & 0xffffffff, true); + offset += 4; + } + }, + }); +}); From 32b8328cd69f662ff5f250ce5fb5caab54ca6620 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Thu, 11 Jan 2024 14:24:36 +0700 Subject: [PATCH 2/6] fix: alloc vs allocUnsafe for BufferPool --- packages/beacon-node/src/util/bufferPool.ts | 19 +++++++++++++++++-- .../test/perf/util/serializeState.test.ts | 2 ++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/packages/beacon-node/src/util/bufferPool.ts b/packages/beacon-node/src/util/bufferPool.ts index 0ce3a3d2e50e..ff881aff5b23 100644 --- a/packages/beacon-node/src/util/bufferPool.ts +++ b/packages/beacon-node/src/util/bufferPool.ts @@ -32,11 +32,22 @@ export class BufferPool { } /** - * Returns a buffer of the given size. + * Returns a buffer of the given size with all 0. * If the buffer is already in use, return null. * Grow the buffer if the requested size is larger than the current buffer. */ alloc(size: number): {buffer: Uint8Array; key: number} | null { + return this._alloc(size, false); + } + + /** + * Same to alloc() but the buffer is not zeroed. + */ + allocUnsafe(size: number): {buffer: Uint8Array; key: number} | null { + return this._alloc(size, true); + } + + _alloc(size: number, isUnsafe = false): {buffer: Uint8Array; key: number} | null { if (this.inUse) { this.metrics?.misses.inc(); return null; @@ -48,7 +59,11 @@ export class BufferPool { this.metrics?.grows.inc(); this.buffer = new Uint8Array(Math.floor(size * GROW_RATIO)); } - return {buffer: this.buffer.subarray(0, size), key: this.currentKey}; + const bytes = this.buffer.subarray(0, size); + if (!isUnsafe) { + bytes.fill(0); + } + return {buffer: bytes, key: this.currentKey}; } /** diff --git a/packages/state-transition/test/perf/util/serializeState.test.ts b/packages/state-transition/test/perf/util/serializeState.test.ts index 4a8446156135..5bd6c6b38e6a 100644 --- a/packages/state-transition/test/perf/util/serializeState.test.ts +++ b/packages/state-transition/test/perf/util/serializeState.test.ts @@ -30,6 +30,7 @@ describe.skip("serialize state and validators", function () { itBench({ id: `serialize state ${valicatorCount} validators, alloc once`, fn: () => { + stateBytes.fill(0); stateType.tree_serializeToBytes({uint8Array: stateBytes, dataView: stateDataView}, 0, rootNode); }, }); @@ -55,6 +56,7 @@ describe.skip("serialize state and validators", function () { itBench({ id: `serialize state validators ${valicatorCount} validators, alloc once`, fn: () => { + validatorsBytes.fill(0); validatorsType.tree_serializeToBytes( {uint8Array: validatorsBytes, dataView: validatorsDataView}, 0, From a97905b53f1a4f37afa4574a1d79688017d00505 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Thu, 11 Jan 2024 15:21:05 +0700 Subject: [PATCH 3/6] chore: conform to style guide --- packages/beacon-node/src/util/bufferPool.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/beacon-node/src/util/bufferPool.ts b/packages/beacon-node/src/util/bufferPool.ts index ff881aff5b23..c637b036df33 100644 --- a/packages/beacon-node/src/util/bufferPool.ts +++ b/packages/beacon-node/src/util/bufferPool.ts @@ -37,17 +37,17 @@ export class BufferPool { * Grow the buffer if the requested size is larger than the current buffer. */ alloc(size: number): {buffer: Uint8Array; key: number} | null { - return this._alloc(size, false); + return this.doAlloc(size, false); } /** * Same to alloc() but the buffer is not zeroed. */ allocUnsafe(size: number): {buffer: Uint8Array; key: number} | null { - return this._alloc(size, true); + return this.doAlloc(size, true); } - _alloc(size: number, isUnsafe = false): {buffer: Uint8Array; key: number} | null { + private doAlloc(size: number, isUnsafe = false): {buffer: Uint8Array; key: number} | null { if (this.inUse) { this.metrics?.misses.inc(); return null; From 545599d7456b484dfcbe60b386f1b43097a23d6e Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Tue, 16 Jan 2024 11:10:42 +0700 Subject: [PATCH 4/6] feat: use using with Disposable object --- .../stateCache/persistentCheckpointsCache.ts | 55 +++++++++---------- packages/beacon-node/src/util/bufferPool.ts | 20 +++++-- .../test/unit/util/bufferPool.test.ts | 21 ++++--- 3 files changed, 57 insertions(+), 39 deletions(-) diff --git a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts index 1a6f2a0484b1..8a3c9f2e8424 100644 --- a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts +++ b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts @@ -8,7 +8,7 @@ import {INTERVALS_PER_SLOT} from "@lodestar/params"; import {Metrics} from "../../metrics/index.js"; import {IClock} from "../../util/clock.js"; import {ShufflingCache} from "../shufflingCache.js"; -import {BufferPool} from "../../util/bufferPool.js"; +import {BufferPool, BufferWithKey} from "../../util/bufferPool.js"; import {MapTracker} from "./mapMetrics.js"; import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js"; import {CheckpointHex, CacheItemType, CheckpointStateCache} from "./types.js"; @@ -200,11 +200,17 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { this.metrics?.stateReloadEpochDiff.observe(Math.abs(seedState.epochCtx.epoch - cp.epoch)); this.logger.debug("Reload: found seed state", {...logMeta, seedSlot: seedState.slot}); - let bufferPoolKey: number | undefined = undefined; try { // 80% of validators serialization time comes from memory allocation, this is to avoid it - const bytesWithKey = this.serializeStateValidators(seedState); - bufferPoolKey = bytesWithKey.key; + const sszTimer = this.metrics?.stateReloadValidatorsSszDuration.startTimer(); + using validatorsBytesWithKey = this.serializeStateValidators(seedState); + let validatorsBytes = validatorsBytesWithKey?.buffer; + if (validatorsBytes == null) { + // fallback logic in case we can't use the buffer pool + this.metrics?.stateReloadValidatorsSszAllocCount.inc(); + validatorsBytes = seedState.validators.serialize(); + } + sszTimer?.(); const timer = this.metrics?.stateReloadDuration.startTimer(); const newCachedState = loadCachedBeaconState( seedState, @@ -218,7 +224,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { return shuffling; }, }, - bytesWithKey.data + validatorsBytes ); newCachedState.commit(); const stateRoot = toHexString(newCachedState.hashTreeRoot()); @@ -239,10 +245,6 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { } catch (e) { this.logger.debug("Reload: error loading cached state", logMeta, e as Error); return null; - } finally { - if (bufferPoolKey !== undefined) { - this.bufferPool?.free(bufferPoolKey); - } } } @@ -627,17 +629,17 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { // persist and do not update epochIndex this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0); const cpPersist = {epoch: epoch, root: epochBoundaryRoot}; - let bufferPoolKey: number | undefined = undefined; - try { + { const timer = this.metrics?.statePersistDuration.startTimer(); - const stateBytesWithKey = this.serializeState(state); - bufferPoolKey = stateBytesWithKey.key; - persistedKey = await this.datastore.write(cpPersist, stateBytesWithKey.data); - timer?.(); - } finally { - if (bufferPoolKey !== undefined) { - this.bufferPool?.free(bufferPoolKey); + using stateBytesWithKey = this.serializeState(state); + let stateBytes = stateBytesWithKey?.buffer; + if (stateBytes == null) { + // fallback logic to use regular way to get state ssz bytes + this.metrics?.persistedStateAllocCount.inc(); + stateBytes = state.serialize(); } + persistedKey = await this.datastore.write(cpPersist, stateBytes); + timer?.(); } persistCount++; this.logger.verbose("Pruned checkpoint state from memory and persisted to disk", { @@ -703,7 +705,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { * - It also helps the state reload to save ~500ms in average (4.3s vs 3.8s) * - Also `serializeState.test.ts` perf test shows a lot of differences allocating ~240MB once vs per state serialization */ - private serializeState(state: CachedBeaconStateAllForks): {data: Uint8Array; key?: number} { + private serializeState(state: CachedBeaconStateAllForks): BufferWithKey | null { const size = state.type.tree_serializedSize(state.node); if (this.bufferPool) { const bufferWithKey = this.bufferPool.alloc(size); @@ -711,12 +713,11 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { const stateBytes = bufferWithKey.buffer; const dataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength); state.type.tree_serializeToBytes({uint8Array: stateBytes, dataView}, 0, state.node); - return {data: stateBytes, key: bufferWithKey.key}; + return bufferWithKey; } } - this.metrics?.persistedStateAllocCount.inc(); - return {data: state.serialize()}; + return null; } /** @@ -726,8 +727,8 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { * This is 2x - 3x faster than allocating memory every time. * TODO: consider serializing validators manually like in `serializeState.test.ts` perf test, this could be 3x faster than this */ - private serializeStateValidators(state: CachedBeaconStateAllForks): {data: Uint8Array; key?: number} { - const validatorsSszTimer = this.metrics?.stateReloadValidatorsSszDuration.startTimer(); + private serializeStateValidators(state: CachedBeaconStateAllForks): BufferWithKey | null { + // const validatorsSszTimer = this.metrics?.stateReloadValidatorsSszDuration.startTimer(); const type = state.type.fields.validators; const size = type.tree_serializedSize(state.validators.node); if (this.bufferPool) { @@ -736,13 +737,11 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { const validatorsBytes = bufferWithKey.buffer; const dataView = new DataView(validatorsBytes.buffer, validatorsBytes.byteOffset, validatorsBytes.byteLength); type.tree_serializeToBytes({uint8Array: validatorsBytes, dataView}, 0, state.validators.node); - return {data: validatorsBytes, key: bufferWithKey.key}; + return bufferWithKey; } } - this.metrics?.stateReloadValidatorsSszAllocCount.inc(); - validatorsSszTimer?.(); - return {data: state.validators.serialize()}; + return null; } } diff --git a/packages/beacon-node/src/util/bufferPool.ts b/packages/beacon-node/src/util/bufferPool.ts index c637b036df33..f9e18a6d64a5 100644 --- a/packages/beacon-node/src/util/bufferPool.ts +++ b/packages/beacon-node/src/util/bufferPool.ts @@ -36,18 +36,18 @@ export class BufferPool { * If the buffer is already in use, return null. * Grow the buffer if the requested size is larger than the current buffer. */ - alloc(size: number): {buffer: Uint8Array; key: number} | null { + alloc(size: number): BufferWithKey | null { return this.doAlloc(size, false); } /** * Same to alloc() but the buffer is not zeroed. */ - allocUnsafe(size: number): {buffer: Uint8Array; key: number} | null { + allocUnsafe(size: number): BufferWithKey | null { return this.doAlloc(size, true); } - private doAlloc(size: number, isUnsafe = false): {buffer: Uint8Array; key: number} | null { + private doAlloc(size: number, isUnsafe = false): BufferWithKey | null { if (this.inUse) { this.metrics?.misses.inc(); return null; @@ -63,7 +63,7 @@ export class BufferPool { if (!isUnsafe) { bytes.fill(0); } - return {buffer: bytes, key: this.currentKey}; + return new BufferWithKey(bytes, this.currentKey, this); } /** @@ -75,3 +75,15 @@ export class BufferPool { } } } + +export class BufferWithKey implements Disposable { + constructor( + readonly buffer: Uint8Array, + private readonly key: number, + private readonly pool: BufferPool + ) {} + + [Symbol.dispose](): void { + this.pool.free(this.key); + } +} diff --git a/packages/beacon-node/test/unit/util/bufferPool.test.ts b/packages/beacon-node/test/unit/util/bufferPool.test.ts index 2f8004e8315e..b0b91cf8f71a 100644 --- a/packages/beacon-node/test/unit/util/bufferPool.test.ts +++ b/packages/beacon-node/test/unit/util/bufferPool.test.ts @@ -1,25 +1,32 @@ import {describe, it, expect} from "vitest"; import {BufferPool} from "../../../src/util/bufferPool.js"; +/** + * As of Jan 2024, I get this error: Error: Using declaration is not enabled. Set jsc.parser.usingDecl to true + * need to wait for this https://github.com/rollup/rollup/issues/5113 + */ describe("BufferPool", () => { const pool = new BufferPool(100); it("should increase length", () => { expect(pool.length).toEqual(110); - const mem = pool.alloc(200); + using mem = pool.alloc(200); if (mem === null) { throw Error("Expected non-null mem"); } expect(pool.length).toEqual(220); - pool.free(mem.key); }); it("should not allow alloc if in use", () => { - const mem = pool.alloc(20); - if (mem === null) { - throw Error("Expected non-null mem"); + { + using mem = pool.alloc(20); + if (mem === null) { + throw Error("Expected non-null mem"); + } + expect(pool.alloc(20)).toEqual(null); } - expect(pool.alloc(20)).toEqual(null); - pool.free(mem.key); + + // out of the scope we can allocate again + expect(pool.alloc(20)).not.toEqual(null); }); }); From a79d35a44964e0fa6a8a8e751b44dbf55662dd00 Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Tue, 16 Jan 2024 17:00:50 +0100 Subject: [PATCH 5/6] Add custom build target for beacon-node unit tests --- packages/beacon-node/vitest.config.ts | 4 ++++ scripts/vitest/plugins/buildTargetPlugin.ts | 15 +++++++++++++++ 2 files changed, 19 insertions(+) create mode 100644 scripts/vitest/plugins/buildTargetPlugin.ts diff --git a/packages/beacon-node/vitest.config.ts b/packages/beacon-node/vitest.config.ts index 1df0de848936..2a2b2c65304c 100644 --- a/packages/beacon-node/vitest.config.ts +++ b/packages/beacon-node/vitest.config.ts @@ -1,9 +1,13 @@ import {defineConfig, mergeConfig} from "vitest/config"; import vitestConfig from "../../vitest.base.config"; +import {buildTargetPlugin} from "../../scripts/vitest/plugins/buildTargetPlugin.js"; export default mergeConfig( vitestConfig, defineConfig({ + // We need to change the build target to test code which is based on `using` keyword + // Note this target is not fully supported for the browsers + plugins: [buildTargetPlugin("es2022")], test: { globalSetup: ["./test/globalSetup.ts"], }, diff --git a/scripts/vitest/plugins/buildTargetPlugin.ts b/scripts/vitest/plugins/buildTargetPlugin.ts new file mode 100644 index 000000000000..40b6c0a9a833 --- /dev/null +++ b/scripts/vitest/plugins/buildTargetPlugin.ts @@ -0,0 +1,15 @@ +/* eslint-disable import/no-extraneous-dependencies */ +import {UserConfig, ConfigEnv, Plugin} from "vite"; + +export function buildTargetPlugin(target: string): Plugin { + return { + name: "buildTargetPlugin", + config(_config: UserConfig, _env: ConfigEnv) { + return { + esbuild: { + target, + }, + }; + }, + }; +} From 1fc6ed9284eb0c895dd219b9daf20e7fd33eed88 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Wed, 17 Jan 2024 10:41:45 +0700 Subject: [PATCH 6/6] chore: address PR comments --- packages/beacon-node/src/chain/stateCache/datastore/file.ts | 2 +- .../src/chain/stateCache/persistentCheckpointsCache.ts | 6 ++++-- packages/beacon-node/test/unit/util/bufferPool.test.ts | 5 +---- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/beacon-node/src/chain/stateCache/datastore/file.ts b/packages/beacon-node/src/chain/stateCache/datastore/file.ts index 17de7c4b7d73..6529d12f84db 100644 --- a/packages/beacon-node/src/chain/stateCache/datastore/file.ts +++ b/packages/beacon-node/src/chain/stateCache/datastore/file.ts @@ -8,7 +8,7 @@ const CHECKPOINT_STATES_FOLDER = "checkpoint_states"; const CHECKPOINT_FILE_NAME_LENGTH = 82; /** - * Implementation of CPStatePersistentApis using file system, this is beneficial for debugging. + * Implementation of CPStateDatastore using file system, this is beneficial for debugging. */ export class FileCPStateDatastore implements CPStateDatastore { private readonly folderPath: string; diff --git a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts index 8a3c9f2e8424..b57e51f8f2d6 100644 --- a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts +++ b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts @@ -14,9 +14,9 @@ import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datast import {CheckpointHex, CacheItemType, CheckpointStateCache} from "./types.js"; export type PersistentCheckpointStateCacheOpts = { - // Keep max n states in memory, persist the rest to disk + /** Keep max n states in memory, persist the rest to disk */ maxCPStateEpochsInMemory?: number; - // for test only + /** for testing only */ processLateBlock?: boolean; }; @@ -203,6 +203,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { try { // 80% of validators serialization time comes from memory allocation, this is to avoid it const sszTimer = this.metrics?.stateReloadValidatorsSszDuration.startTimer(); + // automatically free the buffer pool after this scope using validatorsBytesWithKey = this.serializeStateValidators(seedState); let validatorsBytes = validatorsBytesWithKey?.buffer; if (validatorsBytes == null) { @@ -631,6 +632,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { const cpPersist = {epoch: epoch, root: epochBoundaryRoot}; { const timer = this.metrics?.statePersistDuration.startTimer(); + // automatically free the buffer pool after this scope using stateBytesWithKey = this.serializeState(state); let stateBytes = stateBytesWithKey?.buffer; if (stateBytes == null) { diff --git a/packages/beacon-node/test/unit/util/bufferPool.test.ts b/packages/beacon-node/test/unit/util/bufferPool.test.ts index b0b91cf8f71a..2c789c19f74d 100644 --- a/packages/beacon-node/test/unit/util/bufferPool.test.ts +++ b/packages/beacon-node/test/unit/util/bufferPool.test.ts @@ -1,10 +1,6 @@ import {describe, it, expect} from "vitest"; import {BufferPool} from "../../../src/util/bufferPool.js"; -/** - * As of Jan 2024, I get this error: Error: Using declaration is not enabled. Set jsc.parser.usingDecl to true - * need to wait for this https://github.com/rollup/rollup/issues/5113 - */ describe("BufferPool", () => { const pool = new BufferPool(100); @@ -23,6 +19,7 @@ describe("BufferPool", () => { if (mem === null) { throw Error("Expected non-null mem"); } + // in the same scope we can't allocate again expect(pool.alloc(20)).toEqual(null); }