
365 lines
14 KiB
Raw Normal View History

# beacon_chain
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
2022-07-29 10:53:42 +00:00
# Module-wide exception tracking: exactly one `raises` list is pushed,
# selected at compile time from the Nim version.
when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}
disentangle eth2 types from the ssz library (#2785) * reorganize ssz dependencies This PR continues the work in, as well as past issues with serialization and type, to disentangle SSZ from eth2 and at the same time simplify imports and exports with a structured approach. The principal idea here is that when a library wants to introduce SSZ support, they do so via 3 files: * `ssz_codecs` which imports and reexports `codecs` - this covers the basic byte conversions and ensures no overloads get lost * `xxx_merkleization` imports and exports `merkleization` to specialize and get access to `hash_tree_root` and friends * `xxx_ssz_serialization` imports and exports `ssz_serialization` to specialize ssz for a specific library Those that need to interact with SSZ always import the `xxx_` versions of the modules and never `ssz` itself so as to keep imports simple and safe. This is similar to how the REST / JSON-RPC serializers are structured in that someone wanting to serialize spec types to REST-JSON will import `eth2_rest_serialization` and nothing else. * split up ssz into a core library that is independendent of eth2 types * rename `bytes_reader` to `codec` to highlight that it contains coding and decoding of bytes and native ssz types * remove tricky List init overload that causes compile issues * get rid of top-level ssz import * reenable merkleization tests * move some "standard" json serializers to spec * remove `ValidatorIndex` serialization for now * remove test_ssz_merkleization * add tests for over/underlong byte sequences * fix broken seq[byte] test - seq[byte] is not an SSZ type There are a few things this PR doesn't solve: * like #2646 this PR is weak on how to handle root and other dontSerialize fields that "sometimes" should be computed - the same problem appears in REST / JSON-RPC etc * Fix a build problem on macOS * Another way to fix the macOS builds Co-authored-by: Zahary Karadjov <>
2021-08-18 18:57:58 +00:00
chronicles, chronos,
../consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool],
from ../spec/eth2_apis/dynamic_fee_recipients import
DynamicFeeRecipientsStore, getDynamicFeeRecipient
from ../validators/keystore_management import
KeymanagerHost, getSuggestedFeeRecipient
type
  ForkChoiceUpdatedInformation* = object
    ## Snapshot of the arguments and result of the last proposal-related
    ## forkchoiceUpdated call, as stored by `runProposalForkchoiceUpdated`.
    ##
    ## The rest of this module spells the name `ForkchoiceUpdatedInformation`;
    ## Nim compares identifiers case-insensitively after the first character,
    ## so both spellings denote this type.
    payloadId*: PayloadID
    headBlockRoot*: Eth2Digest
    safeBlockRoot*: Eth2Digest
    finalizedBlockRoot*: Eth2Digest
    timestamp*: uint64
    feeRecipient*: Eth1Address
type
  ConsensusManager* = object
    ## State shared by the head-selection and execution-layer update
    ## routines in this module.

    # Slot given to the most recent `expectBlock` call, and the future that
    # call handed out (`nil` when no expectation is pending)
    expectedSlot: Slot
    expectedBlockReceived: Future[bool]

    # Validated & Verified
    # ----------------------------------------------------------------
    dag*: ChainDAGRef
    attestationPool*: ref AttestationPool

    # Missing info
    # ----------------------------------------------------------------
    quarantine*: ref Quarantine

    # Execution layer integration
    # ----------------------------------------------------------------
    eth1Monitor*: Eth1Monitor

    # Allow determination of preferred fee recipient during proposals
    # ----------------------------------------------------------------
    dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore
    keymanagerHost: ref KeymanagerHost
    defaultFeeRecipient: Eth1Address

    # Tracking last proposal forkchoiceUpdated payload information
    # ----------------------------------------------------------------
    forkchoiceUpdatedInfo*: Opt[ForkchoiceUpdatedInformation]

    # Written by `setOptimisticHead`, read through the `optimisticHead` and
    # `optimisticExecutionPayloadHash` accessors
    optimisticHead: tuple[bid: BlockId, execution_block_hash: Eth2Digest]
# Initialization
# ------------------------------------------------------------------------------
func new*(T: type ConsensusManager,
          dag: ChainDAGRef,
          attestationPool: ref AttestationPool,
          quarantine: ref Quarantine,
          eth1Monitor: Eth1Monitor,
          dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore,
          keymanagerHost: ref KeymanagerHost,
          defaultFeeRecipient: Eth1Address
         ): ref ConsensusManager =
  ## Allocate a `ConsensusManager` wired to the given collaborators.
  ##
  ## `forkchoiceUpdatedInfo` starts out empty. Fields not listed below
  ## (`expectedSlot`, `expectedBlockReceived`, `optimisticHead`) keep their
  ## zero-initialized defaults.
  (ref ConsensusManager)(
    dag: dag,
    attestationPool: attestationPool,
    quarantine: quarantine,
    eth1Monitor: eth1Monitor,
    dynamicFeeRecipientsStore: dynamicFeeRecipientsStore,
    keymanagerHost: keymanagerHost,
    forkchoiceUpdatedInfo: Opt.none ForkchoiceUpdatedInformation,
    defaultFeeRecipient: defaultFeeRecipient
  )
# Consensus Management
# -----------------------------------------------------------------------------------
proc checkExpectedBlock(self: var ConsensusManager) =
  ## Complete the pending `expectBlock` future, if there is one, once the DAG
  ## head has reached the expected slot.
  if self.expectedBlockReceived == nil:
    # Nobody is waiting for a block
    return

  if self.dag.head.slot < self.expectedSlot:
    # Head has not caught up with the expectation yet - keep waiting
    return

  self.expectedBlockReceived.complete(true)
  self.expectedBlockReceived = nil # Don't keep completed futures around!
proc expectBlock*(self: var ConsensusManager, expectedSlot: Slot): Future[bool] =
  ## Return a future that will complete when a head is selected whose slot is
  ## equal or greater than the given slot, or a new expectation is created
  ##
  ## The future yields `true` when the expected head arrived and `false` when
  ## it was superseded by a later `expectBlock` call.
  if self.expectedBlockReceived != nil:
    # Reset the old future to not leave it hanging.. an alternative would be to
    # cancel it, but it doesn't make any practical difference for now
    self.expectedBlockReceived.complete(false)

  let fut = newFuture[bool]("ConsensusManager.expectBlock")
  self.expectedSlot = expectedSlot
  self.expectedBlockReceived = fut

  # It might happen that by the time we're expecting a block, it might have
  # already been processed!
  self.checkExpectedBlock()

  return fut
from eth/async_utils import awaitWithTimeout
from web3/engine_api_types import
ForkchoiceUpdatedResponse, PayloadExecutionStatus, PayloadStatusV1
func `$`(h: BlockHash): string =
  ## Stringify a `BlockHash` through its `Eth2Digest` representation.
  let digest = h.asEth2Digest
  $digest
func shouldSyncOptimistically*(
    optimisticSlot, dagSlot, wallSlot: Slot): bool =
  ## Determine whether an optimistic execution block hash should be reported
  ## to the EL client instead of the current head as determined by fork choice.
  ##
  ## Returns `true` only when the optimistic head is at least `minProgress`
  ## slots ahead of the DAG and at most `maxAge` slots behind the wall slot.

  # Check whether optimistic head is sufficiently ahead of DAG
  # (the first comparison guards the subtraction against underflow)
  const minProgress = 8 * SLOTS_PER_EPOCH  # Set arbitrarily
  if optimisticSlot < dagSlot or optimisticSlot - dagSlot < minProgress:
    return false

  # Check whether optimistic head has synced sufficiently close to wall slot
  # (`max` clamps the wall slot so that subtracting `maxAge` cannot underflow)
  const maxAge = 2 * SLOTS_PER_EPOCH  # Set arbitrarily
  if optimisticSlot < max(wallSlot, maxAge.Slot) - maxAge:
    return false

  # Both checks passed
  true
func shouldSyncOptimistically*(self: ConsensusManager, wallSlot: Slot): bool =
  ## Convenience overload: evaluate the slot-based `shouldSyncOptimistically`
  ## for the currently recorded optimistic head.
  ##
  ## Returns `false` when there is no execution-layer monitor or no optimistic
  ## execution block hash has been recorded.
  if self.eth1Monitor == nil:
    return false
  if self.optimisticHead.execution_block_hash.isZero:
    return false

  # NOTE(review): the `optimisticSlot` argument was blank in the reviewed
  # copy; the slot of the recorded optimistic `BlockId` is assumed - confirm
  # that `BlockId` exposes a `slot` field.
  shouldSyncOptimistically(
    optimisticSlot = self.optimisticHead.bid.slot,
    dagSlot = getStateField(self.dag.headState, slot),
    wallSlot = wallSlot)
func optimisticHead*(self: ConsensusManager): BlockId =
  ## Block id most recently recorded through `setOptimisticHead`.
  self.optimisticHead.bid
func optimisticExecutionPayloadHash*(self: ConsensusManager): Eth2Digest =
  ## Execution block hash most recently recorded through `setOptimisticHead`.
  self.optimisticHead.execution_block_hash
func setOptimisticHead*(
    self: var ConsensusManager,
    bid: BlockId, execution_block_hash: Eth2Digest) =
  ## Record `bid` together with its execution block hash as the optimistic
  ## head, replacing whatever was stored before.
  self.optimisticHead.bid = bid
  self.optimisticHead.execution_block_hash = execution_block_hash
proc runForkchoiceUpdated*(
eth1Monitor: Eth1Monitor,
headBlockRoot, safeBlockRoot, finalizedBlockRoot: Eth2Digest):
Future[PayloadExecutionStatus] {.async.} =
## Send a fork-choice update for the given head/safe/finalized roots through
## `eth1Monitor` and return the payload status it reports.
##
## `headBlockRoot` must be non-zero (asserted below). If the call raises a
## `CatchableError`, the error is logged and
## `PayloadExecutionStatus.syncing` is returned instead.
# Allow finalizedBlockRoot to be 0 to avoid sync deadlocks.
# NOTE(review): the reference that the next sentence ("has ...") belongs to
# is not visible in this copy.
# has "Before the first finalized block occurs in the system the finalized
# block hash provided by this event is stubbed with
# `0x0000000000000000000000000000000000000000000000000000000000000000`."
# and
# NOTE(review): the next line is not Nim - it looks like a timestamp left
# over from a web blame view and should be removed.
2022-08-20 16:03:32 +00:00
# notes "`finalized_block_hash` is the hash of the latest finalized execution
# payload (`Hash32()` if none yet finalized)"
doAssert not headBlockRoot.isZero
# NOTE(review): the `try:` that pairs with the `except` further down is not
# visible in this copy; presumably it opens here - confirm.
# Minimize window for Eth1 monitor to shut down connection
await eth1Monitor.ensureDataProvider()
# NOTE(review): the callee and the timeout passed to `awaitWithTimeout`, and
# the constructor wrapping `payloadStatus:`, are not visible in this copy.
# What is visible suggests the timeout branch logs at debug level and
# substitutes a `syncing` payload status - confirm.
let fcuR = awaitWithTimeout(
eth1Monitor, headBlockRoot, safeBlockRoot, finalizedBlockRoot),
debug "runForkchoiceUpdated: forkchoiceUpdated timed out"
payloadStatus: PayloadStatusV1(
status: PayloadExecutionStatus.syncing))
debug "runForkchoiceUpdated: ran forkchoiceUpdated",
headBlockRoot, safeBlockRoot, finalizedBlockRoot,
payloadStatus = $fcuR.payloadStatus.status,
latestValidHash = $fcuR.payloadStatus.latestValidHash,
validationError = $fcuR.payloadStatus.validationError
return fcuR.payloadStatus.status
except CatchableError as err:
# Failures are reported as `syncing` rather than propagated to the caller
error "runForkchoiceUpdated: forkchoiceUpdated failed",
err = err.msg
return PayloadExecutionStatus.syncing
proc runForkchoiceUpdatedDiscardResult*(
    eth1Monitor: Eth1Monitor,
    headBlockRoot, safeBlockRoot, finalizedBlockRoot: Eth2Digest) {.async.} =
  ## Run `runForkchoiceUpdated` with the same arguments and ignore the
  ## payload status it returns.
  let ignoredStatus = await runForkchoiceUpdated(
    eth1Monitor, headBlockRoot, safeBlockRoot, finalizedBlockRoot)
  discard ignoredStatus
proc updateExecutionClientHead(self: ref ConsensusManager, newHead: BeaconHead)
{.async.} =
## Tell the execution client about `newHead` and record the outcome in the
## DAG: the block is marked verified when the status is `valid`, and its
## root is added to `dag.optimisticRoots` when it is `accepted` or `syncing`.
# NOTE(review): the body of this nil check is not visible in this copy;
# presumably it returns early when there is no execution client - confirm.
if self.eth1Monitor.isNil:
let headExecutionPayloadHash = self.dag.loadExecutionBlockRoot(newHead.blck)
if headExecutionPayloadHash.isZero:
# Blocks without execution payloads can't be optimistic.
self.dag.markBlockVerified(self.quarantine[], newHead.blck.root)
# NOTE(review): an early return presumably follows the line above - confirm.
# Can't use dag.head here because it hasn't been updated yet
# NOTE(review): the arguments of the call below are not visible in this copy.
let payloadExecutionStatus = await self.eth1Monitor.runForkchoiceUpdated(
case payloadExecutionStatus
of PayloadExecutionStatus.valid:
self.dag.markBlockVerified(self.quarantine[], newHead.blck.root)
# NOTE(review): the handling of the invalid statuses is not visible in this
# copy.
of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash:
of PayloadExecutionStatus.accepted, PayloadExecutionStatus.syncing:
self.dag.optimisticRoots.incl newHead.blck.root
proc updateHead*(self: var ConsensusManager, newHead: BlockRef) =
  ## Trigger fork choice and update the DAG with the new head block
  ## This does not automatically prune the DAG after finalization
  ## `pruneFinalized` must be called for pruning.

  # Store the new head in the chain DAG - this may cause epochs to be
  # justified and finalized
  self.dag.updateHead(newHead, self.quarantine[])

  # A head was just selected: complete a pending `expectBlock` future if the
  # new head has reached the expected slot
  self.checkExpectedBlock()
proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =
## Trigger fork choice and update the DAG with the new head block
## This does not automatically prune the DAG after finalization
## `pruneFinalized` must be called for pruning.
# Grab the new head according to our latest attestation data
# NOTE(review): the arguments of `selectOptimisticHead` and the construct
# that leads into the `warn` fallback are not visible in this copy.
let newHead = self.attestationPool[].selectOptimisticHead(
# NOTE(review): the next three lines are not Nim - they look like a commit
# message and timestamp left over from a web blame view and should be removed.
limit by-root requests to non-finalized blocks (#3293) * limit by-root requests to non-finalized blocks Presently, we keep a mapping from block root to `BlockRef` in memory - this has simplified reasoning about the dag, but is not sustainable with the chain growing. We can distinguish between two cases where by-root access is useful: * unfinalized blocks - this is where the beacon chain is operating generally, by validating incoming data as interesting for future fork choice decisions - bounded by the length of the unfinalized period * finalized blocks - historical access in the REST API etc - no bounds, really In this PR, we limit the by-root block index to the first use case: finalized chain data can more efficiently be addressed by slot number. Future work includes: * limiting the `BlockRef` horizon in general - each instance is 40 bytes+overhead which adds up - this needs further refactoring to deal with the tail vs state problem * persisting the finalized slot-to-hash index - this one also keeps growing unbounded (albeit slowly) Anyway, this PR easily shaves ~128mb of memory usage at the time of writing. * No longer honor `BeaconBlocksByRoot` requests outside of the non-finalized period - previously, Nimbus would generously return any block through this libp2p request - per the spec, finalized blocks should be fetched via `BeaconBlocksByRange` instead. 
* return `Opt[BlockRef]` instead of `nil` when blocks can't be found - this becomes a lot more common now and thus deserves more attention * `dag.blocks` -> `dag.forkBlocks` - this index only carries unfinalized blocks from now - `finalizedBlocks` covers the other `BlockRef` instances * in backfill, verify that the last backfilled block leads back to genesis, or panic * add backfill timings to log * fix missing check that `BlockRef` block can be fetched with `getForkedBlock` reliably * shortcut doppelganger check when feature is not enabled * in REST/JSON-RPC, fetch blocks without involving `BlockRef` * fix dag.blocks ref
2022-01-21 11:33:16 +00:00
warn "Head selection failed, using previous head",
head = shortLog(self.dag.head), wallSlot
# NOTE(review): presumably the failure branch returns here - confirm.
if self.dag.loadExecutionBlockRoot(newHead.blck).isZero:
# Blocks without execution payloads can't be optimistic.
self.dag.markBlockVerified(self.quarantine[], newHead.blck.root)
# NOTE(review): no statement that stores `newHead` in the DAG is visible in
# this copy - confirm against the complete file.
proc checkNextProposer(dag: ChainDAGRef, slot: Slot):
    Opt[(ValidatorIndex, ValidatorPubKey)] =
  ## Look up the proposer of `slot + 1` relative to the current DAG head.
  ##
  ## Returns the proposer's validator index together with its public key, or
  ## an empty `Opt` when the DAG reports no proposer for that slot.
  let nextProposerIdx = dag.getProposer(dag.head, slot + 1)
  if nextProposerIdx.isSome():
    Opt.some((
      nextProposerIdx.get,
      dag.validatorKey(nextProposerIdx.get).get().toPubKey))
  else:
    Opt.none((ValidatorIndex, ValidatorPubKey))
proc getFeeRecipient*(
self: ref ConsensusManager, pubkey: ValidatorPubKey, validatorIdx: ValidatorIndex,
epoch: Epoch): Eth1Address =
## Pick the fee recipient for a validator. The dynamic fee recipient store
## is consulted first, keyed by `validatorIdx` and `epoch`.
# NOTE(review): the fallback branches are not visible in this copy.
# Presumably they try the keymanager host's suggested recipient for `pubkey`
# and then `defaultFeeRecipient` - confirm against the complete file.
self.dynamicFeeRecipientsStore[].getDynamicFeeRecipient(validatorIdx, epoch).valueOr:
if self.keymanagerHost != nil:
from ../spec/datatypes/bellatrix import PayloadID
proc runProposalForkchoiceUpdated*(self: ref ConsensusManager) {.async.} =
## Issue a fork-choice update carrying payload attributes (timestamp, randao
## mix, fee recipient) for the next slot, and store the resulting payload id
## and roots in `self.forkchoiceUpdatedInfo`.
# NOTE(review): this copy is heavily truncated. The `let` header, several
# right-hand sides and call arguments, the bodies of both `if` statements
# and the `try:` matching the final `except` are not visible.
nextSlot = + 1
(validatorIndex, nextProposer) =
# Approximately lines up with validator_duties version. Used optimistically/
# opportunistically, so mismatches are fine if not too frequent.
timestamp = compute_timestamp_at_slot(, nextSlot)
randomData =
get_randao_mix(, get_current_epoch(
feeRecipient = self.getFeeRecipient(
nextProposer, validatorIndex, nextSlot.epoch)
beaconHead = self.attestationPool[].getBeaconHead(self.dag.head)
headBlockRoot = self.dag.loadExecutionBlockRoot(beaconHead.blck)
# NOTE(review): presumably returns early when the head has no execution
# payload - confirm.
if headBlockRoot.isZero:
# NOTE(review): the callee and timeout of `awaitWithTimeout` are not visible;
# the timeout branch appears to log at debug level and substitute a
# `syncing` payload status - confirm.
let fcResult = awaitWithTimeout(
timestamp, randomData, feeRecipient),
debug "runProposalForkchoiceUpdated: forkchoiceUpdated timed out"
payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing))
# NOTE(review): the second half of this condition and its body are not
# visible in this copy.
if fcResult.payloadStatus.status != PayloadExecutionStatus.valid or
# Remember what was sent and the payload id that came back
self.forkchoiceUpdatedInfo = Opt.some ForkchoiceUpdatedInformation(
payloadId: bellatrix.PayloadID(fcResult.payloadId.get),
headBlockRoot: headBlockRoot,
safeBlockRoot: beaconHead.safeExecutionPayloadHash,
finalizedBlockRoot: beaconHead.finalizedExecutionPayloadHash,
timestamp: timestamp,
feeRecipient: feeRecipient)
except CatchableError as err:
# Errors are logged and not propagated to the caller
error "Engine API fork-choice update failed", err = err.msg
proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BeaconHead)
    {.async.} =
  ## Trigger fork choice and update the DAG with the new head block
  ## This does not automatically prune the DAG after finalization
  ## `pruneFinalized` must be called for pruning.
  ##
  ## Any `CatchableError` raised along the way is logged at debug level and
  ## not propagated to the caller.
  try:
    # Ensure dag.updateHead has most current information
    await self.updateExecutionClientHead(newHead)

    # Store the new head in the chain DAG - this may cause epochs to be
    # justified and finalized
    self.dag.updateHead(newHead.blck, self.quarantine[])

    # TODO after things stabilize with this, check for upcoming proposal and
    # don't bother sending first fcU, but initially, keep both in place
    asyncSpawn self.runProposalForkchoiceUpdated()

    # A head was just selected: complete a pending `expectBlock` future if
    # the new head has reached the expected slot
    self[].checkExpectedBlock()
  except CatchableError as exc:
    debug "updateHeadWithExecution error",
      error = exc.msg
proc pruneStateCachesAndForkChoice*(self: var ConsensusManager) =
## Prune unneeded and invalidated data after finalization
## - the DAG state checkpoints
## - the DAG EpochRef
## - the attestation pool/fork choice
# Cleanup DAG & fork choice if we have a finalized head
if self.dag.needStateCachesAndForkChoicePruning():