mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-02-20 02:08:12 +00:00
Allow reindexing to start from an arbitrary state (#4437)
When not backfilling all the way to genesis (#4421), it becomes more useful to start rebuilding the historical indices from an arbitrary starting point. To rebuild the index from non-genesis, a state and an unbroken block history is needed - here, we allow loading the state from an era file and recreating the history from there onwards. * speed up partial era state loading
This commit is contained in:
parent
45654984a9
commit
aff0505807
@ -2184,21 +2184,13 @@ func needsBackfill*(dag: ChainDAGRef): bool =
|
|||||||
proc rebuildIndex*(dag: ChainDAGRef) =
|
proc rebuildIndex*(dag: ChainDAGRef) =
|
||||||
## After a checkpoint sync, we lack intermediate states to replay from - this
|
## After a checkpoint sync, we lack intermediate states to replay from - this
|
||||||
## function rebuilds them so that historical replay can take place again
|
## function rebuilds them so that historical replay can take place again
|
||||||
## TODO handle databases without genesis state
|
## TODO the pruning of junk states could be moved to a separate function that
|
||||||
if dag.backfill.slot > 0:
|
## runs either on startup
|
||||||
debug "Backfill not complete, cannot rebuild archive"
|
|
||||||
return
|
|
||||||
|
|
||||||
if dag.tail.slot == GENESIS_SLOT:
|
|
||||||
# The tail is the earliest slot for which we're supposed to have states -
|
|
||||||
# if it's sufficiently recent, don't do anything
|
|
||||||
debug "Archive does not need rebuilding"
|
|
||||||
return
|
|
||||||
|
|
||||||
# First, we check what states we already have in the database - that allows
|
# First, we check what states we already have in the database - that allows
|
||||||
# resuming the operation at any time
|
# resuming the operation at any time
|
||||||
let
|
let
|
||||||
roots = dag.db.loadStateRoots()
|
roots = dag.db.loadStateRoots()
|
||||||
|
historicalRoots = getStateField(dag.headState, historical_roots).asSeq()
|
||||||
|
|
||||||
var
|
var
|
||||||
canonical = newSeq[Eth2Digest](
|
canonical = newSeq[Eth2Digest](
|
||||||
@ -2211,6 +2203,8 @@ proc rebuildIndex*(dag: ChainDAGRef) =
|
|||||||
for k, v in roots:
|
for k, v in roots:
|
||||||
if k[0] >= dag.finalizedHead.slot:
|
if k[0] >= dag.finalizedHead.slot:
|
||||||
continue # skip newer stuff
|
continue # skip newer stuff
|
||||||
|
if k[0] < dag.backfill.slot:
|
||||||
|
continue # skip stuff for which we have no blocks
|
||||||
|
|
||||||
if not isFinalizedStateSnapshot(k[0]):
|
if not isFinalizedStateSnapshot(k[0]):
|
||||||
# `tail` will move at the end of the process, so we won't need any
|
# `tail` will move at the end of the process, so we won't need any
|
||||||
@ -2238,19 +2232,53 @@ proc rebuildIndex*(dag: ChainDAGRef) =
|
|||||||
var
|
var
|
||||||
cache: StateCache
|
cache: StateCache
|
||||||
info: ForkedEpochInfo
|
info: ForkedEpochInfo
|
||||||
|
tailBid: Opt[BlockId]
|
||||||
|
states: int
|
||||||
|
|
||||||
# `canonical` holds all slots at which a state is expected to appear, using a
|
# `canonical` holds all slots at which a state is expected to appear, using a
|
||||||
# zero root whenever a particular state is missing - this way, if there's
|
# zero root whenever a particular state is missing - this way, if there's
|
||||||
# partial progress or gaps, they will be dealt with correctly
|
# partial progress or gaps, they will be dealt with correctly
|
||||||
for i, state_root in canonical.mpairs():
|
for i, state_root in canonical.mpairs():
|
||||||
if not state_root.isZero:
|
let
|
||||||
|
slot = Epoch(i * EPOCHS_PER_STATE_SNAPSHOT).start_slot
|
||||||
|
|
||||||
|
if slot < dag.backfill.slot:
|
||||||
|
# TODO if we have era files, we could try to load blocks from them at
|
||||||
|
# this point
|
||||||
|
# TODO if we don't do the above, we can of course compute the starting `i`
|
||||||
continue
|
continue
|
||||||
|
|
||||||
doAssert i > 0, "Genesis should always be available"
|
if tailBid.isNone():
|
||||||
|
if state_root.isZero:
|
||||||
|
# If we can find an era file with this state, use it as an alternative
|
||||||
|
# starting point - ignore failures for now
|
||||||
|
var bytes: seq[byte]
|
||||||
|
if dag.era.getState(historicalRoots, slot, state[]).isOk():
|
||||||
|
state_root = getStateRoot(state[])
|
||||||
|
|
||||||
|
withState(state[]): dag.db.putState(forkyState)
|
||||||
|
tailBid = Opt.some state[].latest_block_id()
|
||||||
|
|
||||||
|
else:
|
||||||
|
if not dag.db.getState(
|
||||||
|
dag.cfg.stateForkAtEpoch(slot.epoch), state_root, state[], noRollback):
|
||||||
|
fatal "Cannot load state, database corrupt or created for a different network?",
|
||||||
|
state_root, slot
|
||||||
|
quit 1
|
||||||
|
tailBid = Opt.some state[].latest_block_id()
|
||||||
|
|
||||||
|
continue
|
||||||
|
|
||||||
|
if i == 0 or canonical[i - 1].isZero:
|
||||||
|
reset(tailBid) # No unbroken history!
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not state_root.isZero:
|
||||||
|
states += 1
|
||||||
|
continue
|
||||||
|
|
||||||
let
|
let
|
||||||
startSlot = Epoch((i - 1) * EPOCHS_PER_STATE_SNAPSHOT).start_slot
|
startSlot = Epoch((i - 1) * EPOCHS_PER_STATE_SNAPSHOT).start_slot
|
||||||
slot = Epoch(i * EPOCHS_PER_STATE_SNAPSHOT).start_slot
|
|
||||||
|
|
||||||
info "Recreating state snapshot",
|
info "Recreating state snapshot",
|
||||||
slot, startStateRoot = canonical[i - 1], startSlot
|
slot, startStateRoot = canonical[i - 1], startSlot
|
||||||
@ -2288,18 +2316,17 @@ proc rebuildIndex*(dag: ChainDAGRef) =
|
|||||||
|
|
||||||
state_root = forkyState.root
|
state_root = forkyState.root
|
||||||
|
|
||||||
# Now that we have states all the way to genesis, we can adjust the tail
|
# Now that we've found a starting point and topped up with "intermediate"
|
||||||
# and readjust the in-memory indices to what they would look like if we had
|
# states, we can update the tail to start at the starting point of the
|
||||||
# started with an earlier tail
|
# first loadable state
|
||||||
let
|
|
||||||
genesis =
|
|
||||||
dag.getBlockIdAtSlot(GENESIS_SLOT).expect("Genesis in database").bid
|
|
||||||
dag.db.putTailBlock(genesis.root)
|
|
||||||
|
|
||||||
dag.tail = genesis
|
if tailBid.isSome():
|
||||||
|
dag.tail = tailBid.get()
|
||||||
|
dag.db.putTailBlock(dag.tail.root)
|
||||||
|
|
||||||
if junk.len > 0:
|
if junk.len > 0:
|
||||||
info "Dropping redundant states", states = junk.len
|
info "Dropping redundant states", states, redundant = junk.len
|
||||||
|
|
||||||
for i in junk:
|
for i in junk:
|
||||||
|
dag.db.delStateRoot(i[0][1], i[0][0])
|
||||||
dag.db.delState(i[1])
|
dag.db.delState(i[1])
|
||||||
|
@ -138,7 +138,7 @@ proc getStateSZ*(
|
|||||||
|
|
||||||
proc getStateSSZ*(
|
proc getStateSSZ*(
|
||||||
f: EraFile, slot: Slot, bytes: var seq[byte],
|
f: EraFile, slot: Slot, bytes: var seq[byte],
|
||||||
partial: Opt[int] = default(Opt[int])): Result[void, string] =
|
partial = Opt.none(int)): Result[void, string] =
|
||||||
var tmp: seq[byte]
|
var tmp: seq[byte]
|
||||||
? f.getStateSZ(slot, tmp)
|
? f.getStateSZ(slot, tmp)
|
||||||
|
|
||||||
@ -319,11 +319,23 @@ proc getStateSZ*(
|
|||||||
|
|
||||||
proc getStateSSZ*(
|
proc getStateSSZ*(
|
||||||
db: EraDB, historical_roots: openArray[Eth2Digest], slot: Slot,
|
db: EraDB, historical_roots: openArray[Eth2Digest], slot: Slot,
|
||||||
bytes: var seq[byte], partial = Opt[int].err()): Result[void, string] =
|
bytes: var seq[byte], partial = Opt.none(int)): Result[void, string] =
|
||||||
let
|
let
|
||||||
f = ? db.getEraFile(historical_roots, slot.era)
|
f = ? db.getEraFile(historical_roots, slot.era)
|
||||||
|
|
||||||
f.getStateSSZ(slot, bytes)
|
f.getStateSSZ(slot, bytes, partial)
|
||||||
|
|
||||||
|
proc getState*(
|
||||||
|
db: EraDB, historical_roots: openArray[Eth2Digest], slot: Slot,
|
||||||
|
state: var ForkedHashedBeaconState): Result[void, string] =
|
||||||
|
var bytes: seq[byte]
|
||||||
|
? db.getStateSSZ(historical_roots, slot, bytes)
|
||||||
|
|
||||||
|
try:
|
||||||
|
state = readSszForkedHashedBeaconState(db.cfg, slot, bytes)
|
||||||
|
ok()
|
||||||
|
except CatchableError as exc:
|
||||||
|
err(exc.msg)
|
||||||
|
|
||||||
type
|
type
|
||||||
PartialBeaconState = object
|
PartialBeaconState = object
|
||||||
|
@ -874,11 +874,21 @@ type
|
|||||||
genesis_validators_root: Eth2Digest
|
genesis_validators_root: Eth2Digest
|
||||||
slot: Slot
|
slot: Slot
|
||||||
|
|
||||||
|
func readSszForkedHashedBeaconState*(
|
||||||
|
cfg: RuntimeConfig, slot: Slot, data: openArray[byte]):
|
||||||
|
ForkedHashedBeaconState {.raises: [Defect, SszError].} =
|
||||||
|
# TODO https://github.com/nim-lang/Nim/issues/19357
|
||||||
|
result = ForkedHashedBeaconState(
|
||||||
|
kind: cfg.stateForkAtEpoch(slot.epoch()))
|
||||||
|
|
||||||
|
withState(result):
|
||||||
|
readSszBytes(data, forkyState.data)
|
||||||
|
forkyState.root = hash_tree_root(forkyState.data)
|
||||||
|
|
||||||
func readSszForkedHashedBeaconState*(cfg: RuntimeConfig, data: openArray[byte]):
|
func readSszForkedHashedBeaconState*(cfg: RuntimeConfig, data: openArray[byte]):
|
||||||
ForkedHashedBeaconState {.raises: [Defect, SszError].} =
|
ForkedHashedBeaconState {.raises: [Defect, SszError].} =
|
||||||
## Helper to read a header from bytes when it's not certain what kind of state
|
## Read a state picking the right fork by first reading the slot from the byte
|
||||||
## it is - this happens for example when loading an SSZ state from command
|
## source
|
||||||
## line
|
|
||||||
if data.len() < sizeof(BeaconStateHeader):
|
if data.len() < sizeof(BeaconStateHeader):
|
||||||
raise (ref MalformedSszError)(msg: "Not enough data for BeaconState header")
|
raise (ref MalformedSszError)(msg: "Not enough data for BeaconState header")
|
||||||
let header = SSZ.decode(
|
let header = SSZ.decode(
|
||||||
@ -886,12 +896,7 @@ func readSszForkedHashedBeaconState*(cfg: RuntimeConfig, data: openArray[byte]):
|
|||||||
BeaconStateHeader)
|
BeaconStateHeader)
|
||||||
|
|
||||||
# TODO https://github.com/nim-lang/Nim/issues/19357
|
# TODO https://github.com/nim-lang/Nim/issues/19357
|
||||||
result = ForkedHashedBeaconState(
|
result = readSszForkedHashedBeaconState(cfg, header.slot, data)
|
||||||
kind: cfg.stateForkAtEpoch(header.slot.epoch()))
|
|
||||||
|
|
||||||
withState(result):
|
|
||||||
readSszBytes(data, forkyState.data)
|
|
||||||
forkyState.root = hash_tree_root(forkyState.data)
|
|
||||||
|
|
||||||
type
|
type
|
||||||
ForkedBeaconBlockHeader = object
|
ForkedBeaconBlockHeader = object
|
||||||
|
Loading…
x
Reference in New Issue
Block a user