From 51418a789401e08c07f79d0069c5e4b40d80abd8 Mon Sep 17 00:00:00 2001
From: Jacek Sieka
Date: Fri, 12 May 2023 12:37:15 +0200
Subject: [PATCH] Incremental pruning (#4887)

* Incremental pruning

When pruning is turned on for the first time, the current pruning
algorithm prunes the full database at startup. This delays the restart
unnecessarily, since not all of the pruned space is needed at once.

This PR introduces incremental pruning such that each pruning call
removes at most 32 slots' worth of blocks, or as much as the head has
advanced since the last call (i.e. the sync speed), whichever is
higher.

This mode is expected to become the default in a follow-up release.
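
In rough terms, each call computes its pruning step as follows - an
illustrative sketch mirroring the `pruneStep` helper introduced below,
with `Slot` arithmetic simplified to `uint64` and mainnet's
`SLOTS_PER_EPOCH = 32` assumed (`pruneStepSketch` itself is not part of
the patch):

    const
      SLOTS_PER_EPOCH = 32'u64
      MAX_SLOTS_PER_PRUNE = SLOTS_PER_EPOCH
      SLOTS_PER_STATE_SNAPSHOT = 32'u64 * SLOTS_PER_EPOCH

    proc pruneStepSketch(horizon, lastHorizon, lastBlockHorizon: uint64):
        tuple[stateHorizon, blockHorizon: uint64] =
      ## Returns the first state and block to keep, given how far the
      ## horizon has moved and how far pruning got last time
      let
        # Follow the head during sync, otherwise cap the work per call
        maxSlots = max(horizon - lastHorizon, MAX_SLOTS_PER_PRUNE)
        # Never prune closer than one snapshot period behind the horizon
        maxBlockHorizon =
          if horizon + 1 >= SLOTS_PER_STATE_SNAPSHOT:
            horizon + 1 - SLOTS_PER_STATE_SNAPSHOT
          else:
            0'u64
        # Approach the cap incrementally when catching up
        blockHorizon = min(maxBlockHorizon, lastBlockHorizon + maxSlots)
        # Round up so that a state is removed only once the blocks that
        # would replay from it are gone
        stateHorizon =
          ((blockHorizon + SLOTS_PER_STATE_SNAPSHOT - 1) div
            SLOTS_PER_STATE_SNAPSHOT) * SLOTS_PER_STATE_SNAPSHOT
      (stateHorizon, blockHorizon)

    echo pruneStepSketch(5000, 4999, 3000)
    # -> (stateHorizon: 3072, blockHorizon: 3032)

Even though everything up to slot 3977 could be pruned in this example,
only 32 slots beyond the previous progress are touched per call.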
---
 .../block_pools_types.nim                     |   6 +
 .../consensus_object_pools/blockchain_dag.nim | 152 +++++++++++-------
 beacon_chain/nimbus_beacon_node.nim           |   5 +-
 docs/the_nimbus_book/src/history.md           |  16 +-
 4 files changed, 111 insertions(+), 68 deletions(-)

diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim
index 70864205f..75c1477f7 100644
--- a/beacon_chain/consensus_object_pools/block_pools_types.nim
+++ b/beacon_chain/consensus_object_pools/block_pools_types.nim
@@ -175,6 +175,12 @@ type
       ## The last prune point
       ## We can prune up to finalizedHead
 
+    lastHistoryPruneHorizon*: Slot
+      ## The horizon when we last pruned, for horizon diff computation
+
+    lastHistoryPruneBlockHorizon*: Slot
+      ## Block pruning progress at the last call
+
   # -----------------------------------
   # Rewinder - Mutable state processing
 
diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim
index e4c460c28..fbca88758 100644
--- a/beacon_chain/consensus_object_pools/blockchain_dag.nim
+++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim
@@ -50,10 +50,16 @@ declareGauge beacon_processed_deposits_total, "Number of total deposits included
 logScope: topics = "chaindag"
 
 const
-  # When finality happens, we prune historical states from the database except
-  # for a snapshort every 32 epochs from which replays can happen - there's a
-  # balance here between making long replays and saving on disk space
   EPOCHS_PER_STATE_SNAPSHOT* = 32
+    ## When finality happens, we prune historical states from the database except
+    ## for a snapshot every 32 epochs from which replays can happen - there's a
+    ## balance here between making long replays and saving on disk space
+
+  MAX_SLOTS_PER_PRUNE* = SLOTS_PER_EPOCH
+    ## We prune the database incrementally so as not to introduce long
+    ## processing breaks - this is the maximum number of blocks we allow to be
+    ## pruned each time the prune call is made (typically once per slot),
+    ## unless the head is moving faster (i.e. during sync)
+
 proc putBlock*(
     dag: ChainDAGRef, signedBlock: ForkyTrustedSignedBeaconBlock) =
@@ -1049,6 +1055,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
       info).expect("head blocks should apply")
 
   dag.head = headRef
+  dag.heads = @[headRef]
 
   assign(dag.clearanceState, dag.headState)
 
@@ -1091,8 +1098,6 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
 
   dag.finalizedHead = headRef.atSlot(finalizedSlot)
   dag.lastPrunePoint = dag.finalizedHead.toBlockSlotId().expect("not nil")
-  dag.heads = @[headRef]
-
   doAssert dag.finalizedHead.blck != nil,
     "The finalized head should exist at the slot"
   doAssert dag.finalizedHead.blck.parent == nil,
@@ -1201,6 +1206,16 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
   # have a cache
   dag.updateValidatorKeys(getStateField(dag.headState, validators).asSeq())
 
+  # Initialize pruning such that when starting with a database that hasn't been
+  # pruned, we work our way from the tail to the horizon in incremental steps
+  dag.lastHistoryPruneHorizon = dag.horizon()
+  dag.lastHistoryPruneBlockHorizon = block:
+    let boundary = min(dag.tail.slot, dag.horizon())
+    if boundary.epoch() >= EPOCHS_PER_STATE_SNAPSHOT:
+      start_slot(boundary.epoch() - EPOCHS_PER_STATE_SNAPSHOT)
+    else:
+      Slot(0)
+
   info "Block DAG initialized",
     head = shortLog(dag.head),
     finalizedHead = shortLog(dag.finalizedHead),
@@ -1841,30 +1856,75 @@ proc pruneStateCachesDAG*(dag: ChainDAGRef) =
       statePruneDur = statePruneTick - startTick,
       epochRefPruneDur = epochRefPruneTick - statePruneTick
 
+proc pruneStep(horizon, lastHorizon, lastBlockHorizon: Slot):
+    tuple[stateHorizon, blockHorizon: Slot] =
+  ## Compute a reasonable incremental pruning step considering the current
+  ## horizon, how far the database has been pruned already and where we want
+  ## the tail to be - the return value gives the first state and block that we
+  ## should _keep_ (inclusive).
+
+  const SLOTS_PER_STATE_SNAPSHOT =
+    uint64(EPOCHS_PER_STATE_SNAPSHOT * SLOTS_PER_EPOCH)
+
+  let
+    blockHorizon = block:
+      let
+        # Keep up with the horizon if it's moving fast, i.e. if we're syncing
+        maxSlots = max(horizon - lastHorizon, MAX_SLOTS_PER_PRUNE)
+
+        # Move the block horizon cap with a lag so that it moves slot-by-slot
+        # instead of a big jump every time we prune a state - assuming we
+        # prune every slot, this makes us prune one slot at a time instead of
+        # a burst of prunes (as computed by maxSlots) around every snapshot
+        # change followed by no pruning for the rest of the period
+        maxBlockHorizon =
+          if horizon + 1 >= SLOTS_PER_STATE_SNAPSHOT:
+            horizon + 1 - SLOTS_PER_STATE_SNAPSHOT
+          else:
+            Slot(0)
+
+      # `lastBlockHorizon` captures the case where we're incrementally
+      # pruning a database that hasn't been pruned for a while: it's
+      # initialized to a pre-tail value on startup and moves to approach
+      # `maxBlockHorizon`.
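+      #
+      # Illustrative numbers, assuming SLOTS_PER_EPOCH = 32 (mainnet) and
+      # thus SLOTS_PER_STATE_SNAPSHOT = 1024: with horizon = 10000,
+      # lastHorizon = 9999 and lastBlockHorizon = 8950, we get maxSlots = 32
+      # and maxBlockHorizon = 8977, so the result below is
+      # min(8977, 8950 + 32) = 8977.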
+      min(maxBlockHorizon, lastBlockHorizon + maxSlots)
+
+    # Round up such that we remove states only once blocks have been removed
+    stateHorizon =
+      ((blockHorizon + SLOTS_PER_STATE_SNAPSHOT - 1) div
+        SLOTS_PER_STATE_SNAPSHOT) * SLOTS_PER_STATE_SNAPSHOT
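+    # With the illustrative numbers above, blockHorizon = 8977 rounds up to
+    # stateHorizon = 9216 (= 9 * 1024) - every state below it sits at a
+    # snapshot slot whose follow-up blocks are already gone, so it could not
+    # have seeded a replay anyway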
+
+  (Slot(stateHorizon), blockHorizon)
+
 proc pruneHistory*(dag: ChainDAGRef, startup = false) =
+  ## Perform an incremental pruning step of the history
   if dag.db.db.readOnly:
     return
 
-  let horizon = dag.horizon()
-  if horizon == GENESIS_SLOT:
-    return
-
   let
-    preTail = dag.tail
-    # Round to state snapshot boundary - this is where we'll leave the tail
-    # after pruning
-    stateHorizon = Epoch((horizon.epoch div EPOCHS_PER_STATE_SNAPSHOT) * EPOCHS_PER_STATE_SNAPSHOT)
+    horizon = dag.horizon()
+    (stateHorizon, blockHorizon) = pruneStep(
+      horizon, dag.lastHistoryPruneHorizon, dag.lastHistoryPruneBlockHorizon)
+
+  doAssert blockHorizon <= stateHorizon,
+    "we must never prune blocks while leaving the state"
+
+  debug "Pruning history",
+    horizon, blockHorizon, stateHorizon,
+    lastHorizon = dag.lastHistoryPruneHorizon,
+    lastBlockHorizon = dag.lastHistoryPruneBlockHorizon,
+    tail = dag.tail, head = dag.head
+
+  dag.lastHistoryPruneHorizon = horizon
+  dag.lastHistoryPruneBlockHorizon = blockHorizon
 
   dag.db.withManyWrites:
-    if stateHorizon > 0 and
-        stateHorizon > (dag.tail.slot + SLOTS_PER_EPOCH - 1).epoch():
+    if stateHorizon > dag.tail.slot:
       # First, we want to see if it's possible to prune any states - we store one
       # state every EPOCHS_PER_STATE_SNAPSHOT, so this happens infrequently.
-      debug "Pruning states",
-        horizon, stateHorizon, tail = dag.tail, head = dag.head
       var
-        cur = dag.getBlockIdAtSlot(stateHorizon.start_slot)
+        cur = dag.getBlockIdAtSlot(stateHorizon)
       var first = true
       while cur.isSome():
@@ -1882,7 +1942,7 @@ proc pruneHistory*(dag: ChainDAGRef, startup = false) =
             debug "Pruning historical state", bs
             dag.delState(bs)
           elif not bs.isProposed:
-            debug "Reached already-pruned slot, done pruning states", bs
+            trace "Reached already-pruned slot, done pruning states", bs
             break
 
           if bs.isProposed:
@@ -1896,40 +1956,15 @@ proc pruneHistory*(dag: ChainDAGRef, startup = false) =
       else:
         break
 
-      # We can now prune all blocks before the tail - however, we'll add a
-      # small lag so that we typically prune one block at a time - otherwise,
-      # we'd be pruning `EPOCHS_PER_STATE_SNAPSHOT` every time the tail is
-      # updated - if H is the "normal" pruning point, E is the adjusted one and
-      # when T0 is reset to T1, we'll continue removing block by block instead
-      # of removing all blocks between T0 and T1
-      #   T0        T1
-      #   |         |
-      #   ---------------------
-      #   |         |
-      #   E         H
-
-    const extraSlots = EPOCHS_PER_STATE_SNAPSHOT * SLOTS_PER_EPOCH
-
-    if horizon < extraSlots:
-      return
-
-    let
-      # We don't need the tail block itself, but we do need everything after
-      # that in order to be able to recreate states
-      tailSlot = dag.tail.slot
-      blockHorizon =
-        min(horizon - extraSlots, tailSlot)
-
-    if dag.tail.slot - preTail.slot > 8192:
-      # First-time pruning or long offline period
-      notice "Pruning deep block history, this may take several minutes",
-        preTail, tail = dag.tail, head = dag.head, blockHorizon
-    else:
-      debug "Pruning blocks",
-        preTail, tail = dag.tail, head = dag.head, blockHorizon
-
-    block:
-      var cur = dag.getBlockIdAtSlot(blockHorizon).map(proc(x: auto): auto = x.bid)
+    # Prune blocks after sanity-checking that we don't prune post-tail blocks -
+    # this could happen if a state is missing at the expected state horizon and
+    # would indicate a partially inconsistent database since the base invariant
+    # is that there exists a state at the snapshot slot - better not to mess
+    # things up further regardless
+    if blockHorizon > GENESIS_SLOT and blockHorizon <= dag.tail.slot:
+      var
+        # Leave the horizon block itself
+        cur = dag.getBlockIdAtSlot(blockHorizon - 1).map(proc(x: auto): auto = x.bid)
 
       while cur.isSome:
         let
@@ -1941,19 +1976,22 @@ proc pruneHistory*(dag: ChainDAGRef, startup = false) =
           break
 
         if not dag.db.delBlock(fork, bid.root):
-          # Stop at the first gap - a buggy DB might have more blocks but we
-          # have no efficient way of detecting that
+          # Stop at the first gap - this is typically the pruning point of the
+          # previous call to pruneHistory. An inconsistent DB might have more
+          # blocks beyond that point but we have no efficient way of detecting
+          # that.
           break
 
         cur = dag.parent(bid)
 
-    if startup:
+    if startup and
+        dag.cfg.consensusForkAtEpoch(blockHorizon.epoch) > ConsensusFork.Phase0:
       # Once during start, we'll clear all "old fork" data - this ensures we get
       # rid of any leftover junk in the tables - we do so after linear pruning
      # so as to "mostly" clean up the phase0 tables as well (which cannot be
       # pruned easily by fork)
-      let stateFork = dag.cfg.consensusForkAtEpoch(tailSlot.epoch)
+      let stateFork = dag.cfg.consensusForkAtEpoch(dag.tail.slot.epoch)
       if stateFork > ConsensusFork.Phase0:
         for fork in ConsensusFork.Phase0..