Dual headed fork choice [Reloaded] (#1223)

* Dual headed fork choice

* fix finalizedEpoch not moving

* reduce fork choice verbosity

* Add failing tests due to pruning

* Properly handle duplicate blocks in sync

* test_block_pool also add a test for duplicate blocks

* comments addressing review
This commit is contained in:
Mamy Ratsimbazafy 2020-06-24 20:24:36 +02:00 committed by GitHub
parent f20f077827
commit 6836d41ebd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 266 additions and 28 deletions

2
.gitignore vendored
View File

@ -32,9 +32,9 @@ build/
*.sqlite3
/local_testnet_data*/
/local_testnet*_data*/
# Prometheus db
/data
# Grafana dashboards
/docker/*.json

View File

@ -9,8 +9,10 @@ AllTests-mainnet
+ Can add and retrieve simple attestation [Preset: mainnet] OK
+ Fork choice returns block with attestation OK
+ Fork choice returns latest block with no attestations OK
+ Trying to add a block twice tags the second as an error OK
+ Trying to add a duplicate block from an old pruned epoch is tagged as an error OK
```
OK: 7/7 Fail: 0/7 Skip: 0/7
OK: 9/9 Fail: 0/9 Skip: 0/9
## Beacon chain DB [Preset: mainnet]
```diff
+ empty database [Preset: mainnet] OK
@ -32,7 +34,7 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1
## Block pool processing [Preset: mainnet]
```diff
+ Can add same block twice [Preset: mainnet] OK
+ Adding the same block twice returns a Duplicate error [Preset: mainnet] OK
+ Reverse order block add & get [Preset: mainnet] OK
+ Simple block add&get [Preset: mainnet] OK
+ getRef returns nil for missing blocks OK
@ -259,4 +261,4 @@ OK: 8/8 Fail: 0/8 Skip: 0/8
OK: 1/1 Fail: 0/1 Skip: 0/1
---TOTAL---
OK: 158/161 Fail: 0/161 Skip: 3/161
OK: 160/163 Fail: 0/163 Skip: 3/163

View File

@ -11,7 +11,8 @@ import
deques, sequtils, tables, options,
chronicles, stew/[byteutils], json_serialization/std/sets,
./spec/[beaconstate, datatypes, crypto, digest, helpers, validator],
./extras, ./block_pool, ./block_pools/candidate_chains, ./beacon_node_types
./extras, ./block_pool, ./block_pools/candidate_chains, ./beacon_node_types,
./fork_choice/[fork_choice_types, fork_choice]
logScope: topics = "attpool"
@ -22,10 +23,24 @@ func init*(T: type AttestationPool, blockPool: BlockPool): T =
# TODO blockPool is only used when resolving orphaned attestations - it should
# probably be removed as a dependency of AttestationPool (or some other
# smart refactoring)
# TODO: In tests, on blockpool.init the finalized root
# from the `headState` and `justifiedState` is zero
let forkChoice = initForkChoice(
finalized_block_slot = default(Slot), # This is unnecessary for fork choice but may help external components for example logging/debugging
finalized_block_state_root = default(Eth2Digest), # This is unnecessary for fork choice but may help external components for example logging/debugging
justified_epoch = blockPool.headState.data.data.current_justified_checkpoint.epoch,
finalized_epoch = blockPool.headState.data.data.finalized_checkpoint.epoch,
# We should use the checkpoint, but at genesis the headState finalized checkpoint is 0x0000...0000
# finalized_root = blockPool.headState.data.data.finalized_checkpoint.root
finalized_root = blockPool.finalizedHead.blck.root
).get()
T(
mapSlotsToAttestations: initDeque[AttestationsSeen](),
blockPool: blockPool,
unresolved: initTable[Eth2Digest, UnresolvedAttestation](),
forkChoice_v2: forkChoice
)
proc combine*(tgt: var Attestation, src: Attestation, flags: UpdateFlags) =
@ -107,13 +122,21 @@ proc slotIndex(
func updateLatestVotes(
pool: var AttestationPool, state: BeaconState, attestationSlot: Slot,
participants: seq[ValidatorIndex], blck: BlockRef) =
# ForkChoice v2
let target_epoch = compute_epoch_at_slot(attestationSlot)
for validator in participants:
# ForkChoice v1
let
pubKey = state.validators[validator].pubkey
current = pool.latestAttestations.getOrDefault(pubKey)
if current.isNil or current.slot < attestationSlot:
pool.latestAttestations[pubKey] = blck
# ForkChoice v2
pool.forkChoice_v2.process_attestation(validator, blck.root, target_epoch)
func get_attesting_indices_seq(state: BeaconState,
attestation_data: AttestationData,
bits: CommitteeValidatorsBits,
@ -261,6 +284,34 @@ proc add*(pool: var AttestationPool, attestation: Attestation) =
pool.addResolved(blck, attestation)
proc addForkChoice_v2*(pool: var AttestationPool, blck: BlockRef) =
## Add a verified block to the fork choice context
## The current justifiedState of the block pool is used as reference
# TODO: add(BlockPool, blockRoot: Eth2Digest, SignedBeaconBlock): BlockRef
# should ideally return the justified_epoch and finalized_epoch
# so that we can pass them directly to this proc without having to
# redo "updateStateData"
#
# In any case, `updateStateData` should shortcut
# to `getStateDataCached`
updateStateData(
pool.blockPool,
pool.blockPool.tmpState,
BlockSlot(blck: blck, slot: blck.slot)
)
let blockData = pool.blockPool.get(blck)
pool.forkChoice_v2.process_block(
slot = blck.slot,
block_root = blck.root,
parent_root = if not blck.parent.isNil: blck.parent.root else: default(Eth2Digest),
state_root = default(Eth2Digest), # This is unnecessary for fork choice but may help external components
justified_epoch = pool.blockPool.tmpState.data.data.current_justified_checkpoint.epoch,
finalized_epoch = pool.blockPool.tmpState.data.data.finalized_checkpoint.epoch,
).get()
proc getAttestationsForSlot*(pool: AttestationPool, newBlockSlot: Slot):
Option[AttestationsSeen] =
if newBlockSlot < (GENESIS_SLOT + MIN_ATTESTATION_INCLUSION_DELAY):
@ -395,7 +446,10 @@ proc resolve*(pool: var AttestationPool) =
for a in resolved:
pool.addResolved(a.blck, a.attestation)
func latestAttestation*(
# Fork choice v1
# ---------------------------------------------------------------
func latestAttestation(
pool: AttestationPool, pubKey: ValidatorPubKey): BlockRef =
pool.latestAttestations.getOrDefault(pubKey)
@ -403,7 +457,7 @@ func latestAttestation*(
# The structure of this code differs from the spec since we use a different
# strategy for storing states and justification points - it should nonetheless
# be close in terms of functionality.
func lmdGhost*(
func lmdGhost(
pool: AttestationPool, start_state: BeaconState,
start_block: BlockRef): BlockRef =
# TODO: a Fenwick Tree datastructure to keep track of cumulated votes
@ -454,7 +508,7 @@ func lmdGhost*(
winCount = candCount
head = winner
proc selectHead*(pool: AttestationPool): BlockRef =
proc selectHead_v1(pool: AttestationPool): BlockRef =
let
justifiedHead = pool.blockPool.latestJustifiedBlock()
@ -462,3 +516,47 @@ proc selectHead*(pool: AttestationPool): BlockRef =
lmdGhost(pool, pool.blockPool.justifiedState.data.data, justifiedHead.blck)
newHead
# Fork choice v2
# ---------------------------------------------------------------
func getAttesterBalances(state: StateData): seq[Gwei] {.noInit.}=
## Get the balances from a state
result.newSeq(state.data.data.validators.len) # zero-init
let epoch = state.data.data.slot.compute_epoch_at_slot()
for i in 0 ..< result.len:
# All non-active validators have a 0 balance
template validator: Validator = state.data.data.validators[i]
if validator.is_active_validator(epoch):
result[i] = validator.effective_balance
proc selectHead_v2(pool: var AttestationPool): BlockRef =
let attesterBalances = pool.blockPool.justifiedState.getAttesterBalances()
let newHead = pool.forkChoice_v2.find_head(
justified_epoch = pool.blockPool.justifiedState.data.data.slot.compute_epoch_at_slot(),
justified_root = pool.blockPool.head.justified.blck.root,
finalized_epoch = pool.blockPool.headState.data.data.finalized_checkpoint.epoch,
justified_state_balances = attesterBalances
).get()
pool.blockPool.getRef(newHead)
proc pruneBefore*(pool: var AttestationPool, finalizedhead: BlockSlot) =
pool.forkChoice_v2.maybe_prune(finalizedHead.blck.root).get()
# Dual-Headed Fork choice
# ---------------------------------------------------------------
proc selectHead*(pool: var AttestationPool): BlockRef =
let head_v1 = pool.selectHead_v1()
let head_v2 = pool.selectHead_v2()
if head_v1 != head_v2:
error "Fork choice engines in disagreement, using block from v1.",
v1_block = shortlog(head_v1),
v2_block = shortlog(head_v2)
return head_v1

View File

@ -322,9 +322,16 @@ proc storeBlock(
node.dumpBlock(signedBlock, blck)
# There can be a scenario where we receive a block we already received.
# However this block was before the last finalized epoch and so its parent
# was pruned from the ForkChoice.
if blck.isErr:
return err(blck.error)
# Still here? This means we received a valid block and we need to add it
# to the fork choice
node.attestationPool.addForkChoice_v2(blck.get())
# The block we received contains attestations, and we might not yet know about
# all of them. Let's add them to the attestation pool.
let currentSlot = node.beaconClock.now.toSlot
@ -549,7 +556,10 @@ proc runForwardSyncLoop(node: BeaconNode) {.async.} =
# We going to ignore `BlockError.Old` errors because we have working
# backward sync and it can happens that we can perform overlapping
# requests.
if res.isErr and res.error != BlockError.Old:
# For the same reason we ignore Duplicate blocks as if they are duplicate
# from before the current finalized epoch, we can drop them
# (and they may have no parents anymore in the fork choice if it was pruned)
if res.isErr and res.error notin {BlockError.Old, BLockError.Duplicate}:
return res
discard node.updateHead()

View File

@ -20,7 +20,8 @@ import
conf, time, beacon_chain_db,
attestation_pool, block_pool, eth2_network,
beacon_node_types, mainchain_monitor, request_manager,
sync_manager
sync_manager,
fork_choice/fork_choice
# This removes an invalid Nim warning that the digest module is unused here
# It's currently used for `shortLog(head.blck.root)`
@ -68,6 +69,9 @@ proc updateHead*(node: BeaconNode): BlockRef =
node.blockPool.updateHead(newHead)
beacon_head_root.set newHead.root.toGaugeValue
# Cleanup the fork choice v2 if we have a finalized head
node.attestationPool.pruneBefore(node.blockPool.finalizedHead)
newHead
template findIt*(s: openarray, predicate: untyped): int64 =

View File

@ -5,7 +5,8 @@ import
stew/endians2,
spec/[datatypes, crypto, digest],
block_pools/block_pools_types,
block_pool # TODO: refactoring compat shim
block_pool, # TODO: refactoring compat shim
fork_choice/fork_choice_types
export block_pools_types
@ -74,6 +75,9 @@ type
latestAttestations*: Table[ValidatorPubKey, BlockRef] ##\
## Map that keeps track of the most recent vote of each attester - see
## fork_choice
forkChoice_v2*: ForkChoice ##\
## The alternative fork choice "proto_array" that will ultimately
## replace the original one
# #############################################
#

View File

@ -68,7 +68,13 @@ proc init*(T: type BlockPools, db: BeaconChainDB,
updateFlags: UpdateFlags = {}): BlockPools =
result.dag = init(CandidateChains, db, updateFlags)
func addFlags*(pool: BlockPool, flags: UpdateFlags) =
## Add a flag to the block processing
## This is destined for testing to add skipBLSValidation flag
pool.dag.updateFlags.incl flags
export init # func init*(T: type BlockRef, root: Eth2Digest, blck: BeaconBlock): BlockRef
export addFlags
func getRef*(pool: BlockPool, root: Eth2Digest): BlockRef =
## Retrieve a resolved block reference, if available

View File

@ -27,6 +27,7 @@ type
MissingParent
Old
Invalid
Duplicate
Quarantine* = object
## Keeps track of unsafe blocks coming from the network

View File

@ -135,7 +135,11 @@ proc add*(
blockRoot = shortLog(blockRoot),
cat = "filtering"
return ok blockRef[]
# There can be a scenario where we receive a block we already received.
# However this block was before the last finalized epoch and so its parent
# was pruned from the ForkChoice. Trying to add it again, even if the fork choice
# supports duplicate will lead to a crash.
return err Duplicate
quarantine.missing.del(blockRoot)

View File

@ -117,7 +117,7 @@ func process_attestation*(
vote.next_epoch = target_epoch
{.noSideEffect.}:
info "Integrating vote in fork choice",
trace "Integrating vote in fork choice",
validator_index = $validator_index,
new_vote = shortlog(vote)
else:
@ -129,7 +129,7 @@ func process_attestation*(
ignored_block_root = shortlog(block_root),
ignored_target_epoch = $target_epoch
else:
info "Ignoring double-vote for fork choice",
trace "Ignoring double-vote for fork choice",
validator_index = $validator_index,
current_vote = shortlog(vote),
ignored_block_root = shortlog(block_root),
@ -159,7 +159,7 @@ func process_block*(
return err("process_block_error: " & $err)
{.noSideEffect.}:
info "Integrating block in fork choice",
trace "Integrating block in fork choice",
block_root = $shortlog(block_root),
parent_root = $shortlog(parent_root),
justified_epoch = $justified_epoch,
@ -205,7 +205,7 @@ func find_head*(
return err("find_head failed: " & $ghost_err)
{.noSideEffect.}:
info "Fork choice requested",
debug "Fork choice requested",
justified_epoch = $justified_epoch,
justified_root = shortlog(justified_root),
finalized_epoch = $finalized_epoch,

View File

@ -671,7 +671,9 @@ func makeAttestationData*(
if start_slot == state.slot: beacon_block_root
else: get_block_root_at_slot(state, start_slot)
doAssert slot.compute_epoch_at_slot == current_epoch
doAssert slot.compute_epoch_at_slot == current_epoch,
"Computed epoch was " & $slot.compute_epoch_at_slot &
" while the state current_epoch was " & $current_epoch
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.2/specs/phase0/validator.md#attestation-data
AttestationData(

View File

@ -16,8 +16,17 @@ import
chronicles,
stew/byteutils,
./testutil, ./testblockutil,
../beacon_chain/spec/[digest, validator, state_transition],
../beacon_chain/[beacon_node_types, attestation_pool, block_pool]
../beacon_chain/spec/[digest, validator, state_transition, helpers, beaconstate],
../beacon_chain/[beacon_node_types, attestation_pool, block_pool, extras],
../beacon_chain/fork_choice/[fork_choice_types, fork_choice]
template wrappedTimedTest(name: string, body: untyped) =
# `check` macro takes a copy of whatever it's checking, on the stack!
block: # Symbol namespacing
proc wrappedTest() =
timedTest name:
body
wrappedTest()
suiteReport "Attestation pool processing" & preset():
## For now just test that we can compile and execute block processing with
@ -33,7 +42,7 @@ suiteReport "Attestation pool processing" & preset():
check:
process_slots(state.data, state.data.data.slot + 1)
# pool[].add(blockPool[].tail) # Make the tail known to fork choice
pool[].addForkChoice_v2(blockPool[].tail) # Make the tail known to fork choice
timedTest "Can add and retrieve simple attestation" & preset():
var cache = get_empty_per_epoch_cache()
@ -161,7 +170,7 @@ suiteReport "Attestation pool processing" & preset():
b1Root = hash_tree_root(b1.message)
b1Add = blockpool[].add(b1Root, b1)[]
# pool[].add(b1Add) - make a block known to the future fork choice
pool[].addForkChoice_v2(b1Add)
let head = pool[].selectHead()
check:
@ -172,7 +181,7 @@ suiteReport "Attestation pool processing" & preset():
b2Root = hash_tree_root(b2.message)
b2Add = blockpool[].add(b2Root, b2)[]
# pool[].add(b2Add) - make a block known to the future fork choice
pool[].addForkChoice_v2(b2Add)
let head2 = pool[].selectHead()
check:
@ -185,7 +194,7 @@ suiteReport "Attestation pool processing" & preset():
b10Root = hash_tree_root(b10.message)
b10Add = blockpool[].add(b10Root, b10)[]
# pool[].add(b10Add) - make a block known to the future fork choice
pool[].addForkChoice_v2(b10Add)
let head = pool[].selectHead()
check:
@ -202,7 +211,7 @@ suiteReport "Attestation pool processing" & preset():
state.data.data, state.data.data.slot, 1.CommitteeIndex, cache)
attestation0 = makeAttestation(state.data.data, b10Root, bc1[0], cache)
# pool[].add(b11Add) - make a block known to the future fork choice
pool[].addForkChoice_v2(b11Add)
pool[].add(attestation0)
let head2 = pool[].selectHead()
@ -234,3 +243,102 @@ suiteReport "Attestation pool processing" & preset():
check:
# Two votes for b11
head4 == b11Add
timedTest "Trying to add a block twice tags the second as an error":
var cache = get_empty_per_epoch_cache()
let
b10 = makeTestBlock(state.data, blockPool[].tail.root, cache)
b10Root = hash_tree_root(b10.message)
b10Add = blockpool[].add(b10Root, b10)[]
pool[].addForkChoice_v2(b10Add)
let head = pool[].selectHead()
check:
head == b10Add
# -------------------------------------------------------------
# Add back the old block to ensure we have a duplicate error
let b10_clone = b10 # Assumes deep copy
let b10Add_clone = blockpool[].add(b10Root, b10_clone)
doAssert: b10Add_clone.error == Duplicate
wrappedTimedTest "Trying to add a duplicate block from an old pruned epoch is tagged as an error":
var cache = get_empty_per_epoch_cache()
blockpool[].addFlags {skipBLSValidation}
pool.forkChoice_v2.proto_array.prune_threshold = 1
let
b10 = makeTestBlock(state.data, blockPool[].tail.root, cache)
b10Root = hash_tree_root(b10.message)
b10Add = blockpool[].add(b10Root, b10)[]
pool[].addForkChoice_v2(b10Add)
let head = pool[].selectHead()
doAssert: head == b10Add
let block_ok = state_transition(state.data, b10, {}, noRollback)
doAssert: block_ok
# -------------------------------------------------------------
let b10_clone = b10 # Assumes deep copy
# -------------------------------------------------------------
# Pass an epoch
var block_root = b10Root
var attestations: seq[Attestation]
for epoch in 0 ..< 5:
let start_slot = compute_start_slot_at_epoch(Epoch epoch)
for slot in start_slot ..< start_slot + SLOTS_PER_EPOCH:
let new_block = makeTestBlock(state.data, block_root, cache, attestations = attestations)
let block_ok = state_transition(state.data, new_block, {skipBLSValidation}, noRollback)
doAssert: block_ok
block_root = hash_tree_root(new_block.message)
let blockRef = blockpool[].add(block_root, new_block)[]
pool[].addForkChoice_v2(blockRef)
let head = pool[].selectHead()
doassert: head == blockRef
blockPool[].updateHead(head)
attestations.setlen(0)
for index in 0 ..< get_committee_count_at_slot(state.data.data, slot.Slot):
let committee = get_beacon_committee(
state.data.data, state.data.data.slot, index.CommitteeIndex, cache)
# Create a bitfield filled with the given count per attestation,
# exactly on the right-most part of the committee field.
var aggregation_bits = init(CommitteeValidatorsBits, committee.len)
for v in 0 ..< committee.len * 2 div 3 + 1:
aggregation_bits[v] = true
attestations.add Attestation(
aggregation_bits: aggregation_bits,
data: makeAttestationData(
state.data.data, state.data.data.slot,
index, blockroot
)
# signature: ValidatorSig()
)
cache = get_empty_per_epoch_cache()
# -------------------------------------------------------------
# Prune
echo "\nPruning all blocks before: ", shortlog(blockPool[].finalizedHead), '\n'
doAssert: blockPool[].finalizedHead.slot != 0
pool[].pruneBefore(blockPool[].finalizedHead)
doAssert: b10Root notin pool.forkChoice_v2
# Add back the old block to ensure we have a duplicate error
let b10Add_clone = blockpool[].add(b10Root, b10_clone)
doAssert: b10Add_clone.error == Duplicate

View File

@ -213,13 +213,13 @@ suiteReport "Block pool processing" & preset():
pool2.heads.len == 1
pool2.heads[0].blck.root == b2Root
timedTest "Can add same block twice" & preset():
timedTest "Adding the same block twice returns a Duplicate error" & preset():
let
b10 = pool.add(b1Root, b1)[]
b11 = pool.add(b1Root, b1)[]
b11 = pool.add(b1Root, b1)
check:
b10 == b11
b11.error == Duplicate
not b10.isNil
timedTest "updateHead updates head and headState" & preset():
@ -370,4 +370,3 @@ when const_preset == "minimal": # These require some minutes in mainnet
hash_tree_root(pool.headState.data.data)
hash_tree_root(pool2.justifiedState.data.data) ==
hash_tree_root(pool.justifiedState.data.data)