From aff0505807ed81d7ca1677117dcfa9b30707eb50 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Tue, 3 Jan 2023 20:37:09 +0100 Subject: [PATCH] Allow reindexing to start from an arbitrary state (#4437) When not backfilling all the way to genesis (#4421), it becomes more useful to start rebuilding the historical indices from an arbitrary starting point. To rebuild the index from non-genesis, a state and an unbroken block history is needed - here, we allow loading the state from an era file and recreating the history from there onwards. * speed up partial era state loading --- .../consensus_object_pools/blockchain_dag.nim | 73 +++++++++++++------ beacon_chain/era_db.nim | 18 ++++- beacon_chain/spec/forks.nim | 23 +++--- 3 files changed, 79 insertions(+), 35 deletions(-) diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 6eca17acd..6af1a47a3 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -2184,21 +2184,13 @@ func needsBackfill*(dag: ChainDAGRef): bool = proc rebuildIndex*(dag: ChainDAGRef) = ## After a checkpoint sync, we lack intermediate states to replay from - this ## function rebuilds them so that historical replay can take place again - ## TODO handle databases without genesis state - if dag.backfill.slot > 0: - debug "Backfill not complete, cannot rebuild archive" - return - - if dag.tail.slot == GENESIS_SLOT: - # The tail is the earliest slot for which we're supposed to have states - - # if it's sufficiently recent, don't do anything - debug "Archive does not need rebuilding" - return - + ## TODO the pruning of junk states could be moved to a separate function that + ## runs either on startup # First, we check what states we already have in the database - that allows # resuming the operation at any time let roots = dag.db.loadStateRoots() + historicalRoots = getStateField(dag.headState, historical_roots).asSeq() var canonical = newSeq[Eth2Digest]( @@ -2211,6 +2203,8 @@ proc rebuildIndex*(dag: ChainDAGRef) = for k, v in roots: if k[0] >= dag.finalizedHead.slot: continue # skip newer stuff + if k[0] < dag.backfill.slot: + continue # skip stuff for which we have no blocks if not isFinalizedStateSnapshot(k[0]): # `tail` will move at the end of the process, so we won't need any @@ -2238,19 +2232,53 @@ proc rebuildIndex*(dag: ChainDAGRef) = var cache: StateCache info: ForkedEpochInfo + tailBid: Opt[BlockId] + states: int # `canonical` holds all slots at which a state is expected to appear, using a # zero root whenever a particular state is missing - this way, if there's # partial progress or gaps, they will be dealt with correctly for i, state_root in canonical.mpairs(): - if not state_root.isZero: + let + slot = Epoch(i * EPOCHS_PER_STATE_SNAPSHOT).start_slot + + if slot < dag.backfill.slot: + # TODO if we have era files, we could try to load blocks from them at + # this point + # TODO if we don't do the above, we can of course compute the starting `i` continue - doAssert i > 0, "Genesis should always be available" + if tailBid.isNone(): + if state_root.isZero: + # If we can find an era file with this state, use it as an alternative + # starting point - ignore failures for now + var bytes: seq[byte] + if dag.era.getState(historicalRoots, slot, state[]).isOk(): + state_root = getStateRoot(state[]) + + withState(state[]): dag.db.putState(forkyState) + tailBid = Opt.some state[].latest_block_id() + + else: + if not dag.db.getState( + dag.cfg.stateForkAtEpoch(slot.epoch), state_root, state[], noRollback): + fatal "Cannot load state, database corrupt or created for a different network?", + state_root, slot + quit 1 + tailBid = Opt.some state[].latest_block_id() + + continue + + if i == 0 or canonical[i - 1].isZero: + reset(tailBid) # No unbroken history! + continue + + if not state_root.isZero: + states += 1 + continue let startSlot = Epoch((i - 1) * EPOCHS_PER_STATE_SNAPSHOT).start_slot - slot = Epoch(i * EPOCHS_PER_STATE_SNAPSHOT).start_slot info "Recreating state snapshot", slot, startStateRoot = canonical[i - 1], startSlot @@ -2288,18 +2316,17 @@ proc rebuildIndex*(dag: ChainDAGRef) = state_root = forkyState.root - # Now that we have states all the way to genesis, we can adjust the tail - # and readjust the in-memory indices to what they would look like if we had - # started with an earlier tail - let - genesis = - dag.getBlockIdAtSlot(GENESIS_SLOT).expect("Genesis in database").bid - dag.db.putTailBlock(genesis.root) + # Now that we've found a starting point and topped up with "intermediate" + # states, we can update the tail to start at the starting point of the + # first loadable state - dag.tail = genesis + if tailBid.isSome(): + dag.tail = tailBid.get() + dag.db.putTailBlock(dag.tail.root) if junk.len > 0: - info "Dropping redundant states", states = junk.len + info "Dropping redundant states", states, redundant = junk.len for i in junk: + dag.db.delStateRoot(i[0][1], i[0][0]) dag.db.delState(i[1]) diff --git a/beacon_chain/era_db.nim b/beacon_chain/era_db.nim index b0f9f14fe..a44ee683e 100644 --- a/beacon_chain/era_db.nim +++ b/beacon_chain/era_db.nim @@ -138,7 +138,7 @@ proc getStateSZ*( proc getStateSSZ*( f: EraFile, slot: Slot, bytes: var seq[byte], - partial: Opt[int] = default(Opt[int])): Result[void, string] = + partial = Opt.none(int)): Result[void, string] = var tmp: seq[byte] ? f.getStateSZ(slot, tmp) @@ -319,11 +319,23 @@ proc getStateSZ*( proc getStateSSZ*( db: EraDB, historical_roots: openArray[Eth2Digest], slot: Slot, - bytes: var seq[byte], partial = Opt[int].err()): Result[void, string] = + bytes: var seq[byte], partial = Opt.none(int)): Result[void, string] = let f = ? db.getEraFile(historical_roots, slot.era) - f.getStateSSZ(slot, bytes) + f.getStateSSZ(slot, bytes, partial) + +proc getState*( + db: EraDB, historical_roots: openArray[Eth2Digest], slot: Slot, + state: var ForkedHashedBeaconState): Result[void, string] = + var bytes: seq[byte] + ? db.getStateSSZ(historical_roots, slot, bytes) + + try: + state = readSszForkedHashedBeaconState(db.cfg, slot, bytes) + ok() + except CatchableError as exc: + err(exc.msg) type PartialBeaconState = object diff --git a/beacon_chain/spec/forks.nim b/beacon_chain/spec/forks.nim index 4f7de8dfc..05db39fdb 100644 --- a/beacon_chain/spec/forks.nim +++ b/beacon_chain/spec/forks.nim @@ -874,11 +874,21 @@ type genesis_validators_root: Eth2Digest slot: Slot +func readSszForkedHashedBeaconState*( + cfg: RuntimeConfig, slot: Slot, data: openArray[byte]): + ForkedHashedBeaconState {.raises: [Defect, SszError].} = + # TODO https://github.com/nim-lang/Nim/issues/19357 + result = ForkedHashedBeaconState( + kind: cfg.stateForkAtEpoch(slot.epoch())) + + withState(result): + readSszBytes(data, forkyState.data) + forkyState.root = hash_tree_root(forkyState.data) + func readSszForkedHashedBeaconState*(cfg: RuntimeConfig, data: openArray[byte]): ForkedHashedBeaconState {.raises: [Defect, SszError].} = - ## Helper to read a header from bytes when it's not certain what kind of state - ## it is - this happens for example when loading an SSZ state from command - ## line + ## Read a state picking the right fork by first reading the slot from the byte + ## source if data.len() < sizeof(BeaconStateHeader): raise (ref MalformedSszError)(msg: "Not enough data for BeaconState header") let header = SSZ.decode( @@ -886,12 +896,7 @@ func readSszForkedHashedBeaconState*(cfg: RuntimeConfig, data: openArray[byte]): BeaconStateHeader) # TODO https://github.com/nim-lang/Nim/issues/19357 - result = ForkedHashedBeaconState( - kind: cfg.stateForkAtEpoch(header.slot.epoch())) - - withState(result): - readSszBytes(data, forkyState.data) - forkyState.root = hash_tree_root(forkyState.data) + result = readSszForkedHashedBeaconState(cfg, header.slot, data) type ForkedBeaconBlockHeader = object