Merge branch 'dev/etan/zf-branchpull' into feat/splitview

This commit is contained in:
Etan Kissling 2024-03-27 16:14:38 +01:00
commit b869546524
No known key found for this signature in database
GPG Key ID: B21DA824C5A3D03D
8 changed files with 123 additions and 46 deletions

View File

@ -85,7 +85,7 @@ type
requestManager*: RequestManager requestManager*: RequestManager
syncManager*: SyncManager[Peer, PeerId] syncManager*: SyncManager[Peer, PeerId]
backfiller*: SyncManager[Peer, PeerId] backfiller*: SyncManager[Peer, PeerId]
branchDiscovery*: ref BranchDiscovery branchDiscovery*: ref BranchDiscovery[Peer, PeerId]
genesisSnapshotContent*: string genesisSnapshotContent*: string
processor*: ref Eth2Processor processor*: ref Eth2Processor
blockProcessor*: ref BlockProcessor blockProcessor*: ref BlockProcessor

View File

@ -125,6 +125,12 @@ proc startLightClient*(node: BeaconNode) =
node.lightClient.start() node.lightClient.start()
proc stopLightClient*(node: BeaconNode) {.async: (raises: []).} =
if not node.config.syncLightClient:
return
await node.lightClient.stop()
proc installLightClientMessageValidators*(node: BeaconNode) = proc installLightClientMessageValidators*(node: BeaconNode) =
let eth2Processor = let eth2Processor =
if node.config.lightClientDataServe: if node.config.lightClientDataServe:

View File

@ -43,7 +43,7 @@ type
getBeaconTime: GetBeaconTimeFn getBeaconTime: GetBeaconTimeFn
store: ref ForkedLightClientStore store: ref ForkedLightClientStore
processor: ref LightClientProcessor processor: ref LightClientProcessor
manager: LightClientManager manager: ref LightClientManager
gossipState: GossipState gossipState: GossipState
onFinalizedHeader*, onOptimisticHeader*: LightClientHeaderCallback onFinalizedHeader*, onOptimisticHeader*: LightClientHeaderCallback
bootstrapObserver*: LightClientBootstrapObserver bootstrapObserver*: LightClientBootstrapObserver
@ -173,7 +173,7 @@ proc createLightClient(
else: else:
GENESIS_SLOT.sync_committee_period GENESIS_SLOT.sync_committee_period
lightClient.manager = LightClientManager.init( lightClient.manager = LightClientManager.new(
lightClient.network, rng, getTrustedBlockRoot, lightClient.network, rng, getTrustedBlockRoot,
bootstrapVerifier, updateVerifier, finalityVerifier, optimisticVerifier, bootstrapVerifier, updateVerifier, finalityVerifier, optimisticVerifier,
isLightClientStoreInitialized, isNextSyncCommitteeKnown, isLightClientStoreInitialized, isNextSyncCommitteeKnown,
@ -215,10 +215,18 @@ proc createLightClient*(
cfg, forkDigests, getBeaconTime, genesis_validators_root, finalizationMode) cfg, forkDigests, getBeaconTime, genesis_validators_root, finalizationMode)
proc start*(lightClient: LightClient) = proc start*(lightClient: LightClient) =
if lightClient.manager.isRunning:
return
notice "Starting light client", notice "Starting light client",
trusted_block_root = lightClient.trustedBlockRoot trusted_block_root = lightClient.trustedBlockRoot
lightClient.manager.start() lightClient.manager.start()
proc stop*(lightClient: LightClient) {.async: (raises: [], raw: true).} =
if not lightClient.manager.isRunning:
return
notice "Stopping light client"
lightClient.manager.stop()
proc resetToFinalizedHeader*( proc resetToFinalizedHeader*(
lightClient: LightClient, lightClient: LightClient,
header: ForkedLightClientHeader, header: ForkedLightClientHeader,

View File

@ -208,6 +208,9 @@ proc handleStatus(peer: Peer,
await peer.handlePeer() await peer.handlePeer()
true true
const StatusExpirationTime* = chronos.minutes(2)
## Time time it takes for the peer's status information to expire.
proc updateStatus*(peer: Peer): Future[bool] {.async: (raises: [CancelledError]).} = proc updateStatus*(peer: Peer): Future[bool] {.async: (raises: [CancelledError]).} =
## Request `status` of remote peer ``peer``. ## Request `status` of remote peer ``peer``.
let let

View File

@ -445,8 +445,8 @@ proc initFullNode(
blockProcessor, node.validatorMonitor, dag, attestationPool, blockProcessor, node.validatorMonitor, dag, attestationPool,
validatorChangePool, node.attachedValidators, syncCommitteeMsgPool, validatorChangePool, node.attachedValidators, syncCommitteeMsgPool,
lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool) lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool)
branchDiscovery = BranchDiscovery.new( branchDiscovery = BranchDiscovery[Peer, PeerId].new(
node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown, node.network.peerPool, getFirstSlotAtFinalizedEpoch, isBlockKnown,
branchDiscoveryBlockVerifier) branchDiscoveryBlockVerifier)
fallbackSyncer = proc(peer: Peer) = fallbackSyncer = proc(peer: Peer) =
branchDiscovery.transferOwnership(peer) branchDiscovery.transferOwnership(peer)
@ -1637,6 +1637,12 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
if not node.syncManager.inProgress: if not node.syncManager.inProgress:
await node.branchDiscovery.stop() await node.branchDiscovery.stop()
# Light client is stopped while branch discovery is ongoing
if node.branchDiscovery.state != BranchDiscoveryState.Stopped:
node.startLightClient()
else:
await node.stopLightClient()
func formatNextConsensusFork( func formatNextConsensusFork(
node: BeaconNode, withVanityArt = false): Opt[string] = node: BeaconNode, withVanityArt = false): Opt[string] =
let consensusFork = let consensusFork =

View File

@ -27,6 +27,49 @@
# Note that the canonical chain may not be on the highest slot number, # Note that the canonical chain may not be on the highest slot number,
# as some partitions of the network may have built on top of branches # as some partitions of the network may have built on top of branches
# with lower validator support while the canonical chain was not visible. # with lower validator support while the canonical chain was not visible.
#
# Despite its simplicity and brute-force approach, this module has been highly
# effective in the final month of Goerli. It managed to sync the entire Nimbus
# testnet fleet to the same branch, while also tracking >25 alternate branches.
# Further improvements should be applied:
#
# 1. Progress is currently limited by the size of `block_quarantine` per cycle,
# as this module downloads in backward order into the quarantine before the
# results get applied in forward order. This further limits the concurrency
# to a single peer at a time, because there is only a single quarantine that
# can hold a pending branch history.
#
# This could be addressed by probing the peer about the branch that it's on.
# We could send a by-root request for all our known heads to identify which
# ones they are aware of, followed by a binary search back to finalized slot
# to determine how far along the peer's progress is. From there on, by range
# requests allow forward sync and remembering partial progress along the way.
# We also wouldn't have to be as careful to avoid rate limit disconnections.
# Empty epoch progress also needs to be remembered across syncing sessions,
# because in a split view scenario often there are hundreds of empty epochs,
# and by-range syncing is highly ineffective.
#
# 2. The peer pool currently provides the best available peer on acquisition.
# Its filtering should be extended to have a better targeting for interesting
# peers, i.e., those that claim to know about head roots that we are unaware
# of and also have a head slot in the past, indicating that sync manager will
# not target those peers and will not manage to pull their branches quickly.
#
# 3. When monitoring gossip, peers that inform about blocks with unknown parent
# roots or aggregates referring to unknown beacon roots should be transferred
# into branch discovery as well. Gossip only propagates through peers that
# have validated the data themselves, so they must have the parent data.
#
# 4. Testing. Beyond Goerli, there is no regular long-lasting low participation
# network that reflects a realistic scenario. The network needs to be huge,
# geographically distributed with a variety of clients and lots of activity.
# Blocks need to take a while to apply to test the slow propagation when
# there are lots of empty epochs between blocks. There must be reorgs of
# hundreds of blocks to reflect EL suddenly going back to optimistic mode.
# A smaller simulation to run in CI may be achieveable by intentionally
# setting the `SECONDS_PER_SLOT` to a low value. Furthermore, synthetic
# scenarios can be tested in unit tests by mocking peers and blocks and
# making timers and rate limits configurable.
import import
std/[algorithm, deques], std/[algorithm, deques],
@ -62,23 +105,23 @@ type
blobs: Opt[BlobSidecars] blobs: Opt[BlobSidecars]
): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} ): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
BranchDiscovery* = object BranchDiscovery*[A, B] = object
network: Eth2Node pool: PeerPool[A, B]
getFinalizedSlot: GetSlotCallback getFinalizedSlot: GetSlotCallback
isBlockKnown: IsBlockKnownCallback isBlockKnown: IsBlockKnownCallback
blockVerifier: BlockVerifierCallback blockVerifier: BlockVerifierCallback
isActive: AsyncEvent isActive: AsyncEvent
loopFuture: Future[void].Raising([]) loopFuture: Future[void].Raising([])
peerQueue: Deque[Peer] peerQueue: Deque[A]
proc new*( proc new*[A, B](
T: type BranchDiscovery, T: type BranchDiscovery[A, B],
network: Eth2Node, pool: PeerPool[A, B],
getFinalizedSlot: GetSlotCallback, getFinalizedSlot: GetSlotCallback,
isBlockKnown: IsBlockKnownCallback, isBlockKnown: IsBlockKnownCallback,
blockVerifier: BlockVerifierCallback): ref BranchDiscovery = blockVerifier: BlockVerifierCallback): ref BranchDiscovery[A, B] =
let self = (ref BranchDiscovery)( let self = (ref BranchDiscovery[A, B])(
network: network, pool: pool,
getFinalizedSlot: getFinalizedSlot, getFinalizedSlot: getFinalizedSlot,
isBlockKnown: isBlockKnown, isBlockKnown: isBlockKnown,
blockVerifier: blockVerifier, blockVerifier: blockVerifier,
@ -86,16 +129,27 @@ proc new*(
self[].isActive.fire() self[].isActive.fire()
self self
proc discoverBranch( proc discoverBranch[A, B](
self: BranchDiscovery, peer: Peer) {.async: (raises: [CancelledError]).} = self: BranchDiscovery[A, B],
peer: A) {.async: (raises: [CancelledError]).} =
logScope: logScope:
peer peer
peer_score = peer.getScore() peer_score = peer.getScore()
let let oldPeerHeadSlot = peer.getHeadSlot()
finalizedSlot = self.getFinalizedSlot() if Moment.now() - peer.getStatusLastTime() >= StatusExpirationTime:
peerHeadSlot = peer.getHeadSlot() if not(await peer.updateStatus()):
peer.updateScore(PeerScoreNoStatus)
debug "Failed to update status"
return
let peerHeadSlot = peer.getHeadSlot()
if peerHeadSlot != oldPeerHeadSlot:
peer.updateScore(PeerScoreGoodStatus)
debug "Peer has synced to a new head", oldPeerHeadSlot, peerHeadSlot
let finalizedSlot = self.getFinalizedSlot()
if peerHeadSlot <= finalizedSlot: if peerHeadSlot <= finalizedSlot:
# This peer can sync from different peers, it is useless to us at this time
peer.updateScore(PeerScoreUseless) peer.updateScore(PeerScoreUseless)
debug "Peer's head slot is already finalized", peerHeadSlot, finalizedSlot debug "Peer's head slot is already finalized", peerHeadSlot, finalizedSlot
return return
@ -103,11 +157,14 @@ proc discoverBranch(
var blockRoot = peer.getHeadRoot() var blockRoot = peer.getHeadRoot()
logScope: blockRoot logScope: blockRoot
if self.isBlockKnown(blockRoot): if self.isBlockKnown(blockRoot):
peer.updateScore(PeerScoreUseless) # This peer may be actively syncing from us, only descore if no disconnect
if peer.getScore() >= PeerScoreLowLimit - PeerScoreUseless:
peer.updateScore(PeerScoreUseless)
debug "Peer's head block root is already known" debug "Peer's head block root is already known"
return return
# Many peers disconnect on rate limit, we have to avoid getting hit by it # Many peers disconnect on rate limit, we have to avoid getting hit by it
# to have a chance in picking up branches that don't have good propagation
const const
maxRequestsPerBurst = 15 maxRequestsPerBurst = 15
burstDuration = chronos.seconds(30) burstDuration = chronos.seconds(30)
@ -250,11 +307,11 @@ proc loop(self: ref BranchDiscovery) {.async: (raises: []).} =
self[].peerQueue.popFirst() self[].peerQueue.popFirst()
else: else:
try: try:
self[].network.peerPool.acquireNoWait() self[].pool.acquireNoWait()
except PeerPoolError as exc: except PeerPoolError as exc:
debug "Failed to acquire peer", exc = exc.msg debug "Failed to acquire peer", exc = exc.msg
continue continue
defer: self[].network.peerPool.release(peer) defer: self[].pool.release(peer)
await self[].discoverBranch(peer) await self[].discoverBranch(peer)
except CancelledError: except CancelledError:
@ -271,7 +328,7 @@ func state*(self: ref BranchDiscovery): BranchDiscoveryState =
proc clearPeerQueue(self: ref BranchDiscovery) = proc clearPeerQueue(self: ref BranchDiscovery) =
while self[].peerQueue.len > 0: while self[].peerQueue.len > 0:
let peer = self[].peerQueue.popLast() let peer = self[].peerQueue.popLast()
self[].network.peerPool.release(peer) self[].pool.release(peer)
proc start*(self: ref BranchDiscovery) = proc start*(self: ref BranchDiscovery) =
doAssert self[].loopFuture == nil doAssert self[].loopFuture == nil
@ -296,13 +353,13 @@ proc resume*(self: ref BranchDiscovery) =
self[].isActive.fire() self[].isActive.fire()
beacon_sync_branchdiscovery_state.set(self.state.ord().int64) beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
proc transferOwnership*(self: ref BranchDiscovery, peer: Peer) = proc transferOwnership*[A, B](self: ref BranchDiscovery[A, B], peer: A) =
const maxPeersInQueue = 10 const maxPeersInQueue = 10
if self.state != BranchDiscoveryState.Active or if self.state != BranchDiscoveryState.Active or
self[].peerQueue.len >= maxPeersInQueue or self[].peerQueue.len >= maxPeersInQueue or
peer.getHeadSlot() <= self[].getFinalizedSlot() or peer.getHeadSlot() <= self[].getFinalizedSlot() or
self[].isBlockKnown(peer.getHeadRoot()): self[].isBlockKnown(peer.getHeadRoot()):
self[].network.peerPool.release(peer) self[].pool.release(peer)
return return
debug "Peer transferred to branch discovery", debug "Peer transferred to branch discovery",

View File

@ -67,7 +67,7 @@ type
getBeaconTime: GetBeaconTimeFn getBeaconTime: GetBeaconTimeFn
loopFuture: Future[void].Raising([CancelledError]) loopFuture: Future[void].Raising([CancelledError])
func init*( func new*(
T: type LightClientManager, T: type LightClientManager,
network: Eth2Node, network: Eth2Node,
rng: ref HmacDrbgContext, rng: ref HmacDrbgContext,
@ -81,9 +81,9 @@ func init*(
getFinalizedPeriod: GetSyncCommitteePeriodCallback, getFinalizedPeriod: GetSyncCommitteePeriodCallback,
getOptimisticPeriod: GetSyncCommitteePeriodCallback, getOptimisticPeriod: GetSyncCommitteePeriodCallback,
getBeaconTime: GetBeaconTimeFn getBeaconTime: GetBeaconTimeFn
): LightClientManager = ): ref LightClientManager =
## Initialize light client manager. ## Initialize light client manager.
LightClientManager( (ref LightClientManager)(
network: network, network: network,
rng: rng, rng: rng,
getTrustedBlockRoot: getTrustedBlockRoot, getTrustedBlockRoot: getTrustedBlockRoot,
@ -99,16 +99,16 @@ func init*(
) )
proc isGossipSupported*( proc isGossipSupported*(
self: LightClientManager, self: ref LightClientManager,
period: SyncCommitteePeriod period: SyncCommitteePeriod
): bool = ): bool =
## Indicate whether the light client is sufficiently synced to accept gossip. ## Indicate whether the light client is sufficiently synced to accept gossip.
if not self.isLightClientStoreInitialized(): if not self[].isLightClientStoreInitialized():
return false return false
period.isGossipSupported( period.isGossipSupported(
finalizedPeriod = self.getFinalizedPeriod(), finalizedPeriod = self[].getFinalizedPeriod(),
isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()) isNextSyncCommitteeKnown = self[].isNextSyncCommitteeKnown())
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap
proc doRequest( proc doRequest(
@ -381,13 +381,16 @@ proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} =
isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown(), isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown(),
didLatestSyncTaskProgress = didProgress) didLatestSyncTaskProgress = didProgress)
proc start*(self: var LightClientManager) = func isRunning*(self: ref LightClientManager): bool =
## Start light client manager's loop. self[].loopFuture != nil
doAssert self.loopFuture == nil
self.loopFuture = self.loop()
proc stop*(self: var LightClientManager) {.async: (raises: []).} = proc start*(self: ref LightClientManager) =
## Start light client manager's loop.
doAssert self[].loopFuture == nil
self[].loopFuture = self[].loop()
proc stop*(self: ref LightClientManager) {.async: (raises: []).} =
## Stop light client manager's loop. ## Stop light client manager's loop.
if self.loopFuture != nil: if self[].loopFuture != nil:
await noCancel self.loopFuture.cancelAndWait() await noCancel self[].loopFuture.cancelAndWait()
self.loopFuture = nil self[].loopFuture = nil

View File

@ -28,12 +28,6 @@ const
SyncWorkersCount* = 10 SyncWorkersCount* = 10
## Number of sync workers to spawn ## Number of sync workers to spawn
StatusUpdateInterval* = chronos.minutes(1)
## Minimum time between two subsequent calls to update peer's status
StatusExpirationTime* = chronos.minutes(2)
## Time time it takes for the peer's status information to expire.
type type
PeerSyncer*[T] = proc(peer: T) {.gcsafe, raises: [].} PeerSyncer*[T] = proc(peer: T) {.gcsafe, raises: [].}