Move pruning out of latency critical path (#2384)

* Deferred DAG and fork choice pruning

* fixup

* Address https://github.com/status-im/nimbus-eth2/pull/2384/files#r589448448, rely only on onSlotEnd for state pruning

* no need to store needPruning in the data structure

* lastPrunePoint is updated in pruning proc

* Split eager and lazy pruning

* enforce pruning in updateHead
This commit is contained in:
Mamy Ratsimbazafy 2021-03-09 15:36:17 +01:00 committed by GitHub
parent a88d17dc04
commit 8e28a05cea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 153 additions and 72 deletions

View File

@ -122,6 +122,13 @@ type
## The latest block that was finalized according to the block in head
## Ancestors of this block are guaranteed to have 1 child only.
# -----------------------------------
# Pruning metadata
lastPrunePoint*: BlockSlot ##\
## The last prune point
## We can prune up to finalizedHead
# -----------------------------------
# Rewinder - Mutable state processing

View File

@ -438,6 +438,9 @@ proc init*(T: type ChainDAGRef,
res.clearanceState = res.headState
# Pruning metadata
res.lastPrunePoint = res.finalizedHead
info "Block dag initialized",
head = shortLog(headRef),
finalizedHead = shortLog(res.finalizedHead),
@ -827,10 +830,100 @@ proc delState(dag: ChainDAGRef, bs: BlockSlot) =
dag.db.delState(root.get())
dag.db.delStateRoot(bs.blck.root, bs.slot)
proc pruneBlocksDAG(dag: ChainDAGRef) =
## Prunes the block DAG: every head that does not descend from
## `dag.finalizedHead` is removed from `dag.heads`, and the blocks and states
## on its branch are deleted, walking back until the finalized chain is met.
##
## This does NOT prune the cached state checkpoints and EpochRef.
## This should be done after a new finalization point is reached
## to invalidate pending blocks or attestations referring
## to a now invalid fork.
##
## This does NOT update the `dag.lastPrunePoint` field,
## as the caches and fork choice can be pruned at a later time.
# Clean up block refs, walking block by block
# Nothing to do unless finalization has moved since the last prune point
if dag.lastPrunePoint != dag.finalizedHead:
# Finalization means that we choose a single chain as the canonical one -
# it also means we're no longer interested in any branches from that chain
# up to the finalization point
let hlen = dag.heads.len
# Indices are visited from `hlen - 1` down to 0, so removing entry `n` below
# never disturbs an index that is still to be visited (assuming `dag.heads`
# is a seq, whose `del` moves the last element into slot `n` - TODO confirm)
for i in 0..<hlen:
let n = hlen - i - 1
let head = dag.heads[n]
# Heads that descend from the finalized block are still viable: keep them
if dag.finalizedHead.blck.isAncestorOf(head):
continue
var cur = head.atSlot(head.slot)
# Walk back along the abandoned branch until we reach a block that is an
# ancestor of the finalized block
while not cur.blck.isAncestorOf(dag.finalizedHead.blck):
# TODO there may be more empty states here: those that have a slot
# higher than head.slot and those near the branch point - one
# needs to be careful though because those close to the branch
# point should not necessarily be cleaned up
dag.delState(cur) # TODO: should we move that disk I/O to `onSlotEnd`
# The block itself is dropped only when `cur` is at the block's own slot
if cur.blck.slot == cur.slot:
dag.blocks.del(cur.blck.root)
dag.db.delBlock(cur.blck.root)
# No parent left to continue from: stop walking this branch
if cur.blck.parent.isNil:
break
cur = cur.parentOrSlot
dag.heads.del(n)
debug "Pruned the blockchain DAG",
currentCandidateHeads = dag.heads.len
func needStateCachesAndForkChoicePruning*(dag: ChainDAGRef): bool =
  ## Returns `true` when `dag.finalizedHead` differs from
  ## `dag.lastPrunePoint`, i.e. when a deferred pruning pass over the state
  ## caches and the fork choice is still pending.
  not (dag.lastPrunePoint == dag.finalizedHead)
proc pruneStateCachesDAG*(dag: ChainDAGRef) =
  ## Drops the cached state checkpoints and `EpochRef` instances that
  ## finalization has made obsolete.
  ## States attached to invalidated blocks on a fork are not touched here:
  ## `pruneBlocksDAG` is responsible for those.
  ##
  ## Must only be called when `needStateCachesAndForkChoicePruning` is true
  ## (asserted). On completion `dag.lastPrunePoint` is set to
  ## `dag.finalizedHead`.
  doAssert dag.needStateCachesAndForkChoicePruning()

  block: # State checkpoints, one slot at a time
    # Only checkpoints strictly _before_ the current finalized head are
    # released: states from the finalized checkpoint onwards may be replayed
    # frequently (for example when validating blocks and attestations)
    var walker = dag.finalizedHead.stateCheckpoint.parentOrSlot
    let stopAt = dag.lastPrunePoint.stateCheckpoint.parentOrSlot

    while walker.blck != nil and walker != stopAt:
      # TODO This is a quick fix to prune some states from the database, but
      #      not all, pending a smarter storage - the downside of pruning
      #      these states is that certain rewinds will take longer.
      #      After long periods of non-finalization, it can also take some
      #      time to release all these states!
      let keep =
        walker.slot.epoch mod 32 == 0 or walker.slot == dag.tail.slot
      if not keep:
        dag.delState(walker)
      walker = walker.parentOrSlot

  block: # Old EpochRef instances
    # Once finalized, the epoch cache can be emptied to save memory - it is
    # recomputed on demand
    # TODO don't store recomputed pre-finalization epoch refs
    var blck = dag.finalizedHead.blck
    while blck != dag.lastPrunePoint.blck:
      # leave the epoch cache in the last block of the epoch..
      blck = blck.parent
      let ancestor = blck.parent
      if not ancestor.isNil:
        ancestor.epochRefs = @[]

  dag.lastPrunePoint = dag.finalizedHead

  debug "Pruned the state checkpoints and DAG caches."
proc updateHead*(
dag: ChainDAGRef, newHead: BlockRef, quarantine: var QuarantineRef) =
dag: ChainDAGRef,
newHead: BlockRef,
quarantine: var QuarantineRef) =
## Update what we consider to be the current head, as given by the fork
## choice.
##
## The choice of head affects the choice of finalization point - the order
## of operations naturally becomes important here - after updating the head,
## blocks that were once considered potential candidates for a tree will
@ -913,74 +1006,19 @@ proc updateHead*(
epochRef.shuffled_active_validator_indices.lenu64().toGaugeValue)
if finalizedHead != dag.finalizedHead:
block: # Remove states, walking slot by slot
# We remove all state checkpoints that come _before_ the current finalized
# head, as we might frequently be asked to replay states from the
# finalized checkpoint and onwards (for example when validating blocks and
# attestations)
var
prev = dag.finalizedHead.stateCheckpoint.parentOrSlot
cur = finalizedHead.stateCheckpoint.parentOrSlot
while cur.blck != nil and cur != prev:
# TODO This is a quick fix to prune some states from the database, but
# not all, pending a smarter storage - the downside of pruning these
# states is that certain rewinds will take longer
# After long periods of non-finalization, it can also take some time to
# release all these states!
if cur.slot.epoch mod 32 != 0 and cur.slot != dag.tail.slot:
dag.delState(cur)
cur = cur.parentOrSlot
block: # Clean up block refs, walking block by block
# Finalization means that we choose a single chain as the canonical one -
# it also means we're no longer interested in any branches from that chain
# up to the finalization point
let hlen = dag.heads.len
for i in 0..<hlen:
let n = hlen - i - 1
let head = dag.heads[n]
if finalizedHead.blck.isAncestorOf(head):
continue
var cur = head.atSlot(head.slot)
while not cur.blck.isAncestorOf(finalizedHead.blck):
# TODO there may be more empty states here: those that have a slot
# higher than head.slot and those near the branch point - one
# needs to be careful though because those close to the branch
# point should not necessarily be cleaned up
dag.delState(cur)
if cur.blck.slot == cur.slot:
dag.blocks.del(cur.blck.root)
dag.db.delBlock(cur.blck.root)
if cur.blck.parent.isNil:
break
cur = cur.parentOrSlot
dag.heads.del(n)
block: # Clean up old EpochRef instances
# After finalization, we can clear up the epoch cache and save memory -
# it will be recomputed if needed
# TODO don't store recomputed pre-finalization epoch refs
var tmp = finalizedHead.blck
while tmp != dag.finalizedHead.blck:
# leave the epoch cache in the last block of the epoch..
tmp = tmp.parent
if tmp.parent != nil:
tmp.parent.epochRefs = @[]
notice "Reached new finalization checkpoint",
newFinalizedHead = shortLog(finalizedHead),
oldFinalizedHead = shortLog(dag.finalizedHead)
dag.finalizedHead = finalizedHead
notice "Reached new finalization checkpoint",
finalizedHead = shortLog(finalizedHead),
heads = dag.heads.len
beacon_finalized_epoch.set(
dag.headState.data.data.finalized_checkpoint.epoch.toGaugeValue)
beacon_finalized_root.set(
dag.headState.data.data.finalized_checkpoint.root.toGaugeValue)
dag.pruneBlocksDAG()
proc isInitialized*(T: type ChainDAGRef, db: BeaconChainDB): bool =
let
headBlockRoot = db.getHeadBlock()

View File

@ -111,8 +111,12 @@ proc expectBlock*(self: var Eth2Processor, expectedSlot: Slot): Future[bool] =
return fut
proc updateHead*(self: var Eth2Processor, wallSlot: Slot) =
## Trigger fork choice and returns the new head block.
## Can return `nil`
## Trigger fork choice and update the DAG with the new head block
## This does not automatically prune the DAG after finalization
## `pruneFinalized` must be called for pruning.
# TODO: DAG & fork choice procs are unrelated to gossip validation
# Grab the new head according to our latest attestation data
let newHead = self.attestationPool[].selectHead(wallSlot)
if newHead.isNil():
@ -122,17 +126,23 @@ proc updateHead*(self: var Eth2Processor, wallSlot: Slot) =
# Store the new head in the chain DAG - this may cause epochs to be
# justified and finalized
let
oldFinalized = self.chainDag.finalizedHead.blck
self.chainDag.updateHead(newHead, self.quarantine)
# Cleanup the fork choice v2 if we have a finalized head
if oldFinalized != self.chainDag.finalizedHead.blck:
self.attestationPool[].prune()
self.checkExpectedBlock()
proc pruneStateCachesAndForkChoice*(self: var Eth2Processor) =
  ## Deferred post-finalization cleanup of data that is no longer needed or
  ## has been invalidated:
  ## - the DAG state checkpoints
  ## - the DAG EpochRef
  ## - the attestation pool/fork choice
  ##
  ## Does nothing when the chain DAG reports that no pruning is pending.

  # TODO: DAG & fork choice procs are unrelated to gossip validation
  if not self.chainDag.needStateCachesAndForkChoicePruning():
    return

  # Finalization moved since the last prune: clean up DAG caches, then the
  # fork choice
  self.chainDag.pruneStateCachesDAG()
  self.attestationPool[].prune()
proc dumpBlock[T](
self: Eth2Processor, signedBlock: SignedBeaconBlock,
res: Result[T, (ValidationResult, BlockError)]) =
@ -244,7 +254,8 @@ proc processBlock(self: var Eth2Processor, entry: BlockEntry) =
if res.isOk():
# Eagerly update head in case the new block gets selected
self.updateHead(wallSlot)
self.updateHead(wallSlot) # This also eagerly prunes the blocks DAG to prevent processing forks.
# self.pruneStateCachesDAG() # Amortized pruning, we don't prune states & fork choice here but in `onSlotEnd`()
let updateDone = now(chronos.Moment)
let storeBlockDuration = storeDone - start
@ -277,7 +288,7 @@ proc processBlock(self: var Eth2Processor, entry: BlockEntry) =
if entry.v.resFut != nil:
entry.v.resFut.complete(Result[void, BlockError].err(res.error()))
{.pop.} # TODO AsyncQueue.addLast raises Exception in theory but not in practise
{.pop.} # TODO AsyncQueue.addLast raises Exception in theory but not in practice
proc blockValidator*(
self: var Eth2Processor,

View File

@ -894,6 +894,10 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# Things we do when slot processing has ended and we're about to wait for the
# next slot
# Delay part of pruning until latency critical duties are done.
# The other part of pruning, `pruneBlocksDAG`, is done eagerly.
node.processor[].pruneStateCachesAndForkChoice()
when declared(GC_fullCollect):
# The slots in the beacon node work as frames in a game: we want to make
# sure that we're ready for the next one and don't get stuck in lengthy

View File

@ -173,6 +173,9 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
blck() = added[]
chainDag.updateHead(added[], quarantine)
if chainDag.needStateCachesAndForkChoicePruning():
chainDag.pruneStateCachesDAG()
attPool.prune()
var
lastEth1BlockAt = genesisTime

View File

@ -52,6 +52,11 @@ template wrappedTimedTest(name: string, body: untyped) =
body
wrappedTest()
proc pruneAtFinalization(dag: ChainDAGRef, attPool: AttestationPool) =
  ## Test helper: runs the deferred state-cache pruning of `dag` when the DAG
  ## reports that it is pending. `attPool` is accepted but left untouched.
  if not dag.needStateCachesAndForkChoicePruning():
    return
  dag.pruneStateCachesDAG()
  # pool[].prune() # We test logic without attestation pool / fork choice pruning
suiteReport "Attestation pool processing" & preset():
## For now just test that we can compile and execute block processing with
## mock data.
@ -343,6 +348,7 @@ suiteReport "Attestation pool processing" & preset():
let head = pool[].selectHead(blockRef[].slot)
doassert: head == blockRef[]
chainDag.updateHead(head, quarantine)
pruneAtFinalization(chainDag, pool[])
attestations.setlen(0)
for index in 0'u64 ..< committees_per_slot:
@ -413,6 +419,7 @@ suiteReport "Attestation validation " & preset():
check: added.isOk()
chainDag.updateHead(added[], quarantine)
pruneAtFinalization(chainDag, pool[])
var
# Create an attestation for slot 1!

View File

@ -33,6 +33,10 @@ template wrappedTimedTest(name: string, body: untyped) =
body
wrappedTest()
proc pruneAtFinalization(dag: ChainDAGRef) =
  ## Test helper: runs the deferred state-cache pruning of `dag` when the DAG
  ## reports that it is pending; otherwise does nothing.
  if not dag.needStateCachesAndForkChoicePruning():
    return
  dag.pruneStateCachesDAG()
suiteReport "BlockRef and helpers" & preset():
wrappedTimedTest "isAncestorOf sanity" & preset():
let
@ -182,6 +186,7 @@ suiteReport "Block pool processing" & preset():
b4Add[].parent == b2Add[]
dag.updateHead(b4Add[], quarantine)
dag.pruneAtFinalization()
var blocks: array[3, BlockRef]
@ -245,6 +250,7 @@ suiteReport "Block pool processing" & preset():
b2Get.get().refs.parent == b1Get.get().refs
dag.updateHead(b2Get.get().refs, quarantine)
dag.pruneAtFinalization()
# The heads structure should have been updated to contain only the new
# b2 head
@ -278,6 +284,7 @@ suiteReport "Block pool processing" & preset():
b1Add = dag.addRawBlock(quarantine, b1, nil)
dag.updateHead(b1Add[], quarantine)
dag.pruneAtFinalization()
check:
dag.head == b1Add[]
@ -372,6 +379,7 @@ suiteReport "chain DAG finalization tests" & preset():
let added = dag.addRawBlock(quarantine, blck, nil)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.pruneAtFinalization()
check:
dag.heads.len() == 1
@ -444,6 +452,7 @@ suiteReport "chain DAG finalization tests" & preset():
let added = dag.addRawBlock(quarantine, blck, nil)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.pruneAtFinalization()
check:
dag.heads.len() == 1
@ -483,6 +492,7 @@ suiteReport "chain DAG finalization tests" & preset():
let added = dag.addRawBlock(quarantine, blck, nil)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.pruneAtFinalization()
# Advance past epoch so that the epoch transition is gapped
check:
@ -498,6 +508,7 @@ suiteReport "chain DAG finalization tests" & preset():
let added = dag.addRawBlock(quarantine, blck, nil)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag.pruneAtFinalization()
let
dag2 = init(ChainDAGRef, defaultRuntimePreset, db)