nimbus-eth2/beacon_chain/nimbus_beacon_node.nim

2151 lines
81 KiB
Nim
Raw Normal View History

# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
2019-10-02 12:38:14 +00:00
# Standard library
std/[math, os, osproc, random, sequtils, strformat, strutils,
tables, times, terminal],
2019-10-02 12:38:14 +00:00
# Nimble packages
stew/[objects, byteutils, endians2, io2], stew/shims/macros,
chronos, confutils, metrics, metrics/chronos_httpserver,
2021-03-17 18:46:45 +00:00
chronicles, bearssl, blscurve, presto,
json_serialization/std/[options, sets, net], serialization/errors,
taskpools,
eth/[keys, async_utils], eth/net/nat,
eth/db/[kvstore, kvstore_sqlite3],
2021-07-29 18:53:58 +00:00
eth/p2p/discoveryv5/[protocol, enr, random2],
2019-10-02 12:38:14 +00:00
# Local modules
"."/[
beacon_clock, beacon_chain_db, beacon_node_common, beacon_node_status,
beacon_node_types, conf, filepath, interop, nimbus_binary_common, statusbar,
version],
./networking/[eth2_discovery, eth2_network, network_metadata],
./gossip_processing/[eth2_processor, block_processor, consensus_manager],
./validators/[
validator_duties, validator_pool,
slashing_protection, keystore_management],
./sync/[sync_manager, sync_protocol, request_manager],
./rpc/[rest_api, rpc_api],
./spec/datatypes/[altair, phase0],
./spec/eth2_apis/rpc_beacon_client,
./spec/[
beaconstate, forks, helpers, network, weak_subjectivity, signatures,
validator],
./consensus_object_pools/[
blockchain_dag, block_quarantine, block_clearance, block_pools_types,
attestation_pool, sync_committee_msg_pool, exit_pool, spec_cache],
./eth1/eth1_monitor
from eth/common/eth_types import BlockHashOrNumber
when defined(posix):
import system/ansi_c
from
libp2p/protocols/pubsub/gossipsub
import
TopicParams, validateParameters, init
type RpcServer* = RpcHttpServer
  ## RPC server flavour used by the beacon node (alias of `RpcHttpServer`).
2020-06-05 15:08:50 +00:00
template init(T: type RpcHttpServer, ip: ValidIpAddress, port: Port): T =
  ## Builds an RPC HTTP server for the single address formed from `ip` and
  ## `port`.
  let listenAddress = initTAddress(ip, port)
  newRpcHttpServer([listenAddress])
2021-03-17 18:46:45 +00:00
template init(T: type RestServerRef, ip: ValidIpAddress, port: Port): T =
  ## Builds the REST server for the address formed from `ip` and `port`.
  ## Evaluates to `nil` (after logging a notice) when creation fails.
  let
    listenAddress = initTAddress(ip, port)
    flags = {HttpServerFlags.QueryCommaSeparatedArray,
             HttpServerFlags.NotifyDisconnect}
    # Validator clients poll this server at least once per slot (12 seconds),
    # so the headers timeout is raised to two slots.
    slotsTimeout = seconds(2'i64 * int64(SECONDS_PER_SLOT))
    created = RestServerRef.new(getRouter(), listenAddress,
                                serverFlags = flags,
                                httpHeadersTimeout = slotsTimeout)

  if created.isErr():
    notice "Rest server could not be started", address = $listenAddress,
           reason = created.error()
    nil
  else:
    created.get()
# Interop metrics, see
# https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#interop-metrics
declareGauge(beacon_slot, "Latest slot of the beacon chain state")
declareGauge(beacon_current_epoch, "Current epoch")

# Finalization tracking
declareGauge(finalization_delay,
  "Epoch delay between scheduled epoch and finalized epoch")
declareGauge(ticks_delay,
  "How long does to take to run the onSecond loop")
declareGauge(next_action_wait,
  "Seconds until the next attestation will be sent")

# Version information is exported as labels of a gauge that is set to 1.
declareGauge(versionGauge, "Nimbus version info (as metric labels)",
  ["version", "commit"], name = "version")
versionGauge.set(1, labelValues = [fullVersionStr, gitRevision])

# Log topic for the statements in this module.
logScope:
  topics = "beacnde"
2021-05-19 06:38:13 +00:00
const
  # Name handed to `SlashingProtectionDB.init` alongside the validators
  # directory. Changing it requires physically renaming the file as well,
  # otherwise the slashing protection history is lost.
  SlashingDbName = "slashing_protection"
func getBeaconTimeFn(clock: BeaconClock): GetBeaconTimeFn =
  ## Wraps `clock` in a closure that yields `clock.now()` on every call.
  result = proc(): BeaconTime =
    clock.now()
proc init*(T: type BeaconNode,
           cfg: RuntimeConfig,
           rng: ref BrHmacDrbgContext,
           config: BeaconNodeConf,
           depositContractDeployedAt: BlockHashOrNumber,
           eth1Network: Option[Eth1Network],
           genesisStateContents: string,
           genesisDepositsSnapshotContents: string): BeaconNode {.
    raises: [Defect, CatchableError].} =
  ## Builds a fully wired `BeaconNode` from the runtime configuration `cfg`
  ## and the user configuration `config`.
  ##
  ## In order: starts the task pool, opens the beacon chain database, loads
  ## the optional finalized checkpoint state/block, pre-initializes the
  ## database when it is not initialized yet (from Eth1 genesis detection, a
  ## genesis-slot checkpoint or the supplied genesis state), loads the block
  ## DAG, cross-checks the genesis validators roots and the weak subjectivity
  ## checkpoint, then creates the RPC/REST servers, the network, the pools,
  ## the slashing protection database and the processors, registers the valid
  ## gossip topics and attaches the validators.
  ##
  ## `genesisStateContents` and `genesisDepositsSnapshotContents` hold
  ## SSZ-encoded data; an empty string means "not provided".
  ##
  ## Unrecoverable configuration or I/O problems are logged and terminate the
  ## process through `quit 1`. Returns `nil` when the node status becomes
  ## `Stopping` while waiting for genesis.

  var taskpool: TaskpoolPtr

  try:
    if config.numThreads < 0:
      fatal "The number of threads --numThreads cannot be negative."
      quit 1
    elif config.numThreads == 0:
      taskpool = TaskpoolPtr.new()
    else:
      taskpool = TaskpoolPtr.new(numThreads = config.numThreads)

    info "Threadpool started", numThreads = taskpool.numThreads
  except Exception as exc:
    raise newException(Defect, "Failure in taskpool initialization.")

  let
    eventBus = newAsyncEventBus()
    db = BeaconChainDB.new(config.databaseDir, inMemory = false)

  var
    genesisState, checkpointState: ref phase0.BeaconState
    checkpointBlock: phase0.TrustedSignedBeaconBlock

  # Callbacks forwarding pool / DAG notifications to the event bus.
  proc onAttestationReceived(data: Attestation) =
    eventBus.emit("attestation-received", data)
  proc onAttestationSent(data: Attestation) =
    eventBus.emit("attestation-sent", data)
  proc onVoluntaryExitAdded(data: SignedVoluntaryExit) =
    eventBus.emit("voluntary-exit", data)
  proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) =
    eventBus.emit("signed-beacon-block", data)
  proc onHeadChanged(data: HeadChangeInfoObject) =
    eventBus.emit("head-change", data)
  proc onChainReorg(data: ReorgInfoObject) =
    eventBus.emit("chain-reorg", data)
  proc onFinalization(data: FinalizationInfoObject) =
    eventBus.emit("finalization", data)
  proc onSyncContribution(data: SignedContributionAndProof) =
    eventBus.emit("sync-contribution-and-proof", data)

  if config.finalizedCheckpointState.isSome:
    let checkpointStatePath = config.finalizedCheckpointState.get.string
    checkpointState = try:
      newClone(SSZ.loadFile(checkpointStatePath, phase0.BeaconState))
    except SerializationError as err:
      fatal "Checkpoint state deserialization failed",
            err = formatMsg(err, checkpointStatePath)
      quit 1
    except CatchableError as err:
      fatal "Failed to read checkpoint state file", err = err.msg
      quit 1

    if config.finalizedCheckpointBlock.isNone:
      # Only a genesis-slot checkpoint state may come without its block.
      if checkpointState.slot > 0:
        fatal "Specifying a non-genesis --finalized-checkpoint-state requires specifying --finalized-checkpoint-block as well"
        quit 1
    else:
      let checkpointBlockPath = config.finalizedCheckpointBlock.get.string
      try:
        # TODO Perform sanity checks like signature and slot verification at least
        checkpointBlock = SSZ.loadFile(checkpointBlockPath, phase0.TrustedSignedBeaconBlock)
      except SerializationError as err:
        fatal "Invalid checkpoint block", err = err.formatMsg(checkpointBlockPath)
        quit 1
      except IOError as err:
        fatal "Failed to load the checkpoint block", err = err.msg
        quit 1
  elif config.finalizedCheckpointBlock.isSome:
    # TODO We can download the state from somewhere in the future relying
    #      on the trusted `state_root` appearing in the checkpoint block.
    fatal "--finalized-checkpoint-block cannot be specified without --finalized-checkpoint-state"
    quit 1

  var eth1Monitor: Eth1Monitor
  if not ChainDAGRef.isInitialized(db):
    var
      tailState: ref phase0.BeaconState
      tailBlock: phase0.TrustedSignedBeaconBlock

    if genesisStateContents.len == 0 and checkpointState == nil:
      when hasGenesisDetection:
        # Strings cannot be `nil` in Nim; an empty string means that no
        # deposits snapshot was provided.
        if genesisDepositsSnapshotContents.len != 0:
          fatal "A deposits snapshot cannot be provided without also providing a matching beacon state snapshot"
          quit 1

        # This is a fresh start without a known genesis state
        # (most likely, it hasn't arrived yet). We'll try to
        # obtain a genesis through the Eth1 deposits monitor:
        if config.web3Urls.len == 0:
          fatal "Web3 URL not specified"
          quit 1

        # TODO Could move this to a separate "GenesisMonitor" process or task
        #      that would do only this - see Paul's proposal for this.
        let eth1MonitorRes = waitFor Eth1Monitor.init(
          cfg,
          db,
          config.web3Urls,
          depositContractDeployedAt,
          eth1Network)

        if eth1MonitorRes.isErr:
          fatal "Failed to start Eth1 monitor",
                reason = eth1MonitorRes.error,
                web3Urls = config.web3Urls,
                depositContractDeployedAt
          quit 1
        else:
          eth1Monitor = eth1MonitorRes.get

        genesisState = waitFor eth1Monitor.waitGenesis()
        # The wait may have ended because the node is shutting down.
        if bnStatus == BeaconNodeStatus.Stopping:
          return nil

        tailState = genesisState
        tailBlock = get_initial_beacon_block(genesisState[])

        notice "Eth2 genesis state detected",
          genesisTime = genesisState.genesisTime,
          eth1Block = genesisState.eth1_data.block_hash,
          totalDeposits = genesisState.eth1_data.deposit_count
      else:
        fatal "The beacon node must be compiled with -d:has_genesis_detection " &
              "in order to support monitoring for genesis events"
        quit 1

    elif genesisStateContents.len == 0:
      # No genesis state supplied: the checkpoint state can only stand in for
      # it when it is itself the genesis-slot state.
      if checkpointState.slot == GENESIS_SLOT:
        genesisState = checkpointState
        tailState = checkpointState
        tailBlock = get_initial_beacon_block(genesisState[])
      else:
        fatal "State checkpoints cannot be provided for a network without a known genesis state"
        quit 1
    else:
      try:
        genesisState = newClone(SSZ.decode(genesisStateContents, phase0.BeaconState))
      except CatchableError as err:
        raiseAssert "Invalid baked-in state: " & err.msg

      if checkpointState != nil:
        tailState = checkpointState
        tailBlock = checkpointBlock
      else:
        tailState = genesisState
        tailBlock = get_initial_beacon_block(genesisState[])

    try:
      ChainDAGRef.preInit(db, genesisState[], tailState[], tailBlock)
      doAssert ChainDAGRef.isInitialized(db), "preInit should have initialized db"
    except CatchableError as exc:
      error "Failed to initialize database", err = exc.msg
      quit 1

  # Doesn't use std/random directly, but dependencies might
  randomize(rng[].rand(high(int)))

  info "Loading block dag from database", path = config.databaseDir

  let
    chainDagFlags = if config.verifyFinalization: {verifyFinalization}
                    else: {}
    dag = ChainDAGRef.init(cfg, db, chainDagFlags, onBlockAdded, onHeadChanged,
                           onChainReorg, onFinalization)
    quarantine = QuarantineRef.init(rng, taskpool)
    databaseGenesisValidatorsRoot =
      getStateField(dag.headState.data, genesis_validators_root)

  if genesisStateContents.len != 0:
    let
      networkGenesisValidatorsRoot =
        extractGenesisValidatorRootFromSnapshop(genesisStateContents)

    if networkGenesisValidatorsRoot != databaseGenesisValidatorsRoot:
      fatal "The specified --data-dir contains data for a different network",
            networkGenesisValidatorsRoot, databaseGenesisValidatorsRoot,
            dataDir = config.dataDir
      quit 1

  let beaconClock = BeaconClock.init(
    getStateField(dag.headState.data, genesis_time))

  if config.weakSubjectivityCheckpoint.isSome:
    let
      currentSlot = beaconClock.now.slotOrZero
      isCheckpointStale = not is_within_weak_subjectivity_period(
        cfg,
        currentSlot,
        dag.headState.data,
        config.weakSubjectivityCheckpoint.get)

    if isCheckpointStale:
      error "Weak subjectivity checkpoint is stale",
            currentSlot,
            checkpoint = config.weakSubjectivityCheckpoint.get,
            headStateSlot = getStateField(dag.headState.data, slot)
      quit 1

  if checkpointState != nil:
    let checkpointGenesisValidatorsRoot = checkpointState[].genesis_validators_root
    if checkpointGenesisValidatorsRoot != databaseGenesisValidatorsRoot:
      fatal "The specified checkpoint state is intended for a different network",
            checkpointGenesisValidatorsRoot, databaseGenesisValidatorsRoot,
            dataDir = config.dataDir
      quit 1

    dag.setTailState(checkpointState[], checkpointBlock)

  # The monitor may already exist when genesis was detected above.
  if eth1Monitor.isNil and
     config.web3Urls.len > 0 and
     genesisDepositsSnapshotContents.len > 0:
    let genesisDepositsSnapshot = SSZ.decode(genesisDepositsSnapshotContents,
                                             DepositContractSnapshot)
    eth1Monitor = Eth1Monitor.init(
      cfg,
      db,
      config.web3Urls,
      genesisDepositsSnapshot,
      eth1Network)

  let rpcServer = if config.rpcEnabled:
    RpcServer.init(config.rpcAddress, config.rpcPort)
  else:
    nil

  let restServer = if config.restEnabled:
    RestServerRef.init(config.restAddress, config.restPort)
  else:
    nil

  let
    netKeys = getPersistentNetKeys(rng[], config)
    nickname = if config.nodeName == "auto": shortForm(netKeys)
               else: config.nodeName
    getBeaconTime = beaconClock.getBeaconTimeFn()
    network = createEth2Node(
      rng, config, netKeys, cfg, dag.forkDigests, getBeaconTime,
      getStateField(dag.headState.data, genesis_validators_root))
    attestationPool = newClone(
      AttestationPool.init(dag, quarantine, onAttestationReceived)
    )
    syncCommitteeMsgPool = newClone(
      SyncCommitteeMsgPool.init(onSyncContribution)
    )
    exitPool = newClone(ExitPool.init(dag, onVoluntaryExitAdded))

  case config.slashingDbKind
  of SlashingDbKind.v2:
    discard
  of SlashingDbKind.v1:
    error "Slashing DB v1 is no longer supported for writing"
    quit 1
  of SlashingDbKind.both:
    warn "Slashing DB v1 deprecated, writing only v2"

  info "Loading slashing protection database (v2)",
    path = config.validatorsDir()

  let
    slashingProtectionDB =
      SlashingProtectionDB.init(
        getStateField(dag.headState.data, genesis_validators_root),
        config.validatorsDir(), SlashingDbName)
    validatorPool = newClone(ValidatorPool.init(slashingProtectionDB))

    consensusManager = ConsensusManager.new(
      dag, attestationPool, quarantine
    )
    blockProcessor = BlockProcessor.new(
      config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
      consensusManager, getBeaconTime)
    processor = Eth2Processor.new(
      config.doppelgangerDetection,
      blockProcessor, dag, attestationPool, exitPool, validatorPool,
      syncCommitteeMsgPool, quarantine, rng, getBeaconTime, taskpool)

  var node = BeaconNode(
    nickname: nickname,
    graffitiBytes: if config.graffiti.isSome: config.graffiti.get.GraffitiBytes
                   else: defaultGraffitiBytes(),
    network: network,
    netKeys: netKeys,
    db: db,
    config: config,
    dag: dag,
    gossipState: GossipState.Disconnected,
    quarantine: quarantine,
    attestationPool: attestationPool,
    syncCommitteeMsgPool: syncCommitteeMsgPool,
    attachedValidators: validatorPool,
    exitPool: exitPool,
    eth1Monitor: eth1Monitor,
    rpcServer: rpcServer,
    restServer: restServer,
    eventBus: eventBus,
    processor: processor,
    blockProcessor: blockProcessor,
    consensusManager: consensusManager,
    requestManager: RequestManager.init(network, blockProcessor),
    beaconClock: beaconClock,
    taskpool: taskpool,
    onAttestationSent: onAttestationSent
  )

  # set topic validation routine
  network.setValidTopics(
    block:
      var
        topics = @[
          getBeaconBlocksTopic(network.forkDigests.phase0),
          getAttesterSlashingsTopic(network.forkDigests.phase0),
          getProposerSlashingsTopic(network.forkDigests.phase0),
          getVoluntaryExitsTopic(network.forkDigests.phase0),
          getAggregateAndProofsTopic(network.forkDigests.phase0),

          getBeaconBlocksTopic(dag.forkDigests.altair),
          getAttesterSlashingsTopic(network.forkDigests.altair),
          getProposerSlashingsTopic(network.forkDigests.altair),
          getVoluntaryExitsTopic(network.forkDigests.altair),
          getAggregateAndProofsTopic(network.forkDigests.altair),
        ]
      if not config.verifyFinalization:
        topics &= getSyncCommitteeContributionAndProofTopic(network.forkDigests.altair)
      for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
        topics &= getAttestationTopic(network.forkDigests.phase0, SubnetId(subnet_id))
        topics &= getAttestationTopic(network.forkDigests.altair, SubnetId(subnet_id))
      if not config.verifyFinalization:
        for subnet_id in 0'u64 ..< SYNC_COMMITTEE_SUBNET_COUNT:
          topics &= getSyncCommitteeTopic(network.forkDigests.altair, SyncCommitteeIndex(subnet_id))
      topics)

  if node.config.inProcessValidators:
    node.addLocalValidators()
  else:
    # Validators live in a separate signing process started next to this
    # executable.
    let cmd = getAppDir() / "nimbus_signing_process".addFileExt(ExeExt)
    let args = [$node.config.validatorsDir, $node.config.secretsDir]
    let workdir = io2.getCurrentDir().tryGet()
    node.vcProcess = try: startProcess(cmd, workdir, args)
                     except CatchableError as exc: raise exc
                     except Exception as exc: raiseAssert exc.msg
    node.addRemoteValidators()

  network.initBeaconSync(dag, getBeaconTime)

  node.updateValidatorMetrics()

  return node
func verifyFinalization(node: BeaconNode, slot: Slot) =
  ## Asserts that finalization is not lagging more than 4 epochs behind the
  ## epoch of `slot`. The check only applies from epoch 4 on and after the
  ## settling offset within the epoch has passed.
  const SETTLING_TIME_OFFSET = 1'u64

  # Don't static-assert this -- if this isn't called, don't require it
  doAssert SLOTS_PER_EPOCH > SETTLING_TIME_OFFSET

  let epoch = compute_epoch_at_slot(slot)
  if not (epoch >= 4 and slot mod SLOTS_PER_EPOCH > SETTLING_TIME_OFFSET):
    return

  # Finalization rule 234, that has the most lag slots among the cases, sets
  # state.finalized_checkpoint = old_previous_justified_checkpoint.epoch + 3
  # and then state.slot gets incremented, to increase the maximum offset, if
  # finalization occurs every slot, to 4 slots vs scheduledSlot.
  let finalizedEpoch = compute_epoch_at_slot(node.dag.finalizedHead.slot)

  # Intentionally, loudly assert. Point is to fail visibly and unignorably
  # during testing.
  doAssert finalizedEpoch + 4 >= epoch
func toBitArray(stabilitySubnets: auto): BitArray[ATTESTATION_SUBNET_COUNT] =
  ## Sets, in the returned bitmap, the bit of every `subnet_id` found in
  ## `stabilitySubnets`.
  for entry in stabilitySubnets:
    result[int(entry.subnet_id)] = true
proc getAttachedValidators(node: BeaconNode):
    Table[ValidatorIndex, AttachedValidator] =
  ## Maps the index of every validator in the head state for which
  ## `getAttachedValidator` yields a non-nil result to that attached validator.
  let validatorCount = getStateField(node.dag.headState.data, validators).len
  for idx in 0 ..< validatorCount:
    let attached = node.getAttachedValidator(
      getStateField(node.dag.headState.data, validators),
      ValidatorIndex(idx))
    if not attached.isNil:
      result[ValidatorIndex(idx)] = attached
proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) {.async.} =
  ## Recomputes, for `epoch`, the proposing and attesting slot bitmaps of the
  ## attached validators, and the subscribe/unsubscribe slots of the subnets
  ## on which one of them is selected as aggregator.
  doAssert epoch >= 1

  template schedule: untyped = node.attestationSubnets

  let
    localValidators = node.getAttachedValidators()
    localIndices = toIntSet(toSeq(localValidators.keys()))

    # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#lookahead
    # Only subscribe when this node should aggregate; libp2p broadcasting works
    # on subnet topics regardless.
    epochRef = node.dag.getEpochRef(node.dag.head, epoch)

    # The per-epoch bitmaps are kept in two entries selected by epoch parity.
    parity = epoch mod 2

  # Update proposals
  schedule.proposingSlots[parity] = 0
  for i, proposer in epochRef.beacon_proposers:
    if proposer.isSome and proposer.get() in localValidators:
      schedule.proposingSlots[parity] =
        schedule.proposingSlots[parity] or (1'u32 shl i)

  # Update attestations
  template anyLocalAggregator(
      members, committeeLen: untyped, atSlot: Slot): bool =
    anyIt(
      members,
      is_aggregator(
        committeeLen,
        await localValidators[it.ValidatorIndex].getSlotSig(
          getStateField(node.dag.headState.data, fork),
          getStateField(
            node.dag.headState.data, genesis_validators_root), atSlot)))

  schedule.lastCalculatedEpoch = epoch
  schedule.attestingSlots[parity] = 0

  # The relevant bitmaps are 32 bits each.
  static: doAssert SLOTS_PER_EPOCH <= 32

  for (committeeValidators, committeeIndex, subnet_id, slot) in
      get_committee_assignments(epochRef, epoch, localIndices):

    doAssert compute_epoch_at_slot(slot) == epoch

    # Each get_committee_assignments() call here is on the next epoch. At any
    # given time, only two epochs matter: the current and the next one. So,
    # once an epoch is done, [aS[epoch mod 2], aS[1 - (epoch mod 2)]] holds,
    # in sequence, the slot schedules of the current and next epochs. If
    # get_committee_assignments() has not been called for the next epoch yet,
    # typically because there hasn't been a block in the current epoch, there
    # isn't valid information in aS[1 - (epoch mod 2)], and only slots within
    # the current epoch can be known. Usually, this is not a major issue, but
    # when there hasn't been a block substantially through an epoch, it might
    # prove misleading to claim that there aren't attestations known, when it
    # only might be known either way for 3 more slots. However, it's also not
    # as important to attest when blocks aren't flowing, as only attestations
    # in blocks garner rewards.
    schedule.attestingSlots[parity] =
      schedule.attestingSlots[parity] or
        (1'u32 shl (slot mod SLOTS_PER_EPOCH))

    if anyLocalAggregator(
        committeeValidators,
        get_beacon_committee_len(epochRef, slot, committeeIndex), slot):
      let subnetIdx = subnet_id.uint64

      schedule.unsubscribeSlot[subnetIdx] =
        max(slot + 1, schedule.unsubscribeSlot[subnetIdx])

      if not schedule.aggregateSubnets[subnetIdx]:
        # The lead time here allows for the gossip mesh to stabilise well
        # before attestations start flowing on the channel - the downside of a
        # long lead time is that we waste bandwidth and CPU on traffic we're
        # not strictly interested in - it could potentially be decreased,
        # specially when peers are selected based on their stability subnet
        # connectivity
        const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 6

        # Queue the upcoming subscription, potentially earlier than already
        # planned; the inner `min` keeps the subtraction from underflowing.
        schedule.subscribeSlot[subnetIdx] =
          min(
            slot - min(slot.uint64, SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS),
            schedule.subscribeSlot[subnetIdx])
func updateStabilitySubnets(node: BeaconNode, slot: Slot): BitArray[ATTESTATION_SUBNET_COUNT] =
  ## Replaces every stability subnet whose expiration epoch has been reached
  ## with a new random subnet, and returns the bitmap of all stability subnets
  ## held afterwards.
  # `slot` is equivalent to the wallSlot used by cycleAttestationSubnets(),
  # especially since that tries to run early in epochs, avoiding race
  # conditions.
  let currentEpoch = epoch(slot)

  # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
  for entry in mitems(node.attestationSubnets.stabilitySubnets):
    if currentEpoch >= entry.expiration:
      entry.subnet_id = node.network.getRandomSubnetId()
      entry.expiration =
        currentEpoch + node.network.getStabilitySubnetLength()

    result[int(entry.subnet_id)] = true
proc cycleAttestationSubnetsPerEpoch(
    node: BeaconNode, wallSlot: Slot,
    prevStabilitySubnets: BitArray[ATTESTATION_SUBNET_COUNT]):
    Future[BitArray[ATTESTATION_SUBNET_COUNT]] {.async.} =
  ## Per-epoch portion of subnet cycling: updates the stability subnets and
  ## calculates the future attestation subnets. Completes with the stability
  ## subnet bitmap now in effect, or with `prevStabilitySubnets` unchanged
  ## when the head state is not in the epoch of `wallSlot`.

  # Only know RANDAO mix, which determines shuffling seed, one epoch in
  # advance. When getStateField(node.dag.headState, slot).epoch is
  # ahead of wallSlot, the clock's just incorrect. If the slot's behind
  # wallSlot, it would have to look more than MIN_SEED_LOOKAHEAD epochs
  # ahead to compute the shuffling determining the beacon committees.
  static: doAssert MIN_SEED_LOOKAHEAD == 1

  let
    wallEpoch = wallSlot.epoch
    stateSlot = getStateField(node.dag.headState.data, slot)

  if stateSlot.epoch != wallEpoch:
    debug "Requested attestation subnets too far in advance",
      wallSlot,
      stateSlot = stateSlot
    return prevStabilitySubnets

  # This works so long as at least one block in an epoch provides a basis for
  # calculating the shuffling for the next epoch. It will keep checking for a
  # block, each slot, until a block comes in, even if the first few blocks in
  # an epoch are missing. If a whole epoch without blocks occurs, it's not as
  # important to attest regardless, as those upcoming blocks will hit maximum
  # attestations quickly and any individual attestation's likelihood of being
  # selected is low.
  if node.attestationSubnets.nextCycleEpoch <= wallEpoch:
    await node.updateSubscriptionSchedule(wallEpoch + 1)
  node.attestationSubnets.nextCycleEpoch = wallEpoch + 1

  let refreshed = node.updateStabilitySubnets(wallSlot)

  # In subscribeAllSubnets mode, the metadata only gets set once, at initial
  # subnet attestation handler creation, since all subnets are considered
  # stability subnets in that case.
  if not node.config.subscribeAllSubnets and
      refreshed != prevStabilitySubnets:
    node.network.updateStabilitySubnetMetadata(refreshed)

  return refreshed
func subnetLog(v: BitArray): string =
  ## Renders the indices of the set bits of `v` as the string form of a
  ## sequence, for logging.
  let setIndices = toSeq(oneIndices(v))
  $setIndices
proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
  ## Advances the attestation subnet schedule to `wallSlot` and applies the
  ## resulting subscription changes for the fork digests matching the current
  ## gossip state. Asserts that subscribe-all-subnets mode is off.
  static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
  doAssert not node.config.subscribeAllSubnets

  template schedule: untyped = node.attestationSubnets

  let
    prevStabilitySubnets = schedule.stabilitySubnets.toBitArray()
    stabilitySubnets =
      await node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets)
    prevAggregateSubnets = schedule.aggregateSubnets

  # Switch aggregate subnets off or on once their unsubscribe or subscribe
  # slot has been reached.
  for idx in 0 ..< schedule.aggregateSubnets.len():
    let active = schedule.aggregateSubnets[idx]
    if active and wallSlot >= schedule.unsubscribeSlot[idx]:
      schedule.aggregateSubnets[idx] = false
    elif not active and wallSlot >= schedule.subscribeSlot[idx]:
      schedule.aggregateSubnets[idx] = true

  # Accounting specific to non-stability subnets
  let droppedAggregateSubnets =
    prevAggregateSubnets - schedule.aggregateSubnets
  for idx in droppedAggregateSubnets.oneIndices():
    schedule.subscribeSlot[idx] = FAR_FUTURE_SLOT

  let
    prevAllSubnets = prevAggregateSubnets + prevStabilitySubnets
    allSubnets = schedule.aggregateSubnets + stabilitySubnets
    unsubscribeSubnets = prevAllSubnets - allSubnets
    subscribeSubnets = allSubnets - prevAllSubnets

  case node.gossipState
  of GossipState.Disconnected:
    discard
  of GossipState.ConnectedToPhase0:
    node.network.unsubscribeAttestationSubnets(
      unsubscribeSubnets, node.dag.forkDigests.phase0)
    node.network.subscribeAttestationSubnets(
      subscribeSubnets, node.dag.forkDigests.phase0)
  of GossipState.InTransitionToAltair:
    node.network.unsubscribeAttestationSubnets(
      unsubscribeSubnets, node.dag.forkDigests.phase0)
    node.network.unsubscribeAttestationSubnets(
      unsubscribeSubnets, node.dag.forkDigests.altair)
    node.network.subscribeAttestationSubnets(
      subscribeSubnets, node.dag.forkDigests.phase0)
    node.network.subscribeAttestationSubnets(
      subscribeSubnets, node.dag.forkDigests.altair)
  of GossipState.ConnectedToAltair:
    node.network.unsubscribeAttestationSubnets(
      unsubscribeSubnets, node.dag.forkDigests.altair)
    node.network.subscribeAttestationSubnets(
      subscribeSubnets, node.dag.forkDigests.altair)

  debug "Attestation subnets",
    wallSlot,
    wallEpoch = wallSlot.epoch,
    prevAggregateSubnets = subnetLog(prevAggregateSubnets),
    aggregateSubnets = subnetLog(schedule.aggregateSubnets),
    prevStabilitySubnets = subnetLog(prevStabilitySubnets),
    stabilitySubnets = subnetLog(stabilitySubnets),
    subscribeSubnets = subnetLog(subscribeSubnets),
    unsubscribeSubnets = subnetLog(unsubscribeSubnets)
proc getInitialAggregateSubnets(node: BeaconNode): Table[SubnetId, Slot] =
  ## For the wall-clock epoch and the one after it, maps every subnet with a
  ## committee assignment of an attached validator to the slot following the
  ## latest such assignment.
  let
    wallEpoch = node.beaconClock.now.slotOrZero.epoch
    localIndices = toIntSet(toSeq(node.getAttachedValidators().keys()))

  # Either wallEpoch is 0, in which case it might be pre-genesis, but we only
  # care about the already-known first two epochs of attestations, or it's in
  # epoch 0 for real, in which case both are also already known; or wallEpoch
  # is greater than 0, in which case it's being called from onSlotStart which
  # has enough state to calculate wallEpoch + {0,1}'s attestations.
  for epoch in [wallEpoch, wallEpoch + 1]:
    let epochRef = node.dag.getEpochRef(node.dag.head, epoch)
    # TODO when https://github.com/nim-lang/Nim/issues/15972 and
    # https://github.com/nim-lang/Nim/issues/16217 are fixed, in
    # Nimbus's Nim, use (_, _, subnetIndex, slot).
    for (_, ci, subnet_id, slot) in get_committee_assignments(
        epochRef, epoch, localIndices):
      result.withValue(subnet_id, v) do:
        # Already tracked: keep the later of the two slots.
        v[] = max(v[], slot + 1)
      do:
        result[subnet_id] = slot + 1
proc subscribeAttestationSubnetHandlers(node: BeaconNode,
                                        forkDigest: ForkDigest) =
  ## Initialises the stability and aggregate subnet bookkeeping kept in
  ## `node.attestationSubnets` and subscribes to the union of both subnet sets
  ## under `forkDigest`.
  ##
  ## https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
  # TODO:
  # We might want to reuse the previous stability subnet if not expired when:
  # - Restarting the node with a persistent netkey
  # - When going from synced -> syncing -> synced state
  template attSubnets: untyped = node.attestationSubnets

  if not node.config.subscribeAllSubnets:
    let wallEpoch = node.beaconClock.now.slotOrZero.epoch

    # TODO make length dynamic when validator-client-based validators join and leave
    # In normal mode, there's one subnet subscription per validator, changing
    # randomly over time
    attSubnets.stabilitySubnets.setLen(node.attachedValidators[].count)
    for ss in attSubnets.stabilitySubnets.mitems():
      ss.subnet_id = node.network.getRandomSubnetId()
      ss.expiration = wallEpoch + node.network.getStabilitySubnetLength()
  else:
    # In all-subnets mode, we create a stability subnet subscription for every
    # subnet - this will be propagated in the attnets ENR entry
    attSubnets.stabilitySubnets.setLen(ATTESTATION_SUBNET_COUNT)
    for i, ss in attSubnets.stabilitySubnets.mpairs():
      ss.subnet_id = SubnetId(i)
      ss.expiration = FAR_FUTURE_EPOCH

  let stabilitySubnets = attSubnets.stabilitySubnets.toBitArray()
  node.network.updateStabilitySubnetMetadata(stabilitySubnets)

  let aggregateSubnets = node.getInitialAggregateSubnets()
  for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
    let subnetId = SubnetId(i)
    if subnetId notin aggregateSubnets:
      attSubnets.aggregateSubnets[i] = false
      attSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT
    else:
      attSubnets.aggregateSubnets[i] = true
      attSubnets.unsubscribeSlot[i] =
        try: aggregateSubnets[subnetId]
        except KeyError: raiseAssert "checked with in"

  attSubnets.enabled = true

  debug "Initial attestation subnets subscribed",
    aggregateSubnets = subnetLog(attSubnets.aggregateSubnets),
    stabilitySubnets = subnetLog(stabilitySubnets)
  node.network.subscribeAttestationSubnets(
    attSubnets.aggregateSubnets + stabilitySubnets, forkDigest)
# inspired by lighthouse research here
# https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py
const
  # Scoring parameters used for the beacon blocks topic.
  blocksTopicParams = TopicParams(
    topicWeight: 0.5,
    # time in mesh
    timeInMeshWeight: 0.03333333333333333,
    timeInMeshQuantum: chronos.seconds(12),
    timeInMeshCap: 300,
    # first message deliveries
    firstMessageDeliveriesWeight: 1.1471603557060206,
    firstMessageDeliveriesDecay: 0.9928302477768374,
    firstMessageDeliveriesCap: 34.86870846001471,
    # mesh message deliveries
    meshMessageDeliveriesWeight: -458.31054878249114,
    meshMessageDeliveriesDecay: 0.9716279515771061,
    meshMessageDeliveriesThreshold: 0.6849191409056553,
    meshMessageDeliveriesCap: 2.054757422716966,
    meshMessageDeliveriesActivation: chronos.seconds(384),
    meshMessageDeliveriesWindow: chronos.seconds(2),
    # mesh failure penalty
    meshFailurePenaltyWeight: -458.31054878249114,
    meshFailurePenaltyDecay: 0.9716279515771061,
    # invalid message deliveries
    invalidMessageDeliveriesWeight: -214.99999999999994,
    invalidMessageDeliveriesDecay: 0.9971259067705325
  )

  # Scoring parameters used for the aggregate-and-proofs topic.
  aggregateTopicParams = TopicParams(
    topicWeight: 0.5,
    # time in mesh
    timeInMeshWeight: 0.03333333333333333,
    timeInMeshQuantum: chronos.seconds(12),
    timeInMeshCap: 300,
    # first message deliveries
    firstMessageDeliveriesWeight: 0.10764904539552399,
    firstMessageDeliveriesDecay: 0.8659643233600653,
    firstMessageDeliveriesCap: 371.5778421725158,
    # mesh message deliveries
    meshMessageDeliveriesWeight: -0.07538533073670682,
    meshMessageDeliveriesDecay: 0.930572040929699,
    meshMessageDeliveriesThreshold: 53.404248450179836,
    meshMessageDeliveriesCap: 213.61699380071934,
    meshMessageDeliveriesActivation: chronos.seconds(384),
    meshMessageDeliveriesWindow: chronos.seconds(2),
    # mesh failure penalty
    meshFailurePenaltyWeight: -0.07538533073670682,
    meshFailurePenaltyDecay: 0.930572040929699,
    # invalid message deliveries
    invalidMessageDeliveriesWeight: -214.99999999999994,
    invalidMessageDeliveriesDecay: 0.9971259067705325
  )

  # Parameters as produced by `TopicParams.init()`, used for all other topics.
  basicParams = TopicParams.init()

static:
  # compile time validation
  for params in [blocksTopicParams, aggregateTopicParams, basicParams]:
    params.validateParameters().tryGet()
proc addPhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
  ## Subscribes to the block, slashing, voluntary exit and aggregate topics of
  ## `forkDigest`, then sets up the attestation subnet subscriptions for it.
  node.network.subscribe(
    getBeaconBlocksTopic(forkDigest), blocksTopicParams,
    enableTopicMetrics = true)

  # These three topics all use `basicParams`.
  for topic in [
      getAttesterSlashingsTopic(forkDigest),
      getProposerSlashingsTopic(forkDigest),
      getVoluntaryExitsTopic(forkDigest)]:
    node.network.subscribe(topic, basicParams)

  node.network.subscribe(
    getAggregateAndProofsTopic(forkDigest), aggregateTopicParams,
    enableTopicMetrics = true)

  node.subscribeAttestationSubnetHandlers(forkDigest)
proc addPhase0MessageHandlers(node: BeaconNode) =
  ## Overload that adds the handlers for the phase0 fork digest of the DAG.
  node.addPhase0MessageHandlers(node.dag.forkDigests.phase0)
proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
  ## Unsubscribes from the block, voluntary exit, slashing and aggregate topics
  ## of `forkDigest`, and from every attestation subnet topic of that digest.
  for topic in [
      getBeaconBlocksTopic(forkDigest),
      getVoluntaryExitsTopic(forkDigest),
      getProposerSlashingsTopic(forkDigest),
      getAttesterSlashingsTopic(forkDigest),
      getAggregateAndProofsTopic(forkDigest)]:
    node.network.unsubscribe(topic)

  for subnet in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
    node.network.unsubscribe(getAttestationTopic(forkDigest, SubnetId(subnet)))
proc removePhase0MessageHandlers(node: BeaconNode) =
  ## Overload that removes the handlers for the phase0 fork digest of the DAG.
  node.removePhase0MessageHandlers(node.dag.forkDigests.phase0)
proc addAltairMessageHandlers(node: BeaconNode, slot: Slot) =
  ## Adds the phase0-style handlers under the altair fork digest, subscribes to
  ## every sync committee topic and to the contribution-and-proof topic, and
  ## publishes the resulting syncnets bitfield to the network metadata.
  ## `slot` is not used by the current implementation.
  let altairDigest = node.dag.forkDigests.altair
  node.addPhase0MessageHandlers(altairDigest)

  var syncnets: BitArray[SYNC_COMMITTEE_SUBNET_COUNT]

  # TODO: What are the best topic params for this?
  # TODO This should be done in dynamic way in trackSyncCommitteeTopics
  for idx in allSyncCommittees():
    node.network.subscribe(
      getSyncCommitteeTopic(altairDigest, idx), basicParams)
    syncnets.setBit(idx.asInt)

  node.network.subscribe(
    getSyncCommitteeContributionAndProofTopic(altairDigest), basicParams)
  node.network.updateSyncnetsMetadata(syncnets)
proc removeAltairMessageHandlers(node: BeaconNode) =
  ## Removes the phase0-style handlers registered under the altair fork digest
  ## and unsubscribes from the sync committee and contribution-and-proof
  ## topics.
  let altairDigest = node.dag.forkDigests.altair
  node.removePhase0MessageHandlers(altairDigest)

  # TODO This should be done in dynamic way in trackSyncCommitteeTopics
  for idx in allSyncCommittees():
    node.network.unsubscribe(getSyncCommitteeTopic(altairDigest, idx))

  node.network.unsubscribe(
    getSyncCommitteeContributionAndProofTopic(altairDigest))
func getTopicSubscriptionEnabled(node: BeaconNode): bool =
  ## Returns the `enabled` flag of the node's attestation subnet state.
  result = node.attestationSubnets.enabled
proc removeAllMessageHandlers(node: BeaconNode) =
  ## Removes the phase0 handlers first, then the altair ones.
  removePhase0MessageHandlers(node)
  removeAltairMessageHandlers(node)
proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) =
  ## Sets the doppelganger detection broadcast start epoch to a fixed number
  ## of epochs after the epoch of `slot`.
  # When another client's already running, this is very likely to detect
  # potential duplicate validators, which can trigger slashing.
  #
  # Every missed attestation costs approximately 3*get_base_reward(), which
  # can be up to around 10,000 Wei. Thus, skipping attestations isn't cheap
  # and one should gauge the likelihood of this simultaneous launch to tune
  # the epoch delay to one's perceived risk.
  const duplicateValidatorEpochs = 2

  let
    currentEpoch = slot.epoch
    firstBroadcastEpoch = currentEpoch + duplicateValidatorEpochs

  node.processor.doppelgangerDetection.broadcastStartEpoch =
    firstBroadcastEpoch

  debug "Setting up doppelganger protection",
    epoch = currentEpoch,
    broadcastStartEpoch = firstBroadcastEpoch
proc trackSyncCommitteeTopics*(node: BeaconNode) =
  ## Placeholder with no behaviour yet; the body is intentionally empty.
  # TODO
  discard
proc updateGossipStatus(node: BeaconNode, slot: Slot) {.raises: [Defect, CatchableError].} =
## Reconciles the node's gossip topic subscriptions with the state implied by
## the sync queue length and by the position of `slot`'s epoch relative to the
## Altair fork epoch, then gives attestation subnet cycling a chance to run.
# Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p
# subscribing to spin up. The faster the sync, the more wallSlot - headSlot
# lead time is required
const
TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64
HYSTERESIS_BUFFER = 16
let
syncQueueLen = node.syncManager.syncQueueLen
targetGossipState =
# SyncManager forward sync by default runs until maxHeadAge slots, or one
# epoch range is achieved. This particular condition has a couple caveats
# including that under certain conditions, debtsCount appears to push len
# (here, syncQueueLen) to underflow-like values; and even when exactly at
# the expected walltime slot the queue isn't necessarily empty. Therefore
# TOPIC_SUBSCRIBE_THRESHOLD_SLOTS is not exactly the number of slots that
# are left. Furthermore, even when 0 peers are being used, this won't get
# to 0 slots in syncQueueLen, but that's a vacuous condition given that a
# networking interaction cannot happen under such circumstances.
if syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS:
GossipState.Disconnected
elif slot.epoch + 1 < node.dag.cfg.ALTAIR_FORK_EPOCH:
GossipState.ConnectedToPhase0
elif slot.epoch >= node.dag.cfg.ALTAIR_FORK_EPOCH:
GossipState.ConnectedToAltair
else:
GossipState.InTransitionToAltair
# Leaving Disconnected for any connected target state: set up doppelganger
# detection relative to `slot`.
if node.gossipState == GossipState.Disconnected and
targetGossipState != GossipState.Disconnected:
# We are synced, so we will connect
debug "Enabling topic subscriptions",
wallSlot = slot,
headSlot = node.dag.head.slot,
syncQueueLen
node.setupDoppelgangerDetection(slot)
# Each outer arm handles one target state; the inner case picks the handlers
# to add or remove based on the state the node is currently in.
block addRemoveHandlers:
case targetGossipState
of GossipState.Disconnected:
case node.gossipState:
of GossipState.Disconnected: break
else:
if syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and
# Filter out underflow from debtsCount; plausible queue lengths can't
# exceed wallslot, with safety margin.
syncQueueLen < 2 * slot.uint64:
debug "Disabling topic subscriptions",
wallSlot = slot,
headSlot = node.dag.head.slot,
syncQueueLen
node.removeAllMessageHandlers()
node.gossipState = GossipState.Disconnected
# NOTE(review): whether this break is nested inside the hysteresis check or
# follows it cannot be determined from this view; the two readings differ in
# whether the state is overwritten below when the check fails - confirm.
break
of GossipState.ConnectedToPhase0:
case node.gossipState:
of GossipState.ConnectedToPhase0: break
of GossipState.Disconnected:
node.addPhase0MessageHandlers()
of GossipState.InTransitionToAltair:
warn "Unexpected clock regression during altair transition"
node.removeAltairMessageHandlers()
of GossipState.ConnectedToAltair:
warn "Unexpected clock regression during altair transition"
node.removeAltairMessageHandlers()
node.addPhase0MessageHandlers()
of GossipState.InTransitionToAltair:
case node.gossipState:
of GossipState.InTransitionToAltair: break
of GossipState.Disconnected:
node.addPhase0MessageHandlers()
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToPhase0:
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToAltair:
warn "Unexpected clock regression during altair transition"
node.addPhase0MessageHandlers()
of GossipState.ConnectedToAltair:
case node.gossipState:
of GossipState.ConnectedToAltair: break
of GossipState.Disconnected:
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToPhase0:
node.removePhase0MessageHandlers()
node.addAltairMessageHandlers(slot)
of GossipState.InTransitionToAltair:
node.removePhase0MessageHandlers()
# Record the target as the current state; presumably still inside
# `addRemoveHandlers`, so the breaks above skip it - TODO confirm.
node.gossipState = targetGossipState
# Subscription or unsubscription might have occurred; recheck. Since Nimbus
# initially subscribes to all subnets, simply do not ever cycle attestation
# subnets and they'll all remain subscribed.
if node.getTopicSubscriptionEnabled and not node.config.subscribeAllSubnets:
# This exits early all but one call each epoch.
traceAsyncErrors node.cycleAttestationSubnets(slot)
func getNextValidatorAction(
    actionSlotSource: auto, lastCalculatedEpoch: Epoch, slot: Slot): Slot =
  ## Returns the first slot after `slot`, within the current or the next
  ## epoch, whose bit is set in the matching bitmap of `actionSlotSource`.
  ## Returns `FAR_FUTURE_SLOT` when no such slot exists or when an epoch beyond
  ## `lastCalculatedEpoch` would have to be consulted.
  static: doAssert MIN_ATTESTATION_INCLUSION_DELAY == 1

  # The relevant actions are in, depending on calculated bounds:
  # [aS[epoch mod 2], aS[1 - (epoch mod 2)]]
  #  current epoch    next epoch
  let
    currentParity = slot.epoch mod 2'u64
    bitmaps = [
      actionSlotSource[currentParity],
      actionSlotSource[1 - currentParity]]

  # Cleverer ways exist, but a short loop is fine. O(n) vs O(log n) isn't that
  # important when n is 32 or 64, with early exit on average no more than half
  # through.
  for i in [0'u64, 1'u64]:
    let candidateEpoch = slot.epoch + i
    if candidateEpoch > lastCalculatedEpoch:
      return FAR_FUTURE_SLOT

    let epochStart = compute_start_slot_at_epoch(candidateEpoch)
    for slotOffset in 0 ..< SLOTS_PER_EPOCH:
      let candidate = epochStart + slotOffset
      if candidate > slot and
          (bitmaps[i] and (1'u32 shl slotOffset)) != 0:
        return candidate

  FAR_FUTURE_SLOT
proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
## End-of-slot housekeeping: pruning, a full GC pass where available, a
## database checkpoint, the "Slot end" log entry, a fork id and gossip status
## refresh and, time permitting, a speculative advance of the clearance state.
# Things we do when slot processing has ended and we're about to wait for the
# next slot
if node.dag.needStateCachesAndForkChoicePruning():
if node.attachedValidators.validators.len > 0:
node.attachedValidators
.slashingProtection
# pruning is only done if the DB is set to pruning mode.
.pruneAfterFinalization(
node.dag.finalizedHead.slot.compute_epoch_at_slot()
)
# Delay part of pruning until latency critical duties are done.
# The other part of pruning, `pruneBlocksDAG`, is done eagerly.
# ----
# This is the last pruning to do as it clears the "needPruning" condition.
# NOTE(review): whether this call is nested inside the pruning check above
# cannot be determined from this view - confirm.
node.consensusManager[].pruneStateCachesAndForkChoice()
when declared(GC_fullCollect):
# The slots in the beacon node work as frames in a game: we want to make
# sure that we're ready for the next one and don't get stuck in lengthy
# garbage collection tasks when time is of essence in the middle of a slot -
# while this does not guarantee that we'll never collect during a slot, it
# makes sure that all the scratch space we used during slot tasks (logging,
# temporary buffers etc) gets recycled for the next slot that is likely to
# need similar amounts of memory.
GC_fullCollect()
# Checkpoint the database to clear the WAL file and make sure changes in
# the database are synced with the filesystem.
node.db.checkpoint()
node.syncCommitteeMsgPool[].pruneData(slot)
# -1 is a more useful output than 18446744073709551615 as an indicator of
# no future attestation/proposal known.
template displayInt64(x: Slot): int64 =
if x == high(uint64).Slot:
-1'i64
else:
toGaugeValue(x)
let
nextAttestationSlot = getNextValidatorAction(
node.attestationSubnets.attestingSlots,
node.attestationSubnets.lastCalculatedEpoch, slot)
nextProposalSlot = getNextValidatorAction(
node.attestationSubnets.proposingSlots,
node.attestationSubnets.lastCalculatedEpoch, slot)
# Time until whichever of the two next actions comes first.
nextActionWaitTime = saturate(fromNow(
node.beaconClock, min(nextAttestationSlot, nextProposalSlot)))
info "Slot end",
slot = shortLog(slot),
nextSlot = shortLog(slot + 1),
head = shortLog(node.dag.head),
headEpoch = shortLog(node.dag.head.slot.compute_epoch_at_slot()),
finalizedHead = shortLog(node.dag.finalizedHead.blck),
finalizedEpoch =
shortLog(node.dag.finalizedHead.blck.slot.compute_epoch_at_slot()),
nextAttestationSlot = displayInt64(nextAttestationSlot),
nextProposalSlot = displayInt64(nextProposalSlot),
nextActionWait =
if nextAttestationSlot == FAR_FUTURE_SLOT:
"n/a"
else:
shortLog(nextActionWaitTime)
# The metric is only refreshed while a next attestation slot is known.
if nextAttestationSlot != FAR_FUTURE_SLOT:
next_action_wait.set(nextActionWaitTime.toFloatSeconds)
# Refresh the network's fork id once the epoch of `slot` has reached the
# currently recorded `next_fork_epoch`.
let epoch = slot.epoch
if epoch >= node.network.forkId.next_fork_epoch:
node.network.updateForkId(
node.dag.cfg.getENRForkID(epoch, node.dag.genesisValidatorsRoot))
node.updateGossipStatus(slot)
# When we're not behind schedule, we'll speculatively update the clearance
# state in anticipation of receiving the next block - we do it after logging
# slot end since the nextActionWaitTime can be short
let
advanceCutoff = node.beaconClock.fromNow(
slot.toBeaconTime(chronos.seconds(int(SECONDS_PER_SLOT - 1))))
if advanceCutoff.inFuture:
# We wait until there's only a second left before the next slot begins, then
# we advance the clearance state to the next slot - this gives us a high
# probability of being prepared for the block that will arrive and the
# epoch processing that follows
await sleepAsync(advanceCutoff.offset)
# NOTE(review): presumably still guarded by the `inFuture` check, matching
# the "not behind schedule" comment above - confirm.
node.dag.advanceClearanceState()
proc onSlotStart(
    node: BeaconNode, wallTime: BeaconTime, lastSlot: Slot) {.async.} =
  ## Called at the beginning of a slot - usually every slot, but sometimes might
  ## skip a few in case we're running late.
  ## wallTime: current system time - we will strive to perform all duties up
  ##           to this point in time
  ## lastSlot: the last slot that we successfully processed, so we know where to
  ##           start work from - there might be jumps if processing is delayed
  let
    # The slot we should be at, according to the clock
    wallSlot = wallTime.slotOrZero
    # If everything was working perfectly, the slot that we should be processing
    dueSlot = lastSlot + 1
    finalizedBlock = node.dag.finalizedHead.blck
    finalizedEpoch = finalizedBlock.slot.compute_epoch_at_slot()
    startDelay = wallTime - dueSlot.toBeaconTime()

  info "Slot start",
    lastSlot = shortLog(lastSlot),
    wallSlot = shortLog(wallSlot),
    delay = shortLog(startDelay),
    peers = len(node.network.peerPool),
    head = shortLog(node.dag.head),
    headEpoch = shortLog(node.dag.head.slot.compute_epoch_at_slot()),
    finalized = shortLog(finalizedBlock),
    finalizedEpoch = shortLog(finalizedEpoch),
    sync =
      if node.syncManager.inProgress: node.syncManager.syncStatus
      else: "synced"

  # Check before any re-scheduling of onSlotStart()
  checkIfShouldStopAtEpoch(wallSlot, node.config.stopAtEpoch)

  let wallEpochGauge = wallSlot.epoch.toGaugeValue
  beacon_slot.set(wallSlot.toGaugeValue)
  beacon_current_epoch.set(wallEpochGauge)

  # both non-negative, so difference can't overflow or underflow int64
  finalization_delay.set(wallEpochGauge - finalizedEpoch.toGaugeValue)

  if node.config.verifyFinalization:
    verifyFinalization(node, wallSlot)

  node.consensusManager[].updateHead(wallSlot)

  await node.handleValidatorDuties(lastSlot, wallSlot)

  await onSlotEnd(node, wallSlot)
proc handleMissingBlocks(node: BeaconNode) =
  ## Asks the request manager to fetch the blocks that the quarantine reports
  ## as missing; does nothing when there are none.
  let missingBlocks = node.quarantine.checkMissing()
  if missingBlocks.len == 0:
    return

  debug "Requesting detected missing blocks", blocks = shortLog(missingBlocks)
  node.requestManager.fetchAncestorBlocks(missingBlocks)
proc onSecond(node: BeaconNode) =
  ## This procedure will be called once per second.
  # Missing blocks are only chased while the sync manager is not in progress.
  if node.syncManager.inProgress:
    return
  node.handleMissingBlocks()
proc runOnSecondLoop(node: BeaconNode) {.async.} =
  ## Endless loop that sleeps for one second, runs `onSecond`, then records
  ## the measured sleep duration (in seconds) in the `ticks_delay` metric.
  const nanosecondsIn1s = float(chronos.seconds(1).nanoseconds)
  let tickInterval = chronos.seconds(1)

  while true:
    let sleepStart = chronos.now(chronos.Moment)
    await chronos.sleepAsync(tickInterval)
    let wokeAt = chronos.now(chronos.Moment)

    node.onSecond()

    let
      # How long the sleep actually took, and how long `onSecond` ran.
      sleepTime = wokeAt - sleepStart
      processingTime = chronos.now(chronos.Moment) - wokeAt

    ticks_delay.set(sleepTime.nanoseconds.float / nanosecondsIn1s)
    trace "onSecond task completed", sleepTime, processingTime
proc startSyncManager(node: BeaconNode) =
  ## Installs the peer pool score and deletion callbacks, then creates and
  ## starts the sync manager.
  func getLocalHeadSlot(): Slot =
    node.dag.head.slot

  proc getLocalWallSlot(): Slot =
    node.beaconClock.now.slotOrZero

  func getFirstSlotAtFinalizedEpoch(): Slot =
    node.dag.finalizedHead.slot

  proc scoreCheck(peer: Peer): bool =
    # A peer passes as long as its score is not below the low limit.
    not (peer.score < PeerScoreLowLimit)

  proc onDeletePeer(peer: Peer) =
    # Nothing to do for peers that are already (being) disconnected.
    if peer.connectionState in {ConnectionState.Disconnecting,
                                ConnectionState.Disconnected}:
      return

    if peer.score < PeerScoreLowLimit:
      debug "Peer was removed from PeerPool due to low score", peer = peer,
            peer_score = peer.score, score_low_limit = PeerScoreLowLimit,
            score_high_limit = PeerScoreHighLimit
      asyncSpawn(
        try: peer.disconnect(PeerScoreLow)
        except Exception as exc: raiseAssert exc.msg) # Shouldn't actually happen!
    else:
      debug "Peer was removed from PeerPool", peer = peer,
            peer_score = peer.score, score_low_limit = PeerScoreLowLimit,
            score_high_limit = PeerScoreHighLimit
      asyncSpawn(
        try: peer.disconnect(FaultOrError)
        except Exception as exc: raiseAssert exc.msg) # Shouldn't actually happen!

  node.network.peerPool.setScoreCheck(scoreCheck)
  node.network.peerPool.setOnDeletePeer(onDeletePeer)

  node.syncManager = newSyncManager[Peer, PeerID](
    node.network.peerPool,
    getLocalHeadSlot,
    getLocalWallSlot,
    getFirstSlotAtFinalizedEpoch,
    node.blockProcessor,
    chunkSize = 32)
  node.syncManager.start()
func connectedPeersCount(node: BeaconNode): int =
  ## Returns the length of the network's peer pool.
  node.network.peerPool.len()
proc installRpcHandlers(rpcServer: RpcServer, node: BeaconNode) {.
    raises: [Defect, CatchableError].} =
  ## Installs every API handler group on `rpcServer`.
  installBeaconApiHandlers(rpcServer, node)
  installConfigApiHandlers(rpcServer, node)
  installDebugApiHandlers(rpcServer, node)
  installEventApiHandlers(rpcServer, node)
  installNimbusApiHandlers(rpcServer, node)
  installNodeApiHandlers(rpcServer, node)
  installValidatorApiHandlers(rpcServer, node)
2021-03-17 18:46:45 +00:00
proc installRestHandlers(restServer: RestServerRef, node: BeaconNode) =
  ## Installs every API handler group on the router of `restServer`.
  installBeaconApiHandlers(restServer.router, node)
  installConfigApiHandlers(restServer.router, node)
  installDebugApiHandlers(restServer.router, node)
  installEventApiHandlers(restServer.router, node)
  installNimbusApiHandlers(restServer.router, node)
  installNodeApiHandlers(restServer.router, node)
  installValidatorApiHandlers(restServer.router, node)
2021-03-17 18:46:45 +00:00
proc installMessageValidators(node: BeaconNode) =
  ## Registers the gossip validators for every phase0 and altair topic, plus
  ## the altair sync committee topics.
  ##
  ## https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation
  ## These validators stay around the whole time, regardless of which specific
  ## subnets are subscribed to during any given epoch.
  # TODO altair-transition, well, without massive copy/pasting (extract to
  # template or etc)
  let
    phase0Digest = node.dag.forkDigests.phase0
    altairDigest = node.dag.forkDigests.altair

  # ---- phase0 topics ----
  for subnetIdx in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64:
    closureScope:
      let subnet_id = SubnetId(subnetIdx)
      node.network.addAsyncValidator(
        getAttestationTopic(phase0Digest, subnet_id),
        # This proc needs to be within closureScope; don't lift out of loop.
        proc(attestation: Attestation): Future[ValidationResult] =
          node.processor.attestationValidator(attestation, subnet_id))

  node.network.addAsyncValidator(
    getAggregateAndProofsTopic(phase0Digest),
    proc(signedAggregateAndProof: SignedAggregateAndProof):
        Future[ValidationResult] =
      node.processor.aggregateValidator(signedAggregateAndProof))

  node.network.addValidator(
    getBeaconBlocksTopic(phase0Digest),
    proc (signedBlock: phase0.SignedBeaconBlock): ValidationResult =
      node.processor[].blockValidator(signedBlock))

  node.network.addValidator(
    getAttesterSlashingsTopic(phase0Digest),
    proc (attesterSlashing: AttesterSlashing): ValidationResult =
      node.processor[].attesterSlashingValidator(attesterSlashing))

  node.network.addValidator(
    getProposerSlashingsTopic(phase0Digest),
    proc (proposerSlashing: ProposerSlashing): ValidationResult =
      node.processor[].proposerSlashingValidator(proposerSlashing))

  node.network.addValidator(
    getVoluntaryExitsTopic(phase0Digest),
    proc (signedVoluntaryExit: SignedVoluntaryExit): ValidationResult =
      node.processor[].voluntaryExitValidator(signedVoluntaryExit))

  # ---- altair topics ----
  # TODO copy/paste starts here; templatize whole thing
  for subnetIdx in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64:
    closureScope:
      let subnet_id = SubnetId(subnetIdx)
      node.network.addAsyncValidator(
        getAttestationTopic(altairDigest, subnet_id),
        # This proc needs to be within closureScope; don't lift out of loop.
        proc(attestation: Attestation): Future[ValidationResult] =
          node.processor.attestationValidator(attestation, subnet_id))

  node.network.addAsyncValidator(
    getAggregateAndProofsTopic(altairDigest),
    proc(signedAggregateAndProof: SignedAggregateAndProof):
        Future[ValidationResult] =
      node.processor.aggregateValidator(signedAggregateAndProof))

  node.network.addValidator(
    getBeaconBlocksTopic(altairDigest),
    proc (signedBlock: altair.SignedBeaconBlock): ValidationResult =
      node.processor[].blockValidator(signedBlock))

  node.network.addValidator(
    getAttesterSlashingsTopic(altairDigest),
    proc (attesterSlashing: AttesterSlashing): ValidationResult =
      node.processor[].attesterSlashingValidator(attesterSlashing))

  node.network.addValidator(
    getProposerSlashingsTopic(altairDigest),
    proc (proposerSlashing: ProposerSlashing): ValidationResult =
      node.processor[].proposerSlashingValidator(proposerSlashing))

  node.network.addValidator(
    getVoluntaryExitsTopic(altairDigest),
    proc (signedVoluntaryExit: SignedVoluntaryExit): ValidationResult =
      node.processor[].voluntaryExitValidator(signedVoluntaryExit))

  # ---- altair sync committee topics ----
  for committeeIdx in allSyncCommittees():
    closureScope:
      let idx = committeeIdx
      node.network.addValidator(
        getSyncCommitteeTopic(altairDigest, idx),
        # This proc needs to be within closureScope; don't lift out of loop.
        proc(msg: SyncCommitteeMessage): ValidationResult =
          node.processor.syncCommitteeMsgValidator(msg, idx))

  node.network.addValidator(
    getSyncCommitteeContributionAndProofTopic(altairDigest),
    proc(msg: SignedContributionAndProof): ValidationResult =
      node.processor.syncCommitteeContributionValidator(msg))
proc stop*(node: BeaconNode) =
  ## Marks the node as stopping, closes the validator client process when
  ## validators are not run in-process, stops the network and closes the
  ## slashing protection and beacon chain databases. Failures to close the
  ## validator client or to stop the network are logged and otherwise ignored.
  bnStatus = BeaconNodeStatus.Stopping
  notice "Graceful shutdown"

  if not node.config.inProcessValidators:
    try:
      node.vcProcess.close()
    except Exception as exc:
      warn "Couldn't close vc process", msg = exc.msg

  try:
    waitFor node.network.stop()
  except CatchableError as exc:
    warn "Couldn't stop network", msg = exc.msg

  # Databases are closed last.
  node.attachedValidators.slashingProtection.close()
  node.db.close()
  notice "Databases closed"
proc run*(node: BeaconNode) {.raises: [Defect, CatchableError].} =
  ## Starts the servers, validators and background loops (unless a shutdown
  ## was requested while still starting), installs the SIGINT / SIGTERM
  ## handlers and polls the event loop until the node status leaves `Running`,
  ## after which the node is stopped.
  if bnStatus == BeaconNodeStatus.Starting:
    # it might have been set to "Stopping" with Ctrl+C
    bnStatus = BeaconNodeStatus.Running

    if not node.rpcServer.isNil:
      node.rpcServer.installRpcHandlers(node)
      node.rpcServer.start()

    if not node.restServer.isNil:
      node.restServer.installRestHandlers(node)
      node.restServer.start()

    node.installMessageValidators()

    let startTime = node.beaconClock.now()
    asyncSpawn runSlotLoop(node, startTime, onSlotStart)
    asyncSpawn runOnSecondLoop(node)
    asyncSpawn runQueueProcessingLoop(node.blockProcessor)

    node.requestManager.start()
    node.startSyncManager()

    node.updateGossipStatus(startTime.slotOrZero)

  # Ctrl+C handling
  proc controlCHandler() {.noconv.} =
    when defined(windows):
      # workaround for https://github.com/nim-lang/Nim/issues/4057
      try:
        setupForeignThreadGc()
      except Exception as exc: raiseAssert exc.msg # shouldn't happen
    notice "Shutting down after having received SIGINT"
    bnStatus = BeaconNodeStatus.Stopping

  try:
    setControlCHook(controlCHandler)
  except Exception as exc: # TODO Exception
    warn "Cannot set ctrl-c handler", msg = exc.msg

  # equivalent SIGTERM handler
  when defined(posix):
    proc SIGTERMHandler(signal: cint) {.noconv.} =
      notice "Shutting down after having received SIGTERM"
      bnStatus = BeaconNodeStatus.Stopping
    c_signal(SIGTERM, SIGTERMHandler)

  # main event loop
  while bnStatus == BeaconNodeStatus.Running:
    poll() # if poll fails, the network is broken

  # time to say goodbye
  node.stop()
# Path of the pid file written by `createPidFile`; read by the quit procedure.
var gPidFile: string

proc createPidFile(filename: string) {.raises: [Defect, IOError].} =
  ## Writes the current process id to `filename`, remembers the path in
  ## `gPidFile` and registers a quit procedure that removes that file.
  writeFile(filename, $os.getCurrentProcessId())
  gPidFile = filename
  addQuitProc(proc() {.noconv.} = discard io2.removeFile(gPidFile))
proc initializeNetworking(node: BeaconNode) {.async.} =
  ## Starts listening, writes the announced ENR to `beacon_node.enr` in the
  ## data directory and then starts the network.
  info "Listening to incoming network requests"
  await node.network.startListening()

  writeFile(
    node.config.dataDir / "beacon_node.enr",
    node.network.announcedENR.toURI)

  await node.network.start()
func shouldWeStartWeb3(node: BeaconNode): bool =
  ## True when web3 mode is `enabled`, or when it is `auto` and at least one
  ## validator is attached.
  case node.config.web3Mode
  of Web3Mode.enabled:
    true
  of Web3Mode.auto:
    node.attachedValidators[].count > 0
  else:
    false
proc start(node: BeaconNode) {.raises: [Defect, CatchableError].} =
  ## Logs the start-up summary, brings up networking, starts the eth1 monitor
  ## when one is configured and web3 should be started, then hands over to
  ## `run`.
  let
    headBlock = node.dag.head
    finalized = node.dag.finalizedHead
    untilGenesis = node.beaconClock.fromNow(toBeaconTime(Slot 0))

  notice "Starting beacon node",
    version = fullVersionStr,
    enr = node.network.announcedENR.toURI,
    peerId = $node.network.switch.peerInfo.peerId,
    timeSinceFinalization =
      node.beaconClock.now() - finalized.slot.toBeaconTime(),
    head = shortLog(headBlock),
    finalizedHead = shortLog(finalized),
    SLOTS_PER_EPOCH,
    SECONDS_PER_SLOT,
    SPEC_VERSION,
    dataDir = node.config.dataDir.string,
    validators = node.attachedValidators[].count

  if untilGenesis.inFuture:
    notice "Waiting for genesis", genesisIn = untilGenesis.offset

  waitFor node.initializeNetworking()

  # TODO this does not account for validators getting attached "later"
  if (not node.eth1Monitor.isNil) and node.shouldWeStartWeb3:
    node.eth1Monitor.start()

  node.run()
2019-10-03 01:51:44 +00:00
func formatGwei(amount: uint64): string =
  ## Formats `amount`, given in Gwei (1e-9 ETH), as a decimal ETH string
  ## without trailing zeros: whole amounts carry no decimal point, e.g.
  ## `1_500_000_000` becomes "1.5" and `1_000_000_000` becomes "1".
  # TODO This is implemented in quite a silly way.
  # Better routines for formatting decimal numbers
  # should exist somewhere else.
  const gweiPerEth = 1_000_000_000'u64
  let
    wholeEth = amount div gweiPerEth
    fraction = amount mod gweiPerEth

  if fraction == 0:
    $wholeEth
  else:
    # Left-pad the fractional part to nine digits, then drop trailing zeros;
    # at least one non-zero digit always remains because `fraction` != 0.
    let digits = align($fraction, 9, '0')
    $wholeEth & "." & digits.strip(leading = false, chars = {'0'})
proc initStatusBar(node: BeaconNode) {.raises: [Defect, ValueError].} =
  ## Sets up the terminal status bar for `node`.
  ##
  ## Returns immediately when stdout is not a TTY or when the status bar is
  ## disabled in the configuration. Otherwise it:
  ## * builds a `StatusBarView` from `node.config.statusBarContents`, using
  ##   `dataResolver` to turn each `$expression` into text;
  ## * when the chronicles default stream exposes a `writer`, replaces it
  ##   with one that erases the bar, writes the log message and re-renders
  ##   the bar;
  ## * spawns an async loop that refreshes the bar once per second.
  if not isatty(stdout): return
  if not node.config.statusBarEnabled: return

  try:
    enableTrueColors()
  except Exception as exc: # TODO Exception
    error "Couldn't enable colors", err = exc.msg

  proc dataResolver(expr: string): string {.raises: [Defect].} =
    ## Maps a status bar expression (matched case-insensitively) to its
    ## current textual value. Unknown expressions are echoed back with a
    ## leading `$`.
    template justified: untyped = node.dag.head.atEpochStart(
      getStateField(
        node.dag.headState.data, current_justified_checkpoint).epoch)
    # TODO:
    # We should introduce a general API for resolving dot expressions
    # such as `db.latest_block.slot` or `metrics.connected_peers`.
    # Such an API can be shared between the RPC back-end, CLI tools
    # such as ncli, a potential GraphQL back-end and so on.
    # The status bar feature would allow the user to specify an
    # arbitrary expression that is resolvable through this API.
    case expr.toLowerAscii
    of "connected_peers":
      $(node.connectedPeersCount)

    of "head_root":
      shortLog(node.dag.head.root)
    of "head_epoch":
      $(node.dag.head.slot.epoch)
    of "head_epoch_slot":
      $(node.dag.head.slot mod SLOTS_PER_EPOCH)
    of "head_slot":
      $(node.dag.head.slot)

    # The `justifed_*` spellings are historical typos. They are kept so that
    # existing configurations continue to work; the correctly spelled
    # `justified_*` names are accepted as well.
    of "justifed_root", "justified_root":
      shortLog(justified.blck.root)
    of "justifed_epoch", "justified_epoch":
      $(justified.slot.epoch)
    of "justifed_epoch_slot", "justified_epoch_slot":
      $(justified.slot mod SLOTS_PER_EPOCH)
    of "justifed_slot", "justified_slot":
      $(justified.slot)

    of "finalized_root":
      shortLog(node.dag.finalizedHead.blck.root)
    of "finalized_epoch":
      $(node.dag.finalizedHead.slot.epoch)
    of "finalized_epoch_slot":
      $(node.dag.finalizedHead.slot mod SLOTS_PER_EPOCH)
    of "finalized_slot":
      $(node.dag.finalizedHead.slot)

    of "epoch":
      $node.currentSlot.epoch

    of "epoch_slot":
      $(node.currentSlot mod SLOTS_PER_EPOCH)

    of "slot":
      $node.currentSlot

    of "slots_per_epoch":
      $SLOTS_PER_EPOCH

    of "slot_trailing_digits":
      # Only the last three digits of the current slot are shown.
      var slotStr = $node.currentSlot
      if slotStr.len > 3: slotStr = slotStr[^3..^1]
      slotStr

    of "attached_validators_balance":
      formatGwei(node.attachedValidatorBalanceTotal)

    of "sync_status":
      if isNil(node.syncManager):
        "pending"
      else:
        if node.syncManager.inProgress:
          node.syncManager.syncStatus
        else:
          "synced"
    else:
      # We ignore typos for now and just render the expression
      # as it was written. TODO: come up with a good way to show
      # an error message to the user.
      "$" & expr

  var statusBar = StatusBarView.init(
    node.config.statusBarContents,
    dataResolver)

  when compiles(defaultChroniclesStream.output.writer):
    # Every log line temporarily removes the bar, prints the message and
    # then draws the bar again.
    defaultChroniclesStream.output.writer =
      proc (logLevel: LogLevel, msg: LogOutputStr) {.raises: [Defect].} =
        try:
          # p.hidePrompt
          erase statusBar
          # p.writeLine msg
          stdout.write msg
          render statusBar
          # p.showPrompt
        except Exception as e: # render raises Exception
          logLoggingFailure(cstring(msg), e)

  proc statusBarUpdatesPollingLoop() {.async.} =
    ## Refreshes the bar once per second. The loop ends, with a warning, on
    ## the first `CatchableError`.
    try:
      while true:
        update statusBar
        erase statusBar
        render statusBar
        await sleepAsync(chronos.seconds(1))
    except CatchableError as exc:
      warn "Failed to update status bar, no further updates", err = exc.msg

  asyncSpawn statusBarUpdatesPollingLoop()
proc handleValidatorExitCommand(config: BeaconNodeConf) {.async.} =
  ## Interactive flow that builds, signs and publishes a voluntary exit for
  ## `config.exitedValidator` through the beacon node RPC service addressed
  ## by `config.rpcUrlForExit`.
  ##
  ## Every failure along the way is reported and terminates the process via
  ## `quit 1`; a successful publication, or the user typing `q`, terminates
  ## it via `quit 0`.

  # Parse and range-check the RPC port taken from the URL.
  let port = try:
    let value = parseInt(config.rpcUrlForExit.port)
    if value < Port.low.int or value > Port.high.int:
      raise newException(ValueError,
        "The port number must be between " & $Port.low & " and " & $Port.high)
    Port value
  except CatchableError as err:
    fatal "Invalid port number", err = err.msg
    quit 1

  let rpcClient = newRpcHttpClient()

  try:
    await connect(rpcClient, config.rpcUrlForExit.hostname, port)
  except CatchableError as err:
    fatal "Failed to connect to the beacon node RPC service", err = err.msg
    quit 1

  # Look the validator up in the "head" state; only the validator record and
  # its index are used from the returned tuple.
  let (validator, validatorIdx, _, _) = try:
    await rpcClient.get_v1_beacon_states_stateId_validators_validatorId(
      "head", config.exitedValidator)
  except CatchableError as err:
    fatal "Failed to obtain information for validator", err = err.msg
    quit 1

  # Use the explicitly configured exit epoch, or fall back to the epoch of
  # the current head slot reported by the beacon node.
  let exitAtEpoch = if config.exitAtEpoch.isSome:
    Epoch config.exitAtEpoch.get
  else:
    let headSlot = try:
      await rpcClient.getBeaconHead()
    except CatchableError as err:
      fatal "Failed to obtain the current head slot", err = err.msg
      quit 1
    headSlot.epoch

  let
    validatorsDir = config.validatorsDir
    validatorKeyAsStr = "0x" & $validator.pubkey
    keystoreDir = validatorsDir / validatorKeyAsStr

  # The keystore is expected in a directory named after the public key.
  if not dirExists(keystoreDir):
    echo "The validator keystores directory '" & config.validatorsDir.string &
         "' does not contain a keystore for the selected validator with public " &
         "key '" & validatorKeyAsStr & "'."
    quit 1

  let signingKey = loadKeystore(
    validatorsDir,
    config.secretsDir,
    validatorKeyAsStr,
    config.nonInteractive)

  if signingKey.isNone:
    fatal "Unable to continue without decrypted signing key"
    quit 1

  # Fork and genesis validators root are both inputs to the exit signature.
  let fork = try:
    await rpcClient.get_v1_beacon_states_fork("head")
  except CatchableError as err:
    fatal "Failed to obtain the fork id of the head state", err = err.msg
    quit 1

  let genesisValidatorsRoot = try:
    (await rpcClient.get_v1_beacon_genesis()).genesis_validators_root
  except CatchableError as err:
    fatal "Failed to obtain the genesis validators root of the network",
           err = err.msg
    quit 1

  # The message is built first and signed afterwards, because the signature
  # is computed over `signedExit.message`.
  var signedExit = SignedVoluntaryExit(
    message: VoluntaryExit(
      epoch: exitAtEpoch,
      validator_index: validatorIdx))

  signedExit.signature = get_voluntary_exit_signature(
    fork, genesisValidatorsRoot, signedExit.message, signingKey.get).toValidatorSig()

  # Prompts on stdout and reads one line from stdin; quits on read failure.
  template ask(prompt: string): string =
    try:
      stdout.write prompt, ": "
      stdin.readLine()
    except IOError:
      fatal "Failed to read user input from stdin"
      quit 1

  try:
    echoP "PLEASE BEWARE!"

    echoP "Publishing a voluntary exit is an irreversible operation! " &
          "You won't be able to restart again with the same validator."

    echoP "By requesting an exit now, you'll be exempt from penalties " &
          "stemming from not performing your validator duties, but you " &
          "won't be able to withdraw your deposited funds for the time " &
          "being. This means that your funds will be effectively frozen " &
          "until withdrawals are enabled in a future phase of Eth2."

    echoP "To understand more about the Eth2 roadmap, we recommend you " &
          "have a look at\n" &
          "https://ethereum.org/en/eth2/#roadmap"

    echoP "You must keep your validator running for at least 5 epochs " &
          "(32 minutes) after requesting a validator exit, as you will " &
          "still be required to perform validator duties until your exit " &
          "has been processed. The number of epochs could be significantly " &
          "higher depending on how many other validators are queued to exit."

    echoP "As such, we recommend you keep track of your validator's status " &
          "using an Eth2 block explorer before shutting down your beacon node."

    const
      confirmation = "I understand the implications of submitting a voluntary exit"

    # Keep asking until the user either quits or types the exact
    # confirmation sentence; any other input repeats the prompt.
    while true:
      echoP "To proceed to submitting your voluntary exit, please type '" &
            confirmation & "' (without the quotes) in the prompt below and " &
            "press ENTER or type 'q' to quit."
      echo ""

      let choice = ask "Your choice"
      if choice == "q":
        quit 0
      elif choice == confirmation:
        let success = await rpcClient.post_v1_beacon_pool_voluntary_exits(signedExit)
        if success:
          # NOTE(review): no space is emitted between the index and "(" —
          # confirm whether that is intended.
          echo "Successfully published voluntary exit for validator " &
                $validatorIdx & "(" & validatorKeyAsStr[0..9] & ")."
          quit 0
        else:
          echo "The voluntary exit was not submitted successfully. Please try again."
          quit 1
  except CatchableError as err:
    fatal "Failed to send the signed exit message to the beacon node RPC",
           err = err.msg
    quit 1
proc loadEth2Network(config: BeaconNodeConf): Eth2NetworkMetadata {.raises: [Defect, IOError].} =
  ## Returns the metadata of the network selected by `config.eth2Network`.
  ##
  ## When no network is selected, binaries built with the "mainnet" preset
  ## fall back to `mainnetMetadata`; any other preset prints a message and
  ## quits with exit code 1.
  if config.eth2Network.isSome:
    return getMetadataForNetwork(config.eth2Network.get)

  when const_preset == "mainnet":
    mainnetMetadata
  else:
    # Presumably other configurations can have other defaults, but for now
    # this simplifies the flow
    echo "Must specify network on non-mainnet node"
    quit 1
proc loadBeaconNode(config: var BeaconNodeConf, rng: ref BrHmacDrbgContext): BeaconNode {.
    raises: [Defect, CatchableError].} =
  ## Loads the network metadata selected by `config`, appends the metadata's
  ## bootstrap nodes to `config.bootstrapNodes` and initialises a
  ## `BeaconNode` from the result.
  let networkMetadata = loadEth2Network(config)

  # Updating the config based on the metadata certainly is not beautiful but it
  # works
  for bootstrapNode in networkMetadata.bootstrapNodes:
    config.bootstrapNodes.add bootstrapNode

  BeaconNode.init(
    networkMetadata.cfg,
    rng,
    config,
    networkMetadata.depositContractDeployedAt,
    networkMetadata.eth1Network,
    networkMetadata.genesisData,
    networkMetadata.genesisDepositsSnapshot)
proc doRunBeaconNode(config: var BeaconNodeConf, rng: ref BrHmacDrbgContext) {.raises: [Defect, CatchableError].} =
  ## Entry point of a regular beacon node run: writes the PID file, creates
  ## the dump directories, optionally starts the metrics HTTP server, loads
  ## the node, sets up the status bar and starts the node.
  info "Launching beacon node",
    version = fullVersionStr,
    bls_backend = $BLS_BACKEND,
    cmdParams = commandLineParams(),
    config

  createPidFile(config.dataDir.string / "beacon_node.pid")
  createDumpDirs(config)

  if config.metricsEnabled:
    let listenAddress = config.metricsAddress
    notice "Starting metrics HTTP server",
      url = "http://" & $listenAddress & ":" & $config.metricsPort & "/metrics"
    try:
      startMetricsHttpServer($listenAddress, config.metricsPort)
    except CatchableError as exc:
      raise exc
    except Exception as exc:
      # TODO fix metrics
      raiseAssert exc.msg

  # There are no managed event loops in here, to do a graceful shutdown, but
  # letting the default Ctrl+C handler exit is safe, since we only read from
  # the db.
  let beaconNode = loadBeaconNode(config, rng)

  # A stop may have been requested while the node was loading.
  if bnStatus == BeaconNodeStatus.Stopping:
    return

  initStatusBar(beaconNode)

  if beaconNode.nickname == "":
    beaconNode.start()
  else:
    # Tag every log line emitted while running with the node's nickname.
    dynamicLogScope(node = beaconNode.nickname): beaconNode.start()
proc doCreateTestnet(config: BeaconNodeConf, rng: var BrHmacDrbgContext) {.raises: [Defect, CatchableError].} =
  ## Creates a genesis state from a LaunchPad deposits file.
  ##
  ## The state is always written as SSZ next to `config.outputGenesis`, and
  ## additionally as pretty-printed JSON when that path has a `.json`
  ## extension. When `config.outputBootstrapFile` is non-empty, a bootstrap
  ## ENR is written to it as well.
  let depositsFile = config.testnetDepositsFile.string
  let launchPadDeposits =
    try:
      Json.loadFile(depositsFile, seq[LaunchPadDeposit])
    except SerializationError as err:
      error "Invalid LaunchPad deposits file",
        err = formatMsg(err, depositsFile)
      quit 1

  var deposits: seq[DepositData]
  for launchPadDeposit in launchPadDeposits:
    deposits.add(launchPadDeposit as DepositData)

  let
    startTime = uint64(times.toUnix(times.getTime()) + config.genesisOffset)
    outGenesis = config.outputGenesis.string
    eth1Hash =
      if config.web3Urls.len == 0:
        eth1BlockHash
      else:
        # Ask the first configured web3 provider for its latest block hash.
        (waitFor getEth1BlockHash(
          config.web3Urls[0], blockId("latest"))).asEth2Digest
    cfg = getRuntimeConfig(config.eth2Network)

  var initialState = initialize_beacon_state_from_eth1(
    cfg, eth1Hash, startTime, deposits, {skipBlsValidation})

  # https://github.com/ethereum/eth2.0-pm/tree/6e41fcf383ebeb5125938850d8e9b4e9888389b4/interop/mocked_start#create-genesis-state
  initialState.genesis_time = startTime

  doAssert initialState.validators.len > 0

  if cmpIgnoreCase(splitFile(outGenesis).ext, ".json") == 0:
    Json.saveFile(outGenesis, initialState, pretty = true)
    echo "Wrote ", outGenesis

  let outSszGenesis = changeFileExt(outGenesis, "ssz")
  SSZ.saveFile(outSszGenesis, initialState[])
  echo "Wrote ", outSszGenesis

  let bootstrapFile = config.outputBootstrapFile.string
  if bootstrapFile.len == 0:
    return

  let
    networkKeys = getPersistentNetKeys(rng, config)
    netMetadata = getPersistentNetMetadata(config)
    forkId = getENRForkID(
      cfg,
      initialState[].slot.epoch,
      initialState[].genesis_validators_root)
    bootstrapEnr = enr.Record.init(
      1, # sequence number
      networkKeys.seckey.asEthKey,
      some(config.bootstrapAddress),
      some(config.bootstrapPort),
      some(config.bootstrapPort),
      [
        toFieldPair(enrForkIdField, SSZ.encode(forkId)),
        toFieldPair(enrAttestationSubnetsField, SSZ.encode(netMetadata.attnets))
      ])

  writeFile(bootstrapFile, bootstrapEnr.tryGet().toURI)
  echo "Wrote ", bootstrapFile
2021-04-06 22:12:07 +00:00
proc findWalletWithoutErrors(config: BeaconNodeConf,
                             name: WalletName): Option[WalletPathPair] =
  ## Wrapper around `findWallet` that treats a lookup error as fatal: the
  ## error is logged and the process quits with exit code 1. Otherwise the
  ## optional wallet produced by `findWallet` is returned unchanged.
  let lookup = findWallet(config, name)
  if lookup.isOk:
    return lookup.get

  fatal "Failed to locate wallet", error = lookup.error
  quit 1
proc doDeposits(config: BeaconNodeConf, rng: var BrHmacDrbgContext) {.
    raises: [Defect, CatchableError].} =
  ## Implements the `deposits` sub-commands:
  ##
  ## * `createTestnetDeposits` — unlocks an existing wallet or creates a new
  ##   one, generates `config.totalDeposits` deposits, writes them as a
  ##   LaunchPad JSON file and advances the wallet's `nextAccount`.
  ## * `import` — imports keystores from a directory.
  ## * `exit` — runs the interactive voluntary exit flow.
  ##
  ## Failures are logged and terminate the process through `quit`.
  case config.depositsCmd
  of DepositsCmd.createTestnetDeposits:
    if config.eth2Network.isNone:
      fatal "Please specify the intended testnet for the deposits"
      quit 1
    let metadata = config.loadEth2Network()
    var seed: KeySeed
    # Make sure the seed is wiped on every exit path of this branch.
    defer: burnMem(seed)
    var walletPath: WalletPathPair

    if config.existingWalletId.isSome:
      let
        id = config.existingWalletId.get
        found = findWalletWithoutErrors(config, id)

      if found.isSome:
        walletPath = found.get
      else:
        fatal "Unable to find wallet with the specified name/uuid", id
        quit 1

      var unlocked = unlockWalletInteractively(walletPath.wallet)
      if unlocked.isOk:
        swap(seed, unlocked.get)
      else:
        # The failure will be reported in `unlockWalletInteractively`.
        quit 1
    else:
      var walletRes = createWalletInteractively(rng, config)
      if walletRes.isErr:
        fatal "Unable to create wallet", err = walletRes.error
        quit 1
      else:
        swap(seed, walletRes.get.seed)
        walletPath = walletRes.get.walletPath

    let vres = secureCreatePath(config.outValidatorsDir)
    if vres.isErr():
      fatal "Could not create directory", path = config.outValidatorsDir
      quit QuitFailure

    let sres = secureCreatePath(config.outSecretsDir)
    if sres.isErr():
      fatal "Could not create directory", path = config.outSecretsDir
      quit QuitFailure

    let deposits = generateDeposits(
      metadata.cfg,
      rng,
      seed,
      walletPath.wallet.nextAccount,
      config.totalDeposits,
      config.outValidatorsDir,
      config.outSecretsDir)

    if deposits.isErr:
      fatal "Failed to generate deposits", err = deposits.error
      quit 1

    try:
      # Without an explicit output file, a time-stamped file name inside the
      # validators directory is used.
      let depositDataPath = if config.outDepositsFile.isSome:
        config.outDepositsFile.get.string
      else:
        config.outValidatorsDir / "deposit_data-" & $epochTime() & ".json"

      let launchPadDeposits =
        mapIt(deposits.value, LaunchPadDeposit.init(metadata.cfg, it))

      Json.saveFile(depositDataPath, launchPadDeposits)
      echo "Deposit data written to \"", depositDataPath, "\""

      # Record the consumed accounts in the wallet file.
      walletPath.wallet.nextAccount += deposits.value.len
      let status = saveWallet(walletPath)
      if status.isErr:
        fatal "Failed to update wallet file after generating deposits",
               wallet = walletPath.path,
               error = status.error
        quit 1
    except CatchableError as err:
      fatal "Failed to create launchpad deposit data file", err = err.msg
      quit 1
  #[
  of DepositsCmd.status:
    echo "The status command is not implemented yet"
    quit 1
  ]#

  of DepositsCmd.`import`:
    let validatorKeysDir = if config.importedDepositsDir.isSome:
      config.importedDepositsDir.get
    else:
      let cwd = os.getCurrentDir()
      if dirExists(cwd / "validator_keys"):
        InputDir(cwd / "validator_keys")
      else:
        # The fragments are concatenated, so each one carries its own
        # trailing space ("... correct path as an argument ...").
        echo "The default search path for validator keys is a sub-directory " &
             "named 'validator_keys' in the current working directory. Since " &
             "no such directory exists, please either provide the correct path " &
             "as an argument or copy the imported keys in the expected location."
        quit 1

    importKeystoresFromDir(
      rng,
      validatorKeysDir.string,
      config.validatorsDir, config.secretsDir)

  of DepositsCmd.exit:
    waitFor handleValidatorExitCommand(config)
2019-11-05 18:16:10 +00:00
proc doWallets(config: BeaconNodeConf, rng: var BrHmacDrbgContext) {.
    raises: [Defect, CatchableError].} =
  ## Implements the `wallets` sub-commands: `create`, `list` and `restore`.
  case config.walletsCmd:
  of WalletsCmd.create:
    # Refuse to proceed when an explicitly requested name is already taken.
    if config.createdWalletNameFlag.isSome:
      let requestedName = config.createdWalletNameFlag.get
      if findWalletWithoutErrors(config, requestedName).isSome:
        echo "The Wallet '" & requestedName.string & "' already exists."
        quit 1

    var created = createWalletInteractively(rng, config)
    if created.isErr:
      fatal "Unable to create wallet", err = created.error
      quit 1
    # The seed is not needed by this command, so it is wiped right away.
    burnMem(created.get.seed)

  of WalletsCmd.list:
    for kind, walletFile in walkDir(config.walletsDir):
      if kind != pcFile:
        continue

      if not checkSensitiveFilePermissions(walletFile):
        warn "Found wallet file with insecure permissions",
          wallet = walletFile
        continue

      let loaded = loadWallet(walletFile)
      if loaded.isErr:
        warn "Found corrupt wallet file",
          wallet = walletFile, error = loaded.error
      else:
        echo loaded.get.longName

  of WalletsCmd.restore:
    restoreWalletInteractively(rng, config)
proc doRecord(config: BeaconNodeConf, rng: var BrHmacDrbgContext) {.
    raises: [Defect, CatchableError].} =
  ## Implements the `record` sub-commands: `create` builds an ENR from the
  ## command line options and prints its URI, `print` echoes the record
  ## passed on the command line.
  case config.recordCmd:
  of RecordCmd.create:
    let netKeys = getPersistentNetKeys(rng, config)

    # Each custom field is given as `name:hexvalue`.
    var fieldPairs: seq[FieldPair]
    for field in config.fields:
      let parts = field.split(":")
      if parts.len <= 1:
        fatal "Invalid field pair"
        quit QuitFailure
      fieldPairs.add(toFieldPair(parts[0], hexToSeqByte(parts[1])))

    let record = enr.Record.init(
      config.seqNumber,
      netKeys.seckey.asEthKey,
      some(config.ipExt),
      some(config.tcpPortExt),
      some(config.udpPortExt),
      fieldPairs).expect("Record within size limits")

    echo record.toURI()

  of RecordCmd.print:
    echo $config.recordPrint
proc doWeb3Cmd(config: BeaconNodeConf) {.raises: [Defect, CatchableError].} =
  ## Implements the `web3` sub-commands. `test` runs `testWeb3Provider`
  ## against `config.web3TestUrl` with the deposit contract address taken
  ## from the selected network's runtime configuration.
  case config.web3Cmd:
  of Web3Cmd.test:
    let
      networkMetadata = loadEth2Network(config)
      depositContract = networkMetadata.cfg.DEPOSIT_CONTRACT_ADDRESS
    waitFor testWeb3Provider(config.web3TestUrl, depositContract)
2021-05-19 06:38:13 +00:00
proc doSlashingExport(conf: BeaconNodeConf) {.raises: [IOError, Defect].}=
  ## Exports the slashing protection database found in the validators
  ## directory into the interchange file `conf.exportedInterchangeFile`,
  ## passing `conf.exportedValidators` on to the exporter.
  let
    dbDir = conf.validatorsDir()
    dbName = SlashingDbName
    interchangePath = conf.exportedInterchangeFile.string

  # TODO: Make it read-only https://github.com/status-im/nim-eth/issues/312
  let db = SlashingProtectionDB.loadUnchecked(dbDir, dbName, readOnly = false)

  db.exportSlashingInterchange(interchangePath, conf.exportedValidators)
  echo "Export finished: '", dbDir/dbName & ".sqlite3" , "' into '", interchangePath, "'"
proc doSlashingImport(conf: BeaconNodeConf) {.raises: [SerializationError, IOError, Defect].} =
  ## Imports the slashing protection interchange file
  ## `conf.importedInterchangeFile` into the slashing protection database
  ## located in the validators directory.
  ##
  ## A file that cannot be deserialized is reported on stderr and terminates
  ## the process with exit code 1.
  let
    dbDir = conf.validatorsDir()
    dbName = SlashingDbName
    interchangePath = conf.importedInterchangeFile.string

  var spdir: SPDIR
  try:
    spdir = JSON.loadFile(interchangePath, SPDIR)
  except SerializationError as err:
    writeStackTrace()
    stderr.write $JSON & " load issue for file \"", interchangePath, "\"\n"
    stderr.write err.formatMsg(interchangePath), "\n"
    quit 1

  # Open DB and handle migration from v1 to v2 if needed
  let db = SlashingProtectionDB.init(
    genesis_validators_root = Eth2Digest spdir.metadata.genesis_validators_root,
    basePath = dbDir,
    dbname = dbName,
    modes = {kCompleteArchive}
  )

  # Now import the slashing interchange file
  # Failures mode:
  # - siError can only happen with invalid genesis_validators_root which would be caught above
  # - siPartial can happen for invalid public keys, slashable blocks, slashable votes
  let importStatus = db.inclSPDIR(spdir)
  doAssert importStatus in {siSuccess, siPartial}

  echo "Import finished: '", interchangePath, "' into '", dbDir/dbName & ".sqlite3", "'"
proc doSlashingInterchange(conf: BeaconNodeConf) {.raises: [Defect, CatchableError].} =
  ## Dispatches the `slashingdb` sub-command to the matching import or
  ## export routine. Must only be called when `conf.cmd` is `slashingdb`.
  doAssert conf.cmd == slashingdb
  case conf.slashingdbCmd
  of SlashProtCmd.`import`:
    doSlashingImport(conf)
  of SlashProtCmd.`export`:
    doSlashingExport(conf)
{.pop.} # TODO moduletests exceptions

programMain:
  # Program entry point: parse the configuration, set up logging and signal
  # handlers, then dispatch on the selected command.
  var config = makeBannerAndConfig(clientId, BeaconNodeConf)

  setupStdoutLogging(config.logLevel)

  if not checkAndCreateDataDir(config.dataDir.string):
    # We are unable to access/create data folder or data folder's
    # permissions are insecure.
    quit QuitFailure

  setupLogging(config.logLevel, config.logFile)

  # This Ctrl+C handler exits the program in non-graceful way.
  # It's responsible for handling Ctrl+C in sub-commands such
  # as `wallets *` and `deposits *`. In a regular beacon node
  # run, it will be overwritten later with a different handler
  # performing a graceful exit.
  proc exitImmediatelyOnCtrlC() {.noconv.} =
    when defined(windows):
      # workaround for https://github.com/nim-lang/Nim/issues/4057
      setupForeignThreadGc()
    # in case a password prompt disabled echoing
    resetStdin()
    # If we interrupt during an interactive prompt, this
    # will move the cursor to the next line
    echo ""
    notice "Shutting down after having received SIGINT"
    quit 0
  setControlCHook(exitImmediatelyOnCtrlC)

  # equivalent SIGTERM handler
  when defined(posix):
    proc exitImmediatelyOnSIGTERM(signal: cint) {.noconv.} =
      notice "Shutting down after having received SIGTERM"
      quit 0
    c_signal(SIGTERM, exitImmediatelyOnSIGTERM)

  # Single RNG instance for the application - will be seeded on construction
  # and avoid using system resources (such as urandom) after that
  let rng = keys.newRng()

  case config.cmd
  of createTestnet:
    doCreateTestnet(config, rng[])
  of noCommand:
    doRunBeaconNode(config, rng)
  of deposits:
    doDeposits(config, rng[])
  of wallets:
    doWallets(config, rng[])
  of record:
    doRecord(config, rng[])
  of web3:
    doWeb3Cmd(config)
  of slashingdb:
    doSlashingInterchange(config)