From 1d55743ebb49279c3864abe6d2a8f2243c38e841 Mon Sep 17 00:00:00 2001 From: tersec Date: Tue, 23 Aug 2022 16:19:52 +0000 Subject: [PATCH] allow execution clients several seconds to construct blocks (#4012) --- beacon_chain/beacon_node.nim | 2 +- .../consensus_manager.nim | 103 +++++++++++++++++- .../gossip_processing/block_processor.nim | 6 +- beacon_chain/nimbus_beacon_node.nim | 8 +- beacon_chain/rpc/rest_validator_api.nim | 2 +- beacon_chain/validators/validator_duties.nim | 32 +++++- tests/test_block_processor.nim | 9 +- 7 files changed, 148 insertions(+), 14 deletions(-) diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 37743ff4e..33be59063 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -89,7 +89,7 @@ type stateTtlCache*: StateTtlCache nextExchangeTransitionConfTime*: Moment router*: ref MessageRouter - dynamicFeeRecipientsStore*: DynamicFeeRecipientsStore + dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore const MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT diff --git a/beacon_chain/consensus_object_pools/consensus_manager.nim b/beacon_chain/consensus_object_pools/consensus_manager.nim index b9b421003..d20f68064 100644 --- a/beacon_chain/consensus_object_pools/consensus_manager.nim +++ b/beacon_chain/consensus_object_pools/consensus_manager.nim @@ -16,7 +16,20 @@ import ../consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool], ../eth1/eth1_monitor +from ../spec/eth2_apis/dynamic_fee_recipients import + DynamicFeeRecipientsStore, getDynamicFeeRecipient +from ../validators/keystore_management import + KeymanagerHost, getSuggestedFeeRecipient + type + ForkChoiceUpdatedInformation* = object + payloadId*: PayloadID + headBlockRoot*: Eth2Digest + safeBlockRoot*: Eth2Digest + finalizedBlockRoot*: Eth2Digest + timestamp*: uint64 + feeRecipient*: Eth1Address + ConsensusManager* = object expectedSlot: Slot expectedBlockReceived: Future[bool] @@ -34,6 +47,16 @@ type # ---------------------------------------------------------------- eth1Monitor*: Eth1Monitor + # Allow determination of preferred fee recipient during proposals + # ---------------------------------------------------------------- + dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore + keymanagerHost: ref KeymanagerHost + defaultFeeRecipient: Eth1Address + + # Tracking last proposal forkchoiceUpdated payload information + # ---------------------------------------------------------------- + forkchoiceUpdatedInfo*: Opt[ForkchoiceUpdatedInformation] + # Initialization # ------------------------------------------------------------------------------ @@ -41,13 +64,20 @@ func new*(T: type ConsensusManager, dag: ChainDAGRef, attestationPool: ref AttestationPool, quarantine: ref Quarantine, - eth1Monitor: Eth1Monitor + eth1Monitor: Eth1Monitor, + dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore, + keymanagerHost: ref KeymanagerHost, + defaultFeeRecipient: Eth1Address ): ref ConsensusManager = (ref ConsensusManager)( dag: dag, attestationPool: attestationPool, quarantine: quarantine, - eth1Monitor: eth1Monitor + eth1Monitor: eth1Monitor, + dynamicFeeRecipientsStore: dynamicFeeRecipientsStore, + keymanagerHost: keymanagerHost, + forkchoiceUpdatedInfo: Opt.none ForkchoiceUpdatedInformation, + defaultFeeRecipient: defaultFeeRecipient ) # Consensus Management @@ -182,6 +212,71 @@ proc updateHead*(self: var ConsensusManager, wallSlot: Slot) = self.updateHead(newHead) +proc checkNextProposer(dag: ChainDAGRef, slot: Slot): + Opt[(ValidatorIndex, ValidatorPubKey)] = + let proposer = dag.getProposer(dag.head, slot + 1) + if proposer.isNone(): + return Opt.none((ValidatorIndex, ValidatorPubKey)) + Opt.some((proposer.get, dag.validatorKey(proposer.get).get().toPubKey)) + +proc getFeeRecipient*( + self: ref ConsensusManager, pubkey: ValidatorPubKey, validatorIdx: ValidatorIndex, + epoch: Epoch): Eth1Address = + self.dynamicFeeRecipientsStore[].getDynamicFeeRecipient(validatorIdx, epoch).valueOr: + if self.keymanagerHost != nil: + self.keymanagerHost[].getSuggestedFeeRecipient(pubkey).valueOr: + self.defaultFeeRecipient + else: + self.defaultFeeRecipient + +from ../spec/datatypes/bellatrix import PayloadID + +proc runProposalForkchoiceUpdated*(self: ref ConsensusManager) {.async.} = + withState(self.dag.headState): + let + nextSlot = state.data.slot + 1 + (validatorIndex, nextProposer) = + self.dag.checkNextProposer(nextSlot).valueOr: + return + + # Approximately lines up with validator_duties version. Used optimistcally/ + # opportunistically, so mismatches are fine if not too frequent. + let + timestamp = compute_timestamp_at_slot(state.data, nextSlot) + randomData = + get_randao_mix(state.data, get_current_epoch(state.data)).data + feeRecipient = self.getFeeRecipient( + nextProposer, validatorIndex, nextSlot.epoch) + headBlockRoot = self.dag.loadExecutionBlockRoot(self.dag.head) + finalizedBlockRoot = + self.dag.loadExecutionBlockRoot(self.dag.finalizedHead.blck) + + if headBlockRoot.isZero: + return + + try: + let fcResult = awaitWithTimeout( + forkchoiceUpdated( + self.eth1Monitor, headBlockRoot, finalizedBlockRoot, timestamp, + randomData, feeRecipient), + FORKCHOICEUPDATED_TIMEOUT): + debug "runProposalForkchoiceUpdated: forkchoiceUpdated timed out" + ForkchoiceUpdatedResponse( + payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing)) + + if fcResult.payloadStatus.status != PayloadExecutionStatus.valid or + fcResult.payloadId.isNone: + return + + self.forkchoiceUpdatedInfo = Opt.some ForkchoiceUpdatedInformation( + payloadId: bellatrix.PayloadID(fcResult.payloadId.get), + headBlockRoot: headBlockRoot, + finalizedBlockRoot: finalizedBlockRoot, + timestamp: timestamp, + feeRecipient: feeRecipient) + except CatchableError as err: + error "Engine API fork-choice update failed", err = err.msg + proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BlockRef) {.async.} = ## Trigger fork choice and update the DAG with the new head block @@ -197,6 +292,10 @@ proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BlockRef) # justified and finalized self.dag.updateHead(newHead, self.quarantine[]) + # TODO after things stabilize with this, check for upcoming proposal and + # don't bother sending first fcU, but initially, keep both in place + asyncSpawn self.runProposalForkchoiceUpdated() + self[].checkExpectedBlock() except CatchableError as exc: debug "updateHeadWithExecution error", diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 2f891d195..925c7f2fa 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -17,7 +17,8 @@ import ../sszdump from ../consensus_object_pools/consensus_manager import - ConsensusManager, runForkchoiceUpdated, updateHead, updateHeadWithExecution + ConsensusManager, runForkchoiceUpdated, runProposalForkchoiceUpdated, + updateHead, updateHeadWithExecution from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds from ../consensus_object_pools/block_dag import BlockRef, root, slot from ../consensus_object_pools/block_pools_types import BlockError, EpochRef @@ -307,6 +308,9 @@ proc storeBlock*( executionHeadRoot, self.consensusManager.dag.loadExecutionBlockRoot( self.consensusManager.dag.finalizedHead.blck)) + + # TODO remove redundant fcU in case of proposal + asyncSpawn self.consensusManager.runProposalForkchoiceUpdated() else: asyncSpawn self.consensusManager.updateHeadWithExecution(newHead.get) else: diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 6f17d9f8b..534bdfc36 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -290,7 +290,9 @@ proc initFullNode( exitPool = newClone( ExitPool.init(dag, attestationPool, onVoluntaryExitAdded)) consensusManager = ConsensusManager.new( - dag, attestationPool, quarantine, node.eth1Monitor) + dag, attestationPool, quarantine, node.eth1Monitor, + node.dynamicFeeRecipientsStore, node.keymanagerHost, + config.defaultFeeRecipient) blockProcessor = BlockProcessor.new( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime, @@ -753,7 +755,7 @@ proc init*(T: type BeaconNode, validatorMonitor: validatorMonitor, stateTtlCache: stateTtlCache, nextExchangeTransitionConfTime: nextExchangeTransitionConfTime, - dynamicFeeRecipientsStore: DynamicFeeRecipientsStore.init()) + dynamicFeeRecipientsStore: newClone(DynamicFeeRecipientsStore.init())) node.initLightClient( rng, cfg, dag.forkDigests, getBeaconTime, dag.genesis_validators_root) @@ -1203,7 +1205,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = node.syncCommitteeMsgPool[].pruneData(slot) if slot.is_epoch: node.trackNextSyncCommitteeTopics(slot) - node.dynamicFeeRecipientsStore.pruneOldMappings(slot.epoch) + node.dynamicFeeRecipientsStore[].pruneOldMappings(slot.epoch) # Update upcoming actions - we do this every slot in case a reorg happens let head = node.dag.head diff --git a/beacon_chain/rpc/rest_validator_api.nim b/beacon_chain/rpc/rest_validator_api.nim index adb5a8d9d..e76bf3335 100644 --- a/beacon_chain/rpc/rest_validator_api.nim +++ b/beacon_chain/rpc/rest_validator_api.nim @@ -779,7 +779,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = currentEpoch = node.beaconClock.now.slotOrZero.epoch for proposerData in body: - node.dynamicFeeRecipientsStore.addMapping( + node.dynamicFeeRecipientsStore[].addMapping( proposerData.validator_index, proposerData.fee_recipient, currentEpoch) diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index ce08e167a..e8ca7ab96 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -354,11 +354,12 @@ proc get_execution_payload( asConsensusExecutionPayload( await execution_engine.getPayload(payload_id.get)) +# TODO remove in favor of consensusManager copy proc getFeeRecipient(node: BeaconNode, pubkey: ValidatorPubKey, validatorIdx: ValidatorIndex, epoch: Epoch): Eth1Address = - node.dynamicFeeRecipientsStore.getDynamicFeeRecipient(validatorIdx, epoch).valueOr: + node.dynamicFeeRecipientsStore[].getDynamicFeeRecipient(validatorIdx, epoch).valueOr: if node.keymanagerHost != nil: node.keymanagerHost[].getSuggestedFeeRecipient(pubkey).valueOr: node.config.defaultFeeRecipient @@ -406,10 +407,31 @@ proc getExecutionPayload( latestFinalized = node.dag.loadExecutionBlockRoot(node.dag.finalizedHead.blck) feeRecipient = node.getFeeRecipient(pubkey, validator_index, epoch) - payload_id = (await forkchoice_updated( - proposalState.bellatrixData.data, latestHead, latestFinalized, - feeRecipient, - node.consensusManager.eth1Monitor)) + lastFcU = node.consensusManager.forkchoiceUpdatedInfo + timestamp = compute_timestamp_at_slot( + proposalState.bellatrixData.data, + proposalState.bellatrixData.data.slot) + payload_id = + if lastFcU.isSome and + lastFcU.get.headBlockRoot == latestHead and + lastFcU.get.finalizedBlockRoot == latestFinalized and + lastFcU.get.timestamp == timestamp and + lastFcU.get.feeRecipient == feeRecipient: + some bellatrix.PayloadID(lastFcU.get.payloadId) + else: + debug "getExecutionPayload: didn't find payloadId, re-querying", + latestHead, + latestFinalized, + timestamp, + feeRecipient, + cachedHeadBlockRoot = lastFcU.get.headBlockRoot, + cachedFinalizedBlockRoot = lastFcU.get.finalizedBlockRoot, + cachedTimestamp = lastFcU.get.timestamp, + cachedFeeRecipient = lastFcU.get.feeRecipient + + (await forkchoice_updated( + proposalState.bellatrixData.data, latestHead, latestFinalized, + feeRecipient, node.consensusManager.eth1Monitor)) payload = try: awaitWithTimeout( get_execution_payload(payload_id, node.consensusManager.eth1Monitor), diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index 84e16fe11..1a397a25f 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -21,6 +21,10 @@ import ../beacon_chain/eth1/eth1_monitor, ./testutil, ./testdbutil, ./testblockutil +from ../beacon_chain/spec/eth2_apis/dynamic_fee_recipients import + DynamicFeeRecipientsStore, init +from ../beacon_chain/validators/keystore_management import KeymanagerHost + proc pruneAtFinalization(dag: ChainDAGRef) = if dag.needStateCachesAndForkChoicePruning(): dag.pruneStateCachesDAG() @@ -36,8 +40,11 @@ suite "Block processor" & preset(): quarantine = newClone(Quarantine.init()) attestationPool = newClone(AttestationPool.init(dag, quarantine)) eth1Monitor = new Eth1Monitor + keymanagerHost: ref KeymanagerHost consensusManager = ConsensusManager.new( - dag, attestationPool, quarantine, eth1Monitor) + dag, attestationPool, quarantine, eth1Monitor, + newClone(DynamicFeeRecipientsStore.init()), keymanagerHost, + default(Eth1Address)) state = newClone(dag.headState) cache = StateCache() b1 = addTestBlock(state[], cache).phase0Data