fix replays stalling processing (#2361)
* fix replays stalling processing

  Occasionally, attestations will arrive that vote for a target derived either
  from the finalized block or earlier. In these cases, Nimbus would replay the
  state transition of up to 32 epochs' worth of blocks because the finalized
  state has been pruned, delaying other processing and leading to poor
  inclusion distance.

* put cheap attestation checks before forming EpochRef
* check that attestation target is not from an unviable history with regard to
  finalization
* fix overly aggressive state pruning removing the state close to the
  finalized checkpoint, resulting in rare long replays for valid attestations
* log long replays
* harden logging and traversal of nil BlockSlot
* simplify target check - no need to look up the target in the chain dag again
* fixup
* fixup
parent 3e2c0a220c
commit 3f8764ee61

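The heart of the fix is rejecting such votes before any state replay is attempted. Below is a minimal, self-contained sketch of that viability check - the Slot alias, the hard-coded SLOTS_PER_EPOCH and the isViableVote helper are illustrative stand-ins, not the real nimbus-eth2 types; the actual logic lives in check_attestation_block in the diff that follows.

const SLOTS_PER_EPOCH = 32'u64

type Slot = uint64

func isViableVote(finalizedSlot, votedBlockSlot, attestationSlot: Slot): bool =
  # Blocks at or before the finalized head have had their states pruned, so
  # voting for them would force a replay of many epochs of state transitions.
  if votedBlockSlot <= finalizedSlot:
    return false
  # The voted-for block must have existed when the attestation was made...
  if attestationSlot < votedBlockSlot:
    return false
  # ...and the replay distance is capped at 4 epochs to bound validation work.
  (attestationSlot - votedBlockSlot) <= 4'u64 * SLOTS_PER_EPOCH

when isMainModule:
  doAssert not isViableVote(finalizedSlot = 64, votedBlockSlot = 64, attestationSlot = 70)
  doAssert isViableVote(finalizedSlot = 64, votedBlockSlot = 70, attestationSlot = 75)

Because these checks only compare slot numbers, they can run before an EpochRef is built or any state is loaded from the database.
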
@@ -54,23 +54,28 @@ proc aggregate_attestations*(
         aggregate: maybe_slot_attestation.get,
         selection_proof: slot_signature))
 
-func check_attestation_block_slot(
-    pool: AttestationPool, attestationSlot: Slot, attestationBlck: BlockRef):
+func check_attestation_block(
+    pool: AttestationPool, attestationSlot: Slot, blck: BlockRef):
     Result[void, (ValidationResult, cstring)] =
-  # If we allow voting for very old blocks, the state transaction below will go
-  # nuts and keep processing empty slots
-  if not (attestationBlck.slot > pool.chainDag.finalizedHead.slot):
+  # The voted-for block must be a descendant of the finalized block, thus it
+  # must be at least as new as the finalized checkpoint - in theory it could be
+  # equal, but then we're voting for an already-finalized block which is pretty
+  # useless - other blocks that are not rooted in the finalized chain will be
+  # pruned by the chain dag, and thus we can no longer get a BlockRef for them
+  if not (blck.slot > pool.chainDag.finalizedHead.slot):
     return err((ValidationResult.Ignore, cstring(
       "Voting for already-finalized block")))
 
-  # we'll also cap it at 4 epochs which is somewhat arbitrary, but puts an
-  # upper bound on the processing done to validate the attestation
-  # TODO revisit with less arbitrary approach
-  if not (attestationSlot >= attestationBlck.slot):
+  # The attestation shouldn't be voting for a block that didn't exist at the
+  # time - not in spec, but hard to reason about
+  if not (attestationSlot >= blck.slot):
     return err((ValidationResult.Ignore, cstring(
       "Voting for block that didn't exist at the time")))
 
-  if not ((attestationSlot - attestationBlck.slot) <= uint64(4 * SLOTS_PER_EPOCH)):
+  # We'll also cap it at 4 epochs which is somewhat arbitrary, but puts an
+  # upper bound on the processing done to validate the attestation
+  # TODO revisit with less arbitrary approach
+  if not ((attestationSlot - blck.slot) <= uint64(4 * SLOTS_PER_EPOCH)):
     return err((ValidationResult.Ignore, cstring("Voting for very old block")))
 
   ok()

@@ -101,22 +106,36 @@ func check_propagation_slot_range(
 
   ok()
 
-func check_attestation_beacon_block(
-    pool: var AttestationPool, attestation: Attestation):
-    Result[void, (ValidationResult, cstring)] =
-  # The block being voted for (attestation.data.beacon_block_root) passes
-  # validation.
+func check_beacon_and_target_block(
+    pool: var AttestationPool, data: AttestationData):
+    Result[BlockRef, (ValidationResult, cstring)] =
+  # The block being voted for (data.beacon_block_root) passes validation - by
+  # extension, the target block must at that point also pass validation.
+  # The target block is returned.
   # We rely on the chain DAG to have been validated, so check for the existence
   # of the block in the pool.
-  let attestationBlck = pool.chainDag.getRef(attestation.data.beacon_block_root)
-  if attestationBlck.isNil:
-    pool.quarantine.addMissing(attestation.data.beacon_block_root)
+  let blck = pool.chainDag.getRef(data.beacon_block_root)
+  if blck.isNil:
+    pool.quarantine.addMissing(data.beacon_block_root)
     return err((ValidationResult.Ignore, cstring("Attestation block unknown")))
 
   # Not in spec - check that rewinding to the state is sane
-  ? check_attestation_block_slot(pool, attestation.data.slot, attestationBlck)
+  ? check_attestation_block(pool, data.slot, blck)
 
-  ok()
+  # [REJECT] The attestation's target block is an ancestor of the block named
+  # in the LMD vote -- i.e. get_ancestor(store,
+  # attestation.data.beacon_block_root,
+  # compute_start_slot_at_epoch(attestation.data.target.epoch)) ==
+  # attestation.data.target.root
+  let
+    target = get_ancestor(
+      blck, compute_start_slot_at_epoch(data.target.epoch), SLOTS_PER_EPOCH.int)
+
+  if not (target.root == data.target.root):
+    return err((ValidationResult.Reject, cstring(
+      "attestation's target block not an ancestor of LMD vote block")))
+
+  ok(target)
 
 func check_aggregation_count(
     attestation: Attestation, singular: bool):

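check_beacon_and_target_block folds the spec's target-ancestor rule into the same block lookup. A rough sketch of what that get_ancestor walk amounts to, using a toy linked BlockRef and a hypothetical getAncestorAtSlot helper rather than the real chain-DAG types:

const SLOTS_PER_EPOCH = 32'u64

type
  BlockRef = ref object
    slot: uint64
    root: string        # stand-in for an Eth2Digest
    parent: BlockRef

func getAncestorAtSlot(blck: BlockRef, slot: uint64): BlockRef =
  # Walk towards genesis until we reach the newest block at or before `slot`.
  var cur = blck
  while cur != nil and cur.slot > slot:
    cur = cur.parent
  cur

when isMainModule:
  let
    genesis = BlockRef(slot: 0, root: "genesis")
    epochStart = BlockRef(slot: 32, root: "a", parent: genesis)  # first slot of epoch 1
    voted = BlockRef(slot: 40, root: "b", parent: epochStart)

  # For target epoch 1, the ancestor at slot 32 must match target.root,
  # otherwise the attestation is rejected.
  doAssert getAncestorAtSlot(voted, SLOTS_PER_EPOCH).root == "a"

Returning the resolved target from this check also lets the callers pass it straight to getEpochRef instead of looking the target up in the chain dag a second time.
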
@@ -164,6 +183,11 @@ proc validateAttestation*(
     attestation: Attestation, wallTime: BeaconTime,
     topicCommitteeIndex: uint64, checksExpensive: bool):
     Result[seq[ValidatorIndex], (ValidationResult, cstring)] =
+  # Some of the checks below have been reordered compared to the spec, to
+  # perform the cheap checks first - in particular, we want to avoid loading
+  # an `EpochRef` and checking signatures. This reordering might lead to
+  # different IGNORE/REJECT results in turn affecting gossip scores.
+
   # [REJECT] The attestation's epoch matches its target -- i.e.
   # attestation.data.target.epoch ==
   # compute_epoch_at_slot(attestation.data.slot)

@@ -184,11 +208,13 @@ proc validateAttestation*(
   # if bit == 0b1]) == 1).
   ? check_aggregation_count(attestation, singular = true) # [REJECT]
 
-  let tgtBlck = pool.chainDag.getRef(attestation.data.target.root)
-  if tgtBlck.isNil:
-    pool.quarantine.addMissing(attestation.data.target.root)
-    return err((ValidationResult.Ignore, cstring(
-      "Attestation target block unknown")))
+  # The block being voted for (attestation.data.beacon_block_root) has been seen
+  # (via both gossip and non-gossip sources) (a client MAY queue attestations for
+  # processing once block is retrieved).
+  # The block being voted for (attestation.data.beacon_block_root) passes
+  # validation.
+  # [IGNORE] if block is unseen so far and enqueue it in missing blocks
+  let target = ? check_beacon_and_target_block(pool, attestation.data) # [IGNORE/REJECT]
 
   # The following rule follows implicitly from that we clear out any
   # unviable blocks from the chain dag:

@@ -198,9 +224,8 @@ proc validateAttestation*(
   # attestation.data.beacon_block_root,
   # compute_start_slot_at_epoch(store.finalized_checkpoint.epoch)) ==
   # store.finalized_checkpoint.root
 
-  let epochRef = pool.chainDag.getEpochRef(
-    tgtBlck, attestation.data.target.epoch)
+  let
+    epochRef = pool.chainDag.getEpochRef(target, attestation.data.target.epoch)
 
   # [REJECT] The committee index is within the expected range -- i.e.
   # data.index < get_committee_count_per_slot(state, data.target.epoch).

@@ -228,13 +253,6 @@ proc validateAttestation*(
     return err((ValidationResult.Reject, cstring(
       "validateAttestation: number of aggregation bits and committee size mismatch")))
 
-  # The block being voted for (attestation.data.beacon_block_root) has been seen
-  # (via both gossip and non-gossip sources) (a client MAY queue aggregates for
-  # processing once block is retrieved).
-  # The block being voted for (attestation.data.beacon_block_root) passes
-  # validation.
-  ? check_attestation_beacon_block(pool, attestation) # [IGNORE/REJECT]
-
   let
     fork = pool.chainDag.headState.data.data.fork
     genesis_validators_root =

@@ -269,23 +287,6 @@ proc validateAttestation*(
     if v.isErr():
       return err((ValidationResult.Reject, v.error))
 
-  # [REJECT] The attestation's target block is an ancestor of the block named
-  # in the LMD vote -- i.e. get_ancestor(store,
-  # attestation.data.beacon_block_root,
-  # compute_start_slot_at_epoch(attestation.data.target.epoch)) ==
-  # attestation.data.target.root
-  let attestationBlck = pool.chainDag.getRef(attestation.data.beacon_block_root)
-
-  # already checked in check_attestation_beacon_block()
-  doAssert not attestationBlck.isNil
-
-  if not (get_ancestor(attestationBlck,
-      compute_start_slot_at_epoch(attestation.data.target.epoch),
-      SLOTS_PER_EPOCH.int).root ==
-      attestation.data.target.root):
-    return err((ValidationResult.Reject, cstring(
-      "validateAttestation: attestation's target block not an ancestor of LMD vote block")))
-
   # Only valid attestations go in the list, which keeps validator_index
   # in range
   if not (pool.nextAttestationEpoch.lenu64 > validator_index.uint64):

@@ -300,15 +301,13 @@ proc validateAggregate*(
     pool: var AttestationPool,
     signedAggregateAndProof: SignedAggregateAndProof, wallTime: BeaconTime):
     Result[seq[ValidatorIndex], (ValidationResult, cstring)] =
-  let
-    aggregate_and_proof = signedAggregateAndProof.message
-    aggregate = aggregate_and_proof.aggregate
+  # Some of the checks below have been reordered compared to the spec, to
+  # perform the cheap checks first - in particular, we want to avoid loading
+  # an `EpochRef` and checking signatures. This reordering might lead to
+  # different IGNORE/REJECT results in turn affecting gossip scores.
 
-  # [IGNORE] aggregate.data.slot is within the last
-  # ATTESTATION_PROPAGATION_SLOT_RANGE slots (with a
-  # MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. aggregate.data.slot +
-  # ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= aggregate.data.slot
-  ? check_propagation_slot_range(aggregate.data, wallTime) # [IGNORE]
+  template aggregate_and_proof: untyped = signedAggregateAndProof.message
+  template aggregate: untyped = aggregate_and_proof.aggregate
 
   # [REJECT] The aggregate attestation's epoch matches its target -- i.e.
   # `aggregate.data.target.epoch == compute_epoch_at_slot(aggregate.data.slot)`

@@ -317,6 +316,12 @@ proc validateAggregate*(
     if v.isErr():
       return err((ValidationResult.Reject, v.error))
 
+  # [IGNORE] aggregate.data.slot is within the last
+  # ATTESTATION_PROPAGATION_SLOT_RANGE slots (with a
+  # MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. aggregate.data.slot +
+  # ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= aggregate.data.slot
+  ? check_propagation_slot_range(aggregate.data, wallTime) # [IGNORE]
+
   # [IGNORE] The valid aggregate attestation defined by
   # hash_tree_root(aggregate) has not already been seen (via aggregate gossip,
   # within a verified block, or through the creation of an equivalent aggregate

@@ -355,18 +360,14 @@ proc validateAggregate*(
 
-  # [REJECT] The block being voted for (aggregate.data.beacon_block_root)
-  # passes validation.
-  ? check_attestation_beacon_block(pool, aggregate)
+  # [IGNORE] if block is unseen so far and enqueue it in missing blocks
+  let target = ? check_beacon_and_target_block(pool, aggregate.data)
 
   # [REJECT] aggregate_and_proof.selection_proof selects the validator as an
   # aggregator for the slot -- i.e. is_aggregator(state, aggregate.data.slot,
   # aggregate.data.index, aggregate_and_proof.selection_proof) returns True.
-  let tgtBlck = pool.chainDag.getRef(aggregate.data.target.root)
-  if tgtBlck.isNil:
-    pool.quarantine.addMissing(aggregate.data.target.root)
-    return err((ValidationResult.Ignore, cstring(
-      "Aggregate target block unknown")))
-
-  let epochRef = pool.chainDag.getEpochRef(tgtBlck, aggregate.data.target.epoch)
+  let
+    epochRef = pool.chainDag.getEpochRef(target, aggregate.data.target.epoch)
 
   if not is_aggregator(
       epochRef, aggregate.data.slot, aggregate.data.index.CommitteeIndex,

@@ -216,7 +216,9 @@ template head*(v: ChainDagRef): BlockRef = v.headState.blck
 
 func shortLog*(v: BlockSlot): string =
   try:
-    if v.blck.slot == v.slot:
+    if v.blck.isNil():
+      &"nil:0@{v.slot}"
+    elif v.blck.slot == v.slot:
       &"{v.blck.root.data.toOpenArray(0, 3).toHex()}:{v.blck.slot}"
     else: # There was a gap - log it
       &"{v.blck.root.data.toOpenArray(0, 3).toHex()}:{v.blck.slot}@{v.slot}"

@@ -225,7 +227,7 @@ func shortLog*(v: BlockSlot): string =
 
 func shortLog*(v: BlockRef): string =
   try:
-    if v == nil:
+    if v.isNil():
       "BlockRef(nil)"
     else:
       &"{v.root.data.toOpenArray(0, 3).toHex()}:{v.slot}"

@@ -234,7 +236,7 @@ func shortLog*(v: BlockRef): string =
 
 func shortLog*(v: EpochRef): string =
   try:
-    if v == nil:
+    if v.isNil():
       "EpochRef(nil)"
     else:
       &"(epoch ref: {v.epoch})"

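The shortLog hardening above comes down to testing for nil before reading any field. A tiny stand-alone sketch with a toy BlockRef type (not the real one):

type BlockRef = ref object
  slot: uint64

func shortLog(v: BlockRef): string =
  if v.isNil():
    "BlockRef(nil)"        # previously the slot field would have been read first
  else:
    "block:" & $v.slot

when isMainModule:
  doAssert shortLog(nil) == "BlockRef(nil)"
  doAssert shortLog(BlockRef(slot: 42)) == "block:42"
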
@@ -15,7 +15,8 @@ import
   ../spec/[
     crypto, datatypes, digest, helpers, validator, state_transition,
     beaconstate],
-  ./block_pools_types, ./quarantine
+  ../time,
+  "."/[block_pools_types, quarantine]
 
 export block_pools_types, helpers
 

@@ -82,7 +83,7 @@ func parent*(bs: BlockSlot): BlockSlot =
 func parentOrSlot*(bs: BlockSlot): BlockSlot =
   ## Return a blockslot representing the previous slot, using the parent block
   ## with the current slot if the current had a block
-  if bs.slot == Slot(0):
+  if bs.blck.isNil():
     BlockSlot(blck: nil, slot: Slot(0))
   elif bs.slot == bs.blck.slot:
     BlockSlot(blck: bs.blck.parent, slot: bs.slot)

@@ -494,16 +495,34 @@ proc getState(
 
   true
 
+func isStateCheckpoint(bs: BlockSlot): bool =
+  ## State checkpoints are the points in time for which we store full state
+  ## snapshots, which later serve as rewind starting points when replaying state
+  ## transitions from database, for example during reorgs.
+  ##
+  # As a policy, we only store epoch boundary states without the epoch block
+  # (if it exists) applied - the rest can be reconstructed by loading an epoch
+  # boundary state and applying the missing blocks.
+  # We also avoid states that were produced with empty slots only - as such,
+  # there is only a checkpoint for the first epoch after a block.
+
+  # The tail block also counts as a state checkpoint!
+  (bs.slot == bs.blck.slot and bs.blck.parent == nil) or
+    (bs.slot.isEpoch and bs.slot.epoch == (bs.blck.slot.epoch + 1))
+
+func stateCheckpoint*(bs: BlockSlot): BlockSlot =
+  ## The first ancestor BlockSlot that is a state checkpoint
+  var bs = bs
+  while not isStateCheckPoint(bs):
+    bs = bs.parentOrSlot
+  bs
+
 proc getState(dag: ChainDAGRef, state: var StateData, bs: BlockSlot): bool =
   ## Load a state from the database given a block and a slot - this will first
   ## lookup the state root in the state root table then load the corresponding
   ## state, if it exists
-  if not bs.slot.isEpoch:
-    return false # We only ever save epoch states - no need to hit database
+  # TODO earlier versions would store the epoch state with the epoch block
+  # applied - we generally shouldn't hit the database for such states but
+  # will do so in a transitionary upgrade period!
+  if not bs.isStateCheckpoint():
+    return false # Only state checkpoints are stored - no need to hit DB
 
   if (let stateRoot = dag.db.getStateRoot(bs.blck.root, bs.slot);
       stateRoot.isSome()):

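The checkpoint policy introduced by isStateCheckpoint can be spelled out with small numbers. The sketch below uses toy BlockRef/BlockSlot types and a hard-coded SLOTS_PER_EPOCH of 32, not the real chain-DAG definitions; only the tail block and the first epoch boundary after a block qualify as checkpoints:

const SLOTS_PER_EPOCH = 32'u64

type
  BlockRef = ref object
    slot: uint64
    parent: BlockRef
  BlockSlot = object
    blck: BlockRef
    slot: uint64

func isEpoch(slot: uint64): bool = slot mod SLOTS_PER_EPOCH == 0
func epoch(slot: uint64): uint64 = slot div SLOTS_PER_EPOCH

func isStateCheckpoint(bs: BlockSlot): bool =
  # The tail block (no parent) counts, and so does the first epoch boundary
  # following a block - later empty-slot epochs do not.
  (bs.slot == bs.blck.slot and bs.blck.parent == nil) or
    (bs.slot.isEpoch and bs.slot.epoch == bs.blck.slot.epoch + 1)

when isMainModule:
  let
    tail = BlockRef(slot: 0)
    blck = BlockRef(slot: 40, parent: tail)  # a block in epoch 1

  doAssert isStateCheckpoint(BlockSlot(blck: tail, slot: 0))      # tail block
  doAssert isStateCheckpoint(BlockSlot(blck: blck, slot: 64))     # first boundary after the block
  doAssert not isStateCheckpoint(BlockSlot(blck: blck, slot: 96)) # later empty-slot epoch
  doAssert not isStateCheckpoint(BlockSlot(blck: blck, slot: 40)) # not an epoch boundary

This is the invariant stateCheckpoint relies on when it walks parentOrSlot back towards the most recent stored state.
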
@@ -518,17 +537,7 @@ proc putState*(dag: ChainDAGRef, state: StateData) =
     stateSlot = shortLog(state.data.data.slot)
     stateRoot = shortLog(state.data.root)
 
-  # As a policy, we only store epoch boundary states without the epoch block
-  # (if it exists) applied - the rest can be reconstructed by loading an epoch
-  # boundary state and applying the missing blocks.
-  # We also avoid states that were produced with empty slots only - we should
-  # not call this function for states that don't have a follow-up block
-  if not state.data.data.slot.isEpoch:
-    trace "Not storing non-epoch state"
-    return
-
-  if state.data.data.slot.epoch != (state.blck.slot.epoch + 1):
-    trace "Not storing state that isn't an immediate epoch successor to its block"
+  if not isStateCheckpoint(state.blck.atSlot(state.data.data.slot)):
     return
 
   if dag.db.containsState(state.data.root):

@@ -681,6 +690,8 @@ proc updateStateData*(
   # an earlier state must be loaded since there's no way to undo the slot
   # transitions
 
+  let startTime = Moment.now()
+
   var
     ancestors: seq[BlockRef]
     cur = bs

@@ -785,15 +796,28 @@ proc updateStateData*(
   # ...and make sure to process empty slots as requested
   dag.advanceSlots(state, bs.slot, save, cache)
 
-  trace "State updated",
-    blocks = ancestors.len,
-    slots = state.data.data.slot - startSlot,
-    stateRoot = shortLog(state.data.root),
-    stateSlot = state.data.data.slot,
-    startRoot = shortLog(startRoot),
-    startSlot,
-    blck = shortLog(bs),
+  let diff = Moment.now() - startTime
+
+  logScope:
+    blocks = ancestors.len
+    slots = state.data.data.slot - startSlot
+    stateRoot = shortLog(state.data.root)
+    stateSlot = state.data.data.slot
+    startRoot = shortLog(startRoot)
+    startSlot
+    blck = shortLog(bs)
+    found
+    diff = shortLog(diff)
+
+  if diff >= 1.seconds:
+    # This might indicate there's a cache that's not in order or a disk that is
+    # too slow - for now, it's here for investigative purposes and the cutoff
+    # time might need tuning
+    info "State replayed"
+  elif ancestors.len > 0:
+    debug "State replayed"
+  else:
+    trace "State advanced" # Normal case!
 
 proc delState(dag: ChainDAGRef, bs: BlockSlot) =
   # Delete state state and mapping for a particular block+slot

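The timing added to updateStateData is what surfaces long replays in the logs: a replay that takes a second or more is logged at info level, a shorter one that applied blocks at debug, and the common no-op advance at trace. A rough stand-alone sketch of the same tiering, using std/monotimes and echo in place of the chronos Moment and chronicles levels used by the real code:

import std/[monotimes, times]

proc reportReplay(blocksApplied: int, replay: proc ()) =
  let start = getMonoTime()
  replay()
  let diff = getMonoTime() - start

  if diff >= initDuration(seconds = 1):
    # A long replay hints at a missing state checkpoint, a cold cache or a slow disk.
    echo "State replayed (slow): ", diff, ", blocks: ", blocksApplied
  elif blocksApplied > 0:
    echo "State replayed: ", diff, ", blocks: ", blocksApplied
  else:
    echo "State advanced"  # normal case - nothing had to be re-applied

when isMainModule:
  reportReplay(0, proc () = discard)
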
@@ -890,8 +914,14 @@ proc updateHead*(
 
   if finalizedHead != dag.finalizedHead:
-    block: # Remove states, walking slot by slot
-      var cur = finalizedHead
-      while cur != dag.finalizedHead:
+    # We remove all state checkpoints that come _before_ the current finalized
+    # head, as we might frequently be asked to replay states from the
+    # finalized checkpoint and onwards (for example when validating blocks and
+    # attestations)
+    var
+      prev = dag.finalizedHead.stateCheckpoint.parentOrSlot
+      cur = finalizedHead.stateCheckpoint.parentOrSlot
+    while cur.blck != nil and cur != prev:
       # TODO This is a quick fix to prune some states from the database, but
       # not all, pending a smarter storage - the downside of pruning these
       # states is that certain rewinds will take longer

@@ -412,6 +412,14 @@ suiteReport "chain DAG finalization tests" & preset():
       let status = dag.addRawBlock(quarantine, lateBlock, nil)
       check: status.error == (ValidationResult.Ignore, Unviable)
 
+    block:
+      let
+        finalizedCheckpoint = dag.finalizedHead.stateCheckpoint
+        headCheckpoint = dag.head.atSlot(dag.head.slot).stateCheckpoint
+      check:
+        db.getStateRoot(headCheckpoint.blck.root, headCheckpoint.slot).isSome
+        db.getStateRoot(finalizedCheckpoint.blck.root, finalizedCheckpoint.slot).isSome
+
     let
       dag2 = init(ChainDAGRef, defaultRuntimePreset, db)
 

@@ -14,6 +14,8 @@ import
   eth/db/[kvstore, kvstore_sqlite3],
   testblockutil
 
+export beacon_chain_db
+
 type
   TestDuration = tuple[duration: float, label: string]
 