Move some attestation/block logic out of beacon node

* state data cache in block pool
* keep head state around
* more attestation logic in attestation pool
* first fork choice tests (!)
* fix fork choice (it's still likely broken / out of date)
This commit is contained in:
Jacek Sieka 2019-12-19 14:02:28 +01:00 committed by tersec
parent ea4afd7454
commit b994da78a7
9 changed files with 486 additions and 369 deletions

View File

@ -1,9 +1,8 @@
import
deques, sequtils, tables,
chronicles, stew/bitseqs, json_serialization/std/sets,
chronicles, stew/[bitseqs, byteutils], json_serialization/std/sets,
./spec/[beaconstate, datatypes, crypto, digest, helpers, validator],
./extras, ./ssz, ./block_pool,
beacon_node_types
./extras, ./ssz, ./block_pool, ./beacon_node_types
logScope: topics = "attpool"
@ -146,19 +145,33 @@ func get_attesting_indices_seq(state: BeaconState,
toSeq(items(get_attesting_indices(
state, attestation_data, bits, cache)))
proc add*(pool: var AttestationPool,
state: BeaconState,
blck: BlockRef,
attestation: Attestation) =
# TODO there are constraints on the state and block being passed in here
# but what these are is unclear.. needs analyzing from a high-level
# perspective / spec intent
# TODO should update the state correctly in here instead of forcing the caller
# to do it...
logScope: pcs = "atp_add_attestation"
func addUnresolved(pool: var AttestationPool, attestation: Attestation) =
pool.unresolved[attestation.data.beacon_block_root] =
UnresolvedAttestation(
attestation: attestation,
)
proc addResolved(pool: var AttestationPool, blck: BlockRef, attestation: Attestation) =
doAssert blck.root == attestation.data.beacon_block_root
# TODO Which state should we use to validate the attestation? It seems
# reasonable to involve the head being voted for as well as the intended
# slot of the attestation - double-check this with spec
# A basic check is that the attestation is at least as new as the block being
# voted for..
if blck.slot > attestation.data.slot:
notice "Invalid attestation (too new!)",
attestationData = shortLog(attestation.data),
blockSlot = shortLog(blck.slot)
return
updateStateData(
pool.blockPool, pool.blockPool.tmpState,
BlockSlot(blck: blck, slot: attestation.data.slot))
template state(): BeaconState = pool.blockPool.tmpState.data.data
if not validate(state, attestation):
notice "Invalid attestation",
attestationData = shortLog(attestation.data),
@ -245,11 +258,16 @@ proc add*(pool: var AttestationPool,
validations = 1,
cat = "filtering"
func addUnresolved*(pool: var AttestationPool, attestation: Attestation) =
pool.unresolved[attestation.data.beacon_block_root] =
UnresolvedAttestation(
attestation: attestation,
)
proc add*(pool: var AttestationPool, attestation: Attestation) =
logScope: pcs = "atp_add_attestation"
let blck = pool.blockPool.getOrResolve(attestation.data.beacon_block_root)
if blck.isNil:
pool.addUnresolved(attestation)
return
pool.addResolved(blck, attestation)
proc getAttestationsForBlock*(
pool: AttestationPool, state: BeaconState,
@ -333,7 +351,9 @@ proc getAttestationsForBlock*(
if result.len >= MAX_ATTESTATIONS:
return
proc resolve*(pool: var AttestationPool, cache: var StateData) =
proc resolve*(pool: var AttestationPool) =
logScope: pcs = "atp_resolve"
var
done: seq[Eth2Digest]
resolved: seq[tuple[blck: BlockRef, attestation: Attestation]]
@ -351,11 +371,71 @@ proc resolve*(pool: var AttestationPool, cache: var StateData) =
pool.unresolved.del(k)
for a in resolved:
pool.blockPool.updateStateData(
cache, BlockSlot(blck: a.blck, slot: a.blck.slot))
pool.add(cache.data.data, a.blck, a.attestation)
pool.addResolved(a.blck, a.attestation)
func latestAttestation*(
pool: AttestationPool, pubKey: ValidatorPubKey): BlockRef =
pool.latestAttestations.getOrDefault(pubKey)
# https://github.com/ethereum/eth2.0-specs/blob/v0.8.4/specs/core/0_fork-choice.md
# The structure of this code differs from the spec since we use a different
# strategy for storing states and justification points - it should nonetheless
# be close in terms of functionality.
func lmdGhost*(
pool: AttestationPool, start_state: BeaconState,
start_block: BlockRef): BlockRef =
# TODO: a Fenwick Tree datastructure to keep track of cumulated votes
# in O(log N) complexity
# https://en.wikipedia.org/wiki/Fenwick_tree
# Nim implementation for cumulative frequencies at
# https://github.com/numforge/laser/blob/990e59fffe50779cdef33aa0b8f22da19e1eb328/benchmarks/random_sampling/fenwicktree.nim
let
active_validator_indices =
get_active_validator_indices(
start_state, compute_epoch_at_slot(start_state.slot))
var latest_messages: seq[tuple[validator: ValidatorIndex, blck: BlockRef]]
for i in active_validator_indices:
let pubKey = start_state.validators[i].pubkey
if (let vote = pool.latestAttestation(pubKey); not vote.isNil):
latest_messages.add((i, vote))
template get_latest_attesting_balance(blck: BlockRef): uint64 =
var res: uint64
for validator_index, target in latest_messages.items():
if get_ancestor(target, blck.slot) == blck:
res += start_state.validators[validator_index].effective_balance
res
var head = start_block
while true:
if head.children.len() == 0:
return head
if head.children.len() == 1:
head = head.children[0]
else:
var
winner = head.children[0]
winCount = get_latest_attesting_balance(winner)
for i in 1..<head.children.len:
let
candidate = head.children[i]
candCount = get_latest_attesting_balance(candidate)
if (candCount > winCount) or
((candCount == winCount and candidate.root.data < winner.root.data)):
winner = candidate
winCount = candCount
head = winner
proc selectHead*(pool: AttestationPool): BlockRef =
let
justifiedHead = pool.blockPool.latestJustifiedBlock()
let newHead =
lmdGhost(pool, pool.blockPool.justifiedState.data.data, justifiedHead.blck)
newHead

View File

@ -10,7 +10,7 @@ import
# Local modules
spec/[datatypes, digest, crypto, beaconstate, helpers, validator, network],
conf, time, state_transition, fork_choice, beacon_chain_db,
conf, time, state_transition, beacon_chain_db,
validator_pool, extras, attestation_pool, block_pool, eth2_network,
beacon_node_types, mainchain_monitor, version, ssz, ssz/dynamic_navigator,
sync_protocol, request_manager, validator_keygen, interop, statusbar
@ -57,17 +57,6 @@ type
mainchainMonitor: MainchainMonitor
beaconClock: BeaconClock
stateCache: StateData ##\
## State cache object that's used as a scratch pad
## TODO this is pretty dangerous - for example if someone sets it
## to a particular state then does `await`, it might change - prone to
## async races
justifiedStateCache: StateData ##\
## A second state cache that's used during head selection, to avoid
## state replaying.
# TODO Something smarter, so we don't need to keep two full copies, wasteful
proc onBeaconBlock*(node: BeaconNode, blck: SignedBeaconBlock) {.gcsafe.}
proc updateHead(node: BeaconNode): BlockRef
@ -238,12 +227,9 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
onBeaconBlock(result, signedBlock))
result.stateCache = result.blockPool.loadTailState()
result.justifiedStateCache = result.stateCache
let addressFile = string(conf.dataDir) / "beacon_node.address"
result.network.saveConnectionAddressFile(addressFile)
result.beaconClock = BeaconClock.init(result.stateCache.data.data)
result.beaconClock = BeaconClock.init(result.blockPool.headState.data.data)
when useInsecureFeatures:
if conf.metricsServer:
@ -251,22 +237,6 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
info "Starting metrics HTTP server", address = metricsAddress, port = conf.metricsServerPort
metrics.startHttpServer(metricsAddress, Port(conf.metricsServerPort))
template withState(
pool: BlockPool, cache: var StateData, blockSlot: BlockSlot, body: untyped): untyped =
## Helper template that updates state to a particular BlockSlot - usage of
## cache is unsafe outside of block.
## TODO async transformations will lead to a race where cache gets updated
## while waiting for future to complete - catch this here somehow?
updateStateData(pool, cache, blockSlot)
template hashedState(): HashedBeaconState {.inject, used.} = cache.data
template state(): BeaconState {.inject, used.} = cache.data.data
template blck(): BlockRef {.inject, used.} = cache.blck
template root(): Eth2Digest {.inject, used.} = cache.data.root
body
proc connectToNetwork(node: BeaconNode) {.async.} =
if node.bootstrapNodes.len > 0:
info "Connecting to bootstrap nodes", bootstrapNodes = node.bootstrapNodes
@ -335,21 +305,15 @@ proc isSynced(node: BeaconNode, head: BlockRef): bool =
true
proc updateHead(node: BeaconNode): BlockRef =
# Use head state for attestation resolution below
# Check pending attestations - maybe we found some blocks for them
node.attestationPool.resolve(node.stateCache)
node.attestationPool.resolve()
# TODO move all of this logic to BlockPool
# Grab the new head according to our latest attestation data
let newHead = node.attestationPool.selectHead()
let
justifiedHead = node.blockPool.latestJustifiedBlock()
let newHead = node.blockPool.withState(
node.justifiedStateCache, justifiedHead):
lmdGhost(node.attestationPool, state, justifiedHead.blck)
node.blockPool.updateHead(node.stateCache, newHead)
# Store the new head in the block pool - this may cause epochs to be
# justified and finalized
node.blockPool.updateHead(newHead)
beacon_head_root.set newHead.root.toGaugeValue
newHead
@ -430,7 +394,7 @@ proc proposeBlock(node: BeaconNode,
# Advance state to the slot immediately preceding the one we're creating a
# block for - potentially we will be processing empty slots along the way.
let (nroot, nblck) = node.blockPool.withState(
node.stateCache, BlockSlot(blck: head, slot: slot - 1)):
node.blockPool.tmpState, BlockSlot(blck: head, slot: slot - 1)):
let (eth1data, deposits) =
if node.mainchainMonitor.isNil:
(get_eth1data_stub(
@ -476,7 +440,7 @@ proc proposeBlock(node: BeaconNode,
(blockRoot, newBlock)
let newBlockRef = node.blockPool.add(node.stateCache, nroot, nblck)
let newBlockRef = node.blockPool.add(nroot, nblck)
if newBlockRef == nil:
warn "Unable to add proposed block to block pool",
newBlock = shortLog(newBlock.message),
@ -516,36 +480,25 @@ proc onAttestation(node: BeaconNode, attestation: Attestation) =
signature = shortLog(attestation.signature),
cat = "consensus" # Tag "consensus|attestation"?
if (let attestedBlock = node.blockPool.getOrResolve(
attestation.data.beacon_block_root); attestedBlock != nil):
let
wallSlot = node.beaconClock.now().toSlot()
head = node.blockPool.head
let
wallSlot = node.beaconClock.now().toSlot()
head = node.blockPool.head
if not wallSlot.afterGenesis or wallSlot.slot < head.blck.slot:
warn "Received attestation before genesis or head - clock is wrong?",
afterGenesis = wallSlot.afterGenesis,
wallSlot = shortLog(wallSlot.slot),
headSlot = shortLog(head.blck.slot),
cat = "clock_drift" # Tag "attestation|clock_drift"?
return
if not wallSlot.afterGenesis or wallSlot.slot < head.blck.slot:
warn "Received attestation before genesis or head - clock is wrong?",
afterGenesis = wallSlot.afterGenesis,
wallSlot = shortLog(wallSlot.slot),
headSlot = shortLog(head.blck.slot),
cat = "clock_drift" # Tag "attestation|clock_drift"?
return
# TODO seems reasonable to use the latest head state here.. needs thinking
# though - maybe we should use the state from the block pointed to by
# the attestation for some of the check? Consider interop with block
# production!
if attestation.data.slot > head.blck.slot and
(attestation.data.slot - head.blck.slot) > maxEmptySlotCount:
warn "Ignoring attestation, head block too old (out of sync?)",
attestationSlot = attestation.data.slot, headSlot = head.blck.slot
else:
let
bs = BlockSlot(blck: head.blck, slot: wallSlot.slot)
if attestation.data.slot > head.blck.slot and
(attestation.data.slot - head.blck.slot) > maxEmptySlotCount:
warn "Ignoring attestation, head block too old (out of sync?)",
attestationSlot = attestation.data.slot, headSlot = head.blck.slot
return
node.blockPool.withState(node.stateCache, bs):
node.attestationPool.add(state, attestedBlock, attestation)
else:
node.attestationPool.addUnresolved(attestation)
node.attestationPool.add(attestation)
proc onBeaconBlock(node: BeaconNode, blck: SignedBeaconBlock) =
# We received a block but don't know much about it yet - in particular, we
@ -559,7 +512,7 @@ proc onBeaconBlock(node: BeaconNode, blck: SignedBeaconBlock) =
beacon_blocks_received.inc()
if node.blockPool.add(node.stateCache, blockRoot, blck).isNil:
if node.blockPool.add(blockRoot, blck).isNil:
return
# The block we received contains attestations, and we might not yet know about
@ -618,7 +571,7 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
# epoch since it doesn't change, but that has to be weighed against
# the complexity of handling forks correctly - instead, we use an adapted
# version here that calculates the committee for a single slot only
node.blockPool.withState(node.stateCache, attestationHead):
node.blockPool.withState(node.blockPool.tmpState, attestationHead):
var cache = get_empty_per_epoch_cache()
let committees_per_slot = get_committee_count_at_slot(state, slot)
@ -646,7 +599,7 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
# empty slot.. wait for the committee selection to settle, then
# revisit this - we should be able to advance behind
var cache = get_empty_per_epoch_cache()
node.blockPool.withState(node.stateCache, BlockSlot(blck: head, slot: slot)):
node.blockPool.withState(node.blockPool.tmpState, BlockSlot(blck: head, slot: slot)):
let proposerIdx = get_beacon_proposer_index(state, cache)
if proposerIdx.isNone:
notice "Missing proposer index",
@ -941,7 +894,7 @@ proc start(node: BeaconNode) =
let
bs = BlockSlot(blck: head.blck, slot: head.blck.slot)
node.blockPool.withState(node.stateCache, bs):
node.blockPool.withState(node.blockPool.tmpState, bs):
node.addLocalValidators(state)
node.run()
@ -1034,9 +987,9 @@ when hasPrompt:
of "attached_validators_balance":
var balance = uint64(0)
# TODO slow linear scan!
for idx, b in node.stateCache.data.data.balances:
for idx, b in node.blockPool.headState.data.data.balances:
if node.getAttachedValidator(
node.stateCache.data.data, ValidatorIndex(idx)) != nil:
node.blockPool.headState.data.data, ValidatorIndex(idx)) != nil:
balance += b
formatGwei(balance)

View File

@ -138,6 +138,11 @@ type
inAdd*: bool
headState*: StateData ## State given by the head block
justifiedState*: StateData ## Latest justified state, as seen from the head
tmpState*: StateData ## Scratchpad - may be any state
MissingBlock* = object
slots*: uint64 # number of slots that are suspected missing
tries*: int

View File

@ -8,6 +8,28 @@ declareCounter beacon_reorgs_total, "Total occurrences of reorganizations of the
logScope: topics = "blkpool"
proc updateStateData*(
pool: BlockPool, state: var StateData, bs: BlockSlot) {.gcsafe.}
proc add*(
pool: var BlockPool, blockRoot: Eth2Digest,
signedBlock: SignedBeaconBlock): BlockRef {.gcsafe.}
template withState*(
pool: BlockPool, cache: var StateData, blockSlot: BlockSlot, body: untyped): untyped =
## Helper template that updates state to a particular BlockSlot - usage of
## cache is unsafe outside of block.
## TODO async transformations will lead to a race where cache gets updated
## while waiting for future to complete - catch this here somehow?
updateStateData(pool, cache, blockSlot)
template hashedState(): HashedBeaconState {.inject, used.} = cache.data
template state(): BeaconState {.inject, used.} = cache.data.data
template blck(): BlockRef {.inject, used.} = cache.blck
template root(): Eth2Digest {.inject, used.} = cache.data.root
body
func parent*(bs: BlockSlot): BlockSlot =
BlockSlot(
blck: if bs.slot > bs.blck.slot: bs.blck else: bs.blck.parent,
@ -40,6 +62,27 @@ func isAncestorOf*(a, b: BlockRef): bool =
doAssert b.slot > b.parent.slot
b = b.parent
func getAncestor*(blck: BlockRef, slot: Slot): BlockRef =
var blck = blck
var depth = 0
const maxDepth = (100'i64 * 365 * 24 * 60 * 60 div SECONDS_PER_SLOT.int)
while true:
if blck.slot == slot:
return blck
if blck.slot < slot:
return nil
if blck.parent == nil:
return nil
doAssert depth < maxDepth
depth += 1
blck = blck.parent
func init*(T: type BlockRef, root: Eth2Digest, slot: Slot): BlockRef =
BlockRef(
root: root,
@ -114,6 +157,10 @@ proc init*(T: type BlockPool, db: BeaconChainDB): BlockPool =
let slot = db.getBlock(b.root).get().message.slot
blocksBySlot.mgetOrPut(slot, @[]).add(b)
# TODO can't do straight init because in mainnet config, there are too
# many live beaconstates on the stack...
var tmpState = new Option[BeaconState]
let
# The head state is necessary to find out what we considered to be the
# finalized epoch last time we saved something.
@ -127,14 +174,17 @@ proc init*(T: type BlockPool, db: BeaconChainDB): BlockPool =
# be the latest justified state or newer, meaning it's enough for
# establishing what we consider to be the finalized head. This logic
# will need revisiting however
headState = db.getState(headStateRoot).get()
tmpState[] = db.getState(headStateRoot)
let
finalizedHead =
headRef.findAncestorBySlot(
headState.finalized_checkpoint.epoch.compute_start_slot_at_epoch())
tmpState[].get().finalized_checkpoint.epoch.compute_start_slot_at_epoch())
justifiedSlot =
headState.current_justified_checkpoint.epoch.compute_start_slot_at_epoch()
tmpState[].get().current_justified_checkpoint.epoch.compute_start_slot_at_epoch()
justifiedHead = headRef.findAncestorBySlot(justifiedSlot)
head = Head(blck: headRef, justified: justifiedHead)
justifiedBlock = db.getBlock(justifiedHead.blck.root).get()
justifiedStateRoot = justifiedBlock.message.state_root
doAssert justifiedHead.slot >= finalizedHead.slot,
"justified head comes before finalized head - database corrupt?"
@ -143,7 +193,7 @@ proc init*(T: type BlockPool, db: BeaconChainDB): BlockPool =
head = head.blck, finalizedHead, tail = tailRef,
totalBlocks = blocks.len, totalKnownSlots = blocksBySlot.len
BlockPool(
let res = BlockPool(
pending: initTable[Eth2Digest, SignedBeaconBlock](),
missing: initTable[Eth2Digest, MissingBlock](),
blocks: blocks,
@ -152,9 +202,22 @@ proc init*(T: type BlockPool, db: BeaconChainDB): BlockPool =
head: head,
finalizedHead: finalizedHead,
db: db,
heads: @[head]
heads: @[head],
)
res.headState = StateData(
data: HashedBeaconState(data: tmpState[].get(), root: headStateRoot),
blck: headRef)
res.tmpState = res.headState
tmpState[] = db.getState(justifiedStateRoot)
res.justifiedState = StateData(
data: HashedBeaconState(data: tmpState[].get(), root: justifiedStateRoot),
blck: justifiedHead.blck)
res
proc addSlotMapping(pool: BlockPool, br: BlockRef) =
proc addIfMissing(s: var seq[BlockRef], v: BlockRef) =
if v notin s:
@ -171,13 +234,6 @@ proc delSlotMapping(pool: BlockPool, br: BlockRef) =
else:
pool.blocksBySlot[br.slot] = blks
proc updateStateData*(
pool: BlockPool, state: var StateData, bs: BlockSlot) {.gcsafe.}
proc add*(
pool: var BlockPool, state: var StateData, blockRoot: Eth2Digest,
signedBlock: SignedBeaconBlock): BlockRef {.gcsafe.}
proc addResolvedBlock(
pool: var BlockPool, state: var StateData, blockRoot: Eth2Digest,
signedBlock: SignedBeaconBlock, parent: BlockRef): BlockRef =
@ -245,14 +301,14 @@ proc addResolvedBlock(
while keepGoing:
let retries = pool.pending
for k, v in retries:
discard pool.add(state, k, v)
discard pool.add(k, v)
# Keep going for as long as the pending pool is shrinking
# TODO inefficient! so what?
keepGoing = pool.pending.len < retries.len
blockRef
proc add*(
pool: var BlockPool, state: var StateData, blockRoot: Eth2Digest,
pool: var BlockPool, blockRoot: Eth2Digest,
signedBlock: SignedBeaconBlock): BlockRef {.gcsafe.} =
## return the block, if resolved...
## the state parameter may be updated to include the given block, if
@ -290,6 +346,16 @@ proc add*(
let parent = pool.blocks.getOrDefault(blck.parent_root)
if parent != nil:
if parent.slot >= blck.slot:
# TODO Malicious block? inform peer pool?
notice "Invalid block slot",
blck = shortLog(blck),
blockRoot = shortLog(blockRoot),
parentRoot = shortLog(parent.root),
parentSlot = shortLog(parent.slot)
return
# The block might have been in either of these - we don't want any more
# work done on its behalf
pool.pending.del(blockRoot)
@ -299,9 +365,9 @@ proc add*(
# TODO if the block is from the future, we should not be resolving it (yet),
# but maybe we should use it as a hint that our clock is wrong?
updateStateData(pool, state, BlockSlot(blck: parent, slot: blck.slot - 1))
updateStateData(pool, pool.tmpState, BlockSlot(blck: parent, slot: blck.slot - 1))
if not state_transition(state.data, blck, {}):
if not state_transition(pool.tmpState.data, blck, {}):
# TODO find a better way to log all this block data
notice "Invalid block",
blck = shortLog(blck),
@ -310,7 +376,9 @@ proc add*(
return
return pool.addResolvedBlock(state, blockRoot, signedBlock, parent)
# Careful, pool.tmpState is now partially inconsistent and will be updated
# inside addResolvedBlock
return pool.addResolvedBlock(pool.tmpState, blockRoot, signedBlock, parent)
# TODO already checked hash though? main reason to keep this is because
# the pending pool calls this function back later in a loop, so as long
@ -505,7 +573,6 @@ proc maybePutState(pool: BlockPool, state: HashedBeaconState, blck: BlockRef) =
# TODO this is out of sync with epoch def now, I think -- (slot + 1) mod foo.
logScope: pcs = "save_state_at_epoch_start"
if state.data.slot mod SLOTS_PER_EPOCH == 0:
if not pool.db.containsState(state.root):
info "Storing state",
@ -531,7 +598,6 @@ proc rewindState(pool: BlockPool, state: var StateData, bs: BlockSlot):
# chain of ancestors of the new block. We will do this by loading each
# successive parent block and checking if we can find the corresponding state
# in the database.
var
stateRoot = pool.db.getStateRoot(bs.blck.root, bs.slot)
curBs = bs
@ -561,7 +627,7 @@ proc rewindState(pool: BlockPool, state: var StateData, bs: BlockSlot):
doAssert false, "Oh noes, we passed big bang!"
let
ancestor = ancestors[^1]
ancestor = ancestors.pop()
ancestorState = pool.db.getState(stateRoot.get())
if ancestorState.isNone():
@ -616,7 +682,7 @@ proc updateStateData*(pool: BlockPool, state: var StateData, bs: BlockSlot) =
# Time to replay all the blocks between then and now. We skip one because
# it's the one that we found the state with, and it has already been
# applied
for i in countdown(ancestors.len - 2, 0):
for i in countdown(ancestors.len - 1, 0):
let ok =
skipAndUpdateState(state.data, ancestors[i].data.message, {skipValidation}) do(
state: HashedBeaconState):
@ -677,65 +743,71 @@ proc setTailBlock(pool: BlockPool, newTail: BlockRef) =
slot = newTail.slot,
root = shortLog(newTail.root)
proc updateHead*(pool: BlockPool, state: var StateData, blck: BlockRef) =
proc updateHead*(pool: BlockPool, newHead: BlockRef) =
## Update what we consider to be the current head, as given by the fork
## choice.
## The choice of head affects the choice of finalization point - the order
## of operations naturally becomes important here - after updating the head,
## blocks that were once considered potential candidates for a tree will
## now fall from grace, or no longer be considered resolved.
doAssert blck.parent != nil or blck.slot == 0
doAssert newHead.parent != nil or newHead.slot == 0
logScope: pcs = "fork_choice"
if pool.head.blck == blck:
if pool.head.blck == newHead:
info "No head block update",
headBlockRoot = shortLog(blck.root),
headBlockSlot = shortLog(blck.slot),
headBlockRoot = shortLog(newHead.root),
headBlockSlot = shortLog(newHead.slot),
cat = "fork_choice"
return
let
lastHead = pool.head
pool.db.putHeadBlock(blck.root)
pool.db.putHeadBlock(newHead.root)
# Start off by making sure we have the right state
updateStateData(pool, state, BlockSlot(blck: blck, slot: blck.slot))
let justifiedSlot = state.data.data
.current_justified_checkpoint
.epoch
.compute_start_slot_at_epoch()
pool.head = Head(blck: blck, justified: blck.findAncestorBySlot(justifiedSlot))
updateStateData(
pool, pool.headState, BlockSlot(blck: newHead, slot: newHead.slot))
if lastHead.blck != blck.parent:
let
justifiedSlot = pool.headState.data.data
.current_justified_checkpoint
.epoch
.compute_start_slot_at_epoch()
justifiedBS = newHead.findAncestorBySlot(justifiedSlot)
pool.head = Head(blck: newHead, justified: justifiedBS)
updateStateData(pool, pool.justifiedState, justifiedBS)
# TODO isAncestorOf may be expensive - too expensive?
if not lastHead.blck.isAncestorOf(newHead):
info "Updated head block (new parent)",
lastHeadRoot = shortLog(lastHead.blck.root),
parentRoot = shortLog(blck.parent.root),
stateRoot = shortLog(state.data.root),
headBlockRoot = shortLog(state.blck.root),
stateSlot = shortLog(state.data.data.slot),
justifiedEpoch = shortLog(state.data.data.current_justified_checkpoint.epoch),
finalizedEpoch = shortLog(state.data.data.finalized_checkpoint.epoch),
parentRoot = shortLog(newHead.parent.root),
stateRoot = shortLog(pool.headState.data.root),
headBlockRoot = shortLog(pool.headState.blck.root),
stateSlot = shortLog(pool.headState.data.data.slot),
justifiedEpoch = shortLog(pool.headState.data.data.current_justified_checkpoint.epoch),
finalizedEpoch = shortLog(pool.headState.data.data.finalized_checkpoint.epoch),
cat = "fork_choice"
# A reasonable criterion for "reorganizations of the chain"
# TODO if multiple heads have gotten skipped, could fire at
# spurious times - for example when multiple blocks have been added between
# head updates
beacon_reorgs_total.inc()
else:
info "Updated head block",
stateRoot = shortLog(state.data.root),
headBlockRoot = shortLog(state.blck.root),
stateSlot = shortLog(state.data.data.slot),
justifiedEpoch = shortLog(state.data.data.current_justified_checkpoint.epoch),
finalizedEpoch = shortLog(state.data.data.finalized_checkpoint.epoch),
stateRoot = shortLog(pool.headState.data.root),
headBlockRoot = shortLog(pool.headState.blck.root),
stateSlot = shortLog(pool.headState.data.data.slot),
justifiedEpoch = shortLog(pool.headState.data.data.current_justified_checkpoint.epoch),
finalizedEpoch = shortLog(pool.headState.data.data.finalized_checkpoint.epoch),
cat = "fork_choice"
let
finalizedEpochStartSlot = state.data.data.finalized_checkpoint.epoch.compute_start_slot_at_epoch()
finalizedEpochStartSlot =
pool.headState.data.data.finalized_checkpoint.epoch.
compute_start_slot_at_epoch()
# TODO there might not be a block at the epoch boundary - what then?
finalizedHead = blck.findAncestorBySlot(finalizedEpochStartSlot)
finalizedHead = newHead.findAncestorBySlot(finalizedEpochStartSlot)
doAssert (not finalizedHead.blck.isNil),
"Block graph should always lead to a finalized block"
@ -744,8 +816,8 @@ proc updateHead*(pool: BlockPool, state: var StateData, blck: BlockRef) =
info "Finalized block",
finalizedBlockRoot = shortLog(finalizedHead.blck.root),
finalizedBlockSlot = shortLog(finalizedHead.slot),
headBlockRoot = shortLog(blck.root),
headBlockSlot = shortLog(blck.slot),
headBlockRoot = shortLog(newHead.root),
headBlockSlot = shortLog(newHead.slot),
cat = "fork_choice"
pool.finalizedHead = finalizedHead

View File

@ -1,70 +0,0 @@
import
deques, options, sequtils, tables,
./spec/[datatypes, crypto, helpers],
./attestation_pool, ./beacon_node_types, ./ssz
func get_ancestor(blck: BlockRef, slot: Slot): BlockRef =
var blck = blck
var depth = 0
const maxDepth = (100'i64 * 365 * 24 * 60 * 60 div SECONDS_PER_SLOT.int)
while true:
if blck.slot == slot:
return blck
if blck.slot < slot:
return nil
if blck.parent == nil:
return nil
doAssert depth < maxDepth
depth += 1
blck = blck.parent
# https://github.com/ethereum/eth2.0-specs/blob/v0.8.4/specs/core/0_fork-choice.md
# The structure of this code differs from the spec since we use a different
# strategy for storing states and justification points - it should nonetheless
# be close in terms of functionality.
func lmdGhost*(
pool: AttestationPool, start_state: BeaconState,
start_block: BlockRef): BlockRef =
# TODO: a Fenwick Tree datastructure to keep track of cumulated votes
# in O(log N) complexity
# https://en.wikipedia.org/wiki/Fenwick_tree
# Nim implementation for cumulative frequencies at
# https://github.com/numforge/laser/blob/990e59fffe50779cdef33aa0b8f22da19e1eb328/benchmarks/random_sampling/fenwicktree.nim
let
active_validator_indices =
get_active_validator_indices(
start_state, compute_epoch_at_slot(start_state.slot))
var latest_messages: seq[tuple[validator: ValidatorIndex, blck: BlockRef]]
for i in active_validator_indices:
let pubKey = start_state.validators[i].pubkey
if (let vote = pool.latestAttestation(pubKey); not vote.isNil):
latest_messages.add((i, vote))
template get_latest_attesting_balance(blck: BlockRef): uint64 =
var res: uint64
for validator_index, target in latest_messages.items():
if get_ancestor(target, blck.slot) == blck:
res += start_state.validators[validator_index].effective_balance
res
var head = start_block
while true:
if head.children.len() == 0:
return head
head = head.children[0]
var
headCount = get_latest_attesting_balance(head)
for i in 1..<head.children.len:
if (let hc = get_latest_attesting_balance(head.children[i]); hc > headCount):
head = head.children[i]
headCount = hc

View File

@ -10,154 +10,211 @@
import
options, unittest,
chronicles,
stew/byteutils,
./testutil, ./testblockutil,
../beacon_chain/spec/[beaconstate, crypto, datatypes, digest, helpers, validator],
../beacon_chain/[beacon_node_types, attestation_pool, block_pool, extras, state_transition, ssz]
template withPool(body: untyped) =
mixin genState, genBlock
var
blockPool {.inject.} = BlockPool.init(makeTestDB(genState, genBlock))
pool {.inject.} = AttestationPool.init(blockPool)
state {.inject.} = loadTailState(blockPool)
# Slot 0 is a finalized slot - won't be making attestations for it..
process_slots(state.data, state.data.data.slot + 1)
body
suite "Attestation pool processing" & preset():
## For now just test that we can compile and execute block processing with
## mock data.
# Genesis state that results in 2 members per committee
let
genState = initialize_beacon_state_from_eth1(
Eth2Digest(), 0,
makeInitialDeposits(SLOTS_PER_EPOCH * 2, {skipValidation}),
{skipValidation})
genBlock = get_initial_beacon_block(genState)
setup:
# Genesis state that results in 3 members per committee
var
blockPool = BlockPool.init(makeTestDB(SLOTS_PER_EPOCH * 3))
pool = AttestationPool.init(blockPool)
state = loadTailState(blockPool)
# Slot 0 is a finalized slot - won't be making attestations for it..
process_slots(state.data, state.data.data.slot + 1)
timedTest "Can add and retrieve simple attestation" & preset():
var cache = get_empty_per_epoch_cache()
withPool:
let
# Create an attestation for slot 1!
beacon_committee = get_beacon_committee(state.data.data,
state.data.data.slot, 0, cache)
attestation = makeAttestation(
state.data.data, state.blck.root, beacon_committee[0], cache)
let
# Create an attestation for slot 1!
beacon_committee = get_beacon_committee(state.data.data,
state.data.data.slot, 0, cache)
attestation = makeAttestation(
state.data.data, state.blck.root, beacon_committee[0], cache)
pool.add(state.data.data, state.blck, attestation)
pool.add(attestation)
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot) # minus 1?
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot) # minus 1?
let attestations = pool.getAttestationsForBlock(
state.data.data, state.data.data.slot + 1)
let attestations = pool.getAttestationsForBlock(
state.data.data, state.data.data.slot + 1)
check:
attestations.len == 1
check:
attestations.len == 1
timedTest "Attestations may arrive in any order" & preset():
var cache = get_empty_per_epoch_cache()
withPool:
let
# Create an attestation for slot 1!
bc0 = get_beacon_committee(state.data.data,
state.data.data.slot, 0, cache)
attestation0 = makeAttestation(
state.data.data, state.blck.root, bc0[0], cache)
let
# Create an attestation for slot 1!
bc0 = get_beacon_committee(state.data.data,
state.data.data.slot, 0, cache)
attestation0 = makeAttestation(
state.data.data, state.blck.root, bc0[0], cache)
process_slots(state.data, state.data.data.slot + 1)
process_slots(state.data, state.data.data.slot + 1)
let
bc1 = get_beacon_committee(state.data.data,
state.data.data.slot, 0, cache)
attestation1 = makeAttestation(
state.data.data, state.blck.root, bc1[0], cache)
let
bc1 = get_beacon_committee(state.data.data,
state.data.data.slot, 0, cache)
attestation1 = makeAttestation(
state.data.data, state.blck.root, bc1[0], cache)
# test reverse order
pool.add(state.data.data, state.blck, attestation1)
pool.add(state.data.data, state.blck, attestation0)
# test reverse order
pool.add(attestation1)
pool.add(attestation0)
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot) # minus 1?
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot) # minus 1?
let attestations = pool.getAttestationsForBlock(
state.data.data, state.data.data.slot + 1)
let attestations = pool.getAttestationsForBlock(
state.data.data, state.data.data.slot + 1)
check:
attestations.len == 1
check:
attestations.len == 1
timedTest "Attestations should be combined" & preset():
var cache = get_empty_per_epoch_cache()
withPool:
let
# Create an attestation for slot 1!
bc0 = get_beacon_committee(state.data.data,
state.data.data.slot, 0, cache)
attestation0 = makeAttestation(
state.data.data, state.blck.root, bc0[0], cache)
attestation1 = makeAttestation(
state.data.data, state.blck.root, bc0[1], cache)
let
# Create an attestation for slot 1!
bc0 = get_beacon_committee(state.data.data,
state.data.data.slot, 0, cache)
attestation0 = makeAttestation(
state.data.data, state.blck.root, bc0[0], cache)
attestation1 = makeAttestation(
state.data.data, state.blck.root, bc0[1], cache)
pool.add(state.data.data, state.blck, attestation0)
pool.add(state.data.data, state.blck, attestation1)
pool.add(attestation0)
pool.add(attestation1)
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot) # minus 1?
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot) # minus 1?
let attestations = pool.getAttestationsForBlock(
state.data.data, state.data.data.slot + 1)
let attestations = pool.getAttestationsForBlock(
state.data.data, state.data.data.slot + 1)
check:
attestations.len == 1
check:
attestations.len == 1
timedTest "Attestations may overlap, bigger first" & preset():
var cache = get_empty_per_epoch_cache()
withPool:
var
# Create an attestation for slot 1!
bc0 = get_beacon_committee(state.data.data,
state.data.data.slot, 0, cache)
attestation0 = makeAttestation(
state.data.data, state.blck.root, bc0[0], cache)
attestation1 = makeAttestation(
state.data.data, state.blck.root, bc0[1], cache)
var
# Create an attestation for slot 1!
bc0 = get_beacon_committee(state.data.data,
state.data.data.slot, 0, cache)
attestation0 = makeAttestation(
state.data.data, state.blck.root, bc0[0], cache)
attestation1 = makeAttestation(
state.data.data, state.blck.root, bc0[1], cache)
attestation0.combine(attestation1, {skipValidation})
attestation0.combine(attestation1, {skipValidation})
pool.add(state.data.data, state.blck, attestation0)
pool.add(state.data.data, state.blck, attestation1)
pool.add(attestation0)
pool.add(attestation1)
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot) # minus 1?
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot) # minus 1?
let attestations = pool.getAttestationsForBlock(
state.data.data, state.data.data.slot + 1)
let attestations = pool.getAttestationsForBlock(
state.data.data, state.data.data.slot + 1)
check:
attestations.len == 1
check:
attestations.len == 1
timedTest "Attestations may overlap, smaller first" & preset():
var cache = get_empty_per_epoch_cache()
withPool:
var
# Create an attestation for slot 1!
bc0 = get_beacon_committee(state.data.data,
state.data.data.slot, 0, cache)
attestation0 = makeAttestation(
state.data.data, state.blck.root, bc0[0], cache)
attestation1 = makeAttestation(
state.data.data, state.blck.root, bc0[1], cache)
var
# Create an attestation for slot 1!
bc0 = get_beacon_committee(state.data.data,
state.data.data.slot, 0, cache)
attestation0 = makeAttestation(
state.data.data, state.blck.root, bc0[0], cache)
attestation1 = makeAttestation(
state.data.data, state.blck.root, bc0[1], cache)
attestation0.combine(attestation1, {skipValidation})
attestation0.combine(attestation1, {skipValidation})
pool.add(state.data.data, state.blck, attestation1)
pool.add(state.data.data, state.blck, attestation0)
pool.add(attestation1)
pool.add(attestation0)
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot) # minus 1?
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot) # minus 1?
let attestations = pool.getAttestationsForBlock(
state.data.data, state.data.data.slot + 1)
let attestations = pool.getAttestationsForBlock(
state.data.data, state.data.data.slot + 1)
check:
attestations.len == 1
check:
attestations.len == 1
timedTest "Fork choice returns latest block with no attestations":
let
b1 = addBlock(state.data.data, blockPool.tail.root, BeaconBlockBody())
b1Root = hash_tree_root(b1.message)
b1Add = blockPool.add(b1Root, b1)
head = pool.selectHead()
check:
head == b1Add
let
b2 = addBlock(state.data.data, b1Root, BeaconBlockBody())
b2Root = hash_tree_root(b2.message)
b2Add = blockPool.add(b2Root, b2)
head2 = pool.selectHead()
check:
head2 == b2Add
timedTest "Fork choice returns block with attestation":
var cache = get_empty_per_epoch_cache()
let
b10 = makeBlock(state.data.data, blockPool.tail.root, BeaconBlockBody())
b10Root = hash_tree_root(b10.message)
b10Add = blockPool.add(b10Root, b10)
head = pool.selectHead()
check:
head == b10Add
let
b11 = makeBlock(state.data.data, blockPool.tail.root, BeaconBlockBody(
graffiti: Eth2Digest(data: [1'u8, 0, 0, 0 ,0 ,0 ,0 ,0 ,0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
))
b11Root = hash_tree_root(b11.message)
b11Add = blockPool.add(b11Root, b11)
bc1 = get_beacon_committee(state.data.data,
state.data.data.slot, 1, cache)
attestation0 = makeAttestation(
state.data.data, b10Root, bc1[0], cache)
pool.add(attestation0)
let head2 = pool.selectHead()
check:
# Single vote for b10 and no votes for b11
head2 == b10Add
let
attestation1 = makeAttestation(
state.data.data, b11Root, bc1[1], cache)
attestation2 = makeAttestation(
state.data.data, b11Root, bc1[2], cache)
pool.add(attestation1)
let head3 = pool.selectHead()
let smaller = if b10Root.data < b11Root.data: b10Add else: b11Add
check:
# Ties broken lexicographically
head3 == smaller
pool.add(attestation2)
let head4 = pool.selectHead()
check:
# Two votes for b11
head4 == b11Add

View File

@ -14,17 +14,15 @@ import
../beacon_chain/[beacon_node_types, block_pool, beacon_chain_db, extras, ssz]
suite "Block pool processing" & preset():
let
genState = initialize_beacon_state_from_eth1(
Eth2Digest(), 0,
makeInitialDeposits(flags = {skipValidation}), {skipValidation})
genBlock = get_initial_beacon_block(genState)
setup:
var
db = makeTestDB(genState, genBlock)
db = makeTestDB(SLOTS_PER_EPOCH)
pool = BlockPool.init(db)
state = pool.loadTailState()
state = pool.loadTailState().data.data
b1 = addBlock(state, pool.tail.root, BeaconBlockBody())
b1Root = hash_tree_root(b1.message)
b2 = addBlock(state, b1Root, BeaconBlockBody())
b2Root = hash_tree_root(b2.message)
timedTest "getRef returns nil for missing blocks":
check:
@ -32,55 +30,50 @@ suite "Block pool processing" & preset():
timedTest "loadTailState gets genesis block on first load" & preset():
var
b0 = pool.get(state.blck.root)
b0 = pool.get(pool.tail.root)
check:
state.data.data.slot == GENESIS_SLOT
b0.isSome()
toSeq(pool.blockRootsForSlot(GENESIS_SLOT)) == @[state.blck.root]
toSeq(pool.blockRootsForSlot(GENESIS_SLOT)) == @[pool.tail.root]
timedTest "Simple block add&get" & preset():
let
b1 = makeBlock(state.data.data, state.blck.root, BeaconBlockBody())
b1Root = hash_tree_root(b1.message)
# TODO the return value is ugly here, need to fix and test..
discard pool.add(state, b1Root, b1)
let b1Ref = pool.get(b1Root)
b1Add = pool.add(b1Root, b1)
b1Get = pool.get(b1Root)
check:
b1Ref.isSome()
b1Ref.get().refs.root == b1Root
hash_tree_root(state.data.data) == state.data.root
b1Get.isSome()
b1Get.get().refs.root == b1Root
b1Add.root == b1Get.get().refs.root
let
b2Add = pool.add(b2Root, b2)
b2Get = pool.get(b2Root)
check:
b2Get.isSome()
b2Get.get().refs.root == b2Root
b2Add.root == b2Get.get().refs.root
timedTest "Reverse order block add & get" & preset():
let
b1 = addBlock(state.data.data, state.blck.root, BeaconBlockBody(), {})
b1Root = hash_tree_root(b1.message)
b2 = addBlock(state.data.data, b1Root, BeaconBlockBody(), {})
b2Root = hash_tree_root(b2.message)
discard pool.add(state, b2Root, b2)
discard pool.add(b2Root, b2)
check:
pool.get(b2Root).isNone() # Unresolved, shouldn't show up
FetchRecord(root: b1Root, historySlots: 1) in pool.checkMissing()
discard pool.add(state, b1Root, b1)
check: hash_tree_root(state.data.data) == state.data.root
discard pool.add(b1Root, b1)
let
b1r = pool.get(b1Root)
b2r = pool.get(b2Root)
b1Get = pool.get(b1Root)
b2Get = pool.get(b2Root)
check:
b1r.isSome()
b2r.isSome()
b1Get.isSome()
b2Get.isSome()
b1r.get().refs.children[0] == b2r.get().refs
b2r.get().refs.parent == b1r.get().refs
b1Get.get().refs.children[0] == b2Get.get().refs
b2Get.get().refs.parent == b1Get.get().refs
toSeq(pool.blockRootsForSlot(b1.message.slot)) == @[b1Root]
toSeq(pool.blockRootsForSlot(b2.message.slot)) == @[b2Root]
@ -89,14 +82,13 @@ suite "Block pool processing" & preset():
# The heads structure should have been updated to contain only the new
# b2 head
check:
pool.heads.mapIt(it.blck) == @[b2r.get().refs]
pool.heads.mapIt(it.blck) == @[b2Get.get().refs]
# check that init also reloads block graph
var
pool2 = BlockPool.init(db)
check:
hash_tree_root(state.data.data) == state.data.root
pool2.get(b1Root).isSome()
pool2.get(b2Root).isSome()
@ -115,3 +107,22 @@ suite "Block pool processing" & preset():
not c.isAncestorOf(a)
not c.isAncestorOf(b)
not b.isAncestorOf(a)
timedTest "Can add same block twice" & preset():
let
b10 = pool.add(b1Root, b1)
b11 = pool.add(b1Root, b1)
check:
b10 == b11
not b10.isNil
timedTest "updateHead updates head and headState" & preset():
let
b1Add = pool.add(b1Root, b1)
pool.updateHead(b1Add)
check:
pool.head.blck == b1Add
pool.headState.data.data.slot == b1Add.slot

View File

@ -8,8 +8,9 @@
import
algorithm, strformat, stats, times, std/monotimes, stew/endians2,
chronicles, eth/trie/[db],
../beacon_chain/[beacon_chain_db, block_pool, ssz, beacon_node_types],
../beacon_chain/spec/datatypes
../beacon_chain/[beacon_chain_db, block_pool, extras, ssz, beacon_node_types],
../beacon_chain/spec/[digest, beaconstate, datatypes],
testblockutil
type
TestDuration = tuple[duration: float, label: string]
@ -75,4 +76,12 @@ proc makeTestDB*(tailState: BeaconState, tailBlock: SignedBeaconBlock): BeaconCh
result = init(BeaconChainDB, newMemoryDB())
BlockPool.preInit(result, tailState, tailBlock)
proc makeTestDB*(validators: int): BeaconChainDB =
let
genState = initialize_beacon_state_from_eth1(
Eth2Digest(), 0,
makeInitialDeposits(validators, flags = {skipValidation}), {skipValidation})
genBlock = get_initial_beacon_block(genState)
makeTestDB(genState, genBlock)
export inMicroseconds

2
vendor/nim-stew vendored

@ -1 +1 @@
Subproject commit e9d75c05f62a7a9628b28b822b5190a6682e2a7e
Subproject commit 9c18a1cc553bc61cdf67f0b4c12b538bde95a599