From 8cc394ba49974cb1329131eab70a54e5316eb121 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Mon, 21 Mar 2022 17:52:15 +0100 Subject: [PATCH] extract DAG dependent init to own function (#3530) During operation as a light client, the chain DAG is not available. As a preparation, the beacon node initialization logic is divided into parts depending on the presence of the chain DAG, and parts that are always available (including a future light client mode). This is a pure code move without semantic changes. --- beacon_chain/nimbus_beacon_node.nim | 234 +++++++++++++++------------- 1 file changed, 124 insertions(+), 110 deletions(-) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index f4eba76dd..59210581d 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -202,6 +202,121 @@ proc checkWeakSubjectivityCheckpoint( headStateSlot = getStateField(dag.headState, slot) quit 1 +proc initFullNode( + node: BeaconNode, + rng: ref BrHmacDrbgContext, + dag: ChainDAGRef, + taskpool: TaskpoolPtr, + getBeaconTime: GetBeaconTimeFn) = + template config(): auto = node.config + + proc onAttestationReceived(data: Attestation) = + node.eventBus.emit("attestation-received", data) + proc onSyncContribution(data: SignedContributionAndProof) = + node.eventBus.emit("sync-contribution-and-proof", data) + proc onVoluntaryExitAdded(data: SignedVoluntaryExit) = + node.eventBus.emit("voluntary-exit", data) + proc makeOnFinalizationCb( + # This `nimcall` functions helps for keeping track of what + # needs to be captured by the onFinalization closure. + eventBus: AsyncEventBus, + eth1Monitor: Eth1Monitor): OnFinalizedCallback {.nimcall.} = + static: doAssert (eventBus is ref) and (eth1Monitor is ref) + return proc(dag: ChainDAGRef, data: FinalizationInfoObject) = + if eth1Monitor != nil: + let finalizedEpochRef = dag.getFinalizedEpochRef() + discard trackFinalizedState(eth1Monitor, + finalizedEpochRef.eth1_data, + finalizedEpochRef.eth1_deposit_index) + eventBus.emit("finalization", data) + + func getLocalHeadSlot(): Slot = + dag.head.slot + + proc getLocalWallSlot(): Slot = + node.beaconClock.now.slotOrZero + + func getFirstSlotAtFinalizedEpoch(): Slot = + dag.finalizedHead.slot + + func getBackfillSlot(): Slot = + dag.backfill.slot + + let + quarantine = newClone( + Quarantine.init()) + attestationPool = newClone( + AttestationPool.init( + dag, quarantine, onAttestationReceived, config.proposerBoosting)) + syncCommitteeMsgPool = newClone( + SyncCommitteeMsgPool.init(rng, onSyncContribution)) + exitPool = newClone( + ExitPool.init(dag, onVoluntaryExitAdded)) + consensusManager = ConsensusManager.new( + dag, attestationPool, quarantine) + blockProcessor = BlockProcessor.new( + config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, + rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime) + blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock): + Future[Result[void, BlockError]] = + # The design with a callback for block verification is unusual compared + # to the rest of the application, but fits with the general approach + # taken in the sync/request managers - this is an architectural compromise + # that should probably be reimagined more holistically in the future. + let resfut = newFuture[Result[void, BlockError]]("blockVerifier") + blockProcessor[].addBlock(MsgSource.gossip, signedBlock, resfut) + resfut + processor = Eth2Processor.new( + config.doppelgangerDetection, + blockProcessor, node.validatorMonitor, dag, attestationPool, exitPool, + node.attachedValidators, syncCommitteeMsgPool, quarantine, rng, + getBeaconTime, taskpool) + syncManager = newSyncManager[Peer, PeerID]( + node.network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot, + getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, + dag.tail.slot, blockVerifier) + backfiller = newSyncManager[Peer, PeerID]( + node.network.peerPool, SyncQueueKind.Backward, getLocalHeadSlot, + getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, + dag.backfill.slot, blockVerifier, maxHeadAge = 0) + + dag.setFinalizationCb makeOnFinalizationCb(node.eventBus, node.eth1Monitor) + + node.dag = dag + node.quarantine = quarantine + node.attestationPool = attestationPool + node.syncCommitteeMsgPool = syncCommitteeMsgPool + node.exitPool = exitPool + node.processor = processor + node.blockProcessor = blockProcessor + node.consensusManager = consensusManager + node.requestManager = RequestManager.init(node.network, blockVerifier) + node.syncManager = syncManager + node.backfiller = backfiller + + debug "Loading validators", validatorsDir = config.validatorsDir() + + node.addValidators() + + block: + # Add in-process validators to the list of "known" validators such that + # we start with a reasonable ENR + let wallSlot = node.beaconClock.now().slotOrZero() + for validator in node.attachedValidators[].validators.values(): + if config.validatorMonitorAuto: + node.validatorMonitor[].addMonitor(validator.pubkey, validator.index) + + if validator.index.isSome(): + node.actionTracker.knownValidators[validator.index.get()] = wallSlot + let + stabilitySubnets = node.actionTracker.stabilitySubnets(wallSlot) + # Here, we also set the correct ENR should we be in all subnets mode! + node.network.updateStabilitySubnetMetadata(stabilitySubnets) + + node.network.initBeaconSync(dag, getBeaconTime) + + node.updateValidatorMetrics() + const SlashingDbName = "slashing_protection" # changing this requires physical file rename as well or history is lost. @@ -250,26 +365,6 @@ proc init*(T: type BeaconNode, genesisState, checkpointState: ref ForkedHashedBeaconState checkpointBlock: ForkedTrustedSignedBeaconBlock - proc onAttestationReceived(data: Attestation) = - eventBus.emit("attestation-received", data) - proc onVoluntaryExitAdded(data: SignedVoluntaryExit) = - eventBus.emit("voluntary-exit", data) - proc makeOnFinalizationCb( - # This `nimcall` functions helps for keeping track of what - # needs to be captured by the onFinalization closure. - eventBus: AsyncEventBus, - eth1Monitor: Eth1Monitor): OnFinalizedCallback {.nimcall.} = - static: doAssert (eventBus is ref) and (eth1Monitor is ref) - return proc(dag: ChainDAGRef, data: FinalizationInfoObject) = - if eth1Monitor != nil: - let finalizedEpochRef = dag.getFinalizedEpochRef() - discard trackFinalizedState(eth1Monitor, - finalizedEpochRef.eth1_data, - finalizedEpochRef.eth1_deposit_index) - eventBus.emit("finalization", data) - proc onSyncContribution(data: SignedContributionAndProof) = - eventBus.emit("sync-contribution-and-proof", data) - if config.finalizedCheckpointState.isSome: let checkpointStatePath = config.finalizedCheckpointState.get.string checkpointState = try: @@ -518,14 +613,6 @@ proc init*(T: type BeaconNode, network = createEth2Node( rng, config, netKeys, cfg, dag.forkDigests, getBeaconTime, getStateField(dag.headState, genesis_validators_root)) - quarantine = newClone(Quarantine.init()) - attestationPool = newClone( - AttestationPool.init( - dag, quarantine, onAttestationReceived, config.proposerBoosting)) - syncCommitteeMsgPool = newClone( - SyncCommitteeMsgPool.init(rng, onSyncContribution) - ) - exitPool = newClone(ExitPool.init(dag, onVoluntaryExitAdded)) case config.slashingDbKind of SlashingDbKind.v2: @@ -539,18 +626,6 @@ proc init*(T: type BeaconNode, info "Loading slashing protection database (v2)", path = config.validatorsDir() - func getLocalHeadSlot(): Slot = - dag.head.slot - - proc getLocalWallSlot(): Slot = - beaconClock.now.slotOrZero - - func getFirstSlotAtFinalizedEpoch(): Slot = - dag.finalizedHead.slot - - func getBackfillSlot(): Slot = - dag.backfill.slot - let slashingProtectionDB = SlashingProtectionDB.init( @@ -558,43 +633,13 @@ proc init*(T: type BeaconNode, config.validatorsDir(), SlashingDbName) validatorPool = newClone(ValidatorPool.init(slashingProtectionDB)) - consensusManager = ConsensusManager.new( - dag, attestationPool, quarantine - ) - blockProcessor = BlockProcessor.new( - config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, - rng, taskpool, consensusManager, validatorMonitor, getBeaconTime) - blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock): - Future[Result[void, BlockError]] = - # The design with a callback for block verification is unusual compared - # to the rest of the application, but fits with the general approach - # taken in the sync/request managers - this is an architectural compromise - # that should probably be reimagined more holistically in the future. - let resfut = newFuture[Result[void, BlockError]]("blockVerifier") - blockProcessor[].addBlock(MsgSource.gossip, signedBlock, resfut) - resfut - processor = Eth2Processor.new( - config.doppelgangerDetection, - blockProcessor, validatorMonitor, dag, attestationPool, exitPool, - validatorPool, syncCommitteeMsgPool, quarantine, rng, getBeaconTime, - taskpool) - syncManager = newSyncManager[Peer, PeerID]( - network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot, - getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, - dag.tail.slot, blockVerifier) - backfiller = newSyncManager[Peer, PeerID]( - network.peerPool, SyncQueueKind.Backward, getLocalHeadSlot, - getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, - dag.backfill.slot, blockVerifier, maxHeadAge = 0) - - let stateTtlCache = if config.restCacheSize > 0: - StateTtlCache.init( - cacheSize = config.restCacheSize, - cacheTtl = chronos.seconds(config.restCacheTtl)) - else: - nil - - dag.setFinalizationCb makeOnFinalizationCb(eventBus, eth1Monitor) + stateTtlCache = + if config.restCacheSize > 0: + StateTtlCache.init( + cacheSize = config.restCacheSize, + cacheTtl = chronos.seconds(config.restCacheTtl)) + else: + nil var node = BeaconNode( nickname: nickname, @@ -605,52 +650,21 @@ proc init*(T: type BeaconNode, db: db, config: config, attachedValidators: validatorPool, - dag: dag, - quarantine: quarantine, - attestationPool: attestationPool, - syncCommitteeMsgPool: syncCommitteeMsgPool, - exitPool: exitPool, eth1Monitor: eth1Monitor, rpcServer: rpcServer, restServer: restServer, keymanagerServer: keymanagerServer, keymanagerToken: keymanagerToken, eventBus: eventBus, - requestManager: RequestManager.init(network, blockVerifier), - syncManager: syncManager, - backfiller: backfiller, actionTracker: ActionTracker.init(rng, config.subscribeAllSubnets), - processor: processor, - blockProcessor: blockProcessor, - consensusManager: consensusManager, gossipState: {}, beaconClock: beaconClock, validatorMonitor: validatorMonitor, stateTtlCache: stateTtlCache ) - debug "Loading validators", validatorsDir = config.validatorsDir() - - node.addValidators() - - block: - # Add in-process validators to the list of "known" validators such that - # we start with a reasonable ENR - let wallSlot = node.beaconClock.now().slotOrZero() - for validator in node.attachedValidators[].validators.values(): - if config.validatorMonitorAuto: - validatorMonitor[].addMonitor(validator.pubkey, validator.index) - - if validator.index.isSome(): - node.actionTracker.knownValidators[validator.index.get()] = wallSlot - let - stabilitySubnets = node.actionTracker.stabilitySubnets(wallSlot) - # Here, we also set the correct ENR should we be in all subnets mode! - node.network.updateStabilitySubnetMetadata(stabilitySubnets) - - network.initBeaconSync(dag, getBeaconTime) - - node.updateValidatorMetrics() + node.initFullNode( + rng, dag, taskpool, getBeaconTime) node