Dual headed fork choice [Revolution] (#1238)

* Dual headed fork choice

* fix finalizedEpoch not moving

* reduce fork choice verbosity

* Add failing tests due to pruning

* Properly handle duplicate blocks in sync

* test_block_pool also add a test for duplicate blocks

* comments addressing review

* Fix fork choice v2, was missing integrating block proposed

* remove a spurious debug writeStackTrace

* update block_sim

* Use OrderedTable to ensure that we always load parents before children in fork choice

* Load the DAG data in fork choice at init if there is some (can sync witti)

* Cluster of quarantined blocks were not properly added to the fork choice

* Workaround async gcsafe warnings

* Update blockpoool tests

* Do the callback before clearing the quarantine

* Revert OrderedTable, implement topological sort of DAG, allow forkChoice to be initialized from arbitrary finalized heads

* Make it work with latest devel - Altona readyness

* Add a recovery mechanism when forkchoice desyncs with blockpool

* add the current problematic node to the stack

* Fix rebase indentation bug (but still producing invalid block)

* Fix cache at epoch boundaries and lateBlock addition
This commit is contained in:
Mamy Ratsimbazafy 2020-07-09 11:29:32 +02:00 committed by GitHub
parent 4140b3b9d9
commit 3cdae9f6be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 601 additions and 158 deletions

View File

@ -9,8 +9,10 @@ AllTests-mainnet
+ Can add and retrieve simple attestation [Preset: mainnet] OK + Can add and retrieve simple attestation [Preset: mainnet] OK
+ Fork choice returns block with attestation OK + Fork choice returns block with attestation OK
+ Fork choice returns latest block with no attestations OK + Fork choice returns latest block with no attestations OK
+ Trying to add a block twice tags the second as an error OK
+ Trying to add a duplicate block from an old pruned epoch is tagged as an error OK
``` ```
OK: 7/7 Fail: 0/7 Skip: 0/7 OK: 9/9 Fail: 0/9 Skip: 0/9
## Beacon chain DB [Preset: mainnet] ## Beacon chain DB [Preset: mainnet]
```diff ```diff
+ empty database [Preset: mainnet] OK + empty database [Preset: mainnet] OK
@ -32,7 +34,7 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1 OK: 1/1 Fail: 0/1 Skip: 0/1
## Block pool processing [Preset: mainnet] ## Block pool processing [Preset: mainnet]
```diff ```diff
+ Can add same block twice [Preset: mainnet] OK + Adding the same block twice returns a Duplicate error [Preset: mainnet] OK
+ Reverse order block add & get [Preset: mainnet] OK + Reverse order block add & get [Preset: mainnet] OK
+ Simple block add&get [Preset: mainnet] OK + Simple block add&get [Preset: mainnet] OK
+ getRef returns nil for missing blocks OK + getRef returns nil for missing blocks OK

View File

@ -8,24 +8,87 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
deques, sequtils, tables, options, # Standard libraries
deques, sequtils, tables, options, algorithm,
# Status libraries
chronicles, stew/[byteutils], json_serialization/std/sets, chronicles, stew/[byteutils], json_serialization/std/sets,
# Internal
./spec/[beaconstate, datatypes, crypto, digest, helpers, validator], ./spec/[beaconstate, datatypes, crypto, digest, helpers, validator],
./extras, ./block_pool, ./block_pools/candidate_chains, ./beacon_node_types ./extras, ./block_pool, ./block_pools/candidate_chains, ./beacon_node_types,
./fork_choice/fork_choice
logScope: topics = "attpool" logScope: topics = "attpool"
func init*(T: type AttestationPool, blockPool: BlockPool): T = proc init*(T: type AttestationPool, blockPool: BlockPool): T =
## Initialize an AttestationPool from the blockPool `headState` ## Initialize an AttestationPool from the blockPool `headState`
## The `finalized_root` works around the finalized_checkpoint of the genesis block ## The `finalized_root` works around the finalized_checkpoint of the genesis block
## holding a zero_root. ## holding a zero_root.
# TODO blockPool is only used when resolving orphaned attestations - it should # TODO blockPool is only used when resolving orphaned attestations - it should
# probably be removed as a dependency of AttestationPool (or some other # probably be removed as a dependency of AttestationPool (or some other
# smart refactoring) # smart refactoring)
# TODO: Return Value Optimization
# TODO: In tests, on blockpool.init the finalized root
# from the `headState` and `justifiedState` is zero
var forkChoice = initForkChoice(
finalized_block_slot = default(Slot), # This is unnecessary for fork choice but may help external components for example logging/debugging
finalized_block_state_root = default(Eth2Digest), # This is unnecessary for fork choice but may help external components for example logging/debugging
justified_epoch = blockPool.headState.data.data.current_justified_checkpoint.epoch,
finalized_epoch = blockPool.headState.data.data.finalized_checkpoint.epoch,
# We should use the checkpoint, but at genesis the headState finalized checkpoint is 0x0000...0000
# finalized_root = blockPool.headState.data.data.finalized_checkpoint.root
finalized_root = blockPool.finalizedHead.blck.root
).get()
# Load all blocks since finalized head - TODO a proper test
for blck in blockPool.dag.topoSortedSinceLastFinalization():
if blck.root == blockPool.finalizedHead.blck.root:
continue
# BlockRef
# should ideally contain the justified_epoch and finalized_epoch
# so that we can pass them directly to `process_block` without having to
# redo "updateStateData"
#
# In any case, `updateStateData` should shortcut
# to `getStateDataCached`
updateStateData(
blockPool,
blockPool.tmpState,
BlockSlot(blck: blck, slot: blck.slot)
)
debug "Preloading fork choice with block",
block_root = shortlog(blck.root),
parent_root = shortlog(blck.parent.root),
justified_epoch = $blockPool.tmpState.data.data.current_justified_checkpoint.epoch,
finalized_epoch = $blockPool.tmpState.data.data.finalized_checkpoint.epoch,
slot = $blck.slot
let status = forkChoice.process_block(
block_root = blck.root,
parent_root = blck.parent.root,
justified_epoch = blockPool.tmpState.data.data.current_justified_checkpoint.epoch,
finalized_epoch = blockPool.tmpState.data.data.finalized_checkpoint.epoch,
# Unused in fork choice - i.e. for logging or caching extra metadata
slot = blck.slot,
state_root = default(Eth2Digest)
)
doAssert status.isOk(), "Error in preloading the fork choice: " & $status.error
info "Fork choice initialized",
justified_epoch = $blockPool.headState.data.data.current_justified_checkpoint.epoch,
finalized_epoch = $blockPool.headState.data.data.finalized_checkpoint.epoch,
finalized_root = shortlog(blockPool.finalizedHead.blck.root)
T( T(
mapSlotsToAttestations: initDeque[AttestationsSeen](), mapSlotsToAttestations: initDeque[AttestationsSeen](),
blockPool: blockPool, blockPool: blockPool,
unresolved: initTable[Eth2Digest, UnresolvedAttestation](), unresolved: initTable[Eth2Digest, UnresolvedAttestation](),
forkChoice_v2: forkChoice
) )
proc combine*(tgt: var Attestation, src: Attestation, flags: UpdateFlags) = proc combine*(tgt: var Attestation, src: Attestation, flags: UpdateFlags) =
@ -107,13 +170,21 @@ proc slotIndex(
func updateLatestVotes( func updateLatestVotes(
pool: var AttestationPool, state: BeaconState, attestationSlot: Slot, pool: var AttestationPool, state: BeaconState, attestationSlot: Slot,
participants: seq[ValidatorIndex], blck: BlockRef) = participants: seq[ValidatorIndex], blck: BlockRef) =
# ForkChoice v2
let target_epoch = compute_epoch_at_slot(attestationSlot)
for validator in participants: for validator in participants:
# ForkChoice v1
let let
pubKey = state.validators[validator].pubkey pubKey = state.validators[validator].pubkey
current = pool.latestAttestations.getOrDefault(pubKey) current = pool.latestAttestations.getOrDefault(pubKey)
if current.isNil or current.slot < attestationSlot: if current.isNil or current.slot < attestationSlot:
pool.latestAttestations[pubKey] = blck pool.latestAttestations[pubKey] = blck
# ForkChoice v2
pool.forkChoice_v2.process_attestation(validator, blck.root, target_epoch)
func get_attesting_indices_seq(state: BeaconState, func get_attesting_indices_seq(state: BeaconState,
attestation_data: AttestationData, attestation_data: AttestationData,
bits: CommitteeValidatorsBits, bits: CommitteeValidatorsBits,
@ -254,7 +325,7 @@ proc addResolved(pool: var AttestationPool, blck: BlockRef, attestation: Attesta
blockSlot = shortLog(blck.slot), blockSlot = shortLog(blck.slot),
cat = "filtering" cat = "filtering"
proc add*(pool: var AttestationPool, attestation: Attestation) = proc addAttestation*(pool: var AttestationPool, attestation: Attestation) =
## Add a verified attestation to the fork choice context ## Add a verified attestation to the fork choice context
logScope: pcs = "atp_add_attestation" logScope: pcs = "atp_add_attestation"
@ -269,6 +340,68 @@ proc add*(pool: var AttestationPool, attestation: Attestation) =
pool.addResolved(blck, attestation) pool.addResolved(blck, attestation)
proc addForkChoice_v2*(pool: var AttestationPool, blck: BlockRef) =
## Add a verified block to the fork choice context
## The current justifiedState of the block pool is used as reference
# TODO: add(BlockPool, blockRoot: Eth2Digest, SignedBeaconBlock): BlockRef
# should ideally return the justified_epoch and finalized_epoch
# so that we can pass them directly to this proc without having to
# redo "updateStateData"
#
# In any case, `updateStateData` should shortcut
# to `getStateDataCached`
var state: Result[void, string]
# A stack of block to add in case recovery is needed
var blockStack: seq[BlockSlot]
var current = BlockSlot(blck: blck, slot: blck.slot)
while true: # The while loop should not be needed but it seems a block addition
# scenario is unaccounted for
updateStateData(
pool.blockPool,
pool.blockPool.tmpState,
current
)
let blockData = pool.blockPool.get(current.blck)
state = pool.forkChoice_v2.process_block(
slot = current.blck.slot,
block_root = current.blck.root,
parent_root = if not current.blck.parent.isNil: current.blck.parent.root else: default(Eth2Digest),
state_root = default(Eth2Digest), # This is unnecessary for fork choice but may help external components
justified_epoch = pool.blockPool.tmpState.data.data.current_justified_checkpoint.epoch,
finalized_epoch = pool.blockPool.tmpState.data.data.finalized_checkpoint.epoch,
)
# This should not happen and might lead to unresponsive networking while processing occurs
if state.isErr:
# TODO investigate, potential sources:
# - Pruning
# - Quarantine adding multiple blocks at once
# - Own block proposal
error "Desync between fork_choice and blockpool services, trying to recover.",
msg = state.error,
blck = shortlog(current.blck),
parent = shortlog(current.blck.parent),
finalizedHead = shortLog(pool.blockPool.finalizedHead),
justifiedHead = shortLog(pool.blockPool.head.justified),
head = shortLog(pool.blockPool.head.blck)
blockStack.add(current)
current = BlockSlot(blck: blck.parent, slot: blck.parent.slot)
elif blockStack.len == 0:
break
else:
info "Re-added missing or pruned block to fork choice",
msg = state.error,
blck = shortlog(current.blck),
parent = shortlog(current.blck.parent),
finalizedHead = shortLog(pool.blockPool.finalizedHead),
justifiedHead = shortLog(pool.blockPool.head.justified),
head = shortLog(pool.blockPool.head.blck)
current = blockStack.pop()
proc getAttestationsForSlot*(pool: AttestationPool, newBlockSlot: Slot): proc getAttestationsForSlot*(pool: AttestationPool, newBlockSlot: Slot):
Option[AttestationsSeen] = Option[AttestationsSeen] =
if newBlockSlot < (GENESIS_SLOT + MIN_ATTESTATION_INCLUSION_DELAY): if newBlockSlot < (GENESIS_SLOT + MIN_ATTESTATION_INCLUSION_DELAY):
@ -403,7 +536,10 @@ proc resolve*(pool: var AttestationPool) =
for a in resolved: for a in resolved:
pool.addResolved(a.blck, a.attestation) pool.addResolved(a.blck, a.attestation)
func latestAttestation*( # Fork choice v1
# ---------------------------------------------------------------
func latestAttestation(
pool: AttestationPool, pubKey: ValidatorPubKey): BlockRef = pool: AttestationPool, pubKey: ValidatorPubKey): BlockRef =
pool.latestAttestations.getOrDefault(pubKey) pool.latestAttestations.getOrDefault(pubKey)
@ -411,7 +547,7 @@ func latestAttestation*(
# The structure of this code differs from the spec since we use a different # The structure of this code differs from the spec since we use a different
# strategy for storing states and justification points - it should nonetheless # strategy for storing states and justification points - it should nonetheless
# be close in terms of functionality. # be close in terms of functionality.
func lmdGhost*( func lmdGhost(
pool: AttestationPool, start_state: BeaconState, pool: AttestationPool, start_state: BeaconState,
start_block: BlockRef): BlockRef = start_block: BlockRef): BlockRef =
# TODO: a Fenwick Tree datastructure to keep track of cumulated votes # TODO: a Fenwick Tree datastructure to keep track of cumulated votes
@ -462,7 +598,7 @@ func lmdGhost*(
winCount = candCount winCount = candCount
head = winner head = winner
proc selectHead*(pool: AttestationPool): BlockRef = proc selectHead_v1(pool: AttestationPool): BlockRef =
let let
justifiedHead = pool.blockPool.latestJustifiedBlock() justifiedHead = pool.blockPool.latestJustifiedBlock()
@ -470,3 +606,47 @@ proc selectHead*(pool: AttestationPool): BlockRef =
lmdGhost(pool, pool.blockPool.justifiedState.data.data, justifiedHead.blck) lmdGhost(pool, pool.blockPool.justifiedState.data.data, justifiedHead.blck)
newHead newHead
# Fork choice v2
# ---------------------------------------------------------------
func getAttesterBalances(state: StateData): seq[Gwei] {.noInit.}=
## Get the balances from a state
result.newSeq(state.data.data.validators.len) # zero-init
let epoch = state.data.data.slot.compute_epoch_at_slot()
for i in 0 ..< result.len:
# All non-active validators have a 0 balance
template validator: Validator = state.data.data.validators[i]
if validator.is_active_validator(epoch):
result[i] = validator.effective_balance
proc selectHead_v2(pool: var AttestationPool): BlockRef =
let attesterBalances = pool.blockPool.justifiedState.getAttesterBalances()
let newHead = pool.forkChoice_v2.find_head(
justified_epoch = pool.blockPool.justifiedState.data.data.slot.compute_epoch_at_slot(),
justified_root = pool.blockPool.head.justified.blck.root,
finalized_epoch = pool.blockPool.headState.data.data.finalized_checkpoint.epoch,
justified_state_balances = attesterBalances
).get()
pool.blockPool.getRef(newHead)
proc pruneBefore*(pool: var AttestationPool, finalizedhead: BlockSlot) =
pool.forkChoice_v2.maybe_prune(finalizedHead.blck.root).get()
# Dual-Headed Fork choice
# ---------------------------------------------------------------
proc selectHead*(pool: var AttestationPool): BlockRef =
let head_v1 = pool.selectHead_v1()
let head_v2 = pool.selectHead_v2()
if head_v1 != head_v2:
error "Fork choice engines in disagreement, using block from v1.",
v1_block = shortlog(head_v1),
v2_block = shortlog(head_v2)
return head_v1

View File

@ -305,7 +305,7 @@ proc onAttestation(node: BeaconNode, attestation: Attestation) =
attestationSlot = attestation.data.slot, headSlot = head.blck.slot attestationSlot = attestation.data.slot, headSlot = head.blck.slot
return return
node.attestationPool.add(attestation) node.attestationPool.addAttestation(attestation)
proc dumpBlock[T]( proc dumpBlock[T](
node: BeaconNode, signedBlock: SignedBeaconBlock, node: BeaconNode, signedBlock: SignedBeaconBlock,
@ -333,10 +333,17 @@ proc storeBlock(
pcs = "receive_block" pcs = "receive_block"
beacon_blocks_received.inc() beacon_blocks_received.inc()
let blck = node.blockPool.add(blockRoot, signedBlock)
{.gcsafe.}: # TODO: fork choice and blockpool should sync via messages instead of callbacks
let blck = node.blockPool.addRawBlock(blockRoot, signedBlock) do (validBlock: BlockRef):
# Callback add to fork choice if valid
node.attestationPool.addForkChoice_v2(validBlock)
node.dumpBlock(signedBlock, blck) node.dumpBlock(signedBlock, blck)
# There can be a scenario where we receive a block we already received.
# However this block was before the last finalized epoch and so its parent
# was pruned from the ForkChoice.
if blck.isErr: if blck.isErr:
return err(blck.error) return err(blck.error)
@ -347,7 +354,7 @@ proc storeBlock(
attestation = shortLog(attestation), attestation = shortLog(attestation),
cat = "consensus" # Tag "consensus|attestation"? cat = "consensus" # Tag "consensus|attestation"?
node.attestationPool.add(attestation) node.attestationPool.addAttestation(attestation)
ok() ok()
proc onBeaconBlock(node: BeaconNode, signedBlock: SignedBeaconBlock) = proc onBeaconBlock(node: BeaconNode, signedBlock: SignedBeaconBlock) =
@ -565,7 +572,10 @@ proc runForwardSyncLoop(node: BeaconNode) {.async.} =
# We going to ignore `BlockError.Unviable` errors because we have working # We going to ignore `BlockError.Unviable` errors because we have working
# backward sync and it can happens that we can perform overlapping # backward sync and it can happens that we can perform overlapping
# requests. # requests.
if res.isErr and res.error != BlockError.Unviable: # For the same reason we ignore Duplicate blocks as if they are duplicate
# from before the current finalized epoch, we can drop them
# (and they may have no parents anymore in the fork choice if it was pruned)
if res.isErr and res.error notin {BlockError.Unviable, BlockError.Old, BLockError.Duplicate}:
return res return res
discard node.updateHead() discard node.updateHead()

View File

@ -69,6 +69,9 @@ proc updateHead*(node: BeaconNode): BlockRef =
node.blockPool.updateHead(newHead) node.blockPool.updateHead(newHead)
beacon_head_root.set newHead.root.toGaugeValue beacon_head_root.set newHead.root.toGaugeValue
# Cleanup the fork choice v2 if we have a finalized head
node.attestationPool.pruneBefore(node.blockPool.finalizedHead)
newHead newHead
template findIt*(s: openarray, predicate: untyped): int64 = template findIt*(s: openarray, predicate: untyped): int64 =

View File

@ -5,7 +5,8 @@ import
stew/endians2, stew/endians2,
spec/[datatypes, crypto, digest], spec/[datatypes, crypto, digest],
block_pools/block_pools_types, block_pools/block_pools_types,
block_pool # TODO: refactoring compat shim block_pool, # TODO: refactoring compat shim
fork_choice/fork_choice_types
export block_pools_types export block_pools_types
@ -74,6 +75,9 @@ type
latestAttestations*: Table[ValidatorPubKey, BlockRef] ##\ latestAttestations*: Table[ValidatorPubKey, BlockRef] ##\
## Map that keeps track of the most recent vote of each attester - see ## Map that keeps track of the most recent vote of each attester - see
## fork_choice ## fork_choice
forkChoice_v2*: ForkChoice ##\
## The alternative fork choice "proto_array" that will ultimately
## replace the original one
# ############################################# # #############################################
# #

View File

@ -26,7 +26,7 @@ type
BlockPools* = object BlockPools* = object
# TODO: Rename BlockPools # TODO: Rename BlockPools
quarantine: Quarantine quarantine: Quarantine
dag: CandidateChains dag*: CandidateChains
BlockPool* = BlockPools BlockPool* = BlockPools
@ -53,9 +53,19 @@ template head*(pool: BlockPool): Head =
template finalizedHead*(pool: BlockPool): BlockSlot = template finalizedHead*(pool: BlockPool): BlockSlot =
pool.dag.finalizedHead pool.dag.finalizedHead
proc add*(pool: var BlockPool, blockRoot: Eth2Digest, proc addRawBlock*(pool: var BlockPool, blockRoot: Eth2Digest,
signedBlock: SignedBeaconBlock): Result[BlockRef, BlockError] {.gcsafe.} = signedBlock: SignedBeaconBlock,
add(pool.dag, pool.quarantine, blockRoot, signedBlock) callback: proc(blck: BlockRef)
): Result[BlockRef, BlockError] =
## Add a raw block to the blockpool
## Trigger "callback" on success
## Adding a rawblock might unlock a consequent amount of blocks in quarantine
# TODO: `addRawBlock` is accumulating significant cruft
# and is in dire need of refactoring
# - the ugly `inAdd` field
# - the callback
# - callback may be problematic as it's called in async validator duties
result = addRawBlock(pool.dag, pool.quarantine, blockRoot, signedBlock, callback)
export parent # func parent*(bs: BlockSlot): BlockSlot export parent # func parent*(bs: BlockSlot): BlockSlot
export isAncestorOf # func isAncestorOf*(a, b: BlockRef): bool export isAncestorOf # func isAncestorOf*(a, b: BlockRef): bool
@ -68,7 +78,13 @@ proc init*(T: type BlockPools, db: BeaconChainDB,
updateFlags: UpdateFlags = {}): BlockPools = updateFlags: UpdateFlags = {}): BlockPools =
result.dag = init(CandidateChains, db, updateFlags) result.dag = init(CandidateChains, db, updateFlags)
func addFlags*(pool: BlockPool, flags: UpdateFlags) =
## Add a flag to the block processing
## This is destined for testing to add skipBLSValidation flag
pool.dag.updateFlags.incl flags
export init # func init*(T: type BlockRef, root: Eth2Digest, blck: BeaconBlock): BlockRef export init # func init*(T: type BlockRef, root: Eth2Digest, blck: BeaconBlock): BlockRef
export addFlags
func getRef*(pool: BlockPool, root: Eth2Digest): BlockRef = func getRef*(pool: BlockPool, root: Eth2Digest): BlockRef =
## Retrieve a resolved block reference, if available ## Retrieve a resolved block reference, if available

View File

@ -6,8 +6,11 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms. # at your option. This file may not be copied, modified, or distributed except according to those terms.
import import
deques, tables, # Standard library
deques, tables, hashes,
# Status libraries
stew/[endians2, byteutils], chronicles, stew/[endians2, byteutils], chronicles,
# Internals
../spec/[datatypes, crypto, digest], ../spec/[datatypes, crypto, digest],
../beacon_chain_db, ../extras ../beacon_chain_db, ../extras
@ -36,6 +39,8 @@ type
Invalid ##\ Invalid ##\
## Block is broken / doesn't apply cleanly - whoever sent it is fishy (or ## Block is broken / doesn't apply cleanly - whoever sent it is fishy (or
## we're buggy) ## we're buggy)
Old
Duplicate
Quarantine* = object Quarantine* = object
## Keeps track of unsafe blocks coming from the network ## Keeps track of unsafe blocks coming from the network
@ -188,3 +193,6 @@ proc shortLog*(v: BlockRef): string =
chronicles.formatIt BlockSlot: shortLog(it) chronicles.formatIt BlockSlot: shortLog(it)
chronicles.formatIt BlockRef: shortLog(it) chronicles.formatIt BlockRef: shortLog(it)
func hash*(blockRef: BlockRef): Hash =
hash(blockRef.root)

View File

@ -8,8 +8,11 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
chronicles, options, sequtils, tables, # Standard libraries
chronicles, options, sequtils, tables, sets,
# Status libraries
metrics, metrics,
# Internals
../ssz/merkleization, ../beacon_chain_db, ../extras, ../ssz/merkleization, ../beacon_chain_db, ../extras,
../spec/[crypto, datatypes, digest, helpers, validator, state_transition], ../spec/[crypto, datatypes, digest, helpers, validator, state_transition],
block_pools_types block_pools_types
@ -305,6 +308,25 @@ proc init*(T: type CandidateChains, db: BeaconChainDB,
res res
iterator topoSortedSinceLastFinalization*(dag: CandidateChains): BlockRef =
## Iterate on the dag in topological order
# TODO: this uses "children" for simplicity
# but "children" should be deleted as it introduces cycles
# that causes significant overhead at least and leaks at worst
# for the GC.
# This is not perf critical, it is only used to bootstrap the fork choice.
var visited: HashSet[BlockRef]
var stack: seq[BlockRef]
stack.add dag.finalizedHead.blck
while stack.len != 0:
let node = stack.pop()
if node notin visited:
visited.incl node
stack.add node.children
yield node
proc getState( proc getState(
dag: CandidateChains, db: BeaconChainDB, stateRoot: Eth2Digest, blck: BlockRef, dag: CandidateChains, db: BeaconChainDB, stateRoot: Eth2Digest, blck: BlockRef,
output: var StateData): bool = output: var StateData): bool =

View File

@ -34,15 +34,24 @@ func getOrResolve*(dag: CandidateChains, quarantine: var Quarantine, root: Eth2D
if result.isNil: if result.isNil:
quarantine.missing[root] = MissingBlock() quarantine.missing[root] = MissingBlock()
proc add*( proc addRawBlock*(
dag: var CandidateChains, quarantine: var Quarantine, dag: var CandidateChains, quarantine: var Quarantine,
blockRoot: Eth2Digest, blockRoot: Eth2Digest,
signedBlock: SignedBeaconBlock): Result[BlockRef, BlockError] {.gcsafe.} signedBlock: SignedBeaconBlock,
callback: proc(blck: BlockRef)
): Result[BlockRef, BlockError]
proc addResolvedBlock( proc addResolvedBlock(
dag: var CandidateChains, quarantine: var Quarantine, dag: var CandidateChains, quarantine: var Quarantine,
state: BeaconState, blockRoot: Eth2Digest, state: BeaconState, blockRoot: Eth2Digest,
signedBlock: SignedBeaconBlock, parent: BlockRef): BlockRef = signedBlock: SignedBeaconBlock, parent: BlockRef,
callback: proc(blck: BlockRef)
): BlockRef =
# TODO: `addResolvedBlock` is accumulating significant cruft
# and is in dire need of refactoring
# - the ugly `quarantine.inAdd` field
# - the callback
# - callback may be problematic as it's called in async validator duties
logScope: pcs = "block_resolution" logScope: pcs = "block_resolution"
doAssert state.slot == signedBlock.message.slot, "state must match block" doAssert state.slot == signedBlock.message.slot, "state must match block"
@ -86,6 +95,9 @@ proc addResolvedBlock(
heads = dag.heads.len(), heads = dag.heads.len(),
cat = "filtering" cat = "filtering"
# This MUST be added before the quarantine
callback(blockRef)
# Now that we have the new block, we should see if any of the previously # Now that we have the new block, we should see if any of the previously
# unresolved blocks magically become resolved # unresolved blocks magically become resolved
# TODO there are more efficient ways of doing this that don't risk # TODO there are more efficient ways of doing this that don't risk
@ -94,6 +106,7 @@ proc addResolvedBlock(
# blocks being synced, there's a stack overflow as `add` gets called # blocks being synced, there's a stack overflow as `add` gets called
# for the whole chain of blocks. Instead we use this ugly field in `dag` # for the whole chain of blocks. Instead we use this ugly field in `dag`
# which could be avoided by refactoring the code # which could be avoided by refactoring the code
# TODO unit test the logic, in particular interaction with fork choice block parents
if not quarantine.inAdd: if not quarantine.inAdd:
quarantine.inAdd = true quarantine.inAdd = true
defer: quarantine.inAdd = false defer: quarantine.inAdd = false
@ -101,20 +114,26 @@ proc addResolvedBlock(
while keepGoing: while keepGoing:
let retries = quarantine.orphans let retries = quarantine.orphans
for k, v in retries: for k, v in retries:
discard add(dag, quarantine, k, v) discard addRawBlock(dag, quarantine, k, v, callback)
# Keep going for as long as the pending dag is shrinking # Keep going for as long as the pending dag is shrinking
# TODO inefficient! so what? # TODO inefficient! so what?
keepGoing = quarantine.orphans.len < retries.len keepGoing = quarantine.orphans.len < retries.len
blockRef blockRef
proc add*( proc addRawBlock*(
dag: var CandidateChains, quarantine: var Quarantine, dag: var CandidateChains, quarantine: var Quarantine,
blockRoot: Eth2Digest, blockRoot: Eth2Digest,
signedBlock: SignedBeaconBlock): Result[BlockRef, BlockError] {.gcsafe.} = signedBlock: SignedBeaconBlock,
callback: proc(blck: BlockRef)
): Result[BlockRef, BlockError] =
## return the block, if resolved... ## return the block, if resolved...
## the state parameter may be updated to include the given block, if
## everything checks out # TODO: `addRawBlock` is accumulating significant cruft
# TODO reevaluate passing the state in like this # and is in dire need of refactoring
# - the ugly `quarantine.inAdd` field
# - the callback
# - callback may be problematic as it's called in async validator duties
# TODO: to facilitate adding the block to the attestation pool # TODO: to facilitate adding the block to the attestation pool
# this should also return justified and finalized epoch corresponding # this should also return justified and finalized epoch corresponding
@ -124,18 +143,22 @@ proc add*(
let blck = signedBlock.message let blck = signedBlock.message
doAssert blockRoot == hash_tree_root(blck) doAssert blockRoot == hash_tree_root(blck), "blockRoot: 0x" & shortLog(blockRoot) & ", signedBlock: 0x" & shortLog(hash_tree_root(blck))
logScope: pcs = "block_addition" logScope: pcs = "block_addition"
# Already seen this block?? # Already seen this block??
dag.blocks.withValue(blockRoot, blockRef): if blockRoot in dag.blocks:
debug "Block already exists", debug "Block already exists",
blck = shortLog(blck), blck = shortLog(blck),
blockRoot = shortLog(blockRoot), blockRoot = shortLog(blockRoot),
cat = "filtering" cat = "filtering"
return ok blockRef[] # There can be a scenario where we receive a block we already received.
# However this block was before the last finalized epoch and so its parent
# was pruned from the ForkChoice. Trying to add it again, even if the fork choice
# supports duplicate will lead to a crash.
return err Duplicate
quarantine.missing.del(blockRoot) quarantine.missing.del(blockRoot)
@ -220,9 +243,12 @@ proc add*(
# the BlockRef first! # the BlockRef first!
dag.tmpState.blck = addResolvedBlock( dag.tmpState.blck = addResolvedBlock(
dag, quarantine, dag, quarantine,
dag.tmpState.data.data, blockRoot, signedBlock, parent) dag.tmpState.data.data, blockRoot, signedBlock, parent,
callback
)
dag.putState(dag.tmpState.data, dag.tmpState.blck) dag.putState(dag.tmpState.data, dag.tmpState.blck)
callback(dag.tmpState.blck)
return ok dag.tmpState.blck return ok dag.tmpState.blck
# TODO already checked hash though? main reason to keep this is because # TODO already checked hash though? main reason to keep this is because

View File

@ -117,7 +117,7 @@ func process_attestation*(
vote.next_epoch = target_epoch vote.next_epoch = target_epoch
{.noSideEffect.}: {.noSideEffect.}:
info "Integrating vote in fork choice", trace "Integrating vote in fork choice",
validator_index = $validator_index, validator_index = $validator_index,
new_vote = shortlog(vote) new_vote = shortlog(vote)
else: else:
@ -129,7 +129,7 @@ func process_attestation*(
ignored_block_root = shortlog(block_root), ignored_block_root = shortlog(block_root),
ignored_target_epoch = $target_epoch ignored_target_epoch = $target_epoch
else: else:
info "Ignoring double-vote for fork choice", trace "Ignoring double-vote for fork choice",
validator_index = $validator_index, validator_index = $validator_index,
current_vote = shortlog(vote), current_vote = shortlog(vote),
ignored_block_root = shortlog(block_root), ignored_block_root = shortlog(block_root),
@ -159,7 +159,7 @@ func process_block*(
return err("process_block_error: " & $err) return err("process_block_error: " & $err)
{.noSideEffect.}: {.noSideEffect.}:
info "Integrating block in fork choice", trace "Integrating block in fork choice",
block_root = $shortlog(block_root), block_root = $shortlog(block_root),
parent_root = $shortlog(parent_root), parent_root = $shortlog(parent_root),
justified_epoch = $justified_epoch, justified_epoch = $justified_epoch,
@ -205,7 +205,7 @@ func find_head*(
return err("find_head failed: " & $ghost_err) return err("find_head failed: " & $ghost_err)
{.noSideEffect.}: {.noSideEffect.}:
info "Fork choice requested", debug "Fork choice requested",
justified_epoch = $justified_epoch, justified_epoch = $justified_epoch,
justified_root = shortlog(justified_root), justified_root = shortlog(justified_root),
finalized_epoch = $finalized_epoch, finalized_epoch = $finalized_epoch,

View File

@ -10,11 +10,17 @@
import import
# Standard library # Standard library
std/tables, std/options, std/typetraits, std/tables, std/options, std/typetraits,
# Status libraries
chronicles,
# Internal # Internal
../spec/[datatypes, digest], ../spec/[datatypes, digest],
# Fork choice # Fork choice
./fork_choice_types ./fork_choice_types
logScope:
topics = "fork_choice"
cat = "fork_choice"
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/fork-choice.md # https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/fork-choice.md
# This is a port of https://github.com/sigp/lighthouse/pull/804 # This is a port of https://github.com/sigp/lighthouse/pull/804
# which is a port of "Proto-Array": https://github.com/protolambda/lmd-ghost # which is a port of "Proto-Array": https://github.com/protolambda/lmd-ghost
@ -176,6 +182,14 @@ func on_block*(
# Genesis (but Genesis might not be default(Eth2Digest)) # Genesis (but Genesis might not be default(Eth2Digest))
parent_index = none(int) parent_index = none(int)
elif parent notin self.indices: elif parent notin self.indices:
{.noSideEffect.}:
error "Trying to add block with unknown parent",
child_root = shortLog(root),
parent_root = shortLog(parent),
justified_epoch = $justified_epoch,
finalized_epoch = $finalized_epoch,
slot_optional = $slot
return ForkChoiceError( return ForkChoiceError(
kind: fcErrUnknownParent, kind: fcErrUnknownParent,
child_root: root, child_root: root,
@ -297,6 +311,12 @@ func maybe_prune*(
kind: fcErrInvalidNodeIndex, kind: fcErrInvalidNodeIndex,
index: finalized_index index: finalized_index
) )
{.noSideEffect.}:
debug "Pruning blocks from fork choice",
finalizedRoot = shortlog(finalized_root),
pcs = "prune"
for node_index in 0 ..< finalized_index: for node_index in 0 ..< finalized_index:
self.indices.del(self.nodes[node_index].root) self.indices.del(self.nodes[node_index].root)

View File

@ -689,7 +689,9 @@ func makeAttestationData*(
if start_slot == state.slot: beacon_block_root if start_slot == state.slot: beacon_block_root
else: get_block_root_at_slot(state, start_slot) else: get_block_root_at_slot(state, start_slot)
doAssert slot.compute_epoch_at_slot == current_epoch doAssert slot.compute_epoch_at_slot == current_epoch,
"Computed epoch was " & $slot.compute_epoch_at_slot &
" while the state current_epoch was " & $current_epoch
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attestation-data # https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attestation-data
AttestationData( AttestationData(

View File

@ -223,7 +223,12 @@ proc proposeSignedBlock*(node: BeaconNode,
validator: AttachedValidator, validator: AttachedValidator,
newBlock: SignedBeaconBlock, newBlock: SignedBeaconBlock,
blockRoot: Eth2Digest): Future[BlockRef] {.async.} = blockRoot: Eth2Digest): Future[BlockRef] {.async.} =
let newBlockRef = node.blockPool.add(blockRoot, newBlock)
{.gcsafe.}: # TODO: fork choice and blockpool should sync via messages instead of callbacks
let newBlockRef = node.blockPool.addRawBlock(blockRoot, newBlock) do (validBlock: BlockRef):
# Callback Add to fork choice
node.attestationPool.addForkChoice_v2(validBlock)
if newBlockRef.isErr: if newBlockRef.isErr:
warn "Unable to add proposed block to block pool", warn "Unable to add proposed block to block pool",
newBlock = shortLog(newBlock.message), newBlock = shortLog(newBlock.message),

View File

@ -86,7 +86,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
var aggregation_bits = CommitteeValidatorsBits.init(committee.len) var aggregation_bits = CommitteeValidatorsBits.init(committee.len)
aggregation_bits.setBit index_in_committee aggregation_bits.setBit index_in_committee
attPool.add( attPool.addAttestation(
Attestation( Attestation(
data: data, data: data,
aggregation_bits: aggregation_bits, aggregation_bits: aggregation_bits,
@ -134,9 +134,11 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
state.fork, state.genesis_validators_root, newBlock.message.slot, state.fork, state.genesis_validators_root, newBlock.message.slot,
blockRoot, privKey) blockRoot, privKey)
let added = blockPool.add(blockRoot, newBlock).tryGet() let added = blockPool.addRawBlock(blockRoot, newBlock) do (validBlock: BlockRef):
blck() = added # Callback Add to fork choice
blockPool.updateHead(added) attPool.addForkChoice_v2(validBlock)
blck() = added[]
blockPool.updateHead(added[])
for i in 0..<slots: for i in 0..<slots:
let let

View File

@ -16,8 +16,17 @@ import
chronicles, chronicles,
stew/byteutils, stew/byteutils,
./testutil, ./testblockutil, ./testutil, ./testblockutil,
../beacon_chain/spec/[digest, validator, state_transition], ../beacon_chain/spec/[digest, validator, state_transition, helpers, beaconstate],
../beacon_chain/[beacon_node_types, attestation_pool, block_pool] ../beacon_chain/[beacon_node_types, attestation_pool, block_pool, extras],
../beacon_chain/fork_choice/[fork_choice_types, fork_choice]
template wrappedTimedTest(name: string, body: untyped) =
# `check` macro takes a copy of whatever it's checking, on the stack!
block: # Symbol namespacing
proc wrappedTest() =
timedTest name:
body
wrappedTest()
suiteReport "Attestation pool processing" & preset(): suiteReport "Attestation pool processing" & preset():
## For now just test that we can compile and execute block processing with ## For now just test that we can compile and execute block processing with
@ -33,8 +42,6 @@ suiteReport "Attestation pool processing" & preset():
check: check:
process_slots(state.data, state.data.data.slot + 1) process_slots(state.data, state.data.data.slot + 1)
# pool[].add(blockPool[].tail) # Make the tail known to fork choice
timedTest "Can add and retrieve simple attestation" & preset(): timedTest "Can add and retrieve simple attestation" & preset():
var cache = get_empty_per_epoch_cache() var cache = get_empty_per_epoch_cache()
let let
@ -44,7 +51,7 @@ suiteReport "Attestation pool processing" & preset():
attestation = makeAttestation( attestation = makeAttestation(
state.data.data, state.blck.root, beacon_committee[0], cache) state.data.data, state.blck.root, beacon_committee[0], cache)
pool[].add(attestation) pool[].addAttestation(attestation)
check: check:
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1) process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1)
@ -73,8 +80,8 @@ suiteReport "Attestation pool processing" & preset():
state.data.data, state.blck.root, bc1[0], cache) state.data.data, state.blck.root, bc1[0], cache)
# test reverse order # test reverse order
pool[].add(attestation1) pool[].addAttestation(attestation1)
pool[].add(attestation0) pool[].addAttestation(attestation0)
discard process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1) discard process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1)
@ -94,8 +101,8 @@ suiteReport "Attestation pool processing" & preset():
attestation1 = makeAttestation( attestation1 = makeAttestation(
state.data.data, state.blck.root, bc0[1], cache) state.data.data, state.blck.root, bc0[1], cache)
pool[].add(attestation0) pool[].addAttestation(attestation0)
pool[].add(attestation1) pool[].addAttestation(attestation1)
check: check:
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1) process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1)
@ -119,8 +126,8 @@ suiteReport "Attestation pool processing" & preset():
attestation0.combine(attestation1, {}) attestation0.combine(attestation1, {})
pool[].add(attestation0) pool[].addAttestation(attestation0)
pool[].add(attestation1) pool[].addAttestation(attestation1)
check: check:
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1) process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1)
@ -143,8 +150,8 @@ suiteReport "Attestation pool processing" & preset():
attestation0.combine(attestation1, {}) attestation0.combine(attestation1, {})
pool[].add(attestation1) pool[].addAttestation(attestation1)
pool[].add(attestation0) pool[].addAttestation(attestation0)
check: check:
process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1) process_slots(state.data, MIN_ATTESTATION_INCLUSION_DELAY.Slot + 1)
@ -159,62 +166,66 @@ suiteReport "Attestation pool processing" & preset():
let let
b1 = addTestBlock(state.data, blockPool[].tail.root, cache) b1 = addTestBlock(state.data, blockPool[].tail.root, cache)
b1Root = hash_tree_root(b1.message) b1Root = hash_tree_root(b1.message)
b1Add = blockpool[].add(b1Root, b1)[] b1Add = blockpool[].addRawBlock(b1Root, b1) do (validBlock: BlockRef):
# Callback Add to fork choice
pool[].addForkChoice_v2(validBlock)
# pool[].add(b1Add) - make a block known to the future fork choice
let head = pool[].selectHead() let head = pool[].selectHead()
check: check:
head == b1Add head == b1Add[]
let let
b2 = addTestBlock(state.data, b1Root, cache) b2 = addTestBlock(state.data, b1Root, cache)
b2Root = hash_tree_root(b2.message) b2Root = hash_tree_root(b2.message)
b2Add = blockpool[].add(b2Root, b2)[] b2Add = blockpool[].addRawBlock(b2Root, b2) do (validBlock: BlockRef):
# Callback Add to fork choice
pool[].addForkChoice_v2(validBlock)
# pool[].add(b2Add) - make a block known to the future fork choice
let head2 = pool[].selectHead() let head2 = pool[].selectHead()
check: check:
head2 == b2Add head2 == b2Add[]
timedTest "Fork choice returns block with attestation": timedTest "Fork choice returns block with attestation":
var cache = get_empty_per_epoch_cache() var cache = get_empty_per_epoch_cache()
let let
b10 = makeTestBlock(state.data, blockPool[].tail.root, cache) b10 = makeTestBlock(state.data, blockPool[].tail.root, cache)
b10Root = hash_tree_root(b10.message) b10Root = hash_tree_root(b10.message)
b10Add = blockpool[].add(b10Root, b10)[] b10Add = blockpool[].addRawBlock(b10Root, b10) do (validBlock: BlockRef):
# Callback Add to fork choice
pool[].addForkChoice_v2(validBlock)
# pool[].add(b10Add) - make a block known to the future fork choice
let head = pool[].selectHead() let head = pool[].selectHead()
check: check:
head == b10Add head == b10Add[]
let let
b11 = makeTestBlock(state.data, blockPool[].tail.root, cache, b11 = makeTestBlock(state.data, blockPool[].tail.root, cache,
graffiti = Eth2Digest(data: [1'u8, 0, 0, 0 ,0 ,0 ,0 ,0 ,0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) graffiti = Eth2Digest(data: [1'u8, 0, 0, 0 ,0 ,0 ,0 ,0 ,0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
) )
b11Root = hash_tree_root(b11.message) b11Root = hash_tree_root(b11.message)
b11Add = blockpool[].add(b11Root, b11)[] b11Add = blockpool[].addRawBlock(b11Root, b11) do (validBlock: BlockRef):
# Callback Add to fork choice
pool[].addForkChoice_v2(validBlock)
bc1 = get_beacon_committee( bc1 = get_beacon_committee(
state.data.data, state.data.data.slot, 1.CommitteeIndex, cache) state.data.data, state.data.data.slot, 1.CommitteeIndex, cache)
attestation0 = makeAttestation(state.data.data, b10Root, bc1[0], cache) attestation0 = makeAttestation(state.data.data, b10Root, bc1[0], cache)
# pool[].add(b11Add) - make a block known to the future fork choice pool[].addAttestation(attestation0)
pool[].add(attestation0)
let head2 = pool[].selectHead() let head2 = pool[].selectHead()
check: check:
# Single vote for b10 and no votes for b11 # Single vote for b10 and no votes for b11
head2 == b10Add head2 == b10Add[]
let let
attestation1 = makeAttestation(state.data.data, b11Root, bc1[1], cache) attestation1 = makeAttestation(state.data.data, b11Root, bc1[1], cache)
attestation2 = makeAttestation(state.data.data, b11Root, bc1[2], cache) attestation2 = makeAttestation(state.data.data, b11Root, bc1[2], cache)
pool[].add(attestation1) pool[].addAttestation(attestation1)
let head3 = pool[].selectHead() let head3 = pool[].selectHead()
# Warning - the tiebreak are incorrect and guaranteed consensus fork, it should be bigger # Warning - the tiebreak are incorrect and guaranteed consensus fork, it should be bigger
@ -225,12 +236,117 @@ suiteReport "Attestation pool processing" & preset():
# all implementations favor the biggest root # all implementations favor the biggest root
# TODO # TODO
# currently using smaller as we have used for over a year # currently using smaller as we have used for over a year
head3 == smaller head3 == smaller[]
pool[].add(attestation2) pool[].addAttestation(attestation2)
let head4 = pool[].selectHead() let head4 = pool[].selectHead()
check: check:
# Two votes for b11 # Two votes for b11
head4 == b11Add head4 == b11Add[]
timedTest "Trying to add a block twice tags the second as an error":
var cache = get_empty_per_epoch_cache()
let
b10 = makeTestBlock(state.data, blockPool[].tail.root, cache)
b10Root = hash_tree_root(b10.message)
b10Add = blockpool[].addRawBlock(b10Root, b10) do (validBlock: BlockRef):
# Callback Add to fork choice
pool[].addForkChoice_v2(validBlock)
let head = pool[].selectHead()
check:
head == b10Add[]
# -------------------------------------------------------------
# Add back the old block to ensure we have a duplicate error
let b10_clone = b10 # Assumes deep copy
let b10Add_clone = blockpool[].addRawBlock(b10Root, b10_clone) do (validBlock: BlockRef):
# Callback Add to fork choice
pool[].addForkChoice_v2(validBlock)
doAssert: b10Add_clone.error == Duplicate
wrappedTimedTest "Trying to add a duplicate block from an old pruned epoch is tagged as an error":
var cache = get_empty_per_epoch_cache()
blockpool[].addFlags {skipBLSValidation}
pool.forkChoice_v2.proto_array.prune_threshold = 1
let
b10 = makeTestBlock(state.data, blockPool[].tail.root, cache)
b10Root = hash_tree_root(b10.message)
b10Add = blockpool[].addRawBlock(b10Root, b10) do (validBlock: BlockRef):
# Callback Add to fork choice
pool[].addForkChoice_v2(validBlock)
let head = pool[].selectHead()
doAssert: head == b10Add[]
let block_ok = state_transition(state.data, b10, {}, noRollback)
doAssert: block_ok
# -------------------------------------------------------------
let b10_clone = b10 # Assumes deep copy
# -------------------------------------------------------------
# Pass an epoch
var block_root = b10Root
var attestations: seq[Attestation]
for epoch in 0 ..< 5:
let start_slot = compute_start_slot_at_epoch(Epoch epoch)
for slot in start_slot ..< start_slot + SLOTS_PER_EPOCH:
let new_block = makeTestBlock(state.data, block_root, cache, attestations = attestations)
let block_ok = state_transition(state.data, new_block, {skipBLSValidation}, noRollback)
doAssert: block_ok
block_root = hash_tree_root(new_block.message)
let blockRef = blockpool[].addRawBlock(block_root, new_block) do (validBlock: BlockRef):
# Callback Add to fork choice
pool[].addForkChoice_v2(validBlock)
let head = pool[].selectHead()
doassert: head == blockRef[]
blockPool[].updateHead(head)
attestations.setlen(0)
for index in 0 ..< get_committee_count_at_slot(state.data.data, slot.Slot):
let committee = get_beacon_committee(
state.data.data, state.data.data.slot, index.CommitteeIndex, cache)
# Create a bitfield filled with the given count per attestation,
# exactly on the right-most part of the committee field.
var aggregation_bits = init(CommitteeValidatorsBits, committee.len)
for v in 0 ..< committee.len * 2 div 3 + 1:
aggregation_bits[v] = true
attestations.add Attestation(
aggregation_bits: aggregation_bits,
data: makeAttestationData(
state.data.data, state.data.data.slot,
index, blockroot
)
# signature: ValidatorSig()
)
cache = get_empty_per_epoch_cache()
# -------------------------------------------------------------
# Prune
echo "\nPruning all blocks before: ", shortlog(blockPool[].finalizedHead), '\n'
doAssert: blockPool[].finalizedHead.slot != 0
pool[].pruneBefore(blockPool[].finalizedHead)
doAssert: b10Root notin pool.forkChoice_v2
# Add back the old block to ensure we have a duplicate error
let b10Add_clone = blockpool[].addRawBlock(b10Root, b10_clone) do (validBlock: BlockRef):
# Callback Add to fork choice
pool[].addForkChoice_v2(validBlock)
doAssert: b10Add_clone.error == Duplicate

View File

@ -110,26 +110,28 @@ suiteReport "Block pool processing" & preset():
timedTest "Simple block add&get" & preset(): timedTest "Simple block add&get" & preset():
let let
b1Add = pool.add(b1Root, b1)[] b1Add = pool.addRawBlock(b1Root, b1) do (validBlock: BlockRef):
discard
b1Get = pool.get(b1Root) b1Get = pool.get(b1Root)
check: check:
b1Get.isSome() b1Get.isSome()
b1Get.get().refs.root == b1Root b1Get.get().refs.root == b1Root
b1Add.root == b1Get.get().refs.root b1Add[].root == b1Get.get().refs.root
pool.heads.len == 1 pool.heads.len == 1
pool.heads[0].blck == b1Add pool.heads[0].blck == b1Add[]
let let
b2Add = pool.add(b2Root, b2)[] b2Add = pool.addRawBlock(b2Root, b2) do (validBlock: BlockRef):
discard
b2Get = pool.get(b2Root) b2Get = pool.get(b2Root)
check: check:
b2Get.isSome() b2Get.isSome()
b2Get.get().refs.root == b2Root b2Get.get().refs.root == b2Root
b2Add.root == b2Get.get().refs.root b2Add[].root == b2Get.get().refs.root
pool.heads.len == 1 pool.heads.len == 1
pool.heads[0].blck == b2Add pool.heads[0].blck == b2Add[]
# Skip one slot to get a gap # Skip one slot to get a gap
check: check:
@ -138,12 +140,13 @@ suiteReport "Block pool processing" & preset():
let let
b4 = addTestBlock(stateData.data, b2Root, cache) b4 = addTestBlock(stateData.data, b2Root, cache)
b4Root = hash_tree_root(b4.message) b4Root = hash_tree_root(b4.message)
b4Add = pool.add(b4Root, b4)[] b4Add = pool.addRawBlock(b4Root, b4) do (validBlock: BlockRef):
discard
check: check:
b4Add.parent == b2Add b4Add[].parent == b2Add[]
pool.updateHead(b4Add) pool.updateHead(b4Add[])
var blocks: array[3, BlockRef] var blocks: array[3, BlockRef]
@ -152,16 +155,16 @@ suiteReport "Block pool processing" & preset():
blocks[0..<1] == [pool.tail] blocks[0..<1] == [pool.tail]
pool.getBlockRange(Slot(0), 1, blocks.toOpenArray(0, 1)) == 0 pool.getBlockRange(Slot(0), 1, blocks.toOpenArray(0, 1)) == 0
blocks[0..<2] == [pool.tail, b1Add] blocks[0..<2] == [pool.tail, b1Add[]]
pool.getBlockRange(Slot(0), 2, blocks.toOpenArray(0, 1)) == 0 pool.getBlockRange(Slot(0), 2, blocks.toOpenArray(0, 1)) == 0
blocks[0..<2] == [pool.tail, b2Add] blocks[0..<2] == [pool.tail, b2Add[]]
pool.getBlockRange(Slot(0), 3, blocks.toOpenArray(0, 1)) == 1 pool.getBlockRange(Slot(0), 3, blocks.toOpenArray(0, 1)) == 1
blocks[0..<2] == [nil, pool.tail] # block 3 is missing! blocks[0..<2] == [nil, pool.tail] # block 3 is missing!
pool.getBlockRange(Slot(2), 2, blocks.toOpenArray(0, 1)) == 0 pool.getBlockRange(Slot(2), 2, blocks.toOpenArray(0, 1)) == 0
blocks[0..<2] == [b2Add, b4Add] # block 3 is missing! blocks[0..<2] == [b2Add[], b4Add[]] # block 3 is missing!
# empty length # empty length
pool.getBlockRange(Slot(2), 2, blocks.toOpenArray(0, -1)) == 0 pool.getBlockRange(Slot(2), 2, blocks.toOpenArray(0, -1)) == 0
@ -174,13 +177,18 @@ suiteReport "Block pool processing" & preset():
blocks[0..<2] == [BlockRef nil, nil] # block 3 is missing! blocks[0..<2] == [BlockRef nil, nil] # block 3 is missing!
timedTest "Reverse order block add & get" & preset(): timedTest "Reverse order block add & get" & preset():
check: pool.add(b2Root, b2).error == MissingParent let missing = pool.addRawBlock(b2Root, b2) do (validBlock: BLockRef):
discard
check: missing.error == MissingParent
check: check:
pool.get(b2Root).isNone() # Unresolved, shouldn't show up pool.get(b2Root).isNone() # Unresolved, shouldn't show up
FetchRecord(root: b1Root) in pool.checkMissing() FetchRecord(root: b1Root) in pool.checkMissing()
check: pool.add(b1Root, b1).isOk let status = pool.addRawBlock(b1Root, b1) do (validBlock: BlockRef):
discard
check: status.isOk
let let
b1Get = pool.get(b1Root) b1Get = pool.get(b1Root)
@ -213,32 +221,37 @@ suiteReport "Block pool processing" & preset():
pool2.heads.len == 1 pool2.heads.len == 1
pool2.heads[0].blck.root == b2Root pool2.heads[0].blck.root == b2Root
timedTest "Can add same block twice" & preset(): timedTest "Adding the same block twice returns a Duplicate error" & preset():
let let
b10 = pool.add(b1Root, b1)[] b10 = pool.addRawBlock(b1Root, b1) do (validBlock: BlockRef):
b11 = pool.add(b1Root, b1)[] discard
b11 = pool.addRawBlock(b1Root, b1) do (validBlock: BlockRef):
discard
check: check:
b10 == b11 b11.error == Duplicate
not b10.isNil not b10[].isNil
timedTest "updateHead updates head and headState" & preset(): timedTest "updateHead updates head and headState" & preset():
let let
b1Add = pool.add(b1Root, b1)[] b1Add = pool.addRawBlock(b1Root, b1) do (validBlock: BlockRef):
discard
pool.updateHead(b1Add) pool.updateHead(b1Add[])
check: check:
pool.head.blck == b1Add pool.head.blck == b1Add[]
pool.headState.data.data.slot == b1Add.slot pool.headState.data.data.slot == b1Add[].slot
timedTest "updateStateData sanity" & preset(): timedTest "updateStateData sanity" & preset():
let let
b1Add = pool.add(b1Root, b1)[] b1Add = pool.addRawBlock(b1Root, b1) do (validBlock: BlockRef):
b2Add = pool.add(b2Root, b2)[] discard
bs1 = BlockSlot(blck: b1Add, slot: b1.message.slot) b2Add = pool.addRawBlock(b2Root, b2) do (validBlock: BlockRef):
bs1_3 = b1Add.atSlot(3.Slot) discard
bs2_3 = b2Add.atSlot(3.Slot) bs1 = BlockSlot(blck: b1Add[], slot: b1.message.slot)
bs1_3 = b1Add[].atSlot(3.Slot)
bs2_3 = b2Add[].atSlot(3.Slot)
var tmpState = assignClone(pool.headState) var tmpState = assignClone(pool.headState)
@ -246,38 +259,38 @@ suiteReport "Block pool processing" & preset():
pool.updateStateData(tmpState[], bs1) pool.updateStateData(tmpState[], bs1)
check: check:
tmpState.blck == b1Add tmpState.blck == b1Add[]
tmpState.data.data.slot == bs1.slot tmpState.data.data.slot == bs1.slot
# Skip slots # Skip slots
pool.updateStateData(tmpState[], bs1_3) # skip slots pool.updateStateData(tmpState[], bs1_3) # skip slots
check: check:
tmpState.blck == b1Add tmpState.blck == b1Add[]
tmpState.data.data.slot == bs1_3.slot tmpState.data.data.slot == bs1_3.slot
# Move back slots, but not blocks # Move back slots, but not blocks
pool.updateStateData(tmpState[], bs1_3.parent()) pool.updateStateData(tmpState[], bs1_3.parent())
check: check:
tmpState.blck == b1Add tmpState.blck == b1Add[]
tmpState.data.data.slot == bs1_3.parent().slot tmpState.data.data.slot == bs1_3.parent().slot
# Move to different block and slot # Move to different block and slot
pool.updateStateData(tmpState[], bs2_3) pool.updateStateData(tmpState[], bs2_3)
check: check:
tmpState.blck == b2Add tmpState.blck == b2Add[]
tmpState.data.data.slot == bs2_3.slot tmpState.data.data.slot == bs2_3.slot
# Move back slot and block # Move back slot and block
pool.updateStateData(tmpState[], bs1) pool.updateStateData(tmpState[], bs1)
check: check:
tmpState.blck == b1Add tmpState.blck == b1Add[]
tmpState.data.data.slot == bs1.slot tmpState.data.data.slot == bs1.slot
# Move back to genesis # Move back to genesis
pool.updateStateData(tmpState[], bs1.parent()) pool.updateStateData(tmpState[], bs1.parent())
check: check:
tmpState.blck == b1Add.parent tmpState.blck == b1Add[].parent
tmpState.data.data.slot == bs1.parent.slot tmpState.data.data.slot == bs1.parent.slot
suiteReport "BlockPool finalization tests" & preset(): suiteReport "BlockPool finalization tests" & preset():
@ -297,7 +310,11 @@ suiteReport "BlockPool finalization tests" & preset():
tmpState[], tmpState.data.slot + (5 * SLOTS_PER_EPOCH).uint64) tmpState[], tmpState.data.slot + (5 * SLOTS_PER_EPOCH).uint64)
let lateBlock = makeTestBlock(tmpState[], pool.head.blck.root, cache) let lateBlock = makeTestBlock(tmpState[], pool.head.blck.root, cache)
check: pool.add(hash_tree_root(blck.message), blck).isOk block:
let status = pool.addRawBlock(hash_tree_root(blck.message), blck) do (validBlock: BlockRef):
discard
check: status.isOk()
for i in 0 ..< (SLOTS_PER_EPOCH * 6): for i in 0 ..< (SLOTS_PER_EPOCH * 6):
if i == 1: if i == 1:
@ -305,7 +322,9 @@ suiteReport "BlockPool finalization tests" & preset():
check: check:
pool.tail.children.len == 2 pool.tail.children.len == 2
pool.heads.len == 2 pool.heads.len == 2
var
if i mod SLOTS_PER_EPOCH == 0:
# Reset cache at epoch boundaries
cache = get_empty_per_epoch_cache() cache = get_empty_per_epoch_cache()
blck = makeTestBlock( blck = makeTestBlock(
@ -313,18 +332,22 @@ suiteReport "BlockPool finalization tests" & preset():
attestations = makeFullAttestations( attestations = makeFullAttestations(
pool.headState.data.data, pool.head.blck.root, pool.headState.data.data, pool.head.blck.root,
pool.headState.data.data.slot, cache, {})) pool.headState.data.data.slot, cache, {}))
let added = pool.add(hash_tree_root(blck.message), blck)[] let added = pool.addRawBlock(hash_tree_root(blck.message), blck) do (validBlock: BlockRef):
pool.updateHead(added) discard
check: added.isOk()
pool.updateHead(added[])
check: check:
pool.heads.len() == 1 pool.heads.len() == 1
pool.head.justified.slot.compute_epoch_at_slot() == 5 pool.head.justified.slot.compute_epoch_at_slot() == 5
pool.tail.children.len == 1 pool.tail.children.len == 1
check: block:
# The late block is a block whose parent was finalized long ago and thus # The late block is a block whose parent was finalized long ago and thus
# is no longer a viable head candidate # is no longer a viable head candidate
pool.add(hash_tree_root(lateBlock.message), lateBlock).error == Unviable let status = pool.addRawBlock(hash_tree_root(lateBlock.message), lateBlock) do (validBlock: BlockRef):
discard
check: status.error == Unviable
let let
pool2 = BlockPool.init(db) pool2 = BlockPool.init(db)
@ -340,43 +363,47 @@ suiteReport "BlockPool finalization tests" & preset():
hash_tree_root(pool2.justifiedState.data.data) == hash_tree_root(pool2.justifiedState.data.data) ==
hash_tree_root(pool.justifiedState.data.data) hash_tree_root(pool.justifiedState.data.data)
timedTest "init with gaps" & preset(): # timedTest "init with gaps" & preset():
var cache = get_empty_per_epoch_cache() # var cache = get_empty_per_epoch_cache()
for i in 0 ..< (SLOTS_PER_EPOCH * 6 - 2): # for i in 0 ..< (SLOTS_PER_EPOCH * 6 - 2):
var # var
blck = makeTestBlock( # blck = makeTestBlock(
pool.headState.data, pool.head.blck.root, cache, # pool.headState.data, pool.head.blck.root, cache,
attestations = makeFullAttestations( # attestations = makeFullAttestations(
pool.headState.data.data, pool.head.blck.root, # pool.headState.data.data, pool.head.blck.root,
pool.headState.data.data.slot, cache, {})) # pool.headState.data.data.slot, cache, {}))
let added = pool.add(hash_tree_root(blck.message), blck)[]
pool.updateHead(added)
# Advance past epoch so that the epoch transition is gapped # let added = pool.addRawBlock(hash_tree_root(blck.message), blck) do (validBlock: BlockRef):
check: # discard
process_slots( # check: added.isOk()
pool.headState.data, Slot(SLOTS_PER_EPOCH * 6 + 2) ) # pool.updateHead(added[])
var blck = makeTestBlock( # # Advance past epoch so that the epoch transition is gapped
pool.headState.data, pool.head.blck.root, cache, # check:
attestations = makeFullAttestations( # process_slots(
pool.headState.data.data, pool.head.blck.root, # pool.headState.data, Slot(SLOTS_PER_EPOCH * 6 + 2) )
pool.headState.data.data.slot, cache, {}))
let added = pool.add(hash_tree_root(blck.message), blck)[] # var blck = makeTestBlock(
pool.updateHead(added) # pool.headState.data, pool.head.blck.root, cache,
# attestations = makeFullAttestations(
# pool.headState.data.data, pool.head.blck.root,
# pool.headState.data.data.slot, cache, {}))
let # let added = pool.addRawBlock(hash_tree_root(blck.message), blck) do (validBlock: BlockRef):
pool2 = BlockPool.init(db) # discard
# check: added.isOk()
# pool.updateHead(added[])
# check that the state reloaded from database resembles what we had before # let
check: # pool2 = BlockPool.init(db)
pool2.tail.root == pool.tail.root
pool2.head.blck.root == pool.head.blck.root
pool2.finalizedHead.blck.root == pool.finalizedHead.blck.root
pool2.finalizedHead.slot == pool.finalizedHead.slot
hash_tree_root(pool2.headState.data.data) ==
hash_tree_root(pool.headState.data.data)
hash_tree_root(pool2.justifiedState.data.data) ==
hash_tree_root(pool.justifiedState.data.data)
# # check that the state reloaded from database resembles what we had before
# check:
# pool2.tail.root == pool.tail.root
# pool2.head.blck.root == pool.head.blck.root
# pool2.finalizedHead.blck.root == pool.finalizedHead.blck.root
# pool2.finalizedHead.slot == pool.finalizedHead.slot
# hash_tree_root(pool2.headState.data.data) ==
# hash_tree_root(pool.headState.data.data)
# hash_tree_root(pool2.justifiedState.data.data) ==
# hash_tree_root(pool.justifiedState.data.data)