Perform block pre-check before validating execution (#5169)
* Perform block pre-check before validating execution When syncing, blocks have not been gossip-validated and are therefore prone to trivial faults like being known-unviable, duplicate or missing their parent. In addition, the duplicate-block check in BlockProcessor was not considering the quarantine flow and would therefore cause recently-quarantined blocks to be silenty dropped when their parent appears delaying the sync end-game and thus causing longer startup resync time. This PR verifies trivial conditions before performing execution validation thus avoiding duplicates and missing parents alike. It also ensures that the fast-sync EL mode is used for finalized blocks even if the EL is timing out / slow to respond - this allows the CL to complete its sync faster and switch to "normal" lock-step at the head of the chain more quickly, thus also allowing the EL to access the latest consensensus information earlier. * oops * remove unused constant
This commit is contained in:
parent
ca1775f725
commit
a2adbf809f
|
@ -34,7 +34,7 @@ proc addResolvedHeadBlock(
|
|||
dag: ChainDAGRef,
|
||||
state: var ForkedHashedBeaconState,
|
||||
trustedBlock: ForkyTrustedSignedBeaconBlock,
|
||||
blockVerified: bool,
|
||||
executionValid: bool,
|
||||
parent: BlockRef, cache: var StateCache,
|
||||
onBlockAdded: OnForkyBlockAdded,
|
||||
stateDataDur, sigVerifyDur, stateVerifyDur: Duration
|
||||
|
@ -46,7 +46,7 @@ proc addResolvedHeadBlock(
|
|||
let
|
||||
blockRoot = trustedBlock.root
|
||||
blockRef = BlockRef.init(
|
||||
blockRoot, executionValid = blockVerified, trustedBlock.message)
|
||||
blockRoot, executionValid = executionValid, trustedBlock.message)
|
||||
startTick = Moment.now()
|
||||
|
||||
link(parent, blockRef)
|
||||
|
@ -80,8 +80,7 @@ proc addResolvedHeadBlock(
|
|||
debug "Block resolved",
|
||||
blockRoot = shortLog(blockRoot),
|
||||
blck = shortLog(trustedBlock.message),
|
||||
blockVerified,
|
||||
heads = dag.heads.len(),
|
||||
executionValid, heads = dag.heads.len(),
|
||||
stateDataDur, sigVerifyDur, stateVerifyDur,
|
||||
putBlockDur = putBlockTick - startTick,
|
||||
epochRefDur = epochRefTick - putBlockTick
|
||||
|
@ -153,17 +152,13 @@ proc advanceClearanceState*(dag: ChainDAGRef) =
|
|||
debug "Prepared clearance state for next block",
|
||||
next, updateStateDur = Moment.now() - startTick
|
||||
|
||||
proc addHeadBlock*(
|
||||
dag: ChainDAGRef, verifier: var BatchVerifier,
|
||||
signedBlock: ForkySignedBeaconBlock,
|
||||
blockVerified: bool,
|
||||
onBlockAdded: OnForkyBlockAdded
|
||||
): Result[BlockRef, VerifierError] =
|
||||
## Try adding a block to the chain, verifying first that it passes the state
|
||||
## transition function and contains correct cryptographic signature.
|
||||
proc checkHeadBlock*(
|
||||
dag: ChainDAGRef, signedBlock: ForkySignedBeaconBlock):
|
||||
Result[BlockRef, VerifierError] =
|
||||
## Perform pre-addHeadBlock sanity checks returning the parent to use when
|
||||
## calling `addHeadBlock`.
|
||||
##
|
||||
## Cryptographic checks can be skipped by adding skipBlsValidation to
|
||||
## dag.updateFlags
|
||||
## This function must be called before `addHeadBlockWithParent`.
|
||||
logScope:
|
||||
blockRoot = shortLog(signedBlock.root)
|
||||
blck = shortLog(signedBlock.message)
|
||||
|
@ -186,14 +181,14 @@ proc addHeadBlock*(
|
|||
debug "Duplicate block"
|
||||
return err(VerifierError.Duplicate)
|
||||
|
||||
# Block is older than finalized, but different from the block in our
|
||||
# canonical history: it must be from an unviable branch
|
||||
debug "Block from unviable fork",
|
||||
existing = shortLog(existing.get()),
|
||||
finalizedHead = shortLog(dag.finalizedHead),
|
||||
tail = shortLog(dag.tail)
|
||||
# Block is older than finalized, but different from the block in our
|
||||
# canonical history: it must be from an unviable branch
|
||||
debug "Block from unviable fork",
|
||||
existing = shortLog(existing.get()),
|
||||
finalizedHead = shortLog(dag.finalizedHead),
|
||||
tail = shortLog(dag.tail)
|
||||
|
||||
return err(VerifierError.UnviableFork)
|
||||
return err(VerifierError.UnviableFork)
|
||||
|
||||
# Check non-finalized blocks as well
|
||||
if dag.containsForkBlock(blockRoot):
|
||||
|
@ -222,6 +217,29 @@ proc addHeadBlock*(
|
|||
|
||||
return err(VerifierError.Invalid)
|
||||
|
||||
ok(parent)
|
||||
|
||||
proc addHeadBlockWithParent*(
|
||||
dag: ChainDAGRef, verifier: var BatchVerifier,
|
||||
signedBlock: ForkySignedBeaconBlock, parent: BlockRef,
|
||||
executionValid: bool, onBlockAdded: OnForkyBlockAdded
|
||||
): Result[BlockRef, VerifierError] =
|
||||
## Try adding a block to the chain, verifying first that it passes the state
|
||||
## transition function and contains correct cryptographic signature.
|
||||
##
|
||||
## Cryptographic checks can be skipped by adding skipBlsValidation to
|
||||
## dag.updateFlags.
|
||||
##
|
||||
## The parent must be obtained using `checkHeadBlock` to ensure complete
|
||||
## verification.
|
||||
logScope:
|
||||
blockRoot = shortLog(signedBlock.root)
|
||||
blck = shortLog(signedBlock.message)
|
||||
signature = shortLog(signedBlock.signature)
|
||||
|
||||
template blck(): untyped = signedBlock.message # shortcuts without copy
|
||||
template blockRoot(): untyped = signedBlock.root
|
||||
|
||||
# The block is resolved, now it's time to validate it to ensure that the
|
||||
# blocks we add to the database are clean for the given state
|
||||
let startTick = Moment.now()
|
||||
|
@ -276,7 +294,7 @@ proc addHeadBlock*(
|
|||
ok addResolvedHeadBlock(
|
||||
dag, dag.clearanceState,
|
||||
signedBlock.asTrusted(),
|
||||
blockVerified = blockVerified,
|
||||
executionValid,
|
||||
parent, cache,
|
||||
onBlockAdded,
|
||||
stateDataDur = stateDataTick - startTick,
|
||||
|
@ -286,10 +304,21 @@ proc addHeadBlock*(
|
|||
proc addHeadBlock*(
|
||||
dag: ChainDAGRef, verifier: var BatchVerifier,
|
||||
signedBlock: ForkySignedBeaconBlock,
|
||||
executionValid: bool,
|
||||
onBlockAdded: OnForkyBlockAdded
|
||||
): Result[BlockRef, VerifierError] =
|
||||
addHeadBlock(
|
||||
dag, verifier, signedBlock, blockVerified = true, onBlockAdded)
|
||||
addHeadBlockWithParent(
|
||||
dag, verifier, signedBlock, ? dag.checkHeadBlock(signedBlock),
|
||||
executionValid, onBlockAdded)
|
||||
|
||||
proc addHeadBlock*(
|
||||
dag: ChainDAGRef, verifier: var BatchVerifier,
|
||||
signedBlock: ForkySignedBeaconBlock,
|
||||
onBlockAdded: OnForkyBlockAdded
|
||||
): Result[BlockRef, VerifierError] =
|
||||
addHeadBlockWithParent(
|
||||
dag, verifier, signedBlock, ? dag.checkHeadBlock(signedBlock),
|
||||
executionValid = true, onBlockAdded)
|
||||
|
||||
proc addBackfillBlock*(
|
||||
dag: ChainDAGRef,
|
||||
|
|
|
@ -264,7 +264,7 @@ func addOrphan*(
|
|||
|
||||
if parent_root in quarantine.unviable:
|
||||
quarantine.unviable[signedBlock.root] = ()
|
||||
return ok()
|
||||
return err("block parent unviable")
|
||||
|
||||
# Even if the quarantine is full, we need to schedule its parent for
|
||||
# downloading or we'll never get to the bottom of things
|
||||
|
|
|
@ -51,10 +51,6 @@ const
|
|||
## syncing the finalized part of the chain
|
||||
PAYLOAD_PRE_WALL_SLOTS = SLOTS_PER_EPOCH * 2
|
||||
## Number of slots from wall time that we start processing every payload
|
||||
MAX_DEDUP_QUEUE_LEN = 16
|
||||
## Number of blocks, with FIFO discipline, against which to check queued
|
||||
## blocks before being processed to avoid spamming ELs. This should stay
|
||||
## small enough that even O(n) algorithms are reasonable.
|
||||
|
||||
type
|
||||
BlobSidecars* = seq[ref BlobSidecar]
|
||||
|
@ -111,9 +107,6 @@ type
|
|||
## The slot at which we sent a payload to the execution client the last
|
||||
## time
|
||||
|
||||
dupBlckBuf: Deque[(Eth2Digest, ValidatorSig)]
|
||||
# Small buffer to allow for filtering of duplicate blocks in block queue
|
||||
|
||||
NewPayloadStatus {.pure.} = enum
|
||||
valid
|
||||
notValid
|
||||
|
@ -152,8 +145,6 @@ proc new*(T: type BlockProcessor,
|
|||
blobQuarantine: blobQuarantine,
|
||||
getBeaconTime: getBeaconTime,
|
||||
verifier: BatchVerifier(rng: rng, taskpool: taskpool),
|
||||
dupBlckBuf: initDeque[(Eth2Digest, ValidatorSig)](
|
||||
initialSize = MAX_DEDUP_QUEUE_LEN)
|
||||
)
|
||||
|
||||
# Sync callbacks
|
||||
|
@ -184,7 +175,7 @@ proc dumpBlock[T](
|
|||
discard
|
||||
|
||||
from ../consensus_object_pools/block_clearance import
|
||||
addBackfillBlock, addHeadBlock
|
||||
addBackfillBlock, addHeadBlockWithParent, checkHeadBlock
|
||||
|
||||
proc storeBackfillBlock(
|
||||
self: var BlockProcessor,
|
||||
|
@ -406,12 +397,59 @@ proc storeBlock*(
|
|||
## blocks, regardless of origin, pass through here. When storing a block,
|
||||
## we will add it to the dag and pass it to all block consumers that need
|
||||
## to know about it, such as the fork choice and the monitoring
|
||||
|
||||
let
|
||||
attestationPool = self.consensusManager.attestationPool
|
||||
startTick = Moment.now()
|
||||
vm = self.validatorMonitor
|
||||
dag = self.consensusManager.dag
|
||||
wallSlot = wallTime.slotOrZero
|
||||
|
||||
# If the block is missing its parent, it will be re-orphaned below
|
||||
self.consensusManager.quarantine[].removeOrphan(signedBlock)
|
||||
# The block is certainly not missing any more
|
||||
self.consensusManager.quarantine[].missing.del(signedBlock.root)
|
||||
|
||||
if signedBlock.message.parent_root in
|
||||
self.consensusManager.quarantine[].unviable:
|
||||
# DAG doesn't know about unviable ancestor blocks - we do however!
|
||||
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
|
||||
|
||||
return err((VerifierError.UnviableFork, ProcessingStatus.completed))
|
||||
|
||||
template handleVerifierError(errorParam: VerifierError): auto =
|
||||
let error = errorParam
|
||||
case error
|
||||
of VerifierError.MissingParent:
|
||||
if (let r = self.consensusManager.quarantine[].addOrphan(
|
||||
dag.finalizedHead.slot, ForkedSignedBeaconBlock.init(signedBlock));
|
||||
r.isErr()):
|
||||
debug "could not add orphan",
|
||||
blockRoot = shortLog(signedBlock.root),
|
||||
blck = shortLog(signedBlock.message),
|
||||
signature = shortLog(signedBlock.signature),
|
||||
err = r.error()
|
||||
else:
|
||||
debug "Block quarantined",
|
||||
blockRoot = shortLog(signedBlock.root),
|
||||
blck = shortLog(signedBlock.message),
|
||||
signature = shortLog(signedBlock.signature)
|
||||
|
||||
of VerifierError.UnviableFork:
|
||||
# Track unviables so that descendants can be discarded promptly
|
||||
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
|
||||
else:
|
||||
discard
|
||||
|
||||
err((error, ProcessingStatus.completed))
|
||||
|
||||
let
|
||||
parent = dag.checkHeadBlock(signedBlock)
|
||||
|
||||
if parent.isErr():
|
||||
return handleVerifierError(parent.error())
|
||||
|
||||
let
|
||||
payloadStatus =
|
||||
if maybeFinalized and
|
||||
(self.lastPayload + SLOTS_PER_PAYLOAD) > signedBlock.message.slot and
|
||||
|
@ -431,9 +469,6 @@ proc storeBlock*(
|
|||
NewPayloadStatus.valid # vacuously
|
||||
payloadValid = payloadStatus == NewPayloadStatus.valid
|
||||
|
||||
# The block is certainly not missing any more
|
||||
self.consensusManager.quarantine[].missing.del(signedBlock.root)
|
||||
|
||||
if NewPayloadStatus.invalid == payloadStatus:
|
||||
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
|
||||
return err((VerifierError.UnviableFork, ProcessingStatus.completed))
|
||||
|
@ -461,11 +496,6 @@ proc storeBlock*(
|
|||
else:
|
||||
discard
|
||||
|
||||
# We'll also remove the block as an orphan: it's unlikely the parent is
|
||||
# missing if we get this far - should that be the case, the block will
|
||||
# be re-added later
|
||||
self.consensusManager.quarantine[].removeOrphan(signedBlock)
|
||||
|
||||
# TODO with v1.4.0, not sure this is still relevant
|
||||
# Establish blob viability before calling addHeadBlock to avoid
|
||||
# writing the block in case of blob error.
|
||||
|
@ -486,28 +516,31 @@ proc storeBlock*(
|
|||
return err((VerifierError.Invalid, ProcessingStatus.completed))
|
||||
|
||||
type Trusted = typeof signedBlock.asTrusted()
|
||||
let blck = dag.addHeadBlock(self.verifier, signedBlock, payloadValid) do (
|
||||
|
||||
let
|
||||
blck = dag.addHeadBlockWithParent(
|
||||
self.verifier, signedBlock, parent.value(), payloadValid) do (
|
||||
blckRef: BlockRef, trustedBlock: Trusted,
|
||||
epochRef: EpochRef, unrealized: FinalityCheckpoints):
|
||||
# Callback add to fork choice if valid
|
||||
attestationPool[].addForkChoice(
|
||||
epochRef, blckRef, unrealized, trustedBlock.message, wallTime)
|
||||
# Callback add to fork choice if valid
|
||||
attestationPool[].addForkChoice(
|
||||
epochRef, blckRef, unrealized, trustedBlock.message, wallTime)
|
||||
|
||||
vm[].registerBeaconBlock(
|
||||
src, wallTime, trustedBlock.message)
|
||||
vm[].registerBeaconBlock(
|
||||
src, wallTime, trustedBlock.message)
|
||||
|
||||
for attestation in trustedBlock.message.body.attestations:
|
||||
for validator_index in dag.get_attesting_indices(attestation):
|
||||
vm[].registerAttestationInBlock(attestation.data, validator_index,
|
||||
trustedBlock.message.slot)
|
||||
for attestation in trustedBlock.message.body.attestations:
|
||||
for validator_index in dag.get_attesting_indices(attestation):
|
||||
vm[].registerAttestationInBlock(attestation.data, validator_index,
|
||||
trustedBlock.message.slot)
|
||||
|
||||
withState(dag[].clearanceState):
|
||||
when consensusFork >= ConsensusFork.Altair and
|
||||
Trusted isnot phase0.TrustedSignedBeaconBlock: # altair+
|
||||
for i in trustedBlock.message.body.sync_aggregate.sync_committee_bits.oneIndices():
|
||||
vm[].registerSyncAggregateInBlock(
|
||||
trustedBlock.message.slot, trustedBlock.root,
|
||||
forkyState.data.current_sync_committee.pubkeys.data[i])
|
||||
withState(dag[].clearanceState):
|
||||
when consensusFork >= ConsensusFork.Altair and
|
||||
Trusted isnot phase0.TrustedSignedBeaconBlock: # altair+
|
||||
for i in trustedBlock.message.body.sync_aggregate.sync_committee_bits.oneIndices():
|
||||
vm[].registerSyncAggregateInBlock(
|
||||
trustedBlock.message.slot, trustedBlock.root,
|
||||
forkyState.data.current_sync_committee.pubkeys.data[i])
|
||||
|
||||
self[].dumpBlock(signedBlock, blck)
|
||||
|
||||
|
@ -515,34 +548,13 @@ proc storeBlock*(
|
|||
# However this block was before the last finalized epoch and so its parent
|
||||
# was pruned from the ForkChoice.
|
||||
if blck.isErr():
|
||||
case blck.error()
|
||||
of VerifierError.MissingParent:
|
||||
if signedBlock.message.parent_root in
|
||||
self.consensusManager.quarantine[].unviable:
|
||||
# DAG doesn't know about unviable ancestor blocks - we do! Translate
|
||||
# this to the appropriate error so that sync etc doesn't retry the block
|
||||
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
|
||||
return handleVerifierError(blck.error())
|
||||
|
||||
return err((VerifierError.UnviableFork, ProcessingStatus.completed))
|
||||
|
||||
if (let r = self.consensusManager.quarantine[].addOrphan(
|
||||
dag.finalizedHead.slot, ForkedSignedBeaconBlock.init(signedBlock));
|
||||
r.isErr()):
|
||||
debug "storeBlock: could not add orphan",
|
||||
blockRoot = shortLog(signedBlock.root),
|
||||
blck = shortLog(signedBlock.message),
|
||||
signature = shortLog(signedBlock.signature),
|
||||
err = r.error()
|
||||
of VerifierError.UnviableFork:
|
||||
# Track unviables so that descendants can be discarded properly
|
||||
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
|
||||
else: discard
|
||||
|
||||
return err((blck.error, ProcessingStatus.completed))
|
||||
|
||||
if payloadStatus in {NewPayloadStatus.valid, NewPayloadStatus.notValid}:
|
||||
# If the EL responded at all, we don't need to try again for a while
|
||||
self[].lastPayload = signedBlock.message.slot
|
||||
# Even if the EL is not responding, we'll only try once every now and then
|
||||
# to give it a block - this avoids a pathological slowdown where a busy EL
|
||||
# times out on every block we give it because it's busy with the previous
|
||||
# one
|
||||
self[].lastPayload = signedBlock.message.slot
|
||||
|
||||
# write blobs now that block has been written.
|
||||
let blobs = blobsOpt.valueOr: BlobSidecars @[]
|
||||
|
@ -666,12 +678,16 @@ proc storeBlock*(
|
|||
beacon_store_block_duration_seconds.observe(storeBlockDur.toFloatSeconds())
|
||||
|
||||
debug "Block processed",
|
||||
localHeadSlot = dag.head.slot,
|
||||
blockSlot = blck.get().slot,
|
||||
head = shortLog(dag.head),
|
||||
blck = shortLog(blck.get()),
|
||||
validationDur, queueDur, storeBlockDur, updateHeadDur
|
||||
|
||||
for quarantined in self.consensusManager.quarantine[].pop(blck.get().root):
|
||||
# Process the blocks that had the newly accepted block as parent
|
||||
debug "Block from quarantine",
|
||||
blockRoot = shortLog(signedBlock.root),
|
||||
quarantined = shortLog(quarantined.root)
|
||||
|
||||
withBlck(quarantined):
|
||||
when typeof(blck).toFork() < ConsensusFork.Deneb:
|
||||
self[].addBlock(MsgSource.gossip, quarantined, Opt.none(BlobSidecars))
|
||||
|
@ -694,7 +710,7 @@ proc storeBlock*(
|
|||
blockRoot = shortLog(quarantined.root),
|
||||
signature = shortLog(quarantined.signature)
|
||||
|
||||
return Result[BlockRef, (VerifierError, ProcessingStatus)].ok blck.get
|
||||
ok blck.value()
|
||||
|
||||
# Enqueue
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -734,19 +750,6 @@ proc addBlock*(
|
|||
except AsyncQueueFullError:
|
||||
raiseAssert "unbounded queue"
|
||||
|
||||
# Dedup
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
func checkDuplicateBlocks(self: ref BlockProcessor, entry: BlockEntry): bool =
|
||||
let key = (entry.blck.root, entry.blck.signature)
|
||||
if self.dupBlckBuf.contains key:
|
||||
return true
|
||||
doAssert self.dupBlckBuf.len <= MAX_DEDUP_QUEUE_LEN
|
||||
if self.dupBlckBuf.len >= MAX_DEDUP_QUEUE_LEN:
|
||||
self.dupBlckBuf.shrink(fromFirst = 1)
|
||||
self.dupBlckBuf.addLast key
|
||||
false
|
||||
|
||||
# Event Loop
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
|
@ -763,12 +766,6 @@ proc processBlock(
|
|||
error "Processing block before genesis, clock turned back?"
|
||||
quit 1
|
||||
|
||||
if self.checkDuplicateBlocks(entry):
|
||||
if entry.resfut != nil:
|
||||
entry.resfut.complete(Result[void, VerifierError].err(
|
||||
VerifierError.Duplicate))
|
||||
return
|
||||
|
||||
let res = withBlck(entry.blck):
|
||||
await self.storeBlock(
|
||||
entry.src, wallTime, blck, entry.blobs, entry.maybeFinalized,
|
||||
|
|
|
@ -493,7 +493,11 @@ proc validateBeaconBlock*(
|
|||
blockRoot = shortLog(signed_beacon_block.root),
|
||||
blck = shortLog(signed_beacon_block.message),
|
||||
err = r.error()
|
||||
|
||||
else:
|
||||
debug "Block quarantined",
|
||||
blockRoot = shortLog(signed_beacon_block.root),
|
||||
blck = shortLog(signed_beacon_block.message),
|
||||
signature = shortLog(signed_beacon_block.signature)
|
||||
return errIgnore("BeaconBlock: Parent not found")
|
||||
|
||||
# Continues block parent validity checking in optimistic case, where it does
|
||||
|
|
Loading…
Reference in New Issue