handle fork choice selecting a head unknown to chaindag (#1907)
* move head slot metric to head selection * avoid redundant head selection in validator duties
This commit is contained in:
parent
2396417581
commit
ff3f8741de
|
@ -15,7 +15,8 @@ import
|
||||||
# Internal
|
# Internal
|
||||||
./spec/[beaconstate, datatypes, crypto, digest, helpers],
|
./spec/[beaconstate, datatypes, crypto, digest, helpers],
|
||||||
ssz/merkleization,
|
ssz/merkleization,
|
||||||
./block_pools/[spec_cache, chain_dag, clearance], ./beacon_node_types,
|
./block_pools/[spec_cache, chain_dag, clearance, quarantine],
|
||||||
|
./beacon_node_types,
|
||||||
./fork_choice/fork_choice
|
./fork_choice/fork_choice
|
||||||
|
|
||||||
export beacon_node_types, sets
|
export beacon_node_types, sets
|
||||||
|
@ -355,7 +356,15 @@ proc selectHead*(pool: var AttestationPool, wallSlot: Slot): BlockRef =
|
||||||
error "Couldn't select head", err = newHead.error
|
error "Couldn't select head", err = newHead.error
|
||||||
nil
|
nil
|
||||||
else:
|
else:
|
||||||
pool.chainDag.getRef(newHead.get())
|
let ret = pool.chainDag.getRef(newHead.get())
|
||||||
|
if ret.isNil:
|
||||||
|
# This should normally not happen, but if the chain dag and fork choice
|
||||||
|
# get out of sync, we'll need to try to download the selected head - in
|
||||||
|
# the meantime, return nil to indicate that no new head was chosen
|
||||||
|
warn "Fork choice selected unknown head, trying to sync", root = newHead.get()
|
||||||
|
pool.quarantine.addMissing(newHead.get())
|
||||||
|
|
||||||
|
ret
|
||||||
|
|
||||||
proc prune*(pool: var AttestationPool) =
|
proc prune*(pool: var AttestationPool) =
|
||||||
if (let v = pool.forkChoice.prune(); v.isErr):
|
if (let v = pool.forkChoice.prune(); v.isErr):
|
||||||
|
|
|
@ -47,8 +47,6 @@ template init(T: type RpcHttpServer, ip: ValidIpAddress, port: Port): T =
|
||||||
# https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#interop-metrics
|
# https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#interop-metrics
|
||||||
declareGauge beacon_slot,
|
declareGauge beacon_slot,
|
||||||
"Latest slot of the beacon chain state"
|
"Latest slot of the beacon chain state"
|
||||||
declareGauge beacon_head_slot,
|
|
||||||
"Slot of the head block of the beacon chain"
|
|
||||||
|
|
||||||
# Finalization tracking
|
# Finalization tracking
|
||||||
declareGauge finalization_delay,
|
declareGauge finalization_delay,
|
||||||
|
@ -517,9 +515,6 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
|
||||||
|
|
||||||
# Whatever we do during the slot, we need to know the head, because this will
|
# Whatever we do during the slot, we need to know the head, because this will
|
||||||
# give us a state to work with and thus a shuffling.
|
# give us a state to work with and thus a shuffling.
|
||||||
# TODO typically, what consitutes correct actions stays constant between slot
|
|
||||||
# updates and is stable across some epoch transitions as well - see how
|
|
||||||
# we can avoid recalculating everything here
|
|
||||||
# TODO if the head is very old, that is indicative of something being very
|
# TODO if the head is very old, that is indicative of something being very
|
||||||
# wrong - us being out of sync or disconnected from the network - need
|
# wrong - us being out of sync or disconnected from the network - need
|
||||||
# to consider what to do in that case:
|
# to consider what to do in that case:
|
||||||
|
@ -535,11 +530,7 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
|
||||||
# disappear naturally - risky because user is not aware,
|
# disappear naturally - risky because user is not aware,
|
||||||
# and might lose stake on canonical chain but "just works"
|
# and might lose stake on canonical chain but "just works"
|
||||||
# when reconnected..
|
# when reconnected..
|
||||||
discard node.updateHead(slot)
|
node.processor[].updateHead(slot)
|
||||||
|
|
||||||
# TODO is the slot of the clock or the head block more interesting? provide
|
|
||||||
# rationale in comment
|
|
||||||
beacon_head_slot.set slot.int64
|
|
||||||
|
|
||||||
# Time passes in here..
|
# Time passes in here..
|
||||||
await node.handleValidatorDuties(lastSlot, slot)
|
await node.handleValidatorDuties(lastSlot, slot)
|
||||||
|
|
|
@ -8,25 +8,23 @@
|
||||||
# Common routines for a BeaconNode and a BeaconValidator node
|
# Common routines for a BeaconNode and a BeaconValidator node
|
||||||
|
|
||||||
import
|
import
|
||||||
# Standard library
|
std/osproc,
|
||||||
tables, osproc,
|
|
||||||
|
|
||||||
# Nimble packages
|
# Nimble packages
|
||||||
chronos, json_rpc/rpcserver, metrics,
|
chronos, json_rpc/rpcserver,
|
||||||
chronicles,
|
|
||||||
|
|
||||||
# Local modules
|
# Local modules
|
||||||
spec/[datatypes, crypto, digest],
|
./conf, ./time, ./beacon_chain_db, ./attestation_pool, ./eth2_network,
|
||||||
conf, time, beacon_chain_db,
|
./beacon_node_types, ./mainchain_monitor, ./request_manager,
|
||||||
attestation_pool, eth2_network,
|
./sync_manager, ./eth2_processor,
|
||||||
block_pools/[chain_dag, quarantine],
|
./block_pools/[chain_dag, quarantine],
|
||||||
beacon_node_types, mainchain_monitor, request_manager,
|
./spec/datatypes
|
||||||
sync_manager,
|
|
||||||
./eth2_processor
|
|
||||||
|
|
||||||
# This removes an invalid Nim warning that the digest module is unused here
|
export
|
||||||
# It's currently used for `shortLog(head.blck.root)`
|
osproc, chronos, rpcserver, conf, time, beacon_chain_db,
|
||||||
type Eth2Digest = digest.Eth2Digest
|
attestation_pool, eth2_network, beacon_node_types, mainchain_monitor,
|
||||||
|
request_manager, sync_manager, eth2_processor, chain_Dag, quarantine,
|
||||||
|
datatypes
|
||||||
|
|
||||||
type
|
type
|
||||||
RpcServer* = RpcHttpServer
|
RpcServer* = RpcHttpServer
|
||||||
|
@ -62,12 +60,6 @@ type
|
||||||
const
|
const
|
||||||
MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT
|
MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT
|
||||||
|
|
||||||
# Metrics
|
|
||||||
proc updateHead*(node: BeaconNode, wallSlot: Slot): BlockRef =
|
|
||||||
## Trigger fork choice and returns the new head block.
|
|
||||||
## Can return `nil`
|
|
||||||
node.processor[].updateHead(wallSlot)
|
|
||||||
|
|
||||||
# TODO stew/sequtils2
|
# TODO stew/sequtils2
|
||||||
template findIt*(s: openarray, predicate: untyped): int =
|
template findIt*(s: openarray, predicate: untyped): int =
|
||||||
var res = -1
|
var res = -1
|
||||||
|
|
|
@ -36,8 +36,9 @@ declareHistogram beacon_block_delay,
|
||||||
declareHistogram beacon_store_block_duration_seconds,
|
declareHistogram beacon_store_block_duration_seconds,
|
||||||
"storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
|
"storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
|
||||||
|
|
||||||
declareGauge beacon_head_root,
|
# https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#interop-metrics
|
||||||
"Root of the head block of the beacon chain"
|
declareGauge beacon_head_root, "Root of the head block of the beacon chain"
|
||||||
|
declareGauge beacon_head_slot, "Slot of the head block of the beacon chain"
|
||||||
|
|
||||||
type
|
type
|
||||||
GetWallTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].}
|
GetWallTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].}
|
||||||
|
@ -67,13 +68,15 @@ type
|
||||||
attestationsQueue*: AsyncQueue[AttestationEntry]
|
attestationsQueue*: AsyncQueue[AttestationEntry]
|
||||||
aggregatesQueue*: AsyncQueue[AggregateEntry]
|
aggregatesQueue*: AsyncQueue[AggregateEntry]
|
||||||
|
|
||||||
proc updateHead*(self: var Eth2Processor, wallSlot: Slot): BlockRef =
|
proc updateHead*(self: var Eth2Processor, wallSlot: Slot) =
|
||||||
## Trigger fork choice and returns the new head block.
|
## Trigger fork choice and returns the new head block.
|
||||||
## Can return `nil`
|
## Can return `nil`
|
||||||
# Grab the new head according to our latest attestation data
|
# Grab the new head according to our latest attestation data
|
||||||
let newHead = self.attestationPool[].selectHead(wallSlot)
|
let newHead = self.attestationPool[].selectHead(wallSlot)
|
||||||
if newHead.isNil():
|
if newHead.isNil():
|
||||||
return nil
|
warn "Head selection failed, using previous head",
|
||||||
|
head = shortLog(self.chainDag.head), wallSlot
|
||||||
|
return
|
||||||
|
|
||||||
# Store the new head in the chain DAG - this may cause epochs to be
|
# Store the new head in the chain DAG - this may cause epochs to be
|
||||||
# justified and finalized
|
# justified and finalized
|
||||||
|
@ -81,13 +84,12 @@ proc updateHead*(self: var Eth2Processor, wallSlot: Slot): BlockRef =
|
||||||
|
|
||||||
self.chainDag.updateHead(newHead, self.quarantine)
|
self.chainDag.updateHead(newHead, self.quarantine)
|
||||||
beacon_head_root.set newHead.root.toGaugeValue
|
beacon_head_root.set newHead.root.toGaugeValue
|
||||||
|
beacon_head_slot.set newHead.slot.int64
|
||||||
|
|
||||||
# Cleanup the fork choice v2 if we have a finalized head
|
# Cleanup the fork choice v2 if we have a finalized head
|
||||||
if oldFinalized != self.chainDag.finalizedHead.blck:
|
if oldFinalized != self.chainDag.finalizedHead.blck:
|
||||||
self.attestationPool[].prune()
|
self.attestationPool[].prune()
|
||||||
|
|
||||||
newHead
|
|
||||||
|
|
||||||
proc dumpBlock[T](
|
proc dumpBlock[T](
|
||||||
self: Eth2Processor, signedBlock: SignedBeaconBlock,
|
self: Eth2Processor, signedBlock: SignedBeaconBlock,
|
||||||
res: Result[T, (ValidationResult, BlockError)]) =
|
res: Result[T, (ValidationResult, BlockError)]) =
|
||||||
|
@ -198,7 +200,8 @@ proc processBlock(self: var Eth2Processor, entry: BlockEntry) =
|
||||||
|
|
||||||
if res.isOk():
|
if res.isOk():
|
||||||
# Eagerly update head in case the new block gets selected
|
# Eagerly update head in case the new block gets selected
|
||||||
discard self.updateHead(wallSlot)
|
self.updateHead(wallSlot)
|
||||||
|
|
||||||
let updateDone = now(chronos.Moment)
|
let updateDone = now(chronos.Moment)
|
||||||
let storeBlockDuration = storeDone - start
|
let storeBlockDuration = storeDone - start
|
||||||
let updateHeadDuration = updateDone - storeDone
|
let updateHeadDuration = updateDone - storeDone
|
||||||
|
|
|
@ -477,18 +477,15 @@ proc broadcastAggregatedAttestations(
|
||||||
attestation = shortLog(signedAP.message.aggregate),
|
attestation = shortLog(signedAP.message.aggregate),
|
||||||
validator = shortLog(curr[0].v)
|
validator = shortLog(curr[0].v)
|
||||||
|
|
||||||
proc handleValidatorDuties*(
|
proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
|
||||||
node: BeaconNode, lastSlot, slot: Slot) {.async.} =
|
|
||||||
## Perform validator duties - create blocks, vote and aggregate existing votes
|
## Perform validator duties - create blocks, vote and aggregate existing votes
|
||||||
let maybeHead = node.updateHead(slot)
|
|
||||||
if maybeHead.isNil():
|
|
||||||
error "Couldn't update head - cannot proceed with validator duties"
|
|
||||||
return
|
|
||||||
var head = maybeHead
|
|
||||||
if node.attachedValidators.count == 0:
|
if node.attachedValidators.count == 0:
|
||||||
# Nothing to do because we have no validator attached
|
# Nothing to do because we have no validator attached
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# The chainDag head might be updated by sync while we're working due to the
|
||||||
|
# await calls, thus we use a local variable to keep the logic straight here
|
||||||
|
var head = node.chainDag.head
|
||||||
if not node.isSynced(head):
|
if not node.isSynced(head):
|
||||||
notice "Node out of sync, skipping validator duties",
|
notice "Node out of sync, skipping validator duties",
|
||||||
slot, headSlot = head.slot
|
slot, headSlot = head.slot
|
||||||
|
@ -499,9 +496,6 @@ proc handleValidatorDuties*(
|
||||||
# Start by checking if there's work we should have done in the past that we
|
# Start by checking if there's work we should have done in the past that we
|
||||||
# can still meaningfully do
|
# can still meaningfully do
|
||||||
while curSlot < slot:
|
while curSlot < slot:
|
||||||
# TODO maybe even collect all work synchronously to avoid unnecessary
|
|
||||||
# state rewinds while waiting for async operations like validator
|
|
||||||
# signature..
|
|
||||||
notice "Catching up on validator duties",
|
notice "Catching up on validator duties",
|
||||||
curSlot = shortLog(curSlot),
|
curSlot = shortLog(curSlot),
|
||||||
lastSlot = shortLog(lastSlot),
|
lastSlot = shortLog(lastSlot),
|
||||||
|
@ -510,14 +504,12 @@ proc handleValidatorDuties*(
|
||||||
# For every slot we're catching up, we'll propose then send
|
# For every slot we're catching up, we'll propose then send
|
||||||
# attestations - head should normally be advancing along the same branch
|
# attestations - head should normally be advancing along the same branch
|
||||||
# in this case
|
# in this case
|
||||||
# TODO what if we receive blocks / attestations while doing this work?
|
|
||||||
head = await handleProposal(node, head, curSlot)
|
head = await handleProposal(node, head, curSlot)
|
||||||
|
|
||||||
# For each slot we missed, we need to send out attestations - if we were
|
# For each slot we missed, we need to send out attestations - if we were
|
||||||
# proposing during this time, we'll use the newly proposed head, else just
|
# proposing during this time, we'll use the newly proposed head, else just
|
||||||
# keep reusing the same - the attestation that goes out will actually
|
# keep reusing the same - the attestation that goes out will actually
|
||||||
# rewind the state to what it looked like at the time of that slot
|
# rewind the state to what it looked like at the time of that slot
|
||||||
# TODO smells like there's an optimization opportunity here
|
|
||||||
handleAttestations(node, head, curSlot)
|
handleAttestations(node, head, curSlot)
|
||||||
|
|
||||||
curSlot += 1
|
curSlot += 1
|
||||||
|
@ -541,11 +533,8 @@ proc handleValidatorDuties*(
|
||||||
template sleepToSlotOffsetWithHeadUpdate(extra: chronos.Duration, msg: static string) =
|
template sleepToSlotOffsetWithHeadUpdate(extra: chronos.Duration, msg: static string) =
|
||||||
if await node.beaconClock.sleepToSlotOffset(extra, slot, msg):
|
if await node.beaconClock.sleepToSlotOffset(extra, slot, msg):
|
||||||
# Time passed - we might need to select a new head in that case
|
# Time passed - we might need to select a new head in that case
|
||||||
let maybeHead = node.updateHead(slot)
|
node.processor[].updateHead(slot)
|
||||||
if not maybeHead.isNil():
|
head = node.chainDag.head
|
||||||
head = maybeHead
|
|
||||||
else:
|
|
||||||
error "Couldn't update head"
|
|
||||||
|
|
||||||
sleepToSlotOffsetWithHeadUpdate(
|
sleepToSlotOffsetWithHeadUpdate(
|
||||||
seconds(int64(SECONDS_PER_SLOT)) div 3, "Waiting to send attestations")
|
seconds(int64(SECONDS_PER_SLOT)) div 3, "Waiting to send attestations")
|
||||||
|
|
Loading…
Reference in New Issue