mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-02-18 01:16:46 +00:00
add branch discovery module for supporting chain stall situation (#6125)
In split view situation, the canonical chain may only be served by a tiny amount of peers, and branches may span long durations. Minority branches may still have a large weight from attestations and should be discovered. To assist with that, add a branch discovery module that assists in such a situation by specifically targeting peers with unknown histories and downloading from them, in addition to sync manager work which handles popular branches.
This commit is contained in:
parent
66a9304fea
commit
fc9bc1da3a
@ -25,7 +25,7 @@ import
|
|||||||
attestation_pool, sync_committee_msg_pool, validator_change_pool],
|
attestation_pool, sync_committee_msg_pool, validator_change_pool],
|
||||||
./spec/datatypes/[base, altair],
|
./spec/datatypes/[base, altair],
|
||||||
./spec/eth2_apis/dynamic_fee_recipients,
|
./spec/eth2_apis/dynamic_fee_recipients,
|
||||||
./sync/[sync_manager, request_manager],
|
./sync/[branch_discovery, sync_manager, request_manager],
|
||||||
./validators/[
|
./validators/[
|
||||||
action_tracker, message_router, validator_monitor, validator_pool,
|
action_tracker, message_router, validator_monitor, validator_pool,
|
||||||
keystore_management],
|
keystore_management],
|
||||||
@ -35,7 +35,7 @@ export
|
|||||||
osproc, chronos, presto, action_tracker,
|
osproc, chronos, presto, action_tracker,
|
||||||
beacon_clock, beacon_chain_db, conf, light_client,
|
beacon_clock, beacon_chain_db, conf, light_client,
|
||||||
attestation_pool, sync_committee_msg_pool, validator_change_pool,
|
attestation_pool, sync_committee_msg_pool, validator_change_pool,
|
||||||
eth2_network, el_manager, request_manager, sync_manager,
|
eth2_network, el_manager, branch_discovery, request_manager, sync_manager,
|
||||||
eth2_processor, optimistic_processor, blockchain_dag, block_quarantine,
|
eth2_processor, optimistic_processor, blockchain_dag, block_quarantine,
|
||||||
base, message_router, validator_monitor, validator_pool,
|
base, message_router, validator_monitor, validator_pool,
|
||||||
consensus_manager, dynamic_fee_recipients
|
consensus_manager, dynamic_fee_recipients
|
||||||
@ -85,6 +85,7 @@ type
|
|||||||
requestManager*: RequestManager
|
requestManager*: RequestManager
|
||||||
syncManager*: SyncManager[Peer, PeerId]
|
syncManager*: SyncManager[Peer, PeerId]
|
||||||
backfiller*: SyncManager[Peer, PeerId]
|
backfiller*: SyncManager[Peer, PeerId]
|
||||||
|
branchDiscovery*: ref BranchDiscovery
|
||||||
genesisSnapshotContent*: string
|
genesisSnapshotContent*: string
|
||||||
processor*: ref Eth2Processor
|
processor*: ref Eth2Processor
|
||||||
blockProcessor*: ref BlockProcessor
|
blockProcessor*: ref BlockProcessor
|
||||||
|
@ -81,7 +81,6 @@ proc addResolvedHeadBlock(
|
|||||||
epochRef = dag.getEpochRef(state, cache)
|
epochRef = dag.getEpochRef(state, cache)
|
||||||
epochRefTick = Moment.now()
|
epochRefTick = Moment.now()
|
||||||
|
|
||||||
dag.resetChainProgressWatchdog()
|
|
||||||
debug "Block resolved",
|
debug "Block resolved",
|
||||||
blockRoot = shortLog(blockRoot),
|
blockRoot = shortLog(blockRoot),
|
||||||
blck = shortLog(trustedBlock.message),
|
blck = shortLog(trustedBlock.message),
|
||||||
|
@ -2412,6 +2412,7 @@ proc updateHead*(
|
|||||||
quit 1
|
quit 1
|
||||||
|
|
||||||
dag.head = newHead
|
dag.head = newHead
|
||||||
|
dag.resetChainProgressWatchdog()
|
||||||
|
|
||||||
if dag.headState.is_merge_transition_complete() and not
|
if dag.headState.is_merge_transition_complete() and not
|
||||||
lastHeadMergeComplete and
|
lastHeadMergeComplete and
|
||||||
|
@ -55,7 +55,6 @@ const
|
|||||||
## Number of slots from wall time that we start processing every payload
|
## Number of slots from wall time that we start processing every payload
|
||||||
|
|
||||||
type
|
type
|
||||||
BlobSidecars* = seq[ref BlobSidecar]
|
|
||||||
BlockEntry = object
|
BlockEntry = object
|
||||||
blck*: ForkedSignedBeaconBlock
|
blck*: ForkedSignedBeaconBlock
|
||||||
blobs*: Opt[BlobSidecars]
|
blobs*: Opt[BlobSidecars]
|
||||||
|
@ -219,6 +219,10 @@ proc updateStatus*(peer: Peer): Future[bool] {.async: (raises: [CancelledError])
|
|||||||
|
|
||||||
await peer.handleStatus(nstate, theirStatus)
|
await peer.handleStatus(nstate, theirStatus)
|
||||||
|
|
||||||
|
proc getHeadRoot*(peer: Peer): Eth2Digest =
|
||||||
|
## Returns head root for specific peer ``peer``.
|
||||||
|
peer.state(PeerSync).statusMsg.headRoot
|
||||||
|
|
||||||
proc getHeadSlot*(peer: Peer): Slot =
|
proc getHeadSlot*(peer: Peer): Slot =
|
||||||
## Returns head slot for specific peer ``peer``.
|
## Returns head slot for specific peer ``peer``.
|
||||||
peer.state(PeerSync).statusMsg.headSlot
|
peer.state(PeerSync).statusMsg.headSlot
|
||||||
|
@ -29,6 +29,8 @@ const
|
|||||||
## Peer's `status` answer is fine.
|
## Peer's `status` answer is fine.
|
||||||
PeerScoreNoValues* = -100
|
PeerScoreNoValues* = -100
|
||||||
## Peer did not respond in time to a request.
|
## Peer did not respond in time to a request.
|
||||||
|
PeerScoreGoodBatchValue* = 5
|
||||||
|
## Individual portion of peer's multi-step answer is fine.
|
||||||
PeerScoreGoodValues* = 100
|
PeerScoreGoodValues* = 100
|
||||||
## Peer's answer to our request is fine.
|
## Peer's answer to our request is fine.
|
||||||
PeerScoreBadValues* = -1000
|
PeerScoreBadValues* = -1000
|
||||||
|
@ -367,6 +367,9 @@ proc initFullNode(
|
|||||||
func getFrontfillSlot(): Slot =
|
func getFrontfillSlot(): Slot =
|
||||||
max(dag.frontfill.get(BlockId()).slot, dag.horizon)
|
max(dag.frontfill.get(BlockId()).slot, dag.horizon)
|
||||||
|
|
||||||
|
func isBlockKnown(blockRoot: Eth2Digest): bool =
|
||||||
|
dag.getBlockRef(blockRoot).isSome
|
||||||
|
|
||||||
let
|
let
|
||||||
quarantine = newClone(
|
quarantine = newClone(
|
||||||
Quarantine.init())
|
Quarantine.init())
|
||||||
@ -398,6 +401,13 @@ proc initFullNode(
|
|||||||
# that should probably be reimagined more holistically in the future.
|
# that should probably be reimagined more holistically in the future.
|
||||||
blockProcessor[].addBlock(
|
blockProcessor[].addBlock(
|
||||||
MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
|
MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
|
||||||
|
branchDiscoveryBlockVerifier = proc(
|
||||||
|
signedBlock: ForkedSignedBeaconBlock,
|
||||||
|
blobs: Opt[BlobSidecars]
|
||||||
|
): Future[Result[void, VerifierError]] {.async: (raises: [
|
||||||
|
CancelledError], raw: true).} =
|
||||||
|
blockProcessor[].addBlock(
|
||||||
|
MsgSource.gossip, signedBlock, blobs, maybeFinalized = false)
|
||||||
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
|
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
|
||||||
maybeFinalized: bool):
|
maybeFinalized: bool):
|
||||||
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
|
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
|
||||||
@ -448,6 +458,9 @@ proc initFullNode(
|
|||||||
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
|
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
|
||||||
getFrontfillSlot, dag.backfill.slot, blockVerifier,
|
getFrontfillSlot, dag.backfill.slot, blockVerifier,
|
||||||
maxHeadAge = 0)
|
maxHeadAge = 0)
|
||||||
|
branchDiscovery = BranchDiscovery.new(
|
||||||
|
node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown,
|
||||||
|
branchDiscoveryBlockVerifier)
|
||||||
router = (ref MessageRouter)(
|
router = (ref MessageRouter)(
|
||||||
processor: processor,
|
processor: processor,
|
||||||
network: node.network)
|
network: node.network)
|
||||||
@ -490,6 +503,7 @@ proc initFullNode(
|
|||||||
node.requestManager = requestManager
|
node.requestManager = requestManager
|
||||||
node.syncManager = syncManager
|
node.syncManager = syncManager
|
||||||
node.backfiller = backfiller
|
node.backfiller = backfiller
|
||||||
|
node.branchDiscovery = branchDiscovery
|
||||||
node.router = router
|
node.router = router
|
||||||
|
|
||||||
await node.addValidators()
|
await node.addValidators()
|
||||||
@ -1596,6 +1610,10 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
|
|||||||
|
|
||||||
await node.updateGossipStatus(slot + 1)
|
await node.updateGossipStatus(slot + 1)
|
||||||
|
|
||||||
|
# Branch discovery module is only used to support ongoing sync manager tasks
|
||||||
|
if not node.syncManager.inProgress:
|
||||||
|
await node.branchDiscovery.stop()
|
||||||
|
|
||||||
func formatNextConsensusFork(
|
func formatNextConsensusFork(
|
||||||
node: BeaconNode, withVanityArt = false): Opt[string] =
|
node: BeaconNode, withVanityArt = false): Opt[string] =
|
||||||
let consensusFork =
|
let consensusFork =
|
||||||
@ -1615,6 +1633,14 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
|
|||||||
let optimisticHead = not node.dag.head.executionValid
|
let optimisticHead = not node.dag.head.executionValid
|
||||||
if node.syncManager.inProgress:
|
if node.syncManager.inProgress:
|
||||||
let
|
let
|
||||||
|
degradedSuffix =
|
||||||
|
case node.branchDiscovery.state
|
||||||
|
of BranchDiscoveryState.Active:
|
||||||
|
"/discovering"
|
||||||
|
of BranchDiscoveryState.Suspended:
|
||||||
|
"/degraded"
|
||||||
|
of BranchDiscoveryState.Stopped:
|
||||||
|
""
|
||||||
optimisticSuffix =
|
optimisticSuffix =
|
||||||
if optimisticHead:
|
if optimisticHead:
|
||||||
"/opt"
|
"/opt"
|
||||||
@ -1637,7 +1663,7 @@ func syncStatus(node: BeaconNode, wallSlot: Slot): string =
|
|||||||
formatFloat(progress, ffDecimal, precision = 2) & "%"
|
formatFloat(progress, ffDecimal, precision = 2) & "%"
|
||||||
else:
|
else:
|
||||||
""
|
""
|
||||||
node.syncManager.syncStatus & optimisticSuffix &
|
node.syncManager.syncStatus & degradedSuffix & optimisticSuffix &
|
||||||
lightClientSuffix & catchingUpSuffix
|
lightClientSuffix & catchingUpSuffix
|
||||||
elif node.backfiller.inProgress:
|
elif node.backfiller.inProgress:
|
||||||
"backfill: " & node.backfiller.syncStatus
|
"backfill: " & node.backfiller.syncStatus
|
||||||
|
@ -64,6 +64,7 @@ type
|
|||||||
signed_block_header*: SignedBeaconBlockHeader
|
signed_block_header*: SignedBeaconBlockHeader
|
||||||
kzg_commitment_inclusion_proof*:
|
kzg_commitment_inclusion_proof*:
|
||||||
array[KZG_COMMITMENT_INCLUSION_PROOF_DEPTH, Eth2Digest]
|
array[KZG_COMMITMENT_INCLUSION_PROOF_DEPTH, Eth2Digest]
|
||||||
|
BlobSidecars* = seq[ref BlobSidecar]
|
||||||
|
|
||||||
# https://github.com/ethereum/beacon-APIs/blob/4882aa0803b622b75bab286b285599d70b7a2429/apis/eventstream/index.yaml#L138-L142
|
# https://github.com/ethereum/beacon-APIs/blob/4882aa0803b622b75bab286b285599d70b7a2429/apis/eventstream/index.yaml#L138-L142
|
||||||
# Spec object, not only internal, because it gets serialized out for the
|
# Spec object, not only internal, because it gets serialized out for the
|
||||||
|
292
beacon_chain/sync/branch_discovery.nim
Normal file
292
beacon_chain/sync/branch_discovery.nim
Normal file
@ -0,0 +1,292 @@
|
|||||||
|
# beacon_chain
|
||||||
|
# Copyright (c) 2024 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
# This module is started when a chain stall is detected, i.e.,
|
||||||
|
# a long period without any chain progress, while at the same time being
|
||||||
|
# connected to a healthy number of different peers.
|
||||||
|
#
|
||||||
|
# In such a scenario, the network may partition into multiple groups of peers
|
||||||
|
# that build on separate branches. It is vital to specifically target peers
|
||||||
|
# from different partitions and download branches which are not necessarily
|
||||||
|
# popularly served. Branches may be unpopular because they are expensive to
|
||||||
|
# apply, and may have a difficult time propagating despite a large weight in
|
||||||
|
# attestations backing them. This only exacerbates the longer the split view
|
||||||
|
# scenario is ongoing.
|
||||||
|
#
|
||||||
|
# While sync manager can sync popular chains well, it cannot reliably sync
|
||||||
|
# minority chains only served by a limited number of peers. This module
|
||||||
|
# augments sync manager in the split view scenario.
|
||||||
|
# Note that request manager is not running while sync manager is running.
|
||||||
|
#
|
||||||
|
# Once both sync manager and branch discovery stopped resolving new blocks,
|
||||||
|
# `syncStatus` will report `ChainSyncStatus.Degraded` and `blockchain_dag` state
|
||||||
|
# will be gradually advanced to the wall slot to prepare for block proposal.
|
||||||
|
# If at that time, no additional branches were discovered, validator duties
|
||||||
|
# will be performed based on local fork choice.
|
||||||
|
#
|
||||||
|
# Note that the canonical chain may not be on the highest slot number,
|
||||||
|
# as some partitions of the network may have built on top of branches
|
||||||
|
# with lower validator support while the canonical chain was not visible.
|
||||||
|
|
||||||
|
import
|
||||||
|
std/algorithm,
|
||||||
|
chronos, chronicles, metrics, results,
|
||||||
|
../spec/[forks, network],
|
||||||
|
../consensus_object_pools/block_pools_types,
|
||||||
|
../networking/[eth2_network, peer_pool],
|
||||||
|
./sync_protocol
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "branchdiscovery"
|
||||||
|
|
||||||
|
declareGauge beacon_sync_branchdiscovery_state,
|
||||||
|
"Branch discovery module operating state"
|
||||||
|
|
||||||
|
declareCounter beacon_sync_branchdiscovery_discovered_blocks,
|
||||||
|
"Number of beacon blocks discovered by the branch discovery module"
|
||||||
|
|
||||||
|
type
|
||||||
|
BranchDiscoveryState* {.pure.} = enum
|
||||||
|
Stopped,
|
||||||
|
Suspended,
|
||||||
|
Active
|
||||||
|
|
||||||
|
GetSlotCallback* =
|
||||||
|
proc(): Slot {.gcsafe, raises: [].}
|
||||||
|
|
||||||
|
IsBlockKnownCallback* =
|
||||||
|
proc(blockRoot: Eth2Digest): bool {.gcsafe, raises: [].}
|
||||||
|
|
||||||
|
BlockVerifierCallback* = proc(
|
||||||
|
signedBlock: ForkedSignedBeaconBlock,
|
||||||
|
blobs: Opt[BlobSidecars]
|
||||||
|
): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
|
||||||
|
|
||||||
|
BranchDiscovery* = object
|
||||||
|
network: Eth2Node
|
||||||
|
getFinalizedSlot: GetSlotCallback
|
||||||
|
isBlockKnown: IsBlockKnownCallback
|
||||||
|
blockVerifier: BlockVerifierCallback
|
||||||
|
isActive: AsyncEvent
|
||||||
|
loopFuture: Future[void].Raising([])
|
||||||
|
|
||||||
|
proc new*(
|
||||||
|
T: type BranchDiscovery,
|
||||||
|
network: Eth2Node,
|
||||||
|
getFinalizedSlot: GetSlotCallback,
|
||||||
|
isBlockKnown: IsBlockKnownCallback,
|
||||||
|
blockVerifier: BlockVerifierCallback): ref BranchDiscovery =
|
||||||
|
let self = (ref BranchDiscovery)(
|
||||||
|
network: network,
|
||||||
|
getFinalizedSlot: getFinalizedSlot,
|
||||||
|
isBlockKnown: isBlockKnown,
|
||||||
|
blockVerifier: blockVerifier,
|
||||||
|
isActive: newAsyncEvent())
|
||||||
|
self[].isActive.fire()
|
||||||
|
self
|
||||||
|
|
||||||
|
proc discoverBranch(
|
||||||
|
self: BranchDiscovery, peer: Peer) {.async: (raises: [CancelledError]).} =
|
||||||
|
logScope:
|
||||||
|
peer
|
||||||
|
peer_score = peer.getScore()
|
||||||
|
|
||||||
|
let
|
||||||
|
finalizedSlot = self.getFinalizedSlot()
|
||||||
|
peerHeadSlot = peer.getHeadSlot()
|
||||||
|
if peerHeadSlot <= finalizedSlot:
|
||||||
|
peer.updateScore(PeerScoreUseless)
|
||||||
|
debug "Peer's head slot is already finalized", peerHeadSlot, finalizedSlot
|
||||||
|
return
|
||||||
|
|
||||||
|
var blockRoot = peer.getHeadRoot()
|
||||||
|
logScope: blockRoot
|
||||||
|
if self.isBlockKnown(blockRoot):
|
||||||
|
peer.updateScore(PeerScoreUseless)
|
||||||
|
debug "Peer's head block root is already known"
|
||||||
|
return
|
||||||
|
|
||||||
|
const
|
||||||
|
maxRequestsPerBurst = 20
|
||||||
|
burstDuration = chronos.seconds(40)
|
||||||
|
let bucket = TokenBucket.new(maxRequestsPerBurst, burstDuration)
|
||||||
|
|
||||||
|
var parentSlot = peerHeadSlot + 1
|
||||||
|
logScope: parentSlot
|
||||||
|
while true:
|
||||||
|
if self.isBlockKnown(blockRoot):
|
||||||
|
debug "Branch from peer no longer unknown"
|
||||||
|
return
|
||||||
|
if peer.getScore() < PeerScoreLowLimit:
|
||||||
|
debug "Failed to discover new branch from peer"
|
||||||
|
return
|
||||||
|
|
||||||
|
debug "Discovering new branch from peer"
|
||||||
|
try:
|
||||||
|
await bucket.consume(1)
|
||||||
|
except CancelledError as exc:
|
||||||
|
raise exc
|
||||||
|
except CatchableError as exc:
|
||||||
|
raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
|
||||||
|
let rsp = await peer.beaconBlocksByRoot_v2(BlockRootsList @[blockRoot])
|
||||||
|
if rsp.isErr:
|
||||||
|
# `eth2_network` already descored according to the specific error
|
||||||
|
debug "Failed to receive block", err = rsp.error
|
||||||
|
await sleepAsync(RESP_TIMEOUT_DUR)
|
||||||
|
continue
|
||||||
|
template blocks: untyped = rsp.get
|
||||||
|
|
||||||
|
# The peer was the one providing us with this block root, it should exist
|
||||||
|
if blocks.len == 0:
|
||||||
|
peer.updateScore(PeerScoreNoValues)
|
||||||
|
debug "Received no blocks", numBlocks = blocks.len
|
||||||
|
await sleepAsync(RESP_TIMEOUT_DUR)
|
||||||
|
continue
|
||||||
|
if blocks.len > 1:
|
||||||
|
peer.updateScore(PeerScoreBadResponse)
|
||||||
|
debug "Received too many blocks", numBlocks = blocks.len
|
||||||
|
return
|
||||||
|
template blck: untyped = blocks[0][]
|
||||||
|
if blck.slot >= parentSlot:
|
||||||
|
peer.updateScore(PeerScoreBadResponse)
|
||||||
|
debug "Received block older than parent", receivedSlot = blck.slot
|
||||||
|
return
|
||||||
|
if blck.root != blockRoot:
|
||||||
|
peer.updateScore(PeerScoreBadResponse)
|
||||||
|
debug "Received incorrect block", receivedRoot = blck.root
|
||||||
|
return
|
||||||
|
|
||||||
|
var blobIds: seq[BlobIdentifier]
|
||||||
|
withBlck(blck):
|
||||||
|
when consensusFork >= ConsensusFork.Deneb:
|
||||||
|
for i in 0 ..< forkyBlck.message.body.blob_kzg_commitments.len:
|
||||||
|
blobIds.add BlobIdentifier(block_root: blockRoot, index: i.BlobIndex)
|
||||||
|
var blobs: Opt[BlobSidecars]
|
||||||
|
if blobIds.len > 0:
|
||||||
|
while true:
|
||||||
|
if self.isBlockKnown(blockRoot):
|
||||||
|
debug "Branch from peer no longer unknown"
|
||||||
|
return
|
||||||
|
if peer.getScore() < PeerScoreLowLimit:
|
||||||
|
debug "Failed to discover new branch from peer"
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
await bucket.consume(1)
|
||||||
|
except CancelledError as exc:
|
||||||
|
raise exc
|
||||||
|
except CatchableError as exc:
|
||||||
|
raiseAssert "TokenBucket.consume should not fail: " & $exc.msg
|
||||||
|
let r = await peer.blobSidecarsByRoot(BlobIdentifierList blobIds)
|
||||||
|
if r.isErr:
|
||||||
|
# `eth2_network` already descored according to the specific error
|
||||||
|
debug "Failed to receive blobs", err = r.error
|
||||||
|
await sleepAsync(RESP_TIMEOUT_DUR)
|
||||||
|
continue
|
||||||
|
template blobSidecars: untyped = r.unsafeGet
|
||||||
|
|
||||||
|
if blobSidecars.len < blobIds.len:
|
||||||
|
peer.updateScore(PeerScoreMissingValues)
|
||||||
|
debug "Received not all blobs",
|
||||||
|
numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len
|
||||||
|
await sleepAsync(RESP_TIMEOUT_DUR)
|
||||||
|
continue
|
||||||
|
if blobSidecars.len > blobIds.len:
|
||||||
|
peer.updateScore(PeerScoreBadResponse)
|
||||||
|
debug "Received too many blobs",
|
||||||
|
numBlobs = blobSidecars.len, expectedNumBlobs = blobIds.len
|
||||||
|
return
|
||||||
|
for i, blobSidecar in blobSidecars:
|
||||||
|
let root = hash_tree_root(blobSidecar[].signed_block_header.message)
|
||||||
|
if root != blockRoot:
|
||||||
|
peer.updateScore(PeerScoreBadResponse)
|
||||||
|
debug "Received unexpected blob"
|
||||||
|
return
|
||||||
|
blobSidecar[].verify_blob_sidecar_inclusion_proof().isOkOr:
|
||||||
|
peer.updateScore(PeerScoreBadResponse)
|
||||||
|
debug "Received invalid blob"
|
||||||
|
return
|
||||||
|
blobs = Opt.some distinctBase(blobSidecars).sortedByIt(it.index)
|
||||||
|
for i, blobSidecar in blobs.get:
|
||||||
|
if blobSidecar[].index != i.BlobIndex:
|
||||||
|
peer.updateScore(PeerScoreBadResponse)
|
||||||
|
debug "Received duplicate blobs while others are missing"
|
||||||
|
return
|
||||||
|
break
|
||||||
|
|
||||||
|
let err = (await self.blockVerifier(blck, blobs)).errorOr:
|
||||||
|
peer.updateScore(PeerScoreGoodBatchValue + PeerScoreGoodValues)
|
||||||
|
beacon_sync_branchdiscovery_discovered_blocks.inc()
|
||||||
|
info "Discovered new branch from peer"
|
||||||
|
break
|
||||||
|
case err
|
||||||
|
of VerifierError.Invalid:
|
||||||
|
peer.updateScore(PeerScoreBadResponse)
|
||||||
|
debug "Received invalid block"
|
||||||
|
return
|
||||||
|
of VerifierError.UnviableFork:
|
||||||
|
peer.updateScore(PeerScoreUnviableFork)
|
||||||
|
debug "Received unviable block"
|
||||||
|
return
|
||||||
|
of VerifierError.Duplicate:
|
||||||
|
peer.updateScore(PeerScoreGoodValues)
|
||||||
|
debug "Connected new branch from peer"
|
||||||
|
break
|
||||||
|
of VerifierError.MissingParent:
|
||||||
|
peer.updateScore(PeerScoreGoodBatchValue)
|
||||||
|
parentSlot = blck.slot
|
||||||
|
blockRoot = blck.getForkedBlockField(parent_root)
|
||||||
|
continue
|
||||||
|
|
||||||
|
proc loop(self: ref BranchDiscovery) {.async: (raises: []).} =
|
||||||
|
try:
|
||||||
|
while true:
|
||||||
|
await self[].isActive.wait()
|
||||||
|
await sleepAsync(RESP_TIMEOUT_DUR)
|
||||||
|
|
||||||
|
let peer =
|
||||||
|
try:
|
||||||
|
self[].network.peerPool.acquireNoWait()
|
||||||
|
except PeerPoolError as exc:
|
||||||
|
debug "Failed to acquire peer", exc = exc.msg
|
||||||
|
continue
|
||||||
|
defer: self[].network.peerPool.release(peer)
|
||||||
|
|
||||||
|
await self[].discoverBranch(peer)
|
||||||
|
except CancelledError:
|
||||||
|
return
|
||||||
|
|
||||||
|
func state*(self: ref BranchDiscovery): BranchDiscoveryState =
|
||||||
|
if self[].loopFuture == nil:
|
||||||
|
BranchDiscoveryState.Stopped
|
||||||
|
elif not self[].isActive.isSet:
|
||||||
|
BranchDiscoveryState.Suspended
|
||||||
|
else:
|
||||||
|
BranchDiscoveryState.Active
|
||||||
|
|
||||||
|
proc start*(self: ref BranchDiscovery) =
|
||||||
|
doAssert self[].loopFuture == nil
|
||||||
|
info "Starting discovery of new branches"
|
||||||
|
self[].loopFuture = self.loop()
|
||||||
|
beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
|
||||||
|
|
||||||
|
proc stop*(self: ref BranchDiscovery) {.async: (raises: []).} =
|
||||||
|
if self[].loopFuture != nil:
|
||||||
|
info "Stopping discovery of new branches"
|
||||||
|
await self[].loopFuture.cancelAndWait()
|
||||||
|
self[].loopFuture = nil
|
||||||
|
beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
|
||||||
|
|
||||||
|
proc suspend*(self: ref BranchDiscovery) =
|
||||||
|
self[].isActive.clear()
|
||||||
|
beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
|
||||||
|
|
||||||
|
proc resume*(self: ref BranchDiscovery) =
|
||||||
|
self[].isActive.fire()
|
||||||
|
beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
|
@ -250,19 +250,31 @@ proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus =
|
|||||||
if not wallSlot.afterGenesis or
|
if not wallSlot.afterGenesis or
|
||||||
head.slot + node.config.syncHorizon >= wallSlot.slot:
|
head.slot + node.config.syncHorizon >= wallSlot.slot:
|
||||||
node.dag.resetChainProgressWatchdog()
|
node.dag.resetChainProgressWatchdog()
|
||||||
|
node.branchDiscovery.suspend()
|
||||||
return ChainSyncStatus.Synced
|
return ChainSyncStatus.Synced
|
||||||
|
|
||||||
if node.dag.chainIsProgressing():
|
|
||||||
# Chain is progressing, we are out of sync
|
|
||||||
return ChainSyncStatus.Syncing
|
|
||||||
|
|
||||||
let numPeers = len(node.network.peerPool)
|
let numPeers = len(node.network.peerPool)
|
||||||
if numPeers <= node.config.maxPeers div 4:
|
if numPeers <= node.config.maxPeers div 4:
|
||||||
# We may have poor connectivity, wait until more peers are available.
|
# We may have poor connectivity, wait until more peers are available.
|
||||||
# This could also be intermittent, as state replays while chain is degraded
|
# This could also be intermittent, as state replays while chain is degraded
|
||||||
# may take significant amounts of time, during which many peers are lost
|
# may take significant amounts of time, during which many peers are lost
|
||||||
|
node.branchDiscovery.suspend()
|
||||||
return ChainSyncStatus.Syncing
|
return ChainSyncStatus.Syncing
|
||||||
|
|
||||||
|
if node.dag.chainIsProgressing():
|
||||||
|
# Chain is progressing, we are out of sync
|
||||||
|
node.branchDiscovery.resume()
|
||||||
|
return ChainSyncStatus.Syncing
|
||||||
|
|
||||||
|
# Network connectivity is good, but we have trouble making sync progress.
|
||||||
|
# Turn on branch discovery module until we have a recent canonical head.
|
||||||
|
# The branch discovery module specifically targets peers on alternate branches
|
||||||
|
# and supports sync manager in discovering branches that are not widely seen
|
||||||
|
# but that may still have weight from attestations.
|
||||||
|
if node.branchDiscovery.state == BranchDiscoveryState.Stopped:
|
||||||
|
node.branchDiscovery.start()
|
||||||
|
node.branchDiscovery.resume()
|
||||||
|
|
||||||
let
|
let
|
||||||
maxHeadSlot = node.dag.heads.foldl(max(a, b.slot), GENESIS_SLOT)
|
maxHeadSlot = node.dag.heads.foldl(max(a, b.slot), GENESIS_SLOT)
|
||||||
numPeersWithHigherProgress = node.network.peerPool.peers
|
numPeersWithHigherProgress = node.network.peerPool.peers
|
||||||
|
Loading…
x
Reference in New Issue
Block a user