# beacon_chain # Copyright (c) 2018-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. {.push raises: [].} import std/[os, random, terminal, times], chronos, chronicles, metrics, metrics/chronos_httpserver, stew/[byteutils, io2], eth/p2p/discoveryv5/[enr, random2], ./consensus_object_pools/blob_quarantine, ./consensus_object_pools/data_column_quarantine, ./consensus_object_pools/vanity_logs/vanity_logs, ./networking/[topic_params, network_metadata_downloads, eth2_network], ./rpc/[rest_api, state_ttl_cache], ./spec/datatypes/[altair, bellatrix, phase0], ./spec/[deposit_snapshots, eip7594_helpers, engine_authentication, weak_subjectivity], ./sync/[sync_protocol, light_client_protocol], ./validators/[keystore_management, beacon_validators], "."/[ beacon_node, beacon_node_light_client, deposits, nimbus_binary_common, statusbar, trusted_node_sync, wallets] when defined(posix): import system/ansi_c from ./spec/datatypes/deneb import SignedBeaconBlock from ./spec/datatypes/electra import SignedBeaconBlock from libp2p/protocols/pubsub/gossipsub import TopicParams, validateParameters, init logScope: topics = "beacnde" # https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#interop-metrics declareGauge beacon_slot, "Latest slot of the beacon chain state" declareGauge beacon_current_epoch, "Current epoch" # Finalization tracking declareGauge finalization_delay, "Epoch delay between scheduled epoch and finalized epoch" declareGauge ticks_delay, "How long does to take to run the onSecond loop" declareGauge next_action_wait, "Seconds until the next attestation will be sent" declareGauge next_proposal_wait, "Seconds until the next proposal will be sent, or Inf if not known" declareGauge sync_committee_active, "1 if there are current sync committee duties, 0 otherwise" declareCounter db_checkpoint_seconds, "Time spent checkpointing the database to clear the WAL file" proc fetchGenesisState( metadata: Eth2NetworkMetadata, genesisState = none(InputFile), genesisStateUrl = none(Uri) ): Future[ref ForkedHashedBeaconState] {.async: (raises: []).} = let genesisBytes = if metadata.genesis.kind != BakedIn and genesisState.isSome: let res = io2.readAllBytes(genesisState.get.string) res.valueOr: error "Failed to read genesis state file", err = res.error.ioErrorMsg quit 1 elif metadata.hasGenesis: try: if metadata.genesis.kind == BakedInUrl: info "Obtaining genesis state", sourceUrl = $genesisStateUrl .get(parseUri metadata.genesis.url) await metadata.fetchGenesisBytes(genesisStateUrl) except CatchableError as err: error "Failed to obtain genesis state", source = metadata.genesis.sourceDesc, err = err.msg quit 1 else: @[] if genesisBytes.len > 0: try: newClone readSszForkedHashedBeaconState(metadata.cfg, genesisBytes) except CatchableError as err: error "Invalid genesis state", size = genesisBytes.len, digest = eth2digest(genesisBytes), err = err.msg quit 1 else: nil proc doRunTrustedNodeSync( db: BeaconChainDB, metadata: Eth2NetworkMetadata, databaseDir: string, eraDir: string, restUrl: string, stateId: Option[string], trustedBlockRoot: Option[Eth2Digest], backfill: bool, reindex: bool, downloadDepositSnapshot: bool, genesisState: ref ForkedHashedBeaconState) {.async.} = let syncTarget = if stateId.isSome: if trustedBlockRoot.isSome: warn "Ignoring `trustedBlockRoot`, `stateId` is set", stateId, trustedBlockRoot TrustedNodeSyncTarget( kind: TrustedNodeSyncKind.StateId, stateId: stateId.get) elif trustedBlockRoot.isSome: TrustedNodeSyncTarget( kind: TrustedNodeSyncKind.TrustedBlockRoot, trustedBlockRoot: trustedBlockRoot.get) else: TrustedNodeSyncTarget( kind: TrustedNodeSyncKind.StateId, stateId: "finalized") await db.doTrustedNodeSync( metadata.cfg, databaseDir, eraDir, restUrl, syncTarget, backfill, reindex, downloadDepositSnapshot, genesisState) func getVanityLogs(stdoutKind: StdoutLogKind): VanityLogs = case stdoutKind of StdoutLogKind.Auto: raiseAssert "inadmissable here" of StdoutLogKind.Colors: VanityLogs( onMergeTransitionBlock: bellatrixColor, onFinalizedMergeTransitionBlock: bellatrixBlink, onUpgradeToCapella: capellaColor, onKnownBlsToExecutionChange: capellaBlink, onUpgradeToDeneb: denebColor) of StdoutLogKind.NoColors: VanityLogs( onMergeTransitionBlock: bellatrixMono, onFinalizedMergeTransitionBlock: bellatrixMono, onUpgradeToCapella: capellaMono, onKnownBlsToExecutionChange: capellaMono, onUpgradeToDeneb: denebMono) of StdoutLogKind.Json, StdoutLogKind.None: VanityLogs( onMergeTransitionBlock: (proc() = notice "🐼 Proof of Stake Activated 🐼"), onFinalizedMergeTransitionBlock: (proc() = notice "🐼 Proof of Stake Finalized 🐼"), onUpgradeToCapella: (proc() = notice "🦉 Withdrowls now available 🦉"), onKnownBlsToExecutionChange: (proc() = notice "🦉 BLS to execution changed 🦉"), onUpgradeToDeneb: (proc() = notice "🐟 Proto-Danksharding is ON 🐟")) func getVanityMascot(consensusFork: ConsensusFork): string = case consensusFork of ConsensusFork.Electra: " " of ConsensusFork.Deneb: "🐟" of ConsensusFork.Capella: "🦉" of ConsensusFork.Bellatrix: "🐼" of ConsensusFork.Altair: "✨" of ConsensusFork.Phase0: "🦏" proc loadChainDag( config: BeaconNodeConf, cfg: RuntimeConfig, db: BeaconChainDB, eventBus: EventBus, validatorMonitor: ref ValidatorMonitor, networkGenesisValidatorsRoot: Opt[Eth2Digest]): ChainDAGRef = info "Loading block DAG from database", path = config.databaseDir var dag: ChainDAGRef proc onLightClientFinalityUpdate(data: ForkedLightClientFinalityUpdate) = if dag == nil: return withForkyFinalityUpdate(data): when lcDataFork > LightClientDataFork.None: let contextFork = dag.cfg.consensusForkAtEpoch(forkyFinalityUpdate.contextEpoch) eventBus.finUpdateQueue.emit( RestVersioned[ForkedLightClientFinalityUpdate]( data: data, jsonVersion: contextFork, sszContext: dag.forkDigests[].atConsensusFork(contextFork))) proc onLightClientOptimisticUpdate(data: ForkedLightClientOptimisticUpdate) = if dag == nil: return withForkyOptimisticUpdate(data): when lcDataFork > LightClientDataFork.None: let contextFork = dag.cfg.consensusForkAtEpoch(forkyOptimisticUpdate.contextEpoch) eventBus.optUpdateQueue.emit( RestVersioned[ForkedLightClientOptimisticUpdate]( data: data, jsonVersion: contextFork, sszContext: dag.forkDigests[].atConsensusFork(contextFork))) let chainDagFlags = if config.strictVerification: {strictVerification} else: {} onLightClientFinalityUpdateCb = if config.lightClientDataServe: onLightClientFinalityUpdate else: nil onLightClientOptimisticUpdateCb = if config.lightClientDataServe: onLightClientOptimisticUpdate else: nil dag = ChainDAGRef.init( cfg, db, validatorMonitor, chainDagFlags, config.eraDir, vanityLogs = getVanityLogs(detectTTY(config.logStdout)), lcDataConfig = LightClientDataConfig( serve: config.lightClientDataServe, importMode: config.lightClientDataImportMode, maxPeriods: config.lightClientDataMaxPeriods, onLightClientFinalityUpdate: onLightClientFinalityUpdateCb, onLightClientOptimisticUpdate: onLightClientOptimisticUpdateCb)) if networkGenesisValidatorsRoot.isSome: let databaseGenesisValidatorsRoot = getStateField(dag.headState, genesis_validators_root) if networkGenesisValidatorsRoot.get != databaseGenesisValidatorsRoot: fatal "The specified --data-dir contains data for a different network", networkGenesisValidatorsRoot = networkGenesisValidatorsRoot.get, databaseGenesisValidatorsRoot, dataDir = config.dataDir quit 1 # The first pruning after restart may take a while.. if config.historyMode == HistoryMode.Prune: dag.pruneHistory(true) dag proc checkWeakSubjectivityCheckpoint( dag: ChainDAGRef, wsCheckpoint: Checkpoint, beaconClock: BeaconClock) = let currentSlot = beaconClock.now.slotOrZero isCheckpointStale = not is_within_weak_subjectivity_period( dag.cfg, currentSlot, dag.headState, wsCheckpoint) if isCheckpointStale: error "Weak subjectivity checkpoint is stale", currentSlot, checkpoint = wsCheckpoint, headStateSlot = getStateField(dag.headState, slot) quit 1 from ./spec/state_transition_block import kzg_commitment_to_versioned_hash proc initFullNode( node: BeaconNode, rng: ref HmacDrbgContext, dag: ChainDAGRef, taskpool: TaskPoolPtr, getBeaconTime: GetBeaconTimeFn) {.async.} = template config(): auto = node.config proc onAttestationReceived(data: phase0.Attestation) = node.eventBus.attestQueue.emit(data) proc onSyncContribution(data: SignedContributionAndProof) = node.eventBus.contribQueue.emit(data) proc onVoluntaryExitAdded(data: SignedVoluntaryExit) = node.eventBus.exitQueue.emit(data) proc onBLSToExecutionChangeAdded(data: SignedBLSToExecutionChange) = node.eventBus.blsToExecQueue.emit(data) proc onProposerSlashingAdded(data: ProposerSlashing) = node.eventBus.propSlashQueue.emit(data) proc onAttesterSlashingAdded(data: phase0.AttesterSlashing) = node.eventBus.attSlashQueue.emit(data) proc onBlobSidecarAdded(data: BlobSidecar) = node.eventBus.blobSidecarQueue.emit( BlobSidecarInfoObject( block_root: hash_tree_root(data.signed_block_header.message), index: data.index, slot: data.signed_block_header.message.slot, kzg_commitment: data.kzg_commitment, versioned_hash: data.kzg_commitment.kzg_commitment_to_versioned_hash.to0xHex)) proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) = let optimistic = if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH: some node.dag.is_optimistic(data.toBlockId()) else: none[bool]() node.eventBus.blocksQueue.emit( EventBeaconBlockObject.init(data, optimistic)) proc onHeadChanged(data: HeadChangeInfoObject) = let eventData = if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH: var res = data res.optimistic = some node.dag.is_optimistic( BlockId(slot: data.slot, root: data.block_root)) res else: data node.eventBus.headQueue.emit(eventData) proc onChainReorg(data: ReorgInfoObject) = let eventData = if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH: var res = data res.optimistic = some node.dag.is_optimistic( BlockId(slot: data.slot, root: data.new_head_block)) res else: data node.eventBus.reorgQueue.emit(eventData) proc makeOnFinalizationCb( # This `nimcall` functions helps for keeping track of what # needs to be captured by the onFinalization closure. eventBus: EventBus, elManager: ELManager): OnFinalizedCallback {.nimcall.} = static: doAssert (elManager is ref) return proc(dag: ChainDAGRef, data: FinalizationInfoObject) = if elManager != nil: let finalizedEpochRef = dag.getFinalizedEpochRef() discard trackFinalizedState(elManager, finalizedEpochRef.eth1_data, finalizedEpochRef.eth1_deposit_index) node.updateLightClientFromDag() let eventData = if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH: var res = data # `slot` in this `BlockId` may be higher than block's actual slot, # this is alright for the purpose of calling `is_optimistic`. res.optimistic = some node.dag.is_optimistic( BlockId(slot: data.epoch.start_slot, root: data.block_root)) res else: data eventBus.finalQueue.emit(eventData) func getLocalHeadSlot(): Slot = dag.head.slot proc getLocalWallSlot(): Slot = node.beaconClock.now.slotOrZero func getFirstSlotAtFinalizedEpoch(): Slot = dag.finalizedHead.slot func getBackfillSlot(): Slot = if dag.backfill.parent_root != dag.tail.root: dag.backfill.slot else: dag.tail.slot func getFrontfillSlot(): Slot = max(dag.frontfill.get(BlockId()).slot, dag.horizon) let quarantine = newClone( Quarantine.init()) attestationPool = newClone(AttestationPool.init( dag, quarantine, config.forkChoiceVersion.get, onAttestationReceived)) syncCommitteeMsgPool = newClone( SyncCommitteeMsgPool.init(rng, dag.cfg, onSyncContribution)) lightClientPool = newClone( LightClientPool()) validatorChangePool = newClone(ValidatorChangePool.init( dag, attestationPool, onVoluntaryExitAdded, onBLSToExecutionChangeAdded, onProposerSlashingAdded, onAttesterSlashingAdded)) blobQuarantine = newClone(BlobQuarantine.init(onBlobSidecarAdded)) dataColumnQuarantine = newClone(DataColumnQuarantine.init()) consensusManager = ConsensusManager.new( dag, attestationPool, quarantine, node.elManager, ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets), node.dynamicFeeRecipientsStore, config.validatorsDir, config.defaultFeeRecipient, config.suggestedGasLimit) blockProcessor = BlockProcessor.new( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, rng, taskpool, consensusManager, node.validatorMonitor, blobQuarantine, dataColumnQuarantine, getBeaconTime) blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], data_columns: Opt[DataColumnSidecars], maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = # The design with a callback for block verification is unusual compared # to the rest of the application, but fits with the general approach # taken in the sync/request managers - this is an architectural compromise # that should probably be reimagined more holistically in the future. blockProcessor[].addBlock( MsgSource.gossip, signedBlock, blobs, data_columns, maybeFinalized = maybeFinalized) rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = withBlck(signedBlock): # when consensusFork >= ConsensusFork.Deneb: # if not blobQuarantine[].hasBlobs(forkyBlck): # # We don't have all the blobs for this block, so we have # # to put it in blobless quarantine. # if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck): # err(VerifierError.UnviableFork) # else: # err(VerifierError.MissingParent) # else: # let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck) # await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, # Opt.some(blobs), Opt.none(DataColumnSidecars), # maybeFinalized = maybeFinalized) # else: # await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, # Opt.none(BlobSidecars), Opt.none(DataColumnSidecars), # maybeFinalized = maybeFinalized) when consensusFork >= ConsensusFork.Deneb: if len(forkyBlck.message.body.blob_kzg_commitments) != 0: if not quarantine[].addColumnless(dag.finalizedHead.slot, forkyBlck): err(VerifierError.UnviableFork) else: err(VerifierError.MissingParent) else: let data_columns = dataColumnQuarantine[].popDataColumns(forkyBlck.root, forkyBlck) await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.none(BlobSidecars), Opt.some(data_columns), maybeFinalized = maybeFinalized) else: await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.none(BlobSidecars), Opt.none(DataColumnSidecars), maybeFinalized = maybeFinalized) rmanBlockLoader = proc( blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] = dag.getForkedBlock(blockRoot) rmanBlobLoader = proc( blobId: BlobIdentifier): Opt[ref BlobSidecar] = var blob_sidecar = BlobSidecar.new() if dag.db.getBlobSidecar(blobId.block_root, blobId.index, blob_sidecar[]): Opt.some blob_sidecar else: Opt.none(ref BlobSidecar) rmanDataColumnLoader = proc( columnId: DataColumnIdentifier): Opt[ref DataColumnSidecar] = var data_column_sidecar = DataColumnSidecar.new() if dag.db.getDataColumnSidecar(columnId.block_root, columnId.index, data_column_sidecar[]): Opt.some data_column_sidecar else: Opt.none(ref DataColumnSidecar) processor = Eth2Processor.new( config.doppelgangerDetection, blockProcessor, node.validatorMonitor, dag, attestationPool, validatorChangePool, node.attachedValidators, syncCommitteeMsgPool, lightClientPool, quarantine, blobQuarantine, dataColumnQuarantine, rng, getBeaconTime, taskpool) router = (ref MessageRouter)( processor: processor, network: node.network) var supernode = node.config.subscribeAllSubnets let syncManager = newSyncManager[Peer, PeerId]( node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, supernode, node.network.nodeId, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, getFrontfillSlot, dag.tail.slot, blockVerifier) backfiller = newSyncManager[Peer, PeerId]( node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, supernode, node.network.nodeId, SyncQueueKind.Backward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, getFrontfillSlot, dag.backfill.slot, blockVerifier, maxHeadAge = 0) requestManager = RequestManager.init( node.network, supernode, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime, (proc(): bool = syncManager.inProgress), quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier, rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader) if node.config.lightClientDataServe: proc scheduleSendingLightClientUpdates(slot: Slot) = if node.lightClientPool[].broadcastGossipFut != nil: return if slot <= node.lightClientPool[].latestBroadcastedSlot: return node.lightClientPool[].latestBroadcastedSlot = slot template fut(): auto = node.lightClientPool[].broadcastGossipFut fut = node.handleLightClientUpdates(slot) fut.addCallback do (p: pointer) {.gcsafe.}: fut = nil router.onSyncCommitteeMessage = scheduleSendingLightClientUpdates dag.setFinalizationCb makeOnFinalizationCb(node.eventBus, node.elManager) dag.setBlockCb(onBlockAdded) dag.setHeadCb(onHeadChanged) dag.setReorgCb(onChainReorg) node.dag = dag node.blobQuarantine = blobQuarantine node.dataColumnQuarantine = dataColumnQuarantine node.quarantine = quarantine node.attestationPool = attestationPool node.syncCommitteeMsgPool = syncCommitteeMsgPool node.lightClientPool = lightClientPool node.validatorChangePool = validatorChangePool node.processor = processor node.blockProcessor = blockProcessor node.consensusManager = consensusManager node.requestManager = requestManager node.syncManager = syncManager node.backfiller = backfiller node.router = router await node.addValidators() block: # Add in-process validators to the list of "known" validators such that # we start with a reasonable ENR let wallSlot = node.beaconClock.now().slotOrZero() for validator in node.attachedValidators[].validators.values(): if config.validatorMonitorAuto: node.validatorMonitor[].addMonitor(validator.pubkey, validator.index) if validator.index.isSome(): withState(dag.headState): let idx = validator.index.get() if distinctBase(idx) <= forkyState.data.validators.lenu64: template v: auto = forkyState.data.validators.item(idx) if is_active_validator(v, wallSlot.epoch) or is_active_validator(v, wallSlot.epoch + 1): node.consensusManager[].actionTracker.knownValidators[idx] = wallSlot elif is_exited_validator(v, wallSlot.epoch): notice "Ignoring exited validator", index = idx, pubkey = shortLog(v.pubkey) let stabilitySubnets = node.consensusManager[].actionTracker.stabilitySubnets(wallSlot) # Here, we also set the correct ENR should we be in all subnets mode! node.network.updateStabilitySubnetMetadata(stabilitySubnets) node.network.registerProtocol( PeerSync, PeerSync.NetworkState.init( node.dag, node.beaconClock.getBeaconTimeFn(), )) node.network.registerProtocol( BeaconSync, BeaconSync.NetworkState.init(node.dag)) if node.dag.lcDataStore.serve: node.network.registerProtocol( LightClientSync, LightClientSync.NetworkState.init(node.dag)) node.updateValidatorMetrics() const SlashingDbName = "slashing_protection" # changing this requires physical file rename as well or history is lost. proc init*(T: type BeaconNode, rng: ref HmacDrbgContext, config: BeaconNodeConf, metadata: Eth2NetworkMetadata): Future[BeaconNode] {.async.} = var taskpool: TaskPoolPtr template cfg: auto = metadata.cfg template eth1Network: auto = metadata.eth1Network try: if config.numThreads < 0: fatal "The number of threads --numThreads cannot be negative." quit 1 elif config.numThreads == 0: taskpool = TaskPoolPtr.new(numThreads = min(countProcessors(), 16)) else: taskpool = TaskPoolPtr.new(numThreads = config.numThreads) info "Threadpool started", numThreads = taskpool.numThreads except Exception: raise newException(Defect, "Failure in taskpool initialization.") if metadata.genesis.kind == BakedIn: if config.genesisState.isSome: warn "The --genesis-state option has no effect on networks with built-in genesis state" if config.genesisStateUrl.isSome: warn "The --genesis-state-url option has no effect on networks with built-in genesis state" let eventBus = EventBus( headQueue: newAsyncEventQueue[HeadChangeInfoObject](), blocksQueue: newAsyncEventQueue[EventBeaconBlockObject](), attestQueue: newAsyncEventQueue[phase0.Attestation](), exitQueue: newAsyncEventQueue[SignedVoluntaryExit](), blsToExecQueue: newAsyncEventQueue[SignedBLSToExecutionChange](), propSlashQueue: newAsyncEventQueue[ProposerSlashing](), attSlashQueue: newAsyncEventQueue[AttesterSlashing](), blobSidecarQueue: newAsyncEventQueue[BlobSidecarInfoObject](), finalQueue: newAsyncEventQueue[FinalizationInfoObject](), reorgQueue: newAsyncEventQueue[ReorgInfoObject](), contribQueue: newAsyncEventQueue[SignedContributionAndProof](), finUpdateQueue: newAsyncEventQueue[ RestVersioned[ForkedLightClientFinalityUpdate]](), optUpdateQueue: newAsyncEventQueue[ RestVersioned[ForkedLightClientOptimisticUpdate]]()) db = BeaconChainDB.new(config.databaseDir, cfg, inMemory = false) if config.externalBeaconApiUrl.isSome and ChainDAGRef.isInitialized(db).isErr: var genesisState: ref ForkedHashedBeaconState let trustedBlockRoot = if config.trustedStateRoot.isSome or config.trustedBlockRoot.isSome: config.trustedBlockRoot elif cfg.ALTAIR_FORK_EPOCH == GENESIS_EPOCH: # Sync can be bootstrapped from the genesis block root genesisState = await fetchGenesisState( metadata, config.genesisState, config.genesisStateUrl) if genesisState != nil: let genesisBlockRoot = get_initial_beacon_block(genesisState[]).root notice "Neither `--trusted-block-root` nor `--trusted-state-root` " & "provided with `--external-beacon-api-url`, " & "falling back to genesis block root", externalBeaconApiUrl = config.externalBeaconApiUrl.get, trustedBlockRoot = config.trustedBlockRoot, trustedStateRoot = config.trustedStateRoot, genesisBlockRoot = $genesisBlockRoot some genesisBlockRoot else: none[Eth2Digest]() else: none[Eth2Digest]() if config.trustedStateRoot.isNone and trustedBlockRoot.isNone: warn "Ignoring `--external-beacon-api-url`, neither " & "`--trusted-block-root` nor `--trusted-state-root` provided", externalBeaconApiUrl = config.externalBeaconApiUrl.get, trustedBlockRoot = config.trustedBlockRoot, trustedStateRoot = config.trustedStateRoot else: if genesisState == nil: genesisState = await fetchGenesisState( metadata, config.genesisState, config.genesisStateUrl) await db.doRunTrustedNodeSync( metadata, config.databaseDir, config.eraDir, config.externalBeaconApiUrl.get, config.trustedStateRoot.map do (x: Eth2Digest) -> string: "0x" & x.data.toHex, trustedBlockRoot, backfill = false, reindex = false, downloadDepositSnapshot = false, genesisState) if config.finalizedCheckpointBlock.isSome: warn "--finalized-checkpoint-block has been deprecated, ignoring" let checkpointState = if config.finalizedCheckpointState.isSome: let checkpointStatePath = config.finalizedCheckpointState.get.string let tmp = try: newClone(readSszForkedHashedBeaconState( cfg, readAllBytes(checkpointStatePath).tryGet())) except SszError as err: fatal "Checkpoint state loading failed", err = formatMsg(err, checkpointStatePath) quit 1 except CatchableError as err: fatal "Failed to read checkpoint state file", err = err.msg quit 1 if not getStateField(tmp[], slot).is_epoch: fatal "--finalized-checkpoint-state must point to a state for an epoch slot", slot = getStateField(tmp[], slot) quit 1 tmp else: nil if config.finalizedDepositTreeSnapshot.isSome: let depositTreeSnapshotPath = config.finalizedDepositTreeSnapshot.get.string snapshot = try: SSZ.loadFile(depositTreeSnapshotPath, DepositTreeSnapshot) except SszError as err: fatal "Deposit tree snapshot loading failed", err = formatMsg(err, depositTreeSnapshotPath) quit 1 except CatchableError as err: fatal "Failed to read deposit tree snapshot file", err = err.msg quit 1 depositContractSnapshot = DepositContractSnapshot.init(snapshot).valueOr: fatal "Invalid deposit tree snapshot file" quit 1 db.putDepositContractSnapshot(depositContractSnapshot) let engineApiUrls = config.engineApiUrls if engineApiUrls.len == 0: notice "Running without execution client - validator features disabled (see https://nimbus.guide/eth1.html)" var networkGenesisValidatorsRoot = metadata.bakedGenesisValidatorsRoot if not ChainDAGRef.isInitialized(db).isOk(): let genesisState = if checkpointState != nil and getStateField(checkpointState[], slot) == 0: checkpointState else: await fetchGenesisState( metadata, config.genesisState, config.genesisStateUrl) if genesisState == nil and checkpointState == nil: fatal "No database and no genesis snapshot found. Please supply a genesis.ssz " & "with the network configuration" quit 1 if not genesisState.isNil and not checkpointState.isNil: if getStateField(genesisState[], genesis_validators_root) != getStateField(checkpointState[], genesis_validators_root): fatal "Checkpoint state does not match genesis - check the --network parameter", rootFromGenesis = getStateField( genesisState[], genesis_validators_root), rootFromCheckpoint = getStateField( checkpointState[], genesis_validators_root) quit 1 try: # Always store genesis state if we have it - this allows reindexing and # answering genesis queries if not genesisState.isNil: ChainDAGRef.preInit(db, genesisState[]) networkGenesisValidatorsRoot = Opt.some(getStateField(genesisState[], genesis_validators_root)) if not checkpointState.isNil: if genesisState.isNil or getStateField(checkpointState[], slot) != GENESIS_SLOT: ChainDAGRef.preInit(db, checkpointState[]) doAssert ChainDAGRef.isInitialized(db).isOk(), "preInit should have initialized db" except CatchableError as exc: error "Failed to initialize database", err = exc.msg quit 1 else: if not checkpointState.isNil: fatal "A database already exists, cannot start from given checkpoint", dataDir = config.dataDir quit 1 # Doesn't use std/random directly, but dependencies might randomize(rng[].rand(high(int))) # The validatorMonitorTotals flag has been deprecated and should eventually be # removed - until then, it's given priority if set so as not to needlessly # break existing setups let validatorMonitor = newClone(ValidatorMonitor.init( config.validatorMonitorAuto, config.validatorMonitorTotals.get( not config.validatorMonitorDetails))) for key in config.validatorMonitorPubkeys: validatorMonitor[].addMonitor(key, Opt.none(ValidatorIndex)) let dag = loadChainDag( config, cfg, db, eventBus, validatorMonitor, networkGenesisValidatorsRoot) genesisTime = getStateField(dag.headState, genesis_time) beaconClock = BeaconClock.init(genesisTime).valueOr: fatal "Invalid genesis time in state", genesisTime quit 1 getBeaconTime = beaconClock.getBeaconTimeFn() if config.weakSubjectivityCheckpoint.isSome: dag.checkWeakSubjectivityCheckpoint( config.weakSubjectivityCheckpoint.get, beaconClock) let elManager = ELManager.new( cfg, metadata.depositContractBlock, metadata.depositContractBlockHash, db, engineApiUrls, eth1Network) if config.rpcEnabled.isSome: warn "Nimbus's JSON-RPC server has been removed. This includes the --rpc, --rpc-port, and --rpc-address configuration options. https://nimbus.guide/rest-api.html shows how to enable and configure the REST Beacon API server which replaces it." let restServer = if config.restEnabled: RestServerRef.init(config.restAddress, config.restPort, config.restAllowedOrigin, validateBeaconApiQueries, config) else: nil let netKeys = getPersistentNetKeys(rng[], config) nickname = if config.nodeName == "auto": shortForm(netKeys) else: config.nodeName network = createEth2Node( rng, config, netKeys, cfg, dag.forkDigests, getBeaconTime, getStateField(dag.headState, genesis_validators_root)) case config.slashingDbKind of SlashingDbKind.v2: discard of SlashingDbKind.v1: error "Slashing DB v1 is no longer supported for writing" quit 1 of SlashingDbKind.both: warn "Slashing DB v1 deprecated, writing only v2" info "Loading slashing protection database (v2)", path = config.validatorsDir() proc getValidatorAndIdx(pubkey: ValidatorPubKey): Opt[ValidatorAndIndex] = withState(dag.headState): getValidator(forkyState().data.validators.asSeq(), pubkey) func getCapellaForkVersion(): Opt[Version] = Opt.some(cfg.CAPELLA_FORK_VERSION) func getDenebForkEpoch(): Opt[Epoch] = Opt.some(cfg.DENEB_FORK_EPOCH) func getElectraForkEpoch(): Opt[Epoch] = Opt.some(cfg.ELECTRA_FORK_EPOCH) proc getForkForEpoch(epoch: Epoch): Opt[Fork] = Opt.some(dag.forkAtEpoch(epoch)) proc getGenesisRoot(): Eth2Digest = getStateField(dag.headState, genesis_validators_root) let keystoreCache = KeystoreCacheRef.init() slashingProtectionDB = SlashingProtectionDB.init( getStateField(dag.headState, genesis_validators_root), config.validatorsDir(), SlashingDbName) validatorPool = newClone(ValidatorPool.init( slashingProtectionDB, config.doppelgangerDetection)) keymanagerInitResult = initKeymanagerServer(config, restServer) keymanagerHost = if keymanagerInitResult.server != nil: newClone KeymanagerHost.init( validatorPool, keystoreCache, rng, keymanagerInitResult.token, config.validatorsDir, config.secretsDir, config.defaultFeeRecipient, config.suggestedGasLimit, config.defaultGraffitiBytes, config.getPayloadBuilderAddress, getValidatorAndIdx, getBeaconTime, getCapellaForkVersion, getDenebForkEpoch, getForkForEpoch, getGenesisRoot) else: nil stateTtlCache = if config.restCacheSize > 0: StateTtlCache.init( cacheSize = config.restCacheSize, cacheTtl = chronos.seconds(config.restCacheTtl)) else: nil if config.payloadBuilderEnable: info "Using external payload builder", payloadBuilderUrl = config.payloadBuilderUrl let node = BeaconNode( nickname: nickname, graffitiBytes: if config.graffiti.isSome: config.graffiti.get else: defaultGraffitiBytes(), network: network, netKeys: netKeys, db: db, config: config, attachedValidators: validatorPool, elManager: elManager, restServer: restServer, keymanagerHost: keymanagerHost, keymanagerServer: keymanagerInitResult.server, keystoreCache: keystoreCache, eventBus: eventBus, gossipState: {}, blocksGossipState: {}, beaconClock: beaconClock, validatorMonitor: validatorMonitor, stateTtlCache: stateTtlCache, dynamicFeeRecipientsStore: newClone(DynamicFeeRecipientsStore.init())) node.initLightClient( rng, cfg, dag.forkDigests, getBeaconTime, dag.genesis_validators_root) await node.initFullNode(rng, dag, taskpool, getBeaconTime) node.updateLightClientFromDag() node func verifyFinalization(node: BeaconNode, slot: Slot) = # Epoch must be >= 4 to check finalization const SETTLING_TIME_OFFSET = 1'u64 let epoch = slot.epoch() # Don't static-assert this -- if this isn't called, don't require it doAssert SLOTS_PER_EPOCH > SETTLING_TIME_OFFSET # Intentionally, loudly assert. Point is to fail visibly and unignorably # during testing. if epoch >= 4 and slot mod SLOTS_PER_EPOCH > SETTLING_TIME_OFFSET: let finalizedEpoch = node.dag.finalizedHead.slot.epoch() # Finalization rule 234, that has the most lag slots among the cases, sets # state.finalized_checkpoint = old_previous_justified_checkpoint.epoch + 3 # and then state.slot gets incremented, to increase the maximum offset, if # finalization occurs every slot, to 4 slots vs scheduledSlot. doAssert finalizedEpoch + 4 >= epoch from std/sequtils import toSeq func subnetLog(v: BitArray): string = $toSeq(v.oneIndices()) func forkDigests(node: BeaconNode): auto = let forkDigestsArray: array[ConsensusFork, auto] = [ node.dag.forkDigests.phase0, node.dag.forkDigests.altair, node.dag.forkDigests.bellatrix, node.dag.forkDigests.capella, node.dag.forkDigests.deneb, node.dag.forkDigests.electra] forkDigestsArray # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#attestation-subnet-subscription proc updateAttestationSubnetHandlers(node: BeaconNode, slot: Slot) = if node.gossipState.card == 0: # When disconnected, updateBlocksGossipStatus is responsible for all things # subnets - in particular, it will remove subscriptions on the edge where # we enter the disconnected state. return let aggregateSubnets = node.consensusManager[].actionTracker.aggregateSubnets(slot) stabilitySubnets = node.consensusManager[].actionTracker.stabilitySubnets(slot) subnets = aggregateSubnets + stabilitySubnets node.network.updateStabilitySubnetMetadata(stabilitySubnets) # Now we know what we should be subscribed to - make it so let prevSubnets = node.consensusManager[].actionTracker.subscribedSubnets unsubscribeSubnets = prevSubnets - subnets subscribeSubnets = subnets - prevSubnets # Remember what we subscribed to, so we can unsubscribe later node.consensusManager[].actionTracker.subscribedSubnets = subnets let forkDigests = node.forkDigests() for gossipFork in node.gossipState: let forkDigest = forkDigests[gossipFork] node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, forkDigest) node.network.subscribeAttestationSubnets(subscribeSubnets, forkDigest) debug "Attestation subnets", slot, epoch = slot.epoch, gossipState = node.gossipState, stabilitySubnets = subnetLog(stabilitySubnets), aggregateSubnets = subnetLog(aggregateSubnets), prevSubnets = subnetLog(prevSubnets), subscribeSubnets = subnetLog(subscribeSubnets), unsubscribeSubnets = subnetLog(unsubscribeSubnets), gossipState = node.gossipState proc updateBlocksGossipStatus*( node: BeaconNode, slot: Slot, dagIsBehind: bool) = template cfg(): auto = node.dag.cfg let isBehind = if node.shouldSyncOptimistically(slot): # If optimistic sync is active, always subscribe to blocks gossip false else: # Use DAG status to determine whether to subscribe for blocks gossip dagIsBehind targetGossipState = getTargetGossipState( slot.epoch, cfg.ALTAIR_FORK_EPOCH, cfg.BELLATRIX_FORK_EPOCH, cfg.CAPELLA_FORK_EPOCH, cfg.DENEB_FORK_EPOCH, cfg.ELECTRA_FORK_EPOCH, isBehind) template currentGossipState(): auto = node.blocksGossipState if currentGossipState == targetGossipState: return if currentGossipState.card == 0 and targetGossipState.card > 0: debug "Enabling blocks topic subscriptions", wallSlot = slot, targetGossipState elif currentGossipState.card > 0 and targetGossipState.card == 0: debug "Disabling blocks topic subscriptions", wallSlot = slot else: # Individual forks added / removed discard let newGossipForks = targetGossipState - currentGossipState oldGossipForks = currentGossipState - targetGossipState for gossipFork in oldGossipForks: let forkDigest = node.dag.forkDigests[].atConsensusFork(gossipFork) node.network.unsubscribe(getBeaconBlocksTopic(forkDigest)) for gossipFork in newGossipForks: let forkDigest = node.dag.forkDigests[].atConsensusFork(gossipFork) node.network.subscribe( getBeaconBlocksTopic(forkDigest), blocksTopicParams, enableTopicMetrics = true) node.blocksGossipState = targetGossipState proc addPhase0MessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = node.network.subscribe(getAttesterSlashingsTopic(forkDigest), basicParams) node.network.subscribe(getProposerSlashingsTopic(forkDigest), basicParams) node.network.subscribe(getVoluntaryExitsTopic(forkDigest), basicParams) node.network.subscribe( getAggregateAndProofsTopic(forkDigest), aggregateTopicParams, enableTopicMetrics = true) # updateAttestationSubnetHandlers subscribes attestation subnets proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) = node.network.unsubscribe(getVoluntaryExitsTopic(forkDigest)) node.network.unsubscribe(getProposerSlashingsTopic(forkDigest)) node.network.unsubscribe(getAttesterSlashingsTopic(forkDigest)) node.network.unsubscribe(getAggregateAndProofsTopic(forkDigest)) for subnet_id in SubnetId: node.network.unsubscribe(getAttestationTopic(forkDigest, subnet_id)) node.consensusManager[].actionTracker.subscribedSubnets = default(AttnetBits) func hasSyncPubKey(node: BeaconNode, epoch: Epoch): auto = # Only used to determine which gossip topics to which to subscribe if node.config.subscribeAllSubnets: (func(pubkey: ValidatorPubKey): bool {.closure.} = true) else: (func(pubkey: ValidatorPubKey): bool = node.consensusManager[].actionTracker.hasSyncDuty(pubkey, epoch) or pubkey in node.attachedValidators[].validators) func getCurrentSyncCommiteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits = let syncCommittee = withState(node.dag.headState): when consensusFork >= ConsensusFork.Altair: forkyState.data.current_sync_committee else: return static(default(SyncnetBits)) getSyncSubnets(node.hasSyncPubKey(epoch), syncCommittee) func getNextSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits = let syncCommittee = withState(node.dag.headState): when consensusFork >= ConsensusFork.Altair: forkyState.data.next_sync_committee else: return static(default(SyncnetBits)) getSyncSubnets( node.hasSyncPubKey((epoch.sync_committee_period + 1).start_slot().epoch), syncCommittee) func getSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits = let subnets = node.getCurrentSyncCommiteeSubnets(epoch) epochsToSyncPeriod = nearSyncCommitteePeriod(epoch) # The end-slot tracker might call this when it's theoretically applicable, # but more than SYNC_COMMITTEE_SUBNET_COUNT epochs from when the next sync # committee period begins, in which case `epochsToNextSyncPeriod` is none. if epochsToSyncPeriod.isNone or node.dag.cfg.consensusForkAtEpoch(epoch + epochsToSyncPeriod.get) < ConsensusFork.Altair: return subnets subnets + node.getNextSyncCommitteeSubnets(epoch) proc addAltairMessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = node.addPhase0MessageHandlers(forkDigest, slot) # If this comes online near sync committee period, it'll immediately get # replaced as usual by trackSyncCommitteeTopics, which runs at slot end. let syncnets = node.getSyncCommitteeSubnets(slot.epoch) for subcommitteeIdx in SyncSubcommitteeIndex: if syncnets[subcommitteeIdx]: node.network.subscribe( getSyncCommitteeTopic(forkDigest, subcommitteeIdx), basicParams) node.network.subscribe( getSyncCommitteeContributionAndProofTopic(forkDigest), basicParams) node.network.updateSyncnetsMetadata(syncnets) proc addCapellaMessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = node.addAltairMessageHandlers(forkDigest, slot) node.network.subscribe(getBlsToExecutionChangeTopic(forkDigest), basicParams) proc fetchCustodySubnetCount* (node: BeaconNode): uint64= var res = CUSTODY_REQUIREMENT.uint64 if node.config.subscribeAllSubnets: res = DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64 res proc addDenebMessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = node.addCapellaMessageHandlers(forkDigest, slot) let targetSubnets = node.fetchCustodySubnetCount() custody_subnets = node.network.nodeId.get_custody_column_subnets(max(SAMPLES_PER_SLOT.uint8, targetSubnets.uint8)) debugEcho "Target Subnets" debugEcho targetSubnets debugEcho "Custody Subnets" for cs in custody_subnets.get: debugEcho cs for i in 0'u64 ..< DATA_COLUMN_SIDECAR_SUBNET_COUNT: if i in custody_subnets.get: let topic = getDataColumnSidecarTopic(forkDigest, i) debugEcho "Topic" debugEcho topic node.network.subscribe(topic, basicParams) if node.config.subscribeAllSubnets: node.network.loadCscnetsMetadata(DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint8) elif not node.config.subscribeAllSubnets: let csc = node.config.custodySubnetCount if csc.isSome and csc.get < DATA_COLUMN_SIDECAR_SUBNET_COUNT: node.network.loadCscnetsMetadata(csc.get.uint8) else: node.network.loadCscnetsMetadata(CUSTODY_REQUIREMENT.uint8) proc addElectraMessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = node.addDenebMessageHandlers(forkDigest, slot) proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) = node.removePhase0MessageHandlers(forkDigest) for subcommitteeIdx in SyncSubcommitteeIndex: closureScope: let idx = subcommitteeIdx node.network.unsubscribe(getSyncCommitteeTopic(forkDigest, idx)) node.network.unsubscribe( getSyncCommitteeContributionAndProofTopic(forkDigest)) proc removeCapellaMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) = node.removeAltairMessageHandlers(forkDigest) node.network.unsubscribe(getBlsToExecutionChangeTopic(forkDigest)) proc removeDenebMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) = node.removeCapellaMessageHandlers(forkDigest) let targetSubnets = node.fetchCustodySubnetCount() for topic in dataColumnSidecarTopics(forkDigest, targetSubnets): node.network.unsubscribe(topic) proc removeElectraMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) = node.removeDenebMessageHandlers(forkDigest) proc updateSyncCommitteeTopics(node: BeaconNode, slot: Slot) = template lastSyncUpdate: untyped = node.consensusManager[].actionTracker.lastSyncUpdate if lastSyncUpdate == Opt.some(slot.sync_committee_period()) and nearSyncCommitteePeriod(slot.epoch).isNone(): # No need to update unless we're close to the next sync committee period or # new validators were registered with the action tracker # TODO we _could_ skip running this in some of the "near" slots, but.. return lastSyncUpdate = Opt.some(slot.sync_committee_period()) let syncnets = node.getSyncCommitteeSubnets(slot.epoch) debug "Updating sync committee subnets", syncnets, metadata_syncnets = node.network.metadata.syncnets, gossipState = node.gossipState # Assume that different gossip fork sync committee setups are in sync; this # only remains relevant, currently, for one gossip transition epoch, so the # consequences of this not being true aren't exceptionally dire, while this # allows for bookkeeping simplication. if syncnets == node.network.metadata.syncnets: return let newSyncnets = syncnets - node.network.metadata.syncnets oldSyncnets = node.network.metadata.syncnets - syncnets forkDigests = node.forkDigests() for subcommitteeIdx in SyncSubcommitteeIndex: doAssert not (newSyncnets[subcommitteeIdx] and oldSyncnets[subcommitteeIdx]) for gossipFork in node.gossipState: template topic(): auto = getSyncCommitteeTopic(forkDigests[gossipFork], subcommitteeIdx) if oldSyncnets[subcommitteeIdx]: node.network.unsubscribe(topic) elif newSyncnets[subcommitteeIdx]: node.network.subscribe(topic, basicParams) node.network.updateSyncnetsMetadata(syncnets) proc doppelgangerChecked(node: BeaconNode, epoch: Epoch) = if not node.processor[].doppelgangerDetectionEnabled: return # broadcastStartEpoch is set to FAR_FUTURE_EPOCH when we're not monitoring # gossip - it is only viable to assert liveness in epochs where gossip is # active if epoch > node.processor[].doppelgangerDetection.broadcastStartEpoch: for validator in node.attachedValidators[]: validator.doppelgangerChecked(epoch - 1) from ./spec/state_transition_epoch import effective_balance_might_update proc maybeUpdateActionTrackerNextEpoch( node: BeaconNode, forkyState: ForkyHashedBeaconState, nextEpoch: Epoch) = if node.consensusManager[].actionTracker.needsUpdate( forkyState, nextEpoch): template epochRefFallback() = let epochRef = node.dag.getEpochRef(node.dag.head, nextEpoch, false).expect( "Getting head EpochRef should never fail") node.consensusManager[].actionTracker.updateActions( epochRef.shufflingRef, epochRef.beacon_proposers) when forkyState is phase0.HashedBeaconState: # The previous_epoch_participation-based logic requires Altair or newer epochRefFallback() else: let shufflingRef = node.dag.getShufflingRef(node.dag.head, nextEpoch, false).valueOr: # epochRefFallback() won't work in this case either return nextEpochProposers = get_beacon_proposer_indices( forkyState.data, shufflingRef.shuffled_active_validator_indices, nextEpoch) nextEpochFirstProposer = nextEpochProposers[0].valueOr: # All proposers except the first can be more straightforwardly and # efficiently (re)computed correctly once in that epoch. epochRefFallback() return # Has to account for potential epoch transition TIMELY_SOURCE_FLAG_INDEX, # TIMELY_TARGET_FLAG_INDEX, and inactivity penalties, resulting from spec # functions get_flag_index_deltas() and get_inactivity_penalty_deltas(). # # There are no penalties associated with TIMELY_HEAD_FLAG_INDEX, but a # reward exists. effective_balance == MAX_EFFECTIVE_BALANCE.Gwei ensures # if even so, then the effective balance cannot change as a result. # # It's not truly necessary to avoid all rewards and penalties, but only # to bound them to ensure they won't unexpected alter effective balance # during the upcoming epoch transition. # # During genesis epoch, the check for epoch participation is against # current, not previous, epoch, and therefore there's a possibility of # checking for if a validator has participated in an epoch before it will # happen. # # Because process_rewards_and_penalties() in epoch processing happens # before the current/previous participation swap, previous is correct # even here, and consistent with what the epoch transition uses. # # Whilst slashing, proposal, and sync committee rewards and penalties do # update the balances as they occur, they don't update effective_balance # until the end of epoch, so detect via effective_balance_might_update. # # On EF mainnet epoch 233906, this matches 99.5% of active validators; # with Holesky epoch 2041, 83% of active validators. let participation_flags = forkyState.data.previous_epoch_participation.item( nextEpochFirstProposer) effective_balance = forkyState.data.validators.item( nextEpochFirstProposer).effective_balance if participation_flags.has_flag(TIMELY_SOURCE_FLAG_INDEX) and participation_flags.has_flag(TIMELY_TARGET_FLAG_INDEX) and effective_balance == MAX_EFFECTIVE_BALANCE.Gwei and forkyState.data.slot.epoch != GENESIS_EPOCH and forkyState.data.inactivity_scores.item( nextEpochFirstProposer) == 0 and not effective_balance_might_update( forkyState.data.balances.item(nextEpochFirstProposer), effective_balance): node.consensusManager[].actionTracker.updateActions( shufflingRef, nextEpochProposers) else: epochRefFallback() proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} = ## Subscribe to subnets that we are providing stability for or aggregating ## and unsubscribe from the ones that are no longer relevant. # Let the tracker know what duties are approaching - this will tell us how # many stability subnets we need to be subscribed to and what subnets we'll # soon be aggregating - in addition to the in-beacon-node duties, there may # also be duties coming from the validator client, but we don't control when # these arrive await node.registerDuties(slot) # We start subscribing to gossip before we're fully synced - this allows time # to subscribe before the sync end game const TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64 HYSTERESIS_BUFFER = 16 static: doAssert high(ConsensusFork) == ConsensusFork.Electra let head = node.dag.head headDistance = if slot > head.slot: (slot - head.slot).uint64 else: 0'u64 isBehind = headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER targetGossipState = getTargetGossipState( slot.epoch, node.dag.cfg.ALTAIR_FORK_EPOCH, node.dag.cfg.BELLATRIX_FORK_EPOCH, node.dag.cfg.CAPELLA_FORK_EPOCH, node.dag.cfg.DENEB_FORK_EPOCH, node.dag.cfg.ELECTRA_FORK_EPOCH, isBehind) doAssert targetGossipState.card <= 2 let newGossipForks = targetGossipState - node.gossipState oldGossipForks = node.gossipState - targetGossipState doAssert newGossipForks.card <= 2 doAssert oldGossipForks.card <= 2 func maxGossipFork(gossipState: GossipState): int = var res = -1 for gossipFork in gossipState: res = max(res, gossipFork.int) res if maxGossipFork(targetGossipState) < maxGossipFork(node.gossipState) and targetGossipState != {}: warn "Unexpected clock regression during transition", targetGossipState, gossipState = node.gossipState if node.gossipState.card == 0 and targetGossipState.card > 0: # We are synced, so we will connect debug "Enabling topic subscriptions", wallSlot = slot, headSlot = head.slot, headDistance, targetGossipState node.processor[].setupDoppelgangerDetection(slot) # Specially when waiting for genesis, we'll already be synced on startup - # it might also happen on a sufficiently fast restart # We "know" the actions for the current and the next epoch withState(node.dag.headState): if node.consensusManager[].actionTracker.needsUpdate( forkyState, slot.epoch): let epochRef = node.dag.getEpochRef(head, slot.epoch, false).expect( "Getting head EpochRef should never fail") node.consensusManager[].actionTracker.updateActions( epochRef.shufflingRef, epochRef.beacon_proposers) node.maybeUpdateActionTrackerNextEpoch(forkyState, slot.epoch + 1) if node.gossipState.card > 0 and targetGossipState.card == 0: debug "Disabling topic subscriptions", wallSlot = slot, headSlot = head.slot, headDistance node.processor[].clearDoppelgangerProtection() let forkDigests = node.forkDigests() const removeMessageHandlers: array[ConsensusFork, auto] = [ removePhase0MessageHandlers, removeAltairMessageHandlers, removeAltairMessageHandlers, # bellatrix (altair handlers, different forkDigest) removeCapellaMessageHandlers, removeDenebMessageHandlers, removeElectraMessageHandlers ] for gossipFork in oldGossipForks: removeMessageHandlers[gossipFork](node, forkDigests[gossipFork]) const addMessageHandlers: array[ConsensusFork, auto] = [ addPhase0MessageHandlers, addAltairMessageHandlers, addAltairMessageHandlers, # bellatrix (altair handlers, different forkDigest) addCapellaMessageHandlers, addDenebMessageHandlers, addElectraMessageHandlers ] for gossipFork in newGossipForks: addMessageHandlers[gossipFork](node, forkDigests[gossipFork], slot) node.gossipState = targetGossipState node.doppelgangerChecked(slot.epoch) node.updateAttestationSubnetHandlers(slot) node.updateBlocksGossipStatus(slot, isBehind) node.updateLightClientGossipStatus(slot, isBehind) proc pruneBlobs(node: BeaconNode, slot: Slot) = let blobPruneEpoch = (slot.epoch - node.dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS - 1) if slot.is_epoch() and blobPruneEpoch >= node.dag.cfg.DENEB_FORK_EPOCH: var blocks: array[SLOTS_PER_EPOCH.int, BlockId] var count = 0 let startIndex = node.dag.getBlockRange( blobPruneEpoch.start_slot, 1, blocks.toOpenArray(0, SLOTS_PER_EPOCH - 1)) for i in startIndex..= node.dag.cfg.DENEB_FORK_EPOCH: var blocks: array[SLOTS_PER_EPOCH.int, BlockId] var count = 0 let startIndex = node.dag.getBlockRange( dataColumnPruneEpoch.start_slot, 1, blocks.toopenArray(0, SLOTS_PER_EPOCH - 1)) for i in startIndex..= ConsensusFork.Deneb: if node.config.subscribeAllSubnets: let data_column_sidecars = await node.tryReconstructingDataColumns(forkyBlck) if not data_column_sidecars.isOk(): return notice "Data Column Reconstructed and Saved Successfully" notice "Attempting to publish reconstructed columns" let dc = data_column_sidecars.get var das_workers = newSeq[Future[SendResult]](dc.len) for i in 0.. 0: node.attachedValidators[] .slashingProtection # pruning is only done if the DB is set to pruning mode. .pruneAfterFinalization( node.dag.finalizedHead.slot.epoch() ) # Delay part of pruning until latency critical duties are done. # The other part of pruning, `pruneBlocksDAG`, is done eagerly. # ---- # This is the last pruning to do as it clears the "needPruning" condition. node.consensusManager[].pruneStateCachesAndForkChoice() if node.config.historyMode == HistoryMode.Prune: if not (slot + 1).is_epoch(): # The epoch slot already is "heavy" due to the epoch processing, leave # the pruning for later node.dag.pruneHistory() node.pruneBlobs(slot) node.pruneDataColumns(slot) when declared(GC_fullCollect): # The slots in the beacon node work as frames in a game: we want to make # sure that we're ready for the next one and don't get stuck in lengthy # garbage collection tasks when time is of essence in the middle of a slot - # while this does not guarantee that we'll never collect during a slot, it # makes sure that all the scratch space we used during slot tasks (logging, # temporary buffers etc) gets recycled for the next slot that is likely to # need similar amounts of memory. try: GC_fullCollect() except Defect as exc: raise exc # Reraise to maintain call stack except Exception: # TODO upstream raiseAssert "Unexpected exception during GC collection" let gcCollectionTick = Moment.now() # Checkpoint the database to clear the WAL file and make sure changes in # the database are synced with the filesystem. node.db.checkpoint() let dbCheckpointTick = Moment.now() dbCheckpointDur = dbCheckpointTick - gcCollectionTick db_checkpoint_seconds.inc(dbCheckpointDur.toFloatSeconds) if dbCheckpointDur >= MinSignificantProcessingDuration: info "Database checkpointed", dur = dbCheckpointDur else: debug "Database checkpointed", dur = dbCheckpointDur node.syncCommitteeMsgPool[].pruneData(slot) if slot.is_epoch: node.dynamicFeeRecipientsStore[].pruneOldMappings(slot.epoch) # Update upcoming actions - we do this every slot in case a reorg happens let head = node.dag.head if node.isSynced(head) and head.executionValid: withState(node.dag.headState): # maybeUpdateActionTrackerNextEpoch might not account for balance changes # from the process_rewards_and_penalties() epoch transition but only from # process_block() and other per-slot sources. This mainly matters insofar # as it might trigger process_effective_balance_updates() changes in that # same epoch transition, which function is therefore potentially blind to # but which might then affect beacon proposers. # # Because this runs every slot, it can account naturally for slashings, # which affect balances via slash_validator() when they happen, and any # missed sync committee participation via process_sync_aggregate(), but # attestation penalties for example, need, specific handling. # checked by maybeUpdateActionTrackerNextEpoch. node.maybeUpdateActionTrackerNextEpoch(forkyState, slot.epoch + 1) let nextAttestationSlot = node.consensusManager[].actionTracker.getNextAttestationSlot(slot) nextProposalSlot = node.consensusManager[].actionTracker.getNextProposalSlot(slot) nextActionSlot = min(nextAttestationSlot, nextProposalSlot) nextActionWaitTime = saturate(fromNow(node.beaconClock, nextActionSlot)) # -1 is a more useful output than 18446744073709551615 as an indicator of # no future attestation/proposal known. template formatInt64(x: Slot): int64 = if x == high(uint64).Slot: -1'i64 else: toGaugeValue(x) let syncCommitteeSlot = slot + 1 syncCommitteeEpoch = syncCommitteeSlot.epoch inCurrentSyncCommittee = not node.getCurrentSyncCommiteeSubnets(syncCommitteeEpoch).isZeros() template formatSyncCommitteeStatus(): string = if inCurrentSyncCommittee: "current" elif not node.getNextSyncCommitteeSubnets(syncCommitteeEpoch).isZeros(): let slotsToNextSyncCommitteePeriod = SLOTS_PER_SYNC_COMMITTEE_PERIOD - since_sync_committee_period_start(syncCommitteeSlot) # int64 conversion is safe doAssert slotsToNextSyncCommitteePeriod <= SLOTS_PER_SYNC_COMMITTEE_PERIOD "in " & toTimeLeftString( SECONDS_PER_SLOT.int64.seconds * slotsToNextSyncCommitteePeriod.int64) else: "none" info "Slot end", slot = shortLog(slot), nextActionWait = if nextActionSlot == FAR_FUTURE_SLOT: "n/a" else: shortLog(nextActionWaitTime), nextAttestationSlot = formatInt64(nextAttestationSlot), nextProposalSlot = formatInt64(nextProposalSlot), syncCommitteeDuties = formatSyncCommitteeStatus(), head = shortLog(head) if nextActionSlot != FAR_FUTURE_SLOT: next_action_wait.set(nextActionWaitTime.toFloatSeconds) next_proposal_wait.set( if nextProposalSlot != FAR_FUTURE_SLOT: saturate(fromNow(node.beaconClock, nextProposalSlot)).toFloatSeconds() else: Inf) sync_committee_active.set(if inCurrentSyncCommittee: 1 else: 0) let epoch = slot.epoch if epoch + 1 >= node.network.forkId.next_fork_epoch: # Update 1 epoch early to block non-fork-ready peers node.network.updateForkId(epoch, node.dag.genesis_validators_root) # When we're not behind schedule, we'll speculatively update the clearance # state in anticipation of receiving the next block - we do it after # logging slot end since the nextActionWaitTime can be short let advanceCutoff = node.beaconClock.fromNow( slot.start_beacon_time() + chronos.seconds(int(SECONDS_PER_SLOT - 1))) if advanceCutoff.inFuture: # We wait until there's only a second left before the next slot begins, then # we advance the clearance state to the next slot - this gives us a high # probability of being prepared for the block that will arrive and the # epoch processing that follows await sleepAsync(advanceCutoff.offset) node.dag.advanceClearanceState() # Prepare action tracker for the next slot node.consensusManager[].actionTracker.updateSlot(slot + 1) # The last thing we do is to perform the subscriptions and unsubscriptions for # the next slot, just before that slot starts - because of the advance cuttoff # above, this will be done just before the next slot starts node.updateSyncCommitteeTopics(slot + 1) await node.updateGossipStatus(slot + 1) func formatNextConsensusFork( node: BeaconNode, withVanityArt = false): Opt[string] = let consensusFork = node.dag.cfg.consensusForkAtEpoch(node.dag.head.slot.epoch) if consensusFork == ConsensusFork.high: return Opt.none(string) let nextConsensusFork = consensusFork.succ() nextForkEpoch = node.dag.cfg.consensusForkEpoch(nextConsensusFork) if nextForkEpoch == FAR_FUTURE_EPOCH: return Opt.none(string) Opt.some( (if withVanityArt: nextConsensusFork.getVanityMascot & " " else: "") & $nextConsensusFork & ":" & $nextForkEpoch) func syncStatus(node: BeaconNode, wallSlot: Slot): string = let optimisticHead = not node.dag.head.executionValid if node.syncManager.inProgress: let optimisticSuffix = if optimisticHead: "/opt" else: "" lightClientSuffix = if node.consensusManager[].shouldSyncOptimistically(wallSlot): " - lc: " & $shortLog(node.consensusManager[].optimisticHead) else: "" node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix elif node.backfiller.inProgress: "backfill: " & node.backfiller.syncStatus elif optimistic_head: "synced/opt" else: "synced" when defined(windows): from winservice import establishWindowsService, reportServiceStatusSuccess proc onSlotStart(node: BeaconNode, wallTime: BeaconTime, lastSlot: Slot): Future[bool] {.async.} = ## Called at the beginning of a slot - usually every slot, but sometimes might ## skip a few in case we're running late. ## wallTime: current system time - we will strive to perform all duties up ## to this point in time ## lastSlot: the last slot that we successfully processed, so we know where to ## start work from - there might be jumps if processing is delayed let # The slot we should be at, according to the clock wallSlot = wallTime.slotOrZero # If everything was working perfectly, the slot that we should be processing expectedSlot = lastSlot + 1 finalizedEpoch = node.dag.finalizedHead.blck.slot.epoch() delay = wallTime - expectedSlot.start_beacon_time() node.processingDelay = Opt.some(nanoseconds(delay.nanoseconds)) block: logScope: slot = shortLog(wallSlot) epoch = shortLog(wallSlot.epoch) sync = node.syncStatus(wallSlot) peers = len(node.network.peerPool) head = shortLog(node.dag.head) finalized = shortLog(getStateField( node.dag.headState, finalized_checkpoint)) delay = shortLog(delay) let nextConsensusForkDescription = node.formatNextConsensusFork() if nextConsensusForkDescription.isNone: info "Slot start" else: info "Slot start", nextFork = nextConsensusForkDescription.get # Check before any re-scheduling of onSlotStart() if checkIfShouldStopAtEpoch(wallSlot, node.config.stopAtEpoch): quit(0) when defined(windows): if node.config.runAsService: reportServiceStatusSuccess() beacon_slot.set wallSlot.toGaugeValue beacon_current_epoch.set wallSlot.epoch.toGaugeValue # both non-negative, so difference can't overflow or underflow int64 finalization_delay.set( wallSlot.epoch.toGaugeValue - finalizedEpoch.toGaugeValue) if node.config.strictVerification: verifyFinalization(node, wallSlot) node.consensusManager[].updateHead(wallSlot) await node.reconstructAndSendDataColumns() await node.handleValidatorDuties(lastSlot, wallSlot) await onSlotEnd(node, wallSlot) # https://github.com/ethereum/builder-specs/blob/v0.4.0/specs/bellatrix/validator.md#registration-dissemination # This specification suggests validators re-submit to builder software every # `EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION` epochs. if wallSlot.is_epoch and wallSlot.epoch mod EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION == 0: asyncSpawn node.registerValidators(wallSlot.epoch) return false proc onSecond(node: BeaconNode, time: Moment) = # Nim GC metrics (for the main thread) updateThreadMetrics() if node.config.stopAtSyncedEpoch != 0 and node.dag.head.slot.epoch >= node.config.stopAtSyncedEpoch: notice "Shutting down after having reached the target synced epoch" bnStatus = BeaconNodeStatus.Stopping proc runOnSecondLoop(node: BeaconNode) {.async.} = const sleepTime = chronos.seconds(1) nanosecondsIn1s = float(sleepTime.nanoseconds) while true: let start = chronos.now(chronos.Moment) await chronos.sleepAsync(sleepTime) let afterSleep = chronos.now(chronos.Moment) let sleepTime = afterSleep - start node.onSecond(start) let finished = chronos.now(chronos.Moment) let processingTime = finished - afterSleep ticks_delay.set(sleepTime.nanoseconds.float / nanosecondsIn1s) trace "onSecond task completed", sleepTime, processingTime func connectedPeersCount(node: BeaconNode): int = len(node.network.peerPool) proc installRestHandlers(restServer: RestServerRef, node: BeaconNode) = restServer.router.installBeaconApiHandlers(node) restServer.router.installBuilderApiHandlers(node) restServer.router.installConfigApiHandlers(node) restServer.router.installDebugApiHandlers(node) restServer.router.installEventApiHandlers(node) restServer.router.installNimbusApiHandlers(node) restServer.router.installNodeApiHandlers(node) restServer.router.installValidatorApiHandlers(node) if node.dag.lcDataStore.serve: restServer.router.installLightClientApiHandlers(node) from ./spec/datatypes/capella import SignedBeaconBlock proc installMessageValidators(node: BeaconNode) = # These validators stay around the whole time, regardless of which specific # subnets are subscribed to during any given epoch. let forkDigests = node.dag.forkDigests for fork in ConsensusFork: withConsensusFork(fork): let digest = forkDigests[].atConsensusFork(consensusFork) # beacon_block # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/phase0/p2p-interface.md#beacon_block node.network.addValidator( getBeaconBlocksTopic(digest), proc ( signedBlock: consensusFork.SignedBeaconBlock ): ValidationResult = if node.shouldSyncOptimistically(node.currentSlot): toValidationResult( node.optimisticProcessor.processSignedBeaconBlock( signedBlock)) else: toValidationResult( node.processor[].processSignedBeaconBlock( MsgSource.gossip, signedBlock))) # beacon_attestation_{subnet_id} # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id when consensusFork >= ConsensusFork.Electra: for it in SubnetId: closureScope: # Needed for inner `proc`; don't lift it out of loop. let subnet_id = it node.network.addAsyncValidator( getAttestationTopic(digest, subnet_id), proc ( attestation: electra.Attestation ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processAttestation( MsgSource.gossip, attestation, subnet_id))) else: for it in SubnetId: closureScope: # Needed for inner `proc`; don't lift it out of loop. let subnet_id = it node.network.addAsyncValidator( getAttestationTopic(digest, subnet_id), proc ( attestation: phase0.Attestation ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processAttestation( MsgSource.gossip, attestation, subnet_id))) # beacon_aggregate_and_proof # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.4/specs/phase0/p2p-interface.md#beacon_aggregate_and_proof when consensusFork >= ConsensusFork.Electra: node.network.addAsyncValidator( getAggregateAndProofsTopic(digest), proc ( signedAggregateAndProof: electra.SignedAggregateAndProof ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processSignedAggregateAndProof( MsgSource.gossip, signedAggregateAndProof))) else: node.network.addAsyncValidator( getAggregateAndProofsTopic(digest), proc ( signedAggregateAndProof: phase0.SignedAggregateAndProof ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processSignedAggregateAndProof( MsgSource.gossip, signedAggregateAndProof))) # attester_slashing # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#attester_slashing node.network.addValidator( getAttesterSlashingsTopic(digest), proc ( attesterSlashing: phase0.AttesterSlashing ): ValidationResult = toValidationResult( node.processor[].processAttesterSlashing( MsgSource.gossip, attesterSlashing))) # proposer_slashing # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#proposer_slashing node.network.addValidator( getProposerSlashingsTopic(digest), proc ( proposerSlashing: ProposerSlashing ): ValidationResult = toValidationResult( node.processor[].processProposerSlashing( MsgSource.gossip, proposerSlashing))) # voluntary_exit # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#voluntary_exit node.network.addValidator( getVoluntaryExitsTopic(digest), proc ( signedVoluntaryExit: SignedVoluntaryExit ): ValidationResult = toValidationResult( node.processor[].processSignedVoluntaryExit( MsgSource.gossip, signedVoluntaryExit))) when consensusFork >= ConsensusFork.Altair: # sync_committee_{subnet_id} # https://github.com/ethereum/consensus-specs/blob/v1.4.0/specs/altair/p2p-interface.md#sync_committee_subnet_id for subcommitteeIdx in SyncSubcommitteeIndex: closureScope: # Needed for inner `proc`; don't lift it out of loop. let idx = subcommitteeIdx node.network.addAsyncValidator( getSyncCommitteeTopic(digest, idx), proc ( msg: SyncCommitteeMessage ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processSyncCommitteeMessage( MsgSource.gossip, msg, idx))) # sync_committee_contribution_and_proof # https://github.com/ethereum/consensus-specs/blob/v1.4.0/specs/altair/p2p-interface.md#sync_committee_contribution_and_proof node.network.addAsyncValidator( getSyncCommitteeContributionAndProofTopic(digest), proc ( msg: SignedContributionAndProof ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processSignedContributionAndProof( MsgSource.gossip, msg))) when consensusFork >= ConsensusFork.Capella: # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.2/specs/capella/p2p-interface.md#bls_to_execution_change node.network.addAsyncValidator( getBlsToExecutionChangeTopic(digest), proc ( msg: SignedBLSToExecutionChange ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processBlsToExecutionChange( MsgSource.gossip, msg))) when consensusFork >= ConsensusFork.Deneb: # blob_sidecar_{subnet_id} # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blob_sidecar_subnet_id # for it in BlobId: # closureScope: # Needed for inner `proc`; don't lift it out of loop. # let subnet_id = it # node.network.addValidator( # getBlobSidecarTopic(digest, subnet_id), proc ( # blobSidecar: deneb.BlobSidecar # ): ValidationResult = # toValidationResult( # node.processor[].processBlobSidecar( # MsgSource.gossip, blobSidecar, subnet_id))) # data_column_sidecar_{subnet_id} # # let subnetCount = # if node.config.subscribeAllSubnets: # DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64 # else: # CUSTODY_REQUIREMENT.uint64 # let dc_subnets = get_custody_column_subnet(node.network.nodeId, subnetCount) for it in 0'u64 ..< DATA_COLUMN_SIDECAR_SUBNET_COUNT: closureScope: # Needed for inner `proc`; don't lift it out of loop. let subnet_id = it node.network.addValidator( getDataColumnSidecarTopic(digest, subnet_id), proc ( dataColumnSidecar: DataColumnSidecar ): ValidationResult = toValidationResult( node.processor[].processDataColumnSidecar( MsgSource.gossip, dataColumnSidecar, subnet_id))) node.installLightClientMessageValidators() proc stop(node: BeaconNode) = bnStatus = BeaconNodeStatus.Stopping notice "Graceful shutdown" if not node.config.inProcessValidators: try: node.vcProcess.close() except Exception as exc: warn "Couldn't close vc process", msg = exc.msg try: waitFor node.network.stop() except CatchableError as exc: warn "Couldn't stop network", msg = exc.msg node.attachedValidators[].slashingProtection.close() node.attachedValidators[].close() node.db.close() notice "Databases closed" proc startBackfillTask(node: BeaconNode) {.async.} = while node.dag.needsBackfill: if not node.syncManager.inProgress: # Only start the backfiller if it's needed _and_ head sync has completed - # if we lose sync after having synced head, we could stop the backfilller, # but this should be a fringe case - might as well keep the logic simple for # now node.backfiller.start() return await sleepAsync(chronos.seconds(2)) proc run(node: BeaconNode) {.raises: [CatchableError].} = bnStatus = BeaconNodeStatus.Running if not isNil(node.restServer): node.restServer.installRestHandlers(node) node.restServer.start() if not isNil(node.keymanagerServer): doAssert not isNil(node.keymanagerHost) node.keymanagerServer.router.installKeymanagerHandlers(node.keymanagerHost[]) if node.keymanagerServer != node.restServer: node.keymanagerServer.start() let wallTime = node.beaconClock.now() wallSlot = wallTime.slotOrZero() node.startLightClient() node.requestManager.start() node.syncManager.start() if node.dag.needsBackfill(): asyncSpawn node.startBackfillTask() waitFor node.updateGossipStatus(wallSlot) for web3signerUrl in node.config.web3SignerUrls: # TODO # The current strategy polls all remote signers independently # from each other which may lead to some race conditions of # validators are migrated from one signer to another # (because the updates to our validator pool are not atomic). # Consider using different strategies that would detect such # race conditions. asyncSpawn node.pollForDynamicValidators( web3signerUrl, node.config.web3signerUpdateInterval) asyncSpawn runSlotLoop(node, wallTime, onSlotStart) asyncSpawn runOnSecondLoop(node) asyncSpawn runQueueProcessingLoop(node.blockProcessor) asyncSpawn runKeystoreCachePruningLoop(node.keystoreCache) # main event loop while bnStatus == BeaconNodeStatus.Running: poll() # if poll fails, the network is broken # time to say goodbye node.stop() var gPidFile: string proc createPidFile(filename: string) {.raises: [IOError].} = writeFile filename, $os.getCurrentProcessId() gPidFile = filename addQuitProc proc {.noconv.} = discard io2.removeFile(gPidFile) proc initializeNetworking(node: BeaconNode) {.async.} = node.installMessageValidators() info "Listening to incoming network requests" await node.network.startListening() let addressFile = node.config.dataDir / "beacon_node.enr" writeFile(addressFile, node.network.announcedENR.toURI) await node.network.start() proc start*(node: BeaconNode) {.raises: [CatchableError].} = let head = node.dag.head finalizedHead = node.dag.finalizedHead genesisTime = node.beaconClock.fromNow(start_beacon_time(Slot 0)) notice "Starting beacon node", version = fullVersionStr, nimVersion = NimVersion, enr = node.network.announcedENR.toURI, peerId = $node.network.switch.peerInfo.peerId, timeSinceFinalization = node.beaconClock.now() - finalizedHead.slot.start_beacon_time(), head = shortLog(head), justified = shortLog(getStateField( node.dag.headState, current_justified_checkpoint)), finalized = shortLog(getStateField( node.dag.headState, finalized_checkpoint)), finalizedHead = shortLog(finalizedHead), SLOTS_PER_EPOCH, SECONDS_PER_SLOT, SPEC_VERSION, dataDir = node.config.dataDir.string, validators = node.attachedValidators[].count if genesisTime.inFuture: notice "Waiting for genesis", genesisIn = genesisTime.offset waitFor node.initializeNetworking() node.elManager.start() node.run() func formatGwei(amount: Gwei): string = # TODO This is implemented in a quite a silly way. # Better routines for formatting decimal numbers # should exists somewhere else. let eth = distinctBase(amount) div 1000000000 remainder = distinctBase(amount) mod 1000000000 result = $eth if remainder != 0: result.add '.' let remainderStr = $remainder for i in remainderStr.len ..< 9: result.add '0' result.add remainderStr while result[^1] == '0': result.setLen(result.len - 1) when not defined(windows): proc initStatusBar(node: BeaconNode) {.raises: [ValueError].} = if not isatty(stdout): return if not node.config.statusBarEnabled: return try: enableTrueColors() except Exception as exc: # TODO Exception error "Couldn't enable colors", err = exc.msg proc dataResolver(expr: string): string {.raises: [].} = template justified: untyped = node.dag.head.atEpochStart( getStateField( node.dag.headState, current_justified_checkpoint).epoch) # TODO: # We should introduce a general API for resolving dot expressions # such as `db.latest_block.slot` or `metrics.connected_peers`. # Such an API can be shared between the RPC back-end, CLI tools # such as ncli, a potential GraphQL back-end and so on. # The status bar feature would allow the user to specify an # arbitrary expression that is resolvable through this API. case expr.toLowerAscii of "version": versionAsStr of "full_version": fullVersionStr of "connected_peers": $(node.connectedPeersCount) of "head_root": shortLog(node.dag.head.root) of "head_epoch": $(node.dag.head.slot.epoch) of "head_epoch_slot": $(node.dag.head.slot.since_epoch_start) of "head_slot": $(node.dag.head.slot) of "justifed_root": shortLog(justified.blck.root) of "justifed_epoch": $(justified.slot.epoch) of "justifed_epoch_slot": $(justified.slot.since_epoch_start) of "justifed_slot": $(justified.slot) of "finalized_root": shortLog(node.dag.finalizedHead.blck.root) of "finalized_epoch": $(node.dag.finalizedHead.slot.epoch) of "finalized_epoch_slot": $(node.dag.finalizedHead.slot.since_epoch_start) of "finalized_slot": $(node.dag.finalizedHead.slot) of "epoch": $node.currentSlot.epoch of "epoch_slot": $(node.currentSlot.since_epoch_start) of "slot": $node.currentSlot of "slots_per_epoch": $SLOTS_PER_EPOCH of "slot_trailing_digits": var slotStr = $node.currentSlot if slotStr.len > 3: slotStr = slotStr[^3..^1] slotStr of "attached_validators_balance": formatGwei(node.attachedValidatorBalanceTotal) of "next_consensus_fork": let nextConsensusForkDescription = node.formatNextConsensusFork(withVanityArt = true) if nextConsensusForkDescription.isNone: "" else: " (scheduled " & nextConsensusForkDescription.get & ")" of "sync_status": node.syncStatus(node.currentSlot) else: # We ignore typos for now and just render the expression # as it was written. TODO: come up with a good way to show # an error message to the user. "$" & expr var statusBar = StatusBarView.init( node.config.statusBarContents, dataResolver) when compiles(defaultChroniclesStream.outputs[0].writer): let tmp = defaultChroniclesStream.outputs[0].writer defaultChroniclesStream.outputs[0].writer = proc (logLevel: LogLevel, msg: LogOutputStr) {.raises: [].} = try: # p.hidePrompt erase statusBar # p.writeLine msg tmp(logLevel, msg) render statusBar # p.showPrompt except Exception as e: # render raises Exception logLoggingFailure(cstring(msg), e) proc statusBarUpdatesPollingLoop() {.async.} = try: while true: update statusBar erase statusBar render statusBar await sleepAsync(chronos.seconds(1)) except CatchableError as exc: warn "Failed to update status bar, no further updates", err = exc.msg asyncSpawn statusBarUpdatesPollingLoop() proc doRunBeaconNode(config: var BeaconNodeConf, rng: ref HmacDrbgContext) {.raises: [CatchableError].} = info "Launching beacon node", version = fullVersionStr, bls_backend = $BLS_BACKEND, const_preset, cmdParams = commandLineParams(), config template ignoreDeprecatedOption(option: untyped): untyped = if config.option.isSome: warn "Config option is deprecated", option = config.option.get ignoreDeprecatedOption requireEngineAPI ignoreDeprecatedOption safeSlotsToImportOptimistically ignoreDeprecatedOption terminalTotalDifficultyOverride ignoreDeprecatedOption optimistic ignoreDeprecatedOption validatorMonitorTotals ignoreDeprecatedOption web3ForcePolling createPidFile(config.dataDir.string / "beacon_node.pid") config.createDumpDirs() if config.metricsEnabled: let metricsAddress = config.metricsAddress notice "Starting metrics HTTP server", url = "http://" & $metricsAddress & ":" & $config.metricsPort & "/metrics" try: startMetricsHttpServer($metricsAddress, config.metricsPort) except CatchableError as exc: raise exc except Exception as exc: raiseAssert exc.msg # TODO fix metrics # Nim GC metrics (for the main thread) will be collected in onSecond(), but # we disable piggy-backing on other metrics here. setSystemMetricsAutomaticUpdate(false) # There are no managed event loops in here, to do a graceful shutdown, but # letting the default Ctrl+C handler exit is safe, since we only read from # the db. let metadata = config.loadEth2Network() # Updating the config based on the metadata certainly is not beautiful but it # works for node in metadata.bootstrapNodes: config.bootstrapNodes.add node if config.forkChoiceVersion.isNone: config.forkChoiceVersion = some(ForkChoiceVersion.Pr3431) ## Ctrl+C handling proc controlCHandler() {.noconv.} = when defined(windows): # workaround for https://github.com/nim-lang/Nim/issues/4057 try: setupForeignThreadGc() except Exception as exc: raiseAssert exc.msg # shouldn't happen notice "Shutting down after having received SIGINT" bnStatus = BeaconNodeStatus.Stopping try: setControlCHook(controlCHandler) except Exception as exc: # TODO Exception warn "Cannot set ctrl-c handler", msg = exc.msg # equivalent SIGTERM handler when defined(posix): proc SIGTERMHandler(signal: cint) {.noconv.} = notice "Shutting down after having received SIGTERM" bnStatus = BeaconNodeStatus.Stopping c_signal(ansi_c.SIGTERM, SIGTERMHandler) if metadata.cfg.DENEB_FORK_EPOCH != FAR_FUTURE_EPOCH: let res = if config.trustedSetupFile.isNone: conf.loadKzgTrustedSetup() else: conf.loadKzgTrustedSetup(config.trustedSetupFile.get) if res.isErr(): raiseAssert res.error() let node = waitFor BeaconNode.init(rng, config, metadata) if bnStatus == BeaconNodeStatus.Stopping: return when not defined(windows): # This status bar can lock a Windows terminal emulator, blocking the whole # event loop (seen on Windows 10, with a default MSYS2 terminal). initStatusBar(node) if node.nickname != "": dynamicLogScope(node = node.nickname): node.start() else: node.start() proc doRecord(config: BeaconNodeConf, rng: var HmacDrbgContext) {. raises: [CatchableError].} = case config.recordCmd: of RecordCmd.create: let netKeys = getPersistentNetKeys(rng, config) var fieldPairs: seq[FieldPair] for field in config.fields: let fieldPair = field.split(":") if fieldPair.len > 1: fieldPairs.add(toFieldPair(fieldPair[0], hexToSeqByte(fieldPair[1]))) else: fatal "Invalid field pair" quit QuitFailure let record = enr.Record.init( config.seqNumber, netKeys.seckey.asEthKey, some(config.ipExt), some(config.tcpPortExt), some(config.udpPortExt), fieldPairs).expect("Record within size limits") echo record.toURI() of RecordCmd.print: echo $config.recordPrint proc doWeb3Cmd(config: BeaconNodeConf, rng: var HmacDrbgContext) {.raises: [CatchableError].} = case config.web3Cmd: of Web3Cmd.test: let metadata = config.loadEth2Network() waitFor testWeb3Provider(config.web3TestUrl, metadata.cfg.DEPOSIT_CONTRACT_ADDRESS, rng.loadJwtSecret(config, allowCreate = true)) proc doSlashingExport(conf: BeaconNodeConf) {.raises: [IOError].}= let dir = conf.validatorsDir() filetrunc = SlashingDbName # TODO: Make it read-only https://github.com/status-im/nim-eth/issues/312 let db = SlashingProtectionDB.loadUnchecked(dir, filetrunc, readOnly = false) let interchange = conf.exportedInterchangeFile.string db.exportSlashingInterchange(interchange, conf.exportedValidators) echo "Export finished: '", dir/filetrunc & ".sqlite3" , "' into '", interchange, "'" proc doSlashingImport(conf: BeaconNodeConf) {.raises: [SerializationError, IOError].} = let dir = conf.validatorsDir() filetrunc = SlashingDbName # TODO: Make it read-only https://github.com/status-im/nim-eth/issues/312 let interchange = conf.importedInterchangeFile.string var spdir: SPDIR try: spdir = Json.loadFile(interchange, SPDIR, requireAllFields = true) except SerializationError as err: writeStackTrace() stderr.write $Json & " load issue for file \"", interchange, "\"\n" stderr.write err.formatMsg(interchange), "\n" quit 1 # Open DB and handle migration from v1 to v2 if needed let db = SlashingProtectionDB.init( genesis_validators_root = Eth2Digest spdir.metadata.genesis_validators_root, basePath = dir, dbname = filetrunc, modes = {kCompleteArchive} ) # Now import the slashing interchange file # Failures mode: # - siError can only happen with invalid genesis_validators_root which would be caught above # - siPartial can happen for invalid public keys, slashable blocks, slashable votes let status = db.inclSPDIR(spdir) doAssert status in {siSuccess, siPartial} echo "Import finished: '", interchange, "' into '", dir/filetrunc & ".sqlite3", "'" proc doSlashingInterchange(conf: BeaconNodeConf) {.raises: [CatchableError].} = case conf.slashingdbCmd of SlashProtCmd.`export`: conf.doSlashingExport() of SlashProtCmd.`import`: conf.doSlashingImport() proc handleStartUpCmd(config: var BeaconNodeConf) {.raises: [CatchableError].} = # Single RNG instance for the application - will be seeded on construction # and avoid using system resources (such as urandom) after that let rng = HmacDrbgContext.new() case config.cmd of BNStartUpCmd.noCommand: doRunBeaconNode(config, rng) of BNStartUpCmd.deposits: doDeposits(config, rng[]) of BNStartUpCmd.wallets: doWallets(config, rng[]) of BNStartUpCmd.record: doRecord(config, rng[]) of BNStartUpCmd.web3: doWeb3Cmd(config, rng[]) of BNStartUpCmd.slashingdb: doSlashingInterchange(config) of BNStartUpCmd.trustedNodeSync: if config.blockId.isSome(): error "--blockId option has been removed - use --state-id instead!" quit 1 let metadata = loadEth2Network(config) db = BeaconChainDB.new(config.databaseDir, metadata.cfg, inMemory = false) genesisState = waitFor fetchGenesisState(metadata) waitFor db.doRunTrustedNodeSync( metadata, config.databaseDir, config.eraDir, config.trustedNodeUrl, config.stateId, config.lcTrustedBlockRoot, config.backfillBlocks, config.reindex, config.downloadDepositSnapshot, genesisState) db.close() {.pop.} # TODO moduletests exceptions programMain: var config = makeBannerAndConfig(clientId, copyrights, nimBanner, SPEC_VERSION, [], BeaconNodeConf).valueOr: stderr.write error quit QuitFailure if not(checkAndCreateDataDir(string(config.dataDir))): # We are unable to access/create data folder or data folder's # permissions are insecure. quit QuitFailure setupFileLimits() setupLogging(config.logLevel, config.logStdout, config.logFile) ## This Ctrl+C handler exits the program in non-graceful way. ## It's responsible for handling Ctrl+C in sub-commands such ## as `wallets *` and `deposits *`. In a regular beacon node ## run, it will be overwritten later with a different handler ## performing a graceful exit. proc exitImmediatelyOnCtrlC() {.noconv.} = when defined(windows): # workaround for https://github.com/nim-lang/Nim/issues/4057 setupForeignThreadGc() # in case a password prompt disabled echoing resetStdin() echo "" # If we interrupt during an interactive prompt, this # will move the cursor to the next line notice "Shutting down after having received SIGINT" quit 0 setControlCHook(exitImmediatelyOnCtrlC) # equivalent SIGTERM handler when defined(posix): proc exitImmediatelyOnSIGTERM(signal: cint) {.noconv.} = notice "Shutting down after having received SIGTERM" quit 0 c_signal(ansi_c.SIGTERM, exitImmediatelyOnSIGTERM) when defined(windows): if config.runAsService: proc exitService() = bnStatus = BeaconNodeStatus.Stopping establishWindowsService(clientId, copyrights, nimBanner, SPEC_VERSION, "nimbus_beacon_node", BeaconNodeConf, handleStartUpCmd, exitService) else: handleStartUpCmd(config) else: handleStartUpCmd(config)