allow execution clients several seconds to construct blocks (#4012)

This commit is contained in:
tersec 2022-08-23 16:19:52 +00:00 committed by GitHub
parent 9e9db216c5
commit 1d55743ebb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 148 additions and 14 deletions

View File

@ -89,7 +89,7 @@ type
stateTtlCache*: StateTtlCache stateTtlCache*: StateTtlCache
nextExchangeTransitionConfTime*: Moment nextExchangeTransitionConfTime*: Moment
router*: ref MessageRouter router*: ref MessageRouter
dynamicFeeRecipientsStore*: DynamicFeeRecipientsStore dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore
const const
MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT

View File

@ -16,7 +16,20 @@ import
../consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool], ../consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool],
../eth1/eth1_monitor ../eth1/eth1_monitor
from ../spec/eth2_apis/dynamic_fee_recipients import
DynamicFeeRecipientsStore, getDynamicFeeRecipient
from ../validators/keystore_management import
KeymanagerHost, getSuggestedFeeRecipient
type type
ForkChoiceUpdatedInformation* = object
payloadId*: PayloadID
headBlockRoot*: Eth2Digest
safeBlockRoot*: Eth2Digest
finalizedBlockRoot*: Eth2Digest
timestamp*: uint64
feeRecipient*: Eth1Address
ConsensusManager* = object ConsensusManager* = object
expectedSlot: Slot expectedSlot: Slot
expectedBlockReceived: Future[bool] expectedBlockReceived: Future[bool]
@ -34,6 +47,16 @@ type
# ---------------------------------------------------------------- # ----------------------------------------------------------------
eth1Monitor*: Eth1Monitor eth1Monitor*: Eth1Monitor
# Allow determination of preferred fee recipient during proposals
# ----------------------------------------------------------------
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore
keymanagerHost: ref KeymanagerHost
defaultFeeRecipient: Eth1Address
# Tracking last proposal forkchoiceUpdated payload information
# ----------------------------------------------------------------
forkchoiceUpdatedInfo*: Opt[ForkchoiceUpdatedInformation]
# Initialization # Initialization
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -41,13 +64,20 @@ func new*(T: type ConsensusManager,
dag: ChainDAGRef, dag: ChainDAGRef,
attestationPool: ref AttestationPool, attestationPool: ref AttestationPool,
quarantine: ref Quarantine, quarantine: ref Quarantine,
eth1Monitor: Eth1Monitor eth1Monitor: Eth1Monitor,
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore,
keymanagerHost: ref KeymanagerHost,
defaultFeeRecipient: Eth1Address
): ref ConsensusManager = ): ref ConsensusManager =
(ref ConsensusManager)( (ref ConsensusManager)(
dag: dag, dag: dag,
attestationPool: attestationPool, attestationPool: attestationPool,
quarantine: quarantine, quarantine: quarantine,
eth1Monitor: eth1Monitor eth1Monitor: eth1Monitor,
dynamicFeeRecipientsStore: dynamicFeeRecipientsStore,
keymanagerHost: keymanagerHost,
forkchoiceUpdatedInfo: Opt.none ForkchoiceUpdatedInformation,
defaultFeeRecipient: defaultFeeRecipient
) )
# Consensus Management # Consensus Management
@ -182,6 +212,71 @@ proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =
self.updateHead(newHead) self.updateHead(newHead)
proc checkNextProposer(dag: ChainDAGRef, slot: Slot):
Opt[(ValidatorIndex, ValidatorPubKey)] =
let proposer = dag.getProposer(dag.head, slot + 1)
if proposer.isNone():
return Opt.none((ValidatorIndex, ValidatorPubKey))
Opt.some((proposer.get, dag.validatorKey(proposer.get).get().toPubKey))
proc getFeeRecipient*(
self: ref ConsensusManager, pubkey: ValidatorPubKey, validatorIdx: ValidatorIndex,
epoch: Epoch): Eth1Address =
self.dynamicFeeRecipientsStore[].getDynamicFeeRecipient(validatorIdx, epoch).valueOr:
if self.keymanagerHost != nil:
self.keymanagerHost[].getSuggestedFeeRecipient(pubkey).valueOr:
self.defaultFeeRecipient
else:
self.defaultFeeRecipient
from ../spec/datatypes/bellatrix import PayloadID
proc runProposalForkchoiceUpdated*(self: ref ConsensusManager) {.async.} =
withState(self.dag.headState):
let
nextSlot = state.data.slot + 1
(validatorIndex, nextProposer) =
self.dag.checkNextProposer(nextSlot).valueOr:
return
# Approximately lines up with validator_duties version. Used optimistcally/
# opportunistically, so mismatches are fine if not too frequent.
let
timestamp = compute_timestamp_at_slot(state.data, nextSlot)
randomData =
get_randao_mix(state.data, get_current_epoch(state.data)).data
feeRecipient = self.getFeeRecipient(
nextProposer, validatorIndex, nextSlot.epoch)
headBlockRoot = self.dag.loadExecutionBlockRoot(self.dag.head)
finalizedBlockRoot =
self.dag.loadExecutionBlockRoot(self.dag.finalizedHead.blck)
if headBlockRoot.isZero:
return
try:
let fcResult = awaitWithTimeout(
forkchoiceUpdated(
self.eth1Monitor, headBlockRoot, finalizedBlockRoot, timestamp,
randomData, feeRecipient),
FORKCHOICEUPDATED_TIMEOUT):
debug "runProposalForkchoiceUpdated: forkchoiceUpdated timed out"
ForkchoiceUpdatedResponse(
payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing))
if fcResult.payloadStatus.status != PayloadExecutionStatus.valid or
fcResult.payloadId.isNone:
return
self.forkchoiceUpdatedInfo = Opt.some ForkchoiceUpdatedInformation(
payloadId: bellatrix.PayloadID(fcResult.payloadId.get),
headBlockRoot: headBlockRoot,
finalizedBlockRoot: finalizedBlockRoot,
timestamp: timestamp,
feeRecipient: feeRecipient)
except CatchableError as err:
error "Engine API fork-choice update failed", err = err.msg
proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BlockRef) proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BlockRef)
{.async.} = {.async.} =
## Trigger fork choice and update the DAG with the new head block ## Trigger fork choice and update the DAG with the new head block
@ -197,6 +292,10 @@ proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BlockRef)
# justified and finalized # justified and finalized
self.dag.updateHead(newHead, self.quarantine[]) self.dag.updateHead(newHead, self.quarantine[])
# TODO after things stabilize with this, check for upcoming proposal and
# don't bother sending first fcU, but initially, keep both in place
asyncSpawn self.runProposalForkchoiceUpdated()
self[].checkExpectedBlock() self[].checkExpectedBlock()
except CatchableError as exc: except CatchableError as exc:
debug "updateHeadWithExecution error", debug "updateHeadWithExecution error",

View File

@ -17,7 +17,8 @@ import
../sszdump ../sszdump
from ../consensus_object_pools/consensus_manager import from ../consensus_object_pools/consensus_manager import
ConsensusManager, runForkchoiceUpdated, updateHead, updateHeadWithExecution ConsensusManager, runForkchoiceUpdated, runProposalForkchoiceUpdated,
updateHead, updateHeadWithExecution
from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
from ../consensus_object_pools/block_dag import BlockRef, root, slot from ../consensus_object_pools/block_dag import BlockRef, root, slot
from ../consensus_object_pools/block_pools_types import BlockError, EpochRef from ../consensus_object_pools/block_pools_types import BlockError, EpochRef
@ -307,6 +308,9 @@ proc storeBlock*(
executionHeadRoot, executionHeadRoot,
self.consensusManager.dag.loadExecutionBlockRoot( self.consensusManager.dag.loadExecutionBlockRoot(
self.consensusManager.dag.finalizedHead.blck)) self.consensusManager.dag.finalizedHead.blck))
# TODO remove redundant fcU in case of proposal
asyncSpawn self.consensusManager.runProposalForkchoiceUpdated()
else: else:
asyncSpawn self.consensusManager.updateHeadWithExecution(newHead.get) asyncSpawn self.consensusManager.updateHeadWithExecution(newHead.get)
else: else:

View File

@ -290,7 +290,9 @@ proc initFullNode(
exitPool = newClone( exitPool = newClone(
ExitPool.init(dag, attestationPool, onVoluntaryExitAdded)) ExitPool.init(dag, attestationPool, onVoluntaryExitAdded))
consensusManager = ConsensusManager.new( consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, node.eth1Monitor) dag, attestationPool, quarantine, node.eth1Monitor,
node.dynamicFeeRecipientsStore, node.keymanagerHost,
config.defaultFeeRecipient)
blockProcessor = BlockProcessor.new( blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime, rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime,
@ -753,7 +755,7 @@ proc init*(T: type BeaconNode,
validatorMonitor: validatorMonitor, validatorMonitor: validatorMonitor,
stateTtlCache: stateTtlCache, stateTtlCache: stateTtlCache,
nextExchangeTransitionConfTime: nextExchangeTransitionConfTime, nextExchangeTransitionConfTime: nextExchangeTransitionConfTime,
dynamicFeeRecipientsStore: DynamicFeeRecipientsStore.init()) dynamicFeeRecipientsStore: newClone(DynamicFeeRecipientsStore.init()))
node.initLightClient( node.initLightClient(
rng, cfg, dag.forkDigests, getBeaconTime, dag.genesis_validators_root) rng, cfg, dag.forkDigests, getBeaconTime, dag.genesis_validators_root)
@ -1203,7 +1205,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
node.syncCommitteeMsgPool[].pruneData(slot) node.syncCommitteeMsgPool[].pruneData(slot)
if slot.is_epoch: if slot.is_epoch:
node.trackNextSyncCommitteeTopics(slot) node.trackNextSyncCommitteeTopics(slot)
node.dynamicFeeRecipientsStore.pruneOldMappings(slot.epoch) node.dynamicFeeRecipientsStore[].pruneOldMappings(slot.epoch)
# Update upcoming actions - we do this every slot in case a reorg happens # Update upcoming actions - we do this every slot in case a reorg happens
let head = node.dag.head let head = node.dag.head

View File

@ -779,7 +779,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
currentEpoch = node.beaconClock.now.slotOrZero.epoch currentEpoch = node.beaconClock.now.slotOrZero.epoch
for proposerData in body: for proposerData in body:
node.dynamicFeeRecipientsStore.addMapping( node.dynamicFeeRecipientsStore[].addMapping(
proposerData.validator_index, proposerData.validator_index,
proposerData.fee_recipient, proposerData.fee_recipient,
currentEpoch) currentEpoch)

View File

@ -354,11 +354,12 @@ proc get_execution_payload(
asConsensusExecutionPayload( asConsensusExecutionPayload(
await execution_engine.getPayload(payload_id.get)) await execution_engine.getPayload(payload_id.get))
# TODO remove in favor of consensusManager copy
proc getFeeRecipient(node: BeaconNode, proc getFeeRecipient(node: BeaconNode,
pubkey: ValidatorPubKey, pubkey: ValidatorPubKey,
validatorIdx: ValidatorIndex, validatorIdx: ValidatorIndex,
epoch: Epoch): Eth1Address = epoch: Epoch): Eth1Address =
node.dynamicFeeRecipientsStore.getDynamicFeeRecipient(validatorIdx, epoch).valueOr: node.dynamicFeeRecipientsStore[].getDynamicFeeRecipient(validatorIdx, epoch).valueOr:
if node.keymanagerHost != nil: if node.keymanagerHost != nil:
node.keymanagerHost[].getSuggestedFeeRecipient(pubkey).valueOr: node.keymanagerHost[].getSuggestedFeeRecipient(pubkey).valueOr:
node.config.defaultFeeRecipient node.config.defaultFeeRecipient
@ -406,10 +407,31 @@ proc getExecutionPayload(
latestFinalized = latestFinalized =
node.dag.loadExecutionBlockRoot(node.dag.finalizedHead.blck) node.dag.loadExecutionBlockRoot(node.dag.finalizedHead.blck)
feeRecipient = node.getFeeRecipient(pubkey, validator_index, epoch) feeRecipient = node.getFeeRecipient(pubkey, validator_index, epoch)
payload_id = (await forkchoice_updated( lastFcU = node.consensusManager.forkchoiceUpdatedInfo
proposalState.bellatrixData.data, latestHead, latestFinalized, timestamp = compute_timestamp_at_slot(
feeRecipient, proposalState.bellatrixData.data,
node.consensusManager.eth1Monitor)) proposalState.bellatrixData.data.slot)
payload_id =
if lastFcU.isSome and
lastFcU.get.headBlockRoot == latestHead and
lastFcU.get.finalizedBlockRoot == latestFinalized and
lastFcU.get.timestamp == timestamp and
lastFcU.get.feeRecipient == feeRecipient:
some bellatrix.PayloadID(lastFcU.get.payloadId)
else:
debug "getExecutionPayload: didn't find payloadId, re-querying",
latestHead,
latestFinalized,
timestamp,
feeRecipient,
cachedHeadBlockRoot = lastFcU.get.headBlockRoot,
cachedFinalizedBlockRoot = lastFcU.get.finalizedBlockRoot,
cachedTimestamp = lastFcU.get.timestamp,
cachedFeeRecipient = lastFcU.get.feeRecipient
(await forkchoice_updated(
proposalState.bellatrixData.data, latestHead, latestFinalized,
feeRecipient, node.consensusManager.eth1Monitor))
payload = try: payload = try:
awaitWithTimeout( awaitWithTimeout(
get_execution_payload(payload_id, node.consensusManager.eth1Monitor), get_execution_payload(payload_id, node.consensusManager.eth1Monitor),

View File

@ -21,6 +21,10 @@ import
../beacon_chain/eth1/eth1_monitor, ../beacon_chain/eth1/eth1_monitor,
./testutil, ./testdbutil, ./testblockutil ./testutil, ./testdbutil, ./testblockutil
from ../beacon_chain/spec/eth2_apis/dynamic_fee_recipients import
DynamicFeeRecipientsStore, init
from ../beacon_chain/validators/keystore_management import KeymanagerHost
proc pruneAtFinalization(dag: ChainDAGRef) = proc pruneAtFinalization(dag: ChainDAGRef) =
if dag.needStateCachesAndForkChoicePruning(): if dag.needStateCachesAndForkChoicePruning():
dag.pruneStateCachesDAG() dag.pruneStateCachesDAG()
@ -36,8 +40,11 @@ suite "Block processor" & preset():
quarantine = newClone(Quarantine.init()) quarantine = newClone(Quarantine.init())
attestationPool = newClone(AttestationPool.init(dag, quarantine)) attestationPool = newClone(AttestationPool.init(dag, quarantine))
eth1Monitor = new Eth1Monitor eth1Monitor = new Eth1Monitor
keymanagerHost: ref KeymanagerHost
consensusManager = ConsensusManager.new( consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, eth1Monitor) dag, attestationPool, quarantine, eth1Monitor,
newClone(DynamicFeeRecipientsStore.init()), keymanagerHost,
default(Eth1Address))
state = newClone(dag.headState) state = newClone(dag.headState)
cache = StateCache() cache = StateCache()
b1 = addTestBlock(state[], cache).phase0Data b1 = addTestBlock(state[], cache).phase0Data