add branch discovery module for use in split view scenarios

When the network is partitioned for a long time, as seen on Goerli,
branches start forming where different peers have distinct views of the
chain state. The current syncing solution built around sync manager does
not handle this case well, as it is optimized for a healthy network where
syncing can be parallelized across different peers. To support sync
manager in discovering additional branches, a new module is added that
pulls in histories from peers on unknown branches backwards: starting
from a peer's head block and following parent roots until a known block
is reached.
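
In sketch form (illustrative only, not part of the diff;
fetchBlockByRoot and verifyAndStore are hypothetical stand-ins for the
BeaconBlocksByRoot request and the block verifier callback used by the
real module):

proc discoverBranchSketch(peer: Peer) {.async.} =
  # Start from the peer's advertised head, which is unknown locally,
  # then walk parent links backwards until known history is reached.
  var blockRoot = peer.getHeadRoot()
  while not isBlockKnown(blockRoot):
    let blck = await fetchBlockByRoot(peer, blockRoot)
    let res = await verifyAndStore(blck)
    if res.isErr and res.error == VerifierError.MissingParent:
      blockRoot = blck.getForkedBlockField(parent_root)  # one block back
    else:
      break  # branch connected, rejected, or already known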
Etan Kissling 2024-03-25 21:48:06 +01:00
parent 9ad8ea0f7a
commit 08b87e2506
7 changed files with 385 additions and 5 deletions

View File

@@ -25,7 +25,7 @@ import
attestation_pool, sync_committee_msg_pool, validator_change_pool],
./spec/datatypes/[base, altair],
./spec/eth2_apis/dynamic_fee_recipients,
./sync/[sync_manager, request_manager],
./sync/[branch_discovery, sync_manager, request_manager],
./validators/[
action_tracker, message_router, validator_monitor, validator_pool,
keystore_management],
@@ -35,7 +35,7 @@ export
osproc, chronos, presto, action_tracker,
beacon_clock, beacon_chain_db, conf, light_client,
attestation_pool, sync_committee_msg_pool, validator_change_pool,
eth2_network, el_manager, request_manager, sync_manager,
eth2_network, el_manager, branch_discovery, request_manager, sync_manager,
eth2_processor, optimistic_processor, blockchain_dag, block_quarantine,
base, message_router, validator_monitor, validator_pool,
consensus_manager, dynamic_fee_recipients
@@ -85,6 +85,7 @@ type
requestManager*: RequestManager
syncManager*: SyncManager[Peer, PeerId]
backfiller*: SyncManager[Peer, PeerId]
branchDiscovery*: ref BranchDiscovery
genesisSnapshotContent*: string
processor*: ref Eth2Processor
blockProcessor*: ref BlockProcessor

View File

@@ -584,6 +584,12 @@ type
defaultValue: true
name: "doppelganger-detection" .}: bool
splitViewsMerge* {.
hidden
desc: "Whether or not to try and discover unknown branches in situations where the network has partitioned into split views"
defaultValue: false
name: "debug-split-views-merge" .}: bool
syncHorizon* {.
hidden
desc: "Number of empty slots to process before considering the client out of sync. Defaults to the number of slots in 10 minutes"

View File

@@ -206,6 +206,9 @@ type
cfg*: RuntimeConfig
lastChainProgress*: Moment
## Indicates the last wall time at which meaningful progress was made
shufflingRefs*: LRUCache[16, ShufflingRef]
epochRefs*: LRUCache[32, EpochRef]

View File

@@ -1004,6 +1004,13 @@ proc applyBlock(
ok()
proc resetChainProgressWatchdog*(dag: ChainDAGRef) =
dag.lastChainProgress = Moment.now()
proc chainIsProgressing*(dag: ChainDAGRef): bool =
const watchdogDuration = chronos.minutes(60)
dag.lastChainProgress + watchdogDuration >= Moment.now()
proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags,
eraPath = ".",
@@ -1044,6 +1051,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
# allow skipping some validation.
updateFlags: updateFlags * {strictVerification},
cfg: cfg,
lastChainProgress: Moment.now(),
vanityLogs: vanityLogs,
@@ -2387,6 +2395,7 @@ proc updateHead*(
quit 1
dag.head = newHead
dag.resetChainProgressWatchdog()
if dag.headState.is_merge_transition_complete() and not
lastHeadMergeComplete and

View File

@@ -367,6 +367,9 @@ proc initFullNode(
func getFrontfillSlot(): Slot =
max(dag.frontfill.get(BlockId()).slot, dag.horizon)
func isBlockKnown(blockRoot: Eth2Digest): bool =
dag.getBlockRef(blockRoot).isSome
let
quarantine = newClone(
Quarantine.init())
@@ -398,6 +401,13 @@
# that should probably be reimagined more holistically in the future.
blockProcessor[].addBlock(
MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
branchDiscoveryBlockVerifier = proc(
signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars]
): Future[Result[void, VerifierError]] {.async: (raises: [
CancelledError], raw: true).} =
blockProcessor[].addBlock(
MsgSource.gossip, signedBlock, blobs, maybeFinalized = false)
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
maybeFinalized: bool):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
@@ -448,6 +458,9 @@ proc initFullNode(
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
getFrontfillSlot, dag.backfill.slot, blockVerifier,
maxHeadAge = 0)
branchDiscovery = BranchDiscovery.new(
node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown,
branchDiscoveryBlockVerifier)
router = (ref MessageRouter)(
processor: processor,
network: node.network)
@@ -490,6 +503,7 @@ proc initFullNode(
node.requestManager = requestManager
node.syncManager = syncManager
node.backfiller = backfiller
node.branchDiscovery = branchDiscovery
node.router = router
await node.addValidators()
@@ -1564,6 +1578,10 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
await node.updateGossipStatus(slot + 1)
# Branch discovery module is only used to support ongoing sync manager tasks
if not node.syncManager.inProgress:
await node.branchDiscovery.stop()
func formatNextConsensusFork(
node: BeaconNode, withVanityArt = false): Opt[string] =
let consensusFork =
@@ -1583,6 +1601,14 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
let optimisticHead = not node.dag.head.executionValid
if node.syncManager.inProgress:
let
degradedSuffix =
case node.branchDiscovery.state
of BranchDiscoveryState.Active:
"/discovering"
of BranchDiscoveryState.Suspended:
"/degraded"
of BranchDiscoveryState.Stopped:
""
optimisticSuffix =
if optimisticHead:
"/opt"
@@ -1593,7 +1619,8 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
" - lc: " & $shortLog(node.consensusManager[].optimisticHead)
else:
""
node.syncManager.syncStatus & optimisticSuffix & lightClientSuffix
node.syncManager.syncStatus & degradedSuffix & optimisticSuffix &
lightClientSuffix
elif node.backfiller.inProgress:
"backfill: " & node.backfiller.syncStatus
elif optimisticHead:

View File

@@ -0,0 +1,286 @@
# beacon_chain
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
# This module is started when a chain stall is detected, i.e.,
# a long period without any chain progress, while at the same time being
# connected to a healthy number of different peers.
#
# In such a scenario, the network may partition into multiple groups of peers
# that build on separate branches. It is vital to specifically target peers
# from different partitions and download branches which are not necessarily
# popularly served. Branches may be unpopular because they are expensive to
# apply, and may have a difficult time propagating despite a large weight in
# attestations backing them. This is only exacerbated the longer the split
# view scenario is ongoing.
#
# While sync manager can sync popular chains well, it cannot reliably sync
# minority chains only served by a limited number of peers. This module
# augments sync manager in the split view scenario.
# Note that request manager is not running while sync manager is running.
#
# Note that the canonical chain may not be on the highest slot number,
# as some partitions of the network may have built on top of branches
# with lower validator support while the canonical chain was not visible.
import
std/algorithm,
chronos, chronicles, metrics, results,
../spec/[forks, network],
../consensus_object_pools/block_pools_types,
../networking/[eth2_network, peer_pool],
./sync_protocol
logScope:
topics = "branchdiscovery"
declareGauge beacon_sync_branchdiscovery_state,
"Branch discovery module operating state"
declareCounter beacon_sync_branchdiscovery_discovered_blocks,
"Number of beacon blocks discovered by the branch discovery module"
type
BranchDiscoveryState* {.pure.} = enum
Stopped,
Suspended,
Active
GetSlotCallback* =
proc(): Slot {.gcsafe, raises: [].}
IsBlockKnownCallback* =
proc(blockRoot: Eth2Digest): bool {.gcsafe, raises: [].}
BlockVerifierCallback* = proc(
signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars]
): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
BranchDiscovery* = object
network: Eth2Node
getFinalizedSlot: GetSlotCallback
isBlockKnown: IsBlockKnownCallback
blockVerifier: BlockVerifierCallback
isActive: AsyncEvent
loopFuture: Future[void].Raising([])
proc new*(
T: type BranchDiscovery,
network: Eth2Node,
getFinalizedSlot: GetSlotCallback,
isBlockKnown: IsBlockKnownCallback,
blockVerifier: BlockVerifierCallback): ref BranchDiscovery =
let self = (ref BranchDiscovery)(
network: network,
getFinalizedSlot: getFinalizedSlot,
isBlockKnown: isBlockKnown,
blockVerifier: blockVerifier,
isActive: newAsyncEvent())
self[].isActive.fire()
self
proc discoverBranch(
self: BranchDiscovery, peer: Peer) {.async: (raises: [CancelledError]).} =
logScope:
peer
peer_score = peer.getScore()
let
finalizedSlot = self.getFinalizedSlot()
peerHeadSlot = peer.getHeadSlot()
if peerHeadSlot <= finalizedSlot:
peer.updateScore(PeerScoreUseless)
debug "Peer's head slot is already finalized", peerHeadSlot, finalizedSlot
return
var blockRoot = peer.getHeadRoot()
logScope: blockRoot
if self.isBlockKnown(blockRoot):
peer.updateScore(PeerScoreUseless)
debug "Peer's head block root is already known"
return
const
maxRequestsPerBurst = 20
burstDuration = chronos.seconds(40)
let bucket = TokenBucket.new(maxRequestsPerBurst, burstDuration)
var parentSlot = peerHeadSlot + 1
logScope: parentSlot
while true:
if self.isBlockKnown(blockRoot):
debug "Branch from peer no longer unknown"
return
if peer.getScore() < PeerScoreLowLimit:
debug "Failed to discover new branch from peer"
return
debug "Discovering new branch from peer"
try:
await bucket.consume(1)
except CancelledError as exc:
raise exc
except CatchableError as exc:
raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
let rsp = await peer.beaconBlocksByRoot_v2(BlockRootsList @[blockRoot])
if rsp.isErr:
# `eth2_network` already descored according to the specific error
debug "Failed to receive block", err = rsp.error
await sleepAsync(RESP_TIMEOUT_DUR)
continue
template blocks: untyped = rsp.get
# The peer was the one providing us with this block root; the block should exist
if blocks.len == 0:
peer.updateScore(PeerScoreNoValues)
debug "Received no blocks", numBlocks = blocks.len
await sleepAsync(RESP_TIMEOUT_DUR)
continue
if blocks.len > 1:
peer.updateScore(PeerScoreBadResponse)
debug "Received too many blocks", numBlocks = blocks.len
return
template blck: untyped = blocks[0][]
if blck.slot >= parentSlot:
peer.updateScore(PeerScoreBadResponse)
debug "Received block older than parent", receivedSlot = blck.slot
return
if blck.root != blockRoot:
peer.updateScore(PeerScoreBadResponse)
debug "Received incorrect block", receivedRoot = blck.root
return
var blobIds: seq[BlobIdentifier]
withBlck(blck):
when consensusFork >= ConsensusFork.Deneb:
for i in 0 ..< forkyBlck.message.body.blob_kzg_commitments.len:
blobIds.add BlobIdentifier(block_root: blockRoot, index: i.BlobIndex)
var blobs: Opt[BlobSidecars]
if blobIds.len > 0:
while true:
if self.isBlockKnown(blockRoot):
debug "Branch from peer no longer unknown"
return
if peer.getScore() < PeerScoreLowLimit:
debug "Failed to discover new branch from peer"
return
try:
await bucket.consume(1)
except CancelledError as exc:
raise exc
except CatchableError as exc:
raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
let r = await peer.blobSidecarsByRoot(BlobIdentifierList blobIds)
if r.isErr:
# `eth2_network` already descored according to the specific error
debug "Failed to receive blobs", err = r.error
await sleepAsync(RESP_TIMEOUT_DUR)
continue
template blobSidecars: untyped = r.unsafeGet
if blobSidecars.len < blobIds.len:
peer.updateScore(PeerScoreMissingValues)
debug "Received not all blobs",
numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len
await sleepAsync(RESP_TIMEOUT_DUR)
continue
if blobSidecars.len > blobIds.len:
peer.updateScore(PeerScoreBadResponse)
debug "Received too many blobs",
numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len
return
for i, blobSidecar in blobSidecars:
let root = hash_tree_root(blobSidecar[].signed_block_header.message)
if root != blockRoot:
peer.updateScore(PeerScoreBadResponse)
debug "Received unexpected blob"
return
blobSidecar[].verify_blob_sidecar_inclusion_proof().isOkOr:
peer.updateScore(PeerScoreBadResponse)
debug "Received invalid blob"
return
blobs = Opt.some distinctBase(blobSidecars).sortedByIt(it.index)
for i, blobSidecar in blobs.get:
if blobSidecar[].index != i.BlobIndex:
peer.updateScore(PeerScoreBadResponse)
debug "Received duplicate blobs while others are missing"
return
break
let err = (await self.blockVerifier(blck, blobs)).errorOr:
peer.updateScore(PeerScoreGoodBatchValue + PeerScoreGoodValues)
beacon_sync_branchdiscovery_discovered_blocks.inc()
info "Discovered new branch from peer"
break
case err
of VerifierError.Invalid:
peer.updateScore(PeerScoreBadResponse)
debug "Received invalid block"
return
of VerifierError.UnviableFork:
peer.updateScore(PeerScoreUnviableFork)
debug "Received unviable block"
return
of VerifierError.Duplicate:
peer.updateScore(PeerScoreGoodValues)
debug "Connected new branch from peer"
break
of VerifierError.MissingParent:
peer.updateScore(PeerScoreGoodBatchValue)
parentSlot = blck.slot
blockRoot = blck.getForkedBlockField(parent_root)
continue
proc loop(self: ref BranchDiscovery) {.async: (raises: []).} =
try:
while true:
await self[].isActive.wait()
await sleepAsync(RESP_TIMEOUT_DUR)
let peer =
try:
self[].network.peerPool.acquireNoWait()
except PeerPoolError as exc:
debug "Failed to acquire peer", exc = exc.msg
continue
defer: self[].network.peerPool.release(peer)
await self[].discoverBranch(peer)
except CancelledError:
return
func state*(self: ref BranchDiscovery): BranchDiscoveryState =
if self[].loopFuture == nil:
BranchDiscoveryState.Stopped
elif not self[].isActive.isSet:
BranchDiscoveryState.Suspended
else:
BranchDiscoveryState.Active
proc start*(self: ref BranchDiscovery) =
doAssert self[].loopFuture == nil
info "Starting discovery of new branches"
self[].loopFuture = self.loop()
beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
proc stop*(self: ref BranchDiscovery) {.async: (raises: []).} =
if self[].loopFuture != nil:
info "Stopping discovery of new branches"
await self[].loopFuture.cancelAndWait()
self[].loopFuture = nil
beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
proc suspend*(self: ref BranchDiscovery) =
self[].isActive.clear()
beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
proc resume*(self: ref BranchDiscovery) =
self[].isActive.fire()
beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
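
For orientation, a minimal sketch (not part of the diff) of how the node
drives this module's lifecycle, using only the procs defined above and
assuming a fully initialized node: BeaconNode:

# Start discovery when a chain stall is detected (see isSynced below):
if node.branchDiscovery.state == BranchDiscoveryState.Stopped:
  node.branchDiscovery.start()
# Suspend while peer connectivity is poor; resume once it recovers:
node.branchDiscovery.suspend()
node.branchDiscovery.resume()
# Stop once sync manager no longer needs support (see onSlotEnd above):
if not node.syncManager.inProgress:
  await node.branchDiscovery.stop()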

View File

@@ -245,11 +245,59 @@ proc isSynced*(node: BeaconNode, head: BlockRef): bool =
beaconTime = node.beaconClock.now()
wallSlot = beaconTime.toSlot()
if not wallSlot.afterGenesis or
head.slot + node.config.syncHorizon >= wallSlot.slot:
node.dag.resetChainProgressWatchdog()
node.branchDiscovery.suspend()
return true
if not node.config.splitViewsMerge:
# Continue syncing and wait for someone else to propose the next block
return false
let
numPeers = len(node.network.peerPool)
minPeers = max(node.config.maxPeers div 4, SyncWorkersCount * 2)
if numPeers <= minPeers:
# We may have poor connectivity; wait until more peers are available.
# This could also be intermittent, as state replays while chain is degraded
# may take significant amounts of time, during which many peers are lost
node.branchDiscovery.suspend()
return false
if node.dag.chainIsProgressing():
# Chain is progressing, we are out of sync
node.branchDiscovery.resume()
return false
# Network connectivity is good, but we have trouble making sync progress.
# Turn on branch discovery module until we have a recent canonical head.
# The branch discovery module specifically targets peers on alternate branches
# and supports sync manager in discovering branches that are not widely seen
# but that may still have weight from attestations.
if node.config.splitViewsMerge and
node.branchDiscovery.state == BranchDiscoveryState.Stopped:
node.branchDiscovery.start()
node.branchDiscovery.resume()
let
maxHeadSlot = node.dag.heads.foldl(max(a, b.slot), GENESIS_SLOT)
numPeersWithHigherProgress = node.network.peerPool.peers
.countIt(it != nil and it.getHeadSlot() > maxHeadSlot)
significantNumPeers = node.config.maxPeers div 8
if numPeersWithHigherProgress > significantNumPeers:
# Peers indicate that they are on a later slot; wait for sync manager
# to progress, or for it to kick those peers if they are faking their status
warn "Chain appears to have stalled, but peers indicate higher progress",
numPeersWithHigherProgress, numPeers, maxPeers = node.config.maxPeers,
head, maxHeadSlot
node.dag.resetChainProgressWatchdog()
return false
# TODO if everyone follows this logic, the network will not recover from a
# halt: nobody will be producing blocks because everyone expects someone
# else to do it
not wallSlot.afterGenesis or
head.slot + node.config.syncHorizon >= wallSlot.slot
false
proc handleLightClientUpdates*(node: BeaconNode, slot: Slot)
{.async: (raises: [CancelledError]).} =