mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-14 00:29:04 +00:00
Historical state reindex for trusted node sync (#3452)
When performing trusted node sync, historical access is limited to states after the checkpoint. Reindexing restores full historical access by replaying historical blocks against the state and storing snapshots in the database. The process can be initiated or resumed at any point in time.
This commit is contained in:
parent
857a71be6c
commit
d0183ccd77
@ -994,6 +994,27 @@ proc getBeaconBlockSummary*(db: BeaconChainDB, root: Eth2Digest):
|
||||
else:
|
||||
err()
|
||||
|
||||
proc loadStateRoots*(db: BeaconChainDB): Table[(Slot, Eth2Digest), Eth2Digest] =
|
||||
## Load all known state roots - just because we have a state root doesn't
|
||||
## mean we also have a state (and vice versa)!
|
||||
var state_roots = initTable[(Slot, Eth2Digest), Eth2Digest](1024)
|
||||
|
||||
discard db.state_roots.find([], proc(k, v: openArray[byte]) =
|
||||
if k.len() == 40 and v.len() == 32:
|
||||
# For legacy reasons, the first byte of the slot is not part of the slot
|
||||
# but rather a subkey identifier - see subkey
|
||||
var tmp = toArray(8, k.toOpenArray(0, 7))
|
||||
tmp[0] = 0
|
||||
state_roots[
|
||||
(Slot(uint64.fromBytesBE(tmp)),
|
||||
Eth2Digest(data: toArray(sizeof(Eth2Digest), k.toOpenArray(8, 39))))] =
|
||||
Eth2Digest(data: toArray(sizeof(Eth2Digest), v))
|
||||
else:
|
||||
warn "Invalid state root in database", klen = k.len(), vlen = v.len()
|
||||
)
|
||||
|
||||
state_roots
|
||||
|
||||
proc loadSummaries*(db: BeaconChainDB): Table[Eth2Digest, BeaconBlockSummary] =
|
||||
# Load summaries into table - there's no telling what order they're in so we
|
||||
# load them all - bugs in nim prevent this code from living in the iterator.
|
||||
|
@ -694,6 +694,10 @@ type
|
||||
defaultValue: true
|
||||
name: "backfill"}: bool
|
||||
|
||||
reindex* {.
|
||||
desc: "Recreate historical state index at end of backfill, allowing full history access (requires full backfill)"
|
||||
defaultValue: false}: bool
|
||||
|
||||
ValidatorClientConf* = object
|
||||
configFile* {.
|
||||
desc: "Loads the configuration from a TOML file"
|
||||
|
@ -8,7 +8,7 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[options, sequtils, tables, sets],
|
||||
std/[algorithm, options, sequtils, tables, sets],
|
||||
stew/[assign2, byteutils, results],
|
||||
metrics, snappy, chronicles,
|
||||
../spec/[beaconstate, eth2_merkleization, eth2_ssz_serialization, helpers,
|
||||
@ -342,6 +342,9 @@ proc containsBlock(
|
||||
cfg: RuntimeConfig, db: BeaconChainDB, slot: Slot, root: Eth2Digest): bool =
|
||||
db.containsBlock(root, cfg.blockForkAtEpoch(slot.epoch))
|
||||
|
||||
func isFinalizedStateSnapshot(slot: Slot): bool =
|
||||
slot.is_epoch and slot.epoch mod EPOCHS_PER_STATE_SNAPSHOT == 0
|
||||
|
||||
func isStateCheckpoint(bs: BlockSlot): bool =
|
||||
## State checkpoints are the points in time for which we store full state
|
||||
## snapshots, which later serve as rewind starting points when replaying state
|
||||
@ -357,6 +360,24 @@ func isStateCheckpoint(bs: BlockSlot): bool =
|
||||
(bs.slot == bs.blck.slot and bs.blck.parent == nil) or
|
||||
(bs.slot.is_epoch and bs.slot.epoch == (bs.blck.slot.epoch + 1))
|
||||
|
||||
proc getState(
|
||||
db: BeaconChainDB, cfg: RuntimeConfig, slot: Slot, state_root: Eth2Digest,
|
||||
state: var ForkedHashedBeaconState, rollback: RollbackProc): bool =
|
||||
let expectedFork = cfg.stateForkAtEpoch(slot.epoch)
|
||||
if state.kind != expectedFork:
|
||||
# Avoid temporary (!)
|
||||
state = (ref ForkedHashedBeaconState)(kind: expectedFork)[]
|
||||
|
||||
let ok = withState(state):
|
||||
db.getState(state_root, state.data, rollback)
|
||||
|
||||
if not ok:
|
||||
return false
|
||||
|
||||
setStateRoot(state, state_root)
|
||||
|
||||
true
|
||||
|
||||
proc getStateData(
|
||||
db: BeaconChainDB, cfg: RuntimeConfig, state: var StateData, bs: BlockSlot,
|
||||
rollback: RollbackProc): bool =
|
||||
@ -367,23 +388,10 @@ proc getStateData(
|
||||
if not root.isSome():
|
||||
return false
|
||||
|
||||
let expectedFork = cfg.stateForkAtEpoch(bs.slot.epoch)
|
||||
if state.data.kind != expectedFork:
|
||||
state.data = (ref ForkedHashedBeaconState)(kind: expectedFork)[]
|
||||
|
||||
case expectedFork
|
||||
of BeaconStateFork.Bellatrix:
|
||||
if not db.getState(root.get(), state.data.bellatrixData.data, rollback):
|
||||
return false
|
||||
of BeaconStateFork.Altair:
|
||||
if not db.getState(root.get(), state.data.altairData.data, rollback):
|
||||
return false
|
||||
of BeaconStateFork.Phase0:
|
||||
if not db.getState(root.get(), state.data.phase0Data.data, rollback):
|
||||
return false
|
||||
if not db.getState(cfg, bs.slot, root.get(), state.data, rollback):
|
||||
return false
|
||||
|
||||
state.blck = bs.blck
|
||||
setStateRoot(state.data, root.get())
|
||||
|
||||
true
|
||||
|
||||
@ -999,6 +1007,29 @@ proc advanceSlots(
|
||||
if preEpoch != postEpoch:
|
||||
dag.validatorMonitor[].registerEpochInfo(postEpoch, info, state.data)
|
||||
|
||||
proc applyBlock(
|
||||
dag: ChainDAGRef, state: var ForkedHashedBeaconState, bid: BlockId,
|
||||
cache: var StateCache, info: var ForkedEpochInfo): Result[void, cstring] =
|
||||
case dag.cfg.blockForkAtEpoch(bid.slot.epoch)
|
||||
of BeaconBlockFork.Phase0:
|
||||
let data = getBlock(dag, bid, phase0.TrustedSignedBeaconBlock).valueOr:
|
||||
return err("Block load failed")
|
||||
state_transition(
|
||||
dag.cfg, state, data, cache, info,
|
||||
dag.updateFlags + {slotProcessed}, noRollback)
|
||||
of BeaconBlockFork.Altair:
|
||||
let data = getBlock(dag, bid, altair.TrustedSignedBeaconBlock).valueOr:
|
||||
return err("Block load failed")
|
||||
state_transition(
|
||||
dag.cfg, state, data, cache, info,
|
||||
dag.updateFlags + {slotProcessed}, noRollback)
|
||||
of BeaconBlockFork.Bellatrix:
|
||||
let data = getBlock(dag, bid, bellatrix.TrustedSignedBeaconBlock).valueOr:
|
||||
return err("Block load failed")
|
||||
state_transition(
|
||||
dag.cfg, state, data, cache, info,
|
||||
dag.updateFlags + {slotProcessed}, noRollback)
|
||||
|
||||
proc applyBlock(
|
||||
dag: ChainDAGRef,
|
||||
state: var StateData, blck: BlockRef,
|
||||
@ -1010,28 +1041,8 @@ proc applyBlock(
|
||||
|
||||
loadStateCache(dag, cache, state.blck, getStateField(state.data, slot).epoch)
|
||||
|
||||
case dag.cfg.blockForkAtEpoch(blck.slot.epoch)
|
||||
of BeaconBlockFork.Phase0:
|
||||
let data = getBlock(dag, blck.bid, phase0.TrustedSignedBeaconBlock).expect(
|
||||
"block loaded")
|
||||
state_transition(
|
||||
dag.cfg, state.data, data, cache, info,
|
||||
dag.updateFlags + {slotProcessed}, noRollback).expect(
|
||||
"Blocks from database must not fail to apply")
|
||||
of BeaconBlockFork.Altair:
|
||||
let data = getBlock(dag, blck.bid, altair.TrustedSignedBeaconBlock).expect(
|
||||
"block loaded")
|
||||
state_transition(
|
||||
dag.cfg, state.data, data, cache, info,
|
||||
dag.updateFlags + {slotProcessed}, noRollback).expect(
|
||||
"Blocks from database must not fail to apply")
|
||||
of BeaconBlockFork.Bellatrix:
|
||||
let data = getBlock(dag, blck.bid, bellatrix.TrustedSignedBeaconBlock).expect(
|
||||
"block loaded")
|
||||
state_transition(
|
||||
dag.cfg, state.data, data, cache, info,
|
||||
dag.updateFlags + {slotProcessed}, noRollback).expect(
|
||||
"Blocks from database must not fail to apply")
|
||||
dag.applyBlock(state.data, blck.bid, cache, info).expect(
|
||||
"Blocks from database must not fail to apply")
|
||||
|
||||
state.blck = blck
|
||||
|
||||
@ -1399,7 +1410,7 @@ proc pruneStateCachesDAG*(dag: ChainDAGRef) =
|
||||
cur = dag.finalizedHead.stateCheckpoint.parentOrSlot
|
||||
prev = dag.lastPrunePoint.stateCheckpoint.parentOrSlot
|
||||
while cur.blck != nil and cur != prev:
|
||||
if cur.slot.epoch mod EPOCHS_PER_STATE_SNAPSHOT != 0 and
|
||||
if not isFinalizedStateSnapshot(cur.slot) and
|
||||
cur.slot != dag.tail.slot:
|
||||
dag.delState(cur)
|
||||
cur = cur.parentOrSlot
|
||||
@ -1763,3 +1774,142 @@ proc aggregateAll*(
|
||||
|
||||
func needsBackfill*(dag: ChainDAGRef): bool =
|
||||
dag.backfill.slot > dag.genesis.slot
|
||||
|
||||
proc rebuildIndex*(dag: ChainDAGRef) =
|
||||
## After a checkpoint sync, we lack intermediate states to replay from - this
|
||||
## function rebuilds them so that historical replay can take place again
|
||||
if dag.backfill.slot > 0:
|
||||
debug "Backfill not complete, cannot rebuild archive"
|
||||
return
|
||||
|
||||
if dag.tail.slot == dag.genesis.slot:
|
||||
# The tail is the earliest slot for which we're supposed to have states -
|
||||
# if it's sufficiently recent, don't do anything
|
||||
debug "Archive does not need rebuilding"
|
||||
return
|
||||
|
||||
# First, we check what states we already have in the database - that allows
|
||||
# resuming the operation at any time
|
||||
let
|
||||
roots = dag.db.loadStateRoots()
|
||||
|
||||
var
|
||||
canonical = newSeq[Eth2Digest](
|
||||
(dag.finalizedHead.slot.epoch + EPOCHS_PER_STATE_SNAPSHOT - 1) div
|
||||
EPOCHS_PER_STATE_SNAPSHOT)
|
||||
# `junk` puts in place some infrastructure to prune unnecessary states - it
|
||||
# will be more useful in the future as a base for pruning
|
||||
junk: seq[((Slot, Eth2Digest), Eth2Digest)]
|
||||
|
||||
for k, v in roots:
|
||||
if k[0] >= dag.finalizedHead.slot:
|
||||
continue # skip newer stuff
|
||||
|
||||
if not isFinalizedStateSnapshot(k[0]):
|
||||
# `tail` will move at the end of the process, so we won't need any
|
||||
# intermediate states
|
||||
junk.add((k, v))
|
||||
|
||||
continue # skip non-snapshot slots
|
||||
|
||||
if k[0] > 0 and dag.getBlockIdAtSlot(k[0] - 1).bid.root != k[1]:
|
||||
junk.add((k, v))
|
||||
continue # skip things that are no longer a canonical part of the chain
|
||||
|
||||
if not dag.db.containsState(v):
|
||||
continue # If it's not in the database..
|
||||
|
||||
canonical[k[0].epoch div EPOCHS_PER_STATE_SNAPSHOT] = v
|
||||
|
||||
let
|
||||
state = (ref ForkedHashedBeaconState)()
|
||||
|
||||
var
|
||||
cache: StateCache
|
||||
info: ForkedEpochInfo
|
||||
|
||||
# `canonical` holds all slots at which a state is expected to appear, using a
|
||||
# zero root whenever a particular state is missing - this way, if there's
|
||||
# partial progress or gaps, they will be dealt with correctly
|
||||
for i, state_root in canonical.mpairs():
|
||||
if not state_root.isZero:
|
||||
continue
|
||||
|
||||
doAssert i > 0, "Genesis should always be available"
|
||||
|
||||
let
|
||||
startSlot = Epoch((i - 1) * EPOCHS_PER_STATE_SNAPSHOT).start_slot
|
||||
slot = Epoch(i * EPOCHS_PER_STATE_SNAPSHOT).start_slot
|
||||
|
||||
info "Recreating state snapshot",
|
||||
slot, startStateRoot = canonical[i - 1], startSlot
|
||||
|
||||
if getStateRoot(state[]) != canonical[i - 1]:
|
||||
if not dag.db.getState(dag.cfg, startSlot, canonical[i - 1], state[], noRollback):
|
||||
error "Can't load start state, database corrupt?",
|
||||
startStateRoot = shortLog(canonical[i - 1]), slot = startSlot
|
||||
return
|
||||
|
||||
for slot in startSlot..<startSlot + (EPOCHS_PER_STATE_SNAPSHOT * SLOTS_PER_EPOCH):
|
||||
let bids = dag.getBlockIdAtSlot(slot)
|
||||
if bids.bid.root.isZero:
|
||||
warn "Block id missing, cannot continue - database corrupt?", slot
|
||||
return
|
||||
|
||||
# The slot check is needed to avoid re-applying a block
|
||||
if bids.isProposed and getStateField(state[], latest_block_header).slot < bids.bid.slot:
|
||||
let res = dag.applyBlock(state[], bids.bid, cache, info)
|
||||
if res.isErr:
|
||||
error "Failed to apply block while ", bids, slot
|
||||
return
|
||||
|
||||
if slot.is_epoch:
|
||||
cache.prune(slot.epoch)
|
||||
|
||||
process_slots(
|
||||
dag.cfg, state[], slot, cache, info,
|
||||
dag.updateFlags).expect("process_slots shouldn't fail when state slot is correct")
|
||||
|
||||
withState(state[]):
|
||||
dag.db.putState(state)
|
||||
|
||||
state_root = state.root
|
||||
|
||||
# Now that we have states all the way to genesis, we can adjust the tail
|
||||
# and readjust the in-memory indices to what they would look like if we had
|
||||
# started with an earlier tail
|
||||
dag.db.putTailBlock(dag.genesis.root)
|
||||
|
||||
var
|
||||
midRef = dag.genesis
|
||||
finBlocks = newSeqOfCap[BlockRef](
|
||||
(dag.finalizedHead.slot.int + 1) * 3 div 2)
|
||||
|
||||
finBlocks.setLen(dag.finalizedHead.slot.int + 1)
|
||||
|
||||
finBlocks[dag.tail.slot.int..^1] = dag.finalizedBlocks
|
||||
finBlocks[0] = dag.genesis
|
||||
|
||||
for slot, root in dag.db.finalizedBlocks:
|
||||
if slot == midRef.slot:
|
||||
continue
|
||||
|
||||
if slot == dag.tail.slot:
|
||||
link(midRef, dag.tail)
|
||||
break
|
||||
|
||||
let next = BlockRef.init(root, slot)
|
||||
|
||||
link(midRef, next)
|
||||
midRef = next
|
||||
|
||||
finBlocks[midRef.slot.int] = midRef
|
||||
|
||||
dag.finalizedBlocks = finBlocks
|
||||
dag.tail = dag.genesis
|
||||
|
||||
if junk.len > 0:
|
||||
info "Dropping redundant states", states = junk.len
|
||||
|
||||
for i in junk:
|
||||
dag.db.delState(i[1])
|
||||
|
@ -1819,6 +1819,7 @@ proc handleStartUpCmd(config: var BeaconNodeConf) {.raises: [Defect, CatchableEr
|
||||
config.trustedNodeUrl,
|
||||
config.blockId,
|
||||
config.backfillBlocks,
|
||||
config.reindex,
|
||||
genesis)
|
||||
|
||||
{.pop.} # TODO moduletests exceptions
|
||||
|
@ -896,6 +896,39 @@ template isomorphicCast*[T, U](x: U): T =
|
||||
doAssert getSizeofSig(T()) == getSizeofSig(U())
|
||||
cast[ptr T](unsafeAddr x)[]
|
||||
|
||||
func prune*(cache: var StateCache, epoch: Epoch) =
|
||||
# Prune all cache information that is no longer relevant in order to process
|
||||
# the given epoch
|
||||
if epoch < 2: return
|
||||
|
||||
let
|
||||
pruneEpoch = epoch - 2
|
||||
|
||||
var drops: seq[Slot]
|
||||
block:
|
||||
for k in cache.shuffled_active_validator_indices.keys:
|
||||
if k < pruneEpoch:
|
||||
drops.add prune_epoch.start_slot
|
||||
for drop in drops:
|
||||
cache.shuffled_active_validator_indices.del drop.epoch
|
||||
drops.setLen(0)
|
||||
|
||||
block:
|
||||
for k in cache.beacon_proposer_indices.keys:
|
||||
if k < pruneEpoch.start_slot:
|
||||
drops.add k
|
||||
for drop in drops:
|
||||
cache.beacon_proposer_indices.del drop
|
||||
drops.setLen(0)
|
||||
|
||||
block:
|
||||
for k in cache.sync_committees.keys:
|
||||
if k < pruneEpoch.sync_committee_period:
|
||||
drops.add(k.start_epoch.start_slot)
|
||||
for drop in drops:
|
||||
cache.sync_committees.del drop.sync_committee_period
|
||||
drops.setLen(0)
|
||||
|
||||
func clear*(cache: var StateCache) =
|
||||
cache.shuffled_active_validator_indices.clear
|
||||
cache.beacon_proposer_indices.clear
|
||||
|
@ -66,10 +66,10 @@ proc isKnown(cache: DbCache, slot: Slot): bool =
|
||||
|
||||
proc doTrustedNodeSync*(
|
||||
cfg: RuntimeConfig, databaseDir: string, restUrl: string,
|
||||
blockId: string, backfill: bool,
|
||||
blockId: string, backfill: bool, reindex: bool,
|
||||
genesisState: ref ForkedHashedBeaconState = nil) {.async.} =
|
||||
notice "Starting trusted node sync",
|
||||
databaseDir, restUrl, blockId, backfill
|
||||
databaseDir, restUrl, blockId, backfill, reindex
|
||||
|
||||
let
|
||||
db = BeaconChainDB.new(databaseDir, inMemory = false)
|
||||
@ -86,7 +86,7 @@ proc doTrustedNodeSync*(
|
||||
error "Database missing head block summary - database too old or corrupt"
|
||||
quit 1
|
||||
|
||||
let slot = dbCache.summaries[dbHead.get()].slot
|
||||
let slot = dbCache.summaries[dbHead.get()].slot
|
||||
dbCache.updateSlots(dbHead.get(), slot)
|
||||
slot
|
||||
else:
|
||||
@ -167,121 +167,117 @@ proc doTrustedNodeSync*(
|
||||
remoteGenesisRoot = shortLog(remoteGenesisRoot)
|
||||
quit 1
|
||||
|
||||
notice "Downloading checkpoint block", restUrl, blockId
|
||||
let (checkpointSlot, checkpointRoot) = if dbHead.isNone:
|
||||
notice "Downloading checkpoint block", restUrl, blockId
|
||||
|
||||
let checkpointBlock = block:
|
||||
# Finding a checkpoint block is tricky: we need the block to fall on an
|
||||
# epoch boundary and when making the first request, we don't know exactly
|
||||
# what slot we'll get - to find it, we'll keep walking backwards for a
|
||||
# reasonable number of tries
|
||||
var
|
||||
checkpointBlock: ref ForkedSignedBeaconBlock
|
||||
id = BlockIdent.decodeString(blockId).valueOr:
|
||||
error "Cannot decode checkpoint block id, must be a slot, hash, 'finalized' or 'head'",
|
||||
blockId
|
||||
quit 1
|
||||
found = false
|
||||
let checkpointBlock = block:
|
||||
# Finding a checkpoint block is tricky: we need the block to fall on an
|
||||
# epoch boundary and when making the first request, we don't know exactly
|
||||
# what slot we'll get - to find it, we'll keep walking backwards for a
|
||||
# reasonable number of tries
|
||||
var
|
||||
checkpointBlock: ref ForkedSignedBeaconBlock
|
||||
id = BlockIdent.decodeString(blockId).valueOr:
|
||||
error "Cannot decode checkpoint block id, must be a slot, hash, 'finalized' or 'head'",
|
||||
blockId
|
||||
quit 1
|
||||
found = false
|
||||
|
||||
for i in 0..<10:
|
||||
let blck = try:
|
||||
await client.getBlockV2(id, cfg)
|
||||
except CatchableError as exc:
|
||||
error "Unable to download checkpoint block",
|
||||
error = exc.msg, restUrl
|
||||
quit 1
|
||||
|
||||
if blck.isNone():
|
||||
# Server returned 404 - no block was found at the given id, so we need
|
||||
# to try an earlier slot - assuming we know of one!
|
||||
if id.kind == BlockQueryKind.Slot:
|
||||
let slot = id.slot
|
||||
id = BlockIdent.init((id.slot.epoch() - 1).start_slot)
|
||||
|
||||
info "No block found at given slot, trying an earlier epoch",
|
||||
slot, id
|
||||
continue
|
||||
else:
|
||||
error "Cannot find a block at given block id, and cannot compute an earlier slot",
|
||||
id, blockId
|
||||
for i in 0..<10:
|
||||
let blck = try:
|
||||
await client.getBlockV2(id, cfg)
|
||||
except CatchableError as exc:
|
||||
error "Unable to download checkpoint block",
|
||||
error = exc.msg, restUrl
|
||||
quit 1
|
||||
|
||||
checkpointBlock = blck.get()
|
||||
if blck.isNone():
|
||||
# Server returned 404 - no block was found at the given id, so we need
|
||||
# to try an earlier slot - assuming we know of one!
|
||||
if id.kind == BlockQueryKind.Slot:
|
||||
let slot = id.slot
|
||||
id = BlockIdent.init((id.slot.epoch() - 1).start_slot)
|
||||
|
||||
let checkpointSlot = getForkedBlockField(checkpointBlock[], slot)
|
||||
if checkpointSlot > headSlot:
|
||||
# When the checkpoint is newer than the head, we run into trouble: the
|
||||
# current backfill in ChainDAG does not support filling in arbitrary gaps.
|
||||
# If we were to update the backfill pointer in this case, the ChainDAG
|
||||
# backfiller would re-download the entire backfill history.
|
||||
# For now, we'll abort and let the user choose what to do.
|
||||
error "Checkpoint block is newer than head slot - start with a new database or use a checkpoint no more recent than the head",
|
||||
checkpointSlot, checkpointRoot = shortLog(checkpointBlock[].root), headSlot
|
||||
info "No block found at given slot, trying an earlier epoch",
|
||||
slot, id
|
||||
continue
|
||||
else:
|
||||
error "Cannot find a block at given block id, and cannot compute an earlier slot",
|
||||
id, blockId
|
||||
quit 1
|
||||
|
||||
checkpointBlock = blck.get()
|
||||
|
||||
let checkpointSlot = getForkedBlockField(checkpointBlock[], slot)
|
||||
if checkpointSlot.is_epoch():
|
||||
found = true
|
||||
break
|
||||
|
||||
id = BlockIdent.init((checkpointSlot.epoch() - 1).start_slot)
|
||||
|
||||
info "Downloaded checkpoint block does not fall on epoch boundary, trying an earlier epoch",
|
||||
checkpointSlot, id
|
||||
|
||||
if not found:
|
||||
# The ChainDAG requires that the tail falls on an epoch boundary, or it
|
||||
# will be unable to load the corresponding state - this could be fixed, but
|
||||
# for now, we ask the user to fix it instead
|
||||
error "A checkpoint block from the first slot of an epoch could not be found with the given block id - pass an epoch slot with a block using the --block-id parameter",
|
||||
blockId
|
||||
quit 1
|
||||
checkpointBlock
|
||||
|
||||
let checkpointSlot = getForkedBlockField(checkpointBlock[], slot)
|
||||
if checkpointBlock[].root in dbCache.summaries:
|
||||
notice "Checkpoint block is already known, skipping checkpoint state download"
|
||||
|
||||
withBlck(checkpointBlock[]):
|
||||
dbCache.updateSlots(blck.root, blck.message.slot)
|
||||
|
||||
else:
|
||||
notice "Downloading checkpoint state", restUrl, checkpointSlot
|
||||
|
||||
let
|
||||
state = try:
|
||||
await client.getStateV2(StateIdent.init(checkpointSlot), cfg)
|
||||
except CatchableError as exc:
|
||||
error "Unable to download checkpoint state",
|
||||
error = exc.msg, restUrl, checkpointSlot
|
||||
quit 1
|
||||
|
||||
if isNil(state):
|
||||
notice "No state found at given checkpoint", checkpointSlot
|
||||
quit 1
|
||||
|
||||
if checkpointSlot.is_epoch():
|
||||
found = true
|
||||
break
|
||||
withState(state[]):
|
||||
let latest_block_root = state.latest_block_root
|
||||
|
||||
id = BlockIdent.init((checkpointSlot.epoch() - 1).start_slot)
|
||||
if latest_block_root != checkpointBlock[].root:
|
||||
error "Checkpoint state does not match checkpoint block, server error?",
|
||||
blockRoot = shortLog(checkpointBlock[].root),
|
||||
blck = shortLog(checkpointBlock[]),
|
||||
stateBlockRoot = shortLog(latest_block_root)
|
||||
quit 1
|
||||
|
||||
info "Downloaded checkpoint block does not fall on epoch boundary, trying an earlier epoch",
|
||||
checkpointSlot, id
|
||||
info "Writing checkpoint state",
|
||||
stateRoot = shortLog(state.root)
|
||||
db.putState(state)
|
||||
|
||||
if not found:
|
||||
# The ChainDAG requires that the tail falls on an epoch boundary, or it
|
||||
# will be unable to load the corresponding state - this could be fixed, but
|
||||
# for now, we ask the user to fix it instead
|
||||
error "A checkpoint block from the first slot of an epoch could not be found with the given block id - pass an epoch slot with a block using the --block-id parameter",
|
||||
blockId
|
||||
quit 1
|
||||
checkpointBlock
|
||||
withBlck(checkpointBlock[]):
|
||||
info "Writing checkpoint block",
|
||||
blockRoot = shortLog(blck.root),
|
||||
blck = shortLog(blck.message)
|
||||
|
||||
let checkpointSlot = getForkedBlockField(checkpointBlock[], slot)
|
||||
if checkpointBlock[].root in dbCache.summaries:
|
||||
notice "Checkpoint block is already known, skipping checkpoint state download"
|
||||
|
||||
withBlck(checkpointBlock[]):
|
||||
dbCache.updateSlots(blck.root, blck.message.slot)
|
||||
db.putBlock(blck.asTrusted())
|
||||
db.putHeadBlock(blck.root)
|
||||
db.putTailBlock(blck.root)
|
||||
|
||||
dbCache.update(blck)
|
||||
(checkpointSlot, checkpointBlock[].root)
|
||||
else:
|
||||
notice "Downloading checkpoint state", restUrl, checkpointSlot
|
||||
|
||||
let
|
||||
state = try:
|
||||
await client.getStateV2(StateIdent.init(checkpointSlot), cfg)
|
||||
except CatchableError as exc:
|
||||
error "Unable to download checkpoint state",
|
||||
error = exc.msg, restUrl, checkpointSlot
|
||||
quit 1
|
||||
|
||||
if isNil(state):
|
||||
notice "No state found at given checkpoint", checkpointSlot
|
||||
quit 1
|
||||
|
||||
withState(state[]):
|
||||
let latest_block_root = state.latest_block_root
|
||||
|
||||
if latest_block_root != checkpointBlock[].root:
|
||||
error "Checkpoint state does not match checkpoint block, server error?",
|
||||
blockRoot = shortLog(checkpointBlock[].root),
|
||||
blck = shortLog(checkpointBlock[]),
|
||||
stateBlockRoot = shortLog(latest_block_root)
|
||||
quit 1
|
||||
|
||||
info "Writing checkpoint state",
|
||||
stateRoot = shortLog(state.root)
|
||||
db.putState(state)
|
||||
|
||||
withBlck(checkpointBlock[]):
|
||||
info "Writing checkpoint block",
|
||||
blockRoot = shortLog(blck.root),
|
||||
blck = shortLog(blck.message)
|
||||
|
||||
db.putBlock(blck.asTrusted())
|
||||
db.putHeadBlock(blck.root)
|
||||
db.putTailBlock(blck.root)
|
||||
|
||||
dbCache.update(blck)
|
||||
notice "Skipping checkpoint download, database already exists",
|
||||
head = shortLog(dbHead.get())
|
||||
(headSlot, dbHead.get())
|
||||
|
||||
# Coming this far, we've done what ChainDAGRef.preInit would normally do -
|
||||
# Let's do a sanity check and start backfilling blocks from the trusted node
|
||||
@ -297,8 +293,9 @@ proc doTrustedNodeSync*(
|
||||
total += 1
|
||||
total
|
||||
|
||||
if missingSlots == 0:
|
||||
let canReindex = if missingSlots == 0:
|
||||
info "Database fully backfilled"
|
||||
true
|
||||
elif backfill:
|
||||
notice "Downloading historical blocks - you can interrupt this process at any time and it automatically be completed when you start the beacon node",
|
||||
checkpointSlot, missingSlots
|
||||
@ -394,12 +391,25 @@ proc doTrustedNodeSync*(
|
||||
continue
|
||||
|
||||
gets[int(i mod gets.lenu64)] = downloadBlock(slot)
|
||||
true
|
||||
else:
|
||||
notice "Database initialized, historical blocks will be backfilled when starting the node",
|
||||
missingSlots
|
||||
|
||||
false
|
||||
|
||||
if reindex and canReindex:
|
||||
notice "Reindexing historical state lookup tables (you can interrupt this process at any time)"
|
||||
|
||||
# Build a DAG
|
||||
let
|
||||
validatorMonitor = newClone(ValidatorMonitor.init(false, false))
|
||||
dag = ChainDAGRef.init(cfg, db, validatorMonitor, {})
|
||||
|
||||
dag.rebuildIndex()
|
||||
|
||||
notice "Done, your beacon node is ready to serve you! Don't forget to check that you're on the canoncial chain by comparing the checkpoint root with other online sources. See https://nimbus.guide/trusted-node-sync.html for more information.",
|
||||
checkpointRoot = checkpointBlock[].root
|
||||
checkpointRoot
|
||||
|
||||
when isMainModule:
|
||||
import std/[os]
|
||||
|
@ -14,7 +14,7 @@ It is possibly to use trusted node sync with a third-party API provider -- see [
|
||||
|
||||
## Perform a trusted node sync
|
||||
|
||||
> **Tip:** Make sure to replace `http://localhost:5052` in the commands below with the appropriate endpoint for you. `http://localhost:5052` is the endpoint exposed by Nimbus but this is not consistent across all clients. For example, if your trusted node is a [Prysm node](https://docs.prylabs.network/docs/how-prysm-works/ethereum-public-api#performing-requests-against-a-local-prysm-node), it exposes `127.0.0.1:3500` by default. Which means you would run the commands below with
|
||||
> **Tip:** Make sure to replace `http://localhost:5052` in the commands below with the appropriate endpoint for you. `http://localhost:5052` is the endpoint exposed by Nimbus but this is not consistent across all clients. For example, if your trusted node is a [Prysm node](https://docs.prylabs.network/docs/how-prysm-works/ethereum-public-api#performing-requests-against-a-local-prysm-node), it exposes `127.0.0.1:3500` by default. Which means you would run the commands below with
|
||||
>
|
||||
> `--trusted-node-url=http://127.0.0.1:3500`
|
||||
|
||||
@ -38,8 +38,6 @@ build/nimbus_beacon_node trustedNodeSync --network:prater \
|
||||
--trusted-node-url=http://localhost:5052
|
||||
```
|
||||
|
||||
|
||||
|
||||
> **Note:**
|
||||
> Because trusted node sync by default copies all blocks via REST, if you use a third-party service to sync from, you may hit API limits. If this happens to you, you may need to use the `--backfill` option to [delay the backfill of the block history](./trusted-node-sync.md#delay-block-history-backfill).
|
||||
|
||||
@ -64,7 +62,6 @@ The `head` root is also printed in the log output at regular intervals.
|
||||
> ```
|
||||
|
||||
|
||||
|
||||
## Advanced
|
||||
|
||||
### Delay block history backfill
|
||||
@ -73,6 +70,8 @@ By default, both the state and the full block history will be downloaded from th
|
||||
|
||||
It is possible to get started more quickly by delaying the backfill of the block history using the `--backfill=false` parameter. In this case, the beacon node will first sync to the current head so that it can start performing its duties, then backfill the blocks from the network.
|
||||
|
||||
You can also resume the trusted node backfill at any time by simply running the trusted node sync command again.
|
||||
|
||||
> **Warning:** While backfilling blocks, your node will not be able to answer historical requests or sync requests. This might lead to you being de-scored, and eventually disconnected, by your peers.
|
||||
|
||||
### Modify sync point
|
||||
@ -97,6 +96,10 @@ curl -o block.32000.ssz -H 'Accept: application/octet-stream' http://localhost:5
|
||||
build/nimbus_beacon_node --data-dir:trusted --finalized-checkpoint-block=block.32000.ssz --finalized-checkpoint-state=state.32000.ssz
|
||||
```
|
||||
|
||||
## Caveats
|
||||
## Recreate historical state access indices
|
||||
|
||||
A node synced using trusted node sync will not be able to serve historical requests via the Beacon API from before the checkpoint. Future versions will resolve this issue.
|
||||
When performing checkpoint sync, the historical state data from the time before the checkpoint is not available. To recreate the indices and caches necessary for historical state access, run trusted node sync with the `--reindex` flag - this can be done on an already-synced node as well, in which case the process will simply resume where it left off:
|
||||
|
||||
```
|
||||
build/nimbus_beacon_node trustedNodeSync --reindex=true
|
||||
```
|
||||
|
@ -766,6 +766,11 @@ suite "Backfill":
|
||||
|
||||
dag.backfill.slot == GENESIS_SLOT
|
||||
|
||||
dag.rebuildIndex()
|
||||
|
||||
check:
|
||||
dag.getFinalizedEpochRef() != nil
|
||||
|
||||
test "reload backfill position":
|
||||
let
|
||||
tailBlock = blocks[^1]
|
||||
|
Loading…
x
Reference in New Issue
Block a user