-
-
Notifications
You must be signed in to change notification settings - Fork 290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: n historical states #6008
Changes from 2 commits
8c25f9f
763ebde
e3d1bee
6df1b5a
e7bcece
a39cc5f
7214e9d
2b9c491
253811d
2390594
a48f8f6
c5dffed
c4090eb
87cca20
4302ecd
6c1c720
7d5e4f6
de65f2c
aaaa88a
801d521
b206639
ee2e55a
6fdae10
6beaf19
fe0883b
843b824
166ee37
96dba21
f116b93
f624126
0b15ae2
ea56579
dad4483
64e8a4c
63b156c
99d6af5
4669871
d4443ab
0da184d
76d6f99
22cf38c
6e16d94
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -291,10 +291,12 @@ async function processSlotsToNearestCheckpoint( | |||||||||
// processSlots calls .clone() before mutating | ||||||||||
postState = processSlots(postState, nextEpochSlot, opts, metrics); | ||||||||||
|
||||||||||
// Non-spec checkpoint state because the root is of previous epoch | ||||||||||
// this is usually added when we validate gossip block at the start of an epoch | ||||||||||
// then when we process block, we don't have to do state transition again | ||||||||||
// TODO: figure out if it's worth persisting this state to disk | ||||||||||
// note that this state could be real checkpoint state or just a state after processing empty slots | ||||||||||
// - if the 1st block of the epoch is skipped, it's a checkpoint state | ||||||||||
// - if the 1st block of the epoch is processed, it's NOT a checkpoint state | ||||||||||
// however we still need to add this state to cache to preserve epoch transitions | ||||||||||
const checkpointState = postState; | ||||||||||
const cp = getCheckpointFromState(checkpointState); | ||||||||||
checkpointStateCache.add(cp, checkpointState); | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,11 @@ | ||
import path from "node:path"; | ||
import {toHexString} from "@chainsafe/ssz"; | ||
import {phase0, Epoch, RootHex} from "@lodestar/types"; | ||
import {CachedBeaconStateAllForks, computeStartSlotAtEpoch} from "@lodestar/state-transition"; | ||
import {CachedBeaconStateAllForks, computeStartSlotAtEpoch, getBlockRootAtSlot} from "@lodestar/state-transition"; | ||
import {Logger, MapDef, ensureDir} from "@lodestar/utils"; | ||
import {routes} from "@lodestar/api"; | ||
import {loadCachedBeaconState} from "@lodestar/state-transition"; | ||
import {Metrics} from "../../metrics/index.js"; | ||
import {LinkedList} from "../../util/array.js"; | ||
import {IClock} from "../../util/clock.js"; | ||
import {ShufflingCache} from "../shufflingCache.js"; | ||
import {MapTracker} from "./mapMetrics.js"; | ||
|
@@ -23,7 +22,6 @@ import { | |
StateFile, | ||
CheckpointStateCache, | ||
} from "./types.js"; | ||
import {from} from "multiformats/dist/types/src/bases/base.js"; | ||
|
||
/** | ||
* Cache of CachedBeaconState belonging to checkpoint | ||
|
@@ -35,8 +33,8 @@ import {from} from "multiformats/dist/types/src/bases/base.js"; | |
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Excellent module-wide description of the mechanism!! Super helpful and appreciated, I would encourage doing this level of documentation on more functions and new pieces that are not naive. |
||
export class PersistentCheckpointStateCache implements CheckpointStateCache { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. write unit tests for the weird case where we do multiple epoch transitions, no blocks in the middle |
||
private readonly cache: MapTracker<string, CachedBeaconStateAllForks | StateFile>; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While typing the Map value as Something like:
Then in the consumer code you can be explicit about what to do in each case |
||
// key order of in memory items to implement LRU cache | ||
private readonly inMemoryKeyOrder: LinkedList<string>; | ||
// maintain order of epoch to decide which epoch to prune from memory | ||
private readonly inMemoryEpochs: Set<Epoch>; | ||
/** Epoch -> Set<blockRoot> */ | ||
private readonly epochIndex = new MapDef<Epoch, Set<string>>(() => new Set<string>()); | ||
private readonly metrics: Metrics["cpStateCache"] | null | undefined; | ||
|
@@ -79,12 +77,15 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { | |
} | ||
this.logger = logger; | ||
this.clock = clock; | ||
if (opts.maxEpochsInMemory < 0) { | ||
throw new Error("maxEpochsInMemory must be >= 0"); | ||
} | ||
this.maxEpochsInMemory = opts.maxEpochsInMemory; | ||
// Specify different persistentApis for testing | ||
this.persistentApis = persistentApis ?? FILE_APIS; | ||
this.shufflingCache = shufflingCache; | ||
this.getHeadState = getHeadState; | ||
this.inMemoryKeyOrder = new LinkedList<string>(); | ||
this.inMemoryEpochs = new Set(); | ||
void ensureDir(CHECKPOINT_STATES_FOLDER); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function can throw an error, don't void |
||
} | ||
|
||
|
@@ -139,8 +140,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { | |
// only remove file once we reload successfully | ||
void this.persistentApis.removeFile(filePath); | ||
this.cache.set(cpKey, newCachedState); | ||
// since item is file path, cpKey is not in inMemoryKeyOrder | ||
this.inMemoryKeyOrder.unshift(cpKey); | ||
this.inMemoryEpochs.add(cp.epoch); | ||
// don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch | ||
return newCachedState; | ||
} catch (e) { | ||
|
@@ -194,7 +194,6 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { | |
|
||
if (typeof stateOrFilePath !== "string") { | ||
this.metrics?.stateClonedCount.observe(stateOrFilePath.clonedCount); | ||
this.inMemoryKeyOrder.moveToHead(cpKey); | ||
return stateOrFilePath; | ||
} | ||
|
||
|
@@ -208,23 +207,18 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { | |
const cpHex = toCheckpointHex(cp); | ||
const key = toCheckpointKey(cpHex); | ||
const stateOrFilePath = this.cache.get(key); | ||
this.inMemoryEpochs.add(cp.epoch); | ||
if (stateOrFilePath !== undefined) { | ||
if (typeof stateOrFilePath === "string") { | ||
// was persisted to disk, set back to memory | ||
this.cache.set(key, state); | ||
void this.persistentApis.removeFile(stateOrFilePath); | ||
this.metrics?.stateFilesRemoveCount.inc({reason: RemoveFileReason.stateUpdate}); | ||
this.inMemoryKeyOrder.unshift(key); | ||
} else { | ||
// already in memory | ||
// move to head of inMemoryKeyOrder | ||
this.inMemoryKeyOrder.moveToHead(key); | ||
} | ||
return; | ||
} | ||
this.metrics?.adds.inc(); | ||
this.cache.set(key, state); | ||
this.inMemoryKeyOrder.unshift(key); | ||
this.epochIndex.getOrDefault(cp.epoch).add(cpHex.rootHex); | ||
// don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch | ||
} | ||
|
@@ -300,7 +294,20 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { | |
delete(cp: phase0.Checkpoint): void { | ||
const key = toCheckpointKey(toCheckpointHex(cp)); | ||
this.cache.delete(key); | ||
this.inMemoryKeyOrder.deleteFirst(key); | ||
// check if there's any state left in memory for this epoch | ||
let foundState = false; | ||
for (const rootHex of this.epochIndex.get(cp.epoch)?.values() || []) { | ||
const cpKey = toCheckpointKey({epoch: cp.epoch, rootHex}); | ||
const stateOrFilePath = this.cache.get(cpKey); | ||
if (stateOrFilePath !== undefined && typeof stateOrFilePath !== "string") { | ||
// this is a state | ||
foundState = true; | ||
break; | ||
} | ||
} | ||
if (!foundState) { | ||
this.inMemoryEpochs.delete(cp.epoch); | ||
} | ||
const epochKey = toHexString(cp.root); | ||
const value = this.epochIndex.get(cp.epoch); | ||
if (value) { | ||
|
@@ -325,52 +332,85 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { | |
// this could be improved by looping through inMemoryKeyOrder once | ||
// however with this.maxEpochsInMemory = 2, the list is 6 maximum so it's not a big deal now | ||
this.cache.delete(key); | ||
this.inMemoryKeyOrder.deleteFirst(key); | ||
} | ||
this.inMemoryEpochs.delete(epoch); | ||
this.epochIndex.delete(epoch); | ||
} | ||
|
||
/** | ||
* This is slow code because it involves serializing the whole state to disk which takes 600ms as of Sep 2023 | ||
* This is slow code because it involves serializing the whole state to disk which takes 600ms to 900ms as of Sep 2023 | ||
* The add() is called after we process 1st block of an epoch, we don't want to pruneFromMemory at that time since it's the hot time | ||
* Call this code at the last 1/3 slot of slot 0 of an epoch | ||
*/ | ||
pruneFromMemory(): number { | ||
let count = 0; | ||
while (this.inMemoryKeyOrder.length > 0 && this.countEpochsInMemory() > this.maxEpochsInMemory) { | ||
const key = this.inMemoryKeyOrder.last(); | ||
if (!key) { | ||
while (this.inMemoryEpochs.size > this.maxEpochsInMemory) { | ||
let firstEpoch: Epoch | undefined; | ||
for (const epoch of this.inMemoryEpochs) { | ||
firstEpoch = epoch; | ||
break; | ||
} | ||
if (firstEpoch === undefined) { | ||
// should not happen | ||
throw new Error(`No key ${key} found in inMemoryKeyOrder}`); | ||
throw new Error("No epoch in memory"); | ||
} | ||
const stateOrFilePath = this.cache.get(key); | ||
// even if stateOrFilePath is undefined or string, we still need to pop the key | ||
this.inMemoryKeyOrder.pop(); | ||
if (stateOrFilePath !== undefined && typeof stateOrFilePath !== "string") { | ||
// do not update epochIndex | ||
const filePath = toTmpFilePath(key); | ||
this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0); | ||
const timer = this.metrics?.statePersistDuration.startTimer(); | ||
void this.persistentApis.writeIfNotExist(filePath, stateOrFilePath.serialize()); | ||
timer?.(); | ||
this.cache.set(key, filePath); | ||
count++; | ||
this.logger.verbose("Persist state to disk", {filePath, stateSlot: stateOrFilePath.slot}); | ||
} else { | ||
// should not happen, log anyway | ||
this.logger.debug(`Unexpected stateOrFilePath ${stateOrFilePath} for key ${key}`); | ||
// first loop to check if the 1st slot of epoch is a skipped slot or not | ||
let firstSlotBlockRoot: string | undefined; | ||
for (const rootHex of this.epochIndex.get(firstEpoch) ?? []) { | ||
const cpKey = toCheckpointKey({epoch: firstEpoch, rootHex}); | ||
const stateOrFilePath = this.cache.get(cpKey); | ||
if (stateOrFilePath !== undefined && typeof stateOrFilePath !== "string") { | ||
// this is a state | ||
if (rootHex !== toHexString(getBlockRootAtSlot(stateOrFilePath, computeStartSlotAtEpoch(firstEpoch) - 1))) { | ||
firstSlotBlockRoot = rootHex; | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
|
||
return count; | ||
} | ||
// if found firstSlotBlockRoot it means it's a checkpoint state and we should only persist that checkpoint, delete the other | ||
// if not found firstSlotBlockRoot, first slot of state is skipped, we should persist the other checkpoint state, whose root is that of the last slot of the previous epoch | ||
for (const rootHex of this.epochIndex.get(firstEpoch) ?? []) { | ||
let toPersist = false; | ||
let toDelete = false; | ||
if (firstSlotBlockRoot === undefined) { | ||
toPersist = true; | ||
} else { | ||
if (rootHex === firstSlotBlockRoot) { | ||
toPersist = true; | ||
} else { | ||
toDelete = true; | ||
} | ||
} | ||
const cpKey = toCheckpointKey({epoch: firstEpoch, rootHex}); | ||
const stateOrFilePath = this.cache.get(cpKey); | ||
if (stateOrFilePath !== undefined && typeof stateOrFilePath !== "string") { | ||
if (toPersist) { | ||
// do not update epochIndex | ||
const filePath = toTmpFilePath(cpKey); | ||
this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0); | ||
const timer = this.metrics?.statePersistDuration.startTimer(); | ||
void this.persistentApis.writeIfNotExist(filePath, stateOrFilePath.serialize()); | ||
timer?.(); | ||
this.cache.set(cpKey, filePath); | ||
count++; | ||
this.logger.verbose("Prune checkpoint state from memory and persist to disk", { | ||
filePath, | ||
stateSlot: stateOrFilePath.slot, | ||
rootHex, | ||
}); | ||
} else if (toDelete) { | ||
this.cache.delete(cpKey); | ||
this.metrics?.statePruneFromMemoryCount.inc(); | ||
this.logger.verbose("Prune checkpoint state from memory", {stateSlot: stateOrFilePath.slot, rootHex}); | ||
} | ||
} | ||
} | ||
|
||
private countEpochsInMemory(): number { | ||
const epochs = new Set<Epoch>(); | ||
for (const key of this.inMemoryKeyOrder) { | ||
epochs.add(fromCheckpointKey(key).epoch); | ||
this.inMemoryEpochs.delete(firstEpoch); | ||
} | ||
return epochs.size; | ||
|
||
return count; | ||
} | ||
|
||
clear(): void { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put this inside
if (block.message.slot % SLOTS_PER_EPOCH === 0) {
condition below, otherwise will get "Block error slot=452161 error=Checkpoint state slot must be first in an epoch" error as monitored on test branch