rewrite merge sync (#3759)
This commit is contained in:
parent
69f505e2ba
commit
d41c2a293b
|
@ -1455,8 +1455,8 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
|
|||
info "startEth1Syncing: checking for merge terminal block",
|
||||
currentEpoch = m.currentEpoch,
|
||||
BELLATRIX_FORK_EPOCH = m.cfg.BELLATRIX_FORK_EPOCH,
|
||||
totalDifficulty = nextBlock.totalDifficulty,
|
||||
ttd = m.cfg.TERMINAL_TOTAL_DIFFICULTY,
|
||||
totalDifficulty = $nextBlock.totalDifficulty,
|
||||
ttd = $m.cfg.TERMINAL_TOTAL_DIFFICULTY,
|
||||
terminalBlockHash = m.terminalBlockHash
|
||||
|
||||
if terminalBlockCandidate.totalDifficulty >= m.cfg.TERMINAL_TOTAL_DIFFICULTY:
|
||||
|
|
|
@ -320,9 +320,11 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) =
|
|||
if res.isOk(): Result[void, BlockError].ok()
|
||||
else: Result[void, BlockError].err(res.error()))
|
||||
|
||||
func `$`(h: BlockHash): string =
  ## String form of an execution block hash: the hash is converted with
  ## `asEth2Digest` and the resulting digest is stringified.
  let digest = h.asEth2Digest
  $digest
|
||||
|
||||
proc runForkchoiceUpdated(
|
||||
self: ref BlockProcessor, headBlockRoot, finalizedBlockRoot: Eth2Digest)
|
||||
{.async.} =
|
||||
self: ref BlockProcessor, headBlockRoot, finalizedBlockRoot: Eth2Digest):
|
||||
Future[bool] {.async.} =
|
||||
# Allow finalizedBlockRoot to be 0 to avoid sync deadlocks.
|
||||
#
|
||||
# https://github.com/ethereum/EIPs/blob/master/EIPS/eip-3675.md#pos-events
|
||||
|
@ -334,26 +336,31 @@ proc runForkchoiceUpdated(
|
|||
# notes "`finalized_block_hash` is the hash of the latest finalized execution
|
||||
# payload (`Hash32()` if none yet finalized)"
|
||||
if headBlockRoot.isZero:
|
||||
return
|
||||
return false
|
||||
|
||||
try:
|
||||
# Minimize window for Eth1 monitor to shut down connection
|
||||
await self.consensusManager.eth1Monitor.ensureDataProvider()
|
||||
|
||||
debug "runForkChoiceUpdated: running forkchoiceUpdated",
|
||||
headBlockRoot,
|
||||
finalizedBlockRoot
|
||||
|
||||
discard awaitWithTimeout(
|
||||
let fcuR = awaitWithTimeout(
|
||||
forkchoiceUpdated(
|
||||
self.consensusManager.eth1Monitor, headBlockRoot, finalizedBlockRoot),
|
||||
FORKCHOICEUPDATED_TIMEOUT):
|
||||
debug "runForkChoiceUpdated: forkchoiceUpdated timed out"
|
||||
default(ForkchoiceUpdatedResponse)
|
||||
|
||||
debug "runForkChoiceUpdated: running forkchoiceUpdated",
|
||||
headBlockRoot,
|
||||
finalizedBlockRoot,
|
||||
payloadStatus = $fcuR.payloadStatus.status,
|
||||
latestValidHash = $fcuR.payloadStatus.latestValidHash,
|
||||
validationError = $fcuR.payloadStatus.validationError
|
||||
|
||||
return fcuR.payloadStatus.status == PayloadExecutionStatus.valid
|
||||
except CatchableError as err:
|
||||
debug "runForkChoiceUpdated: forkchoiceUpdated failed",
|
||||
err = err.msg
|
||||
discard
|
||||
return false
|
||||
|
||||
proc newExecutionPayload*(
|
||||
eth1Monitor: Eth1Monitor, executionPayload: bellatrix.ExecutionPayload):
|
||||
|
@ -401,6 +408,9 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
|
|||
optForkchoiceHeadSlot = GENESIS_SLOT # safe default
|
||||
optForkchoiceHeadRoot: Eth2Digest
|
||||
|
||||
# don't keep spamming same fcU to Geth; might be restarting sync each time
|
||||
lastFcHead: Eth2Digest
|
||||
|
||||
while true:
|
||||
# Cooperative concurrency: one block per loop iteration - because
|
||||
# we run both networking and CPU-heavy things like block processing
|
||||
|
@ -413,8 +423,6 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
|
|||
# larger network reads when under load.
|
||||
idleTimeout = 10.milliseconds
|
||||
|
||||
defaultBellatrixPayload = default(bellatrix.ExecutionPayload)
|
||||
|
||||
discard await idleAsync().withTimeout(idleTimeout)
|
||||
|
||||
let
|
||||
|
@ -422,19 +430,25 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
|
|||
hasExecutionPayload = blck.blck.kind >= BeaconBlockFork.Bellatrix
|
||||
executionPayloadStatus =
|
||||
if hasExecutionPayload and
|
||||
# Allow local testnets to run without requiring an execution layer
|
||||
blck.blck.bellatrixData.message.body.execution_payload !=
|
||||
defaultBellatrixPayload:
|
||||
try:
|
||||
# Minimize window for Eth1 monitor to shut down connection
|
||||
await self.consensusManager.eth1Monitor.ensureDataProvider()
|
||||
blck.blck.bellatrixData.message.body.is_execution_block:
|
||||
# Eth1 syncing is asynchronous from this
|
||||
|
||||
await newExecutionPayload(
|
||||
self.consensusManager.eth1Monitor,
|
||||
blck.blck.bellatrixData.message.body.execution_payload)
|
||||
except CatchableError as err:
|
||||
debug "runQueueProcessingLoop: newPayload failed",
|
||||
err = err.msg
|
||||
# TODO self.consensusManager.eth1Monitor.terminalBlockHash.isSome
|
||||
# should gate this when it works more reliably
|
||||
when true:
|
||||
try:
|
||||
# Minimize window for Eth1 monitor to shut down connection
|
||||
await self.consensusManager.eth1Monitor.ensureDataProvider()
|
||||
|
||||
await newExecutionPayload(
|
||||
self.consensusManager.eth1Monitor,
|
||||
blck.blck.bellatrixData.message.body.execution_payload)
|
||||
except CatchableError as err:
|
||||
debug "runQueueProcessingLoop: newPayload failed",
|
||||
err = err.msg
|
||||
PayloadExecutionStatus.syncing
|
||||
else:
|
||||
debug "runQueueProcessingLoop: got execution payload before TTD"
|
||||
PayloadExecutionStatus.syncing
|
||||
else:
|
||||
# Vacuously
|
||||
|
@ -445,13 +459,13 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
|
|||
PayloadExecutionStatus.invalid_block_hash]:
|
||||
debug "runQueueProcessingLoop: execution payload invalid",
|
||||
executionPayloadStatus
|
||||
# Every loop iteration ends with some version of blck.resfut.complete(),
|
||||
# including processBlock(), otherwise the sync manager stalls.
|
||||
if not blck.resfut.isNil:
|
||||
blck.resfut.complete(Result[void, BlockError].err(BlockError.Invalid))
|
||||
continue
|
||||
|
||||
if executionPayloadStatus in
|
||||
[PayloadExecutionStatus.accepted, PayloadExecutionStatus.syncing] and
|
||||
hasExecutionPayload:
|
||||
if hasExecutionPayload:
|
||||
# The EL client doesn't know here whether the payload is valid, because,
|
||||
# for example, in Geth's case, its parent isn't known. When Geth logs an
|
||||
# "Ignoring payload with missing parent" message, this is the result. It
|
||||
|
@ -465,10 +479,6 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
|
|||
# potential for this message until it nearly catches up, unless using an
|
||||
# approach such as forkchoiceUpdated to trigger sync.
|
||||
#
|
||||
# Since the code doesn't reach here unless it's not optimistic sync, and
|
||||
# it doesn't set the finalized block, it's safe enough to try this chain
|
||||
# until either the EL client resyncs or returns invalid on the chain.
|
||||
#
|
||||
# Returning the MissingParent error causes the sync manager to loop in
|
||||
# place until the EL does resync/catch up, then the normal process can
|
||||
# resume where there's a hybrid serial and optimistic sync model.
|
||||
|
@ -481,28 +491,44 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
|
|||
debug "runQueueProcessingLoop: execution payload accepted or syncing",
|
||||
executionPayloadStatus
|
||||
|
||||
await self.runForkchoiceUpdated(
|
||||
self.consensusManager.dag.head.executionBlockRoot,
|
||||
self.consensusManager.dag.finalizedHead.blck.executionBlockRoot)
|
||||
# Always do this. Geth will only initiate syncing or reorgs with this
|
||||
# combination of newPayload and forkchoiceUpdated. By design this must
|
||||
# be somewhat optimistic, at least by one slot, for Geth to process it
|
||||
# at all. This eventually converges to the same head as the DAG by the
|
||||
# time it's externally visible via validating activity.
|
||||
#
|
||||
# In particular, the constraints that hold here are that Geth expects a
|
||||
# sequence of
|
||||
# - newPayload(execution payload with block hash `h`) followed by
|
||||
# - forkchoiceUpdated(head = `h`)
|
||||
# This is intrinsically somewhat optimistic, because determining the
|
||||
# validity of an execution payload requires the forkchoiceUpdated
|
||||
# head to be set to a block hash of some execution payload with unknown
|
||||
# validity; otherwise it would not be necessary to ask the EL.
|
||||
#
|
||||
# The main reason this isn't done more adjacently in this code flow is to
|
||||
# catch outright invalid cases, where the EL can reject a payload, without
|
||||
# even running forkchoiceUpdated on it.
|
||||
static: doAssert high(BeaconStateFork) == BeaconStateFork.Bellatrix
|
||||
let curBh =
|
||||
blck.blck.bellatrixData.message.body.execution_payload.block_hash
|
||||
if curBh != lastFcHead:
|
||||
lastFcHead = curBh
|
||||
if await self.runForkchoiceUpdated(
|
||||
blck.blck.bellatrixData.message.body.execution_payload.block_hash,
|
||||
self.consensusManager.dag.finalizedHead.blck.executionBlockRoot):
|
||||
# Geth seldom seems to return VALID to newPayload alone, even when
|
||||
# it has all the relevant information.
|
||||
self[].processBlock(blck)
|
||||
continue
|
||||
|
||||
if not blck.resfut.isNil:
|
||||
blck.resfut.complete(Result[void, BlockError].err(
|
||||
BlockError.MissingParent))
|
||||
continue
|
||||
if executionPayloadStatus != PayloadExecutionStatus.valid:
|
||||
if not blck.resfut.isNil:
|
||||
blck.resfut.complete(Result[void, BlockError].err(
|
||||
BlockError.MissingParent))
|
||||
|
||||
if executionPayloadStatus == PayloadExecutionStatus.valid:
|
||||
self[].processBlock(blck)
|
||||
else:
|
||||
# Every non-nil future must be completed here, but don't want to process
|
||||
# the block any further in CL terms. Also don't want to specify Invalid,
|
||||
# as if it gets here, it's something more like MissingParent (except, on
|
||||
# the EL side).
|
||||
if not blck.resfut.isNil:
|
||||
blck.resfut.complete(
|
||||
Result[void, BlockError].err(BlockError.MissingParent))
|
||||
continue
|
||||
|
||||
if executionPayloadStatus == PayloadExecutionStatus.valid and
|
||||
hasExecutionPayload:
|
||||
await self.runForkchoiceUpdated(
|
||||
self.consensusManager.dag.head.executionBlockRoot,
|
||||
self.consensusManager.dag.finalizedHead.blck.executionBlockRoot)
|
||||
# When newPayload, rather than forkchoiceUpdated, has returned valid.
|
||||
doAssert executionPayloadStatus == PayloadExecutionStatus.valid
|
||||
self[].processBlock(blck)
|
||||
|
|
Loading…
Reference in New Issue