Skip to content

Commit

Permalink
Cleanup the strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarhussain committed Nov 1, 2024
1 parent a0a9bb3 commit b3b1698
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 41 deletions.
22 changes: 9 additions & 13 deletions packages/beacon-node/src/chain/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,21 @@ export class Archiver {
opts: ArchiverOpts,
private readonly metrics?: Metrics | null
) {
const {regen, bufferPool, historicalStateRegen} = chain;
switch (opts.stateArchiveMode) {
case StateArchiveMode.Frequency:
this.statesArchiverStrategy = new FrequencyStateArchiveStrategy(
chain.regen,
db,
logger,
opts,
chain.bufferPool
{
regen,
db,
logger,
bufferPool,
},
opts
);
break;
case StateArchiveMode.Differential:
this.statesArchiverStrategy = new DifferentialStateArchiveStrategy(
chain.historicalStateRegen,
chain.regen,
db,
logger,
opts,
chain.bufferPool
);
this.statesArchiverStrategy = new DifferentialStateArchiveStrategy({historicalStateRegen, regen, logger});
break;
default:
throw new Error(`State archive strategy "${opts.stateArchiveMode}" currently not supported.`);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {RootHex} from "@lodestar/types";
import {Metrics} from "../../../metrics/metrics.js";
import {StateArchiveStrategy, StatesArchiverOpts} from "../interface.js";
import {StateArchiveStrategy} from "../interface.js";
import {IStateRegenerator} from "../../regen/interface.js";
import {IBeaconDb} from "../../../db/interface.js";
import {Logger} from "@lodestar/logger";
import {BufferPool} from "../../../util/bufferPool.js";
import {IHistoricalStateRegen} from "../../historicalState/types.js";
import {CachedBeaconStateAllForks, computeStartSlotAtEpoch} from "@lodestar/state-transition";

export class DifferentialStateArchiveStrategy implements StateArchiveStrategy {
constructor(
private readonly historicalStateRegen: IHistoricalStateRegen | undefined,
private readonly regen: IStateRegenerator,
private readonly db: IBeaconDb,
private readonly logger: Logger,
private readonly opts: StatesArchiverOpts,
private readonly bufferPool?: BufferPool | null
protected modules: {
historicalStateRegen: IHistoricalStateRegen | undefined;
regen: IStateRegenerator;
logger: Logger;
}
) {}

onCheckpoint(_stateRoot: RootHex, _metrics?: Metrics | null): Promise<void> {
Expand All @@ -29,17 +26,20 @@ export class DifferentialStateArchiveStrategy implements StateArchiveStrategy {

async maybeArchiveState(finalized: CheckpointWithHex): Promise<void> {
// starting from Mar 2024, the finalized state could be from disk or in memory
const state = await this.regen.getCheckpointStateOrBytes(finalized);
const state = await this.modules.regen.getCheckpointStateOrBytes(finalized);
if (state === null) {
this.logger.warn("Checkpoint state not available to archive.", {epoch: finalized.epoch, root: finalized.rootHex});
this.modules.logger.warn("Checkpoint state not available to archive.", {
epoch: finalized.epoch,
root: finalized.rootHex,
});
return;
}

if (Array.isArray(state) && state.constructor === Uint8Array) {
return this.historicalStateRegen?.storeHistoricalState(computeStartSlotAtEpoch(finalized.epoch), state);
return this.modules.historicalStateRegen?.storeHistoricalState(computeStartSlotAtEpoch(finalized.epoch), state);
}

return this.historicalStateRegen?.storeHistoricalState(
return this.modules.historicalStateRegen?.storeHistoricalState(
(state as CachedBeaconStateAllForks).slot,
(state as CachedBeaconStateAllForks).serialize()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ export const PERSIST_TEMP_STATE_EVERY_EPOCHS = 32;
*/
export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
constructor(
private readonly regen: IStateRegenerator,
private readonly db: IBeaconDb,
private readonly logger: Logger,
private readonly opts: StatesArchiverOpts,
private readonly bufferPool?: BufferPool | null
protected modules: {regen: IStateRegenerator; db: IBeaconDb; logger: Logger; bufferPool?: BufferPool | null},
protected readonly opts: StatesArchiverOpts
) {}

async onFinalizedCheckpoint(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void> {
Expand All @@ -50,7 +47,7 @@ export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
* ```
*/
async maybeArchiveState(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void> {
const lastStoredSlot = await this.db.stateArchive.lastKey();
const lastStoredSlot = await this.modules.db.stateArchive.lastKey();
const lastStoredEpoch = computeEpochAtSlot(lastStoredSlot ?? 0);
const {archiveStateEpochFrequency} = this.opts;

Expand All @@ -63,18 +60,18 @@ export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
(Math.floor(finalized.epoch / archiveStateEpochFrequency) - 1) * archiveStateEpochFrequency
);

const storedStateSlots = await this.db.stateArchive.keys({
const storedStateSlots = await this.modules.db.stateArchive.keys({
lt: computeStartSlotAtEpoch(finalized.epoch),
gte: computeStartSlotAtEpoch(minEpoch),
});

const statesSlotsToDelete = computeStateSlotsToDelete(storedStateSlots, archiveStateEpochFrequency);
if (statesSlotsToDelete.length > 0) {
await this.db.stateArchive.batchDelete(statesSlotsToDelete);
await this.modules.db.stateArchive.batchDelete(statesSlotsToDelete);
}

// More logs to investigate the rss spike issue https://github.com/ChainSafe/lodestar/issues/5591
this.logger.verbose("Archived state completed", {
this.modules.logger.verbose("Archived state completed", {
finalizedEpoch: finalized.epoch,
minEpoch,
storedStateSlots: storedStateSlots.join(","),
Expand All @@ -89,15 +86,15 @@ export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
*/
private async archiveState(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void> {
// starting from Mar 2024, the finalized state could be from disk or in memory
const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized);
const finalizedStateOrBytes = await this.modules.regen.getCheckpointStateOrBytes(finalized);
const {rootHex} = finalized;
if (!finalizedStateOrBytes) {
throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`);
}
if (finalizedStateOrBytes instanceof Uint8Array) {
const slot = getStateSlotFromBytes(finalizedStateOrBytes);
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
await this.modules.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.modules.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// serialize state using BufferPool if provided
const timer = metrics?.stateSerializeDuration.startTimer({source: AllocSource.ARCHIVE_STATE});
Expand All @@ -106,12 +103,12 @@ export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
AllocSource.ARCHIVE_STATE,
(stateBytes) => {
timer?.();
return this.db.stateArchive.putBinary(finalizedStateOrBytes.slot, stateBytes);
return this.modules.db.stateArchive.putBinary(finalizedStateOrBytes.slot, stateBytes);
},
this.bufferPool
this.modules.bufferPool
);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {
this.modules.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
slot: finalizedStateOrBytes.slot,
root: rootHex,
Expand Down

0 comments on commit b3b1698

Please sign in to comment.