extract DAG dependent init to own function (#3530)

When operating as a light client, the chain DAG is not available.
In preparation, the beacon node initialization logic is split into
parts that depend on the presence of the chain DAG and parts that are
always available (including in a future light client mode).
This is a pure code move without semantic changes.
Etan Kissling 2022-03-21 17:52:15 +01:00 committed by GitHub
parent 361596c719
commit 8cc394ba49
1 changed file with 124 additions and 110 deletions
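
In outline, the resulting structure looks roughly like the minimal Nim sketch below. The types and names here (`ChainDag`, `Node`, `clockReady`) are hypothetical stand-ins rather than the actual nimbus-eth2 code; the real `initFullNode` wires up the pools, processors and sync managers shown in the diff that follows.

# Minimal sketch with hypothetical stand-in types; illustrative only.
type
  ChainDag = ref object          # stand-in for ChainDAGRef
  Node = ref object              # stand-in for BeaconNode
    dag: ChainDag
    clockReady: bool

proc initFullNode(node: Node, dag: ChainDag) =
  # Everything that needs the chain DAG goes here.
  node.dag = dag

proc init(dag: ChainDag): Node =
  # Parts that are always available, even without a DAG.
  result = Node(clockReady: true)
  # DAG-dependent part; a future light client mode would not call this.
  if dag != nil:
    result.initFullNode(dag)

when isMainModule:
  doAssert init(ChainDag()).dag != nil
  doAssert init(nil).dag == nil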

@@ -202,6 +202,121 @@ proc checkWeakSubjectivityCheckpoint(
headStateSlot = getStateField(dag.headState, slot)
quit 1
proc initFullNode(
node: BeaconNode,
rng: ref BrHmacDrbgContext,
dag: ChainDAGRef,
taskpool: TaskpoolPtr,
getBeaconTime: GetBeaconTimeFn) =
template config(): auto = node.config
proc onAttestationReceived(data: Attestation) =
node.eventBus.emit("attestation-received", data)
proc onSyncContribution(data: SignedContributionAndProof) =
node.eventBus.emit("sync-contribution-and-proof", data)
proc onVoluntaryExitAdded(data: SignedVoluntaryExit) =
node.eventBus.emit("voluntary-exit", data)
proc makeOnFinalizationCb(
# This `nimcall` function helps keep track of what
# needs to be captured by the onFinalization closure.
eventBus: AsyncEventBus,
eth1Monitor: Eth1Monitor): OnFinalizedCallback {.nimcall.} =
static: doAssert (eventBus is ref) and (eth1Monitor is ref)
return proc(dag: ChainDAGRef, data: FinalizationInfoObject) =
if eth1Monitor != nil:
let finalizedEpochRef = dag.getFinalizedEpochRef()
discard trackFinalizedState(eth1Monitor,
finalizedEpochRef.eth1_data,
finalizedEpochRef.eth1_deposit_index)
eventBus.emit("finalization", data)
func getLocalHeadSlot(): Slot =
dag.head.slot
proc getLocalWallSlot(): Slot =
node.beaconClock.now.slotOrZero
func getFirstSlotAtFinalizedEpoch(): Slot =
dag.finalizedHead.slot
func getBackfillSlot(): Slot =
dag.backfill.slot
let
quarantine = newClone(
Quarantine.init())
attestationPool = newClone(
AttestationPool.init(
dag, quarantine, onAttestationReceived, config.proposerBoosting))
syncCommitteeMsgPool = newClone(
SyncCommitteeMsgPool.init(rng, onSyncContribution))
exitPool = newClone(
ExitPool.init(dag, onVoluntaryExitAdded))
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine)
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock):
Future[Result[void, BlockError]] =
# The design with a callback for block verification is unusual compared
# to the rest of the application, but fits with the general approach
# taken in the sync/request managers - this is an architectural compromise
# that should probably be reimagined more holistically in the future.
let resfut = newFuture[Result[void, BlockError]]("blockVerifier")
blockProcessor[].addBlock(MsgSource.gossip, signedBlock, resfut)
resfut
processor = Eth2Processor.new(
config.doppelgangerDetection,
blockProcessor, node.validatorMonitor, dag, attestationPool, exitPool,
node.attachedValidators, syncCommitteeMsgPool, quarantine, rng,
getBeaconTime, taskpool)
syncManager = newSyncManager[Peer, PeerID](
node.network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
dag.tail.slot, blockVerifier)
backfiller = newSyncManager[Peer, PeerID](
node.network.peerPool, SyncQueueKind.Backward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
dag.backfill.slot, blockVerifier, maxHeadAge = 0)
dag.setFinalizationCb makeOnFinalizationCb(node.eventBus, node.eth1Monitor)
node.dag = dag
node.quarantine = quarantine
node.attestationPool = attestationPool
node.syncCommitteeMsgPool = syncCommitteeMsgPool
node.exitPool = exitPool
node.processor = processor
node.blockProcessor = blockProcessor
node.consensusManager = consensusManager
node.requestManager = RequestManager.init(node.network, blockVerifier)
node.syncManager = syncManager
node.backfiller = backfiller
debug "Loading validators", validatorsDir = config.validatorsDir()
node.addValidators()
block:
# Add in-process validators to the list of "known" validators such that
# we start with a reasonable ENR
let wallSlot = node.beaconClock.now().slotOrZero()
for validator in node.attachedValidators[].validators.values():
if config.validatorMonitorAuto:
node.validatorMonitor[].addMonitor(validator.pubkey, validator.index)
if validator.index.isSome():
node.actionTracker.knownValidators[validator.index.get()] = wallSlot
let
stabilitySubnets = node.actionTracker.stabilitySubnets(wallSlot)
# Here, we also set the correct ENR should we be in all subnets mode!
node.network.updateStabilitySubnetMetadata(stabilitySubnets)
node.network.initBeaconSync(dag, getBeaconTime)
node.updateValidatorMetrics()
const SlashingDbName = "slashing_protection"
# changing this requires physical file rename as well or history is lost.
@@ -250,26 +365,6 @@ proc init*(T: type BeaconNode,
genesisState, checkpointState: ref ForkedHashedBeaconState
checkpointBlock: ForkedTrustedSignedBeaconBlock
proc onAttestationReceived(data: Attestation) =
eventBus.emit("attestation-received", data)
proc onVoluntaryExitAdded(data: SignedVoluntaryExit) =
eventBus.emit("voluntary-exit", data)
proc makeOnFinalizationCb(
# This `nimcall` function helps keep track of what
# needs to be captured by the onFinalization closure.
eventBus: AsyncEventBus,
eth1Monitor: Eth1Monitor): OnFinalizedCallback {.nimcall.} =
static: doAssert (eventBus is ref) and (eth1Monitor is ref)
return proc(dag: ChainDAGRef, data: FinalizationInfoObject) =
if eth1Monitor != nil:
let finalizedEpochRef = dag.getFinalizedEpochRef()
discard trackFinalizedState(eth1Monitor,
finalizedEpochRef.eth1_data,
finalizedEpochRef.eth1_deposit_index)
eventBus.emit("finalization", data)
proc onSyncContribution(data: SignedContributionAndProof) =
eventBus.emit("sync-contribution-and-proof", data)
if config.finalizedCheckpointState.isSome:
let checkpointStatePath = config.finalizedCheckpointState.get.string
checkpointState = try:
@@ -518,14 +613,6 @@ proc init*(T: type BeaconNode,
network = createEth2Node(
rng, config, netKeys, cfg, dag.forkDigests, getBeaconTime,
getStateField(dag.headState, genesis_validators_root))
quarantine = newClone(Quarantine.init())
attestationPool = newClone(
AttestationPool.init(
dag, quarantine, onAttestationReceived, config.proposerBoosting))
syncCommitteeMsgPool = newClone(
SyncCommitteeMsgPool.init(rng, onSyncContribution)
)
exitPool = newClone(ExitPool.init(dag, onVoluntaryExitAdded))
case config.slashingDbKind
of SlashingDbKind.v2:
@@ -539,18 +626,6 @@ proc init*(T: type BeaconNode,
info "Loading slashing protection database (v2)", info "Loading slashing protection database (v2)",
path = config.validatorsDir() path = config.validatorsDir()
func getLocalHeadSlot(): Slot =
dag.head.slot
proc getLocalWallSlot(): Slot =
beaconClock.now.slotOrZero
func getFirstSlotAtFinalizedEpoch(): Slot =
dag.finalizedHead.slot
func getBackfillSlot(): Slot =
dag.backfill.slot
let
slashingProtectionDB =
SlashingProtectionDB.init(
@@ -558,44 +633,14 @@ proc init*(T: type BeaconNode,
config.validatorsDir(), SlashingDbName)
validatorPool = newClone(ValidatorPool.init(slashingProtectionDB))
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine
)
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, validatorMonitor, getBeaconTime)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock):
Future[Result[void, BlockError]] =
# The design with a callback for block verification is unusual compared
# to the rest of the application, but fits with the general approach
# taken in the sync/request managers - this is an architectural compromise
# that should probably be reimagined more holistically in the future.
let resfut = newFuture[Result[void, BlockError]]("blockVerifier")
blockProcessor[].addBlock(MsgSource.gossip, signedBlock, resfut)
resfut
processor = Eth2Processor.new(
config.doppelgangerDetection,
blockProcessor, validatorMonitor, dag, attestationPool, exitPool,
validatorPool, syncCommitteeMsgPool, quarantine, rng, getBeaconTime,
taskpool)
syncManager = newSyncManager[Peer, PeerID](
network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
dag.tail.slot, blockVerifier)
backfiller = newSyncManager[Peer, PeerID](
network.peerPool, SyncQueueKind.Backward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
dag.backfill.slot, blockVerifier, maxHeadAge = 0)
let stateTtlCache = if config.restCacheSize > 0:
stateTtlCache =
if config.restCacheSize > 0:
StateTtlCache.init(
cacheSize = config.restCacheSize,
cacheTtl = chronos.seconds(config.restCacheTtl))
else:
nil
dag.setFinalizationCb makeOnFinalizationCb(eventBus, eth1Monitor)
var node = BeaconNode(
nickname: nickname,
graffitiBytes: if config.graffiti.isSome: config.graffiti.get
@@ -605,52 +650,21 @@ proc init*(T: type BeaconNode,
db: db,
config: config,
attachedValidators: validatorPool,
dag: dag,
quarantine: quarantine,
attestationPool: attestationPool,
syncCommitteeMsgPool: syncCommitteeMsgPool,
exitPool: exitPool,
eth1Monitor: eth1Monitor,
rpcServer: rpcServer,
restServer: restServer,
keymanagerServer: keymanagerServer,
keymanagerToken: keymanagerToken,
eventBus: eventBus,
requestManager: RequestManager.init(network, blockVerifier),
syncManager: syncManager,
backfiller: backfiller,
actionTracker: ActionTracker.init(rng, config.subscribeAllSubnets),
processor: processor,
blockProcessor: blockProcessor,
consensusManager: consensusManager,
gossipState: {},
beaconClock: beaconClock,
validatorMonitor: validatorMonitor,
stateTtlCache: stateTtlCache
)
debug "Loading validators", validatorsDir = config.validatorsDir() node.initFullNode(
rng, dag, taskpool, getBeaconTime)
node.addValidators()
block:
# Add in-process validators to the list of "known" validators such that
# we start with a reasonable ENR
let wallSlot = node.beaconClock.now().slotOrZero()
for validator in node.attachedValidators[].validators.values():
if config.validatorMonitorAuto:
validatorMonitor[].addMonitor(validator.pubkey, validator.index)
if validator.index.isSome():
node.actionTracker.knownValidators[validator.index.get()] = wallSlot
let
stabilitySubnets = node.actionTracker.stabilitySubnets(wallSlot)
# Here, we also set the correct ENR should we be in all subnets mode!
node.network.updateStabilitySubnetMetadata(stabilitySubnets)
network.initBeaconSync(dag, getBeaconTime)
node.updateValidatorMetrics()
node.initFullNode(
rng, dag, taskpool, getBeaconTime)
node
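
One pattern worth noting in the moved code: `makeOnFinalizationCb` is declared `{.nimcall.}`, so the factory itself is not a closure and everything the returned callback needs (the event bus and the Eth1 monitor) has to be passed in explicitly. A minimal standalone sketch of that pattern follows; the names (`Event`, `EventCallback`, `makeLogger`) are hypothetical and nothing beyond the Nim standard library is assumed.

type
  Event = object
    msg: string
  EventCallback = proc (e: Event)   # proc types default to the closure convention

proc makeLogger(prefix: string): EventCallback {.nimcall.} =
  # `makeLogger` is a plain proc with no hidden environment; the returned
  # closure can only capture what was passed in explicitly, here `prefix`.
  return proc (e: Event) =
    echo prefix, ": ", e.msg

when isMainModule:
  let onFinalized = makeLogger("finalization")
  onFinalized(Event(msg: "new finalized checkpoint"))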