diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 71c3ab6d4..a7a4f0f76 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -67,6 +67,8 @@ type config*: BeaconNodeConf attachedValidators*: ref ValidatorPool optimisticProcessor*: OptimisticProcessor + optimisticFcuFut*: Future[(PayloadExecutionStatus, Opt[BlockHash])] + .Raising([CancelledError]) lightClient*: LightClient dag*: ChainDAGRef quarantine*: ref Quarantine diff --git a/beacon_chain/beacon_node_light_client.nim b/beacon_chain/beacon_node_light_client.nim index b6e15b06b..6b2c47743 100644 --- a/beacon_chain/beacon_node_light_client.nim +++ b/beacon_chain/beacon_node_light_client.nim @@ -38,61 +38,17 @@ proc initLightClient*( # for broadcasting light client data as a server. let - optimisticHandler = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock): - Future[void] {.async: (raises: [CancelledError]).} = - debug "New LC optimistic block", - opt = signedBlock.toBlockId(), - dag = node.dag.head.bid, - wallSlot = node.currentSlot + optimisticHandler = proc( + signedBlock: ForkedSignedBeaconBlock + ): Future[void] {.async: (raises: [CancelledError]).} = withBlck(signedBlock): when consensusFork >= ConsensusFork.Bellatrix: if forkyBlck.message.is_execution_block: - template blckPayload(): auto = - forkyBlck.message.body.execution_payload - - if not blckPayload.block_hash.isZero: - # engine_newPayloadV1 + template payload(): auto = forkyBlck.message.body.execution_payload + if not payload.block_hash.isZero: discard await node.elManager.newExecutionPayload( forkyBlck.message) - - # Retain optimistic head for other `forkchoiceUpdated` callers. - # May temporarily block `forkchoiceUpdatedV1` calls, e.g., Geth: - # - Refuses `newPayload`: "Ignoring payload while snap syncing" - # - Refuses `fcU`: "Forkchoice requested unknown head" - # Once DAG sync catches up or as new optimistic heads are fetched - # the situation recovers - node.consensusManager[].setOptimisticHead( - forkyBlck.toBlockId(), blckPayload.block_hash) - - # engine_forkchoiceUpdatedV1 or engine_forkchoiceUpdatedV2, - # depending on pre or post-Shapella - let beaconHead = node.attestationPool[].getBeaconHead(nil) - - template callForkchoiceUpdated(attributes: untyped) = - discard await node.elManager.forkchoiceUpdated( - headBlockHash = blckPayload.block_hash, - safeBlockHash = beaconHead.safeExecutionBlockHash, - finalizedBlockHash = beaconHead.finalizedExecutionBlockHash, - payloadAttributes = Opt.none attributes) - - case node.dag.cfg.consensusForkAtEpoch( - forkyBlck.message.slot.epoch) - of ConsensusFork.Deneb, ConsensusFork.Electra: - callForkchoiceUpdated(PayloadAttributesV3) - of ConsensusFork.Capella: - # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/shanghai.md#specification-1 - # Consensus layer client MUST call this method instead of - # `engine_forkchoiceUpdatedV1` under any of the following - # conditions: - # `headBlockHash` references a block which `timestamp` is - # greater or equal to the Shanghai timestamp - callForkchoiceUpdated(PayloadAttributesV2) - of ConsensusFork.Bellatrix: - callForkchoiceUpdated(PayloadAttributesV1) - of ConsensusFork.Phase0, ConsensusFork.Altair: - discard else: discard - optimisticProcessor = initOptimisticProcessor( getBeaconTime, optimisticHandler) @@ -104,9 +60,46 @@ proc initLightClient*( proc onOptimisticHeader( lightClient: LightClient, optimisticHeader: ForkedLightClientHeader) = + if node.optimisticFcuFut != nil: + return 
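The `optimisticFcuFut` field introduced above acts as a re-entrancy guard: at most one `forkchoiceUpdated` request is in flight at a time, and further optimistic headers are simply dropped until the pending request completes and a callback clears the slot. A minimal, self-contained sketch of that pattern, with a hypothetical `sendFcu` standing in for `elManager.forkchoiceUpdated`:

```nim
import chronos

type FcuGuard = ref object
  # Mirrors `optimisticFcuFut`: non-nil while a request is in flight.
  fcuFut: Future[void].Raising([CancelledError])

proc sendFcu(): Future[void] {.async: (raises: [CancelledError]).} =
  ## Hypothetical stand-in for `elManager.forkchoiceUpdated`.
  await sleepAsync(100.milliseconds)

proc onNewOptimisticHead(g: FcuGuard) =
  if g.fcuFut != nil:
    return  # previous request still pending; drop this head
  g.fcuFut = sendFcu()
  g.fcuFut.addCallback do (future: pointer):
    g.fcuFut = nil  # allow the next header to trigger a request

when isMainModule:
  let g = FcuGuard()
  g.onNewOptimisticHead()  # starts a request
  g.onNewOptimisticHead()  # no-op: guard is set
  waitFor g.fcuFut         # request completes; callback clears the guard
```

Dropping headers instead of queueing them is fine here: optimistic headers supersede each other, so the next header that arrives after the guard clears carries the freshest head.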
withForkyHeader(optimisticHeader): when lcDataFork > LightClientDataFork.None: - optimisticProcessor.setOptimisticHeader(forkyHeader.beacon) + let bid = forkyHeader.beacon.toBlockId() + logScope: + opt = bid + dag = node.dag.head.bid + wallSlot = node.currentSlot + when lcDataFork >= LightClientDataFork.Capella: + let + consensusFork = node.dag.cfg.consensusForkAtEpoch(bid.slot.epoch) + blockHash = forkyHeader.execution.block_hash + + # Retain optimistic head for other `forkchoiceUpdated` callers. + # May temporarily block `forkchoiceUpdated` calls, e.g., Geth: + # - Refuses `newPayload`: "Ignoring payload while snap syncing" + # - Refuses `fcU`: "Forkchoice requested unknown head" + # Once DAG sync catches up or as new optimistic heads are fetched + # the situation recovers + debug "New LC optimistic header" + node.consensusManager[].setOptimisticHead(bid, blockHash) + if not node.consensusManager[] + .shouldSyncOptimistically(node.currentSlot): + return + + # engine_forkchoiceUpdated + let beaconHead = node.attestationPool[].getBeaconHead(nil) + withConsensusFork(consensusFork): + when lcDataForkAtConsensusFork(consensusFork) == lcDataFork: + node.optimisticFcuFut = node.elManager.forkchoiceUpdated( + headBlockHash = blockHash, + safeBlockHash = beaconHead.safeExecutionBlockHash, + finalizedBlockHash = beaconHead.finalizedExecutionBlockHash, + payloadAttributes = Opt.none consensusFork.PayloadAttributes) + node.optimisticFcuFut.addCallback do (future: pointer): + node.optimisticFcuFut = nil + else: + # The execution block hash is only available from Capella onward + info "Ignoring new LC optimistic header until Capella" lightClient.onOptimisticHeader = onOptimisticHeader lightClient.trustedBlockRoot = config.trustedBlockRoot diff --git a/beacon_chain/gossip_processing/optimistic_processor.nim b/beacon_chain/gossip_processing/optimistic_processor.nim index 684171651..15fd12725 100644 --- a/beacon_chain/gossip_processing/optimistic_processor.nim +++ b/beacon_chain/gossip_processing/optimistic_processor.nim @@ -20,29 +20,19 @@ export gossip_validation logScope: topics = "gossip_opt" -const - # Maximum `blocks` to cache (not validated; deleted on new optimistic header) - maxBlocks = 16 # <= `GOSSIP_MAX_SIZE_BELLATRIX` (10 MB) each - - # Minimum interval at which spam is logged - minLogInterval = chronos.seconds(5) - type - MsgTrustedBlockProcessor* = - proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock): Future[void] {. 
- async: (raises: [CancelledError]).} + OptimisticBlockVerifier* = proc( + signedBlock: ForkedSignedBeaconBlock + ): Future[void] {.async: (raises: [CancelledError]).} OptimisticProcessor* = ref object getBeaconTime: GetBeaconTimeFn - optimisticVerifier: MsgTrustedBlockProcessor - blocks: Table[Eth2Digest, ref ForkedSignedBeaconBlock] - latestOptimisticSlot: Slot + optimisticVerifier: OptimisticBlockVerifier processFut: Future[void].Raising([CancelledError]) - logMoment: Moment proc initOptimisticProcessor*( getBeaconTime: GetBeaconTimeFn, - optimisticVerifier: MsgTrustedBlockProcessor): OptimisticProcessor = + optimisticVerifier: OptimisticBlockVerifier): OptimisticProcessor = OptimisticProcessor( getBeaconTime: getBeaconTime, optimisticVerifier: optimisticVerifier) @@ -56,9 +46,6 @@ proc validateBeaconBlock( (wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero): return errIgnore("BeaconBlock: slot too high") - if signed_beacon_block.message.slot <= self.latestOptimisticSlot: - return errIgnore("BeaconBlock: no significant progress") - if not signed_beacon_block.message.is_execution_block(): return errIgnore("BeaconBlock: no execution block") @@ -93,32 +80,16 @@ proc processSignedBeaconBlock*( debug "Dropping optimistic block", error = v.error return err(v.error) - # Note that validation of blocks is delayed by ~4/3 slots because we have to - # wait for the sync committee to sign the correct block and for that signature - # to be included in the next block. Therefore, we skip block validation here - # and cache the block in memory. Because there is no validation, we have to - # mitigate against bogus blocks, mostly by bounding the caches. Assuming that - # any denial-of-service attacks eventually subside, care is taken to recover. - template logWithSpamProtection(body: untyped): untyped = - block: - let now = Moment.now() - if self.logMoment + minLogInterval <= now: - logScope: minLogInterval - body - self.logMoment = now + # Only process one block at a time (backpressure) + trace "Optimistic block validated" + if self.processFut == nil: + self.processFut = self.optimisticVerifier( + ForkedSignedBeaconBlock.init(signedBlock)) - # Store block for later verification - if not self.blocks.hasKey(signedBlock.root): - # If `blocks` is full, we got spammed with multiple blocks for a slot, - # of the optimistic header advancements have been all withheld from us. - # Whenever the optimistic header advances, old blocks are cleared, - # so we can simply ignore additional spam blocks until that happens. - if self.blocks.len >= maxBlocks: - logWithSpamProtection: - error "`blocks` full - ignoring", maxBlocks - else: - self.blocks[signedBlock.root] = - newClone(ForkedSignedBeaconBlock.init(signedBlock)) + proc handleFinishedProcess(future: pointer) = + self.processFut = nil + + self.processFut.addCallback(handleFinishedProcess) # Block validation is delegated to the sync committee and is done with delay. # If we forward invalid spam blocks, we may be disconnected + IP banned, @@ -127,40 +98,4 @@ proc processSignedBeaconBlock*( # However, we are actively contributing to other topics, so some of the # negative peer score may be offset through those different topics. # The practical impact depends on the actually deployed scoring heuristics. 
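With the block cache gone, wiring an optimistic processor reduces to supplying the single `OptimisticBlockVerifier` callback defined in this hunk. A minimal usage sketch of that callback shape, with a trivial stand-in type for `ForkedSignedBeaconBlock` so it compiles on its own:

```nim
import chronos

type
  ForkedSignedBeaconBlock = object  # stand-in for the real forked block type
    slot: uint64

  # Same shape as the new `OptimisticBlockVerifier` in this hunk.
  OptimisticBlockVerifier = proc(
    signedBlock: ForkedSignedBeaconBlock
  ): Future[void] {.async: (raises: [CancelledError]).}

let verifier: OptimisticBlockVerifier = proc(
    signedBlock: ForkedSignedBeaconBlock
): Future[void] {.async: (raises: [CancelledError]).} =
  # The real handler forwards the block to the EL via `newExecutionPayload`;
  # this placeholder only logs.
  echo "verifying optimistic block at slot ", signedBlock.slot

when isMainModule:
  waitFor verifier(ForkedSignedBeaconBlock(slot: 123))
```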
- trace "Optimistic block cached" return errIgnore("Validation delegated to sync committee") - -proc setOptimisticHeader*( - self: OptimisticProcessor, optimisticHeader: BeaconBlockHeader) = - # If irrelevant, skip processing - if optimisticHeader.slot <= self.latestOptimisticSlot: - return - self.latestOptimisticSlot = optimisticHeader.slot - - # Delete blocks that are no longer of interest - let blockRoot = optimisticHeader.hash_tree_root() - var - rootsToDelete: seq[Eth2Digest] - signedBlock: ref ForkedMsgTrustedSignedBeaconBlock - for root, blck in self.blocks: - if root == blockRoot: - signedBlock = blck.asMsgTrusted() - if blck[].slot <= optimisticHeader.slot: - rootsToDelete.add root - for root in rootsToDelete: - self.blocks.del root - - # Block must be known - if signedBlock == nil: - return - - # If a block is already being processed, skip (backpressure) - if self.processFut != nil: - return - - self.processFut = self.optimisticVerifier(signedBlock[]) - - proc handleFinishedProcess(future: pointer) = - self.processFut = nil - - self.processFut.addCallback(handleFinishedProcess) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 0079690dc..69d089833 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -1619,7 +1619,7 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string = node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix elif node.backfiller.inProgress: "backfill: " & node.backfiller.syncStatus - elif optimistic_head: + elif optimisticHead: "synced/opt" else: "synced" @@ -1768,7 +1768,8 @@ proc installMessageValidators(node: BeaconNode) = node.network.addAsyncValidator( getAttestationTopic(digest, subnet_id), proc ( attestation: electra.Attestation - ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = + ): Future[ValidationResult] {. + async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processAttestation( MsgSource.gossip, attestation, subnet_id, @@ -1780,7 +1781,8 @@ proc installMessageValidators(node: BeaconNode) = node.network.addAsyncValidator( getAttestationTopic(digest, subnet_id), proc ( attestation: phase0.Attestation - ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = + ): Future[ValidationResult] {. 
+ async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processAttestation( MsgSource.gossip, attestation, subnet_id, diff --git a/beacon_chain/nimbus_light_client.nim b/beacon_chain/nimbus_light_client.nim index 2f78df875..519b1e89d 100644 --- a/beacon_chain/nimbus_light_client.nim +++ b/beacon_chain/nimbus_light_client.nim @@ -107,23 +107,15 @@ programMain: else: nil - optimisticHandler = proc(signedBlock: ForkedMsgTrustedSignedBeaconBlock): - Future[void] {.async: (raises: [CancelledError]).} = - notice "New LC optimistic block", - opt = signedBlock.toBlockId(), - wallSlot = getBeaconTime().slotOrZero + optimisticHandler = proc( + signedBlock: ForkedSignedBeaconBlock + ): Future[void] {.async: (raises: [CancelledError]).} = withBlck(signedBlock): when consensusFork >= ConsensusFork.Bellatrix: if forkyBlck.message.is_execution_block: template payload(): auto = forkyBlck.message.body.execution_payload - if elManager != nil and not payload.block_hash.isZero: discard await elManager.newExecutionPayload(forkyBlck.message) - discard await elManager.forkchoiceUpdated( - headBlockHash = payload.block_hash, - safeBlockHash = payload.block_hash, # stub value - finalizedBlockHash = ZERO_HASH, - payloadAttributes = Opt.none(consensusFork.PayloadAttributes)) else: discard optimisticProcessor = initOptimisticProcessor( getBeaconTime, optimisticHandler) @@ -153,26 +145,54 @@ programMain: waitFor network.startListening() waitFor network.start() + func isSynced(optimisticSlot: Slot, wallSlot: Slot): bool = + # Check whether light client has synced sufficiently close to wall slot + const maxAge = 2 * SLOTS_PER_EPOCH + optimisticSlot >= max(wallSlot, maxAge.Slot) - maxAge + proc onFinalizedHeader( lightClient: LightClient, finalizedHeader: ForkedLightClientHeader) = withForkyHeader(finalizedHeader): when lcDataFork > LightClientDataFork.None: info "New LC finalized header", finalized_header = shortLog(forkyHeader) - let period = forkyHeader.beacon.slot.sync_committee_period syncCommittee = lightClient.finalizedSyncCommittee.expect("Init OK") db.putSyncCommittee(period, syncCommittee) db.putLatestFinalizedHeader(finalizedHeader) + var optimisticFcuFut: Future[(PayloadExecutionStatus, Opt[BlockHash])] + .Raising([CancelledError]) proc onOptimisticHeader( lightClient: LightClient, optimisticHeader: ForkedLightClientHeader) = + if optimisticFcuFut != nil: + return withForkyHeader(optimisticHeader): when lcDataFork > LightClientDataFork.None: - info "New LC optimistic header", - optimistic_header = shortLog(forkyHeader) - optimisticProcessor.setOptimisticHeader(forkyHeader.beacon) + logScope: optimistic_header = shortLog(forkyHeader) + when lcDataFork >= LightClientDataFork.Capella: + let + bid = forkyHeader.beacon.toBlockId() + consensusFork = cfg.consensusForkAtEpoch(bid.slot.epoch) + blockHash = forkyHeader.execution.block_hash + + info "New LC optimistic header" + if elManager == nil or blockHash.isZero or + not isSynced(bid.slot, getBeaconTime().slotOrZero()): + return + + withConsensusFork(consensusFork): + when lcDataForkAtConsensusFork(consensusFork) == lcDataFork: + optimisticFcuFut = elManager.forkchoiceUpdated( + headBlockHash = blockHash, + safeBlockHash = blockHash, # stub value + finalizedBlockHash = ZERO_HASH, + payloadAttributes = Opt.none(consensusFork.PayloadAttributes)) + optimisticFcuFut.addCallback do (future: pointer): + optimisticFcuFut = nil + else: + info "Ignoring new LC optimistic header until Capella" lightClient.onFinalizedHeader = onFinalizedHeader 
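`isSynced` above treats the light-client head as current when it lags the wall slot by at most two epochs (64 slots with mainnet's `SLOTS_PER_EPOCH = 32`); the `max(wallSlot, maxAge.Slot)` clamp keeps the unsigned subtraction from underflowing right after genesis. A standalone sketch with a plain `uint64` standing in for `Slot`:

```nim
type Slot = uint64                 # stand-in for the real distinct slot type

const SLOTS_PER_EPOCH = 32'u64     # mainnet preset

func isSynced(optimisticSlot, wallSlot: Slot): bool =
  const maxAge = 2 * SLOTS_PER_EPOCH
  # Clamping to at least `maxAge` avoids uint64 underflow for early wall slots
  optimisticSlot >= max(wallSlot, maxAge.Slot) - maxAge

when isMainModule:
  doAssert isSynced(Slot(936), Slot(1000))      # exactly 64 slots behind
  doAssert not isSynced(Slot(935), Slot(1000))  # 65 slots behind: stale
  doAssert isSynced(Slot(0), Slot(10))          # no underflow near genesis
```

The same helper also replaces the previously inlined age check in the sync-status closure further down.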
lightClient.onOptimisticHeader = onOptimisticHeader @@ -204,9 +224,7 @@ programMain: let optimisticHeader = lightClient.optimisticHeader withForkyHeader(optimisticHeader): when lcDataFork > LightClientDataFork.None: - # Check whether light client has synced sufficiently close to wall slot - const maxAge = 2 * SLOTS_PER_EPOCH - forkyHeader.beacon.slot >= max(wallSlot, maxAge.Slot) - maxAge + isSynced(forkyHeader.beacon.slot, wallSlot) else: false diff --git a/docs/the_nimbus_book/src/el-light-client.md b/docs/the_nimbus_book/src/el-light-client.md index 31569c045..eb29ebede 100644 --- a/docs/the_nimbus_book/src/el-light-client.md +++ b/docs/the_nimbus_book/src/el-light-client.md @@ -178,13 +178,11 @@ INF 2022-11-21 18:03:27.984+01:00 New LC optimistic header opt WRN 2022-11-21 18:03:31.419+01:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=7 wanted_peers=160 INF 2022-11-21 18:03:36.001+01:00 Slot start slot=1109718 epoch=34678 sync=synced peers=7 head=c5464508:1109716 finalized=c092a1d1:1109216 delay=1ms98us INF 2022-11-21 18:03:40.012+01:00 New LC optimistic header optimistic_header="(beacon: (slot: 1109717, proposer_index: 835, parent_root: \"c5464508\", state_root: \"13f823f8\"))" -NTC 2022-11-21 18:03:40.012+01:00 New LC optimistic block opt=99ab28aa:1109717 wallSlot=1109718 WRN 2022-11-21 18:03:40.422+01:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=1 new_peers=@[] current_peers=7 wanted_peers=160 INF 2022-11-21 18:03:48.001+01:00 Slot start slot=1109719 epoch=34678 sync=synced peers=7 head=99ab28aa:1109717 finalized=c092a1d1:1109216 delay=1ms53us WRN 2022-11-21 18:03:50.205+01:00 Peer count low, no new peers discovered topics="networking" discovered_nodes=0 new_peers=@[] current_peers=7 wanted_peers=160 INF 2022-11-21 18:04:00.001+01:00 Slot start slot=1109720 epoch=34678 sync=synced peers=7 head=99ab28aa:1109717 finalized=c092a1d1:1109216 delay=1ms145us INF 2022-11-21 18:04:03.982+01:00 New LC optimistic header optimistic_header="(beacon: (slot: 1109718, proposer_index: 1202, parent_root: \"99ab28aa\", state_root: \"7f7f88d2\"))" -NTC 2022-11-21 18:04:03.982+01:00 New LC optimistic block opt=ab007266:1109718 wallSlot=1109720 ``` !!! note