opportunistically even less async optimistic sync (#3880)

tersec 2022-07-21 18:26:36 +00:00 committed by GitHub
parent f98e9ec8bc
commit f4208cfb23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 32 deletions
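In outline, this commit moves head selection out of `updateHeadWithExecution` and into `storeBlock`, so the pre-merge and already-`VALID` cases update the DAG head synchronously and only the optimistically imported case still awaits the execution client. A condensed sketch of the resulting dispatch, with identifiers abbreviated relative to the diff below (not a verbatim excerpt; `headRoot` and `finalizedRoot` stand for the execution block roots of the new head and of the finalized block):

  # Condensed sketch only; see the storeBlock hunk below for the actual code.
  let newHead = attestationPool[].selectOptimisticHead(wallSlot.start_beacon_time)
  if newHead.isOk:
    if dag.loadExecutionBlockRoot(newHead.get).isZero:
      # Pre-merge block: no execution payload, update the DAG head directly.
      consensusManager[].updateHead(newHead.get)
    elif not dag.is_optimistic(newHead.get.root):
      # Already VALID via newPayload: update the DAG head now; only the EL
      # still needs the forkchoiceUpdated call, so fire it asynchronously.
      consensusManager[].updateHead(newHead.get)
      asyncSpawn eth1Monitor.expectValidForkchoiceUpdated(headRoot, finalizedRoot)
    else:
      # Optimistically imported: forkchoiceUpdated must run before DAG updateHead.
      asyncSpawn consensusManager.updateHeadWithExecution(newHead.get)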

beacon_chain/consensus_object_pools/consensus_manager.nim

@@ -137,11 +137,9 @@ proc updateExecutionClientHead(self: ref ConsensusManager, newHead: BlockRef)
     return

   # Can't use dag.head here because it hasn't been updated yet
-  let
-    executionFinalizedRoot =
-      self.dag.loadExecutionBlockRoot(self.dag.finalizedHead.blck)
-    payloadExecutionStatus = await self.eth1Monitor.runForkchoiceUpdated(
-      executionHeadRoot, executionFinalizedRoot)
+  let payloadExecutionStatus = await self.eth1Monitor.runForkchoiceUpdated(
+    executionHeadRoot,
+    self.dag.loadExecutionBlockRoot(self.dag.finalizedHead.blck))

   case payloadExecutionStatus
   of PayloadExecutionStatus.valid:
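For context, the two roots passed to `runForkchoiceUpdated` correspond to the head and finalized members of the engine API's forkchoice state. Roughly, and only as a sketch (the `asBlockHash` conversion helper and the choice of `safeBlockHash` are assumptions here, not taken from this commit):

  # Illustrative only: approximate shape of the state conveyed to the EL.
  let forkchoiceState = ForkchoiceStateV1(
    headBlockHash: executionHeadRoot.asBlockHash,   # assumed conversion helper
    safeBlockHash: executionHeadRoot.asBlockHash,   # assumption: safe == head
    finalizedBlockHash:
      self.dag.loadExecutionBlockRoot(self.dag.finalizedHead.blck).asBlockHash)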
@@ -152,6 +150,17 @@ proc updateExecutionClientHead(self: ref ConsensusManager, newHead: BlockRef)
   of PayloadExecutionStatus.accepted, PayloadExecutionStatus.syncing:
     self.dag.optimisticRoots.incl newHead.root

+proc updateHead*(self: var ConsensusManager, newHead: BlockRef) =
+  ## Trigger fork choice and update the DAG with the new head block
+  ## This does not automatically prune the DAG after finalization
+  ## `pruneFinalized` must be called for pruning.
+
+  # Store the new head in the chain DAG - this may cause epochs to be
+  # justified and finalized
+  self.dag.updateHead(newHead, self.quarantine[])
+
+  self.checkExpectedBlock()
+
 proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =
   ## Trigger fork choice and update the DAG with the new head block
   ## This does not automatically prune the DAG after finalization
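The effect is that `ConsensusManager` now has two `updateHead` overloads: the existing slot-based one, which performs head selection itself, and the new `BlockRef`-based one for callers that have already chosen a head. A hypothetical pair of call sites, assuming a `cm: ConsensusManager` variable and a `chosenHead` produced by fork choice:

  # Hypothetical call sites, for illustration only.
  cm.updateHead(wallSlot)     # selects a head via fork choice, then updates the DAG
  cm.updateHead(chosenHead)   # new overload: updates the DAG with a pre-selected head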
@@ -168,13 +177,9 @@ proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =
     # Blocks without execution payloads can't be optimistic.
     self.dag.markBlockVerified(self.quarantine[], newHead.root)

-  # Store the new head in the chain DAG - this may cause epochs to be
-  # justified and finalized
-  self.dag.updateHead(newHead, self.quarantine[])
-
-  self.checkExpectedBlock()
+  self.updateHead(newHead)

-proc updateHeadWithExecution*(self: ref ConsensusManager, wallSlot: Slot)
+proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BlockRef)
     {.async.} =
   ## Trigger fork choice and update the DAG with the new head block
   ## This does not automatically prune the DAG after finalization
@@ -182,12 +187,6 @@ proc updateHeadWithExecution*(self: ref ConsensusManager, wallSlot: Slot)
   # Grab the new head according to our latest attestation data
   try:
-    let newHead = self.attestationPool[].selectOptimisticHead(
-        wallSlot.start_beacon_time).valueOr:
-      warn "Head selection failed, using previous head",
-        head = shortLog(self.dag.head), wallSlot
-      return
-
     # Ensure dag.updateHead has most current information
     await self.updateExecutionClientHead(newHead)
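With head selection removed here, `updateHeadWithExecution` now expects its caller to have already run fork choice; the `storeBlock` changes below do exactly that. A minimal sketch of the reduced flow (the concluding DAG update via the new `updateHead` overload is an assumption and is not shown in this hunk):

  # Minimal sketch under the assumptions stated above.
  proc updateHeadWithExecutionSketch(self: ref ConsensusManager, newHead: BlockRef)
      {.async.} =
    # Ensure dag.updateHead has most current information
    await self.updateExecutionClientHead(newHead)
    self[].updateHead(newHead)   # assumption: DAG update follows the EL call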

beacon_chain/gossip_processing/block_processor.nim

@@ -14,7 +14,7 @@ import
   ../sszdump

 from ../consensus_object_pools/consensus_manager import
-  ConsensusManager, updateHead, updateHeadWithExecution
+  ConsensusManager, runForkchoiceUpdated, updateHead, updateHeadWithExecution
 from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
 from ../consensus_object_pools/block_dag import BlockRef, root, slot
 from ../consensus_object_pools/block_pools_types import BlockError, EpochRef
@@ -164,7 +164,28 @@ proc storeBackfillBlock(
     res

-from ../consensus_object_pools/attestation_pool import addForkChoice
+from web3/engine_api_types import PayloadExecutionStatus, PayloadStatusV1
+from ../eth1/eth1_monitor import
+  Eth1Monitor, asEngineExecutionPayload, ensureDataProvider, newPayload
+
+proc expectValidForkchoiceUpdated(
+    eth1Monitor: Eth1Monitor, headBlockRoot, finalizedBlockRoot: Eth2Digest):
+    Future[void] {.async.} =
+  let payloadExecutionStatus =
+    await eth1Monitor.runForkchoiceUpdated(headBlockRoot, finalizedBlockRoot)
+  if payloadExecutionStatus != PayloadExecutionStatus.valid:
+    # Only called when expecting this to be valid because `newPayload` or some
+    # previous `forkchoiceUpdated` had already marked it as valid.
+    warn "expectValidForkchoiceUpdated: forkchoiceUpdated not `VALID`",
+      payloadExecutionStatus,
+      headBlockRoot,
+      finalizedBlockRoot
+
+from ../consensus_object_pools/attestation_pool import
+  addForkChoice, selectOptimisticHead
+from ../consensus_object_pools/blockchain_dag import
+  is_optimistic, loadExecutionBlockRoot, markBlockVerified
+from ../consensus_object_pools/block_dag import shortLog
 from ../consensus_object_pools/spec_cache import get_attesting_indices
 from ../spec/datatypes/phase0 import TrustedSignedBeaconBlock
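This helper is intended for fire-and-forget use: the caller already knows the head is `VALID`, so only the execution client still needs the notification and nothing awaits the result. As used further down in `storeBlock` (with `executionFinalizedRoot` standing in for the finalized execution block root):

  # Fire-and-forget: the DAG head is already updated; only the EL is notified.
  asyncSpawn eth1Monitor.expectValidForkchoiceUpdated(
    executionHeadRoot, executionFinalizedRoot)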
@@ -252,15 +273,42 @@ proc storeBlock*(
   # storeBlock gets called from validator_duties, which depends on its not
   # blocking progress any longer than necessary, and processBlock here, in
   # which case it's fine to await for a while on engine API results.
-  if not is_execution_block(signedBlock.message):
-    self.consensusManager[].updateHead(wallTime.slotOrZero)
-  else:
-    # This primarily exists to ensure that by the time the DAG updateHead is
-    # called valid blocks have already been registered as verified. The head
-    # can lag a slot behind wall clock, complicating detecting synced status
-    # for validating, otherwise.
-    asyncSpawn self.consensusManager.updateHeadWithExecution(
-      wallTime.slotOrZero)
+  #
+  # Three general scenarios: (1) pre-merge; (2) merge, already `VALID` by way
+  # of `newPayload`; (3) optimistically imported, need to call fcU before DAG
+  # updateHead. Handle each with as little async latency as feasible.
+  if payloadValid:
+    self.consensusManager.dag.markBlockVerified(
+      self.consensusManager.quarantine[], signedBlock.root)
+
+  # Grab the new head according to our latest attestation data; determines how
+  # async this needs to be.
+  let
+    wallSlot = wallTime.slotOrZero
+    newHead = attestationPool[].selectOptimisticHead(
+      wallSlot.start_beacon_time)
+
+  if newHead.isOk:
+    let executionHeadRoot =
+      self.consensusManager.dag.loadExecutionBlockRoot(newHead.get)
+    if executionHeadRoot.isZero:
+      # Blocks without execution payloads can't be optimistic.
+      self.consensusManager[].updateHead(newHead.get)
+    elif not self.consensusManager.dag.is_optimistic newHead.get.root:
+      # Not `NOT_VALID`; either `VALID` or `INVALIDATED`, but latter wouldn't
+      # be selected as head, so `VALID`. `forkchoiceUpdated` necessary for EL
+      # client only.
+      self.consensusManager[].updateHead(newHead.get)
+      asyncSpawn self.consensusManager.eth1Monitor.expectValidForkchoiceUpdated(
+        executionHeadRoot,
+        self.consensusManager.dag.loadExecutionBlockRoot(
+          self.consensusManager.dag.finalizedHead.blck))
+    else:
+      asyncSpawn self.consensusManager.updateHeadWithExecution(newHead.get)
+  else:
+    warn "Head selection failed, using previous head",
+      head = shortLog(self.consensusManager.dag.head), wallSlot

   let
     updateHeadTick = Moment.now()
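The `is_optimistic` check above gates the fast path. Based on the `optimisticRoots.incl` bookkeeping added in the first file, it can be read as a simple set-membership test; the following is a sketch of the assumed semantics, not the blockchain_dag implementation:

  # Assumed semantics: a block is optimistic iff its root was recorded when the
  # EL answered `accepted`/`syncing` for its payload.
  func is_optimistic(dag: ChainDAGRef, root: Eth2Digest): bool =
    root in dag.optimisticRoots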
@@ -343,16 +391,13 @@ proc processBlock(
     else: Result[void, BlockError].err(res.error()))

 from eth/async_utils import awaitWithTimeout
-from web3/engine_api_types import PayloadExecutionStatus, PayloadStatusV1
-from ../eth1/eth1_monitor import
-  Eth1Monitor, asEngineExecutionPayload, ensureDataProvider, newPayload
 from ../spec/datatypes/bellatrix import ExecutionPayload, SignedBeaconBlock

 proc newExecutionPayload*(
     eth1Monitor: Eth1Monitor, executionPayload: bellatrix.ExecutionPayload):
     Future[PayloadExecutionStatus] {.async.} =
   if eth1Monitor.isNil:
-    warn "newPayload: attempting to process execution payload without an Eth1Monitor. Ensure --web3-url setting is correct."
+    warn "newPayload: attempting to process execution payload without Eth1Monitor. Ensure --web3-url setting is correct and JWT is configured."
     return PayloadExecutionStatus.syncing

   debug "newPayload: inserting block into execution engine",
@@ -485,7 +530,8 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} =
       if executionPayloadStatus == PayloadExecutionStatus.valid or
           self[].is_optimistic_candidate_block(blck.blck):
         self[].processBlock(
-          blck, executionPayloadStatus == PayloadExecutionStatus.valid)
+          blck,
+          payloadValid = executionPayloadStatus == PayloadExecutionStatus.valid)
       else:
         debug "runQueueProcessingLoop: block cannot be optimistically imported",
           blck = shortLog(blck.blck)