From d41c2a293b618c6f1f93e79c2dba03c54255e0d2 Mon Sep 17 00:00:00 2001
From: tersec
Date: Fri, 17 Jun 2022 14:16:03 +0000
Subject: [PATCH] rewrite merge sync (#3759)

---
 beacon_chain/eth1/eth1_monitor.nim                 |   4 +-
 .../gossip_processing/block_processor.nim          | 130 +++++++++++-------
 2 files changed, 80 insertions(+), 54 deletions(-)

diff --git a/beacon_chain/eth1/eth1_monitor.nim b/beacon_chain/eth1/eth1_monitor.nim
index db9614023..c080cfb78 100644
--- a/beacon_chain/eth1/eth1_monitor.nim
+++ b/beacon_chain/eth1/eth1_monitor.nim
@@ -1455,8 +1455,8 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
       info "startEth1Syncing: checking for merge terminal block",
         currentEpoch = m.currentEpoch,
         BELLATRIX_FORK_EPOCH = m.cfg.BELLATRIX_FORK_EPOCH,
-        totalDifficulty = nextBlock.totalDifficulty,
-        ttd = m.cfg.TERMINAL_TOTAL_DIFFICULTY,
+        totalDifficulty = $nextBlock.totalDifficulty,
+        ttd = $m.cfg.TERMINAL_TOTAL_DIFFICULTY,
         terminalBlockHash = m.terminalBlockHash

       if terminalBlockCandidate.totalDifficulty >= m.cfg.TERMINAL_TOTAL_DIFFICULTY:
diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim
index 3403e4516..95cedafd2 100644
--- a/beacon_chain/gossip_processing/block_processor.nim
+++ b/beacon_chain/gossip_processing/block_processor.nim
@@ -320,9 +320,11 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) =
       if res.isOk(): Result[void, BlockError].ok()
       else: Result[void, BlockError].err(res.error()))

+func `$`(h: BlockHash): string = $h.asEth2Digest
+
 proc runForkchoiceUpdated(
-    self: ref BlockProcessor, headBlockRoot, finalizedBlockRoot: Eth2Digest)
-    {.async.} =
+    self: ref BlockProcessor, headBlockRoot, finalizedBlockRoot: Eth2Digest):
+    Future[bool] {.async.} =
   # Allow finalizedBlockRoot to be 0 to avoid sync deadlocks.
   #
   # https://github.com/ethereum/EIPs/blob/master/EIPS/eip-3675.md#pos-events
@@ -334,26 +336,31 @@ proc runForkchoiceUpdated(
   # notes "`finalized_block_hash` is the hash of the latest finalized execution
   # payload (`Hash32()` if none yet finalized)"
   if headBlockRoot.isZero:
-    return
+    return false

   try:
     # Minimize window for Eth1 monitor to shut down connection
     await self.consensusManager.eth1Monitor.ensureDataProvider()

-    debug "runForkChoiceUpdated: running forkchoiceUpdated",
-      headBlockRoot,
-      finalizedBlockRoot
-
-    discard awaitWithTimeout(
+    let fcuR = awaitWithTimeout(
       forkchoiceUpdated(
         self.consensusManager.eth1Monitor, headBlockRoot, finalizedBlockRoot),
       FORKCHOICEUPDATED_TIMEOUT):
         debug "runForkChoiceUpdated: forkchoiceUpdated timed out"
         default(ForkchoiceUpdatedResponse)
+
+    debug "runForkChoiceUpdated: running forkchoiceUpdated",
+      headBlockRoot,
+      finalizedBlockRoot,
+      payloadStatus = $fcuR.payloadStatus.status,
+      latestValidHash = $fcuR.payloadStatus.latestValidHash,
+      validationError = $fcuR.payloadStatus.validationError
+
+    return fcuR.payloadStatus.status == PayloadExecutionStatus.valid
   except CatchableError as err:
     debug "runForkChoiceUpdated: forkchoiceUpdated failed",
       err = err.msg
-    discard
+    return false

 proc newExecutionPayload*(
     eth1Monitor: Eth1Monitor, executionPayload: bellatrix.ExecutionPayload):
@@ -401,6 +408,9 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
     optForkchoiceHeadSlot = GENESIS_SLOT # safe default
     optForkchoiceHeadRoot: Eth2Digest

+    # don't keep spamming same fcU to Geth; might be restarting sync each time
+    lastFcHead: Eth2Digest
+
   while true:
     # Cooperative concurrency: one block per loop iteration - because
     # we run both networking and CPU-heavy things like block processing
@@ -413,8 +423,6 @@
       # larger network reads when under load.
       idleTimeout = 10.milliseconds

-      defaultBellatrixPayload = default(bellatrix.ExecutionPayload)
-
     discard await idleAsync().withTimeout(idleTimeout)

     let
@@ -422,19 +430,25 @@
       blck = await self[].blockQueue.popFirst()
       hasExecutionPayload = blck.blck.kind >= BeaconBlockFork.Bellatrix
       executionPayloadStatus =
         if hasExecutionPayload and
-            # Allow local testnets to run without requiring an execution layer
-            blck.blck.bellatrixData.message.body.execution_payload !=
-              defaultBellatrixPayload:
-          try:
-            # Minimize window for Eth1 monitor to shut down connection
-            await self.consensusManager.eth1Monitor.ensureDataProvider()
+            blck.blck.bellatrixData.message.body.is_execution_block:
+          # Eth1 syncing is asynchronous from this

-            await newExecutionPayload(
-              self.consensusManager.eth1Monitor,
-              blck.blck.bellatrixData.message.body.execution_payload)
-          except CatchableError as err:
-            debug "runQueueProcessingLoop: newPayload failed",
-              err = err.msg
+          # TODO self.consensusManager.eth1Monitor.terminalBlockHash.isSome
+          # should gate this when it works more reliably
+          when true:
+            try:
+              # Minimize window for Eth1 monitor to shut down connection
+              await self.consensusManager.eth1Monitor.ensureDataProvider()
+
+              await newExecutionPayload(
+                self.consensusManager.eth1Monitor,
+                blck.blck.bellatrixData.message.body.execution_payload)
+            except CatchableError as err:
+              debug "runQueueProcessingLoop: newPayload failed",
+                err = err.msg
+              PayloadExecutionStatus.syncing
+          else:
+            debug "runQueueProcessingLoop: got execution payload before TTD"
             PayloadExecutionStatus.syncing
         else:
           # Vacuously
@@ -445,13 +459,13 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
         PayloadExecutionStatus.invalid_block_hash]:
       debug "runQueueProcessingLoop: execution payload invalid",
         executionPayloadStatus
+      # Every loop iteration ends with some version of blck.resfut.complete(),
+      # including processBlock(), otherwise the sync manager stalls.
       if not blck.resfut.isNil:
         blck.resfut.complete(Result[void, BlockError].err(BlockError.Invalid))
       continue

-    if executionPayloadStatus in
-        [PayloadExecutionStatus.accepted, PayloadExecutionStatus.syncing] and
-        hasExecutionPayload:
+    if hasExecutionPayload:
       # The EL client doesn't know here whether the payload is valid, because,
       # for example, in Geth's case, its parent isn't known. When Geth logs an
       # "Ignoring payload with missing parent" message, this is the result. It
       # ...
       # potential for this message until it nearly catches up, unless using an
       # approach such as forkchoiceUpdated to trigger sync.
       #
-      # Since the code doesn't reach here unless it's not optimistic sync, and
-      # it doesn't set the finalized block, it's safe enough to try this chain
-      # until either the EL client resyncs or returns invalid on the chain.
-      #
       # Returning the MissingParent error causes the sync manager to loop in
       # place until the EL does resync/catch up, then the normal process can
       # resume where there's a hybrid serial and optimistic sync model.
@@ -481,28 +491,44 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
       debug "runQueueProcessingLoop: execution payload accepted or syncing",
         executionPayloadStatus

-      await self.runForkchoiceUpdated(
-        self.consensusManager.dag.head.executionBlockRoot,
-        self.consensusManager.dag.finalizedHead.blck.executionBlockRoot)
+      # Always do this. Geth will only initiate syncing or reorgs with this
+      # combination of newPayload and forkchoiceUpdated. By design this must
+      # be somewhat optimistic, at least by one slot, for Geth to process it
+      # at all. This eventually converges to the same head as the DAG by the
+      # time it's externally visible via validating activity.
+      #
+      # In particular, the constraints that hold here are that Geth expects a
+      # sequence of
+      # - newPayload(execution payload with block hash `h`) followed by
+      # - forkchoiceUpdated(head = `h`)
+      # This is intrinsically somewhat optimistic, because determining the
+      # validity of an execution payload requires the forkchoiceUpdated
+      # head to be set to a block hash of some execution payload with unknown
+      # validity; otherwise it would not be necessary to ask the EL.
+      #
+      # The main reason this isn't done more adjacently in this code flow is to
+      # catch outright invalid cases, where the EL can reject a payload, without
+      # even running forkchoiceUpdated on it.
+      static: doAssert high(BeaconStateFork) == BeaconStateFork.Bellatrix
+      let curBh =
+        blck.blck.bellatrixData.message.body.execution_payload.block_hash
+      if curBh != lastFcHead:
+        lastFcHead = curBh
+        if await self.runForkchoiceUpdated(
+            blck.blck.bellatrixData.message.body.execution_payload.block_hash,
+            self.consensusManager.dag.finalizedHead.blck.executionBlockRoot):
+          # Geth seldom seems to return VALID to newPayload alone, even when
+          # it has all the relevant information.
+          self[].processBlock(blck)
+          continue

-      if not blck.resfut.isNil:
-        blck.resfut.complete(Result[void, BlockError].err(
-          BlockError.MissingParent))
-        continue
+    if executionPayloadStatus != PayloadExecutionStatus.valid:
+      if not blck.resfut.isNil:
+        blck.resfut.complete(Result[void, BlockError].err(
+          BlockError.MissingParent))

-    if executionPayloadStatus == PayloadExecutionStatus.valid:
-      self[].processBlock(blck)
-    else:
-      # Every non-nil future must be completed here, but don't want to process
-      # the block any further in CL terms. Also don't want to specify Invalid,
-      # as if it gets here, it's something more like MissingParent (except, on
-      # the EL side).
-      if not blck.resfut.isNil:
-        blck.resfut.complete(
-          Result[void, BlockError].err(BlockError.MissingParent))
+      continue

-    if executionPayloadStatus == PayloadExecutionStatus.valid and
-        hasExecutionPayload:
-      await self.runForkchoiceUpdated(
-        self.consensusManager.dag.head.executionBlockRoot,
-        self.consensusManager.dag.finalizedHead.blck.executionBlockRoot)
+    # When newPayload, rather than forkchoiceUpdated, has returned valid.
+    doAssert executionPayloadStatus == PayloadExecutionStatus.valid
+    self[].processBlock(blck)
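Note on the block_processor.nim hunk that drops defaultBellatrixPayload: the removed `execution_payload != defaultBellatrixPayload` comparison and the `is_execution_block` call that replaces it express the same condition; the Bellatrix fork-choice spec defines `is_execution_block` as the payload differing from a default-initialized `ExecutionPayload()`. A minimal Nim sketch of that predicate under that assumption, with the payload shrunk to a single hypothetical block_hash field rather than the real spec type:

# Illustrative only: MiniPayload/MiniBlockBody stand in for the real
# Bellatrix ExecutionPayload/BeaconBlockBody types.
type
  MiniPayload = tuple
    block_hash: string   # stand-in for the many fields of the real payload

  MiniBlockBody = object
    execution_payload: MiniPayload

func is_execution_block(body: MiniBlockBody): bool =
  ## Spec-style predicate: the block carries a "real" execution payload
  ## iff the payload differs from a default-initialized one -- the same
  ## condition the removed `!= defaultBellatrixPayload` check spelled out.
  body.execution_payload != default(MiniPayload)

when isMainModule:
  var body: MiniBlockBody
  doAssert not body.is_execution_block
  body.execution_payload.block_hash = "0xabc"
  doAssert body.is_execution_block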
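The final hunk's comments describe the ordering the EL expects: newPayload with some block hash `h`, then forkchoiceUpdated with head `h`, deduplicated via `lastFcHead`, with processBlock run when either call reports the payload valid and MissingParent surfaced otherwise. A self-contained Nim sketch of just that decision flow, assuming stubbed engine-API calls (`newPayloadStub`, `forkchoiceUpdatedStub`) and simplified types that are not the ones nimbus-eth2 uses:

# Standalone sketch of the merge-sync decision flow from this patch.
# The engine-API calls are stubs; only the call ordering and the
# lastFcHead deduplication mirror the real runQueueProcessingLoop.
type
  PayloadStatus = enum
    psValid, psAccepted, psSyncing, psInvalid

  BlockDecision = enum
    bdProcess,        # payload known valid: run processBlock
    bdMissingParent,  # ask the sync manager to retry this block later
    bdInvalid         # reject the block outright

var lastFcHead = ""   # last block hash sent as the forkchoiceUpdated head

proc newPayloadStub(blockHash: string): PayloadStatus =
  ## Placeholder for the engine-API newPayload call; pretend the EL is
  ## still syncing, the common case while it catches up.
  psSyncing

proc forkchoiceUpdatedStub(headHash, finalizedHash: string): PayloadStatus =
  ## Placeholder for the engine-API forkchoiceUpdated call.
  psSyncing

proc decide(blockHash, finalizedHash: string): BlockDecision =
  # 1. newPayload first; an outright invalid answer short-circuits.
  let npStatus = newPayloadStub(blockHash)
  if npStatus == psInvalid:
    return bdInvalid

  # 2. Follow with forkchoiceUpdated(head = same hash), but only once per
  #    head so the EL is not spammed with identical updates.
  if blockHash != lastFcHead:
    lastFcHead = blockHash
    if forkchoiceUpdatedStub(blockHash, finalizedHash) == psValid:
      return bdProcess

  # 3. newPayload alone may also have reported the payload valid.
  if npStatus == psValid:
    return bdProcess

  # 4. Otherwise the EL is still syncing: surface MissingParent so the
  #    sync manager keeps retrying until the EL catches up.
  bdMissingParent

when isMainModule:
  echo decide("0xaa", "0x00")  # bdMissingParent while the stub EL syncs
  echo decide("0xaa", "0x00")  # same head again: forkchoiceUpdated skipped

The real loop is asynchronous, completes `blck.resfut` instead of returning an enum, and talks to an actual EL; only the newPayload-then-forkchoiceUpdated ordering and the last-head guard carry over here.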