From 9f37ffdc621bc25be0ee8bc0d503f6aed8b927a7 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Wed, 27 Mar 2024 16:00:02 +0100 Subject: [PATCH 1/4] suspend light client sync while branch discovery is in progress --- beacon_chain/beacon_node_light_client.nim | 6 ++++ beacon_chain/light_client.nim | 12 ++++++-- beacon_chain/nimbus_beacon_node.nim | 6 ++++ beacon_chain/sync/light_client_manager.nim | 33 ++++++++++++---------- 4 files changed, 40 insertions(+), 17 deletions(-) diff --git a/beacon_chain/beacon_node_light_client.nim b/beacon_chain/beacon_node_light_client.nim index 3f0af21e2..9a63649ab 100644 --- a/beacon_chain/beacon_node_light_client.nim +++ b/beacon_chain/beacon_node_light_client.nim @@ -125,6 +125,12 @@ proc startLightClient*(node: BeaconNode) = node.lightClient.start() +proc stopLightClient*(node: BeaconNode) {.async: (raises: []).} = + if not node.config.syncLightClient: + return + + await node.lightClient.stop() + proc installLightClientMessageValidators*(node: BeaconNode) = let eth2Processor = if node.config.lightClientDataServe: diff --git a/beacon_chain/light_client.nim b/beacon_chain/light_client.nim index e757be813..977947cbc 100644 --- a/beacon_chain/light_client.nim +++ b/beacon_chain/light_client.nim @@ -43,7 +43,7 @@ type getBeaconTime: GetBeaconTimeFn store: ref ForkedLightClientStore processor: ref LightClientProcessor - manager: LightClientManager + manager: ref LightClientManager gossipState: GossipState onFinalizedHeader*, onOptimisticHeader*: LightClientHeaderCallback bootstrapObserver*: LightClientBootstrapObserver @@ -173,7 +173,7 @@ proc createLightClient( else: GENESIS_SLOT.sync_committee_period - lightClient.manager = LightClientManager.init( + lightClient.manager = LightClientManager.new( lightClient.network, rng, getTrustedBlockRoot, bootstrapVerifier, updateVerifier, finalityVerifier, optimisticVerifier, isLightClientStoreInitialized, isNextSyncCommitteeKnown, @@ -215,10 +215,18 @@ proc createLightClient*( cfg, 
forkDigests, getBeaconTime, genesis_validators_root, finalizationMode) proc start*(lightClient: LightClient) = + if lightClient.manager.isRunning: + return notice "Starting light client", trusted_block_root = lightClient.trustedBlockRoot lightClient.manager.start() +proc stop*(lightClient: LightClient) {.async: (raises: []).} = + if not lightClient.manager.isRunning: + return + notice "Stopping light client" + await lightClient.manager.stop() + proc resetToFinalizedHeader*( lightClient: LightClient, header: ForkedLightClientHeader, diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 96945882b..0471ddc73 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -1584,6 +1584,12 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = if not node.syncManager.inProgress: await node.branchDiscovery.stop() + # Light client only runs while branch discovery is stopped; it is suspended otherwise + if node.branchDiscovery.state == BranchDiscoveryState.Stopped: + node.startLightClient() + else: + await node.stopLightClient() + func formatNextConsensusFork( node: BeaconNode, withVanityArt = false): Opt[string] = let consensusFork = diff --git a/beacon_chain/sync/light_client_manager.nim b/beacon_chain/sync/light_client_manager.nim index 7abc451bf..0c18fc057 100644 --- a/beacon_chain/sync/light_client_manager.nim +++ b/beacon_chain/sync/light_client_manager.nim @@ -67,7 +67,7 @@ type getBeaconTime: GetBeaconTimeFn loopFuture: Future[void].Raising([CancelledError]) -func init*( +func new*( T: type LightClientManager, network: Eth2Node, rng: ref HmacDrbgContext, @@ -81,9 +81,9 @@ func init*( getFinalizedPeriod: GetSyncCommitteePeriodCallback, getOptimisticPeriod: GetSyncCommitteePeriodCallback, getBeaconTime: GetBeaconTimeFn -): LightClientManager = +): ref LightClientManager = ## Initialize light client manager. 
- LightClientManager( + (ref LightClientManager)( network: network, rng: rng, getTrustedBlockRoot: getTrustedBlockRoot, @@ -99,16 +99,16 @@ func init*( ) proc isGossipSupported*( - self: LightClientManager, + self: ref LightClientManager, period: SyncCommitteePeriod ): bool = ## Indicate whether the light client is sufficiently synced to accept gossip. - if not self.isLightClientStoreInitialized(): + if not self[].isLightClientStoreInitialized(): return false period.isGossipSupported( - finalizedPeriod = self.getFinalizedPeriod(), - isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown()) + finalizedPeriod = self[].getFinalizedPeriod(), + isNextSyncCommitteeKnown = self[].isNextSyncCommitteeKnown()) # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap proc doRequest( @@ -381,13 +381,16 @@ proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} = isNextSyncCommitteeKnown = self.isNextSyncCommitteeKnown(), didLatestSyncTaskProgress = didProgress) -proc start*(self: var LightClientManager) = - ## Start light client manager's loop. - doAssert self.loopFuture == nil - self.loopFuture = self.loop() +func isRunning*(self: ref LightClientManager): bool = + self[].loopFuture != nil -proc stop*(self: var LightClientManager) {.async: (raises: []).} = +proc start*(self: ref LightClientManager) = + ## Start light client manager's loop. + doAssert self[].loopFuture == nil + self[].loopFuture = self[].loop() + +proc stop*(self: ref LightClientManager) {.async: (raises: []).} = ## Stop light client manager's loop. 
- if self.loopFuture != nil: - await noCancel self.loopFuture.cancelAndWait() - self.loopFuture = nil + if self[].loopFuture != nil: + await noCancel self[].loopFuture.cancelAndWait() + self[].loopFuture = nil From f8be7c326e3764502284691693f5e696450a5fb1 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Wed, 27 Mar 2024 16:00:21 +0100 Subject: [PATCH 2/4] be careful not to disconnect syncing peers in fragmented network --- beacon_chain/networking/peer_protocol.nim | 3 +++ beacon_chain/sync/branch_discovery.nim | 21 +++++++++++++++++---- beacon_chain/sync/sync_manager.nim | 6 ------ 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/beacon_chain/networking/peer_protocol.nim b/beacon_chain/networking/peer_protocol.nim index ff47b17dd..74b45e12e 100644 --- a/beacon_chain/networking/peer_protocol.nim +++ b/beacon_chain/networking/peer_protocol.nim @@ -208,6 +208,9 @@ proc handleStatus(peer: Peer, await peer.handlePeer() true +const StatusExpirationTime* = chronos.minutes(2) + ## The time it takes for the peer's status information to expire. + proc updateStatus*(peer: Peer): Future[bool] {.async: (raises: [CancelledError]).} = ## Request `status` of remote peer ``peer``. 
let diff --git a/beacon_chain/sync/branch_discovery.nim b/beacon_chain/sync/branch_discovery.nim index 5be514f43..54d205c48 100644 --- a/beacon_chain/sync/branch_discovery.nim +++ b/beacon_chain/sync/branch_discovery.nim @@ -92,10 +92,20 @@ proc discoverBranch( peer peer_score = peer.getScore() - let - finalizedSlot = self.getFinalizedSlot() - peerHeadSlot = peer.getHeadSlot() + let oldPeerHeadSlot = peer.getHeadSlot() + if Moment.now() - peer.getStatusLastTime() >= StatusExpirationTime: + if not(await peer.updateStatus()): + peer.updateScore(PeerScoreNoStatus) + debug "Failed to update status" + return + let peerHeadSlot = peer.getHeadSlot() + if peerHeadSlot != oldPeerHeadSlot: + peer.updateScore(PeerScoreGoodStatus) + debug "Peer has synced to a new head", oldPeerHeadSlot, peerHeadSlot + + let finalizedSlot = self.getFinalizedSlot() if peerHeadSlot <= finalizedSlot: + # This peer can sync from different peers, it is useless to us at this time peer.updateScore(PeerScoreUseless) debug "Peer's head slot is already finalized", peerHeadSlot, finalizedSlot return @@ -103,11 +113,14 @@ proc discoverBranch( var blockRoot = peer.getHeadRoot() logScope: blockRoot if self.isBlockKnown(blockRoot): - peer.updateScore(PeerScoreUseless) + # This peer may be actively syncing from us, only descore if no disconnect + if peer.getScore() >= PeerScoreLowLimit - PeerScoreUseless: + peer.updateScore(PeerScoreUseless) debug "Peer's head block root is already known" return # Many peers disconnect on rate limit, we have to avoid getting hit by it + # to have a chance in picking up branches that don't have good propagation const maxRequestsPerBurst = 15 burstDuration = chronos.seconds(30) diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 3640980e8..ebe91ec6a 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -28,12 +28,6 @@ const SyncWorkersCount* = 10 ## Number of sync workers to spawn - 
StatusUpdateInterval* = chronos.minutes(1) - ## Minimum time between two subsequent calls to update peer's status - - StatusExpirationTime* = chronos.minutes(2) - ## Time time it takes for the peer's status information to expire. - type PeerSyncer*[T] = proc(peer: T) {.gcsafe, raises: [].} From 02a69be4e2b94d790d085ba2adcb1a256e8e9659 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Wed, 27 Mar 2024 16:00:36 +0100 Subject: [PATCH 3/4] generic branch discovery version that supports mocking peers --- beacon_chain/beacon_node.nim | 2 +- beacon_chain/nimbus_beacon_node.nim | 4 ++-- beacon_chain/sync/branch_discovery.nim | 33 +++++++++++++------------- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 34f23eb00..8a4d2cd5a 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -85,7 +85,7 @@ type requestManager*: RequestManager syncManager*: SyncManager[Peer, PeerId] backfiller*: SyncManager[Peer, PeerId] - branchDiscovery*: ref BranchDiscovery + branchDiscovery*: ref BranchDiscovery[Peer, PeerId] genesisSnapshotContent*: string processor*: ref Eth2Processor blockProcessor*: ref BlockProcessor diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 0471ddc73..993f2627f 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -445,8 +445,8 @@ proc initFullNode( blockProcessor, node.validatorMonitor, dag, attestationPool, validatorChangePool, node.attachedValidators, syncCommitteeMsgPool, lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool) - branchDiscovery = BranchDiscovery.new( - node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown, + branchDiscovery = BranchDiscovery[Peer, PeerId].new( + node.network.peerPool, getFirstSlotAtFinalizedEpoch, isBlockKnown, branchDiscoveryBlockVerifier) fallbackSyncer = proc(peer: Peer) = branchDiscovery.transferOwnership(peer) diff 
--git a/beacon_chain/sync/branch_discovery.nim b/beacon_chain/sync/branch_discovery.nim index 54d205c48..7d67fbd14 100644 --- a/beacon_chain/sync/branch_discovery.nim +++ b/beacon_chain/sync/branch_discovery.nim @@ -62,23 +62,23 @@ type blobs: Opt[BlobSidecars] ): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} - BranchDiscovery* = object - network: Eth2Node + BranchDiscovery*[A, B] = object + pool: PeerPool[A, B] getFinalizedSlot: GetSlotCallback isBlockKnown: IsBlockKnownCallback blockVerifier: BlockVerifierCallback isActive: AsyncEvent loopFuture: Future[void].Raising([]) - peerQueue: Deque[Peer] + peerQueue: Deque[A] -proc new*( - T: type BranchDiscovery, - network: Eth2Node, +proc new*[A, B]( + T: type BranchDiscovery[A, B], + pool: PeerPool[A, B], getFinalizedSlot: GetSlotCallback, isBlockKnown: IsBlockKnownCallback, - blockVerifier: BlockVerifierCallback): ref BranchDiscovery = - let self = (ref BranchDiscovery)( - network: network, + blockVerifier: BlockVerifierCallback): ref BranchDiscovery[A, B] = + let self = (ref BranchDiscovery[A, B])( + pool: pool, getFinalizedSlot: getFinalizedSlot, isBlockKnown: isBlockKnown, blockVerifier: blockVerifier, @@ -86,8 +86,9 @@ proc new*( self[].isActive.fire() self -proc discoverBranch( - self: BranchDiscovery, peer: Peer) {.async: (raises: [CancelledError]).} = +proc discoverBranch[A, B]( + self: BranchDiscovery[A, B], + peer: A) {.async: (raises: [CancelledError]).} = logScope: peer peer_score = peer.getScore() @@ -263,11 +264,11 @@ proc loop(self: ref BranchDiscovery) {.async: (raises: []).} = self[].peerQueue.popFirst() else: try: - self[].network.peerPool.acquireNoWait() + self[].pool.acquireNoWait() except PeerPoolError as exc: debug "Failed to acquire peer", exc = exc.msg continue - defer: self[].network.peerPool.release(peer) + defer: self[].pool.release(peer) await self[].discoverBranch(peer) except CancelledError: @@ -284,7 +285,7 @@ func state*(self: ref BranchDiscovery): 
BranchDiscoveryState = proc clearPeerQueue(self: ref BranchDiscovery) = while self[].peerQueue.len > 0: let peer = self[].peerQueue.popLast() - self[].network.peerPool.release(peer) + self[].pool.release(peer) proc start*(self: ref BranchDiscovery) = doAssert self[].loopFuture == nil @@ -309,13 +310,13 @@ proc resume*(self: ref BranchDiscovery) = self[].isActive.fire() beacon_sync_branchdiscovery_state.set(self.state.ord().int64) -proc transferOwnership*(self: ref BranchDiscovery, peer: Peer) = +proc transferOwnership*[A, B](self: ref BranchDiscovery[A, B], peer: A) = const maxPeersInQueue = 10 if self.state != BranchDiscoveryState.Active or self[].peerQueue.len >= maxPeersInQueue or peer.getHeadSlot() <= self[].getFinalizedSlot() or self[].isBlockKnown(peer.getHeadRoot()): - self[].network.peerPool.release(peer) + self[].pool.release(peer) return debug "Peer transferred to branch discovery", From 3376887ba766f703580d29222949c4a3a54f6497 Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Wed, 27 Mar 2024 16:00:51 +0100 Subject: [PATCH 4/4] add research notes --- beacon_chain/sync/branch_discovery.nim | 43 ++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/beacon_chain/sync/branch_discovery.nim b/beacon_chain/sync/branch_discovery.nim index 7d67fbd14..241374f46 100644 --- a/beacon_chain/sync/branch_discovery.nim +++ b/beacon_chain/sync/branch_discovery.nim @@ -27,6 +27,49 @@ # Note that the canonical chain may not be on the highest slot number, # as some partitions of the network may have built on top of branches # with lower validator support while the canonical chain was not visible. +# +# Despite its simplicity and brute-force approach, this module has been highly +# effective in the final month of Goerli. It managed to sync the entire Nimbus +# testnet fleet to the same branch, while also tracking >25 alternate branches. +# Further improvements should be applied: +# +# 1. 
Progress is currently limited by the size of `block_quarantine` per cycle, +# as this module downloads in backward order into the quarantine before the +# results get applied in forward order. This further limits the concurrency +# to a single peer at a time, because there is only a single quarantine that +# can hold a pending branch history. +# +# This could be addressed by probing the peer about the branch that it's on. +# We could send a by-root request for all our known heads to identify which +# ones they are aware of, followed by a binary search back to finalized slot +# to determine how far along the peer's progress is. From there on, by range +# requests allow forward sync and remembering partial progress along the way. +# We also wouldn't have to be as careful to avoid rate limit disconnections. +# Empty epoch progress also needs to be remembered across syncing sessions, +# because in a split view scenario often there are hundreds of empty epochs, +# and by-range syncing is highly ineffective. +# +# 2. The peer pool currently provides the best available peer on acquisition. +# Its filtering should be extended to have a better targeting for interesting +# peers, i.e., those that claim to know about head roots that we are unaware +# of and also have a head slot in the past, indicating that sync manager will +# not target those peers and will not manage to pull their branches quickly. +# +# 3. When monitoring gossip, peers that inform about blocks with unknown parent +# roots or aggregates referring to unknown beacon roots should be transferred +# into branch discovery as well. Gossip only propagates through peers that +# have validated the data themselves, so they must have the parent data. +# +# 4. Testing. Beyond Goerli, there is no regular long-lasting low participation +# network that reflects a realistic scenario. The network needs to be huge, +# geographically distributed with a variety of clients and lots of activity. 
+# Blocks need to take a while to apply to test the slow propagation when +# there are lots of empty epochs between blocks. There must be reorgs of +# hundreds of blocks to reflect EL suddenly going back to optimistic mode. +# A smaller simulation to run in CI may be achievable by intentionally +# setting the `SECONDS_PER_SLOT` to a low value. Furthermore, synthetic +# scenarios can be tested in unit tests by mocking peers and blocks and +# making timers and rate limits configurable. import std/[algorithm, deques],