nimbus-eth2/beacon_chain/beacon_node.nim
Dustin Brody 3e587d4667
Justification/finalization amelioration using testnet1 shard count and epoch length (#331)
* working justification, 1 node, 64 validators

* closer to tn1 params: 256 validators

* more debugging output; switch to minimum test case

* working justification and finalization in local network simulation

* fix currently incorrect state transition/attestation test assumption
2019-08-14 08:56:32 +00:00

873 lines
34 KiB
Nim

import
net, sequtils, options, tables, osproc, random, strutils, times, strformat,
stew/shims/os, stew/[objects, bitseqs],
chronos, chronicles, confutils, serialization/errors,
eth/trie/db, eth/trie/backends/rocksdb_backend, eth/async_utils,
spec/[datatypes, digest, crypto, beaconstate, helpers, validator,
state_transition_block],
conf, time, state_transition, fork_choice, ssz, beacon_chain_db,
validator_pool, extras, attestation_pool, block_pool, eth2_network,
beacon_node_types, mainchain_monitor, trusted_state_snapshots, version,
sync_protocol, request_manager, genesis
const
topicBeaconBlocks = "/eth2/beacon_block/ssz"
topicAttestations = "/eth2/beacon_attestation/ssz"
topicVoluntaryExits = "/eth2/voluntary_exit/ssz"
topicProposerSlashings = "/eth2/proposer_slashing/ssz"
topicAttesterSlashings = "/eth2/attester_slashing/ssz"
dataDirValidators = "validators"
networkMetadataFile = "network.json"
genesisFile = "genesis.json"
testnetsBaseUrl = "https://serenity-testnets.status.im"
proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.}
func localValidatorsDir(conf: BeaconNodeConf): string =
conf.dataDir / "validators"
func databaseDir(conf: BeaconNodeConf): string =
conf.dataDir / "db"
template `//`(url, fragment: string): string =
url & "/" & fragment
proc downloadFile(url: string): Future[string] {.async.} =
let cmd = "curl --fail " & url
let (fileContents, errorCode) = execCmdEx(cmd, options = {poUsePath})
if errorCode != 0:
raise newException(IOError, "Failed external command: '" & cmd & "', exit code: " & $errorCode & ", output: '" & fileContents & "'")
return fileContents
proc updateTestnetMetadata(conf: BeaconNodeConf): Future[NetworkMetadata] {.async.} =
let metadataUrl = testnetsBaseUrl // $conf.network // networkMetadataFile
let latestMetadata = await downloadFile(metadataUrl)
try:
result = Json.decode(latestMetadata, NetworkMetadata)
except SerializationError as err:
stderr.write "Error while loading the testnet metadata. Your client my be out of date.\n"
stderr.write err.formatMsg(metadataUrl), "\n"
stderr.write "Please follow the instructions at https://github.com/status-im/nim-beacon-chain " &
"in order to produce an up-to-date build.\n"
quit 1
let localMetadataFile = conf.dataDir / networkMetadataFile
if fileExists(localMetadataFile) and readFile(localMetadataFile).string == latestMetadata:
return
info "New testnet genesis data received. Starting with a fresh database."
createDir conf.dataDir.string
removeDir conf.databaseDir
writeFile localMetadataFile, latestMetadata
let newGenesis = await downloadFile(testnetsBaseUrl // $conf.network // genesisFile)
writeFile conf.dataDir / genesisFile, newGenesis
proc obtainTestnetKey(conf: BeaconNodeConf): Future[(string, string)] {.async.} =
let
metadata = await updateTestnetMetadata(conf)
privKeyName = validatorFileBaseName(rand(metadata.userValidatorsRange)) & ".privkey"
privKeyUrl = testnetsBaseUrl // $conf.network // privKeyName
privKeyContent = strip await downloadFile(privKeyUrl)
let key = ValidatorPrivKey.init(privKeyContent)
return (privKeyName, privKeyContent)
proc saveValidatorKey(keyName, key: string, conf: BeaconNodeConf) =
let validatorsDir = conf.dataDir / dataDirValidators
let outputFile = validatorsDir / keyName
createDir validatorsDir
writeFile(outputFile, key)
info "Imported validator key", file = outputFile
proc initGenesis(node: BeaconNode) {.async.} =
template conf: untyped = node.config
var tailState: BeaconState
if conf.depositWeb3Url.len != 0:
info "Waiting for genesis state from eth1"
tailState = await getGenesisFromEth1(conf)
else:
var snapshotFile = conf.dataDir / genesisFile
if conf.stateSnapshot.isSome:
snapshotFile = conf.stateSnapshot.get.string
info "Importing snapshot file", path = snapshotFile
if not fileExists(snapshotFile):
error "Nimbus database not initialized. Please specify the initial state snapshot file."
quit 1
try:
tailState = Json.loadFile(snapshotFile, BeaconState)
except SerializationError as err:
stderr.write "Failed to import ", snapshotFile, "\n"
stderr.write err.formatMsg(snapshotFile), "\n"
quit 1
info "Got genesis state", hash = hash_tree_root(tailState)
node.forkVersion = tailState.fork.current_version
try:
let tailBlock = get_initial_beacon_block(tailState)
BlockPool.preInit(node.db, tailState, tailBlock)
except:
stderr.write "Failed to initialize database\n"
stderr.write getCurrentExceptionMsg(), "\n"
quit 1
proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async.} =
new result
result.onBeaconBlock = onBeaconBlock
result.config = conf
result.networkIdentity = getPersistentNetIdentity(conf)
result.nickname = if conf.nodename == "auto": shortForm(result.networkIdentity)
else: conf.nodename
template fail(args: varargs[untyped]) =
stderr.write args, "\n"
quit 1
case conf.network
of "mainnet":
fail "The Serenity mainnet hasn't been launched yet"
of "testnet0", "testnet1":
result.networkMetadata = await updateTestnetMetadata(conf)
else:
try:
result.networkMetadata = Json.loadFile(conf.network, NetworkMetadata)
except SerializationError as err:
fail "Failed to load network metadata: \n", err.formatMsg(conf.network)
var metadataErrorMsg = ""
template checkCompatibility(metadataField, LOCAL_CONSTANT) =
let metadataValue = metadataField
if metadataValue != LOCAL_CONSTANT:
if metadataErrorMsg.len > 0: metadataErrorMsg.add " and"
metadataErrorMsg.add " -d:" & astToStr(LOCAL_CONSTANT) & "=" & $metadataValue &
" (instead of " & $LOCAL_CONSTANT & ")"
if result.networkMetadata.networkGeneration != semanticVersion:
let newerVersionRequired = result.networkMetadata.networkGeneration.int > semanticVersion
let newerOrOlder = if newerVersionRequired: "a newer" else: "an older"
stderr.write &"Connecting to '{conf.network}' requires {newerOrOlder} version of Nimbus. "
if newerVersionRequired:
stderr.write "Please follow the instructions at https://github.com/status-im/nim-beacon-chain " &
"in order to produce an up-to-date build.\n"
quit 1
checkCompatibility result.networkMetadata.numShards , SHARD_COUNT
checkCompatibility result.networkMetadata.slotDuration , SECONDS_PER_SLOT
checkCompatibility result.networkMetadata.slotsPerEpoch , SLOTS_PER_EPOCH
if metadataErrorMsg.len > 0:
fail "To connect to the ", conf.network, " network, please compile with", metadataErrorMsg
for bootNode in result.networkMetadata.bootstrapNodes:
if bootNode.isSameNode(result.networkIdentity):
result.isBootstrapNode = true
else:
result.bootstrapNodes.add bootNode
for bootNode in conf.bootstrapNodes:
result.bootstrapNodes.add BootstrapAddr.init(bootNode)
let bootstrapFile = string conf.bootstrapNodesFile
if bootstrapFile.len > 0:
for ln in lines(bootstrapFile):
result.bootstrapNodes.add BootstrapAddr.init(string ln)
result.attachedValidators = ValidatorPool.init
init result.mainchainMonitor, "", Port(0) # TODO: specify geth address and port
let trieDB = trieDB newChainDb(string conf.databaseDir)
result.db = BeaconChainDB.init(trieDB)
# TODO this is problably not the right place to ensure that db is sane..
# TODO does it really make sense to load from DB if a state snapshot has been
# specified on command line? potentially, this should be the other way
# around...
if result.db.getHeadBlock().isNone():
await result.initGenesis()
result.blockPool = BlockPool.init(result.db)
result.attestationPool = AttestationPool.init(result.blockPool)
result.network = await createEth2Node(conf, result.bootstrapNodes)
result.requestManager.init result.network
# TODO sync is called when a remote peer is connected - is that the right
# time to do so?
let sync = result.network.protocolState(BeaconSync)
sync.node = result
sync.db = result.db
result.stateCache = result.blockPool.loadTailState()
result.justifiedStateCache = result.stateCache
let addressFile = string(conf.dataDir) / "beacon_node.address"
result.network.saveConnectionAddressFile(addressFile)
result.beaconClock = BeaconClock.init(result.stateCache.data.data)
template withState(
pool: BlockPool, cache: var StateData, blockSlot: BlockSlot, body: untyped): untyped =
## Helper template that updates state to a particular BlockSlot - usage of
## cache is unsafe outside of block.
## TODO async transformations will lead to a race where cache gets updated
## while waiting for future to complete - catch this here somehow?
updateStateData(pool, cache, blockSlot)
template hashedState(): HashedBeaconState {.inject.} = cache.data
template state(): BeaconState {.inject.} = cache.data.data
template blck(): BlockRef {.inject.} = cache.blck
template root(): Eth2Digest {.inject.} = cache.data.root
body
proc connectToNetwork(node: BeaconNode) {.async.} =
if node.bootstrapNodes.len > 0:
info "Connecting to bootstrap nodes", bootstrapNodes = node.bootstrapNodes
else:
info "Waiting for connections"
await node.network.connectToNetwork(node.bootstrapNodes)
template findIt(s: openarray, predicate: untyped): int =
var res = -1
for i, it {.inject.} in s:
if predicate:
res = i
break
res
proc addLocalValidator(
node: BeaconNode, state: BeaconState, privKey: ValidatorPrivKey) =
let pubKey = privKey.pubKey()
let idx = state.validators.findIt(it.pubKey == pubKey)
if idx == -1:
warn "Validator not in registry", pubKey
else:
node.attachedValidators.addLocalValidator(pubKey, privKey)
proc addLocalValidators(node: BeaconNode, state: BeaconState) =
for validatorKeyFile in node.config.validators:
node.addLocalValidator state, validatorKeyFile.load
for kind, file in walkDir(node.config.localValidatorsDir):
if kind in {pcFile, pcLinkToFile}:
node.addLocalValidator state, ValidatorPrivKey.init(readFile(file).string)
info "Local validators attached ", count = node.attachedValidators.count
proc getAttachedValidator(
node: BeaconNode, state: BeaconState, idx: int): AttachedValidator =
let validatorKey = state.validators[idx].pubkey
node.attachedValidators.getValidator(validatorKey)
proc updateHead(node: BeaconNode, slot: Slot): BlockRef =
# Use head state for attestation resolution below
# TODO do we need to resolve attestations using all available head states?
node.blockPool.withState(
node.stateCache, BlockSlot(blck: node.blockPool.head.blck, slot: slot)):
# Check pending attestations - maybe we found some blocks for them
node.attestationPool.resolve(state)
# TODO move all of this logic to BlockPool
debug "Preparing for fork choice",
stateRoot = shortLog(root),
connectedPeers = node.network.peersCount,
stateSlot = humaneSlotNum(state.slot),
stateEpoch = humaneEpochNum(state.slot.computeEpochOfSlot)
let
justifiedHead = node.blockPool.latestJustifiedBlock()
# TODO slot number is wrong here, it should be the start of the epoch that
# got finalized:
# https://github.com/ethereum/eth2.0-specs/issues/768
let newHead = node.blockPool.withState(
node.justifiedStateCache,
BlockSlot(blck: justifiedHead, slot: justifiedHead.slot)):
lmdGhost(node.attestationPool, state, justifiedHead)
info "Fork chosen",
newHeadSlot = humaneSlotNum(newHead.slot),
newHeadEpoch = humaneEpochNum(newHead.slot.computeEpochOfSlot),
newHeadBlockRoot = shortLog(newHead.root)
node.blockPool.updateHead(node.stateCache, newHead)
newHead
proc sendAttestation(node: BeaconNode,
validator: AttachedValidator,
attestationData: AttestationData,
committeeLen: int,
indexInCommittee: int) {.async.} =
let
validatorSignature = await validator.signAttestation(attestationData)
var aggregationBits = CommitteeValidatorsBits.init(committeeLen)
aggregationBits.raiseBit indexInCommittee
var attestation = Attestation(
data: attestationData,
signature: validatorSignature,
aggregation_bits: aggregationBits,
# Stub in phase0
custody_bits: CommitteeValidatorsBits.init(committeeLen)
)
node.network.broadcast(topicAttestations, attestation)
info "Attestation sent",
attestationData = shortLog(attestationData),
validator = shortLog(validator),
signature = shortLog(validatorSignature),
indexInCommittee = indexInCommittee
proc proposeBlock(node: BeaconNode,
validator: AttachedValidator,
head: BlockRef,
slot: Slot): Future[BlockRef] {.async.} =
if head.slot > slot:
notice "Skipping proposal, we've already selected a newer head",
headSlot = humaneSlotNum(head.slot),
headBlockRoot = shortLog(head.root),
slot = humaneSlotNum(slot)
return head
if head.slot == slot:
# Weird, we should never see as head the same slot as we're proposing a
# block for - did someone else steal our slot? why didn't we discard it?
warn "Found head at same slot as we're supposed to propose for!",
headSlot = humaneSlotNum(head.slot),
headBlockRoot = shortLog(head.root)
# TODO investigate how and when this happens.. maybe it shouldn't be an
# assert?
doAssert false, "head slot matches proposal slot (!)"
# return
var (nroot, nblck) = node.blockPool.withState(
node.stateCache, BlockSlot(blck: head, slot: slot - 1)):
# To create a block, we'll first apply a partial block to the state, skipping
# some validations.
let
blockBody = BeaconBlockBody(
randao_reveal: validator.genRandaoReveal(state, slot),
eth1_data: node.mainchainMonitor.getBeaconBlockRef(),
attestations:
node.attestationPool.getAttestationsForBlock(state, slot))
var
newBlock = BeaconBlock(
slot: slot,
parent_root: head.root,
body: blockBody,
signature: ValidatorSig(), # we need the rest of the block first!
)
var
tmpState = hashedState
cache = get_empty_per_epoch_cache()
let ok = state_transition(tmpState, newBlock, {skipValidation})
# TODO only enable in fast-fail debugging situations
# otherwise, bad attestations can bring down network
# doAssert ok # TODO: err, could this fail somehow?
newBlock.state_root = tmpState.root
let blockRoot = signing_root(newBlock)
# Careful, state no longer valid after here..
newBlock.signature =
await validator.signBlockProposal(state, slot, blockRoot)
(blockRoot, newBlock)
let newBlockRef = node.blockPool.add(node.stateCache, nroot, nblck)
if newBlockRef == nil:
warn "Unable to add proposed block to block pool",
newBlock = shortLog(newBlock),
blockRoot = shortLog(blockRoot)
return head
info "Block proposed",
blck = shortLog(newBlock),
blockRoot = shortLog(newBlockRef.root),
validator = shortLog(validator)
node.network.broadcast(topicBeaconBlocks, newBlock)
return newBlockRef
proc onAttestation(node: BeaconNode, attestation: Attestation) =
# We received an attestation from the network but don't know much about it
# yet - in particular, we haven't verified that it belongs to particular chain
# we're on, or that it follows the rules of the protocol
debug "Attestation received",
attestationData = shortLog(attestation.data),
signature = shortLog(attestation.signature)
# TODO seems reasonable to use the latest head state here.. needs thinking
# though - maybe we should use the state from the block pointed to by
# the attestation for some of the check? Consider interop with block
# production!
node.blockPool.withState(node.stateCache,
BlockSlot(blck: node.blockPool.head.blck, slot: node.beaconClock.now().toSlot())):
var stateCache = get_empty_per_epoch_cache()
node.attestationPool.add(state, attestation)
debug "Attestation received 2",
start_attestation_data_slot =
get_attestation_data_slot(state, attestation.data),
indexed_attesters =
get_indexed_attestation(state, attestation, stateCache),
target_epoch =
attestation.data.target.epoch,
cur_epoch = get_current_epoch(state),
cur_slot = state.slot
#doAssert get_current_epoch(state) < attestation.data.target.epoch or state.slot <= get_attestation_data_slot(state, attestation.data)
proc onBeaconBlock(node: BeaconNode, blck: BeaconBlock) =
# We received a block but don't know much about it yet - in particular, we
# don't know if it's part of the chain we're currently building.
let blockRoot = signing_root(blck)
debug "Block received",
blck = shortLog(blck),
blockRoot = shortLog(blockRoot)
if node.blockPool.add(node.stateCache, blockRoot, blck).isNil:
return
# The block we received contains attestations, and we might not yet know about
# all of them. Let's add them to the attestation pool - in case they block
# is not yet resolved, neither will the attestations be!
# TODO shouldn't add attestations if the block turns out to be invalid..
for attestation in blck.body.attestations:
node.onAttestation(attestation)
proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
## Perform all attestations that the validators attached to this node should
## perform during the given slot
if slot + SLOTS_PER_EPOCH < head.slot:
# The latest block we know about is a lot newer than the slot we're being
# asked to attest to - this makes it unlikely that it will be included
# at all.
# TODO the oldest attestations allowed are those that are older than the
# finalized epoch.. also, it seems that posting very old attestations
# is risky from a slashing perspective. More work is needed here.
notice "Skipping attestation, head is too recent",
headSlot = humaneSlotNum(head.slot),
slot = humaneSlotNum(slot)
return
let attestationHead = head.findAncestorBySlot(slot)
if head != attestationHead.blck:
# In rare cases, such as when we're busy syncing or just slow, we'll be
# attesting to a past state - we must then recreate the world as it looked
# like back then
notice "Attesting to a state in the past, falling behind?",
headSlot = humaneSlotNum(head.slot),
attestationHeadSlot = humaneSlotNum(attestationHead.slot),
attestationSlot = humaneSlotNum(slot)
debug "Checking attestations",
attestationHeadRoot = shortLog(attestationHead.blck.root),
attestationSlot = humaneSlotNum(slot)
# Collect data to send before node.stateCache grows stale
var attestations: seq[tuple[
data: AttestationData, committeeLen, indexInCommittee: int,
validator: AttachedValidator]]
# We need to run attestations exactly for the slot that we're attesting to.
# In case blocks went missing, this means advancing past the latest block
# using empty slots as fillers.
node.blockPool.withState(node.stateCache, attestationHead):
var cache = get_empty_per_epoch_cache()
let epoch = compute_epoch_of_slot(slot)
for committee_index in 0'u64 ..< get_committee_count(state, epoch):
## TODO verify that this is the correct mapping; it's consistent with
## other code
let
shard = committee_index mod SHARD_COUNT
shard2 = (committee_index + get_start_shard(state, epoch)) mod SHARD_COUNT
committee = get_crosslink_committee(state, epoch, shard2, cache)
for i, validatorIdx in committee:
let validator = node.getAttachedValidator(state, validatorIdx)
if validator != nil:
let ad = makeAttestationData(state, shard2, blck.root)
attestations.add (
ad,
committee.len, i, validator)
debug "handleAttestations: adding attestation to list for broadcast",
data=ad,
epoch=epoch,
committeeIdx = i,
validatorIdx = validatorIdx.int,
shard=shard2,
committee = committee
for a in attestations:
traceAsyncErrors sendAttestation(
node, a.validator, a.data, a.committeeLen, a.indexInCommittee)
proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
Future[BlockRef] {.async.} =
## Perform the proposal for the given slot, iff we have a validator attached
## that is supposed to do so, given the shuffling in head
# TODO here we advanced the state to the new slot, but later we'll be
# proposing for it - basically, we're selecting proposer based on an
# empty slot.. wait for the committee selection to settle, then
# revisit this - we should be able to advance behind
var cache = get_empty_per_epoch_cache()
node.blockPool.withState(node.stateCache, BlockSlot(blck: head, slot: slot)):
# justification won't happen in odd case anyway
let prev_epoch = get_previous_epoch(state)
let prev_epoch_attestations = node.attestationPool.getAttestationsForTargetEpoch(state, prev_epoch)
debug "handleProposal: getAttestationsForTargetEpoch attesting indices for prev_epoch",
attesting_indices = mapIt(prev_epoch_attestations, get_attesting_indices(state, it.data, it.aggregation_bits, cache))
let
proposerIdx = get_beacon_proposer_index(state, cache)
validator = node.getAttachedValidator(state, proposerIdx)
# Ugly hack.
# TODO handle MAX_ATTESTATIONS & merging pointless dupes for this purpose?
blockBody2 = BeaconBlockBody(
attestations:
#node.attestationPool.getAttestationsForBlock(state, slot - min(MIN_ATTESTATION_INCLUSION_DELAY.uint64, slot.uint64)))
prev_epoch_attestations)
newBlock2 = BeaconBlock(
slot: slot,
parent_root: head.root,
body: blockBody2
)
if newBlock2.slot > 0 and not processAttestations(state, newBlock2, {skipValidation}, cache):
debug "when calling processAttestations from handleAttestations, failed"
else:
debug "when calling processAttestations from handleAttestations, succeeded"
if validator != nil:
return await proposeBlock(node, validator, head, slot)
debug "Expecting proposal",
headRoot = shortLog(head.root),
slot = humaneSlotNum(slot),
proposer = shortLog(state.validators[proposerIdx].pubKey)
return head
proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might
## skip a few in case we're running late.
## lastSlot: the last slot that we sucessfully processed, so we know where to
## start work from
## scheduledSlot: the slot that we were aiming for, in terms of timing
let
# The slot we should be at, according to the clock
slot = node.beaconClock.now().toSlot()
nextSlot = slot + 1
debug "Slot start",
lastSlot = humaneSlotNum(lastSlot),
scheduledSlot = humaneSlotNum(scheduledSlot),
slot = humaneSlotNum(slot)
if slot < lastSlot:
# This can happen if the system clock changes time for example, and it's
# pretty bad
# TODO shut down? time either was or is bad, and PoS relies on accuracy..
warn "Beacon clock time moved back, rescheduling slot actions",
slot = humaneSlotNum(slot),
scheduledSlot = humaneSlotNum(scheduledSlot)
addTimer(saturate(node.beaconClock.fromNow(nextSlot))) do (p: pointer):
asyncCheck node.onSlotStart(slot, nextSlot)
return
if slot > lastSlot + SLOTS_PER_EPOCH:
# We've fallen behind more than an epoch - there's nothing clever we can
# do here really, except skip all the work and try again later.
# TODO how long should the period be? Using an epoch because that's roughly
# how long attestations remain interesting
# TODO should we shut down instead? clearly we're unable to keep up
warn "Unable to keep up, skipping ahead without doing work",
lastSlot = humaneSlotNum(lastSlot),
slot = humaneSlotNum(slot),
scheduledSlot = humaneSlotNum(scheduledSlot)
addTimer(saturate(node.beaconClock.fromNow(nextSlot))) do (p: pointer):
# We pass the current slot here to indicate that work should be skipped!
asyncCheck node.onSlotStart(slot, nextSlot)
return
# Whatever we do during the slot, we need to know the head, because this will
# give us a state to work with and thus a shuffling.
# TODO typically, what consitutes correct actions stays constant between slot
# updates and is stable across some epoch transitions as well - see how
# we can avoid recalculating everything here
var head = node.updateHead(slot)
# TODO if the head is very old, that is indicative of something being very
# wrong - us being out of sync or disconnected from the network - need
# to consider what to do in that case:
# * nothing - the other parts of the application will reconnect and
# start listening to broadcasts, learn a new head etc..
# risky, because the network might stall if everyone does
# this, because no blocks will be produced
# * shut down - this allows the user to notice and take action, but is
# kind of harsh
# * keep going - we create blocks and attestations as usual and send them
# out - if network conditions improve, fork choice should
# eventually select the correct head and the rest will
# disappear naturally - risky because user is not aware,
# and might lose stake on canonical chain but "just works"
# when reconnected..
# Right now, we keep going
var curSlot = lastSlot + 1
while curSlot < slot:
# Timers may be delayed because we're busy processing, and we might have
# more work to do. We'll try to do so in an expedited way.
# TODO maybe even collect all work synchronously to avoid unnecessary
# state rewinds while waiting for async operations like validator
# signature..
notice "Catching up",
curSlot = humaneSlotNum(curSlot),
lastSlot = humaneSlotNum(lastSlot),
slot = humaneSlotNum(slot)
# For every slot we're catching up, we'll propose then send
# attestations - head should normally be advancing along the same branch
# in this case
# TODO what if we receive blocks / attestations while doing this work?
head = await handleProposal(node, head, curSlot)
# For each slot we missed, we need to send out attestations - if we were
# proposing during this time, we'll use the newly proposed head, else just
# keep reusing the same - the attestation that goes out will actually
# rewind the state to what it looked like at the time of that slot
# TODO smells like there's an optimization opportunity here
handleAttestations(node, head, curSlot)
curSlot += 1
head = await handleProposal(node, head, slot)
# We've been doing lots of work up until now which took time. Normally, we
# send out attestations at the slot mid-point, so we go back to the clock
# to see how much time we need to wait.
# TODO the beacon clock might jump here also. It's probably easier to complete
# the work for the whole slot using a monotonic clock instead, then deal
# with any clock discrepancies once only, at the start of slot timer
# processing..
let
attestationStart = node.beaconClock.fromNow(slot)
halfSlot = seconds(int64(SECONDS_PER_SLOT div 2))
if attestationStart.inFuture or attestationStart.offset <= halfSlot:
let fromNow =
if attestationStart.inFuture: attestationStart.offset + halfSlot
else: halfSlot - attestationStart.offset
debug "Waiting to send attestations",
slot = humaneSlotNum(slot),
fromNow = shortLog(fromNow)
await sleepAsync(fromNow)
# Time passed - we might need to select a new head in that case
head = node.updateHead(slot)
handleAttestations(node, head, slot)
# TODO ... and beacon clock might jump here also. sigh.
let
nextSlotStart = saturate(node.beaconClock.fromNow(nextSlot))
info "Scheduling slot actions",
lastSlot = humaneSlotNum(slot),
slot = humaneSlotNum(slot),
nextSlot = humaneSlotNum(nextSlot),
fromNow = shortLog(nextSlotStart)
addTimer(nextSlotStart) do (p: pointer):
asyncCheck node.onSlotStart(slot, nextSlot)
proc onSecond(node: BeaconNode, moment: Moment) {.async.} =
let missingBlocks = node.blockPool.checkMissing()
if missingBlocks.len > 0:
info "Requesting detected missing blocks", missingBlocks
node.requestManager.fetchAncestorBlocks(missingBlocks) do (b: BeaconBlock):
onBeaconBlock(node ,b)
let nextSecond = max(Moment.now(), moment + chronos.seconds(1))
addTimer(nextSecond) do (p: pointer):
asyncCheck node.onSecond(nextSecond)
proc run*(node: BeaconNode) =
waitFor node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock):
onBeaconBlock(node, blck)
waitFor node.network.subscribe(topicAttestations) do (attestation: Attestation):
node.onAttestation(attestation)
let
slot = node.beaconClock.now().toSlot()
startSlot =
if slot >= GENESIS_SLOT: slot + 1
else: GENESIS_SLOT + 1
fromNow = saturate(node.beaconClock.fromNow(startSlot))
info "Scheduling first slot action",
nextSlot = humaneSlotNum(startSlot),
fromNow = shortLog(fromNow)
addTimer(fromNow) do (p: pointer):
asyncCheck node.onSlotStart(startSlot - 1, startSlot)
let second = Moment.now() + chronos.seconds(1)
addTimer(second) do (p: pointer):
asyncCheck node.onSecond(second)
runForever()
var gPidFile: string
proc createPidFile(filename: string) =
createDir splitFile(filename).dir
writeFile filename, $os.getCurrentProcessId()
gPidFile = filename
addQuitProc proc {.noconv.} = removeFile gPidFile
proc start(node: BeaconNode, headState: BeaconState) =
# TODO: while it's nice to cheat by waiting for connections here, we
# actually need to make this part of normal application flow -
# losing all connections might happen at any time and we should be
# prepared to handle it.
waitFor node.connectToNetwork()
info "Starting beacon node",
slotsSinceFinalization =
int64(node.blockPool.finalizedHead.slot) -
int64(node.beaconClock.now()),
stateSlot = humaneSlotNum(headState.slot),
SHARD_COUNT,
SLOTS_PER_EPOCH,
SECONDS_PER_SLOT,
SPEC_VERSION
node.addLocalValidators(headState)
node.run()
when isMainModule:
randomize()
let config = BeaconNodeConf.load(version = fullVersionStr())
if config.logLevel != LogLevel.NONE:
setLogLevel(config.logLevel)
## Ctrl+C handling
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
setupForeignThreadGc()
debug "Shutting down after having received SIGINT"
quit(1)
setControlCHook(controlCHandler)
case config.cmd
of createTestnet:
var deposits: seq[Deposit]
for i in config.firstValidator.int ..< config.totalValidators.int:
let depositFile = config.validatorsDir /
validatorFileBaseName(i) & ".deposit.json"
try:
deposits.add Json.loadFile(depositFile, Deposit)
except SerializationError as err:
stderr.write "Error while loading a deposit file:\n"
stderr.write err.formatMsg(depositFile), "\n"
stderr.write "Please regenerate the deposit files by running validator_keygen again\n"
quit 1
let initialState = initialize_beacon_state_from_eth1(
deposits,
uint64(times.toUnix(times.getTime()) + config.genesisOffset),
Eth1Data(), {skipValidation})
doAssert initialState.validators.len > 0
Json.saveFile(config.outputGenesis.string, initialState, pretty = true)
echo "Wrote ", config.outputGenesis.string
var
bootstrapAddress = getPersistenBootstrapAddr(
config, parseIpAddress(config.bootstrapAddress), Port config.bootstrapPort)
testnetMetadata = NetworkMetadata(
networkId: config.networkId,
networkGeneration: semanticVersion,
genesisRoot: hash_tree_root(initialState),
bootstrapNodes: @[bootstrapAddress],
numShards: SHARD_COUNT,
slotDuration: SECONDS_PER_SLOT,
slotsPerEpoch: SLOTS_PER_EPOCH,
totalValidators: config.totalValidators,
lastUserValidator: config.lastUserValidator)
Json.saveFile(config.outputNetwork.string, testnetMetadata, pretty = true)
echo "Wrote ", config.outputNetwork.string
of updateTestnet:
discard waitFor updateTestnetMetadata(config)
of importValidator:
template reportFailureFor(keyExpr) =
error "Failed to import validator key", key = keyExpr
programResult = 1
for keyFile in config.keyFiles:
try:
saveValidatorKey(keyFile.string.extractFilename,
readFile(keyFile.string), config)
except:
reportFailureFor keyFile.string
if config.keyFiles.len == 0:
if config.network in ["testnet0", "testnet1"]:
try:
let (keyName, key) = waitFor obtainTestnetKey(config)
saveValidatorKey(keyName, key, config)
except:
stderr.write "Failed to download key\n", getCurrentExceptionMsg()
quit 1
else:
echo "Validator keys can be downloaded only for testnets"
quit 1
of noCommand:
createPidFile(config.dataDir.string / "beacon_node.pid")
var node = waitFor BeaconNode.init(config)
# TODO slightly ugly to rely on node.stateCache state here..
if node.nickname != "":
dynamicLogScope(node = node.nickname): node.start(node.stateCache.data.data)
else:
node.start(node.stateCache.data.data)