add beacon clock and rework timing

* have regular slot tick, decide on work to do lazily
* use beacon clock instance that will eventually sync time
* skip work when falling behind
* add one more state cache for justified state (since that's used in
fork choice)
* remove old sync cruft
* single timer per slot, serializing block production and attestation
This commit is contained in:
Jacek Sieka 2019-03-22 09:49:37 -06:00 committed by zah
parent e4c10a31c9
commit e7b36c4389
7 changed files with 403 additions and 364 deletions

View File

@ -170,6 +170,7 @@ proc updateLatestVotes(
proc add*(pool: var AttestationPool, proc add*(pool: var AttestationPool,
state: BeaconState, state: BeaconState,
attestation: Attestation) = attestation: Attestation) =
# TODO should validate against the state of the block being attested to?
if not validate(state, attestation, {skipValidation}): if not validate(state, attestation, {skipValidation}):
return return

View File

@ -1,5 +1,6 @@
import import
std_shims/[os_shims, objects], net, sequtils, options, tables, osproc, random, std_shims/[os_shims, objects], net, sequtils, options, tables, osproc, random,
times,
chronos, chronicles, confutils, serialization/errors, chronos, chronicles, confutils, serialization/errors,
spec/[bitfield, datatypes, digest, crypto, beaconstate, helpers, validator], spec/[bitfield, datatypes, digest, crypto, beaconstate, helpers, validator],
conf, time, conf, time,
@ -35,9 +36,6 @@ func localValidatorsDir(conf: BeaconNodeConf): string =
func databaseDir(conf: BeaconNodeConf): string = func databaseDir(conf: BeaconNodeConf): string =
conf.dataDir / "db" conf.dataDir / "db"
func slotStart(node: BeaconNode, slot: Slot): Timestamp =
node.state.data.slotStart(slot)
template `//`(url, fragment: string): string = template `//`(url, fragment: string): string =
url & "/" & fragment url & "/" & fragment
@ -168,6 +166,8 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
result.network = await createEth2Node(conf) result.network = await createEth2Node(conf)
# TODO sync is called when a remote peer is connected - is that the right
# time to do so?
let sync = result.network.protocolState(BeaconSync) let sync = result.network.protocolState(BeaconSync)
sync.networkId = result.networkMetadata.networkId sync.networkId = result.networkMetadata.networkId
sync.node = result sync.node = result
@ -176,9 +176,11 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
let head = result.blockPool.get(result.db.getHeadBlock().get()) let head = result.blockPool.get(result.db.getHeadBlock().get())
result.state = result.blockPool.loadTailState() result.state = result.blockPool.loadTailState()
result.justifiedStateCache = result.state
let addressFile = string(conf.dataDir) / "beacon_node.address" let addressFile = string(conf.dataDir) / "beacon_node.address"
result.network.saveConnectionAddressFile(addressFile) result.network.saveConnectionAddressFile(addressFile)
result.beaconClock = BeaconClock.init(result.state.data)
proc connectToNetwork(node: BeaconNode) {.async.} = proc connectToNetwork(node: BeaconNode) {.async.} =
let localKeys = ensureNetworkKeys(node.config) let localKeys = ensureNetworkKeys(node.config)
@ -205,52 +207,6 @@ proc connectToNetwork(node: BeaconNode) {.async.} =
await node.network.connectToNetwork(bootstrapNodes) await node.network.connectToNetwork(bootstrapNodes)
proc sync*(node: BeaconNode): Future[bool] {.async.} =
if node.state.data.slotDistanceFromNow() > WEAK_SUBJECTVITY_PERIOD.int64:
# node.state.data = await obtainTrustedStateSnapshot(node.db)
return false
else:
# TODO waiting for genesis should probably be moved elsewhere.. it has
# little to do with syncing..
let t = now()
if t < node.state.data.genesis_time * 1000:
notice "Waiting for genesis",
fromNow = int(node.state.data.genesis_time * 1000 - t) div 1000
await sleepAsync int(node.state.data.genesis_time * 1000 - t)
let
targetSlot = node.state.data.getSlotFromTime()
# TODO: change this to a full sync / block download
info "Syncing state from remote peers",
finalized_epoch = humaneEpochNum(node.state.data.finalized_epoch),
target_slot_epoch = humaneEpochNum(targetSlot.slot_to_epoch)
# TODO: sync is called at the beginning of the program, but doing this kind
# of catching up here is wrong - if we fall behind on processing
# for whatever reason, we want to be safe against the damage that
# might cause regardless if we just started or have been running for
# long. A classic example where this might happen is when the
# computer goes to sleep - when waking up, we'll be in the middle of
# processing, but behind everyone else.
# TODO we now detect during epoch scheduling if we're very far behind -
# that would potentially be a good place to run the sync (?)
# while node.beaconState.finalized_epoch < targetSlot.slot_to_epoch:
# var (peer, changeLog) = await node.network.getValidatorChangeLog(
# node.beaconState.validator_registry_delta_chain_tip)
# if peer == nil:
# error "Failed to sync with any peer"
# return false
# if applyValidatorChangeLog(changeLog, node.beaconState):
# node.db.persistState(node.beaconState)
# node.db.persistBlock(changeLog.signedBlock)
# else:
# warn "Ignoring invalid validator change log", sentFrom = peer
return true
template findIt(s: openarray, predicate: untyped): int = template findIt(s: openarray, predicate: untyped): int =
var res = -1 var res = -1
for i, it {.inject.} in s: for i, it {.inject.} in s:
@ -284,7 +240,14 @@ proc getAttachedValidator(node: BeaconNode, idx: int): AttachedValidator =
let validatorKey = node.state.data.validator_registry[idx].pubkey let validatorKey = node.state.data.validator_registry[idx].pubkey
return node.attachedValidators.getValidator(validatorKey) return node.attachedValidators.getValidator(validatorKey)
proc updateHead(node: BeaconNode): BlockRef = proc updateHead(node: BeaconNode, slot: Slot): BlockRef =
# Use head state for attestation resolution below
# TODO do we need to resolve attestations using all available head states?
node.blockPool.updateState(node.state, node.blockPool.head, slot)
# Check pending attestations - maybe we found some blocks for them
node.attestationPool.resolve(node.state.data)
# TODO move all of this logic to BlockPool # TODO move all of this logic to BlockPool
info "Preparing for fork choice", info "Preparing for fork choice",
connectedPeers = node.network.connectedPeers connectedPeers = node.network.connectedPeers
@ -295,57 +258,21 @@ proc updateHead(node: BeaconNode): BlockRef =
# TODO slot number is wrong here, it should be the start of the epoch that # TODO slot number is wrong here, it should be the start of the epoch that
# got finalized: # got finalized:
# https://github.com/ethereum/eth2.0-specs/issues/768 # https://github.com/ethereum/eth2.0-specs/issues/768
node.blockPool.updateState(node.state, justifiedHead, justifiedHead.slot) node.blockPool.updateState(
node.justifiedStateCache, justifiedHead, justifiedHead.slot)
let newHead = lmdGhost(node.attestationPool, node.state.data, justifiedHead) let newHead = lmdGhost(
node.attestationPool, node.justifiedStateCache.data, justifiedHead)
node.blockPool.updateHead(node.state, newHead) node.blockPool.updateHead(node.state, newHead)
newHead newHead
proc makeAttestation(node: BeaconNode, proc makeAttestation(node: BeaconNode,
validator: AttachedValidator, validator: AttachedValidator,
slot: Slot, state: BeaconState,
head: BlockRef,
shard: uint64, shard: uint64,
committeeLen: int, committeeLen: int,
indexInCommittee: int) {.async.} = indexInCommittee: int) {.async.} =
doAssert node != nil
doAssert validator != nil
# It's time to make an attestation. To do so, we must determine what we
# consider to be the head block - this is done by the fork choice rule.
# TODO this lazy update of the head is good because it delays head resolution
# until the very latest moment - on the other hand, if it takes long, the
# attestation might be late!
let
head = node.updateHead()
if slot + MIN_ATTESTATION_INCLUSION_DELAY < head.slot:
# What happened here is that we're being really slow or there's something
# really fishy going on with the slot - let's not send out any attestations
# just in case...
# TODO is this the right cutoff?
notice "Skipping attestation, head is too recent",
headSlot = humaneSlotNum(head.slot),
slot = humaneSlotNum(slot)
return
let attestationHead = head.findAncestorBySlot(slot)
if head != attestationHead:
# In rare cases, such as when we're busy syncing or just slow, we'll be
# attesting to a past state - we must then recreate the world as it looked
# like back then
notice "Attesting to a state in the past, falling behind?",
headSlot = humaneSlotNum(head.slot),
attestationHeadSlot = humaneSlotNum(attestationHead.slot),
attestationSlot = humaneSlotNum(slot)
# We need to run attestations exactly for the slot that we're attesting to.
# In case blocks went missing, this means advancing past the latest block
# using empty slots as fillers.
node.blockPool.updateState(node.state, attestationHead, slot)
# Check pending attestations - maybe we found some blocks for them
node.attestationPool.resolve(node.state.data)
let let
attestationData = attestationData =
makeAttestationData(node.state.data, shard, node.state.blck.root) makeAttestationData(node.state.data, shard, node.state.blck.root)
@ -375,20 +302,14 @@ proc makeAttestation(node: BeaconNode,
proc proposeBlock(node: BeaconNode, proc proposeBlock(node: BeaconNode,
validator: AttachedValidator, validator: AttachedValidator,
slot: Slot) {.async.} = head: BlockRef,
doAssert node != nil slot: Slot): Future[BlockRef] {.async.} =
doAssert validator != nil
doAssert validator.idx < node.state.data.validator_registry.len
# To propose a block, we should know what the head is, because that's what
# we'll be building the next block upon..
let head = node.updateHead()
if head.slot > slot: if head.slot > slot:
notice "Skipping proposal, we've already selected a newer head", notice "Skipping proposal, we've already selected a newer head",
headSlot = humaneSlotNum(head.slot), headSlot = humaneSlotNum(head.slot),
headBlockRoot = shortLog(head.root), headBlockRoot = shortLog(head.root),
slot = humaneSlotNum(slot) slot = humaneSlotNum(slot)
return head
if head.slot == slot: if head.slot == slot:
# Weird, we should never see as head the same slot as we're proposing a # Weird, we should never see as head the same slot as we're proposing a
@ -401,28 +322,25 @@ proc proposeBlock(node: BeaconNode,
doAssert false, "head slot matches proposal slot (!)" doAssert false, "head slot matches proposal slot (!)"
# return # return
# There might be gaps between our proposal and what we think is the head -
# make sure the state we get takes that into account: we want it to point
# to the slot just before our proposal.
node.blockPool.updateState(node.state, head, slot - 1) node.blockPool.updateState(node.state, head, slot - 1)
# To create a block, we'll first apply a partial block to the state, skipping # To create a block, we'll first apply a partial block to the state, skipping
# some validations. # some validations.
var blockBody = BeaconBlockBody( let
blockBody = BeaconBlockBody(
randao_reveal: validator.genRandaoReveal(node.state.data, slot), randao_reveal: validator.genRandaoReveal(node.state.data, slot),
eth1_data: node.mainchainMonitor.getBeaconBlockRef(), eth1_data: node.mainchainMonitor.getBeaconBlockRef(),
attestations: node.attestationPool.getAttestationsForBlock(slot)) attestations: node.attestationPool.getAttestationsForBlock(slot))
var newBlock = BeaconBlock( var
newBlock = BeaconBlock(
slot: slot, slot: slot,
previous_block_root: node.state.blck.root, previous_block_root: head.root,
body: blockBody, body: blockBody,
signature: ValidatorSig(), # we need the rest of the block first! signature: ValidatorSig(), # we need the rest of the block first!
) )
let ok = let ok =
updateState( updateState(node.state.data, head.root, newBlock, {skipValidation})
node.state.data, node.state.blck.root, newBlock, {skipValidation})
doAssert ok # TODO: err, could this fail somehow? doAssert ok # TODO: err, could this fail somehow?
node.state.root = hash_tree_root(node.state.data) node.state.root = hash_tree_root(node.state.data)
@ -431,166 +349,22 @@ proc proposeBlock(node: BeaconNode,
newBlock.signature = newBlock.signature =
await validator.signBlockProposal(node.state.data.fork, newBlock) await validator.signBlockProposal(node.state.data.fork, newBlock)
let blockRoot = hash_tree_root(newBlock)
# TODO return new BlockRef from add?
let newBlockRef = node.blockPool.add(node.state, blockRoot, newBlock)
info "Block proposed",
blck = shortLog(newBlock),
blockRoot = shortLog(newBlockRef.root),
validator = shortValidatorKey(node, validator.idx),
idx = validator.idx
# TODO what are we waiting for here? broadcast should never block, and never # TODO what are we waiting for here? broadcast should never block, and never
# fail... # fail...
await node.network.broadcast(topicBeaconBlocks, newBlock) await node.network.broadcast(topicBeaconBlocks, newBlock)
info "Block proposed", return newBlockRef
blck = shortLog(newBlock),
blockRoot = shortLog(Eth2Digest(data: signed_root(newBlock))),
validator = shortValidatorKey(node, validator.idx),
idx = validator.idx
proc scheduleBlockProposal(node: BeaconNode,
slot: Slot,
validator: AttachedValidator) =
# TODO:
# This function exists only to hide a bug with Nim's closures.
# If you inline it in `scheduleEpochActions`, you'll see the
# internal `doAssert` starting to fail.
doAssert validator != nil
let
at = node.slotStart(slot)
now = fastEpochTime()
if now > at:
warn "Falling behind on block proposals", at, now, slot
info "Scheduling block proposal",
validator = shortValidatorKey(node, validator.idx),
idx = validator.idx,
slot = humaneSlotNum(slot),
fromNow = (at - now) div 1000
addTimer(at) do (x: pointer) {.gcsafe.}:
# TODO timers are generally not accurate / guaranteed to fire at the right
# time - need to guard here against early / late firings
doAssert validator != nil
asyncCheck proposeBlock(node, validator, slot)
proc scheduleAttestation(node: BeaconNode,
validator: AttachedValidator,
slot: Slot,
shard: uint64,
committeeLen: int,
indexInCommittee: int) =
# TODO:
# This function exists only to hide a bug with Nim's closures.
# If you inline it in `scheduleEpochActions`, you'll see the
# internal `doAssert` starting to fail.
doAssert validator != nil
let
at = node.slotStart(slot)
now = fastEpochTime()
if now > at:
warn "Falling behind on attestations", at, now, slot
debug "Scheduling attestation",
validator = shortValidatorKey(node, validator.idx),
fromNow = (at - now) div 1000,
slot = humaneSlotNum(slot),
shard
addTimer(at) do (p: pointer) {.gcsafe.}:
doAssert validator != nil
asyncCheck makeAttestation(node, validator, slot,
shard, committeeLen, indexInCommittee)
proc scheduleEpochActions(node: BeaconNode, epoch: Epoch) =
## This schedules the required block proposals and
## attestations from our attached validators.
doAssert node != nil
doAssert epoch >= GENESIS_EPOCH,
"Epoch: " & $epoch & ", humane epoch: " & $humaneEpochNum(epoch)
# In case some late blocks dropped in..
let head = node.updateHead()
# Sanity check - verify that the current head block is not too far behind
# TODO what if the head block is too far ahead? that would be.. weird.
if head.slot.slot_to_epoch() + 1 < epoch:
# We're hopelessly behind!
#
# There's a few ways this can happen:
#
# * we receive no attestations or blocks for an extended period of time
# * all the attestations we receive are bogus - maybe we're connected to
# the wrong network?
# * we just started and still haven't synced
#
# TODO make an effort to find other nodes and sync? A worst case scenario
# here is that the network stalls because nobody is sending out
# attestations because nobody is scheduling them, in a vicious
# circle
# TODO diagnose the various scenarios and do something smart...
let
expectedSlot = node.state.data.getSlotFromTime()
nextSlot = expectedSlot + 1
at = node.slotStart(nextSlot)
notice "Delaying epoch scheduling, head too old - scheduling new attempt",
headSlot = humaneSlotNum(head.slot),
expectedEpoch = humaneEpochNum(epoch),
expectedSlot = humaneSlotNum(expectedSlot),
fromNow = (at - fastEpochTime()) div 1000
addTimer(at) do (p: pointer):
node.scheduleEpochActions(nextSlot.slot_to_epoch())
return
updateState(node.blockPool, node.state, head, epoch.get_epoch_start_slot())
# TODO: is this necessary with the new shuffling?
# see get_beacon_proposer_index
var nextState = node.state.data
# TODO we don't need to do anything at slot 0 - what about slots we missed
# if we got delayed above?
let start = if epoch == GENESIS_EPOCH: 1.uint64 else: 0.uint64
for i in start ..< SLOTS_PER_EPOCH:
let slot = (epoch * SLOTS_PER_EPOCH + i).Slot
nextState.slot = slot # ugly trick, see get_beacon_proposer_index
block: # Schedule block proposals
let proposerIdx = get_beacon_proposer_index(nextState, slot)
let validator = node.getAttachedValidator(proposerIdx)
if validator != nil:
# TODO:
# Warm-up the proposer earlier to try to obtain previous
# missing blocks if necessary
scheduleBlockProposal(node, slot, validator)
block: # Schedule attestations
for crosslink_committee in get_crosslink_committees_at_slot(
nextState, slot):
for i, validatorIdx in crosslink_committee.committee:
let validator = node.getAttachedValidator(validatorIdx)
if validator != nil:
scheduleAttestation(
node, validator, slot, crosslink_committee.shard,
crosslink_committee.committee.len, i)
let
# TODO we need to readjust here for wall clock time, in case computer
# goes to sleep for example, so that we don't walk epochs one by one
# to catch up.. we should also check the current head most likely to
# see if we're suspiciously off, in terms of wall clock vs head time.
nextEpoch = epoch + 1
at = node.slotStart(nextEpoch.get_epoch_start_slot())
info "Scheduling next epoch update",
fromNow = (at - fastEpochTime()) div 1000,
epoch = humaneEpochNum(nextEpoch)
addTimer(at) do (p: pointer):
node.scheduleEpochActions(nextEpoch)
proc fetchBlocks(node: BeaconNode, roots: seq[Eth2Digest]) = proc fetchBlocks(node: BeaconNode, roots: seq[Eth2Digest]) =
if roots.len == 0: return if roots.len == 0: return
@ -609,24 +383,6 @@ proc onFetchBlocks(node: BeaconNode, roots: seq[Eth2Digest]) =
# TODO should obviously not spam, but rather send it back to the requester # TODO should obviously not spam, but rather send it back to the requester
asyncCheck node.network.broadcast(topicBeaconBlocks, blck.get()) asyncCheck node.network.broadcast(topicBeaconBlocks, blck.get())
proc scheduleSlotStartActions(node: BeaconNode, slot: Slot) =
# TODO in this setup, we retry fetching blocks at the beginning of every slot,
# hoping that we'll get some before it's time to attest or propose - is
# there a better time to do this?
let missingBlocks = node.blockPool.checkUnresolved()
node.fetchBlocks(missingBlocks)
let
nextSlot = slot + 1
at = node.slotStart(nextSlot)
info "Scheduling next slot start action block",
fromNow = (at - fastEpochTime()) div 1000,
slot = humaneSlotNum(nextSlot)
addTimer(at) do (p: pointer):
node.scheduleSlotStartActions(nextSlot)
proc onAttestation(node: BeaconNode, attestation: Attestation) = proc onAttestation(node: BeaconNode, attestation: Attestation) =
# We received an attestation from the network but don't know much about it # We received an attestation from the network but don't know much about it
# yet - in particular, we haven't verified that it belongs to particular chain # yet - in particular, we haven't verified that it belongs to particular chain
@ -645,10 +401,10 @@ proc onBeaconBlock(node: BeaconNode, blck: BeaconBlock) =
blck = shortLog(blck), blck = shortLog(blck),
blockRoot = shortLog(blockRoot) blockRoot = shortLog(blockRoot)
if not node.blockPool.add(node.state, blockRoot, blck): if node.blockPool.add(node.state, blockRoot, blck).isNil:
# TODO the fact that add returns a bool that causes the parent block to be # TODO this will cause us to fetch parent, even for invalid blocks.. fix
# pre-emptively fetched is quite ugly - fix.
node.fetchBlocks(@[blck.previous_block_root]) node.fetchBlocks(@[blck.previous_block_root])
return
# The block we received contains attestations, and we might not yet know about # The block we received contains attestations, and we might not yet know about
# all of them. Let's add them to the attestation pool - in case they block # all of them. Let's add them to the attestation pool - in case they block
@ -658,6 +414,215 @@ proc onBeaconBlock(node: BeaconNode, blck: BeaconBlock) =
# attestations! # attestations!
discard # node.onAttestation(attestation) discard # node.onAttestation(attestation)
proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
  ## Issue attestations for `slot` on behalf of every validator attached to
  ## this node that appears in one of the crosslink committees of that slot.
  ##
  ## `head` is the block currently considered the chain head. The scratch
  ## state `node.state` is moved to the ancestor of `head` at `slot` before
  ## the committees are looked up.
  if head.slot > slot + SLOTS_PER_EPOCH:
    # Our head is more than an epoch ahead of the slot we were asked to attest
    # to, so an attestation for it is unlikely to be included at all.
    # TODO the oldest attestations allowed are those that are older than the
    #      finalized epoch.. also, posting very old attestations looks risky
    #      from a slashing perspective. More work is needed here.
    notice "Skipping attestation, head is too recent",
      headSlot = humaneSlotNum(head.slot),
      slot = humaneSlotNum(slot)
    return

  let ancestor = head.findAncestorBySlot(slot)
  if ancestor != head:
    # Happens when we're busy syncing or simply slow: the attestation targets
    # a point in the past, so the world has to be rebuilt as it looked then.
    notice "Attesting to a state in the past, falling behind?",
      headSlot = humaneSlotNum(head.slot),
      attestationHeadSlot = humaneSlotNum(ancestor.slot),
      attestationSlot = humaneSlotNum(slot)

  # The state must sit exactly on the attested slot. If blocks are missing,
  # the state is advanced past the latest block using empty slots as filler.
  node.blockPool.updateState(node.state, ancestor, slot)

  for committee in get_crosslink_committees_at_slot(node.state.data, slot):
    for position, validatorIdx in committee.committee:
      let validator = node.getAttachedValidator(validatorIdx)
      if validator.isNil:
        # Not one of ours - nothing to sign
        continue

      # Started via asyncDiscard, i.e. not awaited here.
      # NOTE(review): `head`, not the ancestor resolved above, is handed to
      # makeAttestation - confirm this is intended.
      asyncDiscard makeAttestation(
        node, validator, node.state.data, head,
        committee.shard, committee.committee.len, position)
proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
    Future[BlockRef] {.async.} =
  ## Propose a block for `slot` iff the proposer selected by the shuffling in
  ## `head` is a validator attached to this node.
  ##
  ## Returns whatever `proposeBlock` returns when we are the proposer, and
  ## `head` unchanged otherwise.

  # TODO the state is advanced to the new slot here although we will later be
  #      proposing for that very slot - in effect the proposer is selected
  #      from an empty slot. Revisit once committee selection settles; it
  #      should be possible to advance behind.
  node.blockPool.updateState(node.state, head, slot)

  let
    proposerIdx = get_beacon_proposer_index(node.state.data, slot)
    validator = node.getAttachedValidator(proposerIdx)

  if validator.isNil:
    # Somebody else is expected to propose for this slot
    debug "Expecting proposal",
      headRoot = shortLog(head.root),
      slot = humaneSlotNum(slot),
      proposer = shortValidatorKey(node.state.data, proposerIdx)

    return head

  # TODO:
  # Warm-up the proposer earlier to try to obtain previous
  # missing blocks if necessary
  return await proposeBlock(node, validator, head, slot)
proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, async.} =
  ## Called at the beginning of a slot - usually every slot, but sometimes might
  ## skip a few in case we're running late.
  ##
  ## lastSlot: the last slot that we successfully processed, so we know where
  ##           to start work from
  ## scheduledSlot: the slot that we were aiming for, in terms of timing
  ##
  ## Before finishing, the proc re-arms a timer that calls it again at the
  ## start of the slot following the one the clock currently reports.
  let
    # The slot we should be at, according to the clock
    slot = node.beaconClock.now().toSlot()
    nextSlot = slot + 1

  debug "Slot start",
    lastSlot = humaneSlotNum(lastSlot),
    scheduledSlot = humaneSlotNum(scheduledSlot),
    slot = humaneSlotNum(slot)

  # TODO in this setup, we retry fetching blocks at the beginning of every slot,
  #      hoping that we'll get some before it's time to attest or propose - is
  #      there a better time to do this?
  let missingBlocks = node.blockPool.checkUnresolved()
  node.fetchBlocks(missingBlocks)

  if slot < lastSlot:
    # This can happen if the system clock changes time for example, and it's
    # pretty bad
    # TODO shut down? time either was or is bad, and PoS relies on accuracy..
    warn "Beacon clock time moved back, rescheduling slot actions",
      slot = humaneSlotNum(slot),
      scheduledSlot = humaneSlotNum(scheduledSlot)

    addTimer(saturate(node.beaconClock.fromNow(nextSlot))) do (p: pointer):
      asyncCheck node.onSlotStart(slot, nextSlot)

    return

  if slot > lastSlot + SLOTS_PER_EPOCH:
    # We've fallen behind more than an epoch - there's nothing clever we can
    # do here really, except skip all the work and try again later.
    # TODO how long should the period be? Using an epoch because that's roughly
    #      how long attestations remain interesting
    # TODO should we shut down instead? clearly we're unable to keep up
    warn "Unable to keep up, skipping ahead without doing work",
      lastSlot = humaneSlotNum(lastSlot),
      slot = humaneSlotNum(slot),
      scheduledSlot = humaneSlotNum(scheduledSlot)

    addTimer(saturate(node.beaconClock.fromNow(nextSlot))) do (p: pointer):
      # We pass the current slot here to indicate that work should be skipped!
      asyncCheck node.onSlotStart(slot, nextSlot)

    return

  # Whatever we do during the slot, we need to know the head, because this will
  # give us a state to work with and thus a shuffling.
  # TODO typically, what constitutes correct actions stays constant between
  #      slot updates and is stable across some epoch transitions as well - see
  #      how we can avoid recalculating everything here
  var head = node.updateHead(slot)

  # TODO if the head is very old, that is indicative of something being very
  #      wrong - us being out of sync or disconnected from the network - need
  #      to consider what to do in that case:
  #      * nothing - the other parts of the application will reconnect and
  #                  start listening to broadcasts, learn a new head etc..
  #                  risky, because the network might stall if everyone does
  #                  this, because no blocks will be produced
  #      * shut down - this allows the user to notice and take action, but is
  #                    kind of harsh
  #      * keep going - we create blocks and attestations as usual and send them
  #                     out - if network conditions improve, fork choice should
  #                     eventually select the correct head and the rest will
  #                     disappear naturally - risky because user is not aware,
  #                     and might lose stake on canonical chain but "just works"
  #                     when reconnected..
  #      Right now, we keep going

  var curSlot = lastSlot + 1
  while curSlot < slot:
    # Timers may be delayed because we're busy processing, and we might have
    # more work to do. We'll try to do so in an expedited way.
    # TODO maybe even collect all work synchronously to avoid unnecessary
    #      state rewinds while waiting for async operations like validator
    #      signature..
    notice "Catching up",
      curSlot = humaneSlotNum(curSlot),
      lastSlot = humaneSlotNum(lastSlot),
      slot = humaneSlotNum(slot)

    # For every slot we're catching up, we'll propose then send
    # attestations - head should normally be advancing along the same branch
    # in this case
    # TODO what if we receive blocks / attestations while doing this work?
    head = await handleProposal(node, head, curSlot)

    # For each slot we missed, we need to send out attestations - if we were
    # proposing during this time, we'll use the newly proposed head, else just
    # keep reusing the same - the attestation that goes out will actually
    # rewind the state to what it looked like at the time of that slot
    # TODO smells like there's an optimization opportunity here
    handleAttestations(node, head, curSlot)

    curSlot += 1

  head = await handleProposal(node, head, slot)

  # We've been doing lots of work up until now which took time. Normally, we
  # send out attestations at the slot mid-point, so we go back to the clock
  # to see how much time we need to wait.
  # TODO the beacon clock might jump here also. It's probably easier to complete
  #      the work for the whole slot using a monotonic clock instead, then deal
  #      with any clock discrepancies once only, at the start of slot timer
  #      processing..
  let
    attestationStart = node.beaconClock.fromNow(slot)
    halfSlot = seconds(int64(SECONDS_PER_SLOT div 2))

  if attestationStart.inFuture or attestationStart.offset <= halfSlot:
    # Time left until the slot mid-point. If the slot start is still ahead of
    # us, we wait for the start plus half a slot. If it has already passed, we
    # wait only for what remains of the first half - adding the two here would
    # overshoot the mid-point by twice the elapsed time.
    # NOTE(review): assumes `offset` is a non-negative magnitude whose
    # direction is given by `inFuture` - confirm against BeaconClock.fromNow.
    let midSlotWait =
      if attestationStart.inFuture: attestationStart.offset + halfSlot
      else: halfSlot - attestationStart.offset

    debug "Waiting to send attestations",
      slot = humaneSlotNum(slot),
      fromNow = shortLog(midSlotWait)

    await sleepAsync(midSlotWait)

    # Time passed - we might need to select a new head in that case
    head = node.updateHead(slot)

  handleAttestations(node, head, slot)

  # TODO ... and beacon clock might jump here also. sigh.
  let
    nextSlotStart = saturate(node.beaconClock.fromNow(nextSlot))

  # `lastSlot` is logged as `slot` because that is the value handed to the
  # next invocation as its lastSlot.
  info "Scheduling slot actions",
    lastSlot = humaneSlotNum(slot),
    slot = humaneSlotNum(slot),
    nextSlot = humaneSlotNum(nextSlot),
    fromNow = shortLog(nextSlotStart)

  addTimer(nextSlotStart) do (p: pointer):
    asyncCheck node.onSlotStart(slot, nextSlot)
proc run*(node: BeaconNode) = proc run*(node: BeaconNode) =
waitFor node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock): waitFor node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock):
node.onBeaconBlock(blck) node.onBeaconBlock(blck)
@ -668,10 +633,19 @@ proc run*(node: BeaconNode) =
waitFor node.network.subscribe(topicfetchBlocks) do (roots: seq[Eth2Digest]): waitFor node.network.subscribe(topicfetchBlocks) do (roots: seq[Eth2Digest]):
node.onFetchBlocks(roots) node.onFetchBlocks(roots)
let nowSlot = node.state.data.getSlotFromTime() let
slot = node.beaconClock.now().toSlot()
startSlot =
if slot >= GENESIS_SLOT: slot + 1
else: GENESIS_SLOT + 1
fromNow = saturate(node.beaconClock.fromNow(startSlot))
node.scheduleEpochActions(nowSlot.slot_to_epoch()) info "Scheduling first slot action",
node.scheduleSlotStartActions(nowSlot) nextSlot = humaneSlotNum(startSlot),
fromNow = shortLog(fromNow)
addTimer(fromNow) do (p: pointer):
asyncCheck node.onSlotStart(startSlot - 1, startSlot)
runForever() runForever()
@ -689,11 +663,10 @@ proc start(node: BeaconNode) =
# prepared to handle it. # prepared to handle it.
waitFor node.connectToNetwork() waitFor node.connectToNetwork()
if not waitFor node.sync():
quit 1
info "Starting beacon node", info "Starting beacon node",
slotsSinceFinalization = node.state.data.slotDistanceFromNow(), slotsSinceFinalization =
int64(node.blockPool.finalizedHead.slot) -
int64(node.beaconClock.now()),
stateSlot = humaneSlotNum(node.state.data.slot), stateSlot = humaneSlotNum(node.state.data.slot),
SHARD_COUNT, SHARD_COUNT,
SLOTS_PER_EPOCH, SLOTS_PER_EPOCH,
@ -719,7 +692,7 @@ when isMainModule:
let initialState = get_genesis_beacon_state( let initialState = get_genesis_beacon_state(
deposits, deposits,
uint64(int(fastEpochTime() div 1000) + config.genesisOffset), uint64(times.toUnix(times.getTime()) + config.genesisOffset),
Eth1Data(), {}) Eth1Data(), {})
Json.saveFile(config.outputGenesis.string, initialState, pretty = true) Json.saveFile(config.outputGenesis.string, initialState, pretty = true)
@ -776,7 +749,6 @@ when isMainModule:
quit 1 quit 1
of noCommand: of noCommand:
waitFor synchronizeClock()
createPidFile(config.dataDir.string / "beacon_node.pid") createPidFile(config.dataDir.string / "beacon_node.pid")
var node = waitFor BeaconNode.init(config) var node = waitFor BeaconNode.init(config)
@ -785,4 +757,3 @@ when isMainModule:
dynamicLogScope(node = node.nickname): node.start() dynamicLogScope(node = node.nickname): node.start()
else: else:
node.start() node.start()

View File

@ -1,7 +1,8 @@
import # Beacon Node import # Beacon Node
eth/[p2p, keys], eth/[p2p, keys],
spec/[bitfield, digest], spec/[bitfield, digest],
beacon_chain_db, conf, mainchain_monitor, eth2_network beacon_chain_db, conf, mainchain_monitor, eth2_network,
./time
import # Attestation Pool import # Attestation Pool
spec/[bitfield, datatypes, crypto, digest], spec/[bitfield, datatypes, crypto, digest],
@ -33,14 +34,21 @@ type
keys*: KeyPair keys*: KeyPair
attachedValidators*: ValidatorPool attachedValidators*: ValidatorPool
blockPool*: BlockPool blockPool*: BlockPool
attestationPool*: AttestationPool
mainchainMonitor*: MainchainMonitor
beaconClock*: BeaconClock
state*: StateData ##\ state*: StateData ##\
## State cache object that's used as a scratch pad ## State cache object that's used as a scratch pad
## TODO this is pretty dangerous - for example if someone sets it ## TODO this is pretty dangerous - for example if someone sets it
## to a particular state then does `await`, it might change - prone to ## to a particular state then does `await`, it might change - prone to
## async races ## async races
attestationPool*: AttestationPool
mainchainMonitor*: MainchainMonitor justifiedStateCache*: StateData ##\
potentialHeads*: seq[Eth2Digest] ## A second state cache that's used during head selection, to avoid
## state replaying.
# TODO Something smarter, so we don't need to keep two full copies, wasteful
# ############################################# # #############################################
# #

View File

@ -136,13 +136,11 @@ proc updateState*(
proc add*( proc add*(
pool: var BlockPool, state: var StateData, blockRoot: Eth2Digest, pool: var BlockPool, state: var StateData, blockRoot: Eth2Digest,
blck: BeaconBlock): bool {.gcsafe.} = blck: BeaconBlock): BlockRef {.gcsafe.} =
## return false indicates that the block parent was missing and should be ## return the block, if resolved...
## fetched
## the state parameter may be updated to include the given block, if ## the state parameter may be updated to include the given block, if
## everything checks out ## everything checks out
# TODO reevaluate passing the state in like this # TODO reevaluate passing the state in like this
# TODO reevaluate this API - it's pretty ugly with the bool return
doAssert blockRoot == hash_tree_root(blck) doAssert blockRoot == hash_tree_root(blck)
# Already seen this block?? # Already seen this block??
@ -151,7 +149,7 @@ proc add*(
blck = shortLog(blck), blck = shortLog(blck),
blockRoot = shortLog(blockRoot) blockRoot = shortLog(blockRoot)
return true return pool.blocks[blockRoot]
# If the block we get is older than what we finalized already, we drop it. # If the block we get is older than what we finalized already, we drop it.
# One way this can happen is that we start resolving a block and finalization # One way this can happen is that we start resolving a block and finalization
@ -163,7 +161,7 @@ proc add*(
tailSlot = humaneSlotNum(pool.tail.slot), tailSlot = humaneSlotNum(pool.tail.slot),
blockRoot = shortLog(blockRoot) blockRoot = shortLog(blockRoot)
return true return
let parent = pool.blocks.getOrDefault(blck.previous_block_root) let parent = pool.blocks.getOrDefault(blck.previous_block_root)
@ -175,6 +173,8 @@ proc add*(
# The block is resolved, now it's time to validate it to ensure that the # The block is resolved, now it's time to validate it to ensure that the
# blocks we add to the database are clean for the given state # blocks we add to the database are clean for the given state
# TODO if the block is from the future, we should not be resolving it (yet),
# but maybe we should use it as a hint that our clock is wrong?
updateState(pool, state, parent, blck.slot - 1) updateState(pool, state, parent, blck.slot - 1)
if not updateState(state.data, parent.root, blck, {}): if not updateState(state.data, parent.root, blck, {}):
@ -223,7 +223,7 @@ proc add*(
for k, v in retries: for k, v in retries:
discard pool.add(state, k, v) discard pool.add(state, k, v)
return true return blockRef
# TODO possibly, it makes sense to check the database - that would allow sync # TODO possibly, it makes sense to check the database - that would allow sync
# to simply fill up the database with random blocks the other clients # to simply fill up the database with random blocks the other clients
@ -231,7 +231,7 @@ proc add*(
# junk that's not part of the block graph # junk that's not part of the block graph
if blck.previous_block_root in pool.unresolved: if blck.previous_block_root in pool.unresolved:
return true return
# This is an unresolved block - put it on the unresolved list for now... # This is an unresolved block - put it on the unresolved list for now...
# TODO if we receive spam blocks, one heuristic to implement might be to wait # TODO if we receive spam blocks, one heuristic to implement might be to wait
@ -250,8 +250,6 @@ proc add*(
pool.unresolved[blck.previous_block_root] = UnresolvedBlock() pool.unresolved[blck.previous_block_root] = UnresolvedBlock()
pool.pending[blockRoot] = blck pool.pending[blockRoot] = blck
false
proc get*(pool: BlockPool, blck: BlockRef): BlockData = proc get*(pool: BlockPool, blck: BlockRef): BlockData =
## Retrieve the associated block body of a block reference ## Retrieve the associated block body of a block reference
doAssert (not blck.isNil), "Trying to get nil BlockRef" doAssert (not blck.isNil), "Trying to get nil BlockRef"
@ -318,10 +316,13 @@ proc maybePutState(pool: BlockPool, state: BeaconState) =
# potentially save multiple states per slot if reorgs happen, meaning # potentially save multiple states per slot if reorgs happen, meaning
# we could easily see a state explosion # we could easily see a state explosion
if state.slot mod SLOTS_PER_EPOCH == 0: if state.slot mod SLOTS_PER_EPOCH == 0:
let root = hash_tree_root(state)
if not pool.db.containsState(root):
info "Storing state", info "Storing state",
stateSlot = humaneSlotNum(state.slot), stateSlot = humaneSlotNum(state.slot),
stateRoot = shortLog(hash_tree_root(state)) # TODO cache? stateRoot = shortLog(root)
pool.db.putState(state) pool.db.putState(root, state)
proc updateState*( proc updateState*(
pool: BlockPool, state: var StateData, blck: BlockRef, slot: Slot) = pool: BlockPool, state: var StateData, blck: BlockRef, slot: Slot) =
@ -438,11 +439,23 @@ proc updateHead*(pool: BlockPool, state: var StateData, blck: BlockRef) =
return return
let
lastHead = pool.head
pool.head = blck pool.head = blck
# Start off by making sure we have the right state # Start off by making sure we have the right state
updateState(pool, state, blck, blck.slot) updateState(pool, state, blck, blck.slot)
if lastHead != blck.parent:
notice "Updated head with new parent",
lastHeadRoot = shortLog(lastHead.root),
parentRoot = shortLog(blck.parent.root),
stateRoot = shortLog(state.root),
headBlockRoot = shortLog(state.blck.root),
stateSlot = humaneSlotNum(state.data.slot),
justifiedEpoch = humaneEpochNum(state.data.current_justified_epoch),
finalizedEpoch = humaneEpochNum(state.data.finalized_epoch)
else:
info "Updated head", info "Updated head",
stateRoot = shortLog(state.root), stateRoot = shortLog(state.root),
headBlockRoot = shortLog(state.blck.root), headBlockRoot = shortLog(state.blck.root),

View File

@ -1,51 +1,94 @@
import import
random,
chronos, chronos,
spec/[datatypes, helpers] spec/[datatypes]
from times import Time, getTime, fromUnix, `<`, `-`
type type
Timestamp* = uint64 # Unix epoch timestamp in millisecond resolution BeaconClock* = object
## The beacon clock represents time as it passes on a beacon chain. Beacon
## time is locked to unix time, starting at a particular offset set during
## beacon chain instantiation.
##
## Time on the beacon chain determines what actions should be taken and
## which blocks are valid - in particular, blocks are not valid if they
## come from the future as seen from the local clock.
##
## https://github.com/ethereum/eth2.0-specs/blob/v0.5.0/specs/core/0_beacon-chain.md#beacon-chain-processing
##
# TODO replace time in chronos with a proper unit type, then this code can
# follow:
# https://github.com/status-im/nim-chronos/issues/15
# TODO consider NTP and network-adjusted timestamps as outlined here:
# https://ethresear.ch/t/network-adjusted-timestamps/4187
genesis: Time
var BeaconTime* = distinct int64 ## Seconds from beacon genesis time
detectedClockDrift: int64
template now*: auto = fastEpochTime() proc init*(T: type BeaconClock, state: BeaconState): T =
## Initialize time from a beacon state. The genesis time of a beacon state is
## constant throughout its lifetime, so the state from any slot will do,
## including the genesis state.
# TODO underflow when genesis has not yet happened! let
proc timeSinceGenesis*(s: BeaconState): Timestamp = unixGenesis = fromUnix(state.genesis_time.int64)
Timestamp(int64(fastEpochTime() - s.genesis_time * 1000) - # GENESIS_SLOT offsets slot time, but to simplify calculations, we apply that
detectedClockDrift) # offset to genesis instead of applying it at every time conversion
unixGenesisOffset = times.seconds(int(GENESIS_SLOT * SECONDS_PER_SLOT))
proc getSlotFromTime*(s: BeaconState, t = now()): Slot = T(genesis: unixGenesis - unixGenesisOffset)
GENESIS_SLOT + uint64((int64(t - s.genesis_time * 1000) - detectedClockDrift) div
int64(SECONDS_PER_SLOT * 1000))
func slotStart*(s: BeaconState, slot: Slot): Timestamp = func toSlot*(t: BeaconTime): Slot =
(s.genesis_time + ((slot - GENESIS_SLOT) * SECONDS_PER_SLOT)) * 1000 Slot(uint64(t) div SECONDS_PER_SLOT)
func slotMiddle*(s: BeaconState, slot: Slot): Timestamp = func toBeaconTime*(c: BeaconClock, t: Time): BeaconTime =
s.slotStart(slot) + SECONDS_PER_SLOT * 500 doAssert t > c.genesis,
"Cannot represent time before genesis, fix BeaconClock"
func slotEnd*(s: BeaconState, slot: Slot): Timestamp = BeaconTime(times.seconds(t - c.genesis).uint64)
# TODO this is actually past the end, by nim inclusive semantics (sigh)
s.slotStart(slot + 1)
proc randomTimeInSlot*(s: BeaconState, func toSlot*(c: BeaconClock, t: Time): Slot =
slot: Slot, c.toBeaconTime(t).toSlot()
interval: HSlice[float, float]): Timestamp =
## Returns a random moment within the slot.
## The interval must be a sub-interval of [0..1].
## Zero marks the beginning of the slot and One marks the end.
s.slotStart(slot) + Timestamp(rand(interval) * float(SECONDS_PER_SLOT * 1000))
proc slotDistanceFromNow*(s: BeaconState): int64 = func toBeaconTime*(s: Slot, offset = chronos.seconds(0)): BeaconTime =
## Returns how many slots have passed since a particular BeaconState was finalized BeaconTime(int64(uint64(s) * SECONDS_PER_SLOT) + seconds(offset))
int64(s.getSlotFromTime() - s.finalized_epoch.get_epoch_start_slot)
proc synchronizeClock*() {.async.} = proc now*(c: BeaconClock): BeaconTime =
## This should determine the offset of the local clock against a global ## Current time, in slots - this may end up being less than GENESIS_SLOT(!)
## trusted time (e.g. it can be obtained from multiple time servers). toBeaconTime(c, getTime())
# TODO: implement this properly proc fromNow*(c: BeaconClock, t: BeaconTime): tuple[inFuture: bool, offset: Duration] =
detectedClockDrift = 0 let now = c.now()
if int64(t) > int64(now):
(true, seconds(int64(t) - int64(now)))
else:
(false, seconds(int64(now) - int64(t)))
proc fromNow*(c: BeaconClock, slot: Slot): tuple[inFuture: bool, offset: Duration] =
  ## Slot-based convenience overload: converts `slot` to its beacon time
  ## (using the default zero offset of `toBeaconTime`) and reports how far
  ## that moment lies from the clock's current time.
  ##
  ## `inFuture` tells on which side of "now" the slot time falls, while
  ## `offset` carries the absolute distance between the two.
  let slotTime = toBeaconTime(slot)
  fromNow(c, slotTime)
proc saturate*(d: tuple[inFuture: bool, offset: Duration]): Duration =
  ## Collapses a `fromNow` result into a plain duration: moments that are not
  ## in the future yield a zero duration, future moments yield their distance
  ## from now unchanged.
  if not d.inFuture:
    return seconds(0)
  d.offset
proc addTimer*(fromNow: Duration, cb: CallbackFunc, udata: pointer = nil) =
  ## Relative-time wrapper around the `Moment`-based `addTimer`: the deadline
  ## is computed as the current moment plus `fromNow`, then `cb` and `udata`
  ## are forwarded untouched.
  let deadline = Moment.now() + fromNow
  addTimer(deadline, cb, udata)
proc shortLog*(d: Duration): string =
  ## Compact, human-readable rendering of a duration for log output.
  ##
  ## The value is truncated to whole milliseconds and printed with at most
  ## two units, chosen by magnitude:
  ## * below one second: `"<n>ms"`
  ## * below one minute: `"<n>s"`
  ## * below one hour: `"<n>m"`, followed by `"<n>s"` when seconds are non-zero
  ## * one hour and above: `"<n>h"`, followed by `"<n>m"` when minutes are
  ##   non-zero
  let dd = int64(d.milliseconds())
  if dd < 1000:
    $dd & "ms"
  elif dd < 60 * 1000:
    $(dd div 1000) & "s"
  elif dd < 60 * 60 * 1000:
    let secs = dd div 1000
    var tmp = $(secs div 60) & "m"
    if (let frac = secs mod 60; frac > 0):
      tmp &= $frac & "s"
    tmp
  else:
    # `div` and `*` share the same precedence and associate to the left, so
    # the divisor needs parentheses - without them the expression evaluates
    # as `(dd div 60) * 1000` and the hour count is off by a factor of 10^6.
    let mins = dd div (60 * 1000)
    var tmp = $(mins div 60) & "h"
    if (let frac = mins mod 60; frac > 0):
      tmp &= $frac & "m"
    tmp

View File

@ -3,7 +3,8 @@ import
spec/[datatypes, crypto, digest, beaconstate], beacon_chain_db, conf spec/[datatypes, crypto, digest, beaconstate], beacon_chain_db, conf
const const
WEAK_SUBJECTVITY_PERIOD* = uint64(4 * 30 * 24 * 60 * 60) div SECONDS_PER_SLOT WEAK_SUBJECTVITY_PERIOD* =
Slot(uint64(4 * 30 * 24 * 60 * 60) div SECONDS_PER_SLOT)
# TODO: This needs revisiting. # TODO: This needs revisiting.
# Why was the validator WITHDRAWAL_PERIOD altered in the spec? # Why was the validator WITHDRAWAL_PERIOD altered in the spec?
@ -27,5 +28,7 @@ proc obtainTrustedStateSnapshot*(db: BeaconChainDB): Future[BeaconState] {.async
# #
# 5. Check that the state snapshot hash is correct and save it in the DB. # 5. Check that the state snapshot hash is correct and save it in the DB.
# TODO or just pass a state root via command line?
doAssert(false, "Not implemented") doAssert(false, "Not implemented")

View File

@ -46,7 +46,7 @@ cli do (validators: int = 125000,
deposit = Deposit( deposit = Deposit(
deposit_data: DepositData( deposit_data: DepositData(
amount: MAX_DEPOSIT_AMOUNT, amount: MAX_DEPOSIT_AMOUNT,
timestamp: now(), timestamp: 0, # TODO https://github.com/ethereum/eth2.0-specs/pull/834
deposit_input: DepositInput( deposit_input: DepositInput(
pubkey: pubKey, pubkey: pubKey,
proof_of_possession: proofOfPossession, proof_of_possession: proofOfPossession,