From 1221bb66e8cc62748e169f0936204ac3b0857517 Mon Sep 17 00:00:00 2001 From: tersec Date: Mon, 4 Jul 2022 20:35:33 +0000 Subject: [PATCH] optimistic sync (#3793) * optimistic sync * flag that initially loaded blocks from database might need execution block root filled in * return optimistic status in REST calls * refactor blockslot pruning * ensure beacon_blocks_by_{root,range} do not provide optimistic blocks * handle forkchoice head being pre-merge with block being postmerge * re-enable blocking head updates on validator duties * fix is_optimistic_candidate_block per spec; don't crash with nil future * fix is_optimistic_candidate_block per spec; don't crash with nil future * mark blocks sans execution payloads valid during head update --- .../attestation_pool.nim | 4 +- .../block_clearance.nim | 16 + .../consensus_object_pools/block_dag.nim | 11 +- .../block_pools_types.nim | 6 +- .../consensus_object_pools/blockchain_dag.nim | 130 ++++++- .../gossip_processing/block_processor.nim | 322 ++++++++---------- .../gossip_processing/consensus_manager.nim | 102 +++++- beacon_chain/nimbus_beacon_node.nim | 17 +- beacon_chain/rpc/rest_node_api.nim | 3 +- beacon_chain/rpc/rest_utils.nim | 17 +- beacon_chain/rpc/rest_validator_api.nim | 4 +- beacon_chain/spec/forks.nim | 6 + beacon_chain/spec/helpers.nim | 12 +- .../sync/optimistic_sync_light_client.nim | 2 +- beacon_chain/sync/sync_protocol.nim | 22 +- beacon_chain/validators/validator_duties.nim | 21 +- tests/test_attestation_pool.nim | 18 +- tests/test_block_processor.nim | 10 +- 18 files changed, 470 insertions(+), 253 deletions(-) diff --git a/beacon_chain/consensus_object_pools/attestation_pool.nim b/beacon_chain/consensus_object_pools/attestation_pool.nim index eb08a52cc..a1a7ff9c0 100644 --- a/beacon_chain/consensus_object_pools/attestation_pool.nim +++ b/beacon_chain/consensus_object_pools/attestation_pool.nim @@ -723,9 +723,11 @@ func getAggregatedAttestation*(pool: var AttestationPool, res -proc selectHead*(pool: var AttestationPool, wallTime: BeaconTime): Opt[BlockRef] = +proc selectOptimisticHead*( + pool: var AttestationPool, wallTime: BeaconTime): Opt[BlockRef] = ## Trigger fork choice and returns the new head block. ## Can return `nil` + # TODO rename this to get_optimistic_head let newHead = pool.forkChoice.get_head(pool.dag, wallTime) if newHead.isErr: diff --git a/beacon_chain/consensus_object_pools/block_clearance.nim b/beacon_chain/consensus_object_pools/block_clearance.nim index bdf06aea9..52f9656c3 100644 --- a/beacon_chain/consensus_object_pools/block_clearance.nim +++ b/beacon_chain/consensus_object_pools/block_clearance.nim @@ -29,6 +29,7 @@ proc addResolvedHeadBlock( dag: ChainDAGRef, state: var ForkedHashedBeaconState, trustedBlock: ForkyTrustedSignedBeaconBlock, + blockVerified: bool, parent: BlockRef, cache: var StateCache, onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | OnBellatrixBlockAdded, stateDataDur, sigVerifyDur, stateVerifyDur: Duration @@ -73,6 +74,7 @@ proc addResolvedHeadBlock( debug "Block resolved", blockRoot = shortLog(blockRoot), blck = shortLog(trustedBlock.message), + blockVerified, heads = dag.heads.len(), stateDataDur, sigVerifyDur, stateVerifyDur, putBlockDur = putBlockTick - startTick, @@ -81,6 +83,9 @@ proc addResolvedHeadBlock( # Update light client data dag.processNewBlockForLightClient(state, trustedBlock, parent.bid) + if not blockVerified: + dag.optimisticRoots.incl blockRoot + # Notify others of the new block before processing the quarantine, such that # notifications for parents happens before those of the children if onBlockAdded != nil: @@ -136,6 +141,7 @@ proc advanceClearanceState*(dag: ChainDAGRef) = proc addHeadBlock*( dag: ChainDAGRef, verifier: var BatchVerifier, signedBlock: ForkySignedBeaconBlock, + blockVerified: bool, onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | OnBellatrixBlockAdded ): Result[BlockRef, BlockError] = @@ -256,12 +262,22 @@ proc addHeadBlock*( ok addResolvedHeadBlock( dag, dag.clearanceState, signedBlock.asTrusted(), + blockVerified = blockVerified, parent, cache, onBlockAdded, stateDataDur = stateDataTick - startTick, sigVerifyDur = sigVerifyTick - stateDataTick, stateVerifyDur = stateVerifyTick - sigVerifyTick) +proc addHeadBlock*( + dag: ChainDAGRef, verifier: var BatchVerifier, + signedBlock: ForkySignedBeaconBlock, + onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | + OnBellatrixBlockAdded + ): Result[BlockRef, BlockError] = + addHeadBlock( + dag, verifier, signedBlock, blockVerified = true, onBlockAdded) + proc addBackfillBlock*( dag: ChainDAGRef, signedBlock: ForkySignedBeaconBlock): Result[void, BlockError] = diff --git a/beacon_chain/consensus_object_pools/block_dag.nim b/beacon_chain/consensus_object_pools/block_dag.nim index fafe5d75f..821454fb6 100644 --- a/beacon_chain/consensus_object_pools/block_dag.nim +++ b/beacon_chain/consensus_object_pools/block_dag.nim @@ -8,6 +8,7 @@ {.push raises: [Defect].} import + std/options, chronicles, ../spec/datatypes/[phase0, altair, bellatrix], ../spec/forks @@ -31,7 +32,7 @@ type bid*: BlockId ##\ ## Root that can be used to retrieve block data from database - executionBlockRoot*: Eth2Digest + executionBlockRoot*: Option[Eth2Digest] parent*: BlockRef ##\ ## Not nil, except for the finalized head @@ -50,8 +51,8 @@ template root*(blck: BlockRef): Eth2Digest = blck.bid.root template slot*(blck: BlockRef): Slot = blck.bid.slot func init*( - T: type BlockRef, root: Eth2Digest, executionPayloadRoot: Eth2Digest, - slot: Slot): BlockRef = + T: type BlockRef, root: Eth2Digest, + executionPayloadRoot: Option[Eth2Digest], slot: Slot): BlockRef = BlockRef( bid: BlockId(root: root, slot: slot), executionBlockRoot: executionPayloadRoot, @@ -61,13 +62,13 @@ func init*( T: type BlockRef, root: Eth2Digest, blck: phase0.SomeBeaconBlock | altair.SomeBeaconBlock | phase0.TrustedBeaconBlock | altair.TrustedBeaconBlock): BlockRef = - BlockRef.init(root, ZERO_HASH, blck.slot) + BlockRef.init(root, some ZERO_HASH, blck.slot) func init*( T: type BlockRef, root: Eth2Digest, blck: bellatrix.SomeBeaconBlock | bellatrix.TrustedBeaconBlock): BlockRef = BlockRef.init( - root, Eth2Digest(blck.body.execution_payload.block_hash), blck.slot) + root, some Eth2Digest(blck.body.execution_payload.block_hash), blck.slot) func parent*(bs: BlockSlot): BlockSlot = ## Return a blockslot representing the previous slot, using the parent block diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index df829ed40..df42a64b0 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -141,7 +141,8 @@ type ## in the case where an earlier genesis block exists. head*: BlockRef - ## The most recently known head, as chosen by fork choice + ## The most recently known head, as chosen by fork choice; might be + ## optimistic backfill*: BeaconBlockSummary ## The backfill points to the oldest block with an unbroken ancestry from @@ -226,6 +227,9 @@ type ## EPOCHS_PER_SYNC_COMMITTEE_PERIOD is happening, some valid sync ## committee messages will be rejected + optimisticRoots*: HashSet[Eth2Digest] + ## https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#helpers + EpochKey* = object ## The epoch key fully determines the shuffling for proposers and ## committees in a beacon state - the epoch level information in the state diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 874164e61..e9d6df98a 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -165,12 +165,11 @@ func init*( attester_dependent_root: attester_dependent_root, merge_transition_complete: case state.kind: - of BeaconStateFork.Phase0: false - of BeaconStateFork.Altair: false + of BeaconStateFork.Phase0, BeaconStateFork.Altair: false of BeaconStateFork.Bellatrix: # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/bellatrix/beacon-chain.md#is_merge_transition_complete state.bellatrixData.data.latest_execution_payload_header != - ExecutionPayloadHeader() + (static(ExecutionPayloadHeader())) ) epochStart = epoch.start_slot() @@ -747,7 +746,8 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, # Load head -> finalized, or all summaries in case the finalized block table # hasn't been written yet for blck in db.getAncestorSummaries(head.root): - let newRef = BlockRef.init(blck.root, ZERO_HASH, blck.summary.slot) + # The execution block root gets filled in as needed + let newRef = BlockRef.init(blck.root, none Eth2Digest, blck.summary.slot) if headRef == nil: doAssert blck.root == head.root headRef = newRef @@ -950,6 +950,17 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, dag.initLightClientDataCache() + # If these aren't actually optimistic, the first fcU will resolve that + withState(dag.headState): + when stateFork >= BeaconStateFork.Bellatrix: + template executionPayloadHeader(): auto = + state().data.latest_execution_payload_header + const emptyExecutionPayloadHeader = + default(type(executionPayloadHeader)) + if executionPayloadHeader != emptyExecutionPayloadHeader: + dag.optimisticRoots.incl dag.head.root + dag.optimisticRoots.incl dag.finalizedHead.blck.root + dag template genesis_validators_root*(dag: ChainDAGRef): Eth2Digest = @@ -1341,6 +1352,18 @@ proc delState(dag: ChainDAGRef, bsi: BlockSlotId) = dag.db.delState(root.get()) dag.db.delStateRoot(bsi.bid.root, bsi.slot) +proc pruneBlockSlot(dag: ChainDAGRef, bs: BlockSlot) = + # TODO: should we move that disk I/O to `onSlotEnd` + dag.delState(bs.toBlockSlotId().expect("not nil")) + + if bs.isProposed(): + # Update light client data + dag.deleteLightClientData(bs.blck.bid) + + dag.optimisticRoots.excl bs.blck.root + dag.forkBlocks.excl(KeyedBlockRef.init(bs.blck)) + dag.db.delBlock(bs.blck.root) + proc pruneBlocksDAG(dag: ChainDAGRef) = ## This prunes the block DAG ## This does NOT prune the cached state checkpoints and EpochRef @@ -1375,16 +1398,7 @@ proc pruneBlocksDAG(dag: ChainDAGRef) = "finalizedHead parent should have been pruned from memory already" while cur.blck.parent != nil: - # TODO: should we move that disk I/O to `onSlotEnd` - dag.delState(cur.toBlockSlotId().expect("not nil")) - - if cur.isProposed(): - # Update light client data - dag.deleteLightClientData(cur.blck.bid) - - dag.forkBlocks.excl(KeyedBlockRef.init(cur.blck)) - dag.db.delBlock(cur.blck.root) - + dag.pruneBlockSlot(cur) cur = cur.parentOrSlot dag.heads.del(n) @@ -1394,6 +1408,63 @@ proc pruneBlocksDAG(dag: ChainDAGRef) = prunedHeads = hlen - dag.heads.len, dagPruneDur = Moment.now() - startTick +# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#helpers +template is_optimistic*(dag: ChainDAGRef, root: Eth2Digest): bool = + root in dag.optimisticRoots + +proc markBlockInvalid*(dag: ChainDAGRef, root: Eth2Digest) = + let blck = dag.getBlockRef(root).valueOr: + return + logScope: blck = shortLog(blck) + + if not dag.is_optimistic(root): + # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#transitioning-from-valid---invalidated-or-invalidated---valid + # "It is outside of the scope of the specification since it's only possible + # with a faulty EE. Such a scenario requires manual intervention." + warn "markBlockInvalid: attempt to invalidate valid block" + doAssert verifyFinalization notin dag.updateFlags + return + + if root == dag.finalizedHead.blck.root: + # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#re-orgs + # "If the justified checkpoint transitions from `NOT_VALIDATED` -> + # `INVALIDATED`, a consensus engine MAY choose to alert the user and force + # the application to exit." + # + # But be slightly less aggressive, and only check finalized. + warn "markBlockInvalid: finalized block invalidated" + doAssert verifyFinalization notin dag.updateFlags + return + + debug "markBlockInvalid" + dag.pruneBlockSlot(blck.atSlot()) + +proc markBlockVerified*( + dag: ChainDAGRef, quarantine: var Quarantine, root: Eth2Digest) = + # Might be called when block was not optimistic to begin with, or had been + # but already had been marked verified. + if not dag.is_optimistic(root): + return + + var cur = dag.getBlockRef(root).valueOr: + return + logScope: blck = shortLog(cur) + + debug "markBlockVerified" + + while true: + if not dag.is_optimistic(cur.bid.root): + return + + dag.optimisticRoots.excl cur.bid.root + + debug "markBlockVerified ancestor" + + if cur.parent.isNil: + break + + cur = cur.parent + iterator syncSubcommittee*( syncCommittee: openArray[ValidatorIndex], subcommitteeIdx: SyncSubcommitteeIndex): ValidatorIndex = @@ -1533,6 +1604,27 @@ template getHeadStateMergeComplete*(dag: ChainDAGRef): bool = else: false +proc loadExecutionBlockRoot*(dag: ChainDAGRef, blck: BlockRef): Eth2Digest = + if dag.cfg.blockForkAtEpoch(blck.bid.slot.epoch) < BeaconBlockFork.Bellatrix: + return ZERO_HASH + + if blck.executionBlockRoot.isSome: + return blck.executionBlockRoot.get + + let blockData = dag.getForkedBlock(blck.bid).valueOr: + blck.executionBlockRoot = some ZERO_HASH + return ZERO_HASH + + let executionBlockRoot = + withBlck(blockData): + when stateFork >= BeaconStateFork.Bellatrix: + blck.message.body.execution_payload.block_hash + else: + ZERO_HASH + blck.executionBlockRoot = some executionBlockRoot + + executionBlockRoot + proc updateHead*( dag: ChainDAGRef, newHead: BlockRef, @@ -1614,7 +1706,8 @@ proc updateHead*( stateRoot = shortLog(getStateRoot(dag.headState)), justified = shortLog(getStateField( dag.headState, current_justified_checkpoint)), - finalized = shortLog(getStateField(dag.headState, finalized_checkpoint)) + finalized = shortLog(getStateField(dag.headState, finalized_checkpoint)), + isOptHead = dag.is_optimistic(newHead.root) if not(isNil(dag.onReorgHappened)): let @@ -1635,7 +1728,8 @@ proc updateHead*( stateRoot = shortLog(getStateRoot(dag.headState)), justified = shortLog(getStateField( dag.headState, current_justified_checkpoint)), - finalized = shortLog(getStateField(dag.headState, finalized_checkpoint)) + finalized = shortLog(getStateField(dag.headState, finalized_checkpoint)), + isOptHead = dag.is_optimistic(newHead.root) if not(isNil(dag.onHeadChanged)): let @@ -1685,8 +1779,8 @@ proc updateHead*( dag.db.updateFinalizedBlocks(newFinalized) - if oldFinalizedHead.blck.executionBlockRoot.isZero and - not dag.finalizedHead.blck.executionBlockRoot.isZero: + if dag.loadExecutionBlockRoot(oldFinalizedHead.blck).isZero and + not dag.loadExecutionBlockRoot(dag.finalizedHead.blck).isZero: dag.vanityLogs.onFinalizedMergeTransitionBlock() # Pruning the block dag is required every time the finalized head changes diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index eccf621f6..eeb1a10ff 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -8,21 +8,22 @@ {.push raises: [Defect].} import - std/math, stew/results, chronicles, chronos, metrics, - eth/async_utils, - web3/engine_api_types, - ../spec/datatypes/[phase0, altair, bellatrix], - ../spec/[forks, signatures_batch], - ../consensus_object_pools/[ - attestation_pool, block_clearance, blockchain_dag, block_quarantine, - spec_cache], - ../eth1/eth1_monitor, - ./consensus_manager, - ../beacon_clock, + ../spec/signatures_batch, ../sszdump +from ./consensus_manager import + ConsensusManager, updateHead, updateHeadWithExecution +from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds +from ../consensus_object_pools/block_dag import BlockRef, root, slot +from ../consensus_object_pools/block_pools_types import BlockError, EpochRef +from ../consensus_object_pools/block_quarantine import + addOrphan, addUnviable, pop, removeOrphan +from ../validators/validator_monitor import + MsgSource, ValidatorMonitor, registerAttestationInBlock, registerBeaconBlock, + registerSyncAggregateInBlock + export sszdump, signatures_batch # Block Processor @@ -63,6 +64,7 @@ type dumpEnabled: bool dumpDirInvalid: string dumpDirIncoming: string + safeSlotsToImportOptimistically: uint16 # Producers # ---------------------------------------------------------------- @@ -92,7 +94,8 @@ proc new*(T: type BlockProcessor, rng: ref HmacDrbgContext, taskpool: TaskPoolPtr, consensusManager: ref ConsensusManager, validatorMonitor: ref ValidatorMonitor, - getBeaconTime: GetBeaconTimeFn): ref BlockProcessor = + getBeaconTime: GetBeaconTimeFn, + safeSlotsToImportOptimistically: uint16): ref BlockProcessor = (ref BlockProcessor)( dumpEnabled: dumpEnabled, dumpDirInvalid: dumpDirInvalid, @@ -101,6 +104,7 @@ proc new*(T: type BlockProcessor, consensusManager: consensusManager, validatorMonitor: validatorMonitor, getBeaconTime: getBeaconTime, + safeSlotsToImportOptimistically: safeSlotsToImportOptimistically, verifier: BatchVerifier(rng: rng, taskpool: taskpool) ) @@ -131,6 +135,9 @@ proc dumpBlock[T]( else: discard +from ../consensus_object_pools/block_clearance import + addBackfillBlock, addHeadBlock + proc storeBackfillBlock( self: var BlockProcessor, signedBlock: ForkySignedBeaconBlock): Result[void, BlockError] = @@ -157,11 +164,17 @@ proc storeBackfillBlock( res +from ../consensus_object_pools/attestation_pool import addForkChoice +from ../consensus_object_pools/spec_cache import get_attesting_indices +from ../spec/datatypes/phase0 import TrustedSignedBeaconBlock + proc storeBlock*( - self: var BlockProcessor, + self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime, - signedBlock: ForkySignedBeaconBlock, queueTick: Moment = Moment.now(), - validationDur = Duration()): Result[BlockRef, BlockError] = + signedBlock: ForkySignedBeaconBlock, payloadValid: bool, + queueTick: Moment = Moment.now(), + validationDur = Duration()): + Future[Result[BlockRef, BlockError]] {.async.} = ## storeBlock is the main entry point for unvalidated blocks - all untrusted ## blocks, regardless of origin, pass through here. When storing a block, ## we will add it to the dag and pass it to all block consumers that need @@ -182,7 +195,7 @@ proc storeBlock*( self.consensusManager.quarantine[].removeOrphan(signedBlock) type Trusted = typeof signedBlock.asTrusted() - let blck = dag.addHeadBlock(self.verifier, signedBlock) do ( + let blck = dag.addHeadBlock(self.verifier, signedBlock, payloadValid) do ( blckRef: BlockRef, trustedBlock: Trusted, epochRef: EpochRef): # Callback add to fork choice if valid attestationPool[].addForkChoice( @@ -208,7 +221,7 @@ proc storeBlock*( trustedBlock.message.slot, trustedBlock.root, state.data.current_sync_committee.pubkeys.data[i]) - self.dumpBlock(signedBlock, blck) + self[].dumpBlock(signedBlock, blck) # There can be a scenario where we receive a block we already received. # However this block was before the last finalized epoch and so its parent @@ -239,8 +252,23 @@ proc storeBlock*( let storeBlockTick = Moment.now() - # Eagerly update head: the incoming block "should" get selected - self.consensusManager[].updateHead(wallTime.slotOrZero) + # Eagerly update head: the incoming block "should" get selected. + # + # storeBlock gets called from validator_duties, which depends on its not + # blocking progress any longer than necessary, and processBlock here, in + # which case it's fine to await for a while on engine API results. + if not is_execution_block(signedBlock.message): + self.consensusManager[].updateHead(wallTime.slotOrZero) + else: + # This primarily exists to ensure that by the time the DAG updateHead is + # called valid blocks have already been registered as verified. The head + # can lag a slot behind wall clock, complicating detecting synced status + # for validating, otherwise. + # + # TODO have a third version which is fire-and-forget for when it is merge + # but payloadValid is true, i.e. fcU is for EL's benefit, not CL. Current + # behavior adds unnecessary latency to CL event loop. + await self.consensusManager.updateHeadWithExecution(wallTime.slotOrZero) let updateHeadTick = Moment.now() @@ -257,9 +285,9 @@ proc storeBlock*( for quarantined in self.consensusManager.quarantine[].pop(blck.get().root): # Process the blocks that had the newly accepted block as parent - self.addBlock(MsgSource.gossip, quarantined) + self[].addBlock(MsgSource.gossip, quarantined) - blck + return blck # Enqueue # ------------------------------------------------------------------------------ @@ -299,7 +327,9 @@ proc addBlock*( # Event Loop # ------------------------------------------------------------------------------ -proc processBlock(self: var BlockProcessor, entry: BlockEntry) = +proc processBlock( + self: ref BlockProcessor, entry: BlockEntry, payloadValid: bool) + {.async.} = logScope: blockRoot = shortLog(entry.blck.root) @@ -311,59 +341,29 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) = error "Processing block before genesis, clock turned back?" quit 1 - let - res = withBlck(entry.blck): - self.storeBlock(entry.src, wallTime, blck, entry.queueTick, entry.validationDur) + let res = withBlck(entry.blck): + await self.storeBlock( + entry.src, wallTime, blck, payloadValid, entry.queueTick, + entry.validationDur) if entry.resfut != nil: entry.resfut.complete( if res.isOk(): Result[void, BlockError].ok() else: Result[void, BlockError].err(res.error())) -func `$`(h: BlockHash): string = $h.asEth2Digest - -proc runForkchoiceUpdated( - self: ref BlockProcessor, headBlockRoot, finalizedBlockRoot: Eth2Digest): - Future[bool] {.async.} = - # Allow finalizedBlockRoot to be 0 to avoid sync deadlocks. - # - # https://github.com/ethereum/EIPs/blob/master/EIPS/eip-3675.md#pos-events - # has "Before the first finalized block occurs in the system the finalized - # block hash provided by this event is stubbed with - # `0x0000000000000000000000000000000000000000000000000000000000000000`." - # and - # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/bellatrix/validator.md#executionpayload - # notes "`finalized_block_hash` is the hash of the latest finalized execution - # payload (`Hash32()` if none yet finalized)" - doAssert not headBlockRoot.isZero - - try: - # Minimize window for Eth1 monitor to shut down connection - await self.consensusManager.eth1Monitor.ensureDataProvider() - - let fcuR = awaitWithTimeout( - forkchoiceUpdated( - self.consensusManager.eth1Monitor, headBlockRoot, finalizedBlockRoot), - FORKCHOICEUPDATED_TIMEOUT): - debug "runForkChoiceUpdated: forkchoiceUpdated timed out" - default(ForkchoiceUpdatedResponse) - - debug "runForkChoiceUpdated: running forkchoiceUpdated", - headBlockRoot, - finalizedBlockRoot, - payloadStatus = $fcuR.payloadStatus.status, - latestValidHash = $fcuR.payloadStatus.latestValidHash, - validationError = $fcuR.payloadStatus.validationError - - return fcuR.payloadStatus.status == PayloadExecutionStatus.valid - except CatchableError as err: - debug "runForkChoiceUpdated: forkchoiceUpdated failed", - err = err.msg - return false +from eth/async_utils import awaitWithTimeout +from web3/engine_api_types import PayloadExecutionStatus, PayloadStatusV1 +from ../eth1/eth1_monitor import + Eth1Monitor, asEngineExecutionPayload, ensureDataProvider, newPayload +from ../spec/datatypes/bellatrix import ExecutionPayload, SignedBeaconBlock proc newExecutionPayload*( eth1Monitor: Eth1Monitor, executionPayload: bellatrix.ExecutionPayload): Future[PayloadExecutionStatus] {.async.} = + if eth1Monitor.isNil: + warn "newPayload: attempting to process execution payload without an Eth1Monitor. Ensure --web3-url setting is correct." + return PayloadExecutionStatus.syncing + debug "newPayload: inserting block into execution engine", parentHash = executionPayload.parent_hash, blockHash = executionPayload.block_hash, @@ -375,14 +375,9 @@ proc newExecutionPayload*( gasUsed = executionPayload.gas_used, timestamp = executionPayload.timestamp, extraDataLen = executionPayload.extra_data.len, - blockHash = executionPayload.block_hash, - baseFeePerGas = executionPayload.base_fee_per_gas, + baseFeePerGas = $executionPayload.base_fee_per_gas, numTransactions = executionPayload.transactions.len - if eth1Monitor.isNil: - info "newPayload: attempting to process execution payload without an Eth1Monitor. Ensure --web3-url setting is correct." - return PayloadExecutionStatus.syncing - try: let payloadResponse = @@ -394,22 +389,40 @@ proc newExecutionPayload*( PayloadStatusV1(status: PayloadExecutionStatus.syncing) payloadStatus = payloadResponse.status + debug "newPayload: succeeded", + parentHash = executionPayload.parent_hash, + blockHash = executionPayload.block_hash, + blockNumber = executionPayload.block_number, + payloadStatus + return payloadStatus except CatchableError as err: debug "newPayload failed", msg = err.msg return PayloadExecutionStatus.syncing +from ../consensus_object_pools/blockchain_dag import + getBlockRef, loadExecutionBlockRoot, markBlockInvalid + +# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#helpers +proc is_optimistic_candidate_block( + self: BlockProcessor, blck: ForkedSignedBeaconBlock): bool = + # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#when-to-optimistically-import-blocks + # The current slot (as per the system clock) is at least + # `SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY` ahead of the slot of the block being + # imported. + if blck.slot + self.safeSlotsToImportOptimistically <= + self.getBeaconTime().slotOrZero: + return true + + let + parentRoot = withBlck(blck): blck.message.parent_root + parentBlck = self.consensusManager.dag.getBlockRef(parentRoot).valueOr: + return false + + # The parent of the block has execution enabled. + not self.consensusManager.dag.loadExecutionBlockRoot(parentBlck).isZero + proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} = - # Don't want to vacillate between "optimistic" sync and non-optimistic - # sync heads. Relies on runQueueProcessingLoop() being the only place, - # in Nimbus, which does this. - var - optForkchoiceHeadSlot = GENESIS_SLOT # safe default - optForkchoiceHeadRoot: Eth2Digest - - # don't keep spamming same fcU to Geth; might be restarting sync each time - lastFcHead: Eth2Digest - while true: # Cooperative concurrency: one block per loop iteration - because # we run both networking and CPU-heavy things like block processing @@ -426,110 +439,65 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} = let blck = await self[].blockQueue.popFirst() - hasExecutionPayload = blck.blck.kind >= BeaconBlockFork.Bellatrix - isExecutionBlock = - hasExecutionPayload and - blck.blck.bellatrixData.message.body.is_execution_block + hasExecutionPayload = + withBlck(blck.blck): blck.message.is_execution_block executionPayloadStatus = - if isExecutionBlock: - # Eth1 syncing is asynchronous from this + if hasExecutionPayload: + # Eth1 syncing is asynchronous from this + # TODO self.consensusManager.eth1Monitor.terminalBlockHash.isSome + # should gate this when it works more reliably + # TODO detect have-TTD-but-not-is_execution_block case, and where + # execution payload was non-zero when TTD detection more reliable + when true: + try: + # Minimize window for Eth1 monitor to shut down connection + await self.consensusManager.eth1Monitor.ensureDataProvider() - # TODO self.consensusManager.eth1Monitor.terminalBlockHash.isSome - # should gate this when it works more reliably - when true: - try: - # Minimize window for Eth1 monitor to shut down connection - await self.consensusManager.eth1Monitor.ensureDataProvider() + let executionPayload = + withBlck(blck.blck): + when stateFork >= BeaconStateFork.Bellatrix: + blck.message.body.execution_payload + else: + doAssert false + default(bellatrix.ExecutionPayload) # satisfy Nim - await newExecutionPayload( - self.consensusManager.eth1Monitor, - blck.blck.bellatrixData.message.body.execution_payload) - except CatchableError as err: - debug "runQueueProcessingLoop: newPayload failed", - err = err.msg - PayloadExecutionStatus.syncing - else: - debug "runQueueProcessingLoop: got execution payload before TTD" - PayloadExecutionStatus.syncing - else: - # Vacuously - PayloadExecutionStatus.valid + await newExecutionPayload( + self.consensusManager.eth1Monitor, executionPayload) + except CatchableError as err: + info "runQueueProcessingLoop: newPayload failed", + err = err.msg + # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#execution-engine-errors + if not blck.resfut.isNil: + blck.resfut.complete( + Result[void, BlockError].err(BlockError.MissingParent)) + continue + else: + debug "runQueueProcessingLoop: got execution payload before TTD" + PayloadExecutionStatus.syncing + else: + # Vacuously + PayloadExecutionStatus.valid - if executionPayloadStatus in [ + if executionPayloadStatus in static([ PayloadExecutionStatus.invalid, - PayloadExecutionStatus.invalid_block_hash]: + PayloadExecutionStatus.invalid_block_hash]): debug "runQueueProcessingLoop: execution payload invalid", - executionPayloadStatus + executionPayloadStatus, + blck = shortLog(blck.blck) + self.consensusManager.dag.markBlockInvalid(blck.blck.root) + self.consensusManager.quarantine[].addUnviable(blck.blck.root) # Every loop iteration ends with some version of blck.resfut.complete(), # including processBlock(), otherwise the sync manager stalls. if not blck.resfut.isNil: blck.resfut.complete(Result[void, BlockError].err(BlockError.Invalid)) - continue - - if isExecutionBlock: - # The EL client doesn't know here whether the payload is valid, because, - # for example, in Geth's case, its parent isn't known. When Geth logs an - # "Ignoring payload with missing parent" message, this is the result. It - # is distinct from the invalid cases above, and shouldn't cause the same - # BlockError.Invalid error, because it doesn't badly on the peer sending - # it, it's just not fully verifiable yet for this node. Furthermore, the - # EL client can, e.g. via Geth, "rely on the beacon client to forcefully - # update the head with a forkchoice update request". This can occur when - # an EL client is substantially more synced than a CL client, and when a - # CL client in that position attempts to serially sync it will encounter - # potential for this message until it nearly catches up, unless using an - # approach such as forkchoiceUpdated to trigger sync. - # - # Returning the MissingParent error causes the sync manager to loop in - # place until the EL does resync/catch up, then the normal process can - # resume where there's a hybrid serial and optimistic sync model. - # - # When this occurs within a couple of epochs of the Merge, before there - # has been a chance to justify and finalize a post-merge block this can - # cause a sync deadlock unless the EL can be convinced to sync back, or - # the CL is rather more open-endedly optimistic (potentially for entire - # weak subjectivity periods) than seems optimal. - debug "runQueueProcessingLoop: execution payload accepted or syncing", - executionPayloadStatus - - # Always do this. Geth will only initiate syncing or reorgs with this - # combination of newPayload and forkchoiceUpdated. By design this must - # be somewhat optimistic, at least by one slot, for Geth to process it - # at all. This eventually converges to the same head as the DAG by the - # time it's externally visible via validating activity. - # - # In particular, the constraints that hold here are that Geth expects a - # sequence of - # - newPayload(execution payload with block hash `h`) followed by - # - forkchoiceUpdated(head = `h`) - # This is intrinsically somewhat optimistic, because determining the - # validity of an execution payload requires the forkchoiceUpdated - # head to be set to a block hash of some execution payload with unknown - # validity; otherwise it would not be necessary to ask the EL. - # - # The main reason this isn't done more adjacently in this code flow is to - # catch outright invalid cases, where the EL can reject a payload, without - # even running forkchoiceUpdated on it. - static: doAssert high(BeaconStateFork) == BeaconStateFork.Bellatrix - let curBh = - blck.blck.bellatrixData.message.body.execution_payload.block_hash - if curBh != lastFcHead: - lastFcHead = curBh - if await self.runForkchoiceUpdated( - curBh, - self.consensusManager.dag.finalizedHead.blck.executionBlockRoot): - # Geth seldom seems to return VALID to newPayload alone, even when - # it has all the relevant information. - self[].processBlock(blck) - continue - - if executionPayloadStatus != PayloadExecutionStatus.valid: + else: + if executionPayloadStatus == PayloadExecutionStatus.valid or + self[].is_optimistic_candidate_block(blck.blck): + await self.processBlock( + blck, executionPayloadStatus == PayloadExecutionStatus.valid) + else: + debug "runQueueProcessingLoop: block cannot be optimistically imported", + blck = shortLog(blck.blck) if not blck.resfut.isNil: - blck.resfut.complete(Result[void, BlockError].err( - BlockError.MissingParent)) - - continue - - # When newPayload, rather than forkchoiceUpdated, has returned valid. - doAssert executionPayloadStatus == PayloadExecutionStatus.valid - self[].processBlock(blck) + blck.resfut.complete( + Result[void, BlockError].err(BlockError.MissingParent)) diff --git a/beacon_chain/gossip_processing/consensus_manager.nim b/beacon_chain/gossip_processing/consensus_manager.nim index 0dcb0cac2..71c179769 100644 --- a/beacon_chain/gossip_processing/consensus_manager.nim +++ b/beacon_chain/gossip_processing/consensus_manager.nim @@ -80,24 +80,124 @@ proc expectBlock*(self: var ConsensusManager, expectedSlot: Slot): Future[bool] return fut +from eth/async_utils import awaitWithTimeout +from web3/engine_api_types import + ForkchoiceUpdatedResponse, PayloadExecutionStatus, PayloadStatusV1 + +func `$`(h: BlockHash): string = $h.asEth2Digest + +proc runForkchoiceUpdated( + eth1Monitor: Eth1Monitor, headBlockRoot, finalizedBlockRoot: Eth2Digest): + Future[PayloadExecutionStatus] {.async.} = + # Allow finalizedBlockRoot to be 0 to avoid sync deadlocks. + # + # https://github.com/ethereum/EIPs/blob/master/EIPS/eip-3675.md#pos-events + # has "Before the first finalized block occurs in the system the finalized + # block hash provided by this event is stubbed with + # `0x0000000000000000000000000000000000000000000000000000000000000000`." + # and + # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/bellatrix/validator.md#executionpayload + # notes "`finalized_block_hash` is the hash of the latest finalized execution + # payload (`Hash32()` if none yet finalized)" + doAssert not headBlockRoot.isZero + + try: + # Minimize window for Eth1 monitor to shut down connection + await eth1Monitor.ensureDataProvider() + + let fcuR = awaitWithTimeout( + forkchoiceUpdated( + eth1Monitor, headBlockRoot, finalizedBlockRoot), + FORKCHOICEUPDATED_TIMEOUT): + debug "runForkchoiceUpdated: forkchoiceUpdated timed out" + ForkchoiceUpdatedResponse( + payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing)) + + debug "runForkchoiceUpdated: ran forkchoiceUpdated", + headBlockRoot, + finalizedBlockRoot, + payloadStatus = $fcuR.payloadStatus.status, + latestValidHash = $fcuR.payloadStatus.latestValidHash, + validationError = $fcuR.payloadStatus.validationError + + return fcuR.payloadStatus.status + except CatchableError as err: + debug "runForkchoiceUpdated: forkchoiceUpdated failed", + err = err.msg + return PayloadExecutionStatus.syncing + +proc updateExecutionClientHead(self: ref ConsensusManager, newHead: BlockRef) + {.async.} = + if self.eth1Monitor.isNil: + return + + let executionHeadRoot = self.dag.loadExecutionBlockRoot(newHead) + + if executionHeadRoot.isZero: + # Blocks without execution payloads can't be optimistic. + self.dag.markBlockVerified(self.quarantine[], newHead.root) + return + + # Can't use dag.head here because it hasn't been updated yet + let + executionFinalizedRoot = + self.dag.loadExecutionBlockRoot(self.dag.finalizedHead.blck) + payloadExecutionStatus = await self.eth1Monitor.runForkchoiceUpdated( + executionHeadRoot, executionFinalizedRoot) + + case payloadExecutionStatus + of PayloadExecutionStatus.valid: + self.dag.markBlockVerified(self.quarantine[], newHead.root) + of PayloadExecutionStatus.invalid, PayloadExecutionStatus.invalid_block_hash: + self.dag.markBlockInvalid(newHead.root) + self.quarantine[].addUnviable(newHead.root) + of PayloadExecutionStatus.accepted, PayloadExecutionStatus.syncing: + self.dag.optimisticRoots.incl newHead.root + proc updateHead*(self: var ConsensusManager, wallSlot: Slot) = ## Trigger fork choice and update the DAG with the new head block ## This does not automatically prune the DAG after finalization ## `pruneFinalized` must be called for pruning. # Grab the new head according to our latest attestation data - let newHead = self.attestationPool[].selectHead( + let newHead = self.attestationPool[].selectOptimisticHead( wallSlot.start_beacon_time).valueOr: warn "Head selection failed, using previous head", head = shortLog(self.dag.head), wallSlot return + if self.dag.loadExecutionBlockRoot(newHead).isZero: + # Blocks without execution payloads can't be optimistic. + self.dag.markBlockVerified(self.quarantine[], newHead.root) + # Store the new head in the chain DAG - this may cause epochs to be # justified and finalized self.dag.updateHead(newHead, self.quarantine[]) self.checkExpectedBlock() +proc updateHeadWithExecution*(self: ref ConsensusManager, wallSlot: Slot) + {.async.} = + ## Trigger fork choice and update the DAG with the new head block + ## This does not automatically prune the DAG after finalization + ## `pruneFinalized` must be called for pruning. + + # Grab the new head according to our latest attestation data + let newHead = self.attestationPool[].selectOptimisticHead( + wallSlot.start_beacon_time).valueOr: + warn "Head selection failed, using previous head", + head = shortLog(self.dag.head), wallSlot + return + + # Ensure dag.updateHead has most current information + await self.updateExecutionClientHead(newHead) + + # Store the new head in the chain DAG - this may cause epochs to be + # justified and finalized + self.dag.updateHead(newHead, self.quarantine[]) + + self[].checkExpectedBlock() + proc pruneStateCachesAndForkChoice*(self: var ConsensusManager) = ## Prune unneeded and invalidated data after finalization ## - the DAG state checkpoints diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 7311b59fa..3fd239427 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -241,30 +241,27 @@ proc initFullNode( proc onVoluntaryExitAdded(data: SignedVoluntaryExit) = node.eventBus.exitQueue.emit(data) proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) = - # TODO (cheatfate): Proper implementation required let optimistic = if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH: - some(false) + some node.dag.is_optimistic(data.root) else: none[bool]() node.eventBus.blocksQueue.emit( EventBeaconBlockObject.init(data, optimistic)) proc onHeadChanged(data: HeadChangeInfoObject) = - # TODO (cheatfate): Proper implementation required let eventData = if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH: var res = data - res.optimistic = some(false) + res.optimistic = some node.dag.is_optimistic(data.block_root) res else: data node.eventBus.headQueue.emit(eventData) proc onChainReorg(data: ReorgInfoObject) = - # TODO (cheatfate): Proper implementation required let eventData = if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH: var res = data - res.optimistic = some(false) + res.optimistic = some node.dag.is_optimistic(data.new_head_block) res else: data @@ -282,11 +279,10 @@ proc initFullNode( finalizedEpochRef.eth1_data, finalizedEpochRef.eth1_deposit_index) node.updateLightClientFromDag() - # TODO (cheatfate): Proper implementation required let eventData = if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH: var res = data - res.optimistic = some(false) + res.optimistic = some node.dag.is_optimistic(data.block_root) res else: data @@ -322,7 +318,8 @@ proc initFullNode( dag, attestationPool, quarantine, node.eth1Monitor) blockProcessor = BlockProcessor.new( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, - rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime) + rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime, + config.safeSlotsToImportOptimistically) blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock): Future[Result[void, BlockError]] = # The design with a callback for block verification is unusual compared @@ -1287,6 +1284,8 @@ func syncStatus(node: BeaconNode): string = node.syncManager.syncStatus elif node.backfiller.inProgress: "backfill: " & node.backfiller.syncStatus + elif node.dag.is_optimistic(node.dag.head.root): + "opt synced" else: "synced" diff --git a/beacon_chain/rpc/rest_node_api.nim b/beacon_chain/rpc/rest_node_api.nim index 2e76882ea..ee9d711ee 100644 --- a/beacon_chain/rpc/rest_node_api.nim +++ b/beacon_chain/rpc/rest_node_api.nim @@ -264,8 +264,7 @@ proc installNodeApiHandlers*(router: var RestRouter, node: BeaconNode) = node.syncManager.inProgress isOptimistic = if node.currentSlot().epoch() >= node.dag.cfg.BELLATRIX_FORK_EPOCH: - # TODO (cheatfate): Proper implementation required - some(false) + some(node.dag.is_optimistic(node.dag.head.root)) else: none[bool]() diff --git a/beacon_chain/rpc/rest_utils.nim b/beacon_chain/rpc/rest_utils.nim index cac09c310..eadbd6481 100644 --- a/beacon_chain/rpc/rest_utils.nim +++ b/beacon_chain/rpc/rest_utils.nim @@ -279,8 +279,15 @@ proc getStateOptimistic*(node: BeaconNode, of BeaconStateFork.Phase0, BeaconStateFork.Altair: some[bool](false) of BeaconStateFork.Bellatrix: - # TODO (cheatfate): Proper implementation required. - some[bool](false) + # A state is optimistic iff the block which created it is + withState(state): + # The block root which created the state at slot `n` is at slot `n-1` + if state.data.slot == GENESIS_SLOT: + some[bool](false) + else: + doAssert state.data.slot > 0 + some[bool](node.dag.is_optimistic( + get_block_root_at_slot(state.data, state.data.slot - 1))) else: none[bool]() @@ -292,8 +299,7 @@ proc getBlockOptimistic*(node: BeaconNode, of BeaconBlockFork.Phase0, BeaconBlockFork.Altair: some[bool](false) of BeaconBlockFork.Bellatrix: - # TODO (cheatfate): Proper implementation required. - some[bool](false) + some[bool](node.dag.is_optimistic(blck.root)) else: none[bool]() @@ -303,8 +309,7 @@ proc getBlockRefOptimistic*(node: BeaconNode, blck: BlockRef): bool = of BeaconBlockFork.Phase0, BeaconBlockFork.Altair: false of BeaconBlockFork.Bellatrix: - # TODO (cheatfate): Proper implementation required. - false + node.dag.is_optimistic(blck.root) const jsonMediaType* = MediaType.init("application/json") diff --git a/beacon_chain/rpc/rest_validator_api.nim b/beacon_chain/rpc/rest_validator_api.nim index b0030ee29..564a33155 100644 --- a/beacon_chain/rpc/rest_validator_api.nim +++ b/beacon_chain/rpc/rest_validator_api.nim @@ -97,7 +97,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = ) res - # TODO (cheatfate): Proper implementation required + # getSyncedHead() implies non-optimistic node. let optimistic = if node.currentSlot().epoch() >= node.dag.cfg.BELLATRIX_FORK_EPOCH: some(false) @@ -151,7 +151,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = ) res - # TODO (cheatfate): Proper implementation required + # getSyncedHead() implies non-optimistic node. let optimistic = if node.currentSlot().epoch() >= node.dag.cfg.BELLATRIX_FORK_EPOCH: some(false) diff --git a/beacon_chain/spec/forks.nim b/beacon_chain/spec/forks.nim index db74789fd..2ff7d8b2f 100644 --- a/beacon_chain/spec/forks.nim +++ b/beacon_chain/spec/forks.nim @@ -268,21 +268,27 @@ template toString*(kind: BeaconStateFork): string = "bellatrix" template toFork*[T: + phase0.BeaconBlock | phase0.SignedBeaconBlock | + phase0.TrustedBeaconBlock | phase0.SigVerifiedSignedBeaconBlock | phase0.MsgTrustedSignedBeaconBlock | phase0.TrustedSignedBeaconBlock]( t: type T): BeaconBlockFork = BeaconBlockFork.Phase0 template toFork*[T: + altair.BeaconBlock | altair.SignedBeaconBlock | + altair.TrustedBeaconBlock | altair.SigVerifiedSignedBeaconBlock | altair.MsgTrustedSignedBeaconBlock | altair.TrustedSignedBeaconBlock]( t: type T): BeaconBlockFork = BeaconBlockFork.Altair template toFork*[T: + bellatrix.BeaconBlock | bellatrix.SignedBeaconBlock | + bellatrix.TrustedBeaconBlock | bellatrix.SigVerifiedSignedBeaconBlock | bellatrix.MsgTrustedSignedBeaconBlock | bellatrix.TrustedSignedBeaconBlock]( diff --git a/beacon_chain/spec/helpers.nim b/beacon_chain/spec/helpers.nim index f9f5d9e27..88ba3a001 100644 --- a/beacon_chain/spec/helpers.nim +++ b/beacon_chain/spec/helpers.nim @@ -288,11 +288,13 @@ func is_merge_transition_complete*(state: bellatrix.BeaconState): bool = state.latest_execution_payload_header != defaultExecutionPayloadHeader # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/sync/optimistic.md#helpers -func is_execution_block*( - body: bellatrix.BeaconBlockBody | bellatrix.TrustedBeaconBlockBody | - bellatrix.SigVerifiedBeaconBlockBody): bool = - const defaultBellatrixExecutionPayload = default(bellatrix.ExecutionPayload) - body.execution_payload != defaultBellatrixExecutionPayload +func is_execution_block*(blck: SomeForkyBeaconBlock): bool = + when typeof(blck).toFork >= BeaconBlockFork.Bellatrix: + const defaultExecutionPayload = + default(typeof(blck.body.execution_payload)) + blck.body.execution_payload != defaultExecutionPayload + else: + false # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/bellatrix/beacon-chain.md#is_merge_transition_block func is_merge_transition_block( diff --git a/beacon_chain/sync/optimistic_sync_light_client.nim b/beacon_chain/sync/optimistic_sync_light_client.nim index cdd30eeec..32f061a69 100644 --- a/beacon_chain/sync/optimistic_sync_light_client.nim +++ b/beacon_chain/sync/optimistic_sync_light_client.nim @@ -55,7 +55,7 @@ proc reportOptimisticCandidateBlock(optSync: LCOptimisticSync) {.gcsafe.} = optSync.finalizedIsExecutionBlock = withBlck(finalizedBlock.get): when stateFork >= BeaconStateFork.Bellatrix: - some blck.message.body.is_execution_block() + some blck.message.is_execution_block() else: some false diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index 81522bfbe..196bac948 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -299,11 +299,13 @@ p2pProtocol BeaconSync(version = 1, if startSlot.epoch >= dag.cfg.ALTAIR_FORK_EPOCH: # "Clients MAY limit the number of blocks in the response." - # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyrange + # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/p2p-interface.md#beaconblocksbyrange debug "Block range v1 request for post-altair range", peer, startSlot, reqCount, reqStep return + # Phase 0 blocks are never optimistic. + var blocks: array[MAX_REQUEST_BLOCKS, BlockId] let @@ -387,12 +389,14 @@ p2pProtocol BeaconSync(version = 1, if blockRef.slot.epoch >= dag.cfg.ALTAIR_FORK_EPOCH: # Skipping this block should be fine because the spec says: # "Clients MAY limit the number of blocks in the response." - # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyroot + # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/p2p-interface.md#beaconblocksbyroot # # Also, our response would be indistinguishable from a node # that have been synced exactly to the altair transition slot. continue + # Phase 0 blocks are never optimistic. + if dag.getBlockSZ(blockRef.bid, bytes): let uncompressedLen = uncompressedLenFramed(bytes).valueOr: warn "Cannot read block size, database corrupt?", @@ -454,6 +458,13 @@ p2pProtocol BeaconSync(version = 1, for i in startIndex..endIndex: if dag.getBlockSZ(blocks[i], bytes): + # In general, there is not much intermediate time between post-merge + # blocks all being optimistic and none of them being optimistic. The + # EL catches up, tells the CL the head is verified, and that's it. + if blocks[i].slot.epoch >= dag.cfg.BELLATRIX_FORK_EPOCH and + dag.is_optimistic(dag.head.root): + continue + let uncompressedLen = uncompressedLenFramed(bytes).valueOr: warn "Cannot read block size, database corrupt?", bytes = bytes.len(), blck = shortLog(blocks[i]) @@ -510,6 +521,13 @@ p2pProtocol BeaconSync(version = 1, continue if dag.getBlockSZ(blockRef.bid, bytes): + # In general, there is not much intermediate time between post-merge + # blocks all being optimistic and none of them being optimistic. The + # EL catches up, tells the CL the head is verified, and that's it. + if blockRef.slot.epoch >= dag.cfg.BELLATRIX_FORK_EPOCH and + dag.is_optimistic(dag.head.root): + continue + let uncompressedLen = uncompressedLenFramed(bytes).valueOr: warn "Cannot read block size, database corrupt?", bytes = bytes.len(), blck = shortLog(blockRef) diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index d2f88f02d..22ca73348 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -207,10 +207,11 @@ proc isSynced*(node: BeaconNode, head: BlockRef): bool = # TODO if everyone follows this logic, the network will not recover from a # halt: nobody will be producing blocks because everone expects someone # else to do it - if wallSlot.afterGenesis and head.slot + node.config.syncHorizon < wallSlot.slot: + if wallSlot.afterGenesis and + head.slot + node.config.syncHorizon < wallSlot.slot: false else: - true + not node.dag.is_optimistic(head.root) func isGoodForSending(validationResult: ValidationRes): bool = # Validator clients such as Vouch can be configured to work with multiple @@ -602,12 +603,14 @@ proc getExecutionPayload( node.eth1Monitor.terminalBlockHash.get.asEth2Digest else: default(Eth2Digest) + executionBlockRoot = node.dag.loadExecutionBlockRoot(node.dag.head) latestHead = - if not node.dag.head.executionBlockRoot.isZero: - node.dag.head.executionBlockRoot + if not executionBlockRoot.isZero: + executionBlockRoot else: terminalBlockHash - latestFinalized = node.dag.finalizedHead.blck.executionBlockRoot + latestFinalized = + node.dag.loadExecutionBlockRoot(node.dag.finalizedHead.blck) payload_id = (await forkchoice_updated( proposalState.bellatrixData.data, latestHead, latestFinalized, node.getSuggestedFeeRecipient(pubkey), @@ -806,8 +809,8 @@ proc proposeBlock(node: BeaconNode, # storeBlock puts the block in the chaindag, and if accepted, takes care # of side effects such as event api notification - newBlockRef = node.blockProcessor[].storeBlock( - MsgSource.api, wallTime, signedBlock) + newBlockRef = await node.blockProcessor.storeBlock( + MsgSource.api, wallTime, signedBlock, true) if newBlockRef.isErr: warn "Unable to add proposed block to block pool", @@ -1512,8 +1515,8 @@ proc sendBeaconBlock*(node: BeaconNode, forked: ForkedSignedBeaconBlock let wallTime = node.beaconClock.now() accepted = withBlck(forked): - let newBlockRef = node.blockProcessor[].storeBlock( - MsgSource.api, wallTime, blck) + let newBlockRef = await node.blockProcessor.storeBlock( + MsgSource.api, wallTime, blck, payloadValid = true) # The boolean we return tells the caller whether the block was integrated # into the chain diff --git a/tests/test_attestation_pool.nim b/tests/test_attestation_pool.nim index 9a5ccf232..7e3f73589 100644 --- a/tests/test_attestation_pool.nim +++ b/tests/test_attestation_pool.nim @@ -414,7 +414,7 @@ suite "Attestation pool processing" & preset(): pool[].addForkChoice( epochRef, blckRef, signedBlock.message, blckRef.slot.start_beacon_time) - let head = pool[].selectHead(b1Add[].slot.start_beacon_time).get() + let head = pool[].selectOptimisticHead(b1Add[].slot.start_beacon_time).get() check: head == b1Add[] @@ -427,7 +427,7 @@ suite "Attestation pool processing" & preset(): pool[].addForkChoice( epochRef, blckRef, signedBlock.message, blckRef.slot.start_beacon_time) - let head2 = pool[].selectHead(b2Add[].slot.start_beacon_time).get() + let head2 = pool[].selectOptimisticHead(b2Add[].slot.start_beacon_time).get() check: head2 == b2Add[] @@ -443,7 +443,7 @@ suite "Attestation pool processing" & preset(): pool[].addForkChoice( epochRef, blckRef, signedBlock.message, blckRef.slot.start_beacon_time) - let head = pool[].selectHead(b10Add[].slot.start_beacon_time).get() + let head = pool[].selectOptimisticHead(b10Add[].slot.start_beacon_time).get() check: head == b10Add[] @@ -471,7 +471,7 @@ suite "Attestation pool processing" & preset(): attestation0, @[bc1[0]], attestation0.loadSig, attestation0.data.slot.start_beacon_time) - let head2 = pool[].selectHead(b10Add[].slot.start_beacon_time).get() + let head2 = pool[].selectOptimisticHead(b10Add[].slot.start_beacon_time).get() check: # Single vote for b10 and no votes for b11 @@ -484,7 +484,7 @@ suite "Attestation pool processing" & preset(): attestation1, @[bc1[1]], attestation1.loadSig, attestation1.data.slot.start_beacon_time) - let head3 = pool[].selectHead(b10Add[].slot.start_beacon_time).get() + let head3 = pool[].selectOptimisticHead(b10Add[].slot.start_beacon_time).get() let bigger = if b11.root.data < b10.root.data: b10Add else: b11Add check: @@ -495,7 +495,7 @@ suite "Attestation pool processing" & preset(): attestation2, @[bc1[2]], attestation2.loadSig, attestation2.data.slot.start_beacon_time) - let head4 = pool[].selectHead(b11Add[].slot.start_beacon_time).get() + let head4 = pool[].selectOptimisticHead(b11Add[].slot.start_beacon_time).get() check: # Two votes for b11 @@ -512,7 +512,7 @@ suite "Attestation pool processing" & preset(): pool[].addForkChoice(epochRef, blckRef, signedBlock.message, blckRef.slot.start_beacon_time) - let head = pool[].selectHead(b10Add[].slot.start_beacon_time).get() + let head = pool[].selectOptimisticHead(b10Add[].slot.start_beacon_time).get() check: head == b10Add[] @@ -543,7 +543,7 @@ suite "Attestation pool processing" & preset(): pool[].addForkChoice( epochRef, blckRef, signedBlock.message, blckRef.slot.start_beacon_time) - let head = pool[].selectHead(b10Add[].slot.start_beacon_time).get() + let head = pool[].selectOptimisticHead(b10Add[].slot.start_beacon_time).get() doAssert: head == b10Add[] @@ -569,7 +569,7 @@ suite "Attestation pool processing" & preset(): pool[].addForkChoice( epochRef, blckRef, signedBlock.message, blckRef.slot.start_beacon_time) - let head = pool[].selectHead(blockRef[].slot.start_beacon_time).get() + let head = pool[].selectOptimisticHead(blockRef[].slot.start_beacon_time).get() doAssert: head == blockRef[] dag.updateHead(head, quarantine[]) pruneAtFinalization(dag, pool[]) diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index 0cf842e8c..32d8a77fe 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -44,11 +44,11 @@ suite "Block processor" & preset(): getTimeFn = proc(): BeaconTime = b2.message.slot.start_beacon_time() processor = BlockProcessor.new( false, "", "", keys.newRng(), taskpool, consensusManager, - validatorMonitor, getTimeFn) + validatorMonitor, getTimeFn, safeSlotsToImportOptimistically = 128) test "Reverse order block add & get" & preset(): - let missing = processor[].storeBlock( - MsgSource.gossip, b2.message.slot.start_beacon_time(), b2) + let missing = waitFor processor.storeBlock( + MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, payloadValid = true) check: missing.error == BlockError.MissingParent check: @@ -57,8 +57,8 @@ suite "Block processor" & preset(): FetchRecord(root: b1.root) in quarantine[].checkMissing() let - status = processor[].storeBlock( - MsgSource.gossip, b2.message.slot.start_beacon_time(), b1) + status = waitFor processor.storeBlock( + MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, payloadValid = true) b1Get = dag.getBlockRef(b1.root) check: