mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-18 10:31:04 +00:00
9fc870777c
Bellatrix light client data does not contain the EL block hash, so we had to follow blocks gossip to learn the EL `block_hash` of such blocks. Now that Bellatrix is obsolete, we can simplify EL syncing logic under light client scenarios. Bellatrix light client data can still be used to advance the light client sync itself, but will no longer result in `engine_forkchoiceUpdated` calls until the sync reaches Capella. This also frees up some memory as we no longer have to retain blocks.
2431 lines
92 KiB
Nim
2431 lines
92 KiB
Nim
# beacon_chain
|
|
# Copyright (c) 2018-2024 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
{.push raises: [].}
|
|
|
|
import
|
|
std/[os, random, terminal, times],
|
|
chronos, chronicles,
|
|
metrics, metrics/chronos_httpserver,
|
|
stew/[byteutils, io2],
|
|
eth/p2p/discoveryv5/[enr, random2],
|
|
./consensus_object_pools/blob_quarantine,
|
|
./consensus_object_pools/vanity_logs/vanity_logs,
|
|
./networking/[topic_params, network_metadata_downloads],
|
|
./rpc/[rest_api, state_ttl_cache],
|
|
./spec/datatypes/[altair, bellatrix, phase0],
|
|
./spec/[deposit_snapshots, engine_authentication, weak_subjectivity],
|
|
./sync/[sync_protocol, light_client_protocol],
|
|
./validators/[keystore_management, beacon_validators],
|
|
"."/[
|
|
beacon_node, beacon_node_light_client, deposits,
|
|
nimbus_binary_common, statusbar, trusted_node_sync, wallets]
|
|
|
|
when defined(posix):
|
|
import system/ansi_c
|
|
|
|
from ./spec/datatypes/deneb import SignedBeaconBlock
|
|
|
|
from
|
|
libp2p/protocols/pubsub/gossipsub
|
|
import
|
|
TopicParams, validateParameters, init
|
|
|
|
logScope: topics = "beacnde"
|
|
|
|
# https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#interop-metrics
|
|
declareGauge beacon_slot, "Latest slot of the beacon chain state"
|
|
declareGauge beacon_current_epoch, "Current epoch"
|
|
|
|
# Finalization tracking
|
|
declareGauge finalization_delay,
|
|
"Epoch delay between scheduled epoch and finalized epoch"
|
|
|
|
declareGauge ticks_delay,
|
|
"How long does to take to run the onSecond loop"
|
|
|
|
declareGauge next_action_wait,
|
|
"Seconds until the next attestation will be sent"
|
|
|
|
declareGauge next_proposal_wait,
|
|
"Seconds until the next proposal will be sent, or Inf if not known"
|
|
|
|
declareGauge sync_committee_active,
|
|
"1 if there are current sync committee duties, 0 otherwise"
|
|
|
|
declareCounter db_checkpoint_seconds,
|
|
"Time spent checkpointing the database to clear the WAL file"
|
|
|
|
proc fetchGenesisState(
|
|
metadata: Eth2NetworkMetadata,
|
|
genesisState = none(InputFile),
|
|
genesisStateUrl = none(Uri)
|
|
): Future[ref ForkedHashedBeaconState] {.async: (raises: []).} =
|
|
let genesisBytes =
|
|
if metadata.genesis.kind != BakedIn and genesisState.isSome:
|
|
let res = io2.readAllBytes(genesisState.get.string)
|
|
res.valueOr:
|
|
error "Failed to read genesis state file", err = res.error.ioErrorMsg
|
|
quit 1
|
|
elif metadata.hasGenesis:
|
|
try:
|
|
if metadata.genesis.kind == BakedInUrl:
|
|
info "Obtaining genesis state",
|
|
sourceUrl = $genesisStateUrl
|
|
.get(parseUri metadata.genesis.url)
|
|
await metadata.fetchGenesisBytes(genesisStateUrl)
|
|
except CatchableError as err:
|
|
error "Failed to obtain genesis state",
|
|
source = metadata.genesis.sourceDesc,
|
|
err = err.msg
|
|
quit 1
|
|
else:
|
|
@[]
|
|
|
|
if genesisBytes.len > 0:
|
|
try:
|
|
newClone readSszForkedHashedBeaconState(metadata.cfg, genesisBytes)
|
|
except CatchableError as err:
|
|
error "Invalid genesis state",
|
|
size = genesisBytes.len,
|
|
digest = eth2digest(genesisBytes),
|
|
err = err.msg
|
|
quit 1
|
|
else:
|
|
nil
|
|
|
|
proc doRunTrustedNodeSync(
|
|
db: BeaconChainDB,
|
|
metadata: Eth2NetworkMetadata,
|
|
databaseDir: string,
|
|
eraDir: string,
|
|
restUrl: string,
|
|
stateId: Option[string],
|
|
trustedBlockRoot: Option[Eth2Digest],
|
|
backfill: bool,
|
|
reindex: bool,
|
|
downloadDepositSnapshot: bool,
|
|
genesisState: ref ForkedHashedBeaconState) {.async.} =
|
|
let syncTarget =
|
|
if stateId.isSome:
|
|
if trustedBlockRoot.isSome:
|
|
warn "Ignoring `trustedBlockRoot`, `stateId` is set",
|
|
stateId, trustedBlockRoot
|
|
TrustedNodeSyncTarget(
|
|
kind: TrustedNodeSyncKind.StateId,
|
|
stateId: stateId.get)
|
|
elif trustedBlockRoot.isSome:
|
|
TrustedNodeSyncTarget(
|
|
kind: TrustedNodeSyncKind.TrustedBlockRoot,
|
|
trustedBlockRoot: trustedBlockRoot.get)
|
|
else:
|
|
TrustedNodeSyncTarget(
|
|
kind: TrustedNodeSyncKind.StateId,
|
|
stateId: "finalized")
|
|
|
|
await db.doTrustedNodeSync(
|
|
metadata.cfg,
|
|
databaseDir,
|
|
eraDir,
|
|
restUrl,
|
|
syncTarget,
|
|
backfill,
|
|
reindex,
|
|
downloadDepositSnapshot,
|
|
genesisState)
|
|
|
|
func getVanityLogs(stdoutKind: StdoutLogKind): VanityLogs =
|
|
case stdoutKind
|
|
of StdoutLogKind.Auto: raiseAssert "inadmissable here"
|
|
of StdoutLogKind.Colors:
|
|
VanityLogs(
|
|
onMergeTransitionBlock: bellatrixColor,
|
|
onFinalizedMergeTransitionBlock: bellatrixBlink,
|
|
onUpgradeToCapella: capellaColor,
|
|
onKnownBlsToExecutionChange: capellaBlink,
|
|
onUpgradeToDeneb: denebColor,
|
|
onUpgradeToElectra: electraColor)
|
|
of StdoutLogKind.NoColors:
|
|
VanityLogs(
|
|
onMergeTransitionBlock: bellatrixMono,
|
|
onFinalizedMergeTransitionBlock: bellatrixMono,
|
|
onUpgradeToCapella: capellaMono,
|
|
onKnownBlsToExecutionChange: capellaMono,
|
|
onUpgradeToDeneb: denebMono,
|
|
onUpgradeToElectra: electraMono)
|
|
of StdoutLogKind.Json, StdoutLogKind.None:
|
|
VanityLogs(
|
|
onMergeTransitionBlock:
|
|
(proc() = notice "🐼 Proof of Stake Activated 🐼"),
|
|
onFinalizedMergeTransitionBlock:
|
|
(proc() = notice "🐼 Proof of Stake Finalized 🐼"),
|
|
onUpgradeToCapella:
|
|
(proc() = notice "🦉 Withdrowls now available 🦉"),
|
|
onKnownBlsToExecutionChange:
|
|
(proc() = notice "🦉 BLS to execution changed 🦉"),
|
|
onUpgradeToDeneb:
|
|
(proc() = notice "🐟 Proto-Danksharding is ON 🐟"),
|
|
onUpgradeToElectra:
|
|
(proc() = notice "🦒 [PH] Electra 🦒"))
|
|
|
|
func getVanityMascot(consensusFork: ConsensusFork): string =
|
|
case consensusFork
|
|
of ConsensusFork.Electra:
|
|
"🦒"
|
|
of ConsensusFork.Deneb:
|
|
"🐟"
|
|
of ConsensusFork.Capella:
|
|
"🦉"
|
|
of ConsensusFork.Bellatrix:
|
|
"🐼"
|
|
of ConsensusFork.Altair:
|
|
"✨"
|
|
of ConsensusFork.Phase0:
|
|
"🦏"
|
|
|
|
proc loadChainDag(
|
|
config: BeaconNodeConf,
|
|
cfg: RuntimeConfig,
|
|
db: BeaconChainDB,
|
|
eventBus: EventBus,
|
|
validatorMonitor: ref ValidatorMonitor,
|
|
networkGenesisValidatorsRoot: Opt[Eth2Digest]): ChainDAGRef =
|
|
info "Loading block DAG from database", path = config.databaseDir
|
|
|
|
var dag: ChainDAGRef
|
|
proc onLightClientFinalityUpdate(data: ForkedLightClientFinalityUpdate) =
|
|
if dag == nil: return
|
|
withForkyFinalityUpdate(data):
|
|
when lcDataFork > LightClientDataFork.None:
|
|
let contextFork =
|
|
dag.cfg.consensusForkAtEpoch(forkyFinalityUpdate.contextEpoch)
|
|
eventBus.finUpdateQueue.emit(
|
|
RestVersioned[ForkedLightClientFinalityUpdate](
|
|
data: data,
|
|
jsonVersion: contextFork,
|
|
sszContext: dag.forkDigests[].atConsensusFork(contextFork)))
|
|
proc onLightClientOptimisticUpdate(data: ForkedLightClientOptimisticUpdate) =
|
|
if dag == nil: return
|
|
withForkyOptimisticUpdate(data):
|
|
when lcDataFork > LightClientDataFork.None:
|
|
let contextFork =
|
|
dag.cfg.consensusForkAtEpoch(forkyOptimisticUpdate.contextEpoch)
|
|
eventBus.optUpdateQueue.emit(
|
|
RestVersioned[ForkedLightClientOptimisticUpdate](
|
|
data: data,
|
|
jsonVersion: contextFork,
|
|
sszContext: dag.forkDigests[].atConsensusFork(contextFork)))
|
|
|
|
let
|
|
chainDagFlags =
|
|
if config.strictVerification: {strictVerification}
|
|
else: {}
|
|
onLightClientFinalityUpdateCb =
|
|
if config.lightClientDataServe: onLightClientFinalityUpdate
|
|
else: nil
|
|
onLightClientOptimisticUpdateCb =
|
|
if config.lightClientDataServe: onLightClientOptimisticUpdate
|
|
else: nil
|
|
dag = ChainDAGRef.init(
|
|
cfg, db, validatorMonitor, chainDagFlags, config.eraDir,
|
|
vanityLogs = getVanityLogs(detectTTY(config.logStdout)),
|
|
lcDataConfig = LightClientDataConfig(
|
|
serve: config.lightClientDataServe,
|
|
importMode: config.lightClientDataImportMode,
|
|
maxPeriods: config.lightClientDataMaxPeriods,
|
|
onLightClientFinalityUpdate: onLightClientFinalityUpdateCb,
|
|
onLightClientOptimisticUpdate: onLightClientOptimisticUpdateCb))
|
|
|
|
if networkGenesisValidatorsRoot.isSome:
|
|
let databaseGenesisValidatorsRoot =
|
|
getStateField(dag.headState, genesis_validators_root)
|
|
if networkGenesisValidatorsRoot.get != databaseGenesisValidatorsRoot:
|
|
fatal "The specified --data-dir contains data for a different network",
|
|
networkGenesisValidatorsRoot = networkGenesisValidatorsRoot.get,
|
|
databaseGenesisValidatorsRoot,
|
|
dataDir = config.dataDir
|
|
quit 1
|
|
|
|
# The first pruning after restart may take a while..
|
|
if config.historyMode == HistoryMode.Prune:
|
|
dag.pruneHistory(true)
|
|
|
|
dag
|
|
|
|
proc checkWeakSubjectivityCheckpoint(
|
|
dag: ChainDAGRef,
|
|
wsCheckpoint: Checkpoint,
|
|
beaconClock: BeaconClock) =
|
|
let
|
|
currentSlot = beaconClock.now.slotOrZero
|
|
isCheckpointStale = not is_within_weak_subjectivity_period(
|
|
dag.cfg, currentSlot, dag.headState, wsCheckpoint)
|
|
|
|
if isCheckpointStale:
|
|
error "Weak subjectivity checkpoint is stale",
|
|
currentSlot, checkpoint = wsCheckpoint,
|
|
headStateSlot = getStateField(dag.headState, slot)
|
|
quit 1
|
|
|
|
from ./spec/state_transition_block import kzg_commitment_to_versioned_hash
|
|
|
|
proc initFullNode(
|
|
node: BeaconNode,
|
|
rng: ref HmacDrbgContext,
|
|
dag: ChainDAGRef,
|
|
taskpool: TaskPoolPtr,
|
|
getBeaconTime: GetBeaconTimeFn) {.async.} =
|
|
template config(): auto = node.config
|
|
|
|
proc onAttestationReceived(data: phase0.Attestation) =
|
|
node.eventBus.attestQueue.emit(data)
|
|
proc onSyncContribution(data: SignedContributionAndProof) =
|
|
node.eventBus.contribQueue.emit(data)
|
|
proc onVoluntaryExitAdded(data: SignedVoluntaryExit) =
|
|
node.eventBus.exitQueue.emit(data)
|
|
proc onBLSToExecutionChangeAdded(data: SignedBLSToExecutionChange) =
|
|
node.eventBus.blsToExecQueue.emit(data)
|
|
proc onProposerSlashingAdded(data: ProposerSlashing) =
|
|
node.eventBus.propSlashQueue.emit(data)
|
|
proc onAttesterSlashingAdded(data: phase0.AttesterSlashing) =
|
|
node.eventBus.attSlashQueue.emit(data)
|
|
proc onBlobSidecarAdded(data: BlobSidecar) =
|
|
node.eventBus.blobSidecarQueue.emit(
|
|
BlobSidecarInfoObject(
|
|
block_root: hash_tree_root(data.signed_block_header.message),
|
|
index: data.index,
|
|
slot: data.signed_block_header.message.slot,
|
|
kzg_commitment: data.kzg_commitment,
|
|
versioned_hash:
|
|
data.kzg_commitment.kzg_commitment_to_versioned_hash.to0xHex))
|
|
proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) =
|
|
let optimistic =
|
|
if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH:
|
|
some node.dag.is_optimistic(data.toBlockId())
|
|
else:
|
|
none[bool]()
|
|
node.eventBus.blocksQueue.emit(
|
|
EventBeaconBlockObject.init(data, optimistic))
|
|
proc onHeadChanged(data: HeadChangeInfoObject) =
|
|
let eventData =
|
|
if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH:
|
|
var res = data
|
|
res.optimistic = some node.dag.is_optimistic(
|
|
BlockId(slot: data.slot, root: data.block_root))
|
|
res
|
|
else:
|
|
data
|
|
node.eventBus.headQueue.emit(eventData)
|
|
proc onChainReorg(data: ReorgInfoObject) =
|
|
let eventData =
|
|
if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH:
|
|
var res = data
|
|
res.optimistic = some node.dag.is_optimistic(
|
|
BlockId(slot: data.slot, root: data.new_head_block))
|
|
res
|
|
else:
|
|
data
|
|
node.eventBus.reorgQueue.emit(eventData)
|
|
proc makeOnFinalizationCb(
|
|
# This `nimcall` functions helps for keeping track of what
|
|
# needs to be captured by the onFinalization closure.
|
|
eventBus: EventBus,
|
|
elManager: ELManager): OnFinalizedCallback {.nimcall.} =
|
|
static: doAssert (elManager is ref)
|
|
return proc(dag: ChainDAGRef, data: FinalizationInfoObject) =
|
|
if elManager != nil:
|
|
let finalizedEpochRef = dag.getFinalizedEpochRef()
|
|
discard trackFinalizedState(elManager,
|
|
finalizedEpochRef.eth1_data,
|
|
finalizedEpochRef.eth1_deposit_index)
|
|
node.updateLightClientFromDag()
|
|
let eventData =
|
|
if node.currentSlot().epoch() >= dag.cfg.BELLATRIX_FORK_EPOCH:
|
|
var res = data
|
|
# `slot` in this `BlockId` may be higher than block's actual slot,
|
|
# this is alright for the purpose of calling `is_optimistic`.
|
|
res.optimistic = some node.dag.is_optimistic(
|
|
BlockId(slot: data.epoch.start_slot, root: data.block_root))
|
|
res
|
|
else:
|
|
data
|
|
eventBus.finalQueue.emit(eventData)
|
|
|
|
func getLocalHeadSlot(): Slot =
|
|
dag.head.slot
|
|
|
|
proc getLocalWallSlot(): Slot =
|
|
node.beaconClock.now.slotOrZero
|
|
|
|
func getFirstSlotAtFinalizedEpoch(): Slot =
|
|
dag.finalizedHead.slot
|
|
|
|
func getBackfillSlot(): Slot =
|
|
if dag.backfill.parent_root != dag.tail.root:
|
|
dag.backfill.slot
|
|
else:
|
|
dag.tail.slot
|
|
|
|
func getFrontfillSlot(): Slot =
|
|
max(dag.frontfill.get(BlockId()).slot, dag.horizon)
|
|
|
|
let
|
|
quarantine = newClone(
|
|
Quarantine.init())
|
|
attestationPool = newClone(AttestationPool.init(
|
|
dag, quarantine, config.forkChoiceVersion.get, onAttestationReceived))
|
|
syncCommitteeMsgPool = newClone(
|
|
SyncCommitteeMsgPool.init(rng, dag.cfg, onSyncContribution))
|
|
lightClientPool = newClone(
|
|
LightClientPool())
|
|
validatorChangePool = newClone(ValidatorChangePool.init(
|
|
dag, attestationPool, onVoluntaryExitAdded, onBLSToExecutionChangeAdded,
|
|
onProposerSlashingAdded, onAttesterSlashingAdded))
|
|
blobQuarantine = newClone(BlobQuarantine.init(onBlobSidecarAdded))
|
|
consensusManager = ConsensusManager.new(
|
|
dag, attestationPool, quarantine, node.elManager,
|
|
ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets),
|
|
node.dynamicFeeRecipientsStore, config.validatorsDir,
|
|
config.defaultFeeRecipient, config.suggestedGasLimit)
|
|
blockProcessor = BlockProcessor.new(
|
|
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
|
|
rng, taskpool, consensusManager, node.validatorMonitor,
|
|
blobQuarantine, getBeaconTime)
|
|
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
|
|
blobs: Opt[BlobSidecars], maybeFinalized: bool):
|
|
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
|
|
# The design with a callback for block verification is unusual compared
|
|
# to the rest of the application, but fits with the general approach
|
|
# taken in the sync/request managers - this is an architectural compromise
|
|
# that should probably be reimagined more holistically in the future.
|
|
blockProcessor[].addBlock(
|
|
MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
|
|
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
|
|
maybeFinalized: bool):
|
|
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
|
|
withBlck(signedBlock):
|
|
when consensusFork >= ConsensusFork.Deneb:
|
|
if not blobQuarantine[].hasBlobs(forkyBlck):
|
|
# We don't have all the blobs for this block, so we have
|
|
# to put it in blobless quarantine.
|
|
if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
|
|
err(VerifierError.UnviableFork)
|
|
else:
|
|
err(VerifierError.MissingParent)
|
|
else:
|
|
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
|
|
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
|
Opt.some(blobs),
|
|
maybeFinalized = maybeFinalized)
|
|
else:
|
|
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
|
Opt.none(BlobSidecars),
|
|
maybeFinalized = maybeFinalized)
|
|
rmanBlockLoader = proc(
|
|
blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] =
|
|
dag.getForkedBlock(blockRoot)
|
|
rmanBlobLoader = proc(
|
|
blobId: BlobIdentifier): Opt[ref BlobSidecar] =
|
|
var blob_sidecar = BlobSidecar.new()
|
|
if dag.db.getBlobSidecar(blobId.block_root, blobId.index, blob_sidecar[]):
|
|
Opt.some blob_sidecar
|
|
else:
|
|
Opt.none(ref BlobSidecar)
|
|
|
|
processor = Eth2Processor.new(
|
|
config.doppelgangerDetection,
|
|
blockProcessor, node.validatorMonitor, dag, attestationPool,
|
|
validatorChangePool, node.attachedValidators, syncCommitteeMsgPool,
|
|
lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool)
|
|
syncManager = newSyncManager[Peer, PeerId](
|
|
node.network.peerPool,
|
|
dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,
|
|
SyncQueueKind.Forward, getLocalHeadSlot,
|
|
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
|
|
getFrontfillSlot, dag.tail.slot, blockVerifier)
|
|
backfiller = newSyncManager[Peer, PeerId](
|
|
node.network.peerPool,
|
|
dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,
|
|
SyncQueueKind.Backward, getLocalHeadSlot,
|
|
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
|
|
getFrontfillSlot, dag.backfill.slot, blockVerifier,
|
|
maxHeadAge = 0)
|
|
router = (ref MessageRouter)(
|
|
processor: processor,
|
|
network: node.network)
|
|
requestManager = RequestManager.init(
|
|
node.network, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime,
|
|
(proc(): bool = syncManager.inProgress),
|
|
quarantine, blobQuarantine, rmanBlockVerifier,
|
|
rmanBlockLoader, rmanBlobLoader)
|
|
|
|
if node.config.lightClientDataServe:
|
|
proc scheduleSendingLightClientUpdates(slot: Slot) =
|
|
if node.lightClientPool[].broadcastGossipFut != nil:
|
|
return
|
|
if slot <= node.lightClientPool[].latestBroadcastedSlot:
|
|
return
|
|
node.lightClientPool[].latestBroadcastedSlot = slot
|
|
|
|
template fut(): auto = node.lightClientPool[].broadcastGossipFut
|
|
fut = node.handleLightClientUpdates(slot)
|
|
fut.addCallback do (p: pointer) {.gcsafe.}:
|
|
fut = nil
|
|
|
|
router.onSyncCommitteeMessage = scheduleSendingLightClientUpdates
|
|
|
|
dag.setFinalizationCb makeOnFinalizationCb(node.eventBus, node.elManager)
|
|
dag.setBlockCb(onBlockAdded)
|
|
dag.setHeadCb(onHeadChanged)
|
|
dag.setReorgCb(onChainReorg)
|
|
|
|
node.dag = dag
|
|
node.blobQuarantine = blobQuarantine
|
|
node.quarantine = quarantine
|
|
node.attestationPool = attestationPool
|
|
node.syncCommitteeMsgPool = syncCommitteeMsgPool
|
|
node.lightClientPool = lightClientPool
|
|
node.validatorChangePool = validatorChangePool
|
|
node.processor = processor
|
|
node.blockProcessor = blockProcessor
|
|
node.consensusManager = consensusManager
|
|
node.requestManager = requestManager
|
|
node.syncManager = syncManager
|
|
node.backfiller = backfiller
|
|
node.router = router
|
|
|
|
await node.addValidators()
|
|
|
|
block:
|
|
# Add in-process validators to the list of "known" validators such that
|
|
# we start with a reasonable ENR
|
|
let wallSlot = node.beaconClock.now().slotOrZero()
|
|
for validator in node.attachedValidators[].validators.values():
|
|
if config.validatorMonitorAuto:
|
|
node.validatorMonitor[].addMonitor(validator.pubkey, validator.index)
|
|
|
|
if validator.index.isSome():
|
|
withState(dag.headState):
|
|
let idx = validator.index.get()
|
|
if distinctBase(idx) <= forkyState.data.validators.lenu64:
|
|
template v: auto = forkyState.data.validators.item(idx)
|
|
if is_active_validator(v, wallSlot.epoch) or
|
|
is_active_validator(v, wallSlot.epoch + 1):
|
|
node.consensusManager[].actionTracker.knownValidators[idx] = wallSlot
|
|
elif is_exited_validator(v, wallSlot.epoch):
|
|
notice "Ignoring exited validator",
|
|
index = idx,
|
|
pubkey = shortLog(v.pubkey)
|
|
let stabilitySubnets =
|
|
node.consensusManager[].actionTracker.stabilitySubnets(wallSlot)
|
|
# Here, we also set the correct ENR should we be in all subnets mode!
|
|
node.network.updateStabilitySubnetMetadata(stabilitySubnets)
|
|
|
|
node.network.registerProtocol(
|
|
PeerSync, PeerSync.NetworkState.init(
|
|
node.dag,
|
|
node.beaconClock.getBeaconTimeFn(),
|
|
))
|
|
|
|
node.network.registerProtocol(
|
|
BeaconSync, BeaconSync.NetworkState.init(node.dag))
|
|
|
|
if node.dag.lcDataStore.serve:
|
|
node.network.registerProtocol(
|
|
LightClientSync, LightClientSync.NetworkState.init(node.dag))
|
|
|
|
node.updateValidatorMetrics()
|
|
|
|
const
|
|
SlashingDbName = "slashing_protection"
|
|
# changing this requires physical file rename as well or history is lost.
|
|
|
|
proc init*(T: type BeaconNode,
|
|
rng: ref HmacDrbgContext,
|
|
config: BeaconNodeConf,
|
|
metadata: Eth2NetworkMetadata): Future[BeaconNode]
|
|
{.async.} =
|
|
var taskpool: TaskPoolPtr
|
|
|
|
template cfg: auto = metadata.cfg
|
|
template eth1Network: auto = metadata.eth1Network
|
|
|
|
try:
|
|
if config.numThreads < 0:
|
|
fatal "The number of threads --numThreads cannot be negative."
|
|
quit 1
|
|
elif config.numThreads == 0:
|
|
taskpool = TaskPoolPtr.new(numThreads = min(countProcessors(), 16))
|
|
else:
|
|
taskpool = TaskPoolPtr.new(numThreads = config.numThreads)
|
|
|
|
info "Threadpool started", numThreads = taskpool.numThreads
|
|
except Exception:
|
|
raise newException(Defect, "Failure in taskpool initialization.")
|
|
|
|
if metadata.genesis.kind == BakedIn:
|
|
if config.genesisState.isSome:
|
|
warn "The --genesis-state option has no effect on networks with built-in genesis state"
|
|
|
|
if config.genesisStateUrl.isSome:
|
|
warn "The --genesis-state-url option has no effect on networks with built-in genesis state"
|
|
|
|
let
|
|
eventBus = EventBus(
|
|
headQueue: newAsyncEventQueue[HeadChangeInfoObject](),
|
|
blocksQueue: newAsyncEventQueue[EventBeaconBlockObject](),
|
|
attestQueue: newAsyncEventQueue[phase0.Attestation](),
|
|
exitQueue: newAsyncEventQueue[SignedVoluntaryExit](),
|
|
blsToExecQueue: newAsyncEventQueue[SignedBLSToExecutionChange](),
|
|
propSlashQueue: newAsyncEventQueue[ProposerSlashing](),
|
|
attSlashQueue: newAsyncEventQueue[AttesterSlashing](),
|
|
blobSidecarQueue: newAsyncEventQueue[BlobSidecarInfoObject](),
|
|
finalQueue: newAsyncEventQueue[FinalizationInfoObject](),
|
|
reorgQueue: newAsyncEventQueue[ReorgInfoObject](),
|
|
contribQueue: newAsyncEventQueue[SignedContributionAndProof](),
|
|
finUpdateQueue: newAsyncEventQueue[
|
|
RestVersioned[ForkedLightClientFinalityUpdate]](),
|
|
optUpdateQueue: newAsyncEventQueue[
|
|
RestVersioned[ForkedLightClientOptimisticUpdate]]())
|
|
db = BeaconChainDB.new(config.databaseDir, cfg, inMemory = false)
|
|
|
|
if config.externalBeaconApiUrl.isSome and ChainDAGRef.isInitialized(db).isErr:
|
|
var genesisState: ref ForkedHashedBeaconState
|
|
let trustedBlockRoot =
|
|
if config.trustedStateRoot.isSome or config.trustedBlockRoot.isSome:
|
|
config.trustedBlockRoot
|
|
elif cfg.ALTAIR_FORK_EPOCH == GENESIS_EPOCH:
|
|
# Sync can be bootstrapped from the genesis block root
|
|
genesisState = await fetchGenesisState(
|
|
metadata, config.genesisState, config.genesisStateUrl)
|
|
if genesisState != nil:
|
|
let genesisBlockRoot = get_initial_beacon_block(genesisState[]).root
|
|
notice "Neither `--trusted-block-root` nor `--trusted-state-root` " &
|
|
"provided with `--external-beacon-api-url`, " &
|
|
"falling back to genesis block root",
|
|
externalBeaconApiUrl = config.externalBeaconApiUrl.get,
|
|
trustedBlockRoot = config.trustedBlockRoot,
|
|
trustedStateRoot = config.trustedStateRoot,
|
|
genesisBlockRoot = $genesisBlockRoot
|
|
some genesisBlockRoot
|
|
else:
|
|
none[Eth2Digest]()
|
|
else:
|
|
none[Eth2Digest]()
|
|
if config.trustedStateRoot.isNone and trustedBlockRoot.isNone:
|
|
warn "Ignoring `--external-beacon-api-url`, neither " &
|
|
"`--trusted-block-root` nor `--trusted-state-root` provided",
|
|
externalBeaconApiUrl = config.externalBeaconApiUrl.get,
|
|
trustedBlockRoot = config.trustedBlockRoot,
|
|
trustedStateRoot = config.trustedStateRoot
|
|
else:
|
|
if genesisState == nil:
|
|
genesisState = await fetchGenesisState(
|
|
metadata, config.genesisState, config.genesisStateUrl)
|
|
await db.doRunTrustedNodeSync(
|
|
metadata,
|
|
config.databaseDir,
|
|
config.eraDir,
|
|
config.externalBeaconApiUrl.get,
|
|
config.trustedStateRoot.map do (x: Eth2Digest) -> string:
|
|
"0x" & x.data.toHex,
|
|
trustedBlockRoot,
|
|
backfill = false,
|
|
reindex = false,
|
|
downloadDepositSnapshot = false,
|
|
genesisState)
|
|
|
|
if config.finalizedCheckpointBlock.isSome:
|
|
warn "--finalized-checkpoint-block has been deprecated, ignoring"
|
|
|
|
let checkpointState = if config.finalizedCheckpointState.isSome:
|
|
let checkpointStatePath = config.finalizedCheckpointState.get.string
|
|
let tmp = try:
|
|
newClone(readSszForkedHashedBeaconState(
|
|
cfg, readAllBytes(checkpointStatePath).tryGet()))
|
|
except SszError as err:
|
|
fatal "Checkpoint state loading failed",
|
|
err = formatMsg(err, checkpointStatePath)
|
|
quit 1
|
|
except CatchableError as err:
|
|
fatal "Failed to read checkpoint state file", err = err.msg
|
|
quit 1
|
|
|
|
if not getStateField(tmp[], slot).is_epoch:
|
|
fatal "--finalized-checkpoint-state must point to a state for an epoch slot",
|
|
slot = getStateField(tmp[], slot)
|
|
quit 1
|
|
tmp
|
|
else:
|
|
nil
|
|
|
|
if config.finalizedDepositTreeSnapshot.isSome:
|
|
let
|
|
depositTreeSnapshotPath = config.finalizedDepositTreeSnapshot.get.string
|
|
snapshot =
|
|
try:
|
|
SSZ.loadFile(depositTreeSnapshotPath, DepositTreeSnapshot)
|
|
except SszError as err:
|
|
fatal "Deposit tree snapshot loading failed",
|
|
err = formatMsg(err, depositTreeSnapshotPath)
|
|
quit 1
|
|
except CatchableError as err:
|
|
fatal "Failed to read deposit tree snapshot file", err = err.msg
|
|
quit 1
|
|
depositContractSnapshot = DepositContractSnapshot.init(snapshot).valueOr:
|
|
fatal "Invalid deposit tree snapshot file"
|
|
quit 1
|
|
db.putDepositContractSnapshot(depositContractSnapshot)
|
|
|
|
let engineApiUrls = config.engineApiUrls
|
|
|
|
if engineApiUrls.len == 0:
|
|
notice "Running without execution client - validator features disabled (see https://nimbus.guide/eth1.html)"
|
|
|
|
var networkGenesisValidatorsRoot = metadata.bakedGenesisValidatorsRoot
|
|
|
|
if not ChainDAGRef.isInitialized(db).isOk():
|
|
let genesisState =
|
|
if checkpointState != nil and
|
|
getStateField(checkpointState[], slot) == 0:
|
|
checkpointState
|
|
else:
|
|
await fetchGenesisState(
|
|
metadata, config.genesisState, config.genesisStateUrl)
|
|
|
|
if genesisState == nil and checkpointState == nil:
|
|
fatal "No database and no genesis snapshot found. Please supply a genesis.ssz " &
|
|
"with the network configuration"
|
|
quit 1
|
|
|
|
if not genesisState.isNil and not checkpointState.isNil:
|
|
if getStateField(genesisState[], genesis_validators_root) !=
|
|
getStateField(checkpointState[], genesis_validators_root):
|
|
fatal "Checkpoint state does not match genesis - check the --network parameter",
|
|
rootFromGenesis = getStateField(
|
|
genesisState[], genesis_validators_root),
|
|
rootFromCheckpoint = getStateField(
|
|
checkpointState[], genesis_validators_root)
|
|
quit 1
|
|
|
|
try:
|
|
# Always store genesis state if we have it - this allows reindexing and
|
|
# answering genesis queries
|
|
if not genesisState.isNil:
|
|
ChainDAGRef.preInit(db, genesisState[])
|
|
networkGenesisValidatorsRoot =
|
|
Opt.some(getStateField(genesisState[], genesis_validators_root))
|
|
|
|
if not checkpointState.isNil:
|
|
if genesisState.isNil or
|
|
getStateField(checkpointState[], slot) != GENESIS_SLOT:
|
|
ChainDAGRef.preInit(db, checkpointState[])
|
|
|
|
doAssert ChainDAGRef.isInitialized(db).isOk(), "preInit should have initialized db"
|
|
except CatchableError as exc:
|
|
error "Failed to initialize database", err = exc.msg
|
|
quit 1
|
|
else:
|
|
if not checkpointState.isNil:
|
|
fatal "A database already exists, cannot start from given checkpoint",
|
|
dataDir = config.dataDir
|
|
quit 1
|
|
|
|
# Doesn't use std/random directly, but dependencies might
|
|
randomize(rng[].rand(high(int)))
|
|
|
|
# The validatorMonitorTotals flag has been deprecated and should eventually be
|
|
# removed - until then, it's given priority if set so as not to needlessly
|
|
# break existing setups
|
|
let
|
|
validatorMonitor = newClone(ValidatorMonitor.init(
|
|
config.validatorMonitorAuto,
|
|
config.validatorMonitorTotals.get(
|
|
not config.validatorMonitorDetails)))
|
|
|
|
for key in config.validatorMonitorPubkeys:
|
|
validatorMonitor[].addMonitor(key, Opt.none(ValidatorIndex))
|
|
|
|
let
|
|
dag = loadChainDag(
|
|
config, cfg, db, eventBus,
|
|
validatorMonitor, networkGenesisValidatorsRoot)
|
|
genesisTime = getStateField(dag.headState, genesis_time)
|
|
beaconClock = BeaconClock.init(genesisTime).valueOr:
|
|
fatal "Invalid genesis time in state", genesisTime
|
|
quit 1
|
|
|
|
getBeaconTime = beaconClock.getBeaconTimeFn()
|
|
|
|
if config.weakSubjectivityCheckpoint.isSome:
|
|
dag.checkWeakSubjectivityCheckpoint(
|
|
config.weakSubjectivityCheckpoint.get, beaconClock)
|
|
|
|
let elManager = ELManager.new(
|
|
cfg,
|
|
metadata.depositContractBlock,
|
|
metadata.depositContractBlockHash,
|
|
db,
|
|
engineApiUrls,
|
|
eth1Network)
|
|
|
|
if config.rpcEnabled.isSome:
|
|
warn "Nimbus's JSON-RPC server has been removed. This includes the --rpc, --rpc-port, and --rpc-address configuration options. https://nimbus.guide/rest-api.html shows how to enable and configure the REST Beacon API server which replaces it."
|
|
|
|
let restServer = if config.restEnabled:
|
|
RestServerRef.init(config.restAddress, config.restPort,
|
|
config.restAllowedOrigin,
|
|
validateBeaconApiQueries,
|
|
config)
|
|
else:
|
|
nil
|
|
|
|
let
|
|
netKeys = getPersistentNetKeys(rng[], config)
|
|
nickname = if config.nodeName == "auto": shortForm(netKeys)
|
|
else: config.nodeName
|
|
network = createEth2Node(
|
|
rng, config, netKeys, cfg, dag.forkDigests, getBeaconTime,
|
|
getStateField(dag.headState, genesis_validators_root))
|
|
|
|
case config.slashingDbKind
|
|
of SlashingDbKind.v2:
|
|
discard
|
|
of SlashingDbKind.v1:
|
|
error "Slashing DB v1 is no longer supported for writing"
|
|
quit 1
|
|
of SlashingDbKind.both:
|
|
warn "Slashing DB v1 deprecated, writing only v2"
|
|
|
|
info "Loading slashing protection database (v2)",
|
|
path = config.validatorsDir()
|
|
|
|
proc getValidatorAndIdx(pubkey: ValidatorPubKey): Opt[ValidatorAndIndex] =
|
|
withState(dag.headState):
|
|
getValidator(forkyState().data.validators.asSeq(), pubkey)
|
|
|
|
func getCapellaForkVersion(): Opt[Version] =
|
|
Opt.some(cfg.CAPELLA_FORK_VERSION)
|
|
|
|
func getDenebForkEpoch(): Opt[Epoch] =
|
|
Opt.some(cfg.DENEB_FORK_EPOCH)
|
|
|
|
func getElectraForkEpoch(): Opt[Epoch] =
|
|
Opt.some(cfg.ELECTRA_FORK_EPOCH)
|
|
|
|
proc getForkForEpoch(epoch: Epoch): Opt[Fork] =
|
|
Opt.some(dag.forkAtEpoch(epoch))
|
|
|
|
proc getGenesisRoot(): Eth2Digest =
|
|
getStateField(dag.headState, genesis_validators_root)
|
|
|
|
let
|
|
keystoreCache = KeystoreCacheRef.init()
|
|
slashingProtectionDB =
|
|
SlashingProtectionDB.init(
|
|
getStateField(dag.headState, genesis_validators_root),
|
|
config.validatorsDir(), SlashingDbName)
|
|
validatorPool = newClone(ValidatorPool.init(
|
|
slashingProtectionDB, config.doppelgangerDetection))
|
|
|
|
keymanagerInitResult = initKeymanagerServer(config, restServer)
|
|
keymanagerHost = if keymanagerInitResult.server != nil:
|
|
newClone KeymanagerHost.init(
|
|
validatorPool,
|
|
keystoreCache,
|
|
rng,
|
|
keymanagerInitResult.token,
|
|
config.validatorsDir,
|
|
config.secretsDir,
|
|
config.defaultFeeRecipient,
|
|
config.suggestedGasLimit,
|
|
config.defaultGraffitiBytes,
|
|
config.getPayloadBuilderAddress,
|
|
getValidatorAndIdx,
|
|
getBeaconTime,
|
|
getCapellaForkVersion,
|
|
getDenebForkEpoch,
|
|
getForkForEpoch,
|
|
getGenesisRoot)
|
|
else: nil
|
|
|
|
stateTtlCache =
|
|
if config.restCacheSize > 0:
|
|
StateTtlCache.init(
|
|
cacheSize = config.restCacheSize,
|
|
cacheTtl = chronos.seconds(config.restCacheTtl))
|
|
else:
|
|
nil
|
|
|
|
if config.payloadBuilderEnable:
|
|
info "Using external payload builder",
|
|
payloadBuilderUrl = config.payloadBuilderUrl
|
|
|
|
let node = BeaconNode(
|
|
nickname: nickname,
|
|
graffitiBytes: if config.graffiti.isSome: config.graffiti.get
|
|
else: defaultGraffitiBytes(),
|
|
network: network,
|
|
netKeys: netKeys,
|
|
db: db,
|
|
config: config,
|
|
attachedValidators: validatorPool,
|
|
elManager: elManager,
|
|
restServer: restServer,
|
|
keymanagerHost: keymanagerHost,
|
|
keymanagerServer: keymanagerInitResult.server,
|
|
keystoreCache: keystoreCache,
|
|
eventBus: eventBus,
|
|
gossipState: {},
|
|
blocksGossipState: {},
|
|
beaconClock: beaconClock,
|
|
validatorMonitor: validatorMonitor,
|
|
stateTtlCache: stateTtlCache,
|
|
dynamicFeeRecipientsStore: newClone(DynamicFeeRecipientsStore.init()))
|
|
|
|
node.initLightClient(
|
|
rng, cfg, dag.forkDigests, getBeaconTime, dag.genesis_validators_root)
|
|
await node.initFullNode(rng, dag, taskpool, getBeaconTime)
|
|
|
|
node.updateLightClientFromDag()
|
|
|
|
node
|
|
|
|
func verifyFinalization(node: BeaconNode, slot: Slot) =
|
|
# Epoch must be >= 4 to check finalization
|
|
const SETTLING_TIME_OFFSET = 1'u64
|
|
let epoch = slot.epoch()
|
|
|
|
# Don't static-assert this -- if this isn't called, don't require it
|
|
doAssert SLOTS_PER_EPOCH > SETTLING_TIME_OFFSET
|
|
|
|
# Intentionally, loudly assert. Point is to fail visibly and unignorably
|
|
# during testing.
|
|
if epoch >= 4 and slot mod SLOTS_PER_EPOCH > SETTLING_TIME_OFFSET:
|
|
let finalizedEpoch =
|
|
node.dag.finalizedHead.slot.epoch()
|
|
# Finalization rule 234, that has the most lag slots among the cases, sets
|
|
# state.finalized_checkpoint = old_previous_justified_checkpoint.epoch + 3
|
|
# and then state.slot gets incremented, to increase the maximum offset, if
|
|
# finalization occurs every slot, to 4 slots vs scheduledSlot.
|
|
doAssert finalizedEpoch + 4 >= epoch
|
|
|
|
from std/sequtils import toSeq
|
|
|
|
func subnetLog(v: BitArray): string =
|
|
$toSeq(v.oneIndices())
|
|
|
|
func forkDigests(node: BeaconNode): auto =
|
|
let forkDigestsArray: array[ConsensusFork, auto] = [
|
|
node.dag.forkDigests.phase0,
|
|
node.dag.forkDigests.altair,
|
|
node.dag.forkDigests.bellatrix,
|
|
node.dag.forkDigests.capella,
|
|
node.dag.forkDigests.deneb,
|
|
node.dag.forkDigests.electra]
|
|
forkDigestsArray
|
|
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#attestation-subnet-subscription
|
|
proc updateAttestationSubnetHandlers(node: BeaconNode, slot: Slot) =
|
|
if node.gossipState.card == 0:
|
|
# When disconnected, updateBlocksGossipStatus is responsible for all things
|
|
# subnets - in particular, it will remove subscriptions on the edge where
|
|
# we enter the disconnected state.
|
|
return
|
|
|
|
let
|
|
aggregateSubnets =
|
|
node.consensusManager[].actionTracker.aggregateSubnets(slot)
|
|
stabilitySubnets =
|
|
node.consensusManager[].actionTracker.stabilitySubnets(slot)
|
|
subnets = aggregateSubnets + stabilitySubnets
|
|
|
|
node.network.updateStabilitySubnetMetadata(stabilitySubnets)
|
|
|
|
# Now we know what we should be subscribed to - make it so
|
|
let
|
|
prevSubnets = node.consensusManager[].actionTracker.subscribedSubnets
|
|
unsubscribeSubnets = prevSubnets - subnets
|
|
subscribeSubnets = subnets - prevSubnets
|
|
|
|
# Remember what we subscribed to, so we can unsubscribe later
|
|
node.consensusManager[].actionTracker.subscribedSubnets = subnets
|
|
|
|
let forkDigests = node.forkDigests()
|
|
|
|
for gossipFork in node.gossipState:
|
|
let forkDigest = forkDigests[gossipFork]
|
|
node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, forkDigest)
|
|
node.network.subscribeAttestationSubnets(subscribeSubnets, forkDigest)
|
|
|
|
debug "Attestation subnets",
|
|
slot, epoch = slot.epoch, gossipState = node.gossipState,
|
|
stabilitySubnets = subnetLog(stabilitySubnets),
|
|
aggregateSubnets = subnetLog(aggregateSubnets),
|
|
prevSubnets = subnetLog(prevSubnets),
|
|
subscribeSubnets = subnetLog(subscribeSubnets),
|
|
unsubscribeSubnets = subnetLog(unsubscribeSubnets),
|
|
gossipState = node.gossipState
|
|
|
|
proc updateBlocksGossipStatus*(
|
|
node: BeaconNode, slot: Slot, dagIsBehind: bool) =
|
|
template cfg(): auto = node.dag.cfg
|
|
|
|
let
|
|
isBehind =
|
|
if node.shouldSyncOptimistically(slot):
|
|
# If optimistic sync is active, always subscribe to blocks gossip
|
|
false
|
|
else:
|
|
# Use DAG status to determine whether to subscribe for blocks gossip
|
|
dagIsBehind
|
|
|
|
targetGossipState = getTargetGossipState(
|
|
slot.epoch, cfg.ALTAIR_FORK_EPOCH, cfg.BELLATRIX_FORK_EPOCH,
|
|
cfg.CAPELLA_FORK_EPOCH, cfg.DENEB_FORK_EPOCH, cfg.ELECTRA_FORK_EPOCH,
|
|
isBehind)
|
|
|
|
template currentGossipState(): auto = node.blocksGossipState
|
|
if currentGossipState == targetGossipState:
|
|
return
|
|
|
|
if currentGossipState.card == 0 and targetGossipState.card > 0:
|
|
debug "Enabling blocks topic subscriptions",
|
|
wallSlot = slot, targetGossipState
|
|
elif currentGossipState.card > 0 and targetGossipState.card == 0:
|
|
debug "Disabling blocks topic subscriptions",
|
|
wallSlot = slot
|
|
else:
|
|
# Individual forks added / removed
|
|
discard
|
|
|
|
let
|
|
newGossipForks = targetGossipState - currentGossipState
|
|
oldGossipForks = currentGossipState - targetGossipState
|
|
|
|
for gossipFork in oldGossipForks:
|
|
let forkDigest = node.dag.forkDigests[].atConsensusFork(gossipFork)
|
|
node.network.unsubscribe(getBeaconBlocksTopic(forkDigest))
|
|
|
|
for gossipFork in newGossipForks:
|
|
let forkDigest = node.dag.forkDigests[].atConsensusFork(gossipFork)
|
|
node.network.subscribe(
|
|
getBeaconBlocksTopic(forkDigest), blocksTopicParams,
|
|
enableTopicMetrics = true)
|
|
|
|
node.blocksGossipState = targetGossipState
|
|
|
|
proc addPhase0MessageHandlers(
|
|
node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
|
|
node.network.subscribe(getAttesterSlashingsTopic(forkDigest), basicParams)
|
|
node.network.subscribe(getProposerSlashingsTopic(forkDigest), basicParams)
|
|
node.network.subscribe(getVoluntaryExitsTopic(forkDigest), basicParams)
|
|
node.network.subscribe(
|
|
getAggregateAndProofsTopic(forkDigest), aggregateTopicParams,
|
|
enableTopicMetrics = true)
|
|
|
|
# updateAttestationSubnetHandlers subscribes attestation subnets
|
|
|
|
proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
|
|
node.network.unsubscribe(getVoluntaryExitsTopic(forkDigest))
|
|
node.network.unsubscribe(getProposerSlashingsTopic(forkDigest))
|
|
node.network.unsubscribe(getAttesterSlashingsTopic(forkDigest))
|
|
node.network.unsubscribe(getAggregateAndProofsTopic(forkDigest))
|
|
|
|
for subnet_id in SubnetId:
|
|
node.network.unsubscribe(getAttestationTopic(forkDigest, subnet_id))
|
|
|
|
node.consensusManager[].actionTracker.subscribedSubnets = default(AttnetBits)
|
|
|
|
func hasSyncPubKey(node: BeaconNode, epoch: Epoch): auto =
|
|
# Only used to determine which gossip topics to which to subscribe
|
|
if node.config.subscribeAllSubnets:
|
|
(func(pubkey: ValidatorPubKey): bool {.closure.} = true)
|
|
else:
|
|
(func(pubkey: ValidatorPubKey): bool =
|
|
node.consensusManager[].actionTracker.hasSyncDuty(pubkey, epoch) or
|
|
pubkey in node.attachedValidators[].validators)
|
|
|
|
func getCurrentSyncCommiteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
|
|
let syncCommittee = withState(node.dag.headState):
|
|
when consensusFork >= ConsensusFork.Altair:
|
|
forkyState.data.current_sync_committee
|
|
else:
|
|
return static(default(SyncnetBits))
|
|
|
|
getSyncSubnets(node.hasSyncPubKey(epoch), syncCommittee)
|
|
|
|
func getNextSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
|
|
let syncCommittee = withState(node.dag.headState):
|
|
when consensusFork >= ConsensusFork.Altair:
|
|
forkyState.data.next_sync_committee
|
|
else:
|
|
return static(default(SyncnetBits))
|
|
|
|
getSyncSubnets(
|
|
node.hasSyncPubKey((epoch.sync_committee_period + 1).start_slot().epoch),
|
|
syncCommittee)
|
|
|
|
func getSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
|
|
let
|
|
subnets = node.getCurrentSyncCommiteeSubnets(epoch)
|
|
epochsToSyncPeriod = nearSyncCommitteePeriod(epoch)
|
|
|
|
# The end-slot tracker might call this when it's theoretically applicable,
|
|
# but more than SYNC_COMMITTEE_SUBNET_COUNT epochs from when the next sync
|
|
# committee period begins, in which case `epochsToNextSyncPeriod` is none.
|
|
if epochsToSyncPeriod.isNone or
|
|
node.dag.cfg.consensusForkAtEpoch(epoch + epochsToSyncPeriod.get) <
|
|
ConsensusFork.Altair:
|
|
return subnets
|
|
|
|
subnets + node.getNextSyncCommitteeSubnets(epoch)
|
|
|
|
proc addAltairMessageHandlers(
|
|
node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
|
|
node.addPhase0MessageHandlers(forkDigest, slot)
|
|
|
|
# If this comes online near sync committee period, it'll immediately get
|
|
# replaced as usual by trackSyncCommitteeTopics, which runs at slot end.
|
|
let syncnets = node.getSyncCommitteeSubnets(slot.epoch)
|
|
|
|
for subcommitteeIdx in SyncSubcommitteeIndex:
|
|
if syncnets[subcommitteeIdx]:
|
|
node.network.subscribe(
|
|
getSyncCommitteeTopic(forkDigest, subcommitteeIdx), basicParams)
|
|
|
|
node.network.subscribe(
|
|
getSyncCommitteeContributionAndProofTopic(forkDigest), basicParams)
|
|
|
|
node.network.updateSyncnetsMetadata(syncnets)
|
|
|
|
proc addCapellaMessageHandlers(
|
|
node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
|
|
node.addAltairMessageHandlers(forkDigest, slot)
|
|
node.network.subscribe(getBlsToExecutionChangeTopic(forkDigest), basicParams)
|
|
|
|
proc addDenebMessageHandlers(
|
|
node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
|
|
node.addCapellaMessageHandlers(forkDigest, slot)
|
|
for topic in blobSidecarTopics(forkDigest):
|
|
node.network.subscribe(topic, basicParams)
|
|
|
|
proc addElectraMessageHandlers(
|
|
node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
|
|
node.addDenebMessageHandlers(forkDigest, slot)
|
|
|
|
proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
|
|
node.removePhase0MessageHandlers(forkDigest)
|
|
|
|
for subcommitteeIdx in SyncSubcommitteeIndex:
|
|
closureScope:
|
|
let idx = subcommitteeIdx
|
|
node.network.unsubscribe(getSyncCommitteeTopic(forkDigest, idx))
|
|
|
|
node.network.unsubscribe(
|
|
getSyncCommitteeContributionAndProofTopic(forkDigest))
|
|
|
|
proc removeCapellaMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
|
|
node.removeAltairMessageHandlers(forkDigest)
|
|
node.network.unsubscribe(getBlsToExecutionChangeTopic(forkDigest))
|
|
|
|
proc removeDenebMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
|
|
node.removeCapellaMessageHandlers(forkDigest)
|
|
for topic in blobSidecarTopics(forkDigest):
|
|
node.network.unsubscribe(topic)
|
|
|
|
proc removeElectraMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
|
|
node.removeDenebMessageHandlers(forkDigest)
|
|
|
|
proc updateSyncCommitteeTopics(node: BeaconNode, slot: Slot) =
|
|
template lastSyncUpdate: untyped =
|
|
node.consensusManager[].actionTracker.lastSyncUpdate
|
|
if lastSyncUpdate == Opt.some(slot.sync_committee_period()) and
|
|
nearSyncCommitteePeriod(slot.epoch).isNone():
|
|
# No need to update unless we're close to the next sync committee period or
|
|
# new validators were registered with the action tracker
|
|
# TODO we _could_ skip running this in some of the "near" slots, but..
|
|
return
|
|
|
|
lastSyncUpdate = Opt.some(slot.sync_committee_period())
|
|
|
|
let syncnets = node.getSyncCommitteeSubnets(slot.epoch)
|
|
|
|
debug "Updating sync committee subnets",
|
|
syncnets,
|
|
metadata_syncnets = node.network.metadata.syncnets,
|
|
gossipState = node.gossipState
|
|
|
|
# Assume that different gossip fork sync committee setups are in sync; this
|
|
# only remains relevant, currently, for one gossip transition epoch, so the
|
|
# consequences of this not being true aren't exceptionally dire, while this
|
|
# allows for bookkeeping simplication.
|
|
if syncnets == node.network.metadata.syncnets:
|
|
return
|
|
|
|
let
|
|
newSyncnets =
|
|
syncnets - node.network.metadata.syncnets
|
|
oldSyncnets =
|
|
node.network.metadata.syncnets - syncnets
|
|
forkDigests = node.forkDigests()
|
|
|
|
for subcommitteeIdx in SyncSubcommitteeIndex:
|
|
doAssert not (newSyncnets[subcommitteeIdx] and
|
|
oldSyncnets[subcommitteeIdx])
|
|
for gossipFork in node.gossipState:
|
|
template topic(): auto =
|
|
getSyncCommitteeTopic(forkDigests[gossipFork], subcommitteeIdx)
|
|
if oldSyncnets[subcommitteeIdx]:
|
|
node.network.unsubscribe(topic)
|
|
elif newSyncnets[subcommitteeIdx]:
|
|
node.network.subscribe(topic, basicParams)
|
|
|
|
node.network.updateSyncnetsMetadata(syncnets)
|
|
|
|
proc doppelgangerChecked(node: BeaconNode, epoch: Epoch) =
|
|
if not node.processor[].doppelgangerDetectionEnabled:
|
|
return
|
|
|
|
# broadcastStartEpoch is set to FAR_FUTURE_EPOCH when we're not monitoring
|
|
# gossip - it is only viable to assert liveness in epochs where gossip is
|
|
# active
|
|
if epoch > node.processor[].doppelgangerDetection.broadcastStartEpoch:
|
|
for validator in node.attachedValidators[]:
|
|
validator.doppelgangerChecked(epoch - 1)
|
|
|
|
from ./spec/state_transition_epoch import effective_balance_might_update
|
|
|
|
proc maybeUpdateActionTrackerNextEpoch(
|
|
node: BeaconNode, forkyState: ForkyHashedBeaconState, nextEpoch: Epoch) =
|
|
if node.consensusManager[].actionTracker.needsUpdate(
|
|
forkyState, nextEpoch):
|
|
template epochRefFallback() =
|
|
let epochRef =
|
|
node.dag.getEpochRef(node.dag.head, nextEpoch, false).expect(
|
|
"Getting head EpochRef should never fail")
|
|
node.consensusManager[].actionTracker.updateActions(
|
|
epochRef.shufflingRef, epochRef.beacon_proposers)
|
|
|
|
when forkyState is phase0.HashedBeaconState:
|
|
# The previous_epoch_participation-based logic requires Altair or newer
|
|
epochRefFallback()
|
|
else:
|
|
let
|
|
shufflingRef = node.dag.getShufflingRef(node.dag.head, nextEpoch, false).valueOr:
|
|
# epochRefFallback() won't work in this case either
|
|
return
|
|
nextEpochProposers = get_beacon_proposer_indices(
|
|
forkyState.data, shufflingRef.shuffled_active_validator_indices,
|
|
nextEpoch)
|
|
nextEpochFirstProposer = nextEpochProposers[0].valueOr:
|
|
# All proposers except the first can be more straightforwardly and
|
|
# efficiently (re)computed correctly once in that epoch.
|
|
epochRefFallback()
|
|
return
|
|
|
|
# Has to account for potential epoch transition TIMELY_SOURCE_FLAG_INDEX,
|
|
# TIMELY_TARGET_FLAG_INDEX, and inactivity penalties, resulting from spec
|
|
# functions get_flag_index_deltas() and get_inactivity_penalty_deltas().
|
|
#
|
|
# There are no penalties associated with TIMELY_HEAD_FLAG_INDEX, but a
|
|
# reward exists. effective_balance == MAX_EFFECTIVE_BALANCE.Gwei ensures
|
|
# if even so, then the effective balance cannot change as a result.
|
|
#
|
|
# It's not truly necessary to avoid all rewards and penalties, but only
|
|
# to bound them to ensure they won't unexpected alter effective balance
|
|
# during the upcoming epoch transition.
|
|
#
|
|
# During genesis epoch, the check for epoch participation is against
|
|
# current, not previous, epoch, and therefore there's a possibility of
|
|
# checking for if a validator has participated in an epoch before it will
|
|
# happen.
|
|
#
|
|
# Because process_rewards_and_penalties() in epoch processing happens
|
|
# before the current/previous participation swap, previous is correct
|
|
# even here, and consistent with what the epoch transition uses.
|
|
#
|
|
# Whilst slashing, proposal, and sync committee rewards and penalties do
|
|
# update the balances as they occur, they don't update effective_balance
|
|
# until the end of epoch, so detect via effective_balance_might_update.
|
|
#
|
|
# On EF mainnet epoch 233906, this matches 99.5% of active validators;
|
|
# with Holesky epoch 2041, 83% of active validators.
|
|
let
|
|
participation_flags =
|
|
forkyState.data.previous_epoch_participation.item(
|
|
nextEpochFirstProposer)
|
|
effective_balance = forkyState.data.validators.item(
|
|
nextEpochFirstProposer).effective_balance
|
|
|
|
if participation_flags.has_flag(TIMELY_SOURCE_FLAG_INDEX) and
|
|
participation_flags.has_flag(TIMELY_TARGET_FLAG_INDEX) and
|
|
effective_balance == MAX_EFFECTIVE_BALANCE.Gwei and
|
|
forkyState.data.slot.epoch != GENESIS_EPOCH and
|
|
forkyState.data.inactivity_scores.item(
|
|
nextEpochFirstProposer) == 0 and
|
|
not effective_balance_might_update(
|
|
forkyState.data.balances.item(nextEpochFirstProposer),
|
|
effective_balance):
|
|
node.consensusManager[].actionTracker.updateActions(
|
|
shufflingRef, nextEpochProposers)
|
|
else:
|
|
epochRefFallback()
|
|
|
|
proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
|
|
## Subscribe to subnets that we are providing stability for or aggregating
|
|
## and unsubscribe from the ones that are no longer relevant.
|
|
|
|
# Let the tracker know what duties are approaching - this will tell us how
|
|
# many stability subnets we need to be subscribed to and what subnets we'll
|
|
# soon be aggregating - in addition to the in-beacon-node duties, there may
|
|
# also be duties coming from the validator client, but we don't control when
|
|
# these arrive
|
|
await node.registerDuties(slot)
|
|
|
|
# We start subscribing to gossip before we're fully synced - this allows time
|
|
# to subscribe before the sync end game
|
|
const
|
|
TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64
|
|
HYSTERESIS_BUFFER = 16
|
|
|
|
static: doAssert high(ConsensusFork) == ConsensusFork.Electra
|
|
|
|
let
|
|
head = node.dag.head
|
|
headDistance =
|
|
if slot > head.slot: (slot - head.slot).uint64
|
|
else: 0'u64
|
|
isBehind =
|
|
headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER
|
|
targetGossipState =
|
|
getTargetGossipState(
|
|
slot.epoch,
|
|
node.dag.cfg.ALTAIR_FORK_EPOCH,
|
|
node.dag.cfg.BELLATRIX_FORK_EPOCH,
|
|
node.dag.cfg.CAPELLA_FORK_EPOCH,
|
|
node.dag.cfg.DENEB_FORK_EPOCH,
|
|
node.dag.cfg.ELECTRA_FORK_EPOCH,
|
|
isBehind)
|
|
|
|
doAssert targetGossipState.card <= 2
|
|
|
|
let
|
|
newGossipForks = targetGossipState - node.gossipState
|
|
oldGossipForks = node.gossipState - targetGossipState
|
|
|
|
doAssert newGossipForks.card <= 2
|
|
doAssert oldGossipForks.card <= 2
|
|
|
|
func maxGossipFork(gossipState: GossipState): int =
|
|
var res = -1
|
|
for gossipFork in gossipState:
|
|
res = max(res, gossipFork.int)
|
|
res
|
|
|
|
if maxGossipFork(targetGossipState) < maxGossipFork(node.gossipState) and
|
|
targetGossipState != {}:
|
|
warn "Unexpected clock regression during transition",
|
|
targetGossipState,
|
|
gossipState = node.gossipState
|
|
|
|
if node.gossipState.card == 0 and targetGossipState.card > 0:
|
|
# We are synced, so we will connect
|
|
debug "Enabling topic subscriptions",
|
|
wallSlot = slot,
|
|
headSlot = head.slot,
|
|
headDistance, targetGossipState
|
|
|
|
node.processor[].setupDoppelgangerDetection(slot)
|
|
|
|
# Specially when waiting for genesis, we'll already be synced on startup -
|
|
# it might also happen on a sufficiently fast restart
|
|
|
|
# We "know" the actions for the current and the next epoch
|
|
withState(node.dag.headState):
|
|
if node.consensusManager[].actionTracker.needsUpdate(
|
|
forkyState, slot.epoch):
|
|
let epochRef = node.dag.getEpochRef(head, slot.epoch, false).expect(
|
|
"Getting head EpochRef should never fail")
|
|
node.consensusManager[].actionTracker.updateActions(
|
|
epochRef.shufflingRef, epochRef.beacon_proposers)
|
|
|
|
node.maybeUpdateActionTrackerNextEpoch(forkyState, slot.epoch + 1)
|
|
|
|
if node.gossipState.card > 0 and targetGossipState.card == 0:
|
|
debug "Disabling topic subscriptions",
|
|
wallSlot = slot,
|
|
headSlot = head.slot,
|
|
headDistance
|
|
|
|
node.processor[].clearDoppelgangerProtection()
|
|
|
|
let forkDigests = node.forkDigests()
|
|
|
|
const removeMessageHandlers: array[ConsensusFork, auto] = [
|
|
removePhase0MessageHandlers,
|
|
removeAltairMessageHandlers,
|
|
removeAltairMessageHandlers, # bellatrix (altair handlers, different forkDigest)
|
|
removeCapellaMessageHandlers,
|
|
removeDenebMessageHandlers,
|
|
removeElectraMessageHandlers
|
|
]
|
|
|
|
for gossipFork in oldGossipForks:
|
|
removeMessageHandlers[gossipFork](node, forkDigests[gossipFork])
|
|
|
|
const addMessageHandlers: array[ConsensusFork, auto] = [
|
|
addPhase0MessageHandlers,
|
|
addAltairMessageHandlers,
|
|
addAltairMessageHandlers, # bellatrix (altair handlers, different forkDigest)
|
|
addCapellaMessageHandlers,
|
|
addDenebMessageHandlers,
|
|
addElectraMessageHandlers
|
|
]
|
|
|
|
for gossipFork in newGossipForks:
|
|
addMessageHandlers[gossipFork](node, forkDigests[gossipFork], slot)
|
|
|
|
node.gossipState = targetGossipState
|
|
node.doppelgangerChecked(slot.epoch)
|
|
node.updateAttestationSubnetHandlers(slot)
|
|
node.updateBlocksGossipStatus(slot, isBehind)
|
|
node.updateLightClientGossipStatus(slot, isBehind)
|
|
|
|
proc pruneBlobs(node: BeaconNode, slot: Slot) =
|
|
let blobPruneEpoch = (slot.epoch -
|
|
node.dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS - 1)
|
|
if slot.is_epoch() and blobPruneEpoch >= node.dag.cfg.DENEB_FORK_EPOCH:
|
|
var blocks: array[SLOTS_PER_EPOCH.int, BlockId]
|
|
var count = 0
|
|
let startIndex = node.dag.getBlockRange(
|
|
blobPruneEpoch.start_slot, 1, blocks.toOpenArray(0, SLOTS_PER_EPOCH - 1))
|
|
for i in startIndex..<SLOTS_PER_EPOCH:
|
|
let blck = node.dag.getForkedBlock(blocks[int(i)]).valueOr: continue
|
|
withBlck(blck):
|
|
when typeof(forkyBlck).kind < ConsensusFork.Deneb: continue
|
|
else:
|
|
for j in 0..len(forkyBlck.message.body.blob_kzg_commitments) - 1:
|
|
if node.db.delBlobSidecar(blocks[int(i)].root, BlobIndex(j)):
|
|
count = count + 1
|
|
debug "pruned blobs", count, blobPruneEpoch
|
|
|
|
proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
|
|
# Things we do when slot processing has ended and we're about to wait for the
|
|
# next slot
|
|
|
|
# By waiting until close before slot end, ensure that preparation for next
|
|
# slot does not interfere with propagation of messages and with VC duties.
|
|
const endOffset = aggregateSlotOffset + nanos(
|
|
(NANOSECONDS_PER_SLOT - aggregateSlotOffset.nanoseconds.uint64).int64 div 2)
|
|
let endCutoff = node.beaconClock.fromNow(slot.start_beacon_time + endOffset)
|
|
if endCutoff.inFuture:
|
|
debug "Waiting for slot end", slot, endCutoff = shortLog(endCutoff.offset)
|
|
await sleepAsync(endCutoff.offset)
|
|
|
|
if node.dag.needStateCachesAndForkChoicePruning():
|
|
if node.attachedValidators[].validators.len > 0:
|
|
node.attachedValidators[]
|
|
.slashingProtection
|
|
# pruning is only done if the DB is set to pruning mode.
|
|
.pruneAfterFinalization(
|
|
node.dag.finalizedHead.slot.epoch()
|
|
)
|
|
|
|
# Delay part of pruning until latency critical duties are done.
|
|
# The other part of pruning, `pruneBlocksDAG`, is done eagerly.
|
|
# ----
|
|
# This is the last pruning to do as it clears the "needPruning" condition.
|
|
node.consensusManager[].pruneStateCachesAndForkChoice()
|
|
|
|
if node.config.historyMode == HistoryMode.Prune:
|
|
if not (slot + 1).is_epoch():
|
|
# The epoch slot already is "heavy" due to the epoch processing, leave
|
|
# the pruning for later
|
|
node.dag.pruneHistory()
|
|
node.pruneBlobs(slot)
|
|
|
|
when declared(GC_fullCollect):
|
|
# The slots in the beacon node work as frames in a game: we want to make
|
|
# sure that we're ready for the next one and don't get stuck in lengthy
|
|
# garbage collection tasks when time is of essence in the middle of a slot -
|
|
# while this does not guarantee that we'll never collect during a slot, it
|
|
# makes sure that all the scratch space we used during slot tasks (logging,
|
|
# temporary buffers etc) gets recycled for the next slot that is likely to
|
|
# need similar amounts of memory.
|
|
try:
|
|
GC_fullCollect()
|
|
except Defect as exc:
|
|
raise exc # Reraise to maintain call stack
|
|
except Exception:
|
|
# TODO upstream
|
|
raiseAssert "Unexpected exception during GC collection"
|
|
let gcCollectionTick = Moment.now()
|
|
|
|
# Checkpoint the database to clear the WAL file and make sure changes in
|
|
# the database are synced with the filesystem.
|
|
node.db.checkpoint()
|
|
let
|
|
dbCheckpointTick = Moment.now()
|
|
dbCheckpointDur = dbCheckpointTick - gcCollectionTick
|
|
db_checkpoint_seconds.inc(dbCheckpointDur.toFloatSeconds)
|
|
if dbCheckpointDur >= MinSignificantProcessingDuration:
|
|
info "Database checkpointed", dur = dbCheckpointDur
|
|
else:
|
|
debug "Database checkpointed", dur = dbCheckpointDur
|
|
|
|
node.syncCommitteeMsgPool[].pruneData(slot)
|
|
if slot.is_epoch:
|
|
node.dynamicFeeRecipientsStore[].pruneOldMappings(slot.epoch)
|
|
|
|
# Update upcoming actions - we do this every slot in case a reorg happens
|
|
let head = node.dag.head
|
|
if node.isSynced(head) and head.executionValid:
|
|
withState(node.dag.headState):
|
|
# maybeUpdateActionTrackerNextEpoch might not account for balance changes
|
|
# from the process_rewards_and_penalties() epoch transition but only from
|
|
# process_block() and other per-slot sources. This mainly matters insofar
|
|
# as it might trigger process_effective_balance_updates() changes in that
|
|
# same epoch transition, which function is therefore potentially blind to
|
|
# but which might then affect beacon proposers.
|
|
#
|
|
# Because this runs every slot, it can account naturally for slashings,
|
|
# which affect balances via slash_validator() when they happen, and any
|
|
# missed sync committee participation via process_sync_aggregate(), but
|
|
# attestation penalties for example, need, specific handling.
|
|
# checked by maybeUpdateActionTrackerNextEpoch.
|
|
node.maybeUpdateActionTrackerNextEpoch(forkyState, slot.epoch + 1)
|
|
|
|
let
|
|
nextAttestationSlot =
|
|
node.consensusManager[].actionTracker.getNextAttestationSlot(slot)
|
|
nextProposalSlot =
|
|
node.consensusManager[].actionTracker.getNextProposalSlot(slot)
|
|
nextActionSlot = min(nextAttestationSlot, nextProposalSlot)
|
|
nextActionWaitTime = saturate(fromNow(node.beaconClock, nextActionSlot))
|
|
|
|
# -1 is a more useful output than 18446744073709551615 as an indicator of
|
|
# no future attestation/proposal known.
|
|
template formatInt64(x: Slot): int64 =
|
|
if x == high(uint64).Slot:
|
|
-1'i64
|
|
else:
|
|
toGaugeValue(x)
|
|
|
|
let
|
|
syncCommitteeSlot = slot + 1
|
|
syncCommitteeEpoch = syncCommitteeSlot.epoch
|
|
inCurrentSyncCommittee =
|
|
not node.getCurrentSyncCommiteeSubnets(syncCommitteeEpoch).isZeros()
|
|
|
|
template formatSyncCommitteeStatus(): string =
|
|
if inCurrentSyncCommittee:
|
|
"current"
|
|
elif not node.getNextSyncCommitteeSubnets(syncCommitteeEpoch).isZeros():
|
|
let slotsToNextSyncCommitteePeriod =
|
|
SLOTS_PER_SYNC_COMMITTEE_PERIOD -
|
|
since_sync_committee_period_start(syncCommitteeSlot)
|
|
# int64 conversion is safe
|
|
doAssert slotsToNextSyncCommitteePeriod <= SLOTS_PER_SYNC_COMMITTEE_PERIOD
|
|
"in " & toTimeLeftString(
|
|
SECONDS_PER_SLOT.int64.seconds * slotsToNextSyncCommitteePeriod.int64)
|
|
else:
|
|
"none"
|
|
|
|
info "Slot end",
|
|
slot = shortLog(slot),
|
|
nextActionWait =
|
|
if nextActionSlot == FAR_FUTURE_SLOT:
|
|
"n/a"
|
|
else:
|
|
shortLog(nextActionWaitTime),
|
|
nextAttestationSlot = formatInt64(nextAttestationSlot),
|
|
nextProposalSlot = formatInt64(nextProposalSlot),
|
|
syncCommitteeDuties = formatSyncCommitteeStatus(),
|
|
head = shortLog(head)
|
|
|
|
if nextActionSlot != FAR_FUTURE_SLOT:
|
|
next_action_wait.set(nextActionWaitTime.toFloatSeconds)
|
|
|
|
next_proposal_wait.set(
|
|
if nextProposalSlot != FAR_FUTURE_SLOT:
|
|
saturate(fromNow(node.beaconClock, nextProposalSlot)).toFloatSeconds()
|
|
else:
|
|
Inf)
|
|
|
|
sync_committee_active.set(if inCurrentSyncCommittee: 1 else: 0)
|
|
|
|
let epoch = slot.epoch
|
|
if epoch + 1 >= node.network.forkId.next_fork_epoch:
|
|
# Update 1 epoch early to block non-fork-ready peers
|
|
node.network.updateForkId(epoch, node.dag.genesis_validators_root)
|
|
|
|
# When we're not behind schedule, we'll speculatively update the clearance
|
|
# state in anticipation of receiving the next block - we do it after
|
|
# logging slot end since the nextActionWaitTime can be short
|
|
let advanceCutoff = node.beaconClock.fromNow(
|
|
slot.start_beacon_time() + chronos.seconds(int(SECONDS_PER_SLOT - 1)))
|
|
if advanceCutoff.inFuture:
|
|
# We wait until there's only a second left before the next slot begins, then
|
|
# we advance the clearance state to the next slot - this gives us a high
|
|
# probability of being prepared for the block that will arrive and the
|
|
# epoch processing that follows
|
|
await sleepAsync(advanceCutoff.offset)
|
|
node.dag.advanceClearanceState()
|
|
|
|
# Prepare action tracker for the next slot
|
|
node.consensusManager[].actionTracker.updateSlot(slot + 1)
|
|
|
|
# The last thing we do is to perform the subscriptions and unsubscriptions for
|
|
# the next slot, just before that slot starts - because of the advance cuttoff
|
|
# above, this will be done just before the next slot starts
|
|
node.updateSyncCommitteeTopics(slot + 1)
|
|
|
|
await node.updateGossipStatus(slot + 1)
|
|
|
|
func formatNextConsensusFork(
|
|
node: BeaconNode, withVanityArt = false): Opt[string] =
|
|
let consensusFork =
|
|
node.dag.cfg.consensusForkAtEpoch(node.dag.head.slot.epoch)
|
|
if consensusFork == ConsensusFork.high:
|
|
return Opt.none(string)
|
|
let
|
|
nextConsensusFork = consensusFork.succ()
|
|
nextForkEpoch = node.dag.cfg.consensusForkEpoch(nextConsensusFork)
|
|
if nextForkEpoch == FAR_FUTURE_EPOCH:
|
|
return Opt.none(string)
|
|
Opt.some(
|
|
(if withVanityArt: nextConsensusFork.getVanityMascot & " " else: "") &
|
|
$nextConsensusFork & ":" & $nextForkEpoch)
|
|
|
|
func syncStatus(node: BeaconNode, wallSlot: Slot): string =
|
|
let optimisticHead = not node.dag.head.executionValid
|
|
if node.syncManager.inProgress:
|
|
let
|
|
optimisticSuffix =
|
|
if optimisticHead:
|
|
"/opt"
|
|
else:
|
|
""
|
|
lightClientSuffix =
|
|
if node.consensusManager[].shouldSyncOptimistically(wallSlot):
|
|
" - lc: " & $shortLog(node.consensusManager[].optimisticHead)
|
|
else:
|
|
""
|
|
node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix
|
|
elif node.backfiller.inProgress:
|
|
"backfill: " & node.backfiller.syncStatus
|
|
elif optimisticHead:
|
|
"synced/opt"
|
|
else:
|
|
"synced"
|
|
|
|
when defined(windows):
|
|
from winservice import establishWindowsService, reportServiceStatusSuccess
|
|
|
|
proc onSlotStart(node: BeaconNode, wallTime: BeaconTime,
|
|
lastSlot: Slot): Future[bool] {.async.} =
|
|
## Called at the beginning of a slot - usually every slot, but sometimes might
|
|
## skip a few in case we're running late.
|
|
## wallTime: current system time - we will strive to perform all duties up
|
|
## to this point in time
|
|
## lastSlot: the last slot that we successfully processed, so we know where to
|
|
## start work from - there might be jumps if processing is delayed
|
|
let
|
|
# The slot we should be at, according to the clock
|
|
wallSlot = wallTime.slotOrZero
|
|
# If everything was working perfectly, the slot that we should be processing
|
|
expectedSlot = lastSlot + 1
|
|
finalizedEpoch = node.dag.finalizedHead.blck.slot.epoch()
|
|
delay = wallTime - expectedSlot.start_beacon_time()
|
|
|
|
node.processingDelay = Opt.some(nanoseconds(delay.nanoseconds))
|
|
|
|
block:
|
|
logScope:
|
|
slot = shortLog(wallSlot)
|
|
epoch = shortLog(wallSlot.epoch)
|
|
sync = node.syncStatus(wallSlot)
|
|
peers = len(node.network.peerPool)
|
|
head = shortLog(node.dag.head)
|
|
finalized = shortLog(getStateField(
|
|
node.dag.headState, finalized_checkpoint))
|
|
delay = shortLog(delay)
|
|
let nextConsensusForkDescription = node.formatNextConsensusFork()
|
|
if nextConsensusForkDescription.isNone:
|
|
info "Slot start"
|
|
else:
|
|
info "Slot start", nextFork = nextConsensusForkDescription.get
|
|
|
|
# Check before any re-scheduling of onSlotStart()
|
|
if checkIfShouldStopAtEpoch(wallSlot, node.config.stopAtEpoch):
|
|
quit(0)
|
|
|
|
when defined(windows):
|
|
if node.config.runAsService:
|
|
reportServiceStatusSuccess()
|
|
|
|
beacon_slot.set wallSlot.toGaugeValue
|
|
beacon_current_epoch.set wallSlot.epoch.toGaugeValue
|
|
|
|
# both non-negative, so difference can't overflow or underflow int64
|
|
finalization_delay.set(
|
|
wallSlot.epoch.toGaugeValue - finalizedEpoch.toGaugeValue)
|
|
|
|
if node.config.strictVerification:
|
|
verifyFinalization(node, wallSlot)
|
|
|
|
node.consensusManager[].updateHead(wallSlot)
|
|
|
|
await node.handleValidatorDuties(lastSlot, wallSlot)
|
|
|
|
await onSlotEnd(node, wallSlot)
|
|
|
|
# https://github.com/ethereum/builder-specs/blob/v0.4.0/specs/bellatrix/validator.md#registration-dissemination
|
|
# This specification suggests validators re-submit to builder software every
|
|
# `EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION` epochs.
|
|
if wallSlot.is_epoch and
|
|
wallSlot.epoch mod EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION == 0:
|
|
asyncSpawn node.registerValidators(wallSlot.epoch)
|
|
|
|
return false
|
|
|
|
proc onSecond(node: BeaconNode, time: Moment) =
|
|
# Nim GC metrics (for the main thread)
|
|
updateThreadMetrics()
|
|
|
|
if node.config.stopAtSyncedEpoch != 0 and
|
|
node.dag.head.slot.epoch >= node.config.stopAtSyncedEpoch:
|
|
notice "Shutting down after having reached the target synced epoch"
|
|
bnStatus = BeaconNodeStatus.Stopping
|
|
|
|
proc runOnSecondLoop(node: BeaconNode) {.async.} =
|
|
const
|
|
sleepTime = chronos.seconds(1)
|
|
nanosecondsIn1s = float(sleepTime.nanoseconds)
|
|
while true:
|
|
let start = chronos.now(chronos.Moment)
|
|
await chronos.sleepAsync(sleepTime)
|
|
let afterSleep = chronos.now(chronos.Moment)
|
|
let sleepTime = afterSleep - start
|
|
node.onSecond(start)
|
|
let finished = chronos.now(chronos.Moment)
|
|
let processingTime = finished - afterSleep
|
|
ticks_delay.set(sleepTime.nanoseconds.float / nanosecondsIn1s)
|
|
trace "onSecond task completed", sleepTime, processingTime
|
|
|
|
func connectedPeersCount(node: BeaconNode): int =
|
|
len(node.network.peerPool)
|
|
|
|
proc installRestHandlers(restServer: RestServerRef, node: BeaconNode) =
|
|
restServer.router.installBeaconApiHandlers(node)
|
|
restServer.router.installBuilderApiHandlers(node)
|
|
restServer.router.installConfigApiHandlers(node)
|
|
restServer.router.installDebugApiHandlers(node)
|
|
restServer.router.installEventApiHandlers(node)
|
|
restServer.router.installNimbusApiHandlers(node)
|
|
restServer.router.installNodeApiHandlers(node)
|
|
restServer.router.installValidatorApiHandlers(node)
|
|
if node.dag.lcDataStore.serve:
|
|
restServer.router.installLightClientApiHandlers(node)
|
|
|
|
from ./spec/datatypes/capella import SignedBeaconBlock
|
|
|
|
proc installMessageValidators(node: BeaconNode) =
|
|
# These validators stay around the whole time, regardless of which specific
|
|
# subnets are subscribed to during any given epoch.
|
|
let forkDigests = node.dag.forkDigests
|
|
|
|
for fork in ConsensusFork:
|
|
withConsensusFork(fork):
|
|
let digest = forkDigests[].atConsensusFork(consensusFork)
|
|
|
|
# beacon_block
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/phase0/p2p-interface.md#beacon_block
|
|
node.network.addValidator(
|
|
getBeaconBlocksTopic(digest), proc (
|
|
signedBlock: consensusFork.SignedBeaconBlock
|
|
): ValidationResult =
|
|
if node.shouldSyncOptimistically(node.currentSlot):
|
|
toValidationResult(
|
|
node.optimisticProcessor.processSignedBeaconBlock(
|
|
signedBlock))
|
|
else:
|
|
toValidationResult(
|
|
node.processor[].processSignedBeaconBlock(
|
|
MsgSource.gossip, signedBlock)))
|
|
|
|
# beacon_attestation_{subnet_id}
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id
|
|
when consensusFork >= ConsensusFork.Electra:
|
|
for it in SubnetId:
|
|
closureScope: # Needed for inner `proc`; don't lift it out of loop.
|
|
let subnet_id = it
|
|
node.network.addAsyncValidator(
|
|
getAttestationTopic(digest, subnet_id), proc (
|
|
attestation: electra.Attestation
|
|
): Future[ValidationResult] {.
|
|
async: (raises: [CancelledError]).} =
|
|
return toValidationResult(
|
|
await node.processor.processAttestation(
|
|
MsgSource.gossip, attestation, subnet_id,
|
|
checkSignature = true, checkValidator = false)))
|
|
else:
|
|
for it in SubnetId:
|
|
closureScope: # Needed for inner `proc`; don't lift it out of loop.
|
|
let subnet_id = it
|
|
node.network.addAsyncValidator(
|
|
getAttestationTopic(digest, subnet_id), proc (
|
|
attestation: phase0.Attestation
|
|
): Future[ValidationResult] {.
|
|
async: (raises: [CancelledError]).} =
|
|
return toValidationResult(
|
|
await node.processor.processAttestation(
|
|
MsgSource.gossip, attestation, subnet_id,
|
|
checkSignature = true, checkValidator = false)))
|
|
|
|
# beacon_aggregate_and_proof
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.4/specs/phase0/p2p-interface.md#beacon_aggregate_and_proof
|
|
when consensusFork >= ConsensusFork.Electra:
|
|
node.network.addAsyncValidator(
|
|
getAggregateAndProofsTopic(digest), proc (
|
|
signedAggregateAndProof: electra.SignedAggregateAndProof
|
|
): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
|
|
return toValidationResult(
|
|
await node.processor.processSignedAggregateAndProof(
|
|
MsgSource.gossip, signedAggregateAndProof)))
|
|
else:
|
|
node.network.addAsyncValidator(
|
|
getAggregateAndProofsTopic(digest), proc (
|
|
signedAggregateAndProof: phase0.SignedAggregateAndProof
|
|
): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
|
|
return toValidationResult(
|
|
await node.processor.processSignedAggregateAndProof(
|
|
MsgSource.gossip, signedAggregateAndProof)))
|
|
|
|
# attester_slashing
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#attester_slashing
|
|
node.network.addValidator(
|
|
getAttesterSlashingsTopic(digest), proc (
|
|
attesterSlashing: phase0.AttesterSlashing
|
|
): ValidationResult =
|
|
toValidationResult(
|
|
node.processor[].processAttesterSlashing(
|
|
MsgSource.gossip, attesterSlashing)))
|
|
|
|
# proposer_slashing
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#proposer_slashing
|
|
node.network.addValidator(
|
|
getProposerSlashingsTopic(digest), proc (
|
|
proposerSlashing: ProposerSlashing
|
|
): ValidationResult =
|
|
toValidationResult(
|
|
node.processor[].processProposerSlashing(
|
|
MsgSource.gossip, proposerSlashing)))
|
|
|
|
# voluntary_exit
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#voluntary_exit
|
|
node.network.addValidator(
|
|
getVoluntaryExitsTopic(digest), proc (
|
|
signedVoluntaryExit: SignedVoluntaryExit
|
|
): ValidationResult =
|
|
toValidationResult(
|
|
node.processor[].processSignedVoluntaryExit(
|
|
MsgSource.gossip, signedVoluntaryExit)))
|
|
|
|
when consensusFork >= ConsensusFork.Altair:
|
|
# sync_committee_{subnet_id}
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0/specs/altair/p2p-interface.md#sync_committee_subnet_id
|
|
for subcommitteeIdx in SyncSubcommitteeIndex:
|
|
closureScope: # Needed for inner `proc`; don't lift it out of loop.
|
|
let idx = subcommitteeIdx
|
|
node.network.addAsyncValidator(
|
|
getSyncCommitteeTopic(digest, idx), proc (
|
|
msg: SyncCommitteeMessage
|
|
): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
|
|
return toValidationResult(
|
|
await node.processor.processSyncCommitteeMessage(
|
|
MsgSource.gossip, msg, idx)))
|
|
|
|
# sync_committee_contribution_and_proof
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0/specs/altair/p2p-interface.md#sync_committee_contribution_and_proof
|
|
node.network.addAsyncValidator(
|
|
getSyncCommitteeContributionAndProofTopic(digest), proc (
|
|
msg: SignedContributionAndProof
|
|
): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
|
|
return toValidationResult(
|
|
await node.processor.processSignedContributionAndProof(
|
|
MsgSource.gossip, msg)))
|
|
|
|
when consensusFork >= ConsensusFork.Capella:
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.2/specs/capella/p2p-interface.md#bls_to_execution_change
|
|
node.network.addAsyncValidator(
|
|
getBlsToExecutionChangeTopic(digest), proc (
|
|
msg: SignedBLSToExecutionChange
|
|
): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
|
|
return toValidationResult(
|
|
await node.processor.processBlsToExecutionChange(
|
|
MsgSource.gossip, msg)))
|
|
|
|
when consensusFork >= ConsensusFork.Deneb:
|
|
# blob_sidecar_{subnet_id}
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blob_sidecar_subnet_id
|
|
for it in BlobId:
|
|
closureScope: # Needed for inner `proc`; don't lift it out of loop.
|
|
let subnet_id = it
|
|
node.network.addValidator(
|
|
getBlobSidecarTopic(digest, subnet_id), proc (
|
|
blobSidecar: deneb.BlobSidecar
|
|
): ValidationResult =
|
|
toValidationResult(
|
|
node.processor[].processBlobSidecar(
|
|
MsgSource.gossip, blobSidecar, subnet_id)))
|
|
|
|
node.installLightClientMessageValidators()
|
|
|
|
proc stop(node: BeaconNode) =
|
|
bnStatus = BeaconNodeStatus.Stopping
|
|
notice "Graceful shutdown"
|
|
if not node.config.inProcessValidators:
|
|
try:
|
|
node.vcProcess.close()
|
|
except Exception as exc:
|
|
warn "Couldn't close vc process", msg = exc.msg
|
|
try:
|
|
waitFor node.network.stop()
|
|
except CatchableError as exc:
|
|
warn "Couldn't stop network", msg = exc.msg
|
|
|
|
node.attachedValidators[].slashingProtection.close()
|
|
node.attachedValidators[].close()
|
|
node.db.close()
|
|
notice "Databases closed"
|
|
|
|
proc startBackfillTask(node: BeaconNode) {.async.} =
|
|
while node.dag.needsBackfill:
|
|
if not node.syncManager.inProgress:
|
|
# Only start the backfiller if it's needed _and_ head sync has completed -
|
|
# if we lose sync after having synced head, we could stop the backfilller,
|
|
# but this should be a fringe case - might as well keep the logic simple for
|
|
# now
|
|
node.backfiller.start()
|
|
return
|
|
|
|
await sleepAsync(chronos.seconds(2))
|
|
|
|
proc run(node: BeaconNode) {.raises: [CatchableError].} =
|
|
bnStatus = BeaconNodeStatus.Running
|
|
|
|
if not isNil(node.restServer):
|
|
node.restServer.installRestHandlers(node)
|
|
node.restServer.start()
|
|
|
|
if not isNil(node.keymanagerServer):
|
|
doAssert not isNil(node.keymanagerHost)
|
|
node.keymanagerServer.router.installKeymanagerHandlers(node.keymanagerHost[])
|
|
if node.keymanagerServer != node.restServer:
|
|
node.keymanagerServer.start()
|
|
|
|
let
|
|
wallTime = node.beaconClock.now()
|
|
wallSlot = wallTime.slotOrZero()
|
|
|
|
node.startLightClient()
|
|
node.requestManager.start()
|
|
node.syncManager.start()
|
|
|
|
if node.dag.needsBackfill(): asyncSpawn node.startBackfillTask()
|
|
|
|
waitFor node.updateGossipStatus(wallSlot)
|
|
|
|
for web3signerUrl in node.config.web3SignerUrls:
|
|
# TODO
|
|
# The current strategy polls all remote signers independently
|
|
# from each other which may lead to some race conditions of
|
|
# validators are migrated from one signer to another
|
|
# (because the updates to our validator pool are not atomic).
|
|
# Consider using different strategies that would detect such
|
|
# race conditions.
|
|
asyncSpawn node.pollForDynamicValidators(
|
|
web3signerUrl, node.config.web3signerUpdateInterval)
|
|
|
|
asyncSpawn runSlotLoop(node, wallTime, onSlotStart)
|
|
asyncSpawn runOnSecondLoop(node)
|
|
asyncSpawn runQueueProcessingLoop(node.blockProcessor)
|
|
asyncSpawn runKeystoreCachePruningLoop(node.keystoreCache)
|
|
|
|
# main event loop
|
|
while bnStatus == BeaconNodeStatus.Running:
|
|
poll() # if poll fails, the network is broken
|
|
|
|
# time to say goodbye
|
|
node.stop()
|
|
|
|
var gPidFile: string
|
|
proc createPidFile(filename: string) {.raises: [IOError].} =
|
|
writeFile filename, $os.getCurrentProcessId()
|
|
gPidFile = filename
|
|
addQuitProc proc {.noconv.} = discard io2.removeFile(gPidFile)
|
|
|
|
proc initializeNetworking(node: BeaconNode) {.async.} =
|
|
node.installMessageValidators()
|
|
|
|
info "Listening to incoming network requests"
|
|
await node.network.startListening()
|
|
|
|
let addressFile = node.config.dataDir / "beacon_node.enr"
|
|
writeFile(addressFile, node.network.announcedENR.toURI)
|
|
|
|
await node.network.start()
|
|
|
|
proc start*(node: BeaconNode) {.raises: [CatchableError].} =
|
|
let
|
|
head = node.dag.head
|
|
finalizedHead = node.dag.finalizedHead
|
|
genesisTime = node.beaconClock.fromNow(start_beacon_time(Slot 0))
|
|
|
|
notice "Starting beacon node",
|
|
version = fullVersionStr,
|
|
nimVersion = NimVersion,
|
|
enr = node.network.announcedENR.toURI,
|
|
peerId = $node.network.switch.peerInfo.peerId,
|
|
timeSinceFinalization =
|
|
node.beaconClock.now() - finalizedHead.slot.start_beacon_time(),
|
|
head = shortLog(head),
|
|
justified = shortLog(getStateField(
|
|
node.dag.headState, current_justified_checkpoint)),
|
|
finalized = shortLog(getStateField(
|
|
node.dag.headState, finalized_checkpoint)),
|
|
finalizedHead = shortLog(finalizedHead),
|
|
SLOTS_PER_EPOCH,
|
|
SECONDS_PER_SLOT,
|
|
SPEC_VERSION,
|
|
dataDir = node.config.dataDir.string,
|
|
validators = node.attachedValidators[].count
|
|
|
|
if genesisTime.inFuture:
|
|
notice "Waiting for genesis", genesisIn = genesisTime.offset
|
|
|
|
waitFor node.initializeNetworking()
|
|
|
|
node.elManager.start()
|
|
node.run()
|
|
|
|
func formatGwei(amount: Gwei): string =
|
|
# TODO This is implemented in a quite a silly way.
|
|
# Better routines for formatting decimal numbers
|
|
# should exists somewhere else.
|
|
let
|
|
eth = distinctBase(amount) div 1000000000
|
|
remainder = distinctBase(amount) mod 1000000000
|
|
|
|
result = $eth
|
|
if remainder != 0:
|
|
result.add '.'
|
|
let remainderStr = $remainder
|
|
for i in remainderStr.len ..< 9:
|
|
result.add '0'
|
|
result.add remainderStr
|
|
while result[^1] == '0':
|
|
result.setLen(result.len - 1)
|
|
|
|
when not defined(windows):
|
|
proc initStatusBar(node: BeaconNode) {.raises: [ValueError].} =
|
|
if not isatty(stdout): return
|
|
if not node.config.statusBarEnabled: return
|
|
|
|
try:
|
|
enableTrueColors()
|
|
except Exception as exc: # TODO Exception
|
|
error "Couldn't enable colors", err = exc.msg
|
|
|
|
proc dataResolver(expr: string): string {.raises: [].} =
|
|
template justified: untyped = node.dag.head.atEpochStart(
|
|
getStateField(
|
|
node.dag.headState, current_justified_checkpoint).epoch)
|
|
# TODO:
|
|
# We should introduce a general API for resolving dot expressions
|
|
# such as `db.latest_block.slot` or `metrics.connected_peers`.
|
|
# Such an API can be shared between the RPC back-end, CLI tools
|
|
# such as ncli, a potential GraphQL back-end and so on.
|
|
# The status bar feature would allow the user to specify an
|
|
# arbitrary expression that is resolvable through this API.
|
|
case expr.toLowerAscii
|
|
of "version":
|
|
versionAsStr
|
|
|
|
of "full_version":
|
|
fullVersionStr
|
|
|
|
of "connected_peers":
|
|
$(node.connectedPeersCount)
|
|
|
|
of "head_root":
|
|
shortLog(node.dag.head.root)
|
|
of "head_epoch":
|
|
$(node.dag.head.slot.epoch)
|
|
of "head_epoch_slot":
|
|
$(node.dag.head.slot.since_epoch_start)
|
|
of "head_slot":
|
|
$(node.dag.head.slot)
|
|
|
|
of "justifed_root":
|
|
shortLog(justified.blck.root)
|
|
of "justifed_epoch":
|
|
$(justified.slot.epoch)
|
|
of "justifed_epoch_slot":
|
|
$(justified.slot.since_epoch_start)
|
|
of "justifed_slot":
|
|
$(justified.slot)
|
|
|
|
of "finalized_root":
|
|
shortLog(node.dag.finalizedHead.blck.root)
|
|
of "finalized_epoch":
|
|
$(node.dag.finalizedHead.slot.epoch)
|
|
of "finalized_epoch_slot":
|
|
$(node.dag.finalizedHead.slot.since_epoch_start)
|
|
of "finalized_slot":
|
|
$(node.dag.finalizedHead.slot)
|
|
|
|
of "epoch":
|
|
$node.currentSlot.epoch
|
|
|
|
of "epoch_slot":
|
|
$(node.currentSlot.since_epoch_start)
|
|
|
|
of "slot":
|
|
$node.currentSlot
|
|
|
|
of "slots_per_epoch":
|
|
$SLOTS_PER_EPOCH
|
|
|
|
of "slot_trailing_digits":
|
|
var slotStr = $node.currentSlot
|
|
if slotStr.len > 3: slotStr = slotStr[^3..^1]
|
|
slotStr
|
|
|
|
of "attached_validators_balance":
|
|
formatGwei(node.attachedValidatorBalanceTotal)
|
|
|
|
of "next_consensus_fork":
|
|
let nextConsensusForkDescription =
|
|
node.formatNextConsensusFork(withVanityArt = true)
|
|
if nextConsensusForkDescription.isNone:
|
|
""
|
|
else:
|
|
" (scheduled " & nextConsensusForkDescription.get & ")"
|
|
|
|
of "sync_status":
|
|
node.syncStatus(node.currentSlot)
|
|
else:
|
|
# We ignore typos for now and just render the expression
|
|
# as it was written. TODO: come up with a good way to show
|
|
# an error message to the user.
|
|
"$" & expr
|
|
|
|
var statusBar = StatusBarView.init(
|
|
node.config.statusBarContents,
|
|
dataResolver)
|
|
|
|
when compiles(defaultChroniclesStream.outputs[0].writer):
|
|
let tmp = defaultChroniclesStream.outputs[0].writer
|
|
|
|
defaultChroniclesStream.outputs[0].writer =
|
|
proc (logLevel: LogLevel, msg: LogOutputStr) {.raises: [].} =
|
|
try:
|
|
# p.hidePrompt
|
|
erase statusBar
|
|
# p.writeLine msg
|
|
tmp(logLevel, msg)
|
|
render statusBar
|
|
# p.showPrompt
|
|
except Exception as e: # render raises Exception
|
|
logLoggingFailure(cstring(msg), e)
|
|
|
|
proc statusBarUpdatesPollingLoop() {.async.} =
|
|
try:
|
|
while true:
|
|
update statusBar
|
|
erase statusBar
|
|
render statusBar
|
|
await sleepAsync(chronos.seconds(1))
|
|
except CatchableError as exc:
|
|
warn "Failed to update status bar, no further updates", err = exc.msg
|
|
|
|
asyncSpawn statusBarUpdatesPollingLoop()
|
|
|
|
proc doRunBeaconNode(config: var BeaconNodeConf, rng: ref HmacDrbgContext) {.raises: [CatchableError].} =
|
|
info "Launching beacon node",
|
|
version = fullVersionStr,
|
|
bls_backend = $BLS_BACKEND,
|
|
const_preset,
|
|
cmdParams = commandLineParams(),
|
|
config
|
|
|
|
template ignoreDeprecatedOption(option: untyped): untyped =
|
|
if config.option.isSome:
|
|
warn "Config option is deprecated",
|
|
option = config.option.get
|
|
ignoreDeprecatedOption requireEngineAPI
|
|
ignoreDeprecatedOption safeSlotsToImportOptimistically
|
|
ignoreDeprecatedOption terminalTotalDifficultyOverride
|
|
ignoreDeprecatedOption optimistic
|
|
ignoreDeprecatedOption validatorMonitorTotals
|
|
ignoreDeprecatedOption web3ForcePolling
|
|
|
|
createPidFile(config.dataDir.string / "beacon_node.pid")
|
|
|
|
config.createDumpDirs()
|
|
|
|
if config.metricsEnabled:
|
|
let metricsAddress = config.metricsAddress
|
|
notice "Starting metrics HTTP server",
|
|
url = "http://" & $metricsAddress & ":" & $config.metricsPort & "/metrics"
|
|
try:
|
|
startMetricsHttpServer($metricsAddress, config.metricsPort)
|
|
except CatchableError as exc:
|
|
raise exc
|
|
except Exception as exc:
|
|
raiseAssert exc.msg # TODO fix metrics
|
|
|
|
# Nim GC metrics (for the main thread) will be collected in onSecond(), but
|
|
# we disable piggy-backing on other metrics here.
|
|
setSystemMetricsAutomaticUpdate(false)
|
|
|
|
# There are no managed event loops in here, to do a graceful shutdown, but
|
|
# letting the default Ctrl+C handler exit is safe, since we only read from
|
|
# the db.
|
|
let metadata = config.loadEth2Network()
|
|
|
|
# Updating the config based on the metadata certainly is not beautiful but it
|
|
# works
|
|
for node in metadata.bootstrapNodes:
|
|
config.bootstrapNodes.add node
|
|
if config.forkChoiceVersion.isNone:
|
|
config.forkChoiceVersion = some(ForkChoiceVersion.Pr3431)
|
|
|
|
## Ctrl+C handling
|
|
proc controlCHandler() {.noconv.} =
|
|
when defined(windows):
|
|
# workaround for https://github.com/nim-lang/Nim/issues/4057
|
|
try:
|
|
setupForeignThreadGc()
|
|
except Exception as exc: raiseAssert exc.msg # shouldn't happen
|
|
notice "Shutting down after having received SIGINT"
|
|
bnStatus = BeaconNodeStatus.Stopping
|
|
try:
|
|
setControlCHook(controlCHandler)
|
|
except Exception as exc: # TODO Exception
|
|
warn "Cannot set ctrl-c handler", msg = exc.msg
|
|
|
|
# equivalent SIGTERM handler
|
|
when defined(posix):
|
|
proc SIGTERMHandler(signal: cint) {.noconv.} =
|
|
notice "Shutting down after having received SIGTERM"
|
|
bnStatus = BeaconNodeStatus.Stopping
|
|
c_signal(ansi_c.SIGTERM, SIGTERMHandler)
|
|
|
|
if metadata.cfg.DENEB_FORK_EPOCH != FAR_FUTURE_EPOCH:
|
|
let res =
|
|
if config.trustedSetupFile.isNone:
|
|
conf.loadKzgTrustedSetup()
|
|
else:
|
|
conf.loadKzgTrustedSetup(config.trustedSetupFile.get)
|
|
if res.isErr():
|
|
raiseAssert res.error()
|
|
|
|
let node = waitFor BeaconNode.init(rng, config, metadata)
|
|
|
|
if bnStatus == BeaconNodeStatus.Stopping:
|
|
return
|
|
|
|
when not defined(windows):
|
|
# This status bar can lock a Windows terminal emulator, blocking the whole
|
|
# event loop (seen on Windows 10, with a default MSYS2 terminal).
|
|
initStatusBar(node)
|
|
|
|
if node.nickname != "":
|
|
dynamicLogScope(node = node.nickname): node.start()
|
|
else:
|
|
node.start()
|
|
|
|
proc doRecord(config: BeaconNodeConf, rng: var HmacDrbgContext) {.
|
|
raises: [CatchableError].} =
|
|
case config.recordCmd:
|
|
of RecordCmd.create:
|
|
let netKeys = getPersistentNetKeys(rng, config)
|
|
|
|
var fieldPairs: seq[FieldPair]
|
|
for field in config.fields:
|
|
let fieldPair = field.split(":")
|
|
if fieldPair.len > 1:
|
|
fieldPairs.add(toFieldPair(fieldPair[0], hexToSeqByte(fieldPair[1])))
|
|
else:
|
|
fatal "Invalid field pair"
|
|
quit QuitFailure
|
|
|
|
let record = enr.Record.init(
|
|
config.seqNumber,
|
|
netKeys.seckey.asEthKey,
|
|
some(config.ipExt),
|
|
some(config.tcpPortExt),
|
|
some(config.udpPortExt),
|
|
fieldPairs).expect("Record within size limits")
|
|
|
|
echo record.toURI()
|
|
|
|
of RecordCmd.print:
|
|
echo $config.recordPrint
|
|
|
|
proc doWeb3Cmd(config: BeaconNodeConf, rng: var HmacDrbgContext)
|
|
{.raises: [CatchableError].} =
|
|
case config.web3Cmd:
|
|
of Web3Cmd.test:
|
|
let metadata = config.loadEth2Network()
|
|
|
|
waitFor testWeb3Provider(config.web3TestUrl,
|
|
metadata.cfg.DEPOSIT_CONTRACT_ADDRESS,
|
|
rng.loadJwtSecret(config, allowCreate = true))
|
|
|
|
proc doSlashingExport(conf: BeaconNodeConf) {.raises: [IOError].}=
|
|
let
|
|
dir = conf.validatorsDir()
|
|
filetrunc = SlashingDbName
|
|
# TODO: Make it read-only https://github.com/status-im/nim-eth/issues/312
|
|
let db = SlashingProtectionDB.loadUnchecked(dir, filetrunc, readOnly = false)
|
|
|
|
let interchange = conf.exportedInterchangeFile.string
|
|
db.exportSlashingInterchange(interchange, conf.exportedValidators)
|
|
echo "Export finished: '", dir/filetrunc & ".sqlite3" , "' into '", interchange, "'"
|
|
|
|
proc doSlashingImport(conf: BeaconNodeConf) {.raises: [SerializationError, IOError].} =
|
|
let
|
|
dir = conf.validatorsDir()
|
|
filetrunc = SlashingDbName
|
|
# TODO: Make it read-only https://github.com/status-im/nim-eth/issues/312
|
|
|
|
let interchange = conf.importedInterchangeFile.string
|
|
|
|
var spdir: SPDIR
|
|
try:
|
|
spdir = Json.loadFile(interchange, SPDIR,
|
|
requireAllFields = true)
|
|
except SerializationError as err:
|
|
writeStackTrace()
|
|
stderr.write $Json & " load issue for file \"", interchange, "\"\n"
|
|
stderr.write err.formatMsg(interchange), "\n"
|
|
quit 1
|
|
|
|
# Open DB and handle migration from v1 to v2 if needed
|
|
let db = SlashingProtectionDB.init(
|
|
genesis_validators_root = Eth2Digest spdir.metadata.genesis_validators_root,
|
|
basePath = dir,
|
|
dbname = filetrunc,
|
|
modes = {kCompleteArchive}
|
|
)
|
|
|
|
# Now import the slashing interchange file
|
|
# Failures mode:
|
|
# - siError can only happen with invalid genesis_validators_root which would be caught above
|
|
# - siPartial can happen for invalid public keys, slashable blocks, slashable votes
|
|
let status = db.inclSPDIR(spdir)
|
|
doAssert status in {siSuccess, siPartial}
|
|
|
|
echo "Import finished: '", interchange, "' into '", dir/filetrunc & ".sqlite3", "'"
|
|
|
|
proc doSlashingInterchange(conf: BeaconNodeConf) {.raises: [CatchableError].} =
|
|
case conf.slashingdbCmd
|
|
of SlashProtCmd.`export`:
|
|
conf.doSlashingExport()
|
|
of SlashProtCmd.`import`:
|
|
conf.doSlashingImport()
|
|
|
|
proc handleStartUpCmd(config: var BeaconNodeConf) {.raises: [CatchableError].} =
|
|
# Single RNG instance for the application - will be seeded on construction
|
|
# and avoid using system resources (such as urandom) after that
|
|
let rng = HmacDrbgContext.new()
|
|
|
|
case config.cmd
|
|
of BNStartUpCmd.noCommand: doRunBeaconNode(config, rng)
|
|
of BNStartUpCmd.deposits: doDeposits(config, rng[])
|
|
of BNStartUpCmd.wallets: doWallets(config, rng[])
|
|
of BNStartUpCmd.record: doRecord(config, rng[])
|
|
of BNStartUpCmd.web3: doWeb3Cmd(config, rng[])
|
|
of BNStartUpCmd.slashingdb: doSlashingInterchange(config)
|
|
of BNStartUpCmd.trustedNodeSync:
|
|
if config.blockId.isSome():
|
|
error "--blockId option has been removed - use --state-id instead!"
|
|
quit 1
|
|
|
|
let
|
|
metadata = loadEth2Network(config)
|
|
db = BeaconChainDB.new(config.databaseDir, metadata.cfg, inMemory = false)
|
|
genesisState = waitFor fetchGenesisState(metadata)
|
|
waitFor db.doRunTrustedNodeSync(
|
|
metadata,
|
|
config.databaseDir,
|
|
config.eraDir,
|
|
config.trustedNodeUrl,
|
|
config.stateId,
|
|
config.lcTrustedBlockRoot,
|
|
config.backfillBlocks,
|
|
config.reindex,
|
|
config.downloadDepositSnapshot,
|
|
genesisState)
|
|
db.close()
|
|
|
|
{.pop.} # TODO moduletests exceptions
|
|
|
|
programMain:
|
|
var config = makeBannerAndConfig(clientId, copyrights, nimBanner,
|
|
SPEC_VERSION, [], BeaconNodeConf).valueOr:
|
|
stderr.write error
|
|
quit QuitFailure
|
|
|
|
if not(checkAndCreateDataDir(string(config.dataDir))):
|
|
# We are unable to access/create data folder or data folder's
|
|
# permissions are insecure.
|
|
quit QuitFailure
|
|
|
|
setupFileLimits()
|
|
setupLogging(config.logLevel, config.logStdout, config.logFile)
|
|
|
|
## This Ctrl+C handler exits the program in non-graceful way.
|
|
## It's responsible for handling Ctrl+C in sub-commands such
|
|
## as `wallets *` and `deposits *`. In a regular beacon node
|
|
## run, it will be overwritten later with a different handler
|
|
## performing a graceful exit.
|
|
proc exitImmediatelyOnCtrlC() {.noconv.} =
|
|
when defined(windows):
|
|
# workaround for https://github.com/nim-lang/Nim/issues/4057
|
|
setupForeignThreadGc()
|
|
# in case a password prompt disabled echoing
|
|
resetStdin()
|
|
echo "" # If we interrupt during an interactive prompt, this
|
|
# will move the cursor to the next line
|
|
notice "Shutting down after having received SIGINT"
|
|
quit 0
|
|
setControlCHook(exitImmediatelyOnCtrlC)
|
|
# equivalent SIGTERM handler
|
|
when defined(posix):
|
|
proc exitImmediatelyOnSIGTERM(signal: cint) {.noconv.} =
|
|
notice "Shutting down after having received SIGTERM"
|
|
quit 0
|
|
c_signal(ansi_c.SIGTERM, exitImmediatelyOnSIGTERM)
|
|
|
|
when defined(windows):
|
|
if config.runAsService:
|
|
proc exitService() =
|
|
bnStatus = BeaconNodeStatus.Stopping
|
|
establishWindowsService(clientId, copyrights, nimBanner, SPEC_VERSION,
|
|
"nimbus_beacon_node", BeaconNodeConf,
|
|
handleStartUpCmd, exitService)
|
|
else:
|
|
handleStartUpCmd(config)
|
|
else:
|
|
handleStartUpCmd(config)
|