more efficient forkchoiceUpdated usage (#4055)

* more efficient forkchoiceUpdated usage

* await rather than asyncSpawn; ensure head update before dag.updateHead

* use action tracker rather than attached validators to check for next-slot proposal; use wall slot + 1 rather than state slot + 1 so the check is correct when blocks are missing

* re-add two-fcU case for when newPayload not VALID

* check dynamicFeeRecipientsStore for potential proposal

* remove duplicate checks for whether next proposer
Author: tersec, 2022-09-07 18:34:52 +00:00 (committed by GitHub)
Parent: 324e021966
Commit: bf3a014287
7 changed files with 135 additions and 87 deletions
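
To make the gating described in the bullets above concrete, here is a minimal, self-contained Nim sketch of the decision. ActionTrackerSketch, FeeRecipientStoreSketch and their helpers are simplified stand-ins, not the nimbus-eth2 API: a proposal-style forkchoiceUpdated is prepared only when wall slot + 1 is a known proposal slot for an attached validator, or a dynamic fee recipient has been registered for that slot's proposer.

type
  Slot = uint64
  Epoch = uint64
  ValidatorIndex = uint32

  # Stand-in for the real ActionTracker: just the proposal slots known for
  # attached validators.
  ActionTrackerSketch = object
    proposalSlots: seq[Slot]

  # Stand-in for DynamicFeeRecipientsStore: (validator, epoch) registrations.
  FeeRecipientStoreSketch = object
    registered: seq[(ValidatorIndex, Epoch)]

const SLOTS_PER_EPOCH = 32'u64  # mainnet preset assumed

func epoch(s: Slot): Epoch = s div SLOTS_PER_EPOCH

func getNextProposalSlot(t: ActionTrackerSketch, slot: Slot): Slot =
  # Earliest known proposal slot strictly after `slot`; high(Slot) when none,
  # so the equality test below fails for "no upcoming proposal".
  result = high(Slot)
  for s in t.proposalSlots:
    if s > slot and s < result:
      result = s

func hasDynamicFeeRecipient(
    store: FeeRecipientStoreSketch, idx: ValidatorIndex, ep: Epoch): bool =
  (idx, ep) in store.registered

func shouldPrepareProposalFcu(
    tracker: ActionTrackerSketch, store: FeeRecipientStoreSketch,
    nextProposer: ValidatorIndex, wallSlot: Slot): bool =
  # Prepare payload attributes only if the wall-clock next slot is a known
  # local proposal, or a fee recipient is registered for its proposer.
  let nextSlot = wallSlot + 1
  tracker.getNextProposalSlot(wallSlot) == nextSlot or
    store.hasDynamicFeeRecipient(nextProposer, nextSlot.epoch)

when isMainModule:
  let
    tracker = ActionTrackerSketch(proposalSlots: @[Slot(101)])
    store = FeeRecipientStoreSketch()
  doAssert shouldPrepareProposalFcu(tracker, store, ValidatorIndex(7), Slot(100))
  doAssert not shouldPrepareProposalFcu(tracker, store, ValidatorIndex(7), Slot(200))

The real checkNextProposer in the diff below applies the same either/or test via getNextProposalSlot and getDynamicFeeRecipient, and additionally resolves the proposer's public key from the DAG.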

@@ -77,7 +77,6 @@ type
syncManager*: SyncManager[Peer, PeerId]
backfiller*: SyncManager[Peer, PeerId]
genesisSnapshotContent*: string
actionTracker*: ActionTracker
processor*: ref Eth2Processor
blockProcessor*: ref BlockProcessor
consensusManager*: ref ConsensusManager

@@ -20,6 +20,7 @@ from ../spec/eth2_apis/dynamic_fee_recipients import
DynamicFeeRecipientsStore, getDynamicFeeRecipient
from ../validators/keystore_management import
KeymanagerHost, getSuggestedFeeRecipient
from ../validators/action_tracker import ActionTracker, getNextProposalSlot
type
ForkChoiceUpdatedInformation* = object
@@ -47,6 +48,10 @@ type
# ----------------------------------------------------------------
eth1Monitor*: Eth1Monitor
# Allow determination of whether there's an upcoming proposal
# ----------------------------------------------------------------
actionTracker*: ActionTracker
# Allow determination of preferred fee recipient during proposals
# ----------------------------------------------------------------
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore
@@ -66,6 +71,7 @@ func new*(T: type ConsensusManager,
attestationPool: ref AttestationPool,
quarantine: ref Quarantine,
eth1Monitor: Eth1Monitor,
actionTracker: ActionTracker,
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore,
keymanagerHost: ref KeymanagerHost,
defaultFeeRecipient: Eth1Address
@@ -75,6 +81,7 @@ func new*(T: type ConsensusManager,
attestationPool: attestationPool,
quarantine: quarantine,
eth1Monitor: eth1Monitor,
actionTracker: actionTracker,
dynamicFeeRecipientsStore: dynamicFeeRecipientsStore,
keymanagerHost: keymanagerHost,
forkchoiceUpdatedInfo: Opt.none ForkchoiceUpdatedInformation,
@@ -260,12 +267,26 @@ proc updateHead*(self: var ConsensusManager, wallSlot: Slot) =
self.updateHead(newHead.blck)
proc checkNextProposer(dag: ChainDAGRef, slot: Slot):
proc checkNextProposer(
dag: ChainDAGRef, actionTracker: ActionTracker,
dynamicFeeRecipientsStore: ref DynamicFeeRecipientsStore,
slot: Slot):
Opt[(ValidatorIndex, ValidatorPubKey)] =
let proposer = dag.getProposer(dag.head, slot + 1)
let nextSlot = slot + 1
let proposer = dag.getProposer(dag.head, nextSlot)
if proposer.isNone():
return Opt.none((ValidatorIndex, ValidatorPubKey))
Opt.some((proposer.get, dag.validatorKey(proposer.get).get().toPubKey))
if actionTracker.getNextProposalSlot(slot) != nextSlot and
dynamicFeeRecipientsStore[].getDynamicFeeRecipient(
proposer.get, nextSlot.epoch).isNone:
return Opt.none((ValidatorIndex, ValidatorPubKey))
let proposerKey = dag.validatorKey(proposer.get).get().toPubKey
Opt.some((proposer.get, proposerKey))
proc checkNextProposer*(self: ref ConsensusManager, wallSlot: Slot):
Opt[(ValidatorIndex, ValidatorPubKey)] =
self.dag.checkNextProposer(
self.actionTracker, self.dynamicFeeRecipientsStore, wallSlot)
proc getFeeRecipient*(
self: ref ConsensusManager, pubkey: ValidatorPubKey, validatorIdx: ValidatorIndex,
@@ -279,56 +300,59 @@ proc getFeeRecipient*(
from ../spec/datatypes/bellatrix import PayloadID
proc runProposalForkchoiceUpdated*(self: ref ConsensusManager) {.async.} =
withState(self.dag.headState):
let
nextSlot = state.data.slot + 1
(validatorIndex, nextProposer) =
self.dag.checkNextProposer(nextSlot).valueOr:
return
proc runProposalForkchoiceUpdated*(
self: ref ConsensusManager, wallSlot: Slot) {.async.} =
let
nextWallSlot = wallSlot + 1
(validatorIndex, nextProposer) = self.checkNextProposer(wallSlot).valueOr:
return
debug "runProposalForkchoiceUpdated: expected to be proposing next slot",
nextWallSlot, validatorIndex, nextProposer
# Approximately lines up with validator_duties version. Used optimistically/
# opportunistically, so mismatches are fine if not too frequent.
let
timestamp = compute_timestamp_at_slot(state.data, nextSlot)
randomData =
get_randao_mix(state.data, get_current_epoch(state.data)).data
feeRecipient = self.getFeeRecipient(
nextProposer, validatorIndex, nextSlot.epoch)
beaconHead = self.attestationPool[].getBeaconHead(self.dag.head)
headBlockRoot = self.dag.loadExecutionBlockRoot(beaconHead.blck)
# Approximately lines up with validator_duties version. Used optimistically/
# opportunistically, so mismatches are fine if not too frequent.
let
timestamp = withState(self.dag.headState):
compute_timestamp_at_slot(state.data, nextWallSlot)
randomData = withState(self.dag.headState):
get_randao_mix(state.data, get_current_epoch(state.data)).data
feeRecipient = self.getFeeRecipient(
nextProposer, validatorIndex, nextWallSlot.epoch)
beaconHead = self.attestationPool[].getBeaconHead(self.dag.head)
headBlockRoot = self.dag.loadExecutionBlockRoot(beaconHead.blck)
if headBlockRoot.isZero:
if headBlockRoot.isZero:
return
try:
let fcResult = awaitWithTimeout(
forkchoiceUpdated(
self.eth1Monitor,
headBlockRoot,
beaconHead.safeExecutionPayloadHash,
beaconHead.finalizedExecutionPayloadHash,
timestamp, randomData, feeRecipient),
FORKCHOICEUPDATED_TIMEOUT):
debug "runProposalForkchoiceUpdated: forkchoiceUpdated timed out"
ForkchoiceUpdatedResponse(
payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing))
if fcResult.payloadStatus.status != PayloadExecutionStatus.valid or
fcResult.payloadId.isNone:
return
try:
let fcResult = awaitWithTimeout(
forkchoiceUpdated(
self.eth1Monitor,
headBlockRoot,
beaconHead.safeExecutionPayloadHash,
beaconHead.finalizedExecutionPayloadHash,
timestamp, randomData, feeRecipient),
FORKCHOICEUPDATED_TIMEOUT):
debug "runProposalForkchoiceUpdated: forkchoiceUpdated timed out"
ForkchoiceUpdatedResponse(
payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing))
self.forkchoiceUpdatedInfo = Opt.some ForkchoiceUpdatedInformation(
payloadId: bellatrix.PayloadID(fcResult.payloadId.get),
headBlockRoot: headBlockRoot,
safeBlockRoot: beaconHead.safeExecutionPayloadHash,
finalizedBlockRoot: beaconHead.finalizedExecutionPayloadHash,
timestamp: timestamp,
feeRecipient: feeRecipient)
except CatchableError as err:
error "Engine API fork-choice update failed", err = err.msg
if fcResult.payloadStatus.status != PayloadExecutionStatus.valid or
fcResult.payloadId.isNone:
return
self.forkchoiceUpdatedInfo = Opt.some ForkchoiceUpdatedInformation(
payloadId: bellatrix.PayloadID(fcResult.payloadId.get),
headBlockRoot: headBlockRoot,
safeBlockRoot: beaconHead.safeExecutionPayloadHash,
finalizedBlockRoot: beaconHead.finalizedExecutionPayloadHash,
timestamp: timestamp,
feeRecipient: feeRecipient)
except CatchableError as err:
error "Engine API fork-choice update failed", err = err.msg
proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BeaconHead)
proc updateHeadWithExecution*(
self: ref ConsensusManager, newHead: BeaconHead, wallSlot: Slot)
{.async.} =
## Trigger fork choice and update the DAG with the new head block
## This does not automatically prune the DAG after finalization
@@ -343,9 +367,13 @@ proc updateHeadWithExecution*(self: ref ConsensusManager, newHead: BeaconHead)
# justified and finalized
self.dag.updateHead(newHead.blck, self.quarantine[])
# TODO after things stabilize with this, check for upcoming proposal and
# don't bother sending first fcU, but initially, keep both in place
asyncSpawn self.runProposalForkchoiceUpdated()
# If this node should propose next slot, start preparing payload. Both
# fcUs are useful: the updateExecutionClientHead(newHead) call updates
# the head state (including optimistic status) that self.dag.updateHead
# needs while runProposalForkchoiceUpdated requires RANDAO information
# from the head state corresponding to the `newHead` block, which only
# self.dag.updateHead(...) sets up.
await self.runProposalForkchoiceUpdated(wallSlot)
self[].checkExpectedBlock()
except CatchableError as exc:
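
The comment added in the hunk above explains why both fcUs stay in place; the toy sketch below is only meant to show the ordering this commit enforces (it uses std/asyncdispatch with placeholder proc bodies, whereas nimbus-eth2 uses chronos and the real ConsensusManager procs): the execution-client head update completes first, the DAG head is updated next, and the proposal fcU is then awaited rather than fire-and-forgotten.

import std/asyncdispatch

proc updateExecutionClientHead(newHead: string) {.async.} =
  # First fcU: tells the EL about the new head and yields its optimistic status.
  echo "EL head updated to ", newHead

proc dagUpdateHead(newHead: string) =
  # Loads the beacon state for `newHead` (the state the proposal fcU needs).
  echo "DAG head updated to ", newHead

proc runProposalForkchoiceUpdated(wallSlot: uint64) {.async.} =
  # Second fcU, with payload attributes for the next wall-clock slot.
  echo "preparing payload for slot ", wallSlot + 1

proc updateHeadWithExecution(newHead: string, wallSlot: uint64) {.async.} =
  await updateExecutionClientHead(newHead)  # must finish before the DAG update
  dagUpdateHead(newHead)
  # Awaited here (the old code used chronos asyncSpawn, i.e. fire-and-forget),
  # so payload preparation starts only after the head state for `newHead` exists.
  await runProposalForkchoiceUpdated(wallSlot)

waitFor updateHeadWithExecution("example-head-root", 100'u64)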

@@ -17,9 +17,10 @@ import
../sszdump
from ../consensus_object_pools/consensus_manager import
ConsensusManager, optimisticExecutionPayloadHash, runForkchoiceUpdated,
runForkchoiceUpdatedDiscardResult, runProposalForkchoiceUpdated,
shouldSyncOptimistically, updateHead, updateHeadWithExecution
ConsensusManager, checkNextProposer, optimisticExecutionPayloadHash,
runForkchoiceUpdated, runForkchoiceUpdatedDiscardResult,
runProposalForkchoiceUpdated, shouldSyncOptimistically, updateHead,
updateHeadWithExecution
from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
from ../consensus_object_pools/block_dag import BlockRef, root, slot
from ../consensus_object_pools/block_pools_types import BlockError, EpochRef
@@ -318,8 +319,10 @@ proc storeBlock*(
safeBlockRoot = newHead.get.safeExecutionPayloadHash,
finalizedBlockRoot = newHead.get.finalizedExecutionPayloadHash)
else:
let headExecutionPayloadHash =
self.consensusManager.dag.loadExecutionBlockRoot(newHead.get.blck)
let
headExecutionPayloadHash =
self.consensusManager.dag.loadExecutionBlockRoot(newHead.get.blck)
wallSlot = self.getBeaconTime().slotOrZero
if headExecutionPayloadHash.isZero:
# Blocks without execution payloads can't be optimistic.
self.consensusManager[].updateHead(newHead.get.blck)
@@ -328,15 +331,22 @@
# be selected as head, so `VALID`. `forkchoiceUpdated` necessary for EL
# client only.
self.consensusManager[].updateHead(newHead.get.blck)
asyncSpawn eth1Monitor.expectValidForkchoiceUpdated(
headBlockRoot = headExecutionPayloadHash,
safeBlockRoot = newHead.get.safeExecutionPayloadHash,
finalizedBlockRoot = newHead.get.finalizedExecutionPayloadHash)
# TODO remove redundant fcU in case of proposal
asyncSpawn self.consensusManager.runProposalForkchoiceUpdated()
if self.consensusManager.checkNextProposer(wallSlot).isNone:
# No attached validator is next proposer, so use non-proposal fcU
asyncSpawn eth1Monitor.expectValidForkchoiceUpdated(
headBlockRoot = headExecutionPayloadHash,
safeBlockRoot = newHead.get.safeExecutionPayloadHash,
finalizedBlockRoot = newHead.get.finalizedExecutionPayloadHash)
else:
# Some attached validator is next proposer, so prepare payload. As
# updateHead() updated the DAG head, runProposalForkchoiceUpdated,
# which needs the state corresponding to that head block, can run.
asyncSpawn self.consensusManager.runProposalForkchoiceUpdated(
wallSlot)
else:
asyncSpawn self.consensusManager.updateHeadWithExecution(newHead.get)
asyncSpawn self.consensusManager.updateHeadWithExecution(
newHead.get, wallSlot)
else:
warn "Head selection failed, using previous head",
head = shortLog(self.consensusManager.dag.head), wallSlot
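
A compact way to read the storeBlock branching above, written as a hypothetical helper (classifyNewHead and the HeadAction names are illustrative, not code from the repository): a block without an execution payload needs only the DAG head update; a head whose newPayload was already VALID needs a single fcU, which carries payload attributes only when the next proposer is locally tracked; anything else goes through the two-fcU updateHeadWithExecution path.

type HeadAction = enum
  haUpdateHeadOnly    # no execution payload: nothing to tell the EL
  haPlainFcu          # newPayload VALID, nobody local proposes next slot
  haProposalFcu       # newPayload VALID, attached validator proposes next slot
  haTwoFcuOptimistic  # newPayload not (yet) VALID: updateHeadWithExecution

func classifyNewHead(
    hasExecutionPayload, newPayloadValid, nextProposerLocal: bool): HeadAction =
  if not hasExecutionPayload:
    haUpdateHeadOnly
  elif newPayloadValid and nextProposerLocal:
    haProposalFcu
  elif newPayloadValid:
    haPlainFcu
  else:
    haTwoFcuOptimistic

doAssert classifyNewHead(true, true, false) == haPlainFcu
doAssert classifyNewHead(true, true, true) == haProposalFcu
doAssert classifyNewHead(true, false, false) == haTwoFcuOptimistic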

@@ -292,6 +292,7 @@ proc initFullNode(
ExitPool.init(dag, attestationPool, onVoluntaryExitAdded))
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, node.eth1Monitor,
ActionTracker.init(rng, config.subscribeAllSubnets),
node.dynamicFeeRecipientsStore, node.keymanagerHost,
config.defaultFeeRecipient)
blockProcessor = BlockProcessor.new(
@@ -371,9 +372,10 @@ proc initFullNode(
node.validatorMonitor[].addMonitor(validator.pubkey, validator.index)
if validator.index.isSome():
node.actionTracker.knownValidators[validator.index.get()] = wallSlot
let
stabilitySubnets = node.actionTracker.stabilitySubnets(wallSlot)
node.consensusManager[].actionTracker.knownValidators[
validator.index.get()] = wallSlot
let stabilitySubnets =
node.consensusManager[].actionTracker.stabilitySubnets(wallSlot)
# Here, we also set the correct ENR should we be in all subnets mode!
node.network.updateStabilitySubnetMetadata(stabilitySubnets)
@@ -735,7 +737,6 @@ proc init*(T: type BeaconNode,
keymanagerHost: keymanagerHost,
keymanagerServer: keymanagerInitResult.server,
eventBus: eventBus,
actionTracker: ActionTracker.init(rng, config.subscribeAllSubnets),
gossipState: {},
blocksGossipState: {},
beaconClock: beaconClock,
@@ -791,20 +792,22 @@ proc updateAttestationSubnetHandlers(node: BeaconNode, slot: Slot) =
return
let
aggregateSubnets = node.actionTracker.aggregateSubnets(slot)
stabilitySubnets = node.actionTracker.stabilitySubnets(slot)
aggregateSubnets =
node.consensusManager[].actionTracker.aggregateSubnets(slot)
stabilitySubnets =
node.consensusManager[].actionTracker.stabilitySubnets(slot)
subnets = aggregateSubnets + stabilitySubnets
node.network.updateStabilitySubnetMetadata(stabilitySubnets)
# Now we know what we should be subscribed to - make it so
let
prevSubnets = node.actionTracker.subscribedSubnets
prevSubnets = node.consensusManager[].actionTracker.subscribedSubnets
unsubscribeSubnets = prevSubnets - subnets
subscribeSubnets = subnets - prevSubnets
# Remember what we subscribed to, so we can unsubscribe later
node.actionTracker.subscribedSubnets = subnets
node.consensusManager[].actionTracker.subscribedSubnets = subnets
let forkDigests = node.forkDigests()
@@ -888,7 +891,7 @@ proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
for subnet_id in SubnetId:
node.network.unsubscribe(getAttestationTopic(forkDigest, subnet_id))
node.actionTracker.subscribedSubnets = default(AttnetBits)
node.consensusManager[].actionTracker.subscribedSubnets = default(AttnetBits)
func hasSyncPubKey(node: BeaconNode, epoch: Epoch): auto =
# Only used to determine which gossip topics to subscribe to
@@ -1103,15 +1106,16 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
# We "know" the actions for the current and the next epoch
withState(node.dag.headState):
if node.actionTracker.needsUpdate(state, slot.epoch):
if node.consensusManager[].actionTracker.needsUpdate(state, slot.epoch):
let epochRef = node.dag.getEpochRef(head, slot.epoch, false).expect(
"Getting head EpochRef should never fail")
node.actionTracker.updateActions(epochRef)
node.consensusManager[].actionTracker.updateActions(epochRef)
if node.actionTracker.needsUpdate(state, slot.epoch + 1):
if node.consensusManager[].actionTracker.needsUpdate(
state, slot.epoch + 1):
let epochRef = node.dag.getEpochRef(head, slot.epoch + 1, false).expect(
"Getting head EpochRef should never fail")
node.actionTracker.updateActions(epochRef)
node.consensusManager[].actionTracker.updateActions(epochRef)
if node.gossipState.card > 0 and targetGossipState.card == 0:
debug "Disabling topic subscriptions",
@@ -1192,14 +1196,17 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
let head = node.dag.head
if node.isSynced(head):
withState(node.dag.headState):
if node.actionTracker.needsUpdate(state, slot.epoch + 1):
if node.consensusManager[].actionTracker.needsUpdate(
state, slot.epoch + 1):
let epochRef = node.dag.getEpochRef(head, slot.epoch + 1, false).expect(
"Getting head EpochRef should never fail")
node.actionTracker.updateActions(epochRef)
node.consensusManager[].actionTracker.updateActions(epochRef)
let
nextAttestationSlot = node.actionTracker.getNextAttestationSlot(slot)
nextProposalSlot = node.actionTracker.getNextProposalSlot(slot)
nextAttestationSlot =
node.consensusManager[].actionTracker.getNextAttestationSlot(slot)
nextProposalSlot =
node.consensusManager[].actionTracker.getNextProposalSlot(slot)
nextActionWaitTime = saturate(fromNow(
node.beaconClock, min(nextAttestationSlot, nextProposalSlot)))
@@ -1263,7 +1270,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
node.dag.advanceClearanceState()
# Prepare action tracker for the next slot
node.actionTracker.updateSlot(slot + 1)
node.consensusManager[].actionTracker.updateSlot(slot + 1)
# The last thing we do is to perform the subscriptions and unsubscriptions for
# the next slot, just before that slot starts - because of the advance cutoff

@@ -605,7 +605,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
get_committee_count_per_slot(shufflingRef), request.slot,
request.committee_index)
node.actionTracker.registerDuty(
node.consensusManager[].actionTracker.registerDuty(
request.slot, subnet_id, request.validator_index,
request.is_aggregator)

@@ -1384,8 +1384,10 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
doppelgangerDetection.nodeLaunchSlot > GENESIS_SLOT and
node.config.doppelgangerDetection:
let
nextAttestationSlot = node.actionTracker.getNextAttestationSlot(slot - 1)
nextProposalSlot = node.actionTracker.getNextProposalSlot(slot - 1)
nextAttestationSlot =
node.consensusManager[].actionTracker.getNextAttestationSlot(slot - 1)
nextProposalSlot =
node.consensusManager[].actionTracker.getNextProposalSlot(slot - 1)
if slot in [nextAttestationSlot, nextProposalSlot]:
notice "Doppelganger detection active - skipping validator duties while observing activity on the network",
@@ -1544,5 +1546,5 @@ proc registerDuties*(node: BeaconNode, wallSlot: Slot) {.async.} =
continue
let isAggregator = is_aggregator(committee.lenu64, slotSigRes.get())
node.actionTracker.registerDuty(
node.consensusManager[].actionTracker.registerDuty(
slot, subnet_id, validator_index, isAggregator)

@@ -23,6 +23,7 @@ import
from ../beacon_chain/spec/eth2_apis/dynamic_fee_recipients import
DynamicFeeRecipientsStore, init
from ../beacon_chain/validators/action_tracker import ActionTracker
from ../beacon_chain/validators/keystore_management import KeymanagerHost
proc pruneAtFinalization(dag: ChainDAGRef) =
@@ -40,9 +41,10 @@ suite "Block processor" & preset():
quarantine = newClone(Quarantine.init())
attestationPool = newClone(AttestationPool.init(dag, quarantine))
eth1Monitor = new Eth1Monitor
actionTracker: ActionTracker
keymanagerHost: ref KeymanagerHost
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, eth1Monitor,
dag, attestationPool, quarantine, eth1Monitor, actionTracker,
newClone(DynamicFeeRecipientsStore.init()), keymanagerHost,
default(Eth1Address))
state = newClone(dag.headState)