692 lines
28 KiB
Nim

# beacon_chain
# Copyright (c) 2018-2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
stew/results,
chronicles, chronos, metrics,
../spec/signatures_batch,
../sszdump
from ../consensus_object_pools/consensus_manager import
ConsensusManager, checkNextProposer, optimisticExecutionPayloadHash,
runForkchoiceUpdated, runProposalForkchoiceUpdated, shouldSyncOptimistically,
updateHead, updateHeadWithExecution
from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot
from ../consensus_object_pools/block_pools_types import
EpochRef, VerifierError
from ../consensus_object_pools/block_quarantine import
addOrphan, addUnviable, pop, removeOrphan
from ../validators/validator_monitor import
MsgSource, ValidatorMonitor, registerAttestationInBlock, registerBeaconBlock,
registerSyncAggregateInBlock
from ../spec/datatypes/eip4844 import BlobsSidecar
from ../spec/state_transition_block import validate_blobs_sidecar
export sszdump, signatures_batch
# Block Processor
# ------------------------------------------------------------------------------
# The block processor moves blocks from "Incoming" to "Consensus verified"
declareHistogram beacon_store_block_duration_seconds,
"storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
const
SLOTS_PER_PAYLOAD = SLOTS_PER_HISTORICAL_ROOT
## Number of slots we process between each execution payload execution, while
## syncing the finalized part of the chain
PAYLOAD_PRE_WALL_SLOTS = SLOTS_PER_EPOCH * 2
## Number of slots from wall time that we start processing every payload
type
BlockEntry* = object
blck*: ForkedSignedBeaconBlock
blobs*: Opt[eip4844.BlobsSidecar]
maybeFinalized*: bool
## The block source claims the block has been finalized already
resfut*: Future[Result[void, VerifierError]]
queueTick*: Moment # Moment when block was enqueued
validationDur*: Duration # Time it took to perform gossip validation
src*: MsgSource
BlockProcessor* = object
## This manages the processing of blocks from different sources
## Blocks and attestations are enqueued in a gossip-validated state
##
## from:
## - Gossip (when synced)
## - SyncManager (during sync)
## - RequestManager (missing ancestor blocks)
##
## are then consensus-verified and added to:
## - the blockchain DAG
## - database
## - attestation pool
## - fork choice
##
## The processor will also reinsert blocks from the quarantine, should a
## parent be found.
# Config
# ----------------------------------------------------------------
dumpEnabled: bool
dumpDirInvalid: string
dumpDirIncoming: string
# Producers
# ----------------------------------------------------------------
blockQueue: AsyncQueue[BlockEntry]
# Consumer
# ----------------------------------------------------------------
consensusManager: ref ConsensusManager
## Blockchain DAG, AttestationPool and Quarantine
## Blockchain DAG, AttestationPool, Quarantine, and Eth1Manager
validatorMonitor: ref ValidatorMonitor
getBeaconTime: GetBeaconTimeFn
verifier: BatchVerifier
lastPayload: Slot
## The slot at which we sent a payload to the execution client the last
## time
NewPayloadStatus {.pure.} = enum
valid
notValid
invalid
noResponse
ProcessingStatus {.pure.} = enum
completed
notCompleted
proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[eip4844.BlobsSidecar],
resfut: Future[Result[void, VerifierError]] = nil,
maybeFinalized = false,
validationDur = Duration())
# Initialization
# ------------------------------------------------------------------------------
proc new*(T: type BlockProcessor,
dumpEnabled: bool,
dumpDirInvalid, dumpDirIncoming: string,
rng: ref HmacDrbgContext, taskpool: TaskPoolPtr,
consensusManager: ref ConsensusManager,
validatorMonitor: ref ValidatorMonitor,
getBeaconTime: GetBeaconTimeFn): ref BlockProcessor =
(ref BlockProcessor)(
dumpEnabled: dumpEnabled,
dumpDirInvalid: dumpDirInvalid,
dumpDirIncoming: dumpDirIncoming,
blockQueue: newAsyncQueue[BlockEntry](),
consensusManager: consensusManager,
validatorMonitor: validatorMonitor,
getBeaconTime: getBeaconTime,
verifier: BatchVerifier(rng: rng, taskpool: taskpool)
)
# Sync callbacks
# ------------------------------------------------------------------------------
func hasBlocks*(self: BlockProcessor): bool =
self.blockQueue.len() > 0
# Storage
# ------------------------------------------------------------------------------
proc dumpInvalidBlock*(
self: BlockProcessor, signedBlock: ForkySignedBeaconBlock) =
if self.dumpEnabled:
dump(self.dumpDirInvalid, signedBlock)
proc dumpBlock[T](
self: BlockProcessor,
signedBlock: ForkySignedBeaconBlock,
res: Result[T, VerifierError]) =
if self.dumpEnabled and res.isErr:
case res.error
of VerifierError.Invalid:
self.dumpInvalidBlock(signedBlock)
of VerifierError.MissingParent:
dump(self.dumpDirIncoming, signedBlock)
else:
discard
from ../consensus_object_pools/block_clearance import
addBackfillBlock, addHeadBlock
from ../beacon_chain_db import putBlobsSidecar
proc storeBackfillBlock(
self: var BlockProcessor,
signedBlock: ForkySignedBeaconBlock,
blobs: Opt[eip4844.BlobsSidecar]): Result[void, VerifierError] =
# The block is certainly not missing any more
self.consensusManager.quarantine[].missing.del(signedBlock.root)
# Establish blob viability before calling addbackfillBlock to avoid
# writing the block in case of blob error.
let blobsOk =
when typeof(signedBlock).toFork() >= ConsensusFork.EIP4844:
blobs.isNone or
validate_blobs_sidecar(signedBlock.message.slot,
signedBlock.root,
signedBlock.message
.body.blob_kzg_commitments.asSeq,
blobs.get()).isOk()
else:
true
if not blobsOk:
return err(VerifierError.Invalid)
let res = self.consensusManager.dag.addBackfillBlock(signedBlock)
if res.isErr():
case res.error
of VerifierError.MissingParent:
if signedBlock.message.parent_root in
self.consensusManager.quarantine[].unviable:
# DAG doesn't know about unviable ancestor blocks - we do! Translate
# this to the appropriate error so that sync etc doesn't retry the block
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
return err(VerifierError.UnviableFork)
of VerifierError.UnviableFork:
# Track unviables so that descendants can be discarded properly
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
else: discard
return res
if blobs.isSome():
# Only store blobs after successfully establishing block viability.
self.consensusManager.dag.db.putBlobsSidecar(blobs.get())
res
from web3/engine_api_types import PayloadExecutionStatus, PayloadStatusV1
from ../eth1/eth1_monitor import
Eth1Monitor, asEngineExecutionPayload, ensureDataProvider, newPayload
proc expectValidForkchoiceUpdated(
eth1Monitor: Eth1Monitor,
headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
receivedBlock: ForkySignedBeaconBlock): Future[void] {.async.} =
let
(payloadExecutionStatus, _) = await eth1Monitor.runForkchoiceUpdated(
headBlockHash, safeBlockHash, finalizedBlockHash)
receivedExecutionBlockHash =
when typeof(receivedBlock).toFork >= ConsensusFork.Bellatrix:
receivedBlock.message.body.execution_payload.block_hash
else:
# https://github.com/nim-lang/Nim/issues/19802
(static(default(Eth2Digest)))
# Only called when expecting this to be valid because `newPayload` or some
# previous `forkchoiceUpdated` had already marked it as valid. However, if
# it's not the block that was received, don't info/warn either way given a
# relative lack of immediate evidence.
if receivedExecutionBlockHash != headBlockHash:
return
case payloadExecutionStatus
of PayloadExecutionStatus.valid:
# situation nominal
discard
of PayloadExecutionStatus.accepted, PayloadExecutionStatus.syncing:
info "execution payload forkChoiceUpdated status ACCEPTED/SYNCING, but was previously VALID",
payloadExecutionStatus = $payloadExecutionStatus, headBlockHash,
safeBlockHash, finalizedBlockHash,
receivedBlock = shortLog(receivedBlock)
of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash:
warn "execution payload forkChoiceUpdated status INVALID, but was previously VALID",
payloadExecutionStatus = $payloadExecutionStatus, headBlockHash,
safeBlockHash, finalizedBlockHash,
receivedBlock = shortLog(receivedBlock)
from ../consensus_object_pools/attestation_pool import
addForkChoice, selectOptimisticHead, BeaconHead
from ../consensus_object_pools/blockchain_dag import
is_optimistic, loadExecutionBlockRoot, markBlockVerified
from ../consensus_object_pools/spec_cache import get_attesting_indices
from ../spec/datatypes/phase0 import TrustedSignedBeaconBlock
from ../spec/datatypes/altair import SignedBeaconBlock
from eth/async_utils import awaitWithTimeout
from ../spec/datatypes/bellatrix import ExecutionPayload, SignedBeaconBlock
from ../spec/datatypes/capella import
ExecutionPayload, SignedBeaconBlock, asTrusted, shortLog
proc newExecutionPayload*(
eth1Monitor: Eth1Monitor,
executionPayload: bellatrix.ExecutionPayload | capella.ExecutionPayload |
eip4844.ExecutionPayload):
Future[Opt[PayloadExecutionStatus]] {.async.} =
if eth1Monitor.isNil:
return Opt.none PayloadExecutionStatus
debug "newPayload: inserting block into execution engine",
executionPayload = shortLog(executionPayload)
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.2/src/engine/paris.md#request
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.2/src/engine/shanghai.md#request
const NEWPAYLOAD_TIMEOUT = 8.seconds
try:
let
payloadResponse =
awaitWithTimeout(
eth1Monitor.newPayload(
executionPayload.asEngineExecutionPayload),
NEWPAYLOAD_TIMEOUT):
info "newPayload: newPayload timed out"
return Opt.none PayloadExecutionStatus
# Placeholder for type system
PayloadStatusV1(status: PayloadExecutionStatus.syncing)
payloadStatus = payloadResponse.status
debug "newPayload: succeeded",
parentHash = executionPayload.parent_hash,
blockHash = executionPayload.block_hash,
blockNumber = executionPayload.block_number,
payloadStatus = $payloadStatus
return Opt.some payloadStatus
except CatchableError as err:
warn "newPayload failed - check execution client",
msg = err.msg,
parentHash = shortLog(executionPayload.parent_hash),
blockHash = shortLog(executionPayload.block_hash),
blockNumber = executionPayload.block_number
return Opt.none PayloadExecutionStatus
# TODO investigate why this seems to allow compilation even though it doesn't
# directly address eip4844.ExecutionPayload when complaint was that it didn't
# know about "eip4844"
from ../spec/datatypes/eip4844 import SignedBeaconBlock, asTrusted, shortLog
proc getExecutionValidity(
eth1Monitor: Eth1Monitor,
blck: bellatrix.SignedBeaconBlock | capella.SignedBeaconBlock |
eip4844.SignedBeaconBlock):
Future[NewPayloadStatus] {.async.} =
if not blck.message.is_execution_block:
return NewPayloadStatus.valid # vacuously
if eth1Monitor.isNil:
return NewPayloadStatus.noResponse
try:
# Minimize window for Eth1 monitor to shut down connection
await eth1Monitor.ensureDataProvider()
let executionPayloadStatus = await newExecutionPayload(
eth1Monitor, blck.message.body.execution_payload)
if executionPayloadStatus.isNone:
return NewPayloadStatus.noResponse
case executionPayloadStatus.get
of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash:
debug "getExecutionValidity: execution payload invalid",
executionPayloadStatus = $executionPayloadStatus.get,
blck = shortLog(blck)
return NewPayloadStatus.invalid
of PayloadExecutionStatus.syncing, PayloadExecutionStatus.accepted:
return NewPayloadStatus.notValid
of PayloadExecutionStatus.valid:
return NewPayloadStatus.valid
except CatchableError as err:
error "getExecutionValidity: newPayload failed", err = err.msg
return NewPayloadStatus.noResponse
proc storeBlock*(
self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
signedBlock: ForkySignedBeaconBlock,
blobs: Opt[eip4844.BlobsSidecar],
maybeFinalized = false,
queueTick: Moment = Moment.now(), validationDur = Duration()):
Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async.} =
## storeBlock is the main entry point for unvalidated blocks - all untrusted
## blocks, regardless of origin, pass through here. When storing a block,
## we will add it to the dag and pass it to all block consumers that need
## to know about it, such as the fork choice and the monitoring
let
attestationPool = self.consensusManager.attestationPool
startTick = Moment.now()
vm = self.validatorMonitor
dag = self.consensusManager.dag
wallSlot = wallTime.slotOrZero
payloadStatus =
if maybeFinalized and
(self.lastPayload + SLOTS_PER_PAYLOAD) > signedBlock.message.slot and
(signedBlock.message.slot + PAYLOAD_PRE_WALL_SLOTS) < wallSlot:
# Skip payload validation when message source (reasonably) claims block
# has been finalized - this speeds up forward sync - in the worst case
# that the claim is false, we will correct every time we process a block
# from an honest source (or when we're close to head).
# Occasionally we also send a payload to the the EL so that it can
# progress in its own sync.
NewPayloadStatus.noResponse
else:
when typeof(signedBlock).toFork() >= ConsensusFork.Bellatrix:
await self.consensusManager.eth1Monitor.getExecutionValidity(signedBlock)
else:
NewPayloadStatus.valid # vacuously
payloadValid = payloadStatus == NewPayloadStatus.valid
# The block is certainly not missing any more
self.consensusManager.quarantine[].missing.del(signedBlock.root)
if NewPayloadStatus.invalid == payloadStatus:
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
return err((VerifierError.UnviableFork, ProcessingStatus.completed))
if NewPayloadStatus.noResponse == payloadStatus:
# When the execution layer is not available to verify the payload, we do the
# required check on the CL side instead and proceed as if the EL was syncing
# Client software MUST validate blockHash value as being equivalent to
# Keccak256(RLP(ExecutionBlockHeader))
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.2/src/engine/paris.md#specification
when typeof(signedBlock).toFork() >= ConsensusFork.Bellatrix:
template payload(): auto = signedBlock.message.body.execution_payload
if payload.block_hash != payload.compute_execution_block_hash():
debug "Execution block hash validation failed",
execution_payload = shortLog(payload)
doAssert strictVerification notin dag.updateFlags
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
return err((VerifierError.Invalid, ProcessingStatus.completed))
else:
discard
# We'll also remove the block as an orphan: it's unlikely the parent is
# missing if we get this far - should that be the case, the block will
# be re-added later
self.consensusManager.quarantine[].removeOrphan(signedBlock)
# Establish blob viability before calling addHeadBlock to avoid
# writing the block in case of blob error.
when typeof(signedBlock).toFork() >= ConsensusFork.EIP4844:
if blobs.isSome():
let res = validate_blobs_sidecar(signedBlock.message.slot,
signedBlock.root,
signedBlock.message
.body.blob_kzg_commitments.asSeq,
blobs.get())
if res.isErr():
debug "blobs sidecar validation failed", err = res.error()
return err((VerifierError.Invalid, ProcessingStatus.completed))
type Trusted = typeof signedBlock.asTrusted()
let blck = dag.addHeadBlock(self.verifier, signedBlock, payloadValid) do (
blckRef: BlockRef, trustedBlock: Trusted,
epochRef: EpochRef, unrealized: FinalityCheckpoints):
# Callback add to fork choice if valid
attestationPool[].addForkChoice(
epochRef, blckRef, unrealized, trustedBlock.message, wallTime)
vm[].registerBeaconBlock(
src, wallTime, trustedBlock.message)
for attestation in trustedBlock.message.body.attestations:
for validator_index in dag.get_attesting_indices(attestation):
vm[].registerAttestationInBlock(attestation.data, validator_index,
trustedBlock.message.slot)
withState(dag[].clearanceState):
when stateFork >= ConsensusFork.Altair and
Trusted isnot phase0.TrustedSignedBeaconBlock: # altair+
for i in trustedBlock.message.body.sync_aggregate.sync_committee_bits.oneIndices():
vm[].registerSyncAggregateInBlock(
trustedBlock.message.slot, trustedBlock.root,
forkyState.data.current_sync_committee.pubkeys.data[i])
self[].dumpBlock(signedBlock, blck)
# There can be a scenario where we receive a block we already received.
# However this block was before the last finalized epoch and so its parent
# was pruned from the ForkChoice.
if blck.isErr():
case blck.error()
of VerifierError.MissingParent:
if signedBlock.message.parent_root in
self.consensusManager.quarantine[].unviable:
# DAG doesn't know about unviable ancestor blocks - we do! Translate
# this to the appropriate error so that sync etc doesn't retry the block
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
return err((VerifierError.UnviableFork, ProcessingStatus.completed))
if not self.consensusManager.quarantine[].addOrphan(
dag.finalizedHead.slot, ForkedSignedBeaconBlock.init(signedBlock), blobs):
debug "Block quarantine full",
blockRoot = shortLog(signedBlock.root),
blck = shortLog(signedBlock.message),
signature = shortLog(signedBlock.signature)
of VerifierError.UnviableFork:
# Track unviables so that descendants can be discarded properly
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
else: discard
return err((blck.error, ProcessingStatus.completed))
if payloadStatus in {NewPayloadStatus.valid, NewPayloadStatus.notValid}:
# If the EL responded at all, we don't need to try again for a while
self[].lastPayload = signedBlock.message.slot
# write blobs now that block has been written.
if blobs.isSome():
self.consensusManager.dag.db.putBlobsSidecar(blobs.get())
let storeBlockTick = Moment.now()
# Eagerly update head: the incoming block "should" get selected.
#
# storeBlock gets called from validator_duties, which depends on its not
# blocking progress any longer than necessary, and processBlock here, in
# which case it's fine to await for a while on engine API results.
#
# Three general scenarios: (1) pre-merge; (2) merge, already `VALID` by way
# of `newPayload`; (3) optimistically imported, need to call fcU before DAG
# updateHead. Because in a non-finalizing network, completing sync isn't as
# useful because regular reorgs likely still occur, and when finalizing the
# EL is only called every SLOTS_PER_PAYLOAD slots regardless, await, rather
# than asyncSpawn forkchoiceUpdated calls.
#
# This reduces in-flight fcU spam, which both reduces EL load and decreases
# otherwise somewhat unpredictable CL head movement.
if payloadValid:
self.consensusManager.dag.markBlockVerified(
self.consensusManager.quarantine[], signedBlock.root)
# Grab the new head according to our latest attestation data; determines how
# async this needs to be.
let newHead = attestationPool[].selectOptimisticHead(
wallSlot.start_beacon_time)
if newHead.isOk:
template eth1Monitor(): auto = self.consensusManager.eth1Monitor
if self.consensusManager[].shouldSyncOptimistically(wallSlot):
# Optimistic head is far in the future; report it as head block to EL.
# Note that the specification allows an EL client to skip fcU processing
# if an update to an ancestor is requested.
# > Client software MAY skip an update of the forkchoice state and MUST
# NOT begin a payload build process if `forkchoiceState.headBlockHash`
# references an ancestor of the head of canonical chain.
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.2/src/engine/paris.md#specification-1
#
# However, in practice, an EL client may not have completed importing all
# block headers, so may be unaware of a block's ancestor status.
# Therefore, hopping back and forth between the optimistic head and the
# chain DAG head does not work well in practice, e.g., Geth:
# - "Beacon chain gapped" from DAG head to optimistic head,
# - followed by "Beacon chain reorged" from optimistic head back to DAG.
self.consensusManager[].updateHead(newHead.get.blck)
discard await eth1Monitor.runForkchoiceUpdated(
headBlockHash = self.consensusManager[].optimisticExecutionPayloadHash,
safeBlockHash = newHead.get.safeExecutionPayloadHash,
finalizedBlockHash = newHead.get.finalizedExecutionPayloadHash)
else:
let
headExecutionPayloadHash =
self.consensusManager.dag.loadExecutionBlockRoot(newHead.get.blck)
wallSlot = self.getBeaconTime().slotOrZero
if headExecutionPayloadHash.isZero or
NewPayloadStatus.noResponse == payloadStatus:
# Blocks without execution payloads can't be optimistic, and don't try
# to fcU to a block the EL hasn't seen
self.consensusManager[].updateHead(newHead.get.blck)
elif not self.consensusManager.dag.is_optimistic newHead.get.blck.root:
# Not `NOT_VALID`; either `VALID` or `INVALIDATED`, but latter wouldn't
# be selected as head, so `VALID`. `forkchoiceUpdated` necessary for EL
# client only.
self.consensusManager[].updateHead(newHead.get.blck)
if self.consensusManager.checkNextProposer(wallSlot).isNone:
# No attached validator is next proposer, so use non-proposal fcU
await eth1Monitor.expectValidForkchoiceUpdated(
headBlockHash = headExecutionPayloadHash,
safeBlockHash = newHead.get.safeExecutionPayloadHash,
finalizedBlockHash = newHead.get.finalizedExecutionPayloadHash,
receivedBlock = signedBlock)
else:
# Some attached validator is next proposer, so prepare payload. As
# updateHead() updated the DAG head, runProposalForkchoiceUpdated,
# which needs the state corresponding to that head block, can run.
await self.consensusManager.runProposalForkchoiceUpdated(
wallSlot)
else:
await self.consensusManager.updateHeadWithExecution(
newHead.get, self.getBeaconTime)
else:
warn "Head selection failed, using previous head",
head = shortLog(self.consensusManager.dag.head), wallSlot
let
updateHeadTick = Moment.now()
queueDur = startTick - queueTick
storeBlockDur = storeBlockTick - startTick
updateHeadDur = updateHeadTick - storeBlockTick
beacon_store_block_duration_seconds.observe(storeBlockDur.toFloatSeconds())
debug "Block processed",
localHeadSlot = self.consensusManager.dag.head.slot,
blockSlot = blck.get().slot,
validationDur, queueDur, storeBlockDur, updateHeadDur
for quarantined in self.consensusManager.quarantine[].pop(blck.get().root):
# Process the blocks that had the newly accepted block as parent
self[].addBlock(MsgSource.gossip, quarantined[0], quarantined[1])
return Result[BlockRef, (VerifierError, ProcessingStatus)].ok blck.get
# Enqueue
# ------------------------------------------------------------------------------
proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[eip4844.BlobsSidecar],
resfut: Future[Result[void, VerifierError]] = nil,
maybeFinalized = false,
validationDur = Duration()) =
## Enqueue a Gossip-validated block for consensus verification
# Backpressure:
# There is no backpressure here - producers must wait for `resfut` to
# constrain their own processing
# Producers:
# - Gossip (when synced)
# - SyncManager (during sync)
# - RequestManager (missing ancestor blocks)
withBlck(blck):
if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
# let backfill blocks skip the queue - these are always "fast" to process
# because there are no state rewinds to deal with
let res = self.storeBackfillBlock(blck, blobs)
if resfut != nil:
resfut.complete(res)
return
try:
self.blockQueue.addLastNoWait(BlockEntry(
blck: blck,
blobs: blobs,
maybeFinalized: maybeFinalized,
resfut: resfut, queueTick: Moment.now(),
validationDur: validationDur,
src: src))
except AsyncQueueFullError:
raiseAssert "unbounded queue"
# Event Loop
# ------------------------------------------------------------------------------
proc processBlock(
self: ref BlockProcessor, entry: BlockEntry) {.async.} =
logScope:
blockRoot = shortLog(entry.blck.root)
let
wallTime = self.getBeaconTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
error "Processing block before genesis, clock turned back?"
quit 1
let res = withBlck(entry.blck):
await self.storeBlock(
entry.src, wallTime, blck, entry.blobs, entry.maybeFinalized,
entry.queueTick, entry.validationDur)
if res.isErr and res.error[1] == ProcessingStatus.notCompleted:
# When an execution engine returns an error or fails to respond to a
# payload validity request for some block, a consensus engine:
# - MUST NOT optimistically import the block.
# - MUST NOT apply the block to the fork choice store.
# - MAY queue the block for later processing.
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-rc.2/sync/optimistic.md#execution-engine-errors
await sleepAsync(chronos.seconds(1))
self[].addBlock(
entry.src, entry.blck, entry.blobs, entry.resfut, entry.maybeFinalized,
entry.validationDur)
# To ensure backpressure on the sync manager, do not complete these futures.
return
if entry.resfut != nil:
entry.resfut.complete(
if res.isOk(): Result[void, VerifierError].ok()
else: Result[void, VerifierError].err(res.error()[0]))
proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
while true:
# Cooperative concurrency: one block per loop iteration - because
# we run both networking and CPU-heavy things like block processing
# on the same thread, we need to make sure that there is steady progress
# on the networking side or we get long lockups that lead to timeouts.
const
# We cap waiting for an idle slot in case there's a lot of network traffic
# taking up all CPU - we don't want to _completely_ stop processing blocks
# in this case - doing so also allows us to benefit from more batching /
# larger network reads when under load.
idleTimeout = 10.milliseconds
discard await idleAsync().withTimeout(idleTimeout)
await self.processBlock(await self[].blockQueue.popFirst())