optimistic sync (#3793)

* optimistic sync

* flag that blocks initially loaded from the database might need their execution block root filled in

* return optimistic status in REST calls

* refactor blockslot pruning

* ensure beacon_blocks_by_{root,range} do not provide optimistic blocks

* handle forkchoice head being pre-merge while the block is post-merge

* re-enable blocking head updates on validator duties

* fix is_optimistic_candidate_block per spec; don't crash with nil future

* mark blocks sans execution payloads valid during head update
tersec 2022-07-04 20:35:33 +00:00, committed by GitHub
parent 9eb1a3efb3
commit 1221bb66e8
18 changed files with 470 additions and 253 deletions
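
The core bookkeeping running through all of the files below is a set of "optimistic" block roots kept alongside the DAG: a block enters the set when it is imported without a verified execution payload, and leaves it once the EL reports the payload valid (or the block is pruned as invalid). A minimal, self-contained Nim sketch of that idea (Root and Dag are stand-ins, not the nimbus-eth2 types):

  import std/sets

  type
    Root = array[32, byte] # stand-in for Eth2Digest
    Dag = object
      optimisticRoots: HashSet[Root]

  func isOptimistic(dag: Dag, root: Root): bool =
    root in dag.optimisticRoots

  proc markOptimistic(dag: var Dag, root: Root) =
    dag.optimisticRoots.incl root

  proc markVerified(dag: var Dag, root: Root) =
    dag.optimisticRoots.excl root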

View File

@@ -723,9 +723,11 @@ func getAggregatedAttestation*(pool: var AttestationPool,
   res

-proc selectHead*(pool: var AttestationPool, wallTime: BeaconTime): Opt[BlockRef] =
+proc selectOptimisticHead*(
+    pool: var AttestationPool, wallTime: BeaconTime): Opt[BlockRef] =
   ## Trigger fork choice and returns the new head block.
   ## Can return `nil`
-  # TODO rename this to get_optimistic_head
   let newHead = pool.forkChoice.get_head(pool.dag, wallTime)

   if newHead.isErr:

View File

@@ -29,6 +29,7 @@ proc addResolvedHeadBlock(
     dag: ChainDAGRef,
     state: var ForkedHashedBeaconState,
     trustedBlock: ForkyTrustedSignedBeaconBlock,
+    blockVerified: bool,
     parent: BlockRef, cache: var StateCache,
     onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | OnBellatrixBlockAdded,
     stateDataDur, sigVerifyDur, stateVerifyDur: Duration
@@ -73,6 +74,7 @@ proc addResolvedHeadBlock(
   debug "Block resolved",
     blockRoot = shortLog(blockRoot),
     blck = shortLog(trustedBlock.message),
+    blockVerified,
     heads = dag.heads.len(),
     stateDataDur, sigVerifyDur, stateVerifyDur,
     putBlockDur = putBlockTick - startTick,
@@ -81,6 +83,9 @@ proc addResolvedHeadBlock(
   # Update light client data
   dag.processNewBlockForLightClient(state, trustedBlock, parent.bid)

+  if not blockVerified:
+    dag.optimisticRoots.incl blockRoot
+
   # Notify others of the new block before processing the quarantine, such that
   # notifications for parents happens before those of the children
   if onBlockAdded != nil:
@@ -136,6 +141,7 @@ proc advanceClearanceState*(dag: ChainDAGRef) =
 proc addHeadBlock*(
     dag: ChainDAGRef, verifier: var BatchVerifier,
     signedBlock: ForkySignedBeaconBlock,
+    blockVerified: bool,
     onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded |
                   OnBellatrixBlockAdded
     ): Result[BlockRef, BlockError] =
@@ -256,12 +262,22 @@ proc addHeadBlock*(
   ok addResolvedHeadBlock(
     dag, dag.clearanceState,
     signedBlock.asTrusted(),
+    blockVerified = blockVerified,
     parent, cache,
     onBlockAdded,
     stateDataDur = stateDataTick - startTick,
     sigVerifyDur = sigVerifyTick - stateDataTick,
     stateVerifyDur = stateVerifyTick - sigVerifyTick)

+proc addHeadBlock*(
+    dag: ChainDAGRef, verifier: var BatchVerifier,
+    signedBlock: ForkySignedBeaconBlock,
+    onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded |
+                  OnBellatrixBlockAdded
+    ): Result[BlockRef, BlockError] =
+  addHeadBlock(
+    dag, verifier, signedBlock, blockVerified = true, onBlockAdded)
+
 proc addBackfillBlock*(
     dag: ChainDAGRef,
     signedBlock: ForkySignedBeaconBlock): Result[void, BlockError] =

View File

@@ -8,6 +8,7 @@
 {.push raises: [Defect].}

 import
+  std/options,
   chronicles,
   ../spec/datatypes/[phase0, altair, bellatrix],
   ../spec/forks
@@ -31,7 +32,7 @@ type
     bid*: BlockId ##\
     ## Root that can be used to retrieve block data from database

-    executionBlockRoot*: Eth2Digest
+    executionBlockRoot*: Option[Eth2Digest]

     parent*: BlockRef ##\
     ## Not nil, except for the finalized head
@@ -50,8 +51,8 @@ template root*(blck: BlockRef): Eth2Digest = blck.bid.root
 template slot*(blck: BlockRef): Slot = blck.bid.slot

 func init*(
-    T: type BlockRef, root: Eth2Digest, executionPayloadRoot: Eth2Digest,
-    slot: Slot): BlockRef =
+    T: type BlockRef, root: Eth2Digest,
+    executionPayloadRoot: Option[Eth2Digest], slot: Slot): BlockRef =
   BlockRef(
     bid: BlockId(root: root, slot: slot),
     executionBlockRoot: executionPayloadRoot,
@@ -61,13 +62,13 @@ func init*(
     T: type BlockRef, root: Eth2Digest,
     blck: phase0.SomeBeaconBlock | altair.SomeBeaconBlock |
           phase0.TrustedBeaconBlock | altair.TrustedBeaconBlock): BlockRef =
-  BlockRef.init(root, ZERO_HASH, blck.slot)
+  BlockRef.init(root, some ZERO_HASH, blck.slot)

 func init*(
     T: type BlockRef, root: Eth2Digest,
     blck: bellatrix.SomeBeaconBlock | bellatrix.TrustedBeaconBlock): BlockRef =
   BlockRef.init(
-    root, Eth2Digest(blck.body.execution_payload.block_hash), blck.slot)
+    root, some Eth2Digest(blck.body.execution_payload.block_hash), blck.slot)

 func parent*(bs: BlockSlot): BlockSlot =
   ## Return a blockslot representing the previous slot, using the parent block
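
The switch to Option[Eth2Digest] lets a BlockRef distinguish "not yet loaded from the database" from "loaded, and pre-merge". A sketch of the three states the Option encodes (Digest is a stand-in type; the real lazy accessor is loadExecutionBlockRoot, added below in blockchain_dag.nim):

  import std/options

  type Digest = array[32, byte]

  func describe(executionBlockRoot: Option[Digest]): string =
    if executionBlockRoot.isNone:
      "unknown; lazily loaded from the database on first use"
    elif executionBlockRoot.get == default(Digest):
      "known; block has no execution payload (pre-merge)"
    else:
      "known; execution block hash of the block's payload"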

View File

@@ -141,7 +141,8 @@ type
       ## in the case where an earlier genesis block exists.

     head*: BlockRef
-      ## The most recently known head, as chosen by fork choice
+      ## The most recently known head, as chosen by fork choice; might be
+      ## optimistic

     backfill*: BeaconBlockSummary
       ## The backfill points to the oldest block with an unbroken ancestry from
@@ -226,6 +227,9 @@ type
       ## EPOCHS_PER_SYNC_COMMITTEE_PERIOD is happening, some valid sync
       ## committee messages will be rejected

+    optimisticRoots*: HashSet[Eth2Digest]
+      ## https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#helpers
+
   EpochKey* = object
     ## The epoch key fully determines the shuffling for proposers and
     ## committees in a beacon state - the epoch level information in the state

View File

@@ -165,12 +165,11 @@ func init*(
       attester_dependent_root: attester_dependent_root,
       merge_transition_complete:
         case state.kind:
-        of BeaconStateFork.Phase0: false
-        of BeaconStateFork.Altair: false
+        of BeaconStateFork.Phase0, BeaconStateFork.Altair: false
         of BeaconStateFork.Bellatrix:
           # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/bellatrix/beacon-chain.md#is_merge_transition_complete
           state.bellatrixData.data.latest_execution_payload_header !=
-            ExecutionPayloadHeader()
+            (static(ExecutionPayloadHeader()))
     )

     epochStart = epoch.start_slot()
@@ -747,7 +746,8 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
   # Load head -> finalized, or all summaries in case the finalized block table
   # hasn't been written yet
   for blck in db.getAncestorSummaries(head.root):
-    let newRef = BlockRef.init(blck.root, ZERO_HASH, blck.summary.slot)
+    # The execution block root gets filled in as needed
+    let newRef = BlockRef.init(blck.root, none Eth2Digest, blck.summary.slot)
     if headRef == nil:
       doAssert blck.root == head.root
       headRef = newRef
@@ -950,6 +950,17 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
   dag.initLightClientDataCache()

+  # If these aren't actually optimistic, the first fcU will resolve that
+  withState(dag.headState):
+    when stateFork >= BeaconStateFork.Bellatrix:
+      template executionPayloadHeader(): auto =
+        state().data.latest_execution_payload_header
+      const emptyExecutionPayloadHeader =
+        default(type(executionPayloadHeader))
+      if executionPayloadHeader != emptyExecutionPayloadHeader:
+        dag.optimisticRoots.incl dag.head.root
+        dag.optimisticRoots.incl dag.finalizedHead.blck.root
+
   dag

 template genesis_validators_root*(dag: ChainDAGRef): Eth2Digest =
@@ -1341,6 +1352,18 @@ proc delState(dag: ChainDAGRef, bsi: BlockSlotId) =
     dag.db.delState(root.get())
     dag.db.delStateRoot(bsi.bid.root, bsi.slot)

+proc pruneBlockSlot(dag: ChainDAGRef, bs: BlockSlot) =
+  # TODO: should we move that disk I/O to `onSlotEnd`
+  dag.delState(bs.toBlockSlotId().expect("not nil"))
+
+  if bs.isProposed():
+    # Update light client data
+    dag.deleteLightClientData(bs.blck.bid)
+
+    dag.optimisticRoots.excl bs.blck.root
+    dag.forkBlocks.excl(KeyedBlockRef.init(bs.blck))
+    dag.db.delBlock(bs.blck.root)
+
 proc pruneBlocksDAG(dag: ChainDAGRef) =
   ## This prunes the block DAG
   ## This does NOT prune the cached state checkpoints and EpochRef
@@ -1375,16 +1398,7 @@ proc pruneBlocksDAG(dag: ChainDAGRef) =
         "finalizedHead parent should have been pruned from memory already"

       while cur.blck.parent != nil:
-        # TODO: should we move that disk I/O to `onSlotEnd`
-        dag.delState(cur.toBlockSlotId().expect("not nil"))
-
-        if cur.isProposed():
-          # Update light client data
-          dag.deleteLightClientData(cur.blck.bid)
-
-          dag.forkBlocks.excl(KeyedBlockRef.init(cur.blck))
-          dag.db.delBlock(cur.blck.root)
-
+        dag.pruneBlockSlot(cur)
         cur = cur.parentOrSlot

       dag.heads.del(n)
@@ -1394,6 +1408,63 @@ proc pruneBlocksDAG(dag: ChainDAGRef) =
     prunedHeads = hlen - dag.heads.len,
     dagPruneDur = Moment.now() - startTick

+# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#helpers
+template is_optimistic*(dag: ChainDAGRef, root: Eth2Digest): bool =
+  root in dag.optimisticRoots
+
+proc markBlockInvalid*(dag: ChainDAGRef, root: Eth2Digest) =
+  let blck = dag.getBlockRef(root).valueOr:
+    return
+  logScope: blck = shortLog(blck)
+
+  if not dag.is_optimistic(root):
+    # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#transitioning-from-valid---invalidated-or-invalidated---valid
+    # "It is outside of the scope of the specification since it's only possible
+    # with a faulty EE. Such a scenario requires manual intervention."
+    warn "markBlockInvalid: attempt to invalidate valid block"
+    doAssert verifyFinalization notin dag.updateFlags
+    return
+
+  if root == dag.finalizedHead.blck.root:
+    # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#re-orgs
+    # "If the justified checkpoint transitions from `NOT_VALIDATED` ->
+    # `INVALIDATED`, a consensus engine MAY choose to alert the user and force
+    # the application to exit."
+    #
+    # But be slightly less aggressive, and only check finalized.
+    warn "markBlockInvalid: finalized block invalidated"
+    doAssert verifyFinalization notin dag.updateFlags
+    return
+
+  debug "markBlockInvalid"
+  dag.pruneBlockSlot(blck.atSlot())
+
+proc markBlockVerified*(
+    dag: ChainDAGRef, quarantine: var Quarantine, root: Eth2Digest) =
+  # Might be called when block was not optimistic to begin with, or had been
+  # but already had been marked verified.
+  if not dag.is_optimistic(root):
+    return
+
+  var cur = dag.getBlockRef(root).valueOr:
+    return
+  logScope: blck = shortLog(cur)
+
+  debug "markBlockVerified"
+
+  while true:
+    if not dag.is_optimistic(cur.bid.root):
+      return
+
+    dag.optimisticRoots.excl cur.bid.root
+
+    debug "markBlockVerified ancestor"
+
+    if cur.parent.isNil:
+      break
+
+    cur = cur.parent
+
 iterator syncSubcommittee*(
     syncCommittee: openArray[ValidatorIndex],
     subcommitteeIdx: SyncSubcommitteeIndex): ValidatorIndex =
@@ -1533,6 +1604,27 @@ template getHeadStateMergeComplete*(dag: ChainDAGRef): bool =
   else:
     false

+proc loadExecutionBlockRoot*(dag: ChainDAGRef, blck: BlockRef): Eth2Digest =
+  if dag.cfg.blockForkAtEpoch(blck.bid.slot.epoch) < BeaconBlockFork.Bellatrix:
+    return ZERO_HASH
+
+  if blck.executionBlockRoot.isSome:
+    return blck.executionBlockRoot.get
+
+  let blockData = dag.getForkedBlock(blck.bid).valueOr:
+    blck.executionBlockRoot = some ZERO_HASH
+    return ZERO_HASH
+
+  let executionBlockRoot =
+    withBlck(blockData):
+      when stateFork >= BeaconStateFork.Bellatrix:
+        blck.message.body.execution_payload.block_hash
+      else:
+        ZERO_HASH
+  blck.executionBlockRoot = some executionBlockRoot
+
+  executionBlockRoot
+
 proc updateHead*(
     dag: ChainDAGRef,
     newHead: BlockRef,
@@ -1614,7 +1706,8 @@ proc updateHead*(
       stateRoot = shortLog(getStateRoot(dag.headState)),
       justified = shortLog(getStateField(
         dag.headState, current_justified_checkpoint)),
-      finalized = shortLog(getStateField(dag.headState, finalized_checkpoint))
+      finalized = shortLog(getStateField(dag.headState, finalized_checkpoint)),
+      isOptHead = dag.is_optimistic(newHead.root)

     if not(isNil(dag.onReorgHappened)):
       let
@@ -1635,7 +1728,8 @@ proc updateHead*(
       stateRoot = shortLog(getStateRoot(dag.headState)),
       justified = shortLog(getStateField(
         dag.headState, current_justified_checkpoint)),
-      finalized = shortLog(getStateField(dag.headState, finalized_checkpoint))
+      finalized = shortLog(getStateField(dag.headState, finalized_checkpoint)),
+      isOptHead = dag.is_optimistic(newHead.root)

     if not(isNil(dag.onHeadChanged)):
       let
@@ -1685,8 +1779,8 @@ proc updateHead*(
       dag.db.updateFinalizedBlocks(newFinalized)

-    if oldFinalizedHead.blck.executionBlockRoot.isZero and
-       not dag.finalizedHead.blck.executionBlockRoot.isZero:
+    if dag.loadExecutionBlockRoot(oldFinalizedHead.blck).isZero and
+       not dag.loadExecutionBlockRoot(dag.finalizedHead.blck).isZero:
       dag.vanityLogs.onFinalizedMergeTransitionBlock()

     # Pruning the block dag is required every time the finalized head changes

View File

@@ -8,21 +8,22 @@
 {.push raises: [Defect].}

 import
+  std/math,
   stew/results,
   chronicles, chronos, metrics,
-  eth/async_utils,
-  web3/engine_api_types,
-  ../spec/datatypes/[phase0, altair, bellatrix],
-  ../spec/[forks, signatures_batch],
-  ../consensus_object_pools/[
-    attestation_pool, block_clearance, blockchain_dag, block_quarantine,
-    spec_cache],
-  ../eth1/eth1_monitor,
-  ./consensus_manager,
-  ../beacon_clock,
+  ../spec/signatures_batch,
   ../sszdump

+from ./consensus_manager import
+  ConsensusManager, updateHead, updateHeadWithExecution
+from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
+from ../consensus_object_pools/block_dag import BlockRef, root, slot
+from ../consensus_object_pools/block_pools_types import BlockError, EpochRef
+from ../consensus_object_pools/block_quarantine import
+  addOrphan, addUnviable, pop, removeOrphan
+from ../validators/validator_monitor import
+  MsgSource, ValidatorMonitor, registerAttestationInBlock, registerBeaconBlock,
+  registerSyncAggregateInBlock
+
 export sszdump, signatures_batch

 # Block Processor
@@ -63,6 +64,7 @@ type
     dumpEnabled: bool
     dumpDirInvalid: string
    dumpDirIncoming: string
+    safeSlotsToImportOptimistically: uint16

  # Producers
  # ----------------------------------------------------------------
@@ -92,7 +94,8 @@ proc new*(T: type BlockProcessor,
           rng: ref HmacDrbgContext, taskpool: TaskPoolPtr,
           consensusManager: ref ConsensusManager,
           validatorMonitor: ref ValidatorMonitor,
-          getBeaconTime: GetBeaconTimeFn): ref BlockProcessor =
+          getBeaconTime: GetBeaconTimeFn,
+          safeSlotsToImportOptimistically: uint16): ref BlockProcessor =
   (ref BlockProcessor)(
     dumpEnabled: dumpEnabled,
     dumpDirInvalid: dumpDirInvalid,
@@ -101,6 +104,7 @@ proc new*(T: type BlockProcessor,
     consensusManager: consensusManager,
     validatorMonitor: validatorMonitor,
     getBeaconTime: getBeaconTime,
+    safeSlotsToImportOptimistically: safeSlotsToImportOptimistically,
     verifier: BatchVerifier(rng: rng, taskpool: taskpool)
   )
@@ -131,6 +135,9 @@ proc dumpBlock[T](
   else:
     discard

+from ../consensus_object_pools/block_clearance import
+  addBackfillBlock, addHeadBlock
+
 proc storeBackfillBlock(
     self: var BlockProcessor,
     signedBlock: ForkySignedBeaconBlock): Result[void, BlockError] =
@@ -157,11 +164,17 @@ proc storeBackfillBlock(

   res

+from ../consensus_object_pools/attestation_pool import addForkChoice
+from ../consensus_object_pools/spec_cache import get_attesting_indices
+from ../spec/datatypes/phase0 import TrustedSignedBeaconBlock
+
 proc storeBlock*(
-    self: var BlockProcessor,
+    self: ref BlockProcessor,
     src: MsgSource, wallTime: BeaconTime,
-    signedBlock: ForkySignedBeaconBlock, queueTick: Moment = Moment.now(),
-    validationDur = Duration()): Result[BlockRef, BlockError] =
+    signedBlock: ForkySignedBeaconBlock, payloadValid: bool,
+    queueTick: Moment = Moment.now(),
+    validationDur = Duration()):
+    Future[Result[BlockRef, BlockError]] {.async.} =
   ## storeBlock is the main entry point for unvalidated blocks - all untrusted
   ## blocks, regardless of origin, pass through here. When storing a block,
   ## we will add it to the dag and pass it to all block consumers that need
@@ -182,7 +195,7 @@ proc storeBlock*(
     self.consensusManager.quarantine[].removeOrphan(signedBlock)

   type Trusted = typeof signedBlock.asTrusted()
-  let blck = dag.addHeadBlock(self.verifier, signedBlock) do (
+  let blck = dag.addHeadBlock(self.verifier, signedBlock, payloadValid) do (
       blckRef: BlockRef, trustedBlock: Trusted, epochRef: EpochRef):
     # Callback add to fork choice if valid
     attestationPool[].addForkChoice(
@@ -208,7 +221,7 @@ proc storeBlock*(
             trustedBlock.message.slot, trustedBlock.root,
             state.data.current_sync_committee.pubkeys.data[i])

-  self.dumpBlock(signedBlock, blck)
+  self[].dumpBlock(signedBlock, blck)

   # There can be a scenario where we receive a block we already received.
   # However this block was before the last finalized epoch and so its parent
@@ -239,8 +252,23 @@ proc storeBlock*(

   let storeBlockTick = Moment.now()

-  # Eagerly update head: the incoming block "should" get selected
-  self.consensusManager[].updateHead(wallTime.slotOrZero)
+  # Eagerly update head: the incoming block "should" get selected.
+  #
+  # storeBlock gets called from validator_duties, which depends on its not
+  # blocking progress any longer than necessary, and processBlock here, in
+  # which case it's fine to await for a while on engine API results.
+  if not is_execution_block(signedBlock.message):
+    self.consensusManager[].updateHead(wallTime.slotOrZero)
+  else:
+    # This primarily exists to ensure that by the time the DAG updateHead is
+    # called valid blocks have already been registered as verified. The head
+    # can lag a slot behind wall clock, complicating detecting synced status
+    # for validating, otherwise.
+    #
+    # TODO have a third version which is fire-and-forget for when it is merge
+    # but payloadValid is true, i.e. fcU is for EL's benefit, not CL. Current
+    # behavior adds unnecessary latency to CL event loop.
+    await self.consensusManager.updateHeadWithExecution(wallTime.slotOrZero)

   let
     updateHeadTick = Moment.now()
@@ -257,9 +285,9 @@ proc storeBlock*(

   for quarantined in self.consensusManager.quarantine[].pop(blck.get().root):
     # Process the blocks that had the newly accepted block as parent
-    self.addBlock(MsgSource.gossip, quarantined)
+    self[].addBlock(MsgSource.gossip, quarantined)

-  blck
+  return blck

 # Enqueue
 # ------------------------------------------------------------------------------
@@ -299,7 +327,9 @@ proc addBlock*(
 # Event Loop
 # ------------------------------------------------------------------------------

-proc processBlock(self: var BlockProcessor, entry: BlockEntry) =
+proc processBlock(
+    self: ref BlockProcessor, entry: BlockEntry, payloadValid: bool)
+    {.async.} =
   logScope:
     blockRoot = shortLog(entry.blck.root)
@@ -311,59 +341,29 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) =
     error "Processing block before genesis, clock turned back?"
     quit 1

-  let
-    res = withBlck(entry.blck):
-      self.storeBlock(entry.src, wallTime, blck, entry.queueTick, entry.validationDur)
+  let res = withBlck(entry.blck):
+    await self.storeBlock(
+      entry.src, wallTime, blck, payloadValid, entry.queueTick,
+      entry.validationDur)

   if entry.resfut != nil:
     entry.resfut.complete(
       if res.isOk(): Result[void, BlockError].ok()
       else: Result[void, BlockError].err(res.error()))

-func `$`(h: BlockHash): string = $h.asEth2Digest
-
-proc runForkchoiceUpdated(
-    self: ref BlockProcessor, headBlockRoot, finalizedBlockRoot: Eth2Digest):
-    Future[bool] {.async.} =
-  # Allow finalizedBlockRoot to be 0 to avoid sync deadlocks.
-  #
-  # https://github.com/ethereum/EIPs/blob/master/EIPS/eip-3675.md#pos-events
-  # has "Before the first finalized block occurs in the system the finalized
-  # block hash provided by this event is stubbed with
-  # `0x0000000000000000000000000000000000000000000000000000000000000000`."
-  # and
-  # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/bellatrix/validator.md#executionpayload
-  # notes "`finalized_block_hash` is the hash of the latest finalized execution
-  # payload (`Hash32()` if none yet finalized)"
-  doAssert not headBlockRoot.isZero
-
-  try:
-    # Minimize window for Eth1 monitor to shut down connection
-    await self.consensusManager.eth1Monitor.ensureDataProvider()
-    let fcuR = awaitWithTimeout(
-      forkchoiceUpdated(
-        self.consensusManager.eth1Monitor, headBlockRoot, finalizedBlockRoot),
-      FORKCHOICEUPDATED_TIMEOUT):
-        debug "runForkChoiceUpdated: forkchoiceUpdated timed out"
-        default(ForkchoiceUpdatedResponse)
-
-    debug "runForkChoiceUpdated: running forkchoiceUpdated",
-      headBlockRoot,
-      finalizedBlockRoot,
-      payloadStatus = $fcuR.payloadStatus.status,
-      latestValidHash = $fcuR.payloadStatus.latestValidHash,
-      validationError = $fcuR.payloadStatus.validationError
-
-    return fcuR.payloadStatus.status == PayloadExecutionStatus.valid
-  except CatchableError as err:
-    debug "runForkChoiceUpdated: forkchoiceUpdated failed",
-      err = err.msg
-    return false
+from eth/async_utils import awaitWithTimeout
+from web3/engine_api_types import PayloadExecutionStatus, PayloadStatusV1
+from ../eth1/eth1_monitor import
+  Eth1Monitor, asEngineExecutionPayload, ensureDataProvider, newPayload
+from ../spec/datatypes/bellatrix import ExecutionPayload, SignedBeaconBlock

 proc newExecutionPayload*(
     eth1Monitor: Eth1Monitor, executionPayload: bellatrix.ExecutionPayload):
     Future[PayloadExecutionStatus] {.async.} =
+  if eth1Monitor.isNil:
+    warn "newPayload: attempting to process execution payload without an Eth1Monitor. Ensure --web3-url setting is correct."
+    return PayloadExecutionStatus.syncing
+
   debug "newPayload: inserting block into execution engine",
     parentHash = executionPayload.parent_hash,
     blockHash = executionPayload.block_hash,
@@ -375,14 +375,9 @@ proc newExecutionPayload*(
     gasUsed = executionPayload.gas_used,
     timestamp = executionPayload.timestamp,
     extraDataLen = executionPayload.extra_data.len,
-    blockHash = executionPayload.block_hash,
-    baseFeePerGas = executionPayload.base_fee_per_gas,
+    baseFeePerGas = $executionPayload.base_fee_per_gas,
     numTransactions = executionPayload.transactions.len

-  if eth1Monitor.isNil:
-    info "newPayload: attempting to process execution payload without an Eth1Monitor. Ensure --web3-url setting is correct."
-    return PayloadExecutionStatus.syncing
-
   try:
     let
       payloadResponse =
@@ -394,22 +389,40 @@ proc newExecutionPayload*(
           PayloadStatusV1(status: PayloadExecutionStatus.syncing)
       payloadStatus = payloadResponse.status

+    debug "newPayload: succeeded",
+      parentHash = executionPayload.parent_hash,
+      blockHash = executionPayload.block_hash,
+      blockNumber = executionPayload.block_number,
+      payloadStatus
+
     return payloadStatus
   except CatchableError as err:
     debug "newPayload failed", msg = err.msg
     return PayloadExecutionStatus.syncing

+from ../consensus_object_pools/blockchain_dag import
+  getBlockRef, loadExecutionBlockRoot, markBlockInvalid
+
+# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#helpers
+proc is_optimistic_candidate_block(
+    self: BlockProcessor, blck: ForkedSignedBeaconBlock): bool =
+  # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#when-to-optimistically-import-blocks
+  # The current slot (as per the system clock) is at least
+  # `SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY` ahead of the slot of the block being
+  # imported.
+  if blck.slot + self.safeSlotsToImportOptimistically <=
+      self.getBeaconTime().slotOrZero:
+    return true
+
+  let
+    parentRoot = withBlck(blck): blck.message.parent_root
+    parentBlck = self.consensusManager.dag.getBlockRef(parentRoot).valueOr:
+      return false
+
+  # The parent of the block has execution enabled.
+  not self.consensusManager.dag.loadExecutionBlockRoot(parentBlck).isZero
+
 proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
-  # Don't want to vacillate between "optimistic" sync and non-optimistic
-  # sync heads. Relies on runQueueProcessingLoop() being the only place,
-  # in Nimbus, which does this.
-  var
-    optForkchoiceHeadSlot = GENESIS_SLOT # safe default
-    optForkchoiceHeadRoot: Eth2Digest
-
-    # don't keep spamming same fcU to Geth; might be restarting sync each time
-    lastFcHead: Eth2Digest
-
   while true:
     # Cooperative concurrency: one block per loop iteration - because
     # we run both networking and CPU-heavy things like block processing
@@ -426,110 +439,65 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =

     let
       blck = await self[].blockQueue.popFirst()
-      hasExecutionPayload = blck.blck.kind >= BeaconBlockFork.Bellatrix
-      isExecutionBlock =
-        hasExecutionPayload and
-        blck.blck.bellatrixData.message.body.is_execution_block
+      hasExecutionPayload =
+        withBlck(blck.blck): blck.message.is_execution_block
       executionPayloadStatus =
-        if isExecutionBlock:
+        if hasExecutionPayload:
           # Eth1 syncing is asynchronous from this
           # TODO self.consensusManager.eth1Monitor.terminalBlockHash.isSome
           # should gate this when it works more reliably
+          # TODO detect have-TTD-but-not-is_execution_block case, and where
+          # execution payload was non-zero when TTD detection more reliable
           when true:
             try:
               # Minimize window for Eth1 monitor to shut down connection
               await self.consensusManager.eth1Monitor.ensureDataProvider()
+              let executionPayload =
+                withBlck(blck.blck):
+                  when stateFork >= BeaconStateFork.Bellatrix:
+                    blck.message.body.execution_payload
+                  else:
+                    doAssert false
+                    default(bellatrix.ExecutionPayload) # satisfy Nim
               await newExecutionPayload(
-                self.consensusManager.eth1Monitor,
-                blck.blck.bellatrixData.message.body.execution_payload)
+                self.consensusManager.eth1Monitor, executionPayload)
             except CatchableError as err:
-              debug "runQueueProcessingLoop: newPayload failed",
+              info "runQueueProcessingLoop: newPayload failed",
                 err = err.msg
-              PayloadExecutionStatus.syncing
+              # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#execution-engine-errors
+              if not blck.resfut.isNil:
+                blck.resfut.complete(
+                  Result[void, BlockError].err(BlockError.MissingParent))
+              continue
           else:
             debug "runQueueProcessingLoop: got execution payload before TTD"
             PayloadExecutionStatus.syncing
         else:
           # Vacuously
           PayloadExecutionStatus.valid

-    if executionPayloadStatus in [
+    if executionPayloadStatus in static([
         PayloadExecutionStatus.invalid,
-        PayloadExecutionStatus.invalid_block_hash]:
+        PayloadExecutionStatus.invalid_block_hash]):
       debug "runQueueProcessingLoop: execution payload invalid",
-        executionPayloadStatus
+        executionPayloadStatus,
+        blck = shortLog(blck.blck)
+      self.consensusManager.dag.markBlockInvalid(blck.blck.root)
+      self.consensusManager.quarantine[].addUnviable(blck.blck.root)
       # Every loop iteration ends with some version of blck.resfut.complete(),
       # including processBlock(), otherwise the sync manager stalls.
       if not blck.resfut.isNil:
         blck.resfut.complete(Result[void, BlockError].err(BlockError.Invalid))
-      continue
-
-    if isExecutionBlock:
-      # The EL client doesn't know here whether the payload is valid, because,
-      # for example, in Geth's case, its parent isn't known. When Geth logs an
-      # "Ignoring payload with missing parent" message, this is the result. It
-      # is distinct from the invalid cases above, and shouldn't cause the same
-      # BlockError.Invalid error, because it doesn't badly on the peer sending
-      # it, it's just not fully verifiable yet for this node. Furthermore, the
-      # EL client can, e.g. via Geth, "rely on the beacon client to forcefully
-      # update the head with a forkchoice update request". This can occur when
-      # an EL client is substantially more synced than a CL client, and when a
-      # CL client in that position attempts to serially sync it will encounter
-      # potential for this message until it nearly catches up, unless using an
-      # approach such as forkchoiceUpdated to trigger sync.
-      #
-      # Returning the MissingParent error causes the sync manager to loop in
-      # place until the EL does resync/catch up, then the normal process can
-      # resume where there's a hybrid serial and optimistic sync model.
-      #
-      # When this occurs within a couple of epochs of the Merge, before there
-      # has been a chance to justify and finalize a post-merge block this can
-      # cause a sync deadlock unless the EL can be convinced to sync back, or
-      # the CL is rather more open-endedly optimistic (potentially for entire
-      # weak subjectivity periods) than seems optimal.
-      debug "runQueueProcessingLoop: execution payload accepted or syncing",
-        executionPayloadStatus
-
-      # Always do this. Geth will only initiate syncing or reorgs with this
-      # combination of newPayload and forkchoiceUpdated. By design this must
-      # be somewhat optimistic, at least by one slot, for Geth to process it
-      # at all. This eventually converges to the same head as the DAG by the
-      # time it's externally visible via validating activity.
-      #
-      # In particular, the constraints that hold here are that Geth expects a
-      # sequence of
-      # - newPayload(execution payload with block hash `h`) followed by
-      # - forkchoiceUpdated(head = `h`)
-      # This is intrinsically somewhat optimistic, because determining the
-      # validity of an execution payload requires the forkchoiceUpdated
-      # head to be set to a block hash of some execution payload with unknown
-      # validity; otherwise it would not be necessary to ask the EL.
-      #
-      # The main reason this isn't done more adjacently in this code flow is to
-      # catch outright invalid cases, where the EL can reject a payload, without
-      # even running forkchoiceUpdated on it.
-      static: doAssert high(BeaconStateFork) == BeaconStateFork.Bellatrix
-      let curBh =
-        blck.blck.bellatrixData.message.body.execution_payload.block_hash
-      if curBh != lastFcHead:
-        lastFcHead = curBh
-        if await self.runForkchoiceUpdated(
-            curBh,
-            self.consensusManager.dag.finalizedHead.blck.executionBlockRoot):
-          # Geth seldom seems to return VALID to newPayload alone, even when
-          # it has all the relevant information.
-          self[].processBlock(blck)
-          continue
-
-    if executionPayloadStatus != PayloadExecutionStatus.valid:
-      if not blck.resfut.isNil:
-        blck.resfut.complete(Result[void, BlockError].err(
-          BlockError.MissingParent))
-      continue
-
-    # When newPayload, rather than forkchoiceUpdated, has returned valid.
-    doAssert executionPayloadStatus == PayloadExecutionStatus.valid
-    self[].processBlock(blck)
+    else:
+      if executionPayloadStatus == PayloadExecutionStatus.valid or
+          self[].is_optimistic_candidate_block(blck.blck):
+        await self.processBlock(
+          blck, executionPayloadStatus == PayloadExecutionStatus.valid)
+      else:
+        debug "runQueueProcessingLoop: block cannot be optimistically imported",
+          blck = shortLog(blck.blck)
+        if not blck.resfut.isNil:
+          blck.resfut.complete(
+            Result[void, BlockError].err(BlockError.MissingParent))
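
The slot condition in is_optimistic_candidate_block comes straight from the spec's "when to optimistically import blocks" rule. A worked example of the arithmetic, with Slot as a bare uint64 stand-in:

  type Slot = uint64

  # Spec rule: the block may be optimistically imported once the wall clock
  # is at least SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY slots past the block.
  func oldEnough(blockSlot, wallSlot: Slot, safeSlots: uint64): bool =
    blockSlot + safeSlots <= wallSlot

  doAssert oldEnough(blockSlot = 100, wallSlot = 228, safeSlots = 128)
  doAssert not oldEnough(blockSlot = 101, wallSlot = 228, safeSlots = 128)

Failing that check, a block is still a candidate when its parent already has execution enabled, which is the second branch of the proc above.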

View File

@@ -80,24 +80,124 @@ proc expectBlock*(self: var ConsensusManager, expectedSlot: Slot): Future[bool]

   return fut

+from eth/async_utils import awaitWithTimeout
+from web3/engine_api_types import
+  ForkchoiceUpdatedResponse, PayloadExecutionStatus, PayloadStatusV1
+
+func `$`(h: BlockHash): string = $h.asEth2Digest
+
+proc runForkchoiceUpdated(
+    eth1Monitor: Eth1Monitor, headBlockRoot, finalizedBlockRoot: Eth2Digest):
+    Future[PayloadExecutionStatus] {.async.} =
+  # Allow finalizedBlockRoot to be 0 to avoid sync deadlocks.
+  #
+  # https://github.com/ethereum/EIPs/blob/master/EIPS/eip-3675.md#pos-events
+  # has "Before the first finalized block occurs in the system the finalized
+  # block hash provided by this event is stubbed with
+  # `0x0000000000000000000000000000000000000000000000000000000000000000`."
+  # and
+  # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/bellatrix/validator.md#executionpayload
+  # notes "`finalized_block_hash` is the hash of the latest finalized execution
+  # payload (`Hash32()` if none yet finalized)"
+  doAssert not headBlockRoot.isZero
+
+  try:
+    # Minimize window for Eth1 monitor to shut down connection
+    await eth1Monitor.ensureDataProvider()
+    let fcuR = awaitWithTimeout(
+      forkchoiceUpdated(
+        eth1Monitor, headBlockRoot, finalizedBlockRoot),
+      FORKCHOICEUPDATED_TIMEOUT):
+        debug "runForkchoiceUpdated: forkchoiceUpdated timed out"
+        ForkchoiceUpdatedResponse(
+          payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing))
+
+    debug "runForkchoiceUpdated: ran forkchoiceUpdated",
+      headBlockRoot,
+      finalizedBlockRoot,
+      payloadStatus = $fcuR.payloadStatus.status,
+      latestValidHash = $fcuR.payloadStatus.latestValidHash,
+      validationError = $fcuR.payloadStatus.validationError
+
+    return fcuR.payloadStatus.status
+  except CatchableError as err:
+    debug "runForkchoiceUpdated: forkchoiceUpdated failed",
+      err = err.msg
+    return PayloadExecutionStatus.syncing
+
+proc updateExecutionClientHead(self: ref ConsensusManager, newHead: BlockRef)
+    {.async.} =
+  if self.eth1Monitor.isNil:
+    return
+
+  let executionHeadRoot = self.dag.loadExecutionBlockRoot(newHead)
+
+  if executionHeadRoot.isZero:
+    # Blocks without execution payloads can't be optimistic.
+    self.dag.markBlockVerified(self.quarantine[], newHead.root)
+    return
+
+  # Can't use dag.head here because it hasn't been updated yet
+  let
+    executionFinalizedRoot =
+      self.dag.loadExecutionBlockRoot(self.dag.finalizedHead.blck)
+    payloadExecutionStatus = await self.eth1Monitor.runForkchoiceUpdated(
+      executionHeadRoot, executionFinalizedRoot)
+
+  case payloadExecutionStatus
+  of PayloadExecutionStatus.valid:
+    self.dag.markBlockVerified(self.quarantine[], newHead.root)
+  of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash:
+    self.dag.markBlockInvalid(newHead.root)
+    self.quarantine[].addUnviable(newHead.root)
+  of PayloadExecutionStatus.accepted, PayloadExecutionStatus.syncing:
+    self.dag.optimisticRoots.incl newHead.root
+
 proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =
   ## Trigger fork choice and update the DAG with the new head block
   ## This does not automatically prune the DAG after finalization
   ## `pruneFinalized` must be called for pruning.

   # Grab the new head according to our latest attestation data
-  let newHead = self.attestationPool[].selectHead(
+  let newHead = self.attestationPool[].selectOptimisticHead(
       wallSlot.start_beacon_time).valueOr:
     warn "Head selection failed, using previous head",
       head = shortLog(self.dag.head), wallSlot
     return

+  if self.dag.loadExecutionBlockRoot(newHead).isZero:
+    # Blocks without execution payloads can't be optimistic.
+    self.dag.markBlockVerified(self.quarantine[], newHead.root)
+
   # Store the new head in the chain DAG - this may cause epochs to be
   # justified and finalized
   self.dag.updateHead(newHead, self.quarantine[])

   self.checkExpectedBlock()

+proc updateHeadWithExecution*(self: ref ConsensusManager, wallSlot: Slot)
+    {.async.} =
+  ## Trigger fork choice and update the DAG with the new head block
+  ## This does not automatically prune the DAG after finalization
+  ## `pruneFinalized` must be called for pruning.
+
+  # Grab the new head according to our latest attestation data
+  let newHead = self.attestationPool[].selectOptimisticHead(
+      wallSlot.start_beacon_time).valueOr:
+    warn "Head selection failed, using previous head",
+      head = shortLog(self.dag.head), wallSlot
+    return
+
+  # Ensure dag.updateHead has most current information
+  await self.updateExecutionClientHead(newHead)
+
+  # Store the new head in the chain DAG - this may cause epochs to be
+  # justified and finalized
+  self.dag.updateHead(newHead, self.quarantine[])
+
+  self[].checkExpectedBlock()
+
 proc pruneStateCachesAndForkChoice*(self: var ConsensusManager) =
   ## Prune unneeded and invalidated data after finalization
   ## - the DAG state checkpoints
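
updateExecutionClientHead reduces the engine API's five payload statuses to three DAG actions. A self-contained sketch of that mapping (PayloadStatus mirrors the engine API's PayloadExecutionStatus; the DagAction names are stand-ins for the calls above):

  type
    PayloadStatus = enum
      valid, invalid, invalid_block_hash, accepted, syncing
    DagAction = enum
      markVerified,   # excl from optimisticRoots, walking ancestors
      markInvalid,    # prune the block and mark it unviable
      stayOptimistic  # incl in optimisticRoots until a later fcU resolves it

  func actionFor(status: PayloadStatus): DagAction =
    case status
    of valid: markVerified
    of invalid, invalid_block_hash: markInvalid
    of accepted, syncing: stayOptimistic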

View File

@@ -241,30 +241,27 @@ proc initFullNode(
   proc onVoluntaryExitAdded(data: SignedVoluntaryExit) =
     node.eventBus.exitQueue.emit(data)
   proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) =
-    # TODO (cheatfate): Proper implementation required
     let optimistic =
       if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH:
-        some(false)
+        some node.dag.is_optimistic(data.root)
       else:
         none[bool]()
     node.eventBus.blocksQueue.emit(
       EventBeaconBlockObject.init(data, optimistic))
   proc onHeadChanged(data: HeadChangeInfoObject) =
-    # TODO (cheatfate): Proper implementation required
     let eventData =
       if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH:
         var res = data
-        res.optimistic = some(false)
+        res.optimistic = some node.dag.is_optimistic(data.block_root)
         res
       else:
         data
     node.eventBus.headQueue.emit(eventData)
   proc onChainReorg(data: ReorgInfoObject) =
-    # TODO (cheatfate): Proper implementation required
     let eventData =
       if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH:
         var res = data
-        res.optimistic = some(false)
+        res.optimistic = some node.dag.is_optimistic(data.new_head_block)
         res
       else:
         data
@@ -282,11 +279,10 @@ proc initFullNode(
       finalizedEpochRef.eth1_data,
       finalizedEpochRef.eth1_deposit_index)
     node.updateLightClientFromDag()
-    # TODO (cheatfate): Proper implementation required
     let eventData =
       if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH:
         var res = data
-        res.optimistic = some(false)
+        res.optimistic = some node.dag.is_optimistic(data.block_root)
         res
       else:
         data
@@ -322,7 +318,8 @@ proc initFullNode(
       dag, attestationPool, quarantine, node.eth1Monitor)
     blockProcessor = BlockProcessor.new(
       config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
-      rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime)
+      rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime,
+      config.safeSlotsToImportOptimistically)
     blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock):
         Future[Result[void, BlockError]] =
       # The design with a callback for block verification is unusual compared
@@ -1287,6 +1284,8 @@ func syncStatus(node: BeaconNode): string =
     node.syncManager.syncStatus
   elif node.backfiller.inProgress:
     "backfill: " & node.backfiller.syncStatus
+  elif node.dag.is_optimistic(node.dag.head.root):
+    "opt synced"
   else:
     "synced"

View File

@@ -264,8 +264,7 @@ proc installNodeApiHandlers*(router: var RestRouter, node: BeaconNode) =
         node.syncManager.inProgress
       isOptimistic =
         if node.currentSlot().epoch() >= node.dag.cfg.BELLATRIX_FORK_EPOCH:
-          # TODO (cheatfate): Proper implementation required
-          some(false)
+          some(node.dag.is_optimistic(node.dag.head.root))
         else:
           none[bool]()

View File

@@ -279,8 +279,15 @@ proc getStateOptimistic*(node: BeaconNode,
     of BeaconStateFork.Phase0, BeaconStateFork.Altair:
       some[bool](false)
     of BeaconStateFork.Bellatrix:
-      # TODO (cheatfate): Proper implementation required.
-      some[bool](false)
+      # A state is optimistic iff the block which created it is
+      withState(state):
+        # The block root which created the state at slot `n` is at slot `n-1`
+        if state.data.slot == GENESIS_SLOT:
+          some[bool](false)
+        else:
+          doAssert state.data.slot > 0
+          some[bool](node.dag.is_optimistic(
+            get_block_root_at_slot(state.data, state.data.slot - 1)))
   else:
     none[bool]()
@@ -292,8 +299,7 @@ proc getBlockOptimistic*(node: BeaconNode,
     of BeaconBlockFork.Phase0, BeaconBlockFork.Altair:
       some[bool](false)
     of BeaconBlockFork.Bellatrix:
-      # TODO (cheatfate): Proper implementation required.
-      some[bool](false)
+      some[bool](node.dag.is_optimistic(blck.root))
   else:
     none[bool]()
@@ -303,8 +309,7 @@ proc getBlockRefOptimistic*(node: BeaconNode, blck: BlockRef): bool =
   of BeaconBlockFork.Phase0, BeaconBlockFork.Altair:
     false
   of BeaconBlockFork.Bellatrix:
-    # TODO (cheatfate): Proper implementation required.
-    false
+    node.dag.is_optimistic(blck.root)

 const
   jsonMediaType* = MediaType.init("application/json")
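
getStateOptimistic relies on the spec invariant that the state at slot n was produced by applying the block at slot n-1, whose root sits in the state's block_roots ring buffer. A sketch of that lookup, mirroring the spec's get_block_root_at_slot with stand-in types:

  const SLOTS_PER_HISTORICAL_ROOT = 8192'u64

  type
    Slot = uint64
    Digest = array[32, byte]

  func blockRootAtSlot(
      blockRoots: openArray[Digest], stateSlot, slot: Slot): Digest =
    # Spec precondition: slot lies within the last SLOTS_PER_HISTORICAL_ROOT
    # slots before the state's slot.
    doAssert slot < stateSlot and
      stateSlot <= slot + SLOTS_PER_HISTORICAL_ROOT
    blockRoots[int(slot mod SLOTS_PER_HISTORICAL_ROOT)]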

View File

@@ -97,7 +97,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
           )
         res

-      # TODO (cheatfate): Proper implementation required
+      # getSyncedHead() implies non-optimistic node.
       let optimistic =
         if node.currentSlot().epoch() >= node.dag.cfg.BELLATRIX_FORK_EPOCH:
           some(false)
@@ -151,7 +151,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
           )
         res

-      # TODO (cheatfate): Proper implementation required
+      # getSyncedHead() implies non-optimistic node.
       let optimistic =
         if node.currentSlot().epoch() >= node.dag.cfg.BELLATRIX_FORK_EPOCH:
           some(false)

View File

@@ -268,21 +268,27 @@ template toString*(kind: BeaconStateFork): string =
     "bellatrix"

 template toFork*[T:
+    phase0.BeaconBlock |
     phase0.SignedBeaconBlock |
+    phase0.TrustedBeaconBlock |
     phase0.SigVerifiedSignedBeaconBlock |
     phase0.MsgTrustedSignedBeaconBlock |
     phase0.TrustedSignedBeaconBlock](
     t: type T): BeaconBlockFork =
   BeaconBlockFork.Phase0
 template toFork*[T:
+    altair.BeaconBlock |
     altair.SignedBeaconBlock |
+    altair.TrustedBeaconBlock |
     altair.SigVerifiedSignedBeaconBlock |
     altair.MsgTrustedSignedBeaconBlock |
     altair.TrustedSignedBeaconBlock](
     t: type T): BeaconBlockFork =
   BeaconBlockFork.Altair
 template toFork*[T:
+    bellatrix.BeaconBlock |
     bellatrix.SignedBeaconBlock |
+    bellatrix.TrustedBeaconBlock |
     bellatrix.SigVerifiedSignedBeaconBlock |
     bellatrix.MsgTrustedSignedBeaconBlock |
     bellatrix.TrustedSignedBeaconBlock](

View File

@@ -288,11 +288,13 @@ func is_merge_transition_complete*(state: bellatrix.BeaconState): bool =
   state.latest_execution_payload_header != defaultExecutionPayloadHeader

 # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#helpers
-func is_execution_block*(
-    body: bellatrix.BeaconBlockBody | bellatrix.TrustedBeaconBlockBody |
-          bellatrix.SigVerifiedBeaconBlockBody): bool =
-  const defaultBellatrixExecutionPayload = default(bellatrix.ExecutionPayload)
-  body.execution_payload != defaultBellatrixExecutionPayload
+func is_execution_block*(blck: SomeForkyBeaconBlock): bool =
+  when typeof(blck).toFork >= BeaconBlockFork.Bellatrix:
+    const defaultExecutionPayload =
+      default(typeof(blck.body.execution_payload))
+    blck.body.execution_payload != defaultExecutionPayload
+  else:
+    false

 # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/bellatrix/beacon-chain.md#is_merge_transition_block
 func is_merge_transition_block(
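
Making is_execution_block total over all forks works because the fork is a property of the block's type, so the check compiles away entirely for pre-Bellatrix blocks. A self-contained sketch of that compile-time dispatch (toy types, not the real block types; the real code dispatches on typeof(blck).toFork):

  type
    Fork = enum forkPhase0, forkBellatrix
    Phase0Block = object
    BellatrixBlock = object
      hasPayload: bool # stand-in for a non-default execution_payload

  template toFork(T: type Phase0Block): Fork = forkPhase0
  template toFork(T: type BellatrixBlock): Fork = forkBellatrix

  func isExecutionBlock[B](blck: B): bool =
    when toFork(B) >= forkBellatrix:
      blck.hasPayload
    else:
      false

  doAssert not isExecutionBlock(Phase0Block())
  doAssert isExecutionBlock(BellatrixBlock(hasPayload: true))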

View File

@@ -55,7 +55,7 @@ proc reportOptimisticCandidateBlock(optSync: LCOptimisticSync) {.gcsafe.} =
       optSync.finalizedIsExecutionBlock =
         withBlck(finalizedBlock.get):
           when stateFork >= BeaconStateFork.Bellatrix:
-            some blck.message.body.is_execution_block()
+            some blck.message.is_execution_block()
           else:
             some false

View File

@ -299,11 +299,13 @@ p2pProtocol BeaconSync(version = 1,
if startSlot.epoch >= dag.cfg.ALTAIR_FORK_EPOCH: if startSlot.epoch >= dag.cfg.ALTAIR_FORK_EPOCH:
# "Clients MAY limit the number of blocks in the response." # "Clients MAY limit the number of blocks in the response."
# https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyrange # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/p2p-interface.md#beaconblocksbyrange
debug "Block range v1 request for post-altair range", debug "Block range v1 request for post-altair range",
peer, startSlot, reqCount, reqStep peer, startSlot, reqCount, reqStep
return return
# Phase 0 blocks are never optimistic.
var blocks: array[MAX_REQUEST_BLOCKS, BlockId] var blocks: array[MAX_REQUEST_BLOCKS, BlockId]
let let
@ -387,12 +389,14 @@ p2pProtocol BeaconSync(version = 1,
if blockRef.slot.epoch >= dag.cfg.ALTAIR_FORK_EPOCH: if blockRef.slot.epoch >= dag.cfg.ALTAIR_FORK_EPOCH:
# Skipping this block should be fine because the spec says: # Skipping this block should be fine because the spec says:
# "Clients MAY limit the number of blocks in the response." # "Clients MAY limit the number of blocks in the response."
# https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyroot # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/p2p-interface.md#beaconblocksbyroot
# #
# Also, our response would be indistinguishable from a node # Also, our response would be indistinguishable from a node
# that have been synced exactly to the altair transition slot. # that have been synced exactly to the altair transition slot.
continue continue
# Phase 0 blocks are never optimistic.
if dag.getBlockSZ(blockRef.bid, bytes): if dag.getBlockSZ(blockRef.bid, bytes):
let uncompressedLen = uncompressedLenFramed(bytes).valueOr: let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read block size, database corrupt?", warn "Cannot read block size, database corrupt?",
@ -454,6 +458,13 @@ p2pProtocol BeaconSync(version = 1,
for i in startIndex..endIndex: for i in startIndex..endIndex:
if dag.getBlockSZ(blocks[i], bytes): if dag.getBlockSZ(blocks[i], bytes):
# In general, there is not much intermediate time between post-merge
# blocks all being optimistic and none of them being optimistic. The
# EL catches up, tells the CL the head is verified, and that's it.
if blocks[i].slot.epoch >= dag.cfg.BELLATRIX_FORK_EPOCH and
dag.is_optimistic(dag.head.root):
continue
let uncompressedLen = uncompressedLenFramed(bytes).valueOr: let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read block size, database corrupt?", warn "Cannot read block size, database corrupt?",
bytes = bytes.len(), blck = shortLog(blocks[i]) bytes = bytes.len(), blck = shortLog(blocks[i])
@@ -510,6 +521,13 @@ p2pProtocol BeaconSync(version = 1,
           continue
 
       if dag.getBlockSZ(blockRef.bid, bytes):
+        # In general, there is not much intermediate time between post-merge
+        # blocks all being optimistic and none of them being optimistic. The
+        # EL catches up, tells the CL the head is verified, and that's it.
+        if blockRef.slot.epoch >= dag.cfg.BELLATRIX_FORK_EPOCH and
+            dag.is_optimistic(dag.head.root):
+          continue
+
         let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
           warn "Cannot read block size, database corrupt?",
             bytes = bytes.len(), blck = shortLog(blockRef)
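
The serving rule these hunks add can be condensed into a minimal, self-contained sketch; `BlockSummary` and `shouldServe` below are hypothetical stand-ins for illustration, not the Nimbus API. Pre-Bellatrix blocks are always served, while post-merge blocks are withheld whenever the node's own head is still optimistic:

type
  Epoch = uint64
  BlockSummary = object
    epoch: Epoch

const BELLATRIX_FORK_EPOCH = Epoch(144896) # mainnet value

func shouldServe(blck: BlockSummary, headIsOptimistic: bool): bool =
  # Phase 0 / pre-merge blocks carry no execution payload and are never
  # optimistic, so they are always safe to serve; post-merge blocks are
  # withheld while our own head has not been verified by the EL.
  blck.epoch < BELLATRIX_FORK_EPOCH or not headIsOptimistic

when isMainModule:
  doAssert shouldServe(BlockSummary(epoch: 0), headIsOptimistic = true)
  doAssert not shouldServe(BlockSummary(epoch: 200000), headIsOptimistic = true)
  doAssert shouldServe(BlockSummary(epoch: 200000), headIsOptimistic = false)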

View File

@@ -207,10 +207,11 @@ proc isSynced*(node: BeaconNode, head: BlockRef): bool =
   # TODO if everyone follows this logic, the network will not recover from a
   #      halt: nobody will be producing blocks because everone expects someone
   #      else to do it
-  if wallSlot.afterGenesis and head.slot + node.config.syncHorizon < wallSlot.slot:
+  if wallSlot.afterGenesis and
+      head.slot + node.config.syncHorizon < wallSlot.slot:
     false
   else:
-    true
+    not node.dag.is_optimistic(head.root)
 
 func isGoodForSending(validationResult: ValidationRes): bool =
   # Validator clients such as Vouch can be configured to work with multiple
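
Sketched standalone (names illustrative, not the real signatures), the new isSynced predicate is equivalent to "head within the sync horizon AND head no longer optimistic":

type Slot = uint64

func isSyncedSketch(headSlot, wallSlot, syncHorizon: Slot,
                    headIsOptimistic: bool): bool =
  # A head lagging wall time by more than the sync horizon means we are
  # still syncing; a head the EL has not yet verified is likewise unsafe
  # to perform validator duties on.
  if headSlot + syncHorizon < wallSlot:
    false
  else:
    not headIsOptimistic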
@@ -602,12 +603,14 @@ proc getExecutionPayload(
         node.eth1Monitor.terminalBlockHash.get.asEth2Digest
       else:
         default(Eth2Digest)
+    executionBlockRoot = node.dag.loadExecutionBlockRoot(node.dag.head)
     latestHead =
-      if not node.dag.head.executionBlockRoot.isZero:
-        node.dag.head.executionBlockRoot
+      if not executionBlockRoot.isZero:
+        executionBlockRoot
       else:
         terminalBlockHash
-    latestFinalized = node.dag.finalizedHead.blck.executionBlockRoot
+    latestFinalized =
+      node.dag.loadExecutionBlockRoot(node.dag.finalizedHead.blck)
     payload_id = (await forkchoice_updated(
       proposalState.bellatrixData.data, latestHead, latestFinalized,
       node.getSuggestedFeeRecipient(pubkey),
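
The head-selection rule above, condensed into a standalone sketch (Eth2Digest reduced to a bare 32-byte array; pickLatestHead is a hypothetical name): until the chain contains a block with an execution payload, forkchoiceUpdated is anchored at the terminal PoW block hash instead.

type Eth2Digest = array[32, byte]

func isZero(d: Eth2Digest): bool =
  for b in d:
    if b != 0: return false
  true

func pickLatestHead(executionBlockRoot,
                    terminalBlockHash: Eth2Digest): Eth2Digest =
  # An all-zero execution block root means the head block (and its
  # ancestors) carry no execution payload yet, so fall back to the
  # terminal PoW block hash.
  if not executionBlockRoot.isZero: executionBlockRoot
  else: terminalBlockHash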
@@ -806,8 +809,8 @@ proc proposeBlock(node: BeaconNode,
     # storeBlock puts the block in the chaindag, and if accepted, takes care
     # of side effects such as event api notification
-    newBlockRef = node.blockProcessor[].storeBlock(
-      MsgSource.api, wallTime, signedBlock)
+    newBlockRef = await node.blockProcessor.storeBlock(
+      MsgSource.api, wallTime, signedBlock, true)
 
   if newBlockRef.isErr:
     warn "Unable to add proposed block to block pool",
@@ -1512,8 +1515,8 @@ proc sendBeaconBlock*(node: BeaconNode, forked: ForkedSignedBeaconBlock
   let
     wallTime = node.beaconClock.now()
     accepted = withBlck(forked):
-      let newBlockRef = node.blockProcessor[].storeBlock(
-        MsgSource.api, wallTime, blck)
+      let newBlockRef = await node.blockProcessor.storeBlock(
+        MsgSource.api, wallTime, blck, payloadValid = true)
 
       # The boolean we return tells the caller whether the block was integrated
       # into the chain
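
storeBlock is now asynchronous and takes an explicit payload-validity flag; locally produced and API-submitted blocks are trusted, so callers pass true instead of waiting on EL verification. A toy sketch of that calling convention, assuming stand-in names and std/asyncdispatch (Nimbus itself uses chronos):

import std/asyncdispatch

type
  MsgSource = enum gossip, api
  StoreResult = object
    accepted: bool

proc storeBlockSketch(src: MsgSource,
                      payloadValid: bool): Future[StoreResult] {.async.} =
  # A block whose payload the caller vouches for can be marked valid
  # immediately instead of being imported optimistically.
  return StoreResult(accepted: true)

when isMainModule:
  # Blocks submitted via the API are trusted, hence payloadValid = true.
  let res = waitFor storeBlockSketch(MsgSource.api, payloadValid = true)
  doAssert res.accepted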

View File

@@ -414,7 +414,7 @@ suite "Attestation pool processing" & preset():
       pool[].addForkChoice(
         epochRef, blckRef, signedBlock.message, blckRef.slot.start_beacon_time)
 
-    let head = pool[].selectHead(b1Add[].slot.start_beacon_time).get()
+    let head = pool[].selectOptimisticHead(b1Add[].slot.start_beacon_time).get()
 
     check:
       head == b1Add[]
@@ -427,7 +427,7 @@ suite "Attestation pool processing" & preset():
       pool[].addForkChoice(
         epochRef, blckRef, signedBlock.message, blckRef.slot.start_beacon_time)
 
-    let head2 = pool[].selectHead(b2Add[].slot.start_beacon_time).get()
+    let head2 = pool[].selectOptimisticHead(b2Add[].slot.start_beacon_time).get()
 
     check:
       head2 == b2Add[]
@@ -443,7 +443,7 @@ suite "Attestation pool processing" & preset():
       pool[].addForkChoice(
         epochRef, blckRef, signedBlock.message, blckRef.slot.start_beacon_time)
 
-    let head = pool[].selectHead(b10Add[].slot.start_beacon_time).get()
+    let head = pool[].selectOptimisticHead(b10Add[].slot.start_beacon_time).get()
 
     check:
       head == b10Add[]
@@ -471,7 +471,7 @@ suite "Attestation pool processing" & preset():
       attestation0, @[bc1[0]], attestation0.loadSig,
       attestation0.data.slot.start_beacon_time)
 
-    let head2 = pool[].selectHead(b10Add[].slot.start_beacon_time).get()
+    let head2 = pool[].selectOptimisticHead(b10Add[].slot.start_beacon_time).get()
 
     check:
       # Single vote for b10 and no votes for b11
@@ -484,7 +484,7 @@ suite "Attestation pool processing" & preset():
       attestation1, @[bc1[1]], attestation1.loadSig,
       attestation1.data.slot.start_beacon_time)
 
-    let head3 = pool[].selectHead(b10Add[].slot.start_beacon_time).get()
+    let head3 = pool[].selectOptimisticHead(b10Add[].slot.start_beacon_time).get()
 
     let bigger = if b11.root.data < b10.root.data: b10Add else: b11Add
     check:
@@ -495,7 +495,7 @@ suite "Attestation pool processing" & preset():
      attestation2, @[bc1[2]], attestation2.loadSig,
      attestation2.data.slot.start_beacon_time)
 
-    let head4 = pool[].selectHead(b11Add[].slot.start_beacon_time).get()
+    let head4 = pool[].selectOptimisticHead(b11Add[].slot.start_beacon_time).get()
 
     check:
       # Two votes for b11
@@ -512,7 +512,7 @@ suite "Attestation pool processing" & preset():
     pool[].addForkChoice(epochRef, blckRef, signedBlock.message,
       blckRef.slot.start_beacon_time)
 
-    let head = pool[].selectHead(b10Add[].slot.start_beacon_time).get()
+    let head = pool[].selectOptimisticHead(b10Add[].slot.start_beacon_time).get()
 
     check:
       head == b10Add[]
@@ -543,7 +543,7 @@ suite "Attestation pool processing" & preset():
       pool[].addForkChoice(
         epochRef, blckRef, signedBlock.message, blckRef.slot.start_beacon_time)
 
-    let head = pool[].selectHead(b10Add[].slot.start_beacon_time).get()
+    let head = pool[].selectOptimisticHead(b10Add[].slot.start_beacon_time).get()
 
     doAssert: head == b10Add[]
@@ -569,7 +569,7 @@ suite "Attestation pool processing" & preset():
       pool[].addForkChoice(
         epochRef, blckRef, signedBlock.message, blckRef.slot.start_beacon_time)
 
-    let head = pool[].selectHead(blockRef[].slot.start_beacon_time).get()
+    let head = pool[].selectOptimisticHead(blockRef[].slot.start_beacon_time).get()
 
     doAssert: head == blockRef[]
     dag.updateHead(head, quarantine[])
     pruneAtFinalization(dag, pool[])

View File

@@ -44,11 +44,11 @@ suite "Block processor" & preset():
       getTimeFn = proc(): BeaconTime = b2.message.slot.start_beacon_time()
       processor = BlockProcessor.new(
         false, "", "", keys.newRng(), taskpool, consensusManager,
-        validatorMonitor, getTimeFn)
+        validatorMonitor, getTimeFn, safeSlotsToImportOptimistically = 128)
 
   test "Reverse order block add & get" & preset():
-    let missing = processor[].storeBlock(
-      MsgSource.gossip, b2.message.slot.start_beacon_time(), b2)
+    let missing = waitFor processor.storeBlock(
+      MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, payloadValid = true)
 
     check: missing.error == BlockError.MissingParent
 
     check:
@@ -57,8 +57,8 @@ suite "Block processor" & preset():
       FetchRecord(root: b1.root) in quarantine[].checkMissing()
 
     let
-      status = processor[].storeBlock(
-        MsgSource.gossip, b2.message.slot.start_beacon_time(), b1)
+      status = waitFor processor.storeBlock(
+        MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, payloadValid = true)
       b1Get = dag.getBlockRef(b1.root)
 
     check:
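
The new safeSlotsToImportOptimistically parameter feeds the optimistic-sync spec's is_optimistic_candidate_block check, which the commit message says was fixed per spec. A hedged sketch of that rule, with illustrative names rather than the actual Nimbus signature:

type Slot = uint64

const SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY = Slot(128)

func isOptimisticCandidateBlock(blockSlot, currentSlot: Slot,
                                parentIsExecutionBlock: bool): bool =
  # A block may be imported optimistically when its parent already carries
  # an execution payload, or when it sits far enough behind wall time that
  # importing it without immediate EL verification is considered safe.
  parentIsExecutionBlock or
    blockSlot + SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY <= currentSlot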