nimbus-eth2/beacon_chain/gossip_processing/block_processor.nim

813 lines
34 KiB
Nim
Raw Normal View History

# beacon_chain
# Copyright (c) 2018-2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
stew/results,
chronicles, chronos, metrics,
../spec/[signatures, signatures_batch],
disentangle eth2 types from the ssz library (#2785) * reorganize ssz dependencies This PR continues the work in https://github.com/status-im/nimbus-eth2/pull/2646, https://github.com/status-im/nimbus-eth2/pull/2779 as well as past issues with serialization and type, to disentangle SSZ from eth2 and at the same time simplify imports and exports with a structured approach. The principal idea here is that when a library wants to introduce SSZ support, they do so via 3 files: * `ssz_codecs` which imports and reexports `codecs` - this covers the basic byte conversions and ensures no overloads get lost * `xxx_merkleization` imports and exports `merkleization` to specialize and get access to `hash_tree_root` and friends * `xxx_ssz_serialization` imports and exports `ssz_serialization` to specialize ssz for a specific library Those that need to interact with SSZ always import the `xxx_` versions of the modules and never `ssz` itself so as to keep imports simple and safe. This is similar to how the REST / JSON-RPC serializers are structured in that someone wanting to serialize spec types to REST-JSON will import `eth2_rest_serialization` and nothing else. 
* split up ssz into a core library that is independendent of eth2 types * rename `bytes_reader` to `codec` to highlight that it contains coding and decoding of bytes and native ssz types * remove tricky List init overload that causes compile issues * get rid of top-level ssz import * reenable merkleization tests * move some "standard" json serializers to spec * remove `ValidatorIndex` serialization for now * remove test_ssz_merkleization * add tests for over/underlong byte sequences * fix broken seq[byte] test - seq[byte] is not an SSZ type There are a few things this PR doesn't solve: * like #2646 this PR is weak on how to handle root and other dontSerialize fields that "sometimes" should be computed - the same problem appears in REST / JSON-RPC etc * Fix a build problem on macOS * Another way to fix the macOS builds Co-authored-by: Zahary Karadjov <zahary@gmail.com>
2021-08-18 18:57:58 +00:00
../sszdump
from std/deques import Deque, addLast, contains, initDeque, items, len, shrink
from std/sequtils import mapIt
from ../consensus_object_pools/consensus_manager import
ConsensusManager, checkNextProposer, optimisticExecutionPayloadHash,
Support for driving multiple EL nodes from a single Nimbus BN (#4465) * Support for driving multiple EL nodes from a single Nimbus BN Full list of changes: * Eth1Monitor has been renamed to ELManager to match its current responsibilities better. * The ELManager is no longer optional in the code (it won't have a nil value under any circumstances). * The support for subscribing for headers was removed as it only worked with WebSockets and contributed significant complexity while bringing only a very minor advantage. * The `--web3-url` parameter has been deprecated in favor of a new `--el` parameter. The new parameter has a reasonable default value and supports specifying a different JWT for each connection. Each connection can also be configured with a different set of responsibilities (e.g. download deposits, validate blocks and/or produce blocks). On the command-line, these properties can be configured through URL properties stored in the #anchor part of the URL. In TOML files, they come with a very natural syntax (althrough the URL scheme is also supported). * The previously scattered EL-related state and logic is now moved to `eth1_monitor.nim` (this module will be renamed to `el_manager.nim` in a follow-up commit). State is assigned properly either to the `ELManager` or the to individual `ELConnection` objects where appropriate. The ELManager executes all Engine API requests against all attached EL nodes, in parallel. It compares their results and if there is a disagreement regarding the validity of a certain payload, this is detected and the beacon node is protected from publishing a block with a potential execution layer consensus bug in it. The BN provides metrics per EL node for the number of successful or failed requests for each type Engine API requests. If an EL node goes offline and connectivity is resoted later, we report the problem and the remedy in edge-triggered fashion. 
* More progress towards implementing Deneb block production in the VC and comparing the value of blocks produced by the EL and the builder API. * Adds a Makefile target for the zhejiang testnet
2023-03-05 01:40:21 +00:00
runProposalForkchoiceUpdated, shouldSyncOptimistically, updateHead,
updateHeadWithExecution
from ../consensus_object_pools/blockchain_dag import
getBlockRef, getProposer, forkAtEpoch, loadExecutionBlockHash,
markBlockVerified, validatorKey
from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot
from ../consensus_object_pools/block_pools_types import
EpochRef, VerifierError
from ../consensus_object_pools/block_quarantine import
addBlobless, addOrphan, addUnviable, pop, removeOrphan
from ../consensus_object_pools/blob_quarantine import
BlobQuarantine, hasBlobs, popBlobs
from ../validators/validator_monitor import
MsgSource, ValidatorMonitor, registerAttestationInBlock, registerBeaconBlock,
registerSyncAggregateInBlock
from ../beacon_chain_db import putBlobSidecar
from ../spec/state_transition_block import validate_blobs
export sszdump, signatures_batch
# Block Processor
# ------------------------------------------------------------------------------
# The block processor moves blocks from "Incoming" to "Consensus verified"
# Histogram metric for the duration of storeBlock(); going by the metric name,
# observations are in seconds. Bucket bounds double from 0.25 up to 8, with a
# final catch-all Inf bucket.
declareHistogram beacon_store_block_duration_seconds,
  "storeBlock() duration",
  buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
const
  SLOTS_PER_PAYLOAD = SLOTS_PER_HISTORICAL_ROOT
    ## While syncing the finalized part of the chain: how many slots are
    ## processed between two consecutive execution payload executions

  PAYLOAD_PRE_WALL_SLOTS = SLOTS_PER_EPOCH * 2
    ## Distance from wall time, in slots, from which every payload starts
    ## being processed

  MAX_DEDUP_QUEUE_LEN = 16
    ## How many blocks are remembered, with FIFO discipline, for checking
    ## queued blocks against before they are processed, to avoid spamming ELs.
    ## Keep this small enough that even O(n) algorithms remain reasonable.
type
  BlobSidecars* = seq[ref BlobSidecar]
    ## Sequence of heap-allocated blob sidecars travelling with a block

  BlockEntry = object
    ## One item of the block queue: a block plus the bookkeeping needed to
    ## process it and to report the outcome
    blck*: ForkedSignedBeaconBlock
    blobs*: Opt[BlobSidecars]
    maybeFinalized*: bool
      ## The block source claims the block has been finalized already
    resfut*: Future[Result[void, VerifierError]]
      ## Optional future for reporting the verification result; `addBlock`
      ## defaults it to `nil`
    queueTick*: Moment # Moment when block was enqueued
    validationDur*: Duration # Time it took to perform gossip validation
    src*: MsgSource

  BlockProcessor* = object
    ## This manages the processing of blocks from different sources
    ## Blocks and attestations are enqueued in a gossip-validated state
    ##
    ## from:
    ## - Gossip (when synced)
    ## - SyncManager (during sync)
    ## - RequestManager (missing ancestor blocks)
    ##
    ## are then consensus-verified and added to:
    ## - the blockchain DAG
    ## - database
    ## - attestation pool
    ## - fork choice
    ##
    ## The processor will also reinsert blocks from the quarantine, should a
    ## parent be found.

    # Config
    # ----------------------------------------------------------------
    dumpEnabled: bool
    dumpDirInvalid: string
    dumpDirIncoming: string

    # Producers
    # ----------------------------------------------------------------
    blockQueue: AsyncQueue[BlockEntry]

    # Consumer
    # ----------------------------------------------------------------
    consensusManager: ref ConsensusManager
      ## Blockchain DAG, AttestationPool, Quarantine, and ELManager
    validatorMonitor: ref ValidatorMonitor

    getBeaconTime: GetBeaconTimeFn

    blobQuarantine: ref BlobQuarantine
    verifier: BatchVerifier

    lastPayload: Slot
      ## The slot at which we sent a payload to the execution client the last
      ## time

    dupBlckBuf: Deque[(Eth2Digest, ValidatorSig)]
      # Small buffer to allow for filtering of duplicate blocks in block queue

  NewPayloadStatus {.pure.} = enum
    ## Outcome of asking the execution layer to validate a block's payload,
    ## as produced by `getExecutionValidity`
    valid       ## EL answered VALID, or the block is not an execution block
    notValid    ## EL answered SYNCING or ACCEPTED
    invalid     ## EL answered INVALID or INVALID_BLOCK_HASH
    noResponse  ## no status could be obtained from the EL

  ProcessingStatus {.pure.} = enum
    completed
    notCompleted
# Forward declaration only: no body is given at this point, so the
# implementation has to follow further down in the file. Declaring it here
# lets code located before that implementation call `addBlock`.
proc addBlock*(
    self: var BlockProcessor,
    src: MsgSource,
    blck: ForkedSignedBeaconBlock,
    blobs: Opt[BlobSidecars],
    resfut: Future[Result[void, VerifierError]] = nil,
    maybeFinalized = false,
    validationDur = Duration())
# Initialization
# ------------------------------------------------------------------------------
proc new*(T: type BlockProcessor,
          dumpEnabled: bool,
          dumpDirInvalid, dumpDirIncoming: string,
          rng: ref HmacDrbgContext, taskpool: TaskPoolPtr,
          consensusManager: ref ConsensusManager,
          validatorMonitor: ref ValidatorMonitor,
          blobQuarantine: ref BlobQuarantine,
          getBeaconTime: GetBeaconTimeFn): ref BlockProcessor =
  ## Creates a heap-allocated `BlockProcessor`.
  ##
  ## * `dumpEnabled`, `dumpDirInvalid`, `dumpDirIncoming` - block dumping
  ##   configuration, stored as given
  ## * `rng`, `taskpool` - used to build the processor's `BatchVerifier`
  ## * `consensusManager`, `validatorMonitor`, `blobQuarantine`,
  ##   `getBeaconTime` - collaborators, stored as given
  ##
  ## The block queue starts out empty and the duplicate-block buffer is
  ## created with an initial size of `MAX_DEDUP_QUEUE_LEN`. `lastPayload` is
  ## not set here and therefore keeps its default value.
  (ref BlockProcessor)(
    dumpEnabled: dumpEnabled,
    dumpDirInvalid: dumpDirInvalid,
    dumpDirIncoming: dumpDirIncoming,
    blockQueue: newAsyncQueue[BlockEntry](),
    consensusManager: consensusManager,
    validatorMonitor: validatorMonitor,
    blobQuarantine: blobQuarantine,
    getBeaconTime: getBeaconTime,
    verifier: BatchVerifier(rng: rng, taskpool: taskpool),
    dupBlckBuf: initDeque[(Eth2Digest, ValidatorSig)](
      initialSize = MAX_DEDUP_QUEUE_LEN)
  )
# Sync callbacks
# ------------------------------------------------------------------------------
func hasBlocks*(self: BlockProcessor): bool =
  ## `true` as long as at least one entry is waiting in the block queue.
  0 < self.blockQueue.len()
# Storage
# ------------------------------------------------------------------------------
proc dumpInvalidBlock*(
    self: BlockProcessor, signedBlock: ForkySignedBeaconBlock) =
  ## Passes `signedBlock` to `dump` together with the directory configured
  ## for invalid blocks. Does nothing when dumping is disabled.
  if not self.dumpEnabled:
    return

  dump(self.dumpDirInvalid, signedBlock)
proc dumpBlock[T](
    self: BlockProcessor,
    signedBlock: ForkySignedBeaconBlock,
    res: Result[T, VerifierError]) =
  ## Dumps `signedBlock` according to the failure recorded in `res`:
  ##
  ## * `Invalid` - dumped through `dumpInvalidBlock`
  ## * `MissingParent` - dumped to the incoming-blocks directory
  ## * any other error - not dumped
  ##
  ## Nothing happens when dumping is disabled or `res` holds a success.
  if (not self.dumpEnabled) or res.isOk:
    return

  case res.error
  of VerifierError.Invalid:
    self.dumpInvalidBlock(signedBlock)
  of VerifierError.MissingParent:
    dump(self.dumpDirIncoming, signedBlock)
  else:
    discard
from ../consensus_object_pools/block_clearance import
addBackfillBlock, addHeadBlock
proc storeBackfillBlock(
    self: var BlockProcessor,
    signedBlock: ForkySignedBeaconBlock,
    blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] =
  ## Adds `signedBlock` to the DAG as a backfill block and, on success,
  ## writes the accompanying blob sidecars to the database.
  ##
  ## Returns `Invalid` without touching the DAG when blob validation fails
  ## (only checked for Deneb and later forks, and only when `blobsOpt` is
  ## set). A `MissingParent` answer from the DAG is translated to
  ## `UnviableFork` when the parent is already known to be unviable; any
  ## other DAG error is returned unchanged.

  # The block is certainly not missing any more
  self.consensusManager.quarantine[].missing.del(signedBlock.root)

  # Establish blob viability before calling addBackfillBlock to avoid
  # writing the block in case of blob error.
  when typeof(signedBlock).toFork() >= ConsensusFork.Deneb:
    if blobsOpt.isSome:
      let
        sidecars = blobsOpt.get()
        commitments = signedBlock.message.body.blob_kzg_commitments.asSeq
      if sidecars.len > 0 or commitments.len > 0:
        let validation = validate_blobs(
          commitments,
          sidecars.mapIt(it.blob),
          sidecars.mapIt(it.kzg_proof))
        if validation.isErr():
          debug "backfill blob validation failed",
            blockRoot = shortLog(signedBlock.root),
            blobs = shortLog(sidecars),
            blck = shortLog(signedBlock.message),
            signature = shortLog(signedBlock.signature),
            msg = validation.error()
          return err(VerifierError.Invalid)

  let res = self.consensusManager.dag.addBackfillBlock(signedBlock)

  if res.isOk():
    # Only store blobs after successfully establishing block viability.
    let sidecars = blobsOpt.valueOr: BlobSidecars @[]
    for sidecar in sidecars:
      self.consensusManager.dag.db.putBlobSidecar(sidecar[])
    return res

  case res.error
  of VerifierError.MissingParent:
    if signedBlock.message.parent_root in
        self.consensusManager.quarantine[].unviable:
      # DAG doesn't know about unviable ancestor blocks - we do! Translate
      # this to the appropriate error so that sync etc doesn't retry the block
      self.consensusManager.quarantine[].addUnviable(signedBlock.root)
      return err(VerifierError.UnviableFork)
  of VerifierError.UnviableFork:
    # Track unviables so that descendants can be discarded properly
    self.consensusManager.quarantine[].addUnviable(signedBlock.root)
  else:
    discard

  res
from web3/engine_api_types import
PayloadAttributesV1, PayloadAttributesV2, PayloadExecutionStatus,
PayloadStatusV1
from ../el/el_manager import
ELManager, forkchoiceUpdated, hasConnection, hasProperlyConfiguredConnection,
sendNewPayload
proc expectValidForkchoiceUpdated(
    elManager: ELManager, headBlockPayloadAttributesType: typedesc,
    headBlockHash, safeBlockHash, finalizedBlockHash: Eth2Digest,
    receivedBlock: ForkySignedBeaconBlock): Future[void] {.async.} =
  ## Sends `forkchoiceUpdated` without payload attributes to the execution
  ## layer and logs when the reported status is anything other than VALID.
  ##
  ## * `headBlockPayloadAttributesType` - payload attributes type used to
  ##   build the empty `payloadAttributes` option
  ## * `headBlockHash`, `safeBlockHash`, `finalizedBlockHash` - forwarded
  ##   as-is to `forkchoiceUpdated`
  ## * `receivedBlock` - block that triggered the update; logging only takes
  ##   place when its execution block hash equals `headBlockHash`
  let
    (payloadExecutionStatus, _) = await elManager.forkchoiceUpdated(
      headBlockHash = headBlockHash,
      safeBlockHash = safeBlockHash,
      finalizedBlockHash = finalizedBlockHash,
      payloadAttributes = none headBlockPayloadAttributesType)
    receivedExecutionBlockHash =
      when typeof(receivedBlock).toFork >= ConsensusFork.Bellatrix:
        receivedBlock.message.body.execution_payload.block_hash
      else:
        # Pre-Bellatrix blocks carry no execution payload, so fall back to
        # the default digest.
        # https://github.com/nim-lang/Nim/issues/19802
        (static(default(Eth2Digest)))

  # Only called when expecting this to be valid because `newPayload` or some
  # previous `forkchoiceUpdated` had already marked it as valid. However, if
  # it's not the block that was received, don't info/warn either way given a
  # relative lack of immediate evidence.
  if receivedExecutionBlockHash != headBlockHash:
    return

  case payloadExecutionStatus
  of PayloadExecutionStatus.valid:
    # situation nominal
    discard
  of PayloadExecutionStatus.accepted, PayloadExecutionStatus.syncing:
    info "execution payload forkChoiceUpdated status ACCEPTED/SYNCING, but was previously VALID",
      payloadExecutionStatus = $payloadExecutionStatus, headBlockHash,
      safeBlockHash, finalizedBlockHash,
      receivedBlock = shortLog(receivedBlock)
  of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash:
    warn "execution payload forkChoiceUpdated status INVALID, but was previously VALID",
      payloadExecutionStatus = $payloadExecutionStatus, headBlockHash,
      safeBlockHash, finalizedBlockHash,
      receivedBlock = shortLog(receivedBlock)
from ../consensus_object_pools/attestation_pool import
addForkChoice, selectOptimisticHead, BeaconHead
from ../consensus_object_pools/spec_cache import get_attesting_indices
from ../spec/datatypes/phase0 import TrustedSignedBeaconBlock
from ../spec/datatypes/altair import SignedBeaconBlock
from ../spec/datatypes/bellatrix import ExecutionPayload, SignedBeaconBlock
from ../spec/datatypes/capella import
ExecutionPayload, SignedBeaconBlock, asTrusted, shortLog
Support for driving multiple EL nodes from a single Nimbus BN (#4465) * Support for driving multiple EL nodes from a single Nimbus BN Full list of changes: * Eth1Monitor has been renamed to ELManager to match its current responsibilities better. * The ELManager is no longer optional in the code (it won't have a nil value under any circumstances). * The support for subscribing for headers was removed as it only worked with WebSockets and contributed significant complexity while bringing only a very minor advantage. * The `--web3-url` parameter has been deprecated in favor of a new `--el` parameter. The new parameter has a reasonable default value and supports specifying a different JWT for each connection. Each connection can also be configured with a different set of responsibilities (e.g. download deposits, validate blocks and/or produce blocks). On the command-line, these properties can be configured through URL properties stored in the #anchor part of the URL. In TOML files, they come with a very natural syntax (althrough the URL scheme is also supported). * The previously scattered EL-related state and logic is now moved to `eth1_monitor.nim` (this module will be renamed to `el_manager.nim` in a follow-up commit). State is assigned properly either to the `ELManager` or the to individual `ELConnection` objects where appropriate. The ELManager executes all Engine API requests against all attached EL nodes, in parallel. It compares their results and if there is a disagreement regarding the validity of a certain payload, this is detected and the beacon node is protected from publishing a block with a potential execution layer consensus bug in it. The BN provides metrics per EL node for the number of successful or failed requests for each type Engine API requests. If an EL node goes offline and connectivity is resoted later, we report the problem and the remedy in edge-triggered fashion. 
* More progress towards implementing Deneb block production in the VC and comparing the value of blocks produced by the EL and the builder API. * Adds a Makefile target for the zhejiang testnet
2023-03-05 01:40:21 +00:00
# TODO investigate why this seems to allow compilation even though it doesn't
# directly address deneb.ExecutionPayload when complaint was that it didn't
# know about "deneb"
from ../spec/datatypes/deneb import SignedBeaconBlock, asTrusted, shortLog
proc newExecutionPayload*(
    elManager: ELManager,
    blockBody: SomeForkyBeaconBlockBody):
    Future[Opt[PayloadExecutionStatus]] {.async.} =
  ## Submits the execution payload of `blockBody` to the execution layer via
  ## `sendNewPayload`.
  ##
  ## Returns the status reported by the EL, or `Opt.none` when no properly
  ## configured EL connection is available or when the request raised a
  ## `CatchableError` (which is logged as a warning and swallowed).
  template executionPayload: untyped = blockBody.execution_payload

  if not elManager.hasProperlyConfiguredConnection:
    # Same message in both cases; only the log level depends on whether any
    # connection exists at all.
    if elManager.hasConnection:
      info "No execution client connected; cannot process block payloads",
        executionPayload = shortLog(executionPayload)
    else:
      debug "No execution client connected; cannot process block payloads",
        executionPayload = shortLog(executionPayload)
    return Opt.none PayloadExecutionStatus

  debug "newPayload: inserting block into execution engine",
    executionPayload = shortLog(executionPayload)

  try:
    let payloadStatus = await elManager.sendNewPayload(blockBody)

    debug "newPayload: succeeded",
      parentHash = executionPayload.parent_hash,
      blockHash = executionPayload.block_hash,
      blockNumber = executionPayload.block_number,
      payloadStatus = $payloadStatus

    return Opt.some payloadStatus
  except CatchableError as err:
    warn "newPayload failed - check execution client",
      msg = err.msg,
      parentHash = shortLog(executionPayload.parent_hash),
      blockHash = shortLog(executionPayload.block_hash),
      blockNumber = executionPayload.block_number
    return Opt.none PayloadExecutionStatus
proc getExecutionValidity(
Support for driving multiple EL nodes from a single Nimbus BN (#4465) * Support for driving multiple EL nodes from a single Nimbus BN Full list of changes: * Eth1Monitor has been renamed to ELManager to match its current responsibilities better. * The ELManager is no longer optional in the code (it won't have a nil value under any circumstances). * The support for subscribing for headers was removed as it only worked with WebSockets and contributed significant complexity while bringing only a very minor advantage. * The `--web3-url` parameter has been deprecated in favor of a new `--el` parameter. The new parameter has a reasonable default value and supports specifying a different JWT for each connection. Each connection can also be configured with a different set of responsibilities (e.g. download deposits, validate blocks and/or produce blocks). On the command-line, these properties can be configured through URL properties stored in the #anchor part of the URL. In TOML files, they come with a very natural syntax (althrough the URL scheme is also supported). * The previously scattered EL-related state and logic is now moved to `eth1_monitor.nim` (this module will be renamed to `el_manager.nim` in a follow-up commit). State is assigned properly either to the `ELManager` or the to individual `ELConnection` objects where appropriate. The ELManager executes all Engine API requests against all attached EL nodes, in parallel. It compares their results and if there is a disagreement regarding the validity of a certain payload, this is detected and the beacon node is protected from publishing a block with a potential execution layer consensus bug in it. The BN provides metrics per EL node for the number of successful or failed requests for each type Engine API requests. If an EL node goes offline and connectivity is resoted later, we report the problem and the remedy in edge-triggered fashion. 
* More progress towards implementing Deneb block production in the VC and comparing the value of blocks produced by the EL and the builder API. * Adds a Makefile target for the zhejiang testnet
2023-03-05 01:40:21 +00:00
elManager: ELManager,
blck: bellatrix.SignedBeaconBlock | capella.SignedBeaconBlock |
deneb.SignedBeaconBlock):
Future[NewPayloadStatus] {.async.} =
if not blck.message.is_execution_block:
return NewPayloadStatus.valid # vacuously
try:
Support for driving multiple EL nodes from a single Nimbus BN (#4465) * Support for driving multiple EL nodes from a single Nimbus BN Full list of changes: * Eth1Monitor has been renamed to ELManager to match its current responsibilities better. * The ELManager is no longer optional in the code (it won't have a nil value under any circumstances). * The support for subscribing for headers was removed as it only worked with WebSockets and contributed significant complexity while bringing only a very minor advantage. * The `--web3-url` parameter has been deprecated in favor of a new `--el` parameter. The new parameter has a reasonable default value and supports specifying a different JWT for each connection. Each connection can also be configured with a different set of responsibilities (e.g. download deposits, validate blocks and/or produce blocks). On the command-line, these properties can be configured through URL properties stored in the #anchor part of the URL. In TOML files, they come with a very natural syntax (althrough the URL scheme is also supported). * The previously scattered EL-related state and logic is now moved to `eth1_monitor.nim` (this module will be renamed to `el_manager.nim` in a follow-up commit). State is assigned properly either to the `ELManager` or the to individual `ELConnection` objects where appropriate. The ELManager executes all Engine API requests against all attached EL nodes, in parallel. It compares their results and if there is a disagreement regarding the validity of a certain payload, this is detected and the beacon node is protected from publishing a block with a potential execution layer consensus bug in it. The BN provides metrics per EL node for the number of successful or failed requests for each type Engine API requests. If an EL node goes offline and connectivity is resoted later, we report the problem and the remedy in edge-triggered fashion. 
* More progress towards implementing Deneb block production in the VC and comparing the value of blocks produced by the EL and the builder API. * Adds a Makefile target for the zhejiang testnet
2023-03-05 01:40:21 +00:00
let executionPayloadStatus = await elManager.newExecutionPayload(
blck.message.body)
if executionPayloadStatus.isNone:
return NewPayloadStatus.noResponse
case executionPayloadStatus.get
of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash:
# Blocks come either from gossip or request manager requests. In the
# former case, they've passed libp2p gossip validation which implies
# correct signature for correct proposer, which makes spam expensive,
# while for the latter, spam is limited by the request manager.
info "execution payload invalid from EL client newPayload",
executionPayloadStatus = $executionPayloadStatus.get,
executionPayload = shortLog(blck.message.body.execution_payload),
blck = shortLog(blck)
return NewPayloadStatus.invalid
of PayloadExecutionStatus.syncing, PayloadExecutionStatus.accepted:
return NewPayloadStatus.notValid
of PayloadExecutionStatus.valid:
return NewPayloadStatus.valid
except CatchableError as err:
error "newPayload failed and leaked exception",
err = err.msg,
executionPayload = shortLog(blck.message.body.execution_payload),
blck = shortLog(blck)
return NewPayloadStatus.noResponse
proc checkBloblessSignature(self: BlockProcessor,
                            signed_beacon_block: deneb.SignedBeaconBlock):
                            Result[void, cstring] =
  ## Check the proposer signature of a Deneb block using only the block
  ## itself. `storeBlock` calls this on unorphaned blocks that carry blob
  ## commitments, before it looks for their blobs.
  ##
  ## The parent is resolved through the DAG, the proposer for the block's
  ## slot is computed on top of that parent and compared with the
  ## `proposer_index` in the message, and finally the signature is verified
  ## against that proposer's key. Returns `ok()` when all steps pass and an
  ## `err` naming the failed step otherwise.
  template blockMsg(): auto = signed_beacon_block.message

  let
    dag = self.consensusManager.dag
    parentRef = dag.getBlockRef(blockMsg.parent_root).valueOr:
      return err("checkBloblessSignature called with orphan block")
    expectedProposer = getProposer(dag, parentRef, blockMsg.slot).valueOr:
      return err("checkBloblessSignature: Cannot compute proposer")

  if uint64(expectedProposer) != blockMsg.proposer_index:
    return err("checkBloblessSignature: Incorrect proposer")

  let signatureValid = verify_block_signature(
    dag.forkAtEpoch(blockMsg.slot.epoch),
    getStateField(dag.headState, genesis_validators_root),
    blockMsg.slot,
    signed_beacon_block.root,
    dag.validatorKey(expectedProposer).get(),
    signed_beacon_block.signature)
  if not signatureValid:
    return err("checkBloblessSignature: Invalid proposer signature")

  ok()
proc storeBlock*(
    self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
    signedBlock: ForkySignedBeaconBlock,
    blobsOpt: Opt[BlobSidecars],
    maybeFinalized = false,
    queueTick: Moment = Moment.now(), validationDur = Duration()):
    Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async.} =
  ## storeBlock is the main entry point for unvalidated blocks - all untrusted
  ## blocks, regardless of origin, pass through here. When storing a block,
  ## we will add it to the dag and pass it to all block consumers that need
  ## to know about it, such as the fork choice and the monitoring
  ##
  ## Parameters:
  ## * `src` - origin of the block, forwarded to the validator monitor.
  ## * `wallTime` - current beacon time; its slot drives the payload-skip
  ##   heuristic, fork choice and head selection.
  ## * `signedBlock` - the block to store.
  ## * `blobsOpt` - blob sidecars accompanying the block. For Deneb and later
  ##   they are validated against the block's KZG commitments when present,
  ##   and written to the database once the block has been added.
  ## * `maybeFinalized` - the source claims the block is finalized, which
  ##   allows skipping the execution layer call for most such blocks.
  ## * `queueTick`, `validationDur` - only used to report timings.
  ##
  ## Returns the `BlockRef` of the stored block, or the `VerifierError`
  ## together with a `ProcessingStatus` when the block could not be added.
  ## On success the head is updated and quarantined children of the block
  ## are re-enqueued.

  let
    attestationPool = self.consensusManager.attestationPool
    startTick = Moment.now()
    vm = self.validatorMonitor
    dag = self.consensusManager.dag
    wallSlot = wallTime.slotOrZero
    payloadStatus =
      if maybeFinalized and
          (self.lastPayload + SLOTS_PER_PAYLOAD) > signedBlock.message.slot and
          (signedBlock.message.slot + PAYLOAD_PRE_WALL_SLOTS) < wallSlot and
          signedBlock.message.is_execution_block:
        # Skip payload validation when message source (reasonably) claims block
        # has been finalized - this speeds up forward sync - in the worst case
        # that the claim is false, we will correct every time we process a block
        # from an honest source (or when we're close to head).
        # Occasionally we also send a payload to the EL so that it can
        # progress in its own sync.
        NewPayloadStatus.noResponse
      else:
        when typeof(signedBlock).toFork() >= ConsensusFork.Bellatrix:
          await self.consensusManager.elManager.getExecutionValidity(signedBlock)
        else:
          NewPayloadStatus.valid # vacuously
    payloadValid = payloadStatus == NewPayloadStatus.valid

  # The block is certainly not missing any more
  self.consensusManager.quarantine[].missing.del(signedBlock.root)

  if NewPayloadStatus.invalid == payloadStatus:
    self.consensusManager.quarantine[].addUnviable(signedBlock.root)
    return err((VerifierError.UnviableFork, ProcessingStatus.completed))

  if NewPayloadStatus.noResponse == payloadStatus:
    # When the execution layer is not available to verify the payload, we do the
    # required check on the CL side instead and proceed as if the EL was syncing

    # TODO run https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/beacon-chain.md#blob-kzg-commitments
    # https://github.com/ethereum/execution-apis/blob/main/src/engine/experimental/blob-extension.md#specification
    # "This validation MUST be instantly run in all cases even during active sync process."
    #
    # Client software MUST validate `blockHash` value as being equivalent to
    # `Keccak256(RLP(ExecutionBlockHeader))`
    # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/paris.md#specification
    when typeof(signedBlock).toFork() >= ConsensusFork.Bellatrix:
      template payload(): auto = signedBlock.message.body.execution_payload
      if signedBlock.message.is_execution_block and
          payload.block_hash != payload.compute_execution_block_hash():
        debug "Execution block hash validation failed",
          execution_payload = shortLog(payload)
        doAssert strictVerification notin dag.updateFlags
        self.consensusManager.quarantine[].addUnviable(signedBlock.root)
        return err((VerifierError.Invalid, ProcessingStatus.completed))
    else:
      discard

  # We'll also remove the block as an orphan: it's unlikely the parent is
  # missing if we get this far - should that be the case, the block will
  # be re-added later
  self.consensusManager.quarantine[].removeOrphan(signedBlock)

  # TODO with v1.4.0, not sure this is still relevant
  # Establish blob viability before calling addHeadBlock to avoid
  # writing the block in case of blob error.
  when typeof(signedBlock).toFork() >= ConsensusFork.Deneb:
    if blobsOpt.isSome:
      let blobs = blobsOpt.get()
      let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq
      if blobs.len > 0 or kzgCommits.len > 0:
        let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob),
                               blobs.mapIt(it.kzg_proof))
        if r.isErr():
          debug "blob validation failed",
            blockRoot = shortLog(signedBlock.root),
            blobs = shortLog(blobs),
            blck = shortLog(signedBlock.message),
            signature = shortLog(signedBlock.signature),
            msg = r.error()
          return err((VerifierError.Invalid, ProcessingStatus.completed))

  type Trusted = typeof signedBlock.asTrusted()
  let blck = dag.addHeadBlock(self.verifier, signedBlock, payloadValid) do (
      blckRef: BlockRef, trustedBlock: Trusted,
      epochRef: EpochRef, unrealized: FinalityCheckpoints):
    # Callback add to fork choice if valid
    attestationPool[].addForkChoice(
      epochRef, blckRef, unrealized, trustedBlock.message, wallTime)

    vm[].registerBeaconBlock(
      src, wallTime, trustedBlock.message)

    for attestation in trustedBlock.message.body.attestations:
      for validator_index in dag.get_attesting_indices(attestation):
        vm[].registerAttestationInBlock(attestation.data, validator_index,
          trustedBlock.message.slot)

    withState(dag[].clearanceState):
      when consensusFork >= ConsensusFork.Altair and
          Trusted isnot phase0.TrustedSignedBeaconBlock: # altair+
        for i in trustedBlock.message.body.sync_aggregate.sync_committee_bits.oneIndices():
          vm[].registerSyncAggregateInBlock(
            trustedBlock.message.slot, trustedBlock.root,
            forkyState.data.current_sync_committee.pubkeys.data[i])

  self[].dumpBlock(signedBlock, blck)

  # There can be a scenario where we receive a block we already received.
  # However this block was before the last finalized epoch and so its parent
  # was pruned from the ForkChoice.
  if blck.isErr():
    case blck.error()
    of VerifierError.MissingParent:
      if signedBlock.message.parent_root in
          self.consensusManager.quarantine[].unviable:
        # DAG doesn't know about unviable ancestor blocks - we do! Translate
        # this to the appropriate error so that sync etc doesn't retry the block
        self.consensusManager.quarantine[].addUnviable(signedBlock.root)

        return err((VerifierError.UnviableFork, ProcessingStatus.completed))

      if (let r = self.consensusManager.quarantine[].addOrphan(
          dag.finalizedHead.slot, ForkedSignedBeaconBlock.init(signedBlock));
          r.isErr()):
        debug "storeBlock: could not add orphan",
          blockRoot = shortLog(signedBlock.root),
          blck = shortLog(signedBlock.message),
          signature = shortLog(signedBlock.signature),
          err = r.error()
    of VerifierError.UnviableFork:
      # Track unviables so that descendants can be discarded properly
      self.consensusManager.quarantine[].addUnviable(signedBlock.root)
    else: discard

    return err((blck.error, ProcessingStatus.completed))

  if payloadStatus in {NewPayloadStatus.valid, NewPayloadStatus.notValid}:
    # If the EL responded at all, we don't need to try again for a while
    self[].lastPayload = signedBlock.message.slot

  # write blobs now that block has been written.
  let blobs = blobsOpt.valueOr: BlobSidecars @[]
  for b in blobs:
    self.consensusManager.dag.db.putBlobSidecar(b[])

  let storeBlockTick = Moment.now()

  # Eagerly update head: the incoming block "should" get selected.
  #
  # storeBlock gets called from validator_duties, which depends on its not
  # blocking progress any longer than necessary, and processBlock here, in
  # which case it's fine to await for a while on engine API results.
  #
  # Three general scenarios: (1) pre-merge; (2) merge, already `VALID` by way
  # of `newPayload`; (3) optimistically imported, need to call fcU before DAG
  # updateHead. Because in a non-finalizing network, completing sync isn't as
  # useful because regular reorgs likely still occur, and when finalizing the
  # EL is only called every SLOTS_PER_PAYLOAD slots regardless, await, rather
  # than asyncSpawn forkchoiceUpdated calls.
  #
  # This reduces in-flight fcU spam, which both reduces EL load and decreases
  # otherwise somewhat unpredictable CL head movement.

  # Grab the new head according to our latest attestation data; determines how
  # async this needs to be.
  let newHead = attestationPool[].selectOptimisticHead(
    wallSlot.start_beacon_time)

  if newHead.isOk:
    template elManager(): auto = self.consensusManager.elManager
    if self.consensusManager[].shouldSyncOptimistically(wallSlot):
      # Optimistic head is far in the future; report it as head block to EL.

      # Note that the specification allows an EL client to skip fcU processing
      # if an update to an ancestor is requested.
      # > Client software MAY skip an update of the forkchoice state and MUST
      #   NOT begin a payload build process if `forkchoiceState.headBlockHash`
      #   references an ancestor of the head of canonical chain.
      # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/paris.md#specification-1
      #
      # However, in practice, an EL client may not have completed importing all
      # block headers, so may be unaware of a block's ancestor status.
      # Therefore, hopping back and forth between the optimistic head and the
      # chain DAG head does not work well in practice, e.g., Geth:
      # - "Beacon chain gapped" from DAG head to optimistic head,
      # - followed by "Beacon chain reorged" from optimistic head back to DAG.
      self.consensusManager[].updateHead(newHead.get.blck)

      template callForkchoiceUpdated(attributes: untyped) =
        discard await elManager.forkchoiceUpdated(
          headBlockHash = self.consensusManager[].optimisticExecutionPayloadHash,
          safeBlockHash = newHead.get.safeExecutionPayloadHash,
          finalizedBlockHash = newHead.get.finalizedExecutionPayloadHash,
          payloadAttributes = none attributes)

      case self.consensusManager.dag.cfg.consensusForkAtEpoch(
          newHead.get.blck.bid.slot.epoch)
      of ConsensusFork.Capella, ConsensusFork.Deneb:
        # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/shanghai.md#specification-1
        # Consensus layer client MUST call this method instead of
        # `engine_forkchoiceUpdatedV1` under any of the following conditions:
        # `headBlockHash` references a block which `timestamp` is greater or
        # equal to the Shanghai timestamp
        callForkchoiceUpdated(PayloadAttributesV2)
      of ConsensusFork.Bellatrix:
        callForkchoiceUpdated(PayloadAttributesV1)
      of ConsensusFork.Phase0, ConsensusFork.Altair:
        discard
    else:
      let
        headExecutionPayloadHash =
          dag.loadExecutionBlockHash(newHead.get.blck)
        # Shadows the outer `wallSlot` with a fresh reading of the clock
        wallSlot = self.getBeaconTime().slotOrZero
      if headExecutionPayloadHash.isZero or
          NewPayloadStatus.noResponse == payloadStatus:
        # Blocks without execution payloads can't be optimistic, and don't try
        # to fcU to a block the EL hasn't seen
        self.consensusManager[].updateHead(newHead.get.blck)
      elif newHead.get.blck.executionValid:
        # `forkchoiceUpdated` necessary for EL client only.
        self.consensusManager[].updateHead(newHead.get.blck)

        if self.consensusManager.checkNextProposer(wallSlot).isNone:
          # No attached validator is next proposer, so use non-proposal fcU
          template callForkchoiceUpdated(payloadAttributeType: untyped): auto =
            await elManager.expectValidForkchoiceUpdated(
              headBlockPayloadAttributesType = payloadAttributeType,
              headBlockHash = headExecutionPayloadHash,
              safeBlockHash = newHead.get.safeExecutionPayloadHash,
              finalizedBlockHash = newHead.get.finalizedExecutionPayloadHash,
              receivedBlock = signedBlock)

          case self.consensusManager.dag.cfg.consensusForkAtEpoch(
              newHead.get.blck.bid.slot.epoch)
          of ConsensusFork.Capella, ConsensusFork.Deneb:
            callForkchoiceUpdated(payloadAttributeType = PayloadAttributesV2)
          of ConsensusFork.Phase0, ConsensusFork.Altair,
             ConsensusFork.Bellatrix:
            callForkchoiceUpdated(payloadAttributeType = PayloadAttributesV1)
        else:
          # Some attached validator is next proposer, so prepare payload. As
          # updateHead() updated the DAG head, runProposalForkchoiceUpdated,
          # which needs the state corresponding to that head block, can run.
          await self.consensusManager.runProposalForkchoiceUpdated(
            wallSlot)
      else:
        await self.consensusManager.updateHeadWithExecution(
          newHead.get, self.getBeaconTime)
  else:
    warn "Head selection failed, using previous head",
      head = shortLog(dag.head), wallSlot

  let
    updateHeadTick = Moment.now()
    queueDur = startTick - queueTick
    storeBlockDur = storeBlockTick - startTick
    updateHeadDur = updateHeadTick - storeBlockTick

  beacon_store_block_duration_seconds.observe(storeBlockDur.toFloatSeconds())

  debug "Block processed",
    localHeadSlot = dag.head.slot,
    blockSlot = blck.get().slot,
    validationDur, queueDur, storeBlockDur, updateHeadDur

  for quarantined in self.consensusManager.quarantine[].pop(blck.get().root):
    # Process the blocks that had the newly accepted block as parent
    withBlck(quarantined):
      # Inside `withBlck`, `blck` is the fork-specific quarantined block, not
      # the `addHeadBlock` result declared above.
      when typeof(blck).toFork() < ConsensusFork.Deneb:
        self[].addBlock(MsgSource.gossip, quarantined, Opt.none(BlobSidecars))
      else:
        if len(blck.message.body.blob_kzg_commitments) == 0:
          self[].addBlock(MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[]))
        else:
          if (let res = checkBloblessSignature(self[], blck); res.isErr):
            warn "Failed to verify signature of unorphaned blobless block",
              blck = shortLog(blck),
              error = res.error()
            continue
          if self.blobQuarantine[].hasBlobs(blck):
            let blobs = self.blobQuarantine[].popBlobs(blck.root)
            self[].addBlock(MsgSource.gossip, quarantined, Opt.some(blobs))
          else:
            if not self.consensusManager.quarantine[].addBlobless(
                dag.finalizedHead.slot, blck):
              notice "Block quarantine full (blobless)",
                blockRoot = shortLog(quarantined.root),
                signature = shortLog(quarantined.signature)

  return Result[BlockRef, (VerifierError, ProcessingStatus)].ok blck.get
# Enqueue
# ------------------------------------------------------------------------------
proc addBlock*(
    self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
    blobs: Opt[BlobSidecars],
    resfut: Future[Result[void, VerifierError]] = nil,
    maybeFinalized = false,
    validationDur = Duration()) =
  ## Enqueue a Gossip-validated block for consensus verification
  ##
  ## Blocks at or before the finalized head slot are handed to
  ## `storeBackfillBlock` right away and `resfut`, when given, is completed
  ## with that result. Every other block is appended to `blockQueue`.
  # Backpressure:
  #   There is no backpressure here - producers must wait for `resfut` to
  #   constrain their own processing
  # Producers:
  # - Gossip (when synced)
  # - SyncManager (during sync)
  # - RequestManager (missing ancestor blocks)
  let finalizedSlot = self.consensusManager.dag.finalizedHead.slot

  withBlck(blck):
    if blck.message.slot <= finalizedSlot:
      # let backfill blocks skip the queue - these are always "fast" to process
      # because there are no state rewinds to deal with
      let backfillRes = self.storeBackfillBlock(blck, blobs)
      if not resfut.isNil:
        resfut.complete(backfillRes)
      return

  let queued = BlockEntry(
    blck: blck,
    blobs: blobs,
    maybeFinalized: maybeFinalized,
    resfut: resfut,
    queueTick: Moment.now(),
    validationDur: validationDur,
    src: src)

  try:
    self.blockQueue.addLastNoWait(queued)
  except AsyncQueueFullError:
    raiseAssert "unbounded queue"
# Dedup
# ------------------------------------------------------------------------------
func checkDuplicateBlocks(self: ref BlockProcessor, entry: BlockEntry): bool =
  ## Returns `true` when the (root, signature) pair of `entry.blck` is
  ## already present in `dupBlckBuf`. Otherwise the pair is appended to the
  ## buffer - after dropping the front element if the buffer already holds
  ## `MAX_DEDUP_QUEUE_LEN` items - and `false` is returned.
  let seenKey = (entry.blck.root, entry.blck.signature)

  if seenKey in self.dupBlckBuf:
    true
  else:
    doAssert self.dupBlckBuf.len <= MAX_DEDUP_QUEUE_LEN
    if self.dupBlckBuf.len >= MAX_DEDUP_QUEUE_LEN:
      self.dupBlckBuf.shrink(fromFirst = 1)
    self.dupBlckBuf.addLast seenKey
    false
# Event Loop
# ------------------------------------------------------------------------------
proc processBlock(
    self: ref BlockProcessor, entry: BlockEntry) {.async.} =
  ## Run one queued entry through `storeBlock` and report the outcome on
  ## `entry.resfut` when one was supplied.
  ##
  ## Duplicates are answered with `VerifierError.Duplicate` without being
  ## stored. A `notCompleted` result re-enqueues the entry after a one second
  ## pause and leaves the future pending. The process quits if the wall clock
  ## is before genesis.
  type Outcome = Result[void, VerifierError]

  logScope:
    blockRoot = shortLog(entry.blck.root)

  let
    wallTime = self.getBeaconTime()
    (afterGenesis, _) = wallTime.toSlot()

  if not afterGenesis:
    error "Processing block before genesis, clock turned back?"
    quit 1

  if self.checkDuplicateBlocks(entry):
    if not entry.resfut.isNil:
      entry.resfut.complete(Outcome.err(VerifierError.Duplicate))
    return

  let res = withBlck(entry.blck):
    await self.storeBlock(
      entry.src, wallTime, blck, entry.blobs, entry.maybeFinalized,
      entry.queueTick, entry.validationDur)

  if res.isErr and res.error[1] == ProcessingStatus.notCompleted:
    # When an execution engine returns an error or fails to respond to a
    # payload validity request for some block, a consensus engine:
    # - MUST NOT optimistically import the block.
    # - MUST NOT apply the block to the fork choice store.
    # - MAY queue the block for later processing.
    # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.0/sync/optimistic.md#execution-engine-errors
    await sleepAsync(chronos.seconds(1))
    self[].addBlock(
      entry.src, entry.blck, entry.blobs, entry.resfut, entry.maybeFinalized,
      entry.validationDur)
    # To ensure backpressure on the sync manager, do not complete these futures.
    return

  if not entry.resfut.isNil:
    let outcome =
      if res.isOk(): Outcome.ok()
      else: Outcome.err(res.error()[0])
    entry.resfut.complete(outcome)
proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
  ## Endless loop that pops entries from `blockQueue` and processes them one
  ## at a time, waiting for an idle moment - capped by a short timeout -
  ## before each entry.
  const
    # We cap waiting for an idle slot in case there's a lot of network traffic
    # taking up all CPU - we don't want to _completely_ stop processing blocks
    # in this case - doing so also allows us to benefit from more batching /
    # larger network reads when under load.
    idleTimeout = 10.milliseconds

  while true:
    # Cooperative concurrency: one block per loop iteration - because
    # we run both networking and CPU-heavy things like block processing
    # on the same thread, we need to make sure that there is steady progress
    # on the networking side or we get long lockups that lead to timeouts.
    discard await idleAsync().withTimeout(idleTimeout)

    let nextEntry = await self[].blockQueue.popFirst()
    await self.processBlock(nextEntry)