move split view catchup to research branch (#6133)
Using a dedicated branch for researching the effectiveness of split view scenario handling simplifies testing and avoids having partial work on `unstable`. If we want, we can reintroduce it under a `--debug` flag at a later time. But for now, Goerli is a rare opportunity to test this, maybe just for another week or so. - https://github.com/status-im/infra-nimbus/pull/179
This commit is contained in:
parent
fc9bc1da3a
commit
2dbe24c740
|
@ -25,7 +25,7 @@ import
|
||||||
attestation_pool, sync_committee_msg_pool, validator_change_pool],
|
attestation_pool, sync_committee_msg_pool, validator_change_pool],
|
||||||
./spec/datatypes/[base, altair],
|
./spec/datatypes/[base, altair],
|
||||||
./spec/eth2_apis/dynamic_fee_recipients,
|
./spec/eth2_apis/dynamic_fee_recipients,
|
||||||
./sync/[branch_discovery, sync_manager, request_manager],
|
./sync/[sync_manager, request_manager],
|
||||||
./validators/[
|
./validators/[
|
||||||
action_tracker, message_router, validator_monitor, validator_pool,
|
action_tracker, message_router, validator_monitor, validator_pool,
|
||||||
keystore_management],
|
keystore_management],
|
||||||
|
@ -35,7 +35,7 @@ export
|
||||||
osproc, chronos, presto, action_tracker,
|
osproc, chronos, presto, action_tracker,
|
||||||
beacon_clock, beacon_chain_db, conf, light_client,
|
beacon_clock, beacon_chain_db, conf, light_client,
|
||||||
attestation_pool, sync_committee_msg_pool, validator_change_pool,
|
attestation_pool, sync_committee_msg_pool, validator_change_pool,
|
||||||
eth2_network, el_manager, branch_discovery, request_manager, sync_manager,
|
eth2_network, el_manager, request_manager, sync_manager,
|
||||||
eth2_processor, optimistic_processor, blockchain_dag, block_quarantine,
|
eth2_processor, optimistic_processor, blockchain_dag, block_quarantine,
|
||||||
base, message_router, validator_monitor, validator_pool,
|
base, message_router, validator_monitor, validator_pool,
|
||||||
consensus_manager, dynamic_fee_recipients
|
consensus_manager, dynamic_fee_recipients
|
||||||
|
@ -85,7 +85,6 @@ type
|
||||||
requestManager*: RequestManager
|
requestManager*: RequestManager
|
||||||
syncManager*: SyncManager[Peer, PeerId]
|
syncManager*: SyncManager[Peer, PeerId]
|
||||||
backfiller*: SyncManager[Peer, PeerId]
|
backfiller*: SyncManager[Peer, PeerId]
|
||||||
branchDiscovery*: ref BranchDiscovery
|
|
||||||
genesisSnapshotContent*: string
|
genesisSnapshotContent*: string
|
||||||
processor*: ref Eth2Processor
|
processor*: ref Eth2Processor
|
||||||
blockProcessor*: ref BlockProcessor
|
blockProcessor*: ref BlockProcessor
|
||||||
|
|
|
@ -202,17 +202,10 @@ type
|
||||||
## Cached state used during block clearance - must only be used in
|
## Cached state used during block clearance - must only be used in
|
||||||
## clearance module
|
## clearance module
|
||||||
|
|
||||||
incrementalState*: ref ForkedHashedBeaconState
|
|
||||||
## State used for intermittent results of expensive computations that
|
|
||||||
## may take minutes - is only used if unavoidable, and nil otherwise
|
|
||||||
|
|
||||||
updateFlags*: UpdateFlags
|
updateFlags*: UpdateFlags
|
||||||
|
|
||||||
cfg*: RuntimeConfig
|
cfg*: RuntimeConfig
|
||||||
|
|
||||||
lastChainProgress*: Moment
|
|
||||||
## Indicates the last wall time at which meaningful progress was made
|
|
||||||
|
|
||||||
shufflingRefs*: LRUCache[16, ShufflingRef]
|
shufflingRefs*: LRUCache[16, ShufflingRef]
|
||||||
|
|
||||||
epochRefs*: LRUCache[32, EpochRef]
|
epochRefs*: LRUCache[32, EpochRef]
|
||||||
|
|
|
@ -236,8 +236,6 @@ func getBlockIdAtSlot*(dag: ChainDAGRef, slot: Slot): Opt[BlockSlotId] =
|
||||||
tryWithState dag.headState
|
tryWithState dag.headState
|
||||||
tryWithState dag.epochRefState
|
tryWithState dag.epochRefState
|
||||||
tryWithState dag.clearanceState
|
tryWithState dag.clearanceState
|
||||||
if dag.incrementalState != nil:
|
|
||||||
tryWithState dag.incrementalState[]
|
|
||||||
|
|
||||||
# Fallback to database, this only works for backfilled blocks
|
# Fallback to database, this only works for backfilled blocks
|
||||||
let finlow = dag.db.finalizedBlocks.low.expect("at least tailRef written")
|
let finlow = dag.db.finalizedBlocks.low.expect("at least tailRef written")
|
||||||
|
@ -1006,14 +1004,6 @@ proc applyBlock(
|
||||||
|
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc resetChainProgressWatchdog*(dag: ChainDAGRef) =
|
|
||||||
dag.lastChainProgress = Moment.now()
|
|
||||||
dag.incrementalState = nil
|
|
||||||
|
|
||||||
proc chainIsProgressing*(dag: ChainDAGRef): bool =
|
|
||||||
const watchdogDuration = chronos.minutes(60)
|
|
||||||
dag.lastChainProgress + watchdogDuration >= Moment.now()
|
|
||||||
|
|
||||||
proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
|
proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
|
||||||
validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags,
|
validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags,
|
||||||
eraPath = ".",
|
eraPath = ".",
|
||||||
|
@ -1054,7 +1044,6 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
|
||||||
# allow skipping some validation.
|
# allow skipping some validation.
|
||||||
updateFlags: updateFlags * {strictVerification},
|
updateFlags: updateFlags * {strictVerification},
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
lastChainProgress: Moment.now(),
|
|
||||||
|
|
||||||
vanityLogs: vanityLogs,
|
vanityLogs: vanityLogs,
|
||||||
|
|
||||||
|
@ -1517,8 +1506,6 @@ proc computeRandaoMixFromMemory*(
|
||||||
tryWithState dag.headState
|
tryWithState dag.headState
|
||||||
tryWithState dag.epochRefState
|
tryWithState dag.epochRefState
|
||||||
tryWithState dag.clearanceState
|
tryWithState dag.clearanceState
|
||||||
if dag.incrementalState != nil:
|
|
||||||
tryWithState dag.incrementalState[]
|
|
||||||
|
|
||||||
proc computeRandaoMixFromDatabase*(
|
proc computeRandaoMixFromDatabase*(
|
||||||
dag: ChainDAGRef, bid: BlockId, lowSlot: Slot): Opt[Eth2Digest] =
|
dag: ChainDAGRef, bid: BlockId, lowSlot: Slot): Opt[Eth2Digest] =
|
||||||
|
@ -1590,8 +1577,6 @@ proc computeShufflingRefFromMemory*(
|
||||||
tryWithState dag.headState
|
tryWithState dag.headState
|
||||||
tryWithState dag.epochRefState
|
tryWithState dag.epochRefState
|
||||||
tryWithState dag.clearanceState
|
tryWithState dag.clearanceState
|
||||||
if dag.incrementalState != nil:
|
|
||||||
tryWithState dag.incrementalState[]
|
|
||||||
|
|
||||||
proc getShufflingRef*(
|
proc getShufflingRef*(
|
||||||
dag: ChainDAGRef, blck: BlockRef, epoch: Epoch,
|
dag: ChainDAGRef, blck: BlockRef, epoch: Epoch,
|
||||||
|
@ -1739,10 +1724,6 @@ proc updateState*(
|
||||||
elif exactMatch(dag.epochRefState, bsi):
|
elif exactMatch(dag.epochRefState, bsi):
|
||||||
assign(state, dag.epochRefState)
|
assign(state, dag.epochRefState)
|
||||||
found = true
|
found = true
|
||||||
elif dag.incrementalState != nil and
|
|
||||||
exactMatch(dag.incrementalState[], bsi):
|
|
||||||
assign(state, dag.incrementalState[])
|
|
||||||
found = true
|
|
||||||
|
|
||||||
const RewindBlockThreshold = 64
|
const RewindBlockThreshold = 64
|
||||||
|
|
||||||
|
@ -1775,12 +1756,6 @@ proc updateState*(
|
||||||
found = true
|
found = true
|
||||||
break
|
break
|
||||||
|
|
||||||
if dag.incrementalState != nil and
|
|
||||||
canAdvance(dag.incrementalState[], cur):
|
|
||||||
assign(state, dag.incrementalState[])
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
|
|
||||||
if cur.isProposed():
|
if cur.isProposed():
|
||||||
# This is not an empty slot, so the block will need to be applied to
|
# This is not an empty slot, so the block will need to be applied to
|
||||||
# eventually reach bs
|
# eventually reach bs
|
||||||
|
@ -2412,7 +2387,6 @@ proc updateHead*(
|
||||||
quit 1
|
quit 1
|
||||||
|
|
||||||
dag.head = newHead
|
dag.head = newHead
|
||||||
dag.resetChainProgressWatchdog()
|
|
||||||
|
|
||||||
if dag.headState.is_merge_transition_complete() and not
|
if dag.headState.is_merge_transition_complete() and not
|
||||||
lastHeadMergeComplete and
|
lastHeadMergeComplete and
|
||||||
|
@ -2655,12 +2629,7 @@ proc getProposalState*(
|
||||||
|
|
||||||
# Start with the clearance state, since this one typically has been advanced
|
# Start with the clearance state, since this one typically has been advanced
|
||||||
# and thus has a hot hash tree cache
|
# and thus has a hot hash tree cache
|
||||||
let state =
|
let state = assignClone(dag.clearanceState)
|
||||||
if dag.incrementalState != nil and
|
|
||||||
dag.incrementalState[].latest_block_id == head.bid:
|
|
||||||
assignClone(dag.incrementalState[])
|
|
||||||
else:
|
|
||||||
assignClone(dag.clearanceState)
|
|
||||||
|
|
||||||
var
|
var
|
||||||
info = ForkedEpochInfo()
|
info = ForkedEpochInfo()
|
||||||
|
|
|
@ -367,9 +367,6 @@ proc initFullNode(
|
||||||
func getFrontfillSlot(): Slot =
|
func getFrontfillSlot(): Slot =
|
||||||
max(dag.frontfill.get(BlockId()).slot, dag.horizon)
|
max(dag.frontfill.get(BlockId()).slot, dag.horizon)
|
||||||
|
|
||||||
func isBlockKnown(blockRoot: Eth2Digest): bool =
|
|
||||||
dag.getBlockRef(blockRoot).isSome
|
|
||||||
|
|
||||||
let
|
let
|
||||||
quarantine = newClone(
|
quarantine = newClone(
|
||||||
Quarantine.init())
|
Quarantine.init())
|
||||||
|
@ -401,13 +398,6 @@ proc initFullNode(
|
||||||
# that should probably be reimagined more holistically in the future.
|
# that should probably be reimagined more holistically in the future.
|
||||||
blockProcessor[].addBlock(
|
blockProcessor[].addBlock(
|
||||||
MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
|
MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
|
||||||
branchDiscoveryBlockVerifier = proc(
|
|
||||||
signedBlock: ForkedSignedBeaconBlock,
|
|
||||||
blobs: Opt[BlobSidecars]
|
|
||||||
): Future[Result[void, VerifierError]] {.async: (raises: [
|
|
||||||
CancelledError], raw: true).} =
|
|
||||||
blockProcessor[].addBlock(
|
|
||||||
MsgSource.gossip, signedBlock, blobs, maybeFinalized = false)
|
|
||||||
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
|
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
|
||||||
maybeFinalized: bool):
|
maybeFinalized: bool):
|
||||||
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
|
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
|
||||||
|
@ -458,9 +448,6 @@ proc initFullNode(
|
||||||
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
|
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
|
||||||
getFrontfillSlot, dag.backfill.slot, blockVerifier,
|
getFrontfillSlot, dag.backfill.slot, blockVerifier,
|
||||||
maxHeadAge = 0)
|
maxHeadAge = 0)
|
||||||
branchDiscovery = BranchDiscovery.new(
|
|
||||||
node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown,
|
|
||||||
branchDiscoveryBlockVerifier)
|
|
||||||
router = (ref MessageRouter)(
|
router = (ref MessageRouter)(
|
||||||
processor: processor,
|
processor: processor,
|
||||||
network: node.network)
|
network: node.network)
|
||||||
|
@ -503,7 +490,6 @@ proc initFullNode(
|
||||||
node.requestManager = requestManager
|
node.requestManager = requestManager
|
||||||
node.syncManager = syncManager
|
node.syncManager = syncManager
|
||||||
node.backfiller = backfiller
|
node.backfiller = backfiller
|
||||||
node.branchDiscovery = branchDiscovery
|
|
||||||
node.router = router
|
node.router = router
|
||||||
|
|
||||||
await node.addValidators()
|
await node.addValidators()
|
||||||
|
@ -1295,8 +1281,7 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
|
||||||
if slot > head.slot: (slot - head.slot).uint64
|
if slot > head.slot: (slot - head.slot).uint64
|
||||||
else: 0'u64
|
else: 0'u64
|
||||||
isBehind =
|
isBehind =
|
||||||
headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and
|
headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER
|
||||||
node.syncStatus(head) == ChainSyncStatus.Syncing
|
|
||||||
targetGossipState =
|
targetGossipState =
|
||||||
getTargetGossipState(
|
getTargetGossipState(
|
||||||
slot.epoch,
|
slot.epoch,
|
||||||
|
@ -1556,37 +1541,6 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
|
||||||
# Update 1 epoch early to block non-fork-ready peers
|
# Update 1 epoch early to block non-fork-ready peers
|
||||||
node.network.updateForkId(epoch, node.dag.genesis_validators_root)
|
node.network.updateForkId(epoch, node.dag.genesis_validators_root)
|
||||||
|
|
||||||
# If the chain has halted, we have to ensure that the EL gets synced
|
|
||||||
# so that we can perform validator duties again
|
|
||||||
if not node.dag.head.executionValid and not node.dag.chainIsProgressing():
|
|
||||||
let beaconHead = node.attestationPool[].getBeaconHead(head)
|
|
||||||
discard await node.consensusManager.updateExecutionClientHead(beaconHead)
|
|
||||||
|
|
||||||
# If the chain head is far behind, we have to advance it incrementally
|
|
||||||
# to avoid lag spikes when performing validator duties
|
|
||||||
if node.syncStatus(head) == ChainSyncStatus.Degraded:
|
|
||||||
let incrementalTick = Moment.now()
|
|
||||||
if node.dag.incrementalState == nil:
|
|
||||||
node.dag.incrementalState = assignClone(node.dag.headState)
|
|
||||||
elif node.dag.incrementalState[].latest_block_id != node.dag.head.bid:
|
|
||||||
node.dag.incrementalState[].assign(node.dag.headState)
|
|
||||||
else:
|
|
||||||
let
|
|
||||||
incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
|
||||||
maxSlot = max(incrementalSlot, slot + 1)
|
|
||||||
nextSlot = min((incrementalSlot.epoch + 1).start_slot, maxSlot)
|
|
||||||
var
|
|
||||||
cache: StateCache
|
|
||||||
info: ForkedEpochInfo
|
|
||||||
node.dag.advanceSlots(
|
|
||||||
node.dag.incrementalState[], nextSlot, true, cache, info)
|
|
||||||
let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
|
||||||
info "Head state is behind, catching up",
|
|
||||||
headSlot = node.dag.head.slot,
|
|
||||||
progressSlot = incrementalSlot,
|
|
||||||
wallSlot = slot,
|
|
||||||
dur = Moment.now() - incrementalTick
|
|
||||||
|
|
||||||
# When we're not behind schedule, we'll speculatively update the clearance
|
# When we're not behind schedule, we'll speculatively update the clearance
|
||||||
# state in anticipation of receiving the next block - we do it after
|
# state in anticipation of receiving the next block - we do it after
|
||||||
# logging slot end since the nextActionWaitTime can be short
|
# logging slot end since the nextActionWaitTime can be short
|
||||||
|
@ -1610,10 +1564,6 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
|
||||||
|
|
||||||
await node.updateGossipStatus(slot + 1)
|
await node.updateGossipStatus(slot + 1)
|
||||||
|
|
||||||
# Branch discovery module is only used to support ongoing sync manager tasks
|
|
||||||
if not node.syncManager.inProgress:
|
|
||||||
await node.branchDiscovery.stop()
|
|
||||||
|
|
||||||
func formatNextConsensusFork(
|
func formatNextConsensusFork(
|
||||||
node: BeaconNode, withVanityArt = false): Opt[string] =
|
node: BeaconNode, withVanityArt = false): Opt[string] =
|
||||||
let consensusFork =
|
let consensusFork =
|
||||||
|
@ -1633,14 +1583,6 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
|
||||||
let optimisticHead = not node.dag.head.executionValid
|
let optimisticHead = not node.dag.head.executionValid
|
||||||
if node.syncManager.inProgress:
|
if node.syncManager.inProgress:
|
||||||
let
|
let
|
||||||
degradedSuffix =
|
|
||||||
case node.branchDiscovery.state
|
|
||||||
of BranchDiscoveryState.Active:
|
|
||||||
"/discovering"
|
|
||||||
of BranchDiscoveryState.Suspended:
|
|
||||||
"/degraded"
|
|
||||||
of BranchDiscoveryState.Stopped:
|
|
||||||
""
|
|
||||||
optimisticSuffix =
|
optimisticSuffix =
|
||||||
if optimisticHead:
|
if optimisticHead:
|
||||||
"/opt"
|
"/opt"
|
||||||
|
@ -1651,20 +1593,7 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
|
||||||
" - lc: " & $shortLog(node.consensusManager[].optimisticHead)
|
" - lc: " & $shortLog(node.consensusManager[].optimisticHead)
|
||||||
else:
|
else:
|
||||||
""
|
""
|
||||||
catchingUpSuffix =
|
node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix
|
||||||
if node.dag.incrementalState != nil:
|
|
||||||
let
|
|
||||||
headSlot = node.dag.head.slot
|
|
||||||
incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
|
||||||
progress =
|
|
||||||
(incrementalSlot - headSlot).float /
|
|
||||||
max(wallSlot - headSlot, 1).float * 100.float
|
|
||||||
" - catching up: " &
|
|
||||||
formatFloat(progress, ffDecimal, precision = 2) & "%"
|
|
||||||
else:
|
|
||||||
""
|
|
||||||
node.syncManager.syncStatus & degradedSuffix & optimisticSuffix &
|
|
||||||
lightClientSuffix & catchingUpSuffix
|
|
||||||
elif node.backfiller.inProgress:
|
elif node.backfiller.inProgress:
|
||||||
"backfill: " & node.backfiller.syncStatus
|
"backfill: " & node.backfiller.syncStatus
|
||||||
elif optimistic_head:
|
elif optimistic_head:
|
||||||
|
|
|
@ -1,292 +0,0 @@
|
||||||
# beacon_chain
|
|
||||||
# Copyright (c) 2024 Status Research & Development GmbH
|
|
||||||
# Licensed and distributed under either of
|
|
||||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
||||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
||||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
||||||
|
|
||||||
{.push raises: [].}
|
|
||||||
|
|
||||||
# This module is started when a chain stall is detected, i.e.,
|
|
||||||
# a long period without any chain progress, while at the same time being
|
|
||||||
# connected to a healthy number of different peers.
|
|
||||||
#
|
|
||||||
# In such a scenario, the network may partition into multiple groups of peers
|
|
||||||
# that build on separate branches. It is vital to specifically target peers
|
|
||||||
# from different partitions and download branches which are not necessarily
|
|
||||||
# popularly served. Branches may be unpopular because they are expensive to
|
|
||||||
# apply, and may have a difficult time propagating despite a large weight in
|
|
||||||
# attestations backing them. This only exacerbates the longer the split view
|
|
||||||
# scenario is ongoing.
|
|
||||||
#
|
|
||||||
# While sync manager can sync popular chains well, it cannot reliably sync
|
|
||||||
# minority chains only served by a limited number of peers. This module
|
|
||||||
# augments sync manager in the split view scenario.
|
|
||||||
# Note that request manager is not running while sync manager is running.
|
|
||||||
#
|
|
||||||
# Once both sync manager and branch discovery stopped resolving new blocks,
|
|
||||||
# `syncStatus` will report `ChainSyncStatus.Degraded` and `blockchain_dag` state
|
|
||||||
# will be gradually advanced to the wall slot to prepare for block proposal.
|
|
||||||
# If at that time, no additional branches were discovered, validator duties
|
|
||||||
# will be performed based on local fork choice.
|
|
||||||
#
|
|
||||||
# Note that the canonical chain may not be on the highest slot number,
|
|
||||||
# as some partitions of the network may have built on top of branches
|
|
||||||
# with lower validator support while the canonical chain was not visible.
|
|
||||||
|
|
||||||
import
|
|
||||||
std/algorithm,
|
|
||||||
chronos, chronicles, metrics, results,
|
|
||||||
../spec/[forks, network],
|
|
||||||
../consensus_object_pools/block_pools_types,
|
|
||||||
../networking/[eth2_network, peer_pool],
|
|
||||||
./sync_protocol
|
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "branchdiscovery"
|
|
||||||
|
|
||||||
declareGauge beacon_sync_branchdiscovery_state,
|
|
||||||
"Branch discovery module operating state"
|
|
||||||
|
|
||||||
declareCounter beacon_sync_branchdiscovery_discovered_blocks,
|
|
||||||
"Number of beacon blocks discovered by the branch discovery module"
|
|
||||||
|
|
||||||
type
|
|
||||||
BranchDiscoveryState* {.pure.} = enum
|
|
||||||
Stopped,
|
|
||||||
Suspended,
|
|
||||||
Active
|
|
||||||
|
|
||||||
GetSlotCallback* =
|
|
||||||
proc(): Slot {.gcsafe, raises: [].}
|
|
||||||
|
|
||||||
IsBlockKnownCallback* =
|
|
||||||
proc(blockRoot: Eth2Digest): bool {.gcsafe, raises: [].}
|
|
||||||
|
|
||||||
BlockVerifierCallback* = proc(
|
|
||||||
signedBlock: ForkedSignedBeaconBlock,
|
|
||||||
blobs: Opt[BlobSidecars]
|
|
||||||
): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
|
|
||||||
|
|
||||||
BranchDiscovery* = object
|
|
||||||
network: Eth2Node
|
|
||||||
getFinalizedSlot: GetSlotCallback
|
|
||||||
isBlockKnown: IsBlockKnownCallback
|
|
||||||
blockVerifier: BlockVerifierCallback
|
|
||||||
isActive: AsyncEvent
|
|
||||||
loopFuture: Future[void].Raising([])
|
|
||||||
|
|
||||||
proc new*(
|
|
||||||
T: type BranchDiscovery,
|
|
||||||
network: Eth2Node,
|
|
||||||
getFinalizedSlot: GetSlotCallback,
|
|
||||||
isBlockKnown: IsBlockKnownCallback,
|
|
||||||
blockVerifier: BlockVerifierCallback): ref BranchDiscovery =
|
|
||||||
let self = (ref BranchDiscovery)(
|
|
||||||
network: network,
|
|
||||||
getFinalizedSlot: getFinalizedSlot,
|
|
||||||
isBlockKnown: isBlockKnown,
|
|
||||||
blockVerifier: blockVerifier,
|
|
||||||
isActive: newAsyncEvent())
|
|
||||||
self[].isActive.fire()
|
|
||||||
self
|
|
||||||
|
|
||||||
proc discoverBranch(
|
|
||||||
self: BranchDiscovery, peer: Peer) {.async: (raises: [CancelledError]).} =
|
|
||||||
logScope:
|
|
||||||
peer
|
|
||||||
peer_score = peer.getScore()
|
|
||||||
|
|
||||||
let
|
|
||||||
finalizedSlot = self.getFinalizedSlot()
|
|
||||||
peerHeadSlot = peer.getHeadSlot()
|
|
||||||
if peerHeadSlot <= finalizedSlot:
|
|
||||||
peer.updateScore(PeerScoreUseless)
|
|
||||||
debug "Peer's head slot is already finalized", peerHeadSlot, finalizedSlot
|
|
||||||
return
|
|
||||||
|
|
||||||
var blockRoot = peer.getHeadRoot()
|
|
||||||
logScope: blockRoot
|
|
||||||
if self.isBlockKnown(blockRoot):
|
|
||||||
peer.updateScore(PeerScoreUseless)
|
|
||||||
debug "Peer's head block root is already known"
|
|
||||||
return
|
|
||||||
|
|
||||||
const
|
|
||||||
maxRequestsPerBurst = 20
|
|
||||||
burstDuration = chronos.seconds(40)
|
|
||||||
let bucket = TokenBucket.new(maxRequestsPerBurst, burstDuration)
|
|
||||||
|
|
||||||
var parentSlot = peerHeadSlot + 1
|
|
||||||
logScope: parentSlot
|
|
||||||
while true:
|
|
||||||
if self.isBlockKnown(blockRoot):
|
|
||||||
debug "Branch from peer no longer unknown"
|
|
||||||
return
|
|
||||||
if peer.getScore() < PeerScoreLowLimit:
|
|
||||||
debug "Failed to discover new branch from peer"
|
|
||||||
return
|
|
||||||
|
|
||||||
debug "Discovering new branch from peer"
|
|
||||||
try:
|
|
||||||
await bucket.consume(1)
|
|
||||||
except CancelledError as exc:
|
|
||||||
raise exc
|
|
||||||
except CatchableError as exc:
|
|
||||||
raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
|
|
||||||
let rsp = await peer.beaconBlocksByRoot_v2(BlockRootsList @[blockRoot])
|
|
||||||
if rsp.isErr:
|
|
||||||
# `eth2_network` already descored according to the specific error
|
|
||||||
debug "Failed to receive block", err = rsp.error
|
|
||||||
await sleepAsync(RESP_TIMEOUT_DUR)
|
|
||||||
continue
|
|
||||||
template blocks: untyped = rsp.get
|
|
||||||
|
|
||||||
# The peer was the one providing us with this block root, it should exist
|
|
||||||
if blocks.len == 0:
|
|
||||||
peer.updateScore(PeerScoreNoValues)
|
|
||||||
debug "Received no blocks", numBlocks = blocks.len
|
|
||||||
await sleepAsync(RESP_TIMEOUT_DUR)
|
|
||||||
continue
|
|
||||||
if blocks.len > 1:
|
|
||||||
peer.updateScore(PeerScoreBadResponse)
|
|
||||||
debug "Received too many blocks", numBlocks = blocks.len
|
|
||||||
return
|
|
||||||
template blck: untyped = blocks[0][]
|
|
||||||
if blck.slot >= parentSlot:
|
|
||||||
peer.updateScore(PeerScoreBadResponse)
|
|
||||||
debug "Received block older than parent", receivedSlot = blck.slot
|
|
||||||
return
|
|
||||||
if blck.root != blockRoot:
|
|
||||||
peer.updateScore(PeerScoreBadResponse)
|
|
||||||
debug "Received incorrect block", receivedRoot = blck.root
|
|
||||||
return
|
|
||||||
|
|
||||||
var blobIds: seq[BlobIdentifier]
|
|
||||||
withBlck(blck):
|
|
||||||
when consensusFork >= ConsensusFork.Deneb:
|
|
||||||
for i in 0 ..< forkyBlck.message.body.blob_kzg_commitments.len:
|
|
||||||
blobIds.add BlobIdentifier(block_root: blockRoot, index: i.BlobIndex)
|
|
||||||
var blobs: Opt[BlobSidecars]
|
|
||||||
if blobIds.len > 0:
|
|
||||||
while true:
|
|
||||||
if self.isBlockKnown(blockRoot):
|
|
||||||
debug "Branch from peer no longer unknown"
|
|
||||||
return
|
|
||||||
if peer.getScore() < PeerScoreLowLimit:
|
|
||||||
debug "Failed to discover new branch from peer"
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
await bucket.consume(1)
|
|
||||||
except CancelledError as exc:
|
|
||||||
raise exc
|
|
||||||
except CatchableError as exc:
|
|
||||||
raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
|
|
||||||
let r = await peer.blobSidecarsByRoot(BlobIdentifierList blobIds)
|
|
||||||
if r.isErr:
|
|
||||||
# `eth2_network` already descored according to the specific error
|
|
||||||
debug "Failed to receive blobs", err = r.error
|
|
||||||
await sleepAsync(RESP_TIMEOUT_DUR)
|
|
||||||
continue
|
|
||||||
template blobSidecars: untyped = r.unsafeGet
|
|
||||||
|
|
||||||
if blobSidecars.len < blobIds.len:
|
|
||||||
peer.updateScore(PeerScoreMissingValues)
|
|
||||||
debug "Received not all blobs",
|
|
||||||
numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len
|
|
||||||
await sleepAsync(RESP_TIMEOUT_DUR)
|
|
||||||
continue
|
|
||||||
if blobSidecars.len > blobIds.len:
|
|
||||||
peer.updateScore(PeerScoreBadResponse)
|
|
||||||
debug "Received too many blobs",
|
|
||||||
numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len
|
|
||||||
return
|
|
||||||
for i, blobSidecar in blobSidecars:
|
|
||||||
let root = hash_tree_root(blobSidecar[].signed_block_header.message)
|
|
||||||
if root != blockRoot:
|
|
||||||
peer.updateScore(PeerScoreBadResponse)
|
|
||||||
debug "Received unexpected blob"
|
|
||||||
return
|
|
||||||
blobSidecar[].verify_blob_sidecar_inclusion_proof().isOkOr:
|
|
||||||
peer.updateScore(PeerScoreBadResponse)
|
|
||||||
debug "Received invalid blob"
|
|
||||||
return
|
|
||||||
blobs = Opt.some distinctBase(blobSidecars).sortedByIt(it.index)
|
|
||||||
for i, blobSidecar in blobs.get:
|
|
||||||
if blobSidecar[].index != i.BlobIndex:
|
|
||||||
peer.updateScore(PeerScoreBadResponse)
|
|
||||||
debug "Received duplicate blobs while others are missing"
|
|
||||||
return
|
|
||||||
break
|
|
||||||
|
|
||||||
let err = (await self.blockVerifier(blck, blobs)).errorOr:
|
|
||||||
peer.updateScore(PeerScoreGoodBatchValue + PeerScoreGoodValues)
|
|
||||||
beacon_sync_branchdiscovery_discovered_blocks.inc()
|
|
||||||
info "Discovered new branch from peer"
|
|
||||||
break
|
|
||||||
case err
|
|
||||||
of VerifierError.Invalid:
|
|
||||||
peer.updateScore(PeerScoreBadResponse)
|
|
||||||
debug "Received invalid block"
|
|
||||||
return
|
|
||||||
of VerifierError.UnviableFork:
|
|
||||||
peer.updateScore(PeerScoreUnviableFork)
|
|
||||||
debug "Received unviable block"
|
|
||||||
return
|
|
||||||
of VerifierError.Duplicate:
|
|
||||||
peer.updateScore(PeerScoreGoodValues)
|
|
||||||
debug "Connected new branch from peer"
|
|
||||||
break
|
|
||||||
of VerifierError.MissingParent:
|
|
||||||
peer.updateScore(PeerScoreGoodBatchValue)
|
|
||||||
parentSlot = blck.slot
|
|
||||||
blockRoot = blck.getForkedBlockField(parent_root)
|
|
||||||
continue
|
|
||||||
|
|
||||||
proc loop(self: ref BranchDiscovery) {.async: (raises: []).} =
|
|
||||||
try:
|
|
||||||
while true:
|
|
||||||
await self[].isActive.wait()
|
|
||||||
await sleepAsync(RESP_TIMEOUT_DUR)
|
|
||||||
|
|
||||||
let peer =
|
|
||||||
try:
|
|
||||||
self[].network.peerPool.acquireNoWait()
|
|
||||||
except PeerPoolError as exc:
|
|
||||||
debug "Failed to acquire peer", exc = exc.msg
|
|
||||||
continue
|
|
||||||
defer: self[].network.peerPool.release(peer)
|
|
||||||
|
|
||||||
await self[].discoverBranch(peer)
|
|
||||||
except CancelledError:
|
|
||||||
return
|
|
||||||
|
|
||||||
func state*(self: ref BranchDiscovery): BranchDiscoveryState =
|
|
||||||
if self[].loopFuture == nil:
|
|
||||||
BranchDiscoveryState.Stopped
|
|
||||||
elif not self[].isActive.isSet:
|
|
||||||
BranchDiscoveryState.Suspended
|
|
||||||
else:
|
|
||||||
BranchDiscoveryState.Active
|
|
||||||
|
|
||||||
proc start*(self: ref BranchDiscovery) =
|
|
||||||
doAssert self[].loopFuture == nil
|
|
||||||
info "Starting discovery of new branches"
|
|
||||||
self[].loopFuture = self.loop()
|
|
||||||
beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
|
|
||||||
|
|
||||||
proc stop*(self: ref BranchDiscovery) {.async: (raises: []).} =
|
|
||||||
if self[].loopFuture != nil:
|
|
||||||
info "Stopping discovery of new branches"
|
|
||||||
await self[].loopFuture.cancelAndWait()
|
|
||||||
self[].loopFuture = nil
|
|
||||||
beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
|
|
||||||
|
|
||||||
proc suspend*(self: ref BranchDiscovery) =
|
|
||||||
self[].isActive.clear()
|
|
||||||
beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
|
|
||||||
|
|
||||||
proc resume*(self: ref BranchDiscovery) =
|
|
||||||
self[].isActive.fire()
|
|
||||||
beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
|
|
|
@ -227,85 +227,29 @@ proc getGraffitiBytes*(
|
||||||
getGraffiti(node.config.validatorsDir, node.config.defaultGraffitiBytes(),
|
getGraffiti(node.config.validatorsDir, node.config.defaultGraffitiBytes(),
|
||||||
validator.pubkey)
|
validator.pubkey)
|
||||||
|
|
||||||
type ChainSyncStatus* {.pure.} = enum
|
proc isSynced*(node: BeaconNode, head: BlockRef): bool =
|
||||||
Syncing,
|
## TODO This function is here as a placeholder for some better heuristics to
|
||||||
Synced,
|
## determine if we're in sync and should be producing blocks and
|
||||||
Degraded
|
## attestations. Generally, the problem is that slot time keeps advancing
|
||||||
|
## even when there are no blocks being produced, so there's no way to
|
||||||
|
## distinguish validators genuinely going missing from the node not being
|
||||||
|
## well connected (during a network split or an internet outage for
|
||||||
|
## example). It would generally be correct to simply keep running as if
|
||||||
|
## we were the only legit node left alive, but then we run into issues:
|
||||||
|
## with enough many empty slots, the validator pool is emptied leading
|
||||||
|
## to empty committees and lots of empty slot processing that will be
|
||||||
|
## thrown away as soon as we're synced again.
|
||||||
|
|
||||||
proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus =
|
|
||||||
## Generally, the problem is that slot time keeps advancing
|
|
||||||
## even when there are no blocks being produced, so there's no way to
|
|
||||||
## distinguish validators genuinely going missing from the node not being
|
|
||||||
## well connected (during a network split or an internet outage for
|
|
||||||
## example). It would generally be correct to simply keep running as if
|
|
||||||
## we were the only legit node left alive, but then we run into issues:
|
|
||||||
## with enough many empty slots, the validator pool is emptied leading
|
|
||||||
## to empty committees and lots of empty slot processing that will be
|
|
||||||
## thrown away as soon as we're synced again.
|
|
||||||
let
|
let
|
||||||
# The slot we should be at, according to the clock
|
# The slot we should be at, according to the clock
|
||||||
beaconTime = node.beaconClock.now()
|
beaconTime = node.beaconClock.now()
|
||||||
wallSlot = beaconTime.toSlot()
|
wallSlot = beaconTime.toSlot()
|
||||||
|
|
||||||
if not wallSlot.afterGenesis or
|
# TODO if everyone follows this logic, the network will not recover from a
|
||||||
head.slot + node.config.syncHorizon >= wallSlot.slot:
|
# halt: nobody will be producing blocks because everyone expects someone
|
||||||
node.dag.resetChainProgressWatchdog()
|
# else to do it
|
||||||
node.branchDiscovery.suspend()
|
not wallSlot.afterGenesis or
|
||||||
return ChainSyncStatus.Synced
|
head.slot + node.config.syncHorizon >= wallSlot.slot
|
||||||
|
|
||||||
let numPeers = len(node.network.peerPool)
|
|
||||||
if numPeers <= node.config.maxPeers div 4:
|
|
||||||
# We may have poor connectivity, wait until more peers are available.
|
|
||||||
# This could also be intermittent, as state replays while chain is degraded
|
|
||||||
# may take significant amounts of time, during which many peers are lost
|
|
||||||
node.branchDiscovery.suspend()
|
|
||||||
return ChainSyncStatus.Syncing
|
|
||||||
|
|
||||||
if node.dag.chainIsProgressing():
|
|
||||||
# Chain is progressing, we are out of sync
|
|
||||||
node.branchDiscovery.resume()
|
|
||||||
return ChainSyncStatus.Syncing
|
|
||||||
|
|
||||||
# Network connectivity is good, but we have trouble making sync progress.
|
|
||||||
# Turn on branch discovery module until we have a recent canonical head.
|
|
||||||
# The branch discovery module specifically targets peers on alternate branches
|
|
||||||
# and supports sync manager in discovering branches that are not widely seen
|
|
||||||
# but that may still have weight from attestations.
|
|
||||||
if node.branchDiscovery.state == BranchDiscoveryState.Stopped:
|
|
||||||
node.branchDiscovery.start()
|
|
||||||
node.branchDiscovery.resume()
|
|
||||||
|
|
||||||
let
|
|
||||||
maxHeadSlot = node.dag.heads.foldl(max(a, b.slot), GENESIS_SLOT)
|
|
||||||
numPeersWithHigherProgress = node.network.peerPool.peers
|
|
||||||
.countIt(it != nil and it.getHeadSlot() > maxHeadSlot)
|
|
||||||
if numPeersWithHigherProgress > node.config.maxPeers div 8:
|
|
||||||
# A peer indicates that they are on a later slot, wait for sync manager
|
|
||||||
# to progress, or for it to kick the peer if they are faking the status
|
|
||||||
warn "Chain appears to have stalled, but peers indicate higher progress",
|
|
||||||
numPeersWithHigherProgress, numPeers, maxPeers = node.config.maxPeers,
|
|
||||||
head, maxHeadSlot
|
|
||||||
node.dag.resetChainProgressWatchdog()
|
|
||||||
return ChainSyncStatus.Syncing
|
|
||||||
|
|
||||||
# We are on the latest slot among all of our peers, and there has been no
|
|
||||||
# chain progress for an extended period of time.
|
|
||||||
if node.dag.incrementalState == nil:
|
|
||||||
# The head state is too far in the past to timely perform validator duties
|
|
||||||
return ChainSyncStatus.Degraded
|
|
||||||
if node.dag.incrementalState[].latest_block_id != node.dag.head.bid:
|
|
||||||
# The incremental state is not yet on the correct head (see `onSlotEnd`)
|
|
||||||
return ChainSyncStatus.Degraded
|
|
||||||
let incrementalSlot = getStateField(node.dag.incrementalState[], slot)
|
|
||||||
if incrementalSlot + node.config.syncHorizon < wallSlot.slot:
|
|
||||||
# The incremental state still needs to advance further (see `onSlotEnd`)
|
|
||||||
return ChainSyncStatus.Degraded
|
|
||||||
|
|
||||||
# It is reasonably safe to assume that the network has halted, resume duties
|
|
||||||
ChainSyncStatus.Synced
|
|
||||||
|
|
||||||
proc isSynced*(node: BeaconNode, head: BlockRef): bool =
|
|
||||||
node.syncStatus(head) == ChainSyncStatus.Synced
|
|
||||||
|
|
||||||
proc handleLightClientUpdates*(node: BeaconNode, slot: Slot)
|
proc handleLightClientUpdates*(node: BeaconNode, slot: Slot)
|
||||||
{.async: (raises: [CancelledError]).} =
|
{.async: (raises: [CancelledError]).} =
|
||||||
|
|
Loading…
Reference in New Issue